标签:ons 数据库连接 多个 信息 test cmd 接收 数据库 global
涉及到的技术有itchat,redis,mysql,最主要的还是mysql咯,当然咯,这么多东西,我就只介绍我代码需要用到的,其他的,如果需要了解的话,就需要看参考资料了哟
实现的功能:
1.可以保存微信的消息,包括群聊和好友(文字/视频/语音/图片)
2.在群里@自己,可以调用图灵机器人的API进行文字回复(类似于机器人)
3.调用定时任务,在指定时间发送消息至某人
需要了解的基础:
1.python基础
2.mysql基础
3.redis基础
只需要在数据库中填写相应的数据,包括,发送给谁(to_user),如果有多个,则用分号(;)分开,执行间隔分钟数(exe_time),如,1440则代表一天,下一次执行时间戳(next_time),主要是抓这个时间来进行发送,抓取的键值(redis_keys)
发送的效果是这样的:
官方参考文档:https://itchat.readthedocs.io/zh/latest/
pip install itchat / pip3 install itchat
#产生二维码 itchat.auto_login() #定义用户的昵称 send_userid=‘用户的昵称‘ #查找用户的userid itcaht_user_name = itchat.search_friends(name=send_userid)[0][‘UserName‘] #利用send_msg发送消息 itchat.send_msg(‘这是一个测试‘,toUserName=itcaht_user_name)
#!/usr/bin/env python3 import itchat #产生二维码 itchat.auto_login() #定义用户的昵称 send_userid=‘亲爱的‘ #查找用户的userid itcaht_user_name = itchat.search_friends(name=send_userid)[0][‘UserName‘] #利用send_msg发送消息 itchat.send_msg(‘这是一个测试‘,toUserName=itcaht_user_name)
在执行之后,会弹出一个二维码,然后会让你登录到网页微信,紧接着会去搜索用户信息,搜索到了之后会去取用户的ID,这个ID是微信随机给的,然后利用send_msg进行发送信息
参考文档:http://www.runoob.com/python3/python3-mysql.html
pip install pymysql / pip3 install pymysql
db = pymysql.connect(host=‘hostname‘,port=port,user=‘username‘,passwd=‘password‘,db=‘databasename‘,charset=‘charset‘)
填写正确的信息后就成功的连接了mysql,注意,port这里,端口号不能加‘‘,否则会连接不上mysql
#连接DB db = pymysql.connect(host=‘127.0.0.1‘,port=3306,user=‘root‘,passwd=‘root‘,db=‘test_1‘,charset=‘utf8‘) #编写SQL sql = "insert into test_message values (%d);" %(int(time.time())) #获取游标 cursor = db.cursor() #数据库重连 db.ping(reconnect=True) #执行SQL cursor.execute(sql) #commit db.commit()
执行完毕后,如果正确的话,就可以看到表中有数据了,当然,数据库和数据表得自己去建立才行,如果没有数据,则需要看看程序的输出结果了,还有一点,在做DML的时候,须要加上commit,这是事务的一个特性,否则数据会丢失的
#连接DB db = pymysql.connect(host=‘127.0.0.1‘,port=3306,user=‘root‘,passwd=‘root‘,db=‘test_1‘,charset=‘utf8‘) #编写SQL sql = "select * from test_message" #获取游标 cursor = db.cursor() #数据库重连 db.ping(reconnect=True) #执行SQL cursor.execute(sql) #获取全部结果 cursor.fetchall() #返回单个数据 cursor.fetchone()
执行完毕后,如果正确的话,就可以看到结果了
参考文档:https://www.cnblogs.com/xiaoming279/p/6293583.html
pip install redis / pip3 install redis
r = redis.Redis(‘host‘,port,db_port,‘password‘)
r = redis.Redis(‘localhost‘,6379,0,‘redis‘) keys = ‘123‘ if r.exists(keys): print ("存在") else: print ("不存在")
r = redis.Redis(‘localhost‘,6379,0,‘redis‘) len_keys = r.llen(keys) random_int = int(random.randint(0,len_keys-1)) print (r.lindex(keys,random_int).decode(‘utf-8‘))
基本上在下面代码中,几乎会用到如上基础
CREATE DATABASE `itchat` /*!40100 DEFAULT CHARACTER SET utf8mb4 */
itchat_login_info
CREATE TABLE `itchat_login_info` ( `createdate` int(11) NOT NULL, `myid` varchar(128) DEFAULT NULL, `username` varchar(128) DEFAULT NULL, `mysignature` varchar(128) DEFAULT NULL, `mysex` varchar(4) DEFAULT NULL, `myuseruin` varchar(128) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
itchat_message
CREATE TABLE `itchat_message` ( `createdate` int(11) NOT NULL, `msgid` varchar(128) NOT NULL, `nickname` varchar(128) DEFAULT NULL, `username` varchar(128) DEFAULT NULL, `actualnickname` varchar(128) DEFAULT NULL, `actualusername` varchar(128) DEFAULT NULL, `msgtext` varchar(5000) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
itchat_friend_message
CREATE TABLE `itchat_friend_message` ( `createdate` int(11) NOT NULL, `msgid` varchar(128) NOT NULL, `fromuser` varchar(128) NOT NULL, `fromuserid` varchar(128) NOT NULL, `fromsex` varchar(4) NOT NULL, `touser` varchar(128) NOT NULL, `touserid` varchar(128) NOT NULL, `tousersex` varchar(4) NOT NULL, `msgtext` varchar(5000) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
auto_timer
CREATE TABLE `auto_timer` ( `timer_id` int(11) NOT NULL, `createdate` int(11) NOT NULL, `content` varchar(200) DEFAULT NULL, `to_user` varchar(1000) NOT NULL, `exe_time` int(11) NOT NULL, `last_time` int(11) DEFAULT NULL, `next_time` int(11) NOT NULL, `redis_keys` varchar(100) NOT NULL, `timer_count` bigint(20) NOT NULL, PRIMARY KEY (`timer_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import itchat from itchat.content import * import collections_logs import save_db import liwang_redis_itchat import time import threading import os #定义装饰器,用于监听图片,视频等消息,并且下载下来 @itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO],isFriendChat=True, isGroupChat=True) def download_files(msg): logger.debug(msg[‘FromUserName‘]) logger.debug("发送的是语音/视频等,需要保存至本地") msg.download(msg.fileName) file_value = ‘127.0.0.1/‘ + msg.FileName + ‘_‘ + msg[‘Type‘] logger.debug(file_value) #将消息写入数据库 if ‘@@‘ in msg[‘FromUserName‘] : logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中") sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],msg[‘ActualNickName‘],msg[‘ActualUserName‘],(file_value)) save_db.exe_db(db,sql,logger) elif (msg[‘FromUserName‘] == MyID and ‘@@‘ in msg[‘ToUserName‘]): logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中") sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],myUserName,msg[‘ActualUserName‘],(file_value)) save_db.exe_db(db,sql,logger) else : #将群聊设置为0,让后面的@抓不到 isgroup = 0 #判断是否是自己发送 if msg[‘FromUserName‘] == MyID : logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],myUserName,MyID,mysex,msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],msg[‘User‘][‘Sex‘],file_value) save_db.exe_db(db,sql,logger) else: logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],msg[‘User‘][‘Sex‘],myUserName,MyID,mysex,file_value) save_db.exe_db(db,sql,logger) #定义装饰器,用于监听好友和群聊微信群聊内容 @itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True) def simple_reply(msg): #定义群聊 isgroup = 1 #将消息写入数据库 if ‘@@‘ in msg[‘FromUserName‘] : logger.debug("判断为群聊,将数据写入MySQL数据库itchat_message中") sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],msg[‘ActualNickName‘],msg[‘ActualUserName‘],(msg[‘Text‘])) save_db.exe_db(db,sql,logger) elif (msg[‘FromUserName‘] == MyID and ‘@@‘ in msg[‘ToUserName‘]): logger.debug("判断为群聊,且消息为自己发送的,将数据写入MySQL数据库itchat_message中") sql = "insert into itchat_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],myUserName,msg[‘ActualUserName‘],(msg[‘Text‘])) save_db.exe_db(db,sql,logger) else : #将群聊设置为0,让后面的@抓不到 isgroup = 0 #判断是否是自己发送 if msg[‘FromUserName‘] == MyID : logger.debug("判断为好友且为自己发送的,将数据写入MySQL数据库itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],myUserName,MyID,mysex,msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],msg[‘User‘][‘Sex‘],msg[‘Text‘]) save_db.exe_db(db,sql,logger) else: logger.debug("判断为好友且为别人发送的,将数据写入MySQL数据库itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),msg[‘MsgId‘],msg[‘User‘][‘NickName‘],msg[‘User‘][‘UserName‘],msg[‘User‘][‘Sex‘],myUserName,MyID,mysex,msg[‘Text‘]) save_db.exe_db(db,sql,logger) #将所有的聊天信息都写入到mongodb中 #暂时不写,mongodb还未掌握基本的使用 #这里写聊天机器人端口 #判断是否@了自己,且为群聊 if isgroup == 1 : if msg[‘isAt‘] : logger.debug("@了自己,移动到redis_itchat") from_message = msg[‘Text‘] send_msg = liwang_redis_itchat.auto_box(from_message,redis_db,logger) itchat.send_msg(send_msg,toUserName=msg[‘ToUserName‘]) itchat.send_msg(send_msg,toUserName=msg[‘FromUserName‘]) else: logger.debug("没有@自己") #空出两个空LOG logger.debug("-------------------------------------------------------------------") logger.debug("-------------------------------------------------------------------") def auto_reply(): logger.debug("============================================================================================") #获取全部ID sql sql = "select timer_id from auto_timer;" #获取结果 userid_list = save_db.select_db(db,sql,logger) #遍历auto_timer ID for userid in userid_list: #打印正在处理的信息 sql = "select content from auto_timer where timer_id = ‘%s‘;" %(userid) id_content = save_db.select_db(db,sql,logger)[0] logger.debug("判断ID:%s" %(userid)) logger.debug("判断内容:%s" %(id_content)) #判断时间是否符合 #获取当前的时间戳 now_time = int(time.time()) logger.debug("当前时间为:%s" %(now_time)) sql = "select next_time from auto_timer where timer_id = ‘%s‘;" %(userid) next_time = save_db.select_db(db,sql,logger)[0] next_time = int(next_time[0]) logger.debug("下次执行时间戳:%s" %(next_time)) #判断,如果当前时间大于或等于下一次执行时间,就执行 if now_time >= next_time : logger.debug("处理内容:%s" %(id_content)) #获取发送人 sql = "select to_user from auto_timer where timer_id = ‘%s‘;" %(userid) to_user = save_db.select_db(db,sql,logger)[0] to_user = to_user[0] logger.debug("查找用户为:%s" %(to_user)) #单用户 single_user = 1 #判断字符串是否有分割 if ‘;‘ in to_user : allocation_user = to_user.split(‘;‘) logger.debug("需要发送多人,信息为:%s" %(allocation_user)) single_user = 0 else: allocation_user = to_user logger.debug("需要发送一人,信息为:%s" %(allocation_user)) #判断为单用户,不需要做处理 if single_user == 1 : to_user = to_user + ‘;‘ allocation_user = to_user.split(‘;‘) for send_userid in allocation_user : logger.debug("正在处理%s用户" %(send_userid)) #查找微信是否存在此用户 try: itcaht_user_name = itchat.search_friends(name=send_userid)[0][‘UserName‘] if itcaht_user_name == " ": logger.debug("微信不存在此好友,请检查") else: #获取发送的内容: sql = "select redis_keys from auto_timer where timer_id = ‘%s‘;" %(userid) redis_keys = save_db.select_db(db,sql,logger)[0] redis_keys = str(redis_keys[0]) return_values = liwang_redis_itchat.return_rand_keys(redis_db,logger,redis_keys) #获取其他信息 sql = "select last_time from auto_timer where timer_id = ‘%s‘;" %(userid) last_time = save_db.select_db(db,sql,logger)[0] last_time = last_time[0] sql = "select timer_count from auto_timer where timer_id = ‘%s‘;" %(userid) timer_count = save_db.select_db(db,sql,logger)[0] timer_count = int(timer_count[0]) sql = "select exe_time from auto_timer where timer_id = ‘%s‘;" %(userid) exe_time = save_db.select_db(db,sql,logger)[0] exe_time = int(exe_time[0]) * 60 logger.debug("将要发送的信息如下") logger.debug("timer_ID:%s" %(userid)) logger.debug("发送简介为:%s" %(id_content)) logger.debug("上一次执行的时间戳是:%s" %(last_time)) logger.debug("下一次执行的时间戳是:%s" %(next_time)) logger.debug("获取Redis的值为:%s" %(redis_keys)) logger.debug("间隔描述为:%s" %(exe_time)) logger.debug("已经执行次数为:%s" %(timer_count)) try: logger.debug("将要发送的消息: %s" %(return_values)) itchat.send_msg(return_values,toUserName=itcaht_user_name) except Exception as e: logger.debug("信息发送失败,详细信息如下:") logger.debug(e) except Exception as e: logger.debug("未找到该用户信息,详细信息如下:") logger.debug(e) try: # 更新信息 last_time = now_time + exe_time timer_count = timer_count + 1 userid = userid[0] sql = "update auto_timer set last_time = ‘%s‘ , next_time = ‘%s‘ , timer_count = ‘%s‘ where timer_id = ‘%s‘;" %(now_time,last_time,timer_count,userid) save_db.exe_db(db,sql,logger) logger.debug("更新信息完毕") except Exception as e: logger.debug("消息已经发送,但是更新数据库失败,详细信息如下:") logger.debug(e) else: logger.debug("判断时间未到") #修整60秒 logger.debug("============================================================================================") time.sleep(3) time.sleep(60) if __name__ == "__main__" : #设置登陆的PK loginpk = ‘login_‘ + str(time.time()) #定义微信自动登录 itchat.auto_login(enableCmdQR=2,statusStorageDir=loginpk) # itchat.auto_login(statusStorageDir=loginpk) #获取自己的ID global MyID MyID = itchat.get_friends(update=True)[0]["UserName"] #获取用户信息 global myUserName global mysex myUserName = itchat.get_friends()[0][‘NickName‘] myUserUin = itchat.get_friends()[0][‘Uin‘] mysignature = itchat.get_friends()[0][‘Signature‘] mysex = itchat.get_friends()[0][‘Sex‘] #设置logger global logger logname = str(myUserName) + ‘_‘ + str(myUserUin) logger = collections_logs.build_logs(logname) #定义数据库 global db db = save_db.itchat_connect_mysql(logger) global redis_db redis_db = liwang_redis_itchat.connect_redis(logger) #每登陆一次,记录一次login logger.debug("记录登陆Log") sql = "insert into itchat_login_info values (%d,‘%s‘,‘%s‘,‘%s‘,‘%s‘,‘%s‘);" %(int(time.time()),MyID,myUserName,mysignature,mysex,myUserUin) save_db.exe_db(db,sql,logger) #启动线程 threading._start_new_thread(itchat.run,()) #开始循环 while 1: itchat.configured_reply() auto_reply()
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import logging import time def build_logs(file_name): #获取当前时间 now_time = time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime()) #设置log名称 logname = ‘itchat‘ + file_name + now_time #定义logger logger = logging.getLogger() #设置级别为debug logger.setLevel(level = logging.DEBUG) #设置 logging文件名称 handler = logging.FileHandler(logname) #设置级别为debug handler.setLevel(logging.DEBUG) #设置log的格式 formatter = logging.Formatter(‘%(asctime)s - %(levelname)s - %(message)s‘) #将格式压进logger handler.setFormatter(formatter) console = logging.StreamHandler() console.setLevel(logging.DEBUG) #写入logger logger.addHandler(handler) logger.addHandler(console) #将logger返回 return logger
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import pymysql def itchat_connect_mysql(logger): try: db = pymysql.connect(host=‘127.0.0.1‘,port=3306,user=‘root‘,passwd=‘root‘,db=‘itchat‘,charset=‘utf8‘) logger.debug(‘MySQL数据库连接成功‘) logger.debug(‘数据库ID:%s‘ %(db)) return db except Exception as e: logger.debug(‘MySQL数据库连接失败‘) logger.debug(e) def exe_db(db,sql,logger): try: cursor = db.cursor() logger.debug(‘获取游标:%s‘ %(cursor)) db.ping(reconnect=True) logger.debug("数据库重连") cursor.execute(sql) logger.debug("执行SQL") logger.debug(sql) db.commit() logger.debug("数据库commit") except Exception as e: logger.debug(‘SQL执行失败,请至日志查看该SQL记录‘) logger.debug(sql) logger.debug(e) def select_db(db,sql,logger): try: cursor = db.cursor() logger.debug(‘获取游标:%s‘ %(cursor)) db.ping(reconnect=True) logger.debug("数据库重连") cursor.execute(sql) logger.debug("执行SQL") logger.debug(sql) return cursor.fetchall() except Exception as e: logger.debug(‘SQL执行失败,请至日志查看该SQL记录‘) logger.debug(sql) logger.debug(e)
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import redis import json import requests import random def connect_redis(logger): try: r = redis.Redis(‘127.0.0.1‘,6379,0,‘redis‘) logger.debug("Redis数据库连接成功") return r except Exception as e: logger.debug("Redis数据库连接失败") logger.debug(e) def return_rand_keys(redis_db,logger,keys): #判断是否有这个key if redis_db.exists(keys): len_keys = redis_db.llen(keys) random_int = int(random.randint(0,len_keys-1)) logger.debug("Redis获取的随机值为:%s" %(random_int)) return_redis_values = redis_db.lindex(keys,random_int).decode(‘utf-8‘) logger.debug("Redis将要返回的值为:%s" %(return_redis_values)) return return_redis_values else: logger.debug("没有找到相关key") return 0 def auto_box(message,redis_db,logger): #设置分隔符 if ‘\u2005‘ in message : myid=‘@小子\u2005‘ else: myid = ‘@小子 ‘ #获取分隔符之后的值 if len(message.split(myid)) == 1 : return "@我了要说话哟" else: dealwith = message.split(myid)[1] if dealwith == " " : logger.debug("只是@了我,并没有输入内容") else: #图灵机器人API接口 url_api = "http://openapi.tuling123.com/openapi/api/v2" #将问题赋值给box_quest box_quest = dealwith #定义json post request_json = json.dumps ({ "perception": { "inputText": { "text": box_quest }, "selfInfo": { "location": { "city": "成都", "province": "成都", "street": "天府三街" } } }, "userInfo": { "apiKey": "APIkey", "userId": "tuling" } } ) # 获取POST之后的值 r = requests.post(url_api, data=request_json) r_test = r.text r_json = json.loads(r_test) #定义返回值 return_str = (r_json[‘results‘][0][‘values‘][‘text‘]) logger.debug("图灵机器人将要返回的值:") logger.debug(return_str) #需要自己建立自己的机器人仓库 logger.debug("将返回的结果存入Redis数据库中") redis_db.rpush(box_quest,return_str) return return_str
如下以log展现
标签:ons 数据库连接 多个 信息 test cmd 接收 数据库 global
原文地址:https://www.cnblogs.com/wang-li/p/9744502.html