码迷,mamicode.com
首页 > 微信 > 详细

python发送微信

时间:2019-02-02 18:58:15      阅读:213      评论:0      收藏:0      [点我收藏+]

标签:socket   exec   业务逻辑   exist   个数   env   lob   收集   使用   

申请企业微信

使用python发送信息到企业微信,同时支持python2与python3环境,需要先申请一个企业微信,然后创建应用,获取以下三个信息

企业IP、Agentid、Secret

技术图片

网信为创建的应用名称

 

技术图片

 脚本描述

 将以上三个信息替换到脚本中,主要是

class WeiXin(object):部分,其他的辅助性工具类,收集的一些常用脚本可不用关注
#!/usr/bin/env python 
#coding=utf-8
‘‘‘
Created on 2018年2月8日

@author: root
‘‘‘


from datetime import datetime
import sys, os, re, json,socket,time
from subprocess import Popen, PIPE
from sys import version_info
if version_info.major == 3 and version_info.minor >=3:
    import urllib.request as urllib2
    pyversion = 3
elif version_info.major == 3:
    pyversion = 3
else:
    import urllib2
    pyversion = 2


try:
    if version_info.major and version_info.major == 3:
        pyversion=3
    elif version_info.major and version_info.major == 2:
        pyversion=2
    else:
        pyversion=2
except Exception as e:
    pyversion = 2

localpath = os.path.split(os.path.realpath(__file__))[0]


class OSCmd():
    ‘‘‘
    OS Command:直接可调用执行命令的方法,不包括业务逻辑
    本脚本为分好层次的项目中抽出出来的方法,归为3个类,一个是命令类OSCmd,一个是系统检查类;
    为保持代码统计,命令类OSCmd是项目是调试好的代码复制过来的,不在此脚本中修改,每次都去项目中取相应的方法
    系统检查逻辑类可以修改
    ‘‘‘

    def __init__(self):
        ‘‘‘
        Constructor
        ‘‘‘

    def exes(self, cmd_shell):
        ‘‘‘
        call shell command
        ‘‘‘
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        if pyversion == 3:
            s = s.encode(encoding="utf-8")
        return (s.communicate()[0]).strip(\n)

    def getexesfstval(self, cmd_shell):
        ‘‘‘
        call shell command
        比如在通过ps -ef过滤进程号的时候,在eclipse中执行可以返回正确的结果,然后在shell在测试脚本时却多返回一个数字(比如13742),这里取第一个数字,舍弃多返回的数字
        ‘‘‘
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        res = (s.communicate()[0]).strip(\n)
        ress = res.split(\n)
        return ress[0]

    def exef(self, filename, args):
        ‘‘‘
        filename : the file is needed to exec as the way like "./filename args"
        args: list []
        for exp: oscmd.exef(/scripts/test/t2.py, [a,b])
        ‘‘‘
        args.insert(0, ‘‘)
        if os.path.exists(filename):
            os.execv(filename, args)
        else:
            print(The {0} is not exist.format(filename))

    def getLineFromFile(self, targetFile, *param):
        ‘‘‘
        文件中,返回某行记录,适合小文件
        f:返回首行
        l:返回末行
        n:返回第n行,n为正整数
        默认返回最后一行数据
        ‘‘‘
        global f
        try:
            f = open(targetFile);
            pnum = len(param);
            with open(targetFile, r) as f:  # 打开文件,适合小文件
                lines = f.readlines()  # 读取所有行
                first_line = lines[0]  # 取第一行
                last_line = lines[-1]  # 取最后一行

            if pnum > 0:
                if type(param[0]) == type(a) and param[0].lower() == f:
                    return first_line
                elif type(param[0]) == type(a) and param[0].lower() == l:
                    return last_line
                else:
                    return lines[int(param[0]) - 1]
            return last_line
        finally:
            f.close();

    

    def timeminustoS(self, t1, t2):
        t1 = time.localtime(t1)
        t1 = time.strftime("%Y-%m-%d %H:%M:%S", t1)
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")

        # t2=time.localtime(t2)
        t2 = time.strftime("%Y-%m-%d %H:%M:%S", t2)
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return self.total_seconds(t2 - t1)

    def total_seconds(self, time_delta):
        ‘‘‘
        python 2.6 has not total_seconds method
        ‘‘‘
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6

    def rmfileFrmNow(self,p_savenum, targetDir, mins, *contain):
        ‘‘‘
        删除指定目录下指定分钟之前的文件,不删除目录,也不递归目录

        删除/backup/rman/backdb目录下超过30个小时的文件
        rmfileFrmNow(/backup/rman/backdb, 30*60)

        删除/backup/rman/backdb目录下超过30个小时,包含_MEMDB_20字符的文件
        rmfileFrmNow(/backup/rman/backdb, 30*60,_MEMDB_20)

        删除/backup/rman/backdb目录下超过30个小时,同时包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow(/backup/rman/backdb, 30*60,back_full,_MEMDB_20)
        ‘‘‘
        clen = len(contain)
        defilist = []
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                if clen > 0:
                    for c in contain:
                        if c in str(fil):
                            defilist.append(fil)
        #排序
        defilist = self.get_filist_bytime(defilist)
        lsz = len(defilist)
        if  lsz > p_savenum:
            defilist = defilist[0,lsz - p_savenum]

        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isBeforeMins(fil, mins):
                            os.remove(fil)

    def isBeforeMins(self, fil, mins):
        ‘‘‘
        判断一个文件的最后修改时间距离当前时间,是否超过指定的分钟数
        ‘‘‘
        if os.path.isfile(fil):
            mtfile = os.path.getmtime(fil)
            tnow = time.localtime()
            sec = self.timeminustoS(mtfile, tnow)
            mms = round(sec / 60)
            mins = eval(mins)
            if mms - mins > 0:
                return True
            return False

    def isMorthanSize(self, fil, siz):
        ‘‘‘
        判断一个文件是否超过指定的大小,单位为M
        ‘‘‘
        if os.path.isfile(fil):
            filsiz = os.path.getsize(fil)
            fsiz = eval(siz)*1024*1024
            if filsiz - fsiz > 0:
                return True
            return False

    def rmfilMorthanSize(self, targetDir, siz, *contain):
        ‘‘‘
        删除指定目录下超过指定大小的文件,不删除目录,也不递归目录

        删除/backup/rman/backdb目录下超过2G大小的文件
        rmfileFrmNow(/backup/rman/backdb, 2*1024)

        删除/backup/rman/backdb目录下超过10G大小,包含_MEMDB_20字符的文件
        rmfileFrmNow(/backup/rman/backdb, 10*1024,_MEMDB_20)

        删除/backup/rman/backdb目录下超过3G大小,同时包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow(/backup/rman/backdb, 3*1024,back_full,_MEMDB_20)
        ‘‘‘
        clen = len(contain)
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isMorthanSize(fil, siz):
                            os.remove(fil)


    def mkdir(self, dr, *mod):
        # import stat
        s1 = str(dr)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ[HOME]
            s1 = %s%s % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = mkdir -p %s % (s1)
            # os.mkdir(dir)  这个命令不识别linux 用户home目录“~”符号
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = chmod -R 755 %s % (s1)
            if p_num == 1:
                chmod = chmod -R %s %s % (mod[0], s1)
                self.exes(chmod)
                # os.chmod(dir, mod)
            else:
                # os.chmod(dir, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH)   该行会抛出异常 TypeError: coercing to Unicode: need string or buffer, builtin_function_or_method found
                self.exes(chmod)
            return s1

    def mknod(self, filename, *mod):
        s1 = str(filename)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ[HOME]
            s1 = %s%s % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = touch %s % (s1)
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = chmod -R 644 %s % (s1)
            if p_num == 1:
                chmod = chmod -R %s %s % (mod[0], s1)
                self.exes(chmod)
            else:
                self.exes(chmod)
        return s1

    def get_filist_bytime(self,file_path):
        dir_list = os.listdir(file_path)
        if not dir_list:
            return
        else:
            # 注意,这里使用lambda表达式,将文件按照最后修改时间顺序升序排列
            # os.path.getmtime() 函数是获取文件最后修改时间
            # os.path.getctime() 函数是获取文件最后创建时间
            dir_list = sorted(dir_list, key=lambda x: os.path.getmtime(os.path.join(file_path, x)))
            return dir_list

    def shichaByMin(self, t1, t2):
        ‘‘‘
        计算以下三种类型之间的时间差
    time.time()-浮点型,time.localtime()-struct_time型、datetime.now()-datetime型
        ‘‘‘
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return round(self.total_seconds(t2 - t1) / 60)

    def total_seconds(self, time_delta):
        ‘‘‘
        python 2.6 has not total_seconds method
        ‘‘‘
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6


class Properties:
    # 脚本默认配置路径
    oscheck_properties = /tmp/.python-eggs/.oscheck.properties
    file_name = ‘‘
    oscmd = OSCmd()

    def __init__(self, file_name=.oscheck.properties):
        ‘‘‘
        将配置文件转化为列表,以列表的读取方式进行值的替换
        ‘‘‘
        dr = /tmp/.python-eggs/
        if not os.path.exists(dr):  # 当目录以~开头时该行永为True,但下面的语句会自判断
            self.oscmd.mkdir(dr,777)

        if not os.path.exists(file_name):  # 当目录以~开头时该行永为True,但下面的语句会自判断
            file_name = self.oscmd.mknod(file_name,666)
            # os.mknod(file_name)   ~

        self.file_name = file_name
        self.properties = {}
        try:
            fopen = open(self.file_name, r)
            for line in fopen:
                line = line.strip()
                if line.find(=) > 0 and not line.startswith(#):
                    strs = line.split(=)
                    self.properties[strs[0].strip()] = strs[1].strip()
        except Exception as e:
            raise e
        else:
            fopen.close()

    def has_key(self, key):
        return key in self.properties

    def keys(self, key):
        return self.properties.keys();

    def get(self, key, default_value=‘‘):
        if key in self.properties:
            return self.properties[key]
        return default_value

    def put(self, key, value):
        self.properties[key] = value
        self.replace_property(self.file_name, key + =.*, key + = + str(value), True)

    def putjson(self, key, values):
        ‘‘‘
        以json的形式存储prop中的value,可以处理一些特殊字符,比如=
        ‘‘‘
        pj = {key: values}
        value = json.dumps(pj)
        self.put(key, value)

    def getjsonvalue(self, key, default_value=‘‘):
        ‘‘‘
        以json的形式存储prop中的value,可以处理一些特殊字符,比如=
        ‘‘‘
        if key in self.properties:
            value = self.get(key)
            valuedict = json.loads(value)
            return valuedict[key]
        return default_value

    def toasc(self, ss):
        asclst = []
        for em in bytearray(ss):
            asclst.append(em)
        return asclst

    def asctostr(self, asclst):
        ss = ‘‘
        for em in asclst:
            ss += str(chr(em))
        return ss

    def putpwd(self, key, value):
        ‘‘‘
        字符串以ASCII码存储
        ‘‘‘
        asclst = self.toasc(value)
        self.putjson(key, asclst)

    def getpwd(self, key):
        asclst = self.getjsonvalue(key)
        pwd = self.asctostr(asclst)
        return pwd

    def replace_property(self, file_name, from_regex, to_str, append_on_not_exists=True):
        ‘‘‘
        新写入数据后,替换文件以永久保存
        :param file_name:
        :param from_regex:
        :param to_str:
        :param append_on_not_exists:
        :return:
        ‘‘‘
        import tempfile
        tmpfile = tempfile.TemporaryFile()

        if os.path.exists(file_name):
            r_open = open(file_name, r)
            pattern = re.compile(r‘‘ + from_regex)
            found = None
            for line in r_open:
                if pattern.search(line) and not line.strip().startswith(#):
                    found = True
                    line = re.sub(from_regex, to_str, line)
                if pyversion == 3:
                    line = line.encode(encoding="utf-8")
                tmpfile.write(line)
            if not found and append_on_not_exists:
                to_str = \n + to_str
                if pyversion == 3:
                    to_str = to_str.encode(encoding="utf-8")
                tmpfile.write(to_str)
            r_open.close()
            tmpfile.seek(0)

            content = tmpfile.read()

            if os.path.exists(file_name):
                os.remove(file_name)

            w_open = open(file_name, w)
            if pyversion == 3:
                content = content.decode(utf-8)
            w_open.write(content)
            w_open.close()
            tmpfile.close()
        else:
            print("file %s not found" % (file_name))


def parsePro(file_name=/tmp/.python-eggs/.oscheck.properties):
    return Properties(file_name)


class Log:
    def __init__(self):
        ‘‘‘
        Constructor
        ‘‘‘

    logfile = os.path.join(localpath, scheck.log)

    def setlog(self, logfile):
        self.logfile = logfile

    def log(self, strs, sp=a):
        open(self.logfile, sp).write(%s\n % (strs))

    def inserttime(self, sp=a):
        now = datetime.now()
        strs = now.strftime(%Y-%m-%d %H:%M:%S)
        oscmd = OSCmd()
        oscmd.mknod(self.logfile, 666)
        open(self.logfile, sp).write(%s\n % (strs))

    def frmMsg(self,content):
        ‘‘‘
        格式化信息输出
        :param content:
        :return:
        ‘‘‘
        curtime = datetime.now().strftime(%Y-%m-%d %H:%M:%S)
        hname = socket.gethostname()
        IPADDR = socket.gethostbyname(hname)
        cnt = "内容:{0} <br/>时间:{1} <br/>信息来自 {2} {3}".format(content,curtime,hname,IPADDR)
        return cnt


class WeiXin(object):
    ‘‘‘
    发送微信
    ‘‘‘
    token_url = https://qyapi.weixin.qq.com/cgi-bin/gettoken
    cropmsg ={
        corpid : wwe***2ed*********,
        corpsecret : Mgyi*****ahx3O-******HkLfg 
        }
    sendmsg = {}
    access_token_key="weixin_access_token"
    time_token_key="weixin_tokenkey_gettime"
    oscmd = OSCmd()
    prop = parsePro()
    log = Log()

    def __init__(self):
        ‘‘‘
        Constructor
        ‘‘‘

    def formatContent(self,content):
        cnt=self.log.frmMsg(content)
        return cnt
        
    def setMsg(self,param):
        arg_num=len(param)
        content = param[0]
        content = self.formatContent(content)
        if pyversion == 2:
            content = content.decode(utf-8)


        if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            
        self.sendmsg = {
              "touser":touser,
              "agentid":agentid,
              "msgtype": "text",
              "text":{
                      "content":content
                      }
              }

    def updateToken(self):
        access_token_response = self.geturl(self.token_url, self.cropmsg)
        access_token = (json.loads(access_token_response)[access_token]).encode(utf-8)
        self.prop.put(self.access_token_key, access_token)
        self.prop.put(self.time_token_key, datetime.now().strftime(%Y-%m-%d %H:%M:%S))


    def get_access_token(self):
        ‘‘‘
        获取访问凭证,经过编码的一串字符串数据
        每两个小时取一次即可
        ‘‘‘
        if not self.prop.has_key(self.time_token_key):
            self.updateToken()
        else:
            curtime = datetime.now().strftime(%Y-%m-%d %H:%M:%S)
            oldtime = self.prop.get(self.time_token_key)
            shicha = self.oscmd.shichaByMin(oldtime,curtime)
            if shicha > 110:
                self.updateToken()

        return self.prop.get(self.access_token_key)
    

    def encodeurl(self,dic):
        ‘‘‘
        将字典转换为url参数传值方式,key1=value1&key2=value2
        ‘‘‘
        data = ‘‘
        for k,v in dic.items():
            data += %s=%s%s % (k,v,&)
        return data
    
    
    def geturl(self,url,data):
        ‘‘‘
        data为字典类型的参数,
        返回类型<type unicode>,json
        ‘‘‘
        data = self.encodeurl(data)
        response = urllib2.urlopen(%s?%s % (url,data))
        return response.read().decode(utf-8)
                
    def posturl(self,url,data,isjson = True):
        ‘‘‘
        发送json数据
        返回类型<type unicode>,json
        ‘‘‘
        if isjson:
            data = json.dumps(data)    #dict
            if pyversion == 3 :
                data = data.encode(encoding="utf-8")
            response = urllib2.urlopen(url,data)
            return response.read().decode(utf-8)
        
        
    def sampleSend(self,content):
        self.setMsg(content)
        # 获取企业访问凭证
        access_token = self.get_access_token()
        sendmsg_url = https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s % access_token 
        print (self.posturl(sendmsg_url,self.sendmsg))
        
    def showExample(self):
        if len(sys.argv) == 4:
            touser,notuse,content = sys.argv[1:] 
        else:
            print (error segments, now exit)
            sys.exit()
        
    def send(self,param):
        ll = Log()
        arg_num=len(param)
        if arg_num >=1:
            self.sampleSend(param)
            #ll.log(%s%(param))
            
        return ‘‘
    
    def showUsing(self):
        print (There must be more than 1 params. For example:)
        print (python sendweixin.py  "微信发送信息调试 "  )
        print (python sendweixin.py  "微信发送信息调试 " "微信用户ID" )
        print (注意事项,该脚本尚存在一个问题,首次发送微信时会报access_token missing,第二次无此问题)
        print (python sendweixin.py  "微信发送信息调试 " "微信用户ID" "企业微信号")
        print (error segments, now exit)
        

if __name__ == __main__:

    inp = sys.argv
    arg_num = len(inp)
    wxmsg = WeiXin()

    if arg_num > 1 :
        res = wxmsg.send(sys.argv[1:])
    else:
        wxmsg.showUsing()
        sys.exit()

 

重点为以下两段代码,企业与应用的标识

  cropmsg ={
        corpid : wwe***2ed*********,
        corpsecret : Mgyi*****ahx3O-******HkLfg 
        }

如果输入一个参数,则默认发送给企业微信中可以访问该应用的所有用户;第二个参数指定具体的微信号

  if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            

示例

$ python sendwx.py "阳光、沙滩、海浪、老船长……"
{"errcode":0,"errmsg":"ok","invaliduser":""}

技术图片

 

python发送微信

标签:socket   exec   业务逻辑   exist   个数   env   lob   收集   使用   

原文地址:https://www.cnblogs.com/perfei/p/10348930.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!