微信定时发送倒数日

Keywords: #技术 #Python
Table of Contents

微信定时发送倒数日是基于Github项目 WeChatFerry 实现的

这个项目实现的功能:

  • 检查登录状态
  • 获取登录账号的 wxid
  • 获取消息类型
  • 获取所有联系人
  • 获取所有好友
  • 获取数据库
  • 获取某数据库下的表
  • 获取用户信息
  • 发送文本消息(可 @)
  • 发送图片
  • 发送文件
  • 允许接收消息
  • 停止接收消息
  • 执行 SQL 查询
  • 接受好友申请
  • 添加群成员
  • 删除群成员
  • 解密图片
  • 获取朋友圈消息
  • 保存图片
  • 保存语音
  • 发送卡片消息
  • 拍一拍群友
  • 邀请群成员
  • 图片 OCR

同时支持多个客户端:

  • Go
  • HTTP
  • Java
  • Python
  • Rust

对于HTTP客户端:

# 安装
pip install --upgrade wcfhttp

# 运行
# 查看版本
wcfhttp -v
# 查看帮助
wcfhttp -h
# 忽略新消息运行
wcfhttp
# 新消息转发到指定地址
wcfhttp --cb http://your_host:your_port/callback

访问http://your_host:your_port/docs就可以查看接口文档

对于Python客户端:

# 安装Python库
pip install --upgrade wcfhttp

接口信息

示例程序

实现过程

保存联系人列表

def save_contacts(wcf):
    # 保存联系人列表
    data = wcf.get_contacts()
    for i in data:
        wxid = i['wxid']
        code = i['code']
        name = i['name']
        if i['wxid'].startswith('wxid_'):
            contacts_dict[wxid] = [name, code]
        elif i['wxid'].endswith('@chatroom'):
            chatroom_dict[wxid] = [name, code]
    with open('contacts.csv', 'w', encoding='utf-8') as f:
        for i in contacts_dict:
            f.write(f'{i},{contacts_dict[i][0]},{contacts_dict[i][1]}\n')
    with open('chatroom.csv', 'w', encoding='utf-8') as f:
        for i in chatroom_dict:
            f.write(f'{i},{chatroom_dict[i][0]},{chatroom_dict[i][1]}\n')

有关倒数日的操作

获取倒数日

def get_countdown():
    # 获取倒数日
    if not os.path.exists('countdown.csv'):
        # 创建一个新的文件
        with open('countdown.csv', 'w', encoding='utf-8') as f:
            pass
    with open('countdown.csv', 'r', encoding='utf-8') as f:
        for i in f:
            title, date = i.strip().split(',')
            countdown_day[title] = date
    print('[+] 获取倒数日成功')

保存倒数日

def save_countdown():
    # 保存倒数日
    with open('countdown.csv', 'w', encoding='utf-8') as f:
        for i in countdown_day:
            f.write(f'{i},{countdown_day[i]}\n')
    print('[+] 保存倒数日成功')

处理倒数日

def process_countdown(wcf):
    # 处理倒数日
    while True:
        try:
            if time.strftime('%H:%M', time.localtime()) == time.strftime('%H:%M', remind_time):
                content = f'今天是{time.strftime("%Y-%m-%d", time.localtime())}\n\n'
                for i in countdown_day:
                    # 计算还剩多少天
                    days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days
                    content += f'[+] [{i}]\n'
                    content += f'[{countdown_day[i]}] [还剩{days}天]\n\n'
                    # 如果是最后一天 则删除倒数日
                    if days == 0:
                        countdown_day.pop(i)
                for j in send_wxid:
                    wcf.send_text(content, j)
                print('[+] 发送倒数日成功')
                time.sleep(60)
        except Exception as e:
            print('[-] 处理倒数日错误:', e)

添加倒数日

def add_countdown(wcf, msg):
    # 添加倒数日
    stop_date = msg.content.split(' ')[2]
    title = msg.content.split(' ')[1]
    # 检查时间格式
    try:
        time.strptime(stop_date, '%Y-%m-%d')
        # 如果倒数日已存在
        if title in countdown_day:
            wcf.send_text(f'[-] 倒数日 {title} 已存在', msg.sender)
            return
        # 如果时间已过
        if (datetime.datetime.strptime(stop_date, '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days < 0:
            wcf.send_text(f'[-] 倒数日 {title} 时间已过', msg.sender)
            return
    except Exception as e:
        print('[-] 添加倒数日错误:', e)
        wcf.send_text('[-] 添加倒数日错误', msg.sender)
    else:
        countdown_day[title] = stop_date
        print(f'[+] {msg.sender} 添加倒数日 {title} {stop_date}')
        wcf.send_text(f'[+] 添加倒数日 {title} 截止日期 {stop_date}', msg.sender)

删除倒数日

def del_countdown(wcf, msg):
    # 删除倒数日
    title = msg.content.split(' ')[1]
    try:
        countdown_day.pop(title)
    except Exception as e:
        print('[-] 删除倒数日错误:', e)
        wcf.send_text('[-] 删除倒数日错误', msg.sender)
    else:
        print(f'[+] {msg.sender} 删除倒数日 {title}')
        wcf.send_text(f'[+] 删除倒数日 {title}', msg.sender)

列出倒数日

def list_countdown(wcf, msg):
    # 列出倒数日
    content = f'今天是{time.strftime("%Y-%m-%d", time.localtime())}\n\n'
    for i in countdown_day:
        # 计算还剩多少天
        days = (datetime.datetime.strptime(countdown_day[i], '%Y-%m-%d') - datetime.datetime.strptime(time.strftime('%Y-%m-%d', time.localtime()), '%Y-%m-%d')).days
        content += f'[+] [{i}]\n'
        content += f'[{countdown_day[i]}] [还剩{days}天]\n\n'
    content += f'\n提醒时间为每天的 {time.strftime("%H:%M", remind_time)}\n'
    content += f'发送倒数日的对象为 {send_wxid}'
    print(f'[+] {msg.sender} 列出倒数日')
    wcf.send_text(content, msg.sender)

设置提醒时间

def set_remind_time(wcf, msg):
    # 设置提醒时间
    global remind_time
    try:
        time_ = msg.content.split(' ')[1]
        # 检查时间格式
        remind_time = time.strptime(time_, '%H:%M')
    except Exception as e:
        print('[-] 设置提醒时间错误:', e)
        wcf.send_text('[-] 设置提醒时间错误', msg.sender)
    else:
        print(f'[+] {msg.sender} 设置提醒时间为每天的 {time.strftime("%H:%M", remind_time)}')
        wcf.send_text(f'[+] 设置提醒时间为每天的 {time.strftime("%H:%M", remind_time)}', msg.sender)

设置发送倒数日的对象

def update_send_wxid(wcf, msg):
    # 设置发送倒数日的对象
    global send_wxid
    try:
        if ',' in msg.content.split(' ')[1]:
            send_wxid = msg.content.split(' ')[1].split(',')
        else:
            send_wxid = [msg.content.split(' ')[1]]
    except Exception as e:
        print('[-] 设置发送倒数日的对象错误:', e)
        wcf.send_text('[-] 设置发送倒数日的对象错误', msg.sender)
    print(f'[+] {msg.sender} 设置发送倒数日的对象为 {send_wxid}')
    wcf.send_text(f'[+] 设置发送倒数日的对象为 {send_wxid}', msg.sender)

处理发送的指令

def process_msg(wcf):
    while wcf.is_receiving_msg():
        try:
            msg = wcf.get_msg()
            content = msg.content
            if content.startswith('$add'):  # 添加倒数日
                add_countdown(wcf, msg)
            elif content.startswith('$del'):  # 删除倒数日
                del_countdown(wcf, msg)
            elif content == '$list':  # 列出倒数日
                list_countdown(wcf, msg)
            elif content == '$help':  # 帮助信息
                help_info(wcf, msg)
            elif content.startswith('$set'):  # 设置提醒时间
                set_remind_time(wcf, msg)
            elif content.startswith('$send'):  # 设置发送倒数日的对象
                update_send_wxid(wcf, msg)
            else:
                continue
        except Empty:
            continue
        except Exception as e:
            print('[-] 处理消息错误:', e)

主函数

def main():
    wcf = Wcf()
    # 保存联系人列表
    save_contacts(wcf)
    # 获取倒数日
    get_countdown()
    # 处理倒数日
    Thread(target=process_countdown, args=(wcf,)).start()
    # 处理消息
    wcf.enable_receiving_msg()
    Thread(target=process_msg, args=(wcf,), daemon=True).start()
    return wcf

if __name__ == '__main__':
    wcf = None
    try:
        wcf = main()
        wcf.keep_running()
    except KeyboardInterrupt:
        print('[-] 服务停止')
        save_countdown()
        wcf.cleanup()
        sys.exit(0)

实现成果

$help命令 $list命令 $add命令 $del命令 $send命令 $set命令 定时提醒效果

增加讯飞星火大模型

在上面的基础上,也可以接入一些AI大模型或者一些智能聊天的机器人

我这里使用讯飞星火V3.0模型接口,个人认证和免费体验一年,200万token

讯飞星火认知大模型

星火认知大模型Web API文档

参考Web API的文档

实现的结果:

讯飞星火模型