码迷,mamicode.com
首页 > 编程语言 > 详细

利用python itchat给女朋友定时发信息

时间:2018-10-05 14:01:09      阅读:108      评论:0      收藏:0      [点我收藏+]

标签:ons   数据库连接   多个   信息   test   cmd   接收   数据库   global   

利用itchat给女朋友定时发信息

涉及到的技术有itchat,redis,mysql,最主要的还是mysql咯,当然咯,这么多东西,我就只介绍我代码需要用到的,其他的,如果需要了解的话,就需要看参考资料了哟

实现的功能:
1.可以保存微信的消息,包括群聊和好友(文字/视频/语音/图片)
2.在群里@自己,可以调用图灵机器人的API进行文字回复(类似于机器人)
3.调用定时任务,在指定时间发送消息至某人

需要了解的基础:
1.python基础
2.mysql基础
3.redis基础

 

实现效果如下:

技术分享图片

只需要在数据库中填写相应的数据,包括,发送给谁(to_user),如果有多个,则用分号(;)分开,执行间隔分钟数(exe_time),如,1440则代表一天,下一次执行时间戳(next_time),主要是抓这个时间来进行发送,抓取的键值(redis_keys)

发送的效果是这样的:

技术分享图片

 

 

项目基础

itchat模块

官方参考文档:https://itchat.readthedocs.io/zh/latest/

安装

pip install itchat / pip3 install itchat

最简单的测试给好友发送消息

#产生二维码
itchat.auto_login()
#定义用户的昵称
send_userid=用户的昵称
#查找用户的userid
itcaht_user_name = itchat.search_friends(name=send_userid)[0][UserName]
#利用send_msg发送消息
itchat.send_msg(这是一个测试,toUserName=itcaht_user_name)

测试小脚本

#!/usr/bin/env python3

import itchat

#产生二维码
itchat.auto_login()
#定义用户的昵称
send_userid=亲爱的
#查找用户的userid
itcaht_user_name = itchat.search_friends(name=send_userid)[0][UserName]
#利用send_msg发送消息
itchat.send_msg(这是一个测试,toUserName=itcaht_user_name)

在执行之后,会弹出一个二维码,然后会让你登录到网页微信,紧接着会去搜索用户信息,搜索到了之后会去取用户的ID,这个ID是微信随机给的,然后利用send_msg进行发送信息

 

mysql模块

参考文档:http://www.runoob.com/python3/python3-mysql.html

安装

pip install pymysql / pip3 install pymysql

pymysql的基本使用

连接mysql

db = pymysql.connect(host=hostname,port=port,user=username,passwd=password,db=databasename,charset=charset)

填写正确的信息后就成功的连接了mysql,注意,port这里,端口号不能加‘‘,否则会连接不上mysql

 

简单的操作mysql

insert
#连接DB
db = pymysql.connect(host=127.0.0.1,port=3306,user=root,passwd=root,db=test_1,charset=utf8)
#编写SQL
sql = "insert into test_message values (%d);" %(int(time.time()))
#获取游标
cursor = db.cursor()
#数据库重连
db.ping(reconnect=True)
#执行SQL
cursor.execute(sql)
#commit 
db.commit()

执行完毕后,如果正确的话,就可以看到表中有数据了,当然,数据库和数据表得自己去建立才行,如果没有数据,则需要看看程序的输出结果了,还有一点,在做DML的时候,须要加上commit,这是事务的一个特性,否则数据会丢失的

 

select
#连接DB
db = pymysql.connect(host=127.0.0.1,port=3306,user=root,passwd=root,db=test_1,charset=utf8)
#编写SQL
sql = "select * from test_message"
#获取游标
cursor = db.cursor()
#数据库重连
db.ping(reconnect=True)
#执行SQL
cursor.execute(sql)
#获取全部结果
cursor.fetchall()
#返回单个数据
cursor.fetchone()

执行完毕后,如果正确的话,就可以看到结果了

 

redis模块

 参考文档:https://www.cnblogs.com/xiaoming279/p/6293583.html

安装 

pip install redis / pip3 install redis  

连接Redis

r = redis.Redis(host,port,db_port,password)
判断键是否存在
r = redis.Redis(localhost,6379,0,redis)
keys = 123
if r.exists(keys):
    print ("存在")
else:
    print ("不存在")
随机取出列表中的数据
r = redis.Redis(localhost,6379,0,redis)
len_keys = r.llen(keys)
random_int = int(random.randint(0,len_keys-1))
print (r.lindex(keys,random_int).decode(utf-8))

基本上在下面代码中,几乎会用到如上基础

MySQL数据库/表建立

数据库建立 

CREATE DATABASE `itchat` /*!40100 DEFAULT CHARACTER SET utf8mb4 */

数据表的建立

itchat_login_info

CREATE TABLE `itchat_login_info` (
`createdate` int(11) NOT NULL,
`myid` varchar(128) DEFAULT NULL,
`username` varchar(128) DEFAULT NULL,
`mysignature` varchar(128) DEFAULT NULL,
`mysex` varchar(4) DEFAULT NULL,
`myuseruin` varchar(128) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

itchat_message

CREATE TABLE `itchat_message` (
`createdate` int(11) NOT NULL,
`msgid` varchar(128) NOT NULL,
`nickname` varchar(128) DEFAULT NULL,
`username` varchar(128) DEFAULT NULL,
`actualnickname` varchar(128) DEFAULT NULL,
`actualusername` varchar(128) DEFAULT NULL,
`msgtext` varchar(5000) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

itchat_friend_message

CREATE TABLE `itchat_friend_message` (
`createdate` int(11) NOT NULL,
`msgid` varchar(128) NOT NULL,
`fromuser` varchar(128) NOT NULL,
`fromuserid` varchar(128) NOT NULL,
`fromsex` varchar(4) NOT NULL,
`touser` varchar(128) NOT NULL,
`touserid` varchar(128) NOT NULL,
`tousersex` varchar(4) NOT NULL,
`msgtext` varchar(5000) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

auto_timer 

CREATE TABLE `auto_timer` (
`timer_id` int(11) NOT NULL,
`createdate` int(11) NOT NULL,
`content` varchar(200) DEFAULT NULL,
`to_user` varchar(1000) NOT NULL,
`exe_time` int(11) NOT NULL,
`last_time` int(11) DEFAULT NULL,
`next_time` int(11) NOT NULL,
`redis_keys` varchar(100) NOT NULL,
`timer_count` bigint(20) NOT NULL,
PRIMARY KEY (`timer_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

代码

mian.py

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import itchat
from itchat.content import *
import collections_logs
import save_db
import liwang_redis_itchat
import time 
import threading
import os

#定义装饰器,用于监听图片,视频等消息,并且下载下来
@itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO],isFriendChat=True, isGroupChat=True)
def download_files(msg):
    logger.debug(msg[FromUserName])
    logger.debug("发送的是语音/视频等,需要保存至本地")
    msg.download(msg.fileName)
    file_value = 127.0.0.1/ + msg.FileName + _ + msg[Type]
    logger.debug(file_value)

    #将消息写入数据库
    if @@ in msg[FromUserName] :
        logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")
        sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],msg[User][NickName],msg[User][UserName],msg[ActualNickName],msg[ActualUserName],(file_value))
        save_db.exe_db(db,sql,logger)
    elif (msg[FromUserName] == MyID and @@ in msg[ToUserName]):
        logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")
        sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],msg[User][NickName],msg[User][UserName],myUserName,msg[ActualUserName],(file_value))
        save_db.exe_db(db,sql,logger)
    else :
    #将群聊设置为0,让后面的@抓不到
        isgroup = 0
        #判断是否是自己发送
        if msg[FromUserName] == MyID :
            logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],myUserName,MyID,mysex,msg[User][NickName],msg[User][UserName],msg[User][Sex],file_value)
            save_db.exe_db(db,sql,logger)
        else:
            logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],msg[User][NickName],msg[User][UserName],msg[User][Sex],myUserName,MyID,mysex,file_value)
            save_db.exe_db(db,sql,logger)


#定义装饰器,用于监听好友和群聊微信群聊内容
@itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True)   
def simple_reply(msg):

    #定义群聊
    isgroup = 1

    #将消息写入数据库
    if @@ in msg[FromUserName] :
        logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中")
        sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],msg[User][NickName],msg[User][UserName],msg[ActualNickName],msg[ActualUserName],(msg[Text]))
        save_db.exe_db(db,sql,logger)
    elif (msg[FromUserName] == MyID and @@ in msg[ToUserName]):
        logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中")
        sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],msg[User][NickName],msg[User][UserName],myUserName,msg[ActualUserName],(msg[Text]))
        save_db.exe_db(db,sql,logger)
    else :
        #将群聊设置为0,让后面的@抓不到
        isgroup = 0
        #判断是否是自己发送
        if msg[FromUserName] == MyID :
            logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],myUserName,MyID,mysex,msg[User][NickName],msg[User][UserName],msg[User][Sex],msg[Text])
            save_db.exe_db(db,sql,logger)
        else:
            logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[MsgId],msg[User][NickName],msg[User][UserName],msg[User][Sex],myUserName,MyID,mysex,msg[Text])
            save_db.exe_db(db,sql,logger)

    #将所有的聊天信息都写入到mongodb中
    #暂时不写,mongodb还未掌握基本的使用

    #这里写聊天机器人端口
    #判断是否@了自己,且为群聊
    if isgroup == 1 :
        if msg[isAt] :
            logger.debug("@了自己,移动到redis_itchat")
            from_message = msg[Text]
            send_msg = liwang_redis_itchat.auto_box(from_message,redis_db,logger)
            itchat.send_msg(send_msg,toUserName=msg[ToUserName])
            itchat.send_msg(send_msg,toUserName=msg[FromUserName])
        else:
            logger.debug("没有@自己")


    #空出两个空LOG
    logger.debug("-------------------------------------------------------------------")
    logger.debug("-------------------------------------------------------------------")

def auto_reply():
    logger.debug("============================================================================================")
    
    #获取全部ID sql 
    sql = "select timer_id from auto_timer;"
    #获取结果
    userid_list = save_db.select_db(db,sql,logger)

    #遍历auto_timer ID
    for userid in userid_list:
        #打印正在处理的信息
        sql = "select content from auto_timer where timer_id = ‘%s‘;" %(userid)
        id_content = save_db.select_db(db,sql,logger)[0]
        logger.debug("判断ID:%s" %(userid))
        logger.debug("判断内容:%s" %(id_content))

        #判断时间是否符合
        #获取当前的时间戳
        now_time = int(time.time())
        logger.debug("当前时间为:%s" %(now_time))
        sql = "select next_time from auto_timer where timer_id = ‘%s‘;" %(userid)
        next_time = save_db.select_db(db,sql,logger)[0]
        next_time = int(next_time[0])
        logger.debug("下次执行时间戳:%s" %(next_time))

        #判断,如果当前时间大于或等于下一次执行时间,就执行
        if now_time >= next_time :
            logger.debug("处理内容:%s" %(id_content))

            #获取发送人
            sql = "select to_user from auto_timer where timer_id = ‘%s‘;" %(userid)
            to_user = save_db.select_db(db,sql,logger)[0]
            to_user = to_user[0]
            logger.debug("查找用户为:%s" %(to_user))

            #单用户
            single_user = 1

            #判断字符串是否有分割
            if ; in to_user :
                allocation_user = to_user.split(;)
                logger.debug("需要发送多人,信息为:%s" %(allocation_user))
                single_user = 0
            else:
                allocation_user = to_user
                logger.debug("需要发送一人,信息为:%s" %(allocation_user))

            #判断为单用户,不需要做处理
            if single_user == 1 :
                to_user = to_user + ;
                allocation_user = to_user.split(;)
                


            for send_userid in allocation_user :
                logger.debug("正在处理%s用户" %(send_userid))
                #查找微信是否存在此用户
                try:
                    itcaht_user_name = itchat.search_friends(name=send_userid)[0][UserName]

                    if itcaht_user_name == " ":
                        logger.debug("微信不存在此好友,请检查")
                    else:
                        #获取发送的内容:
                        sql = "select redis_keys from auto_timer where timer_id = ‘%s‘;" %(userid)
                        redis_keys = save_db.select_db(db,sql,logger)[0]
                        redis_keys = str(redis_keys[0])
                        return_values = liwang_redis_itchat.return_rand_keys(redis_db,logger,redis_keys)

                        #获取其他信息
                        sql = "select last_time from auto_timer where timer_id = ‘%s‘;" %(userid)
                        last_time = save_db.select_db(db,sql,logger)[0]
                        last_time = last_time[0]

                        sql = "select timer_count from auto_timer where timer_id = ‘%s‘;" %(userid)
                        timer_count = save_db.select_db(db,sql,logger)[0]
                        timer_count = int(timer_count[0])

                        sql = "select exe_time from auto_timer where timer_id = ‘%s‘;" %(userid)
                        exe_time = save_db.select_db(db,sql,logger)[0]
                        exe_time = int(exe_time[0]) * 60


                        logger.debug("将要发送的信息如下")
                        logger.debug("timer_ID:%s" %(userid))
                        logger.debug("发送简介为:%s" %(id_content))
                        logger.debug("上一次执行的时间戳是:%s" %(last_time))
                        logger.debug("下一次执行的时间戳是:%s" %(next_time))
                        logger.debug("获取Redis的值为:%s" %(redis_keys))
                        logger.debug("间隔描述为:%s" %(exe_time))
                        logger.debug("已经执行次数为:%s" %(timer_count))

                        try:
                            logger.debug("将要发送的消息: %s" %(return_values))
                            itchat.send_msg(return_values,toUserName=itcaht_user_name)
                            
                        except Exception as e:
                            logger.debug("信息发送失败,详细信息如下:")
                            logger.debug(e)
                except Exception as e:
                    logger.debug("未找到该用户信息,详细信息如下:")
                    logger.debug(e)

            try:
                # 更新信息
                last_time = now_time + exe_time
                timer_count = timer_count + 1


                userid = userid[0]

                sql = "update auto_timer set last_time = ‘%s‘ , next_time = ‘%s‘ , timer_count = ‘%s‘ where timer_id = ‘%s‘;"                 %(now_time,last_time,timer_count,userid)

                save_db.exe_db(db,sql,logger)

                logger.debug("更新信息完毕")


            except Exception as e:
                logger.debug("消息已经发送,但是更新数据库失败,详细信息如下:")
                logger.debug(e)
        else:
            logger.debug("判断时间未到")

        #修整60秒
        logger.debug("============================================================================================")
        time.sleep(3)
    time.sleep(60)

if __name__ == "__main__" :
    
    #设置登陆的PK
    loginpk = login_ + str(time.time())
    
    #定义微信自动登录
    itchat.auto_login(enableCmdQR=2,statusStorageDir=loginpk)
#    itchat.auto_login(statusStorageDir=loginpk)

    #获取自己的ID
    global MyID
    MyID = itchat.get_friends(update=True)[0]["UserName"]

    #获取用户信息
    global myUserName
    global mysex
    myUserName = itchat.get_friends()[0][NickName]
    myUserUin = itchat.get_friends()[0][Uin]
    mysignature = itchat.get_friends()[0][Signature]
    mysex = itchat.get_friends()[0][Sex]

    #设置logger
    global logger
    logname = str(myUserName) + _ + str(myUserUin)
    logger = collections_logs.build_logs(logname)
    
    #定义数据库
    global db
    db = save_db.itchat_connect_mysql(logger)
    global redis_db
    redis_db = liwang_redis_itchat.connect_redis(logger)

    #每登陆一次,记录一次login
    logger.debug("记录登陆Log")
    sql = "insert into itchat_login_info values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),MyID,myUserName,mysignature,mysex,myUserUin)
    save_db.exe_db(db,sql,logger)

    #启动线程
    threading._start_new_thread(itchat.run,())

    #开始循环
    while 1:
        itchat.configured_reply()
        auto_reply()

collections_logs.py

 

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import logging
import time

def build_logs(file_name):

    #获取当前时间
    now_time = time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime())
    #设置log名称
    logname = itchat + file_name + now_time
    #定义logger
    logger = logging.getLogger()
    #设置级别为debug
    logger.setLevel(level = logging.DEBUG)
    #设置 logging文件名称
    handler = logging.FileHandler(logname)
    #设置级别为debug
    handler.setLevel(logging.DEBUG)
    #设置log的格式
    formatter = logging.Formatter(%(asctime)s - %(levelname)s - %(message)s)
    #将格式压进logger
    handler.setFormatter(formatter)
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    #写入logger
    logger.addHandler(handler)
    logger.addHandler(console)
    #将logger返回
    return logger

save_db.py

 

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import pymysql

def itchat_connect_mysql(logger):

    try:
        db = pymysql.connect(host=127.0.0.1,port=3306,user=root,passwd=root,db=itchat,charset=utf8)
        logger.debug(MySQL数据库连接成功)
        logger.debug(数据库ID:%s %(db))
        return db
    except Exception as e:
        logger.debug(MySQL数据库连接失败)
        logger.debug(e)

def exe_db(db,sql,logger):
    try:
        cursor = db.cursor()
        logger.debug(获取游标:%s %(cursor))
        db.ping(reconnect=True)
        logger.debug("数据库重连")
        cursor.execute(sql)
        logger.debug("执行SQL")
        logger.debug(sql)
        db.commit()
        logger.debug("数据库commit")
    except Exception as e:
        logger.debug(SQL执行失败,请至日志查看该SQL记录)
        logger.debug(sql)
        logger.debug(e)

def select_db(db,sql,logger):
    try:
        cursor = db.cursor()
        logger.debug(获取游标:%s %(cursor))
        db.ping(reconnect=True)
        logger.debug("数据库重连")
        cursor.execute(sql)
        logger.debug("执行SQL")
        logger.debug(sql)
        return cursor.fetchall()
    except Exception as e:
        logger.debug(SQL执行失败,请至日志查看该SQL记录)
        logger.debug(sql)
        logger.debug(e)

liwang_redis_itchat.py

 

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import redis
import json
import requests
import random


def connect_redis(logger):
    try:
        r = redis.Redis(127.0.0.1,6379,0,redis)
        logger.debug("Redis数据库连接成功")
        return r
    except Exception as e:
        logger.debug("Redis数据库连接失败")
        logger.debug(e)

def return_rand_keys(redis_db,logger,keys):

    #判断是否有这个key

    if redis_db.exists(keys):

        len_keys = redis_db.llen(keys)
        
        random_int = int(random.randint(0,len_keys-1))
        logger.debug("Redis获取的随机值为:%s" %(random_int))

        return_redis_values = redis_db.lindex(keys,random_int).decode(utf-8)
        logger.debug("Redis将要返回的值为:%s" %(return_redis_values))

        return return_redis_values

    else:
        logger.debug("没有找到相关key")
        return 0

def auto_box(message,redis_db,logger):

    #设置分隔符
    if \u2005 in message :
        myid=@小子\u2005
    else:
        myid = @小子 

    #获取分隔符之后的值
    if len(message.split(myid)) == 1 :
        return "@我了要说话哟"
    else:
        dealwith = message.split(myid)[1]

    if dealwith == " " :
        logger.debug("只是@了我,并没有输入内容")
    else:
        #图灵机器人API接口
        url_api = "http://openapi.tuling123.com/openapi/api/v2"

        #将问题赋值给box_quest
        box_quest = dealwith

        #定义json post 
        request_json = json.dumps ({
            "perception":
            {
                "inputText":
                {
                    "text": box_quest
                },

                "selfInfo":
                {
                    "location":
                    {
                        "city": "成都",
                        "province": "成都",
                        "street": "天府三街"
                    }
                }
            },

            "userInfo": 
            {
                "apiKey": "APIkey",
                "userId": "tuling"
            }
        }
        )

        # 获取POST之后的值
        r = requests.post(url_api, data=request_json)
        r_test = r.text
        r_json = json.loads(r_test)

        #定义返回值
        return_str = (r_json[results][0][values][text])

        logger.debug("图灵机器人将要返回的值:")
        logger.debug(return_str)

        #需要自己建立自己的机器人仓库
        logger.debug("将返回的结果存入Redis数据库中")
        redis_db.rpush(box_quest,return_str)

        return return_str

实现的效果 

如下以log展现

接收消息并且保存至数据库

技术分享图片 

图灵机器人回复 技术分享图片

定时任务

技术分享图片

利用python itchat给女朋友定时发信息

标签:ons   数据库连接   多个   信息   test   cmd   接收   数据库   global   

原文地址:https://www.cnblogs.com/wang-li/p/9744502.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!