微信定时发送倒数日
Table of Contents
微信定时发送倒数日是基于Github项目 WeChatFerry 实现的
这个项目实现的功能:
- 检查登录状态
- 获取登录账号的 wxid
- 获取消息类型
- 获取所有联系人
- 获取所有好友
- 获取数据库
- 获取某数据库下的表
- 获取用户信息
- 发送文本消息(可 @)
- 发送图片
- 发送文件
- 允许接收消息
- 停止接收消息
- 执行 SQL 查询
- 接受好友申请
- 添加群成员
- 删除群成员
- 解密图片
- 获取朋友圈消息
- 保存图片
- 保存语音
- 发送卡片消息
- 拍一拍群友
- 邀请群成员
- 图片 OCR
同时支持多个客户端:
- Go
- HTTP
- Java
- Python
- Rust
对于HTTP客户端:
# 安装
pip install --upgrade wcfhttp
# 运行
# 查看版本
wcfhttp -v
# 查看帮助
wcfhttp -h
# 忽略新消息运行
wcfhttp
# 新消息转发到指定地址
wcfhttp --cb http://your_host:your_port/callback
访问http://your_host:your_port/docs
就可以查看接口文档
对于Python客户端:
# 安装Python库
pip install --upgrade wcfhttp
实现过程
保存联系人列表
def save_contacts(wcf):
# 保存联系人列表
data = wcf.get_contacts()
for i in data:
wxid = i['wxid']
code = i['code']
name = i['name']
if i['wxid'].startswith('wxid_'):
contacts_dict[wxid] = [name, code]
elif i['wxid'].endswith('@chatroom'):
chatroom_dict[wxid] = [name, code]
with open('contacts.csv', 'w', encoding='utf-8') as f:
for i in contacts_dict:
f.write(f'{i},{contacts_dict[i][0]},{contacts_dict[i][1]}\n')
with open('chatroom.csv', 'w', encoding='utf-8') as f:
for i in chatroom_dict:
f.write(f'{i},{chatroom_dict[i][0]},{chatroom_dict[i][1]}\n')
有关倒数日的操作
获取倒数日
def get_countdown():
# 获取倒数日
if not os.path.exists('countdown.csv'):
# 创建一个新的文件
with open('countdown.csv', 'w', encoding='utf-8') as f:
pass
with open('countdown.csv', 'r', encoding='utf-8') as f:
for i in f:
title, date = i.strip().split(',')
countdown_day[title] = date
print('[+] 获取倒数日成功')
保存倒数日
def save_countdown():
# 保存倒数日
with open('countdown.csv', 'w', encoding='utf-8') as f:
for i in countdown_day:
f.write(f'{i},{countdown_day[i]}\n')
print('[+] 保存倒数日成功')
处理倒数日
def process_countdown(wcf):
# 处理倒数日
while True:
try:
if time.strftime('%H:%M', time.localtime()) == time.strftime('%H:%M', remind_time):
content = f'今天是{time.strftime("%Y-%m-%d", time.localtime())}\n\n'
for i in countdown_day:
# 计算还剩多少天
days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days
content += f'[+] [{i}]\n'
content += f'[{countdown_day[i]}] [还剩{days}天]\n\n'
# 如果是最后一天 则删除倒数日
if days == 0:
countdown_day.pop(i)
for j in send_wxid:
wcf.send_text(content, j)
print('[+] 发送倒数日成功')
time.sleep(60)
except Exception as e:
print('[-] 处理倒数日错误:', e)
添加倒数日
def add_countdown(wcf, msg):
# 添加倒数日
stop_date = msg.content.split(' ')[2]
title = msg.content.split(' ')[1]
# 检查时间格式
try:
time.strptime(stop_date, '%Y-%m-%d')
# 如果倒数日已存在
if title in countdown_day:
wcf.send_text(f'[-] 倒数日 {title} 已存在', msg.sender)
return
# 如果时间已过
if (datetime.datetime.strptime(stop_date, '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days < 0:
wcf.send_text(f'[-] 倒数日 {title} 时间已过', msg.sender)
return
except Exception as e:
print('[-] 添加倒数日错误:', e)
wcf.send_text('[-] 添加倒数日错误', msg.sender)
else:
countdown_day[title] = stop_date
print(f'[+] {msg.sender} 添加倒数日 {title} {stop_date}')
wcf.send_text(f'[+] 添加倒数日 {title} 截止日期 {stop_date}', msg.sender)
删除倒数日
def del_countdown(wcf, msg):
# 删除倒数日
title = msg.content.split(' ')[1]
try:
countdown_day.pop(title)
except Exception as e:
print('[-] 删除倒数日错误:', e)
wcf.send_text('[-] 删除倒数日错误', msg.sender)
else:
print(f'[+] {msg.sender} 删除倒数日 {title}')
wcf.send_text(f'[+] 删除倒数日 {title}', msg.sender)
列出倒数日
def list_countdown(wcf, msg):
# 列出倒数日
content = f'今天是{time.strftime("%Y-%m-%d", time.localtime())}\n\n'
for i in countdown_day:
# 计算还剩多少天
days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days
content += f'[+] [{i}]\n'
content += f'[{countdown_day[i]}] [还剩{days}天]\n\n'
content += f'\n提醒时间为每天的 {time.strftime("%H:%M", remind_time)}\n'
content += f'发送倒数日的对象为 {send_wxid}'
print(f'[+] {msg.sender} 列出倒数日')
wcf.send_text(content, msg.sender)
设置提醒时间
def set_remind_time(wcf, msg):
# 设置提醒时间
global remind_time
try:
time_ = msg.content.split(' ')[1]
# 检查时间格式
remind_time = time.strptime(time_, '%H:%M')
except Exception as e:
print('[-] 设置提醒时间错误:', e)
wcf.send_text('[-] 设置提醒时间错误', msg.sender)
else:
print(f'[+] {msg.sender} 设置提醒时间为每天的 {time.strftime("%H:%M", remind_time)}')
wcf.send_text(f'[+] 设置提醒时间为每天的 {time.strftime("%H:%M", remind_time)}', msg.sender)
设置发送倒数日的对象
def update_send_wxid(wcf, msg):
# 设置发送倒数日的对象
global send_wxid
try:
if ',' in msg.content.split(' ')[1]:
send_wxid = msg.content.split(' ')[1].split(',')
else:
send_wxid = [msg.content.split(' ')[1]]
except Exception as e:
print('[-] 设置发送倒数日的对象错误:', e)
wcf.send_text('[-] 设置发送倒数日的对象错误', msg.sender)
print(f'[+] {msg.sender} 设置发送倒数日的对象为 {send_wxid}')
wcf.send_text(f'[+] 设置发送倒数日的对象为 {send_wxid}', msg.sender)
处理发送的指令
def process_msg(wcf):
while wcf.is_receiving_msg():
try:
msg = wcf.get_msg()
content = msg.content
if content.startswith('$add'): # 添加倒数日
add_countdown(wcf, msg)
elif content.startswith('$del'): # 删除倒数日
del_countdown(wcf, msg)
elif content == '$list': # 列出倒数日
list_countdown(wcf, msg)
elif content == '$help': # 帮助信息
help_info(wcf, msg)
elif content.startswith('$set'): # 设置提醒时间
set_remind_time(wcf, msg)
elif content.startswith('$send'): # 设置发送倒数日的对象
update_send_wxid(wcf, msg)
else:
continue
except Empty:
continue
except Exception as e:
print('[-] 处理消息错误:', e)
主函数
def main():
wcf = Wcf()
# 保存联系人列表
save_contacts(wcf)
# 获取倒数日
get_countdown()
# 处理倒数日
Thread(target=process_countdown, args=(wcf,)).start()
# 处理消息
wcf.enable_receiving_msg()
Thread(target=process_msg, args=(wcf,), daemon=True).start()
return wcf
if __name__ == '__main__':
wcf = None
try:
wcf = main()
wcf.keep_running()
except KeyboardInterrupt:
print('[-] 服务停止')
save_countdown()
wcf.cleanup()
sys.exit(0)
实现成果
增加讯飞星火大模型
在上面的基础上,也可以接入一些AI大模型或者一些智能聊天的机器人
我这里使用讯飞星火V3.0模型接口,个人认证和免费体验一年,200万token
参考Web API的文档
实现的结果: