码迷,mamicode.com
首页 > 编程语言 > 详细

Python开发【项目】:RPC异步执行命令(RabbitMQ双向通信)

时间:2018-07-09 14:13:34      阅读:231      评论:0      收藏:0      [点我收藏+]

标签:parameter   ati   bsp   pika   sleep   __name__   topic   print   调用   

RPC异步执行命令
需求:
  • 利用RibbitMQ进行数据交互
  • 可以对多台服务器进行操作
  • 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
  • 实现异步操作

本节涉及最多的还是rabbitmq通信原理知识,要求安装rabbitmq服务

 

程序用广播topic模式做更好 

 

程序目录结构:

技术分享图片

程序简介:

技术分享图片
技术分享图片
# 异步rpc程序


## 1、需求
- [ ] 利用RibbitMQ进行数据交互
- [ ] 可以对多台服务器进行操作
- [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
- [ ] 实现异步操作

## 备注

- [ ] RabbitMQ队列名:
                    ①执行命令时,队列名为服务器端的IP
                    ②查询数据时,用的是回调时随机生成的callback_queue名
- [ ] threading多线程:
                    实现命令执行后不等待执行结果,依然可以输入新的指令

- [ ] 执行命令格式:
                 -->>run "dir" host 192.168.5.107 127.0.0.1
                        dir     server端要执行的命令
                        host    host后可跟一个或多个可以通过rabbitMQ的服务器地址

- [ ] 查看后台所有的TASK_ID信息:
                 -->>check_all
     显示结果样式:TASK_ID【76786】    HOST【192.168.5.107】    COMMAND【dir】
                  TASK_ID【10307】    HOST【127.0.0.1】    COMMAND【dir】

- [ ] 查看TASK_ID对应的执行结果:
                 -->>check_task 10307
                         10307 为check_all查到的TASK_ID
技术分享图片

程序流程图:

技术分享图片

服务器端:

技术分享图片
技术分享图片
#!/usr/bin/env python
# -*- coding:utf-8 -*-

# !/usr/bin/env python
# -*- coding:utf-8 -*-


import pika
import os

class Server(object):
    def __init__(self,rabbitmq,queue_name):
        self.queue_name = queue_name
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=rabbitmq))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name)

    def handle(self,command):
        command = command.decode()
        print(command,type(command))
        message = os.popen(command).read()
        if not message:
            message = "Wrong Command"
        return message

    def on_request(self,ch, method, props, body):
        response = self.handle(body)
        ch.basic_publish(exchange=‘‘,
                         routing_key=props.reply_to,  # 回信息队列名
                         properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def start(self):
        self.channel.basic_consume(self.on_request,
                                   queue=self.queue_name)

        print(" [x] Awaiting RPC requests")
        self.channel.start_consuming()


if __name__ == "__main__":
    rabbitmq = "localhost"      #rabbitmq服务器地址
    queue_name = "192.168.20.22"    #queue_name为本地ip地址
    server = Server(rabbitmq,queue_name)
    server.start()
技术分享图片

 

客户端:

bin目录:

技术分享图片
技术分享图片
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys
import os
import platform


#添加BASE_DIR,添加顶级目录到路径中,方便调用其他目录模块
if platform.system() == Windows:
    print(os.path.abspath(os.path.dirname(__file__)).split(\\‘)[:-1])
    BASE_DIR = \\‘.join(os.path.abspath(os.path.dirname(__file__)).split(\\‘)[:-1])
else:
    BASE_DIR = /‘.join(os.path.abspath(os.path.dirname(__file__)).split(/‘)[:-1])


#加载环境变量
sys.path.append(BASE_DIR)
from conf import settings
from core import main

if __name__ == __main__:
    obj = main.Handler()
    obj.start()
技术分享图片

conf目录:

技术分享图片
技术分享图片
#!/usr/bin/env python
#-*- coding:utf-8 -*-


import os
import sys
import platform


if platform.system() == Windows:
    BASE_DIR = \\‘.join(os.path.abspath(os.path.dirname(__file__)).split(\\‘)[:-1])
    school_dbpaths = os.path.join(BASE_DIR,school_db)

else:
    BASE_DIR = /‘.join(os.path.abspath(os.path.dirname(__file__)).split(/‘)[:-1])
    school_dbpaths =os.path.join(BASE_DIR, school_db)


#rabbitmq服务地址ip
RabbitMQ_IP = localhost
技术分享图片

core目录

技术分享图片
技术分享图片
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

from conf import settings
from modules.client import Client
import random,time
import threading

class Handler(object):
    def __init__(self):
        self.information = {}   # 后台进程信息

    def check_all(self,*args):
        ‘‘‘查看所有task_id信息‘‘‘
        time.sleep(2)
        for key in self.information:
            print("TASK_ID【%s】\tHOST【%s】\tCOMMAND【%s】"%(key,self.information[key][0],
                                                                    self.information[key][1]))

    def check_task(self,user_cmd):
        ‘‘‘查看task_id执行结果‘‘‘
        time.sleep(2)
        try:
            task_id = user_cmd.split()[1]
            task_id = int(task_id)
            callback_queue=self.information[task_id][2]
            callback_id=self.information[task_id][3]
            client = Client()
            response = client.get_response(callback_queue, callback_id)
            print(response.decode())
            del self.information[task_id]

        except KeyError  as e :
            print("\33[31;0mWrong id[%s]\33[0m"%e)
        except IndexError as e:
            print("\33[31;0mWrong id[%s]\33[0m"%e)

    def run(self,user_cmd):
        ‘‘‘执行命令‘‘‘
        try:
            time.sleep(2)
            #print("--->>",user_cmd)
            command = user_cmd.split("\"")[1]
            hosts = user_cmd.split()[3:]
            for host in hosts:
                task_id = random.randint(10000, 99999)
                client = Client()
                response = client.call(host, command)
                # print(response)
                self.information[task_id] = [host, command, response[0],response[1]]
        except IndexError as e:
            print("\33[31;0mError:%s\33[0m"%e)

    def reflect(self,str,user_cmd):
        ‘‘‘反射‘‘‘
        if hasattr(self, str):
            getattr(self, str)(user_cmd)
        # else:
        #     setattr(self, str, self.foo)
        #     getattr(self, str)()

    def start(self):
        while True:
            user_cmd = input("->>").strip()
            if not user_cmd:continue
            str = user_cmd.split()[0]
            t1 = threading.Thread(target=self.reflect,args=(str,user_cmd))  #多线程
            t1.start()
技术分享图片

modules目录

技术分享图片 client.py

 

 

运行示例图

技术分享图片

技术分享图片

 

Python开发【项目】:RPC异步执行命令(RabbitMQ双向通信)

标签:parameter   ati   bsp   pika   sleep   __name__   topic   print   调用   

原文地址:https://www.cnblogs.com/fuyuteng/p/9283343.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!