码迷,mamicode.com
首页 > 其他好文 > 详细

使用tornado建立一个测试桩,模拟服务器返回

时间:2021-07-02 16:20:36      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:head   art   format   orm   个数   ctr   one   else   url   

背景:当时在接到一个任务,任务是输入imei,查出所有相关imei的数据,请求入下图,和返回如下图(最后说我理解的不对,我X,写了也白忙)

请求:
{
    "imei":"xxx",
    "oaid":"xxx",
    "uid": "xxx",
    "appkey": "xxx",
    "idfa": "xxx",
    "limit": 10
}

返回:
{
    "pushList":[
        {
            "appkey":"xxx",  
            "uid":"123"      
        },
        {
            "appkey":"xxx",
            "uid":"456"
        }
    ],
    "imeis": ["12", "34"],
    "oaids": ["12", "34"],
    "idfas": ["12", "34"],
    "phone_salts":["xxx","xxx"] 
}

 1.使用tornado的post请求,创建一批数据,数据应该满足imei,能够进行模糊查询的

 def post(self):
        # 获取post方式传递的参数,以下参数都是必填项
        imei = self.get_argument(‘imei‘)
        oaid = self.get_argument(‘oaid‘)
        uid = self.get_argument(‘uid‘)
        appkey = self.get_argument(‘appkey‘)
        idfa = self.get_argument(‘idfa‘)
        phone_salt = self.get_argument(‘phone_salt‘)
        redis=initialise_redis().conn_redis()

        #生成的key
        key=imei+"_"+oaid+"_"+uid
        #插入所有内容
        mapping = {"appkey": appkey, "idfa": idfa, "imei": imei, "oaid": oaid, "phone_salt": phone_salt}
        key_witer=redis.hmset(key,mapping)
        logging.info("写入redis状态为%s!!插入的key为:%s!!!插入的数据内容为:%s"%(key_witer,key,mapping))
        #返回给页面请求,wirte_status:true 表示写入成功,key:表示写入入到redis的key,如果插入相同的就只保留一个
        result={
            ‘wirte_status‘:key_witer,
            ‘key‘:key
        }
        self.write(json.dumps(result))

  2.使用get请求获取到数据

class MainHandler(tornado.web.RequestHandler):

    def get(self):
        #从以下参数为必填项,
        imei=self.get_argument(‘imei‘)
        oaid = self.get_argument(‘oaid‘)
        uid = self.get_argument(‘uid‘)
        appkey = self.get_argument(‘appkey‘)
        idfas = self.get_argument(‘idfa‘)
        limit=self.get_argument(‘limit‘)
        #limit 默认给个10,

        #self.write("传入的imei为:%s   传入的oaid为:%s  传入的uid为 :%s   传入的appkey为 :%s  传入的idfas为 :%s "%(imei,oaid,uid,appkey,idfas))
        #查询redis的数据,使用的模糊查询
        redis=initialise_redis().conn_redis()
        #通过uid,查询所有的匹配imei
        key=imei + "_" + oaid + "_*"
        get_key=redis.keys(key)

        #预先定义几个列表,存储部分数据
        pushList=[]
        imeis=[]
        oaids=[]
        idfas=[]
        phone_salts=[]
        #用于控制显示的个数
        n=0

        # 拼接数据,用于返回给请求方
        result = {
            ‘pushList‘: pushList,
            ‘imeis‘: imeis,
            ‘oaids‘: oaids,
            ‘idfas‘: idfas,
            ‘phone_salts‘: phone_salts
        }

        for keys  in get_key:
            n+=1
            if n>int(limit):
                logging.info("限制条数为%s,已达到限制"%limit)
                break
            else:
                key=str(keys, encoding="utf-8")
                uid=key.rsplit(‘_‘, 1)[-1]
                get_allkey=redis.hgetall(key)
                logging.info("获取的所有数据为:%s"%get_allkey )
                #获取的redis数据都是一个字节,所以获取使用b字节
                push_list_dict={"appkey":str(get_allkey[b‘appkey‘],encoding="utf-8"),
                      "uid":uid
                      }

                pushList.append(push_list_dict)
                imeis.append(str(get_allkey[b‘imei‘],encoding="utf-8"))
                oaids.append(str(get_allkey[b‘oaid‘],encoding="utf-8"))
                idfas.append(str(get_allkey[b‘idfa‘],encoding="utf-8"))
                phone_salts.append(str(get_allkey[b‘phone_salt‘],encoding="utf-8"))

            logging.info("imei %s" % imeis)
            logging.info("pushList %s" % pushList)
            logging.info("oaids %s" % oaids)
            logging.info("idfas %s" % idfas)
            logging.info("phone_salts %s" % phone_salts)

        logging.info("返回数据为:%s"%result)
        self.set_header(‘Content-Type‘, ‘application/json; charset=UTF-8‘)
        self.write(json.dumps(result))

  3.调试,使用postman请求都可以

curl --location --request GET ‘172.17.9.9:9999/‘ --header ‘Content-Type: application/x-www-form-urlencoded‘ --data-urlencode ‘imei=868454039026356‘ --data-urlencode ‘oaid=ef2dfc45a5be382e‘ --data-urlencode ‘uid=8024436854‘ --data-urlencode ‘appkey=07b6ed26c8bcce540204c8f7‘ --data-urlencode ‘idfa=123456‘ --data-urlencode ‘limit=3‘

  技术图片

 

 

4.部署

选择一台机器部署就行,访问就通过IP+端口就可以了

运行脚本
nohup python3 -u  smartting.py > smartting.log 2>&1 & 

查看写入的日志

技术图片

 

全部的代码

# -*- coding:utf-8 -*-
#@Time : 2021/6/15 11:55
#@Author: 张君
#@File : Smarttiming.py


import tornado.web
from tornado.ioloop import IOLoop
import  logging,json
import  redis
# 设置日志级别
logging.basicConfig(level=logging.INFO, format=‘%(asctime)-16s %(levelname)-8s %(message)s:‘)

class initialise_redis(object):
    def conn_redis(self):
        redis_nodes = []
        # 观澜redis环境
        #redis_conn = StrictRedisCluster(startup_nodes=redis_nodes, decode_responses=True)   #如果不是集群,就使用redis

        redis_conn =redis.StrictRedis(host=‘172.17.9.106‘, port=16449)


        return redis_conn

class MainHandler(tornado.web.RequestHandler):

    def get(self):
        #从以下参数为必填项,
        imei=self.get_argument(‘imei‘)
        oaid = self.get_argument(‘oaid‘)
        uid = self.get_argument(‘uid‘)
        appkey = self.get_argument(‘appkey‘)
        idfas = self.get_argument(‘idfa‘)
        limit=self.get_argument(‘limit‘)
        #limit 默认给个10,

        #self.write("传入的imei为:%s   传入的oaid为:%s  传入的uid为 :%s   传入的appkey为 :%s  传入的idfas为 :%s "%(imei,oaid,uid,appkey,idfas))
        #查询redis的数据,使用的模糊查询
        redis=initialise_redis().conn_redis()
        #通过uid,查询所有的匹配imei
        key=imei + "_" + oaid + "_*"
        get_key=redis.keys(key)

        #预先定义几个列表,存储部分数据
        pushList=[]
        imeis=[]
        oaids=[]
        idfas=[]
        phone_salts=[]
        #用于控制显示的个数
        n=0

        # 拼接数据,用于返回给请求方
        result = {
            ‘pushList‘: pushList,
            ‘imeis‘: imeis,
            ‘oaids‘: oaids,
            ‘idfas‘: idfas,
            ‘phone_salts‘: phone_salts
        }

        for keys  in get_key:
            n+=1
            if n>int(limit):
                logging.info("限制条数为%s,已达到限制"%limit)
                break
            else:
                key=str(keys, encoding="utf-8")
                uid=key.rsplit(‘_‘, 1)[-1]
                get_allkey=redis.hgetall(key)
                logging.info("获取的所有数据为:%s"%get_allkey )
                #获取的redis数据都是一个字节,所以获取使用b字节
                push_list_dict={"appkey":str(get_allkey[b‘appkey‘],encoding="utf-8"),
                      "uid":uid
                      }

                pushList.append(push_list_dict)
                imeis.append(str(get_allkey[b‘imei‘],encoding="utf-8"))
                oaids.append(str(get_allkey[b‘oaid‘],encoding="utf-8"))
                idfas.append(str(get_allkey[b‘idfa‘],encoding="utf-8"))
                phone_salts.append(str(get_allkey[b‘phone_salt‘],encoding="utf-8"))

            logging.info("imei %s" % imeis)
            logging.info("pushList %s" % pushList)
            logging.info("oaids %s" % oaids)
            logging.info("idfas %s" % idfas)
            logging.info("phone_salts %s" % phone_salts)

        logging.info("返回数据为:%s"%result)
        self.set_header(‘Content-Type‘, ‘application/json; charset=UTF-8‘)
        self.write(json.dumps(result))



class StoryHandler(tornado.web.RequestHandler):
    """
    以下方法是插入内容到reids中,插入的key,是以imei_oaid_uid为key,内容是个hashmap
    """

    def post(self):
        # 获取post方式传递的参数,以下参数都是必填项
        imei = self.get_argument(‘imei‘)
        oaid = self.get_argument(‘oaid‘)
        uid = self.get_argument(‘uid‘)
        appkey = self.get_argument(‘appkey‘)
        idfa = self.get_argument(‘idfa‘)
        phone_salt = self.get_argument(‘phone_salt‘)
        redis=initialise_redis().conn_redis()

        #生成的key
        key=imei+"_"+oaid+"_"+uid
        #插入所有内容
        mapping = {"appkey": appkey, "idfa": idfa, "imei": imei, "oaid": oaid, "phone_salt": phone_salt}
        key_witer=redis.hmset(key,mapping)
        logging.info("写入redis状态为%s!!插入的key为:%s!!!插入的数据内容为:%s"%(key_witer,key,mapping))
        #返回给页面请求,wirte_status:true 表示写入成功,key:表示写入入到redis的key,如果插入相同的就只保留一个
        result={
            ‘wirte_status‘:key_witer,
            ‘key‘:key
        }
        self.write(json.dumps(result))

class MainHandler2(tornado.web.RequestHandler):
    ##以下废弃,
    def get(self):
        data=self.request.body
        #获取所有参数
        req_data = json.loads(data)

        imei=self.get_argument(‘imei‘)
        oaid = self.get_argument(‘oaid‘)
        uid = self.get_argument(‘uid‘)
        appkey = self.get_argument(‘appkey‘)
        idfas = self.get_argument(‘idfas‘)

def make_app():
    # 生成路由列表
    return tornado.web.Application([
        (r‘/‘, MainHandler),
        (r‘/test‘, MainHandler2),
        (r‘/Store‘, StoryHandler)
    ])


if __name__ == "__main__":
    app = make_app()
    logging.info("程序启动了------------")
    app.listen(9999, address="0.0.0.0")
    tornado.ioloop.IOLoop.current().start()
    IOLoop.current().start()

  

 

使用tornado建立一个测试桩,模拟服务器返回

标签:head   art   format   orm   个数   ctr   one   else   url   

原文地址:https://www.cnblogs.com/chongyou/p/14962533.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!