码迷,mamicode.com
首页 > 编程语言 > 详细

python---socket自动化交互

时间:2020-06-27 11:21:56      阅读:60      评论:0      收藏:0      [点我收藏+]

标签:elf   sys   自动   out   +=   最大连接数   return   action   绑定   

socket 的自动化交互

假设服务端运行程序为:

# coding:utf8
# python3

import random
import socket
import sys

# Quiz server: for every client, send two random numbers as prompts and
# answer "Right!" only if the client echoes both of them back correctly.

# Create the listening TCP socket.
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

port = 9999

# Bind to every local interface on the chosen port.
serversocket.bind(('', port))

# Backlog: how many not-yet-accepted connections may wait in the queue.
serversocket.listen(5)

# try/finally so the listening socket is released when the loop is left
# through an exception (e.g. Ctrl-C); the loop itself never exits normally.
try:
    while True:
        print('等待连接...')
        # Block until a client connects.
        clientsocket, addr = serversocket.accept()
        print("连接地址: ", addr)

        # The context manager closes the client socket even when the
        # exchange below raises, so a failing client does not leak a socket.
        with clientsocket:
            first_num = str(random.randint(0, 10))
            second_num = str(random.randint(0, 10))

            # sendall() keeps writing until the whole buffer is sent;
            # plain send() may transmit only part of it.
            clientsocket.sendall('input {}: '.format(first_num).encode('utf8'))
            first_input = clientsocket.recv(10).decode('utf8').strip()
            clientsocket.sendall('input {}: '.format(second_num).encode('utf8'))
            second_input = clientsocket.recv(10).decode('utf8').strip()

            print(first_input)
            print(second_input)

            # Answers are compared as strings, after stripping whitespace.
            if first_num == first_input and second_num == second_input:
                clientsocket.sendall('Right!\n'.encode('utf8'))
            else:
                clientsocket.sendall('Wrong!\n'.encode('utf8'))
            clientsocket.shutdown(socket.SHUT_RDWR)
finally:
    serversocket.close()

运行如下脚本即可达到自动化交互的目的:

# coding:utf-8
# python3

import sys
import re
import socket
from threading import Thread


class SocketInteraction:
    """Small expect-style helper for scripting a text conversation over TCP.

    Incoming bytes are decoded as UTF-8 and outgoing strings are encoded as
    UTF-8. The ``recvuntil*`` helpers read one byte at a time on purpose:
    there is no read-ahead buffer, so they never consume data that belongs
    to the next prompt.
    """

    tcp_socket = None
    # Incremental UTF-8 decoder, created on first use. It keeps the leading
    # bytes of a multi-byte character until the remaining bytes arrive.
    _decoder = None

    def __init__(self, host, port):
        """Open a TCP connection to ``(host, port)``."""
        self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.tcp_socket.connect((host, port))

    def __del__(self):
        # tcp_socket is still the class-level None when socket creation
        # failed in __init__; calling close() on it would raise here.
        if self.tcp_socket is not None:
            self.tcp_socket.close()

    def _decode(self, data):
        """Decode ``data`` as UTF-8, buffering an incomplete trailing character."""
        if self._decoder is None:
            import codecs
            self._decoder = codecs.getincrementaldecoder('utf8')()
        return self._decoder.decode(data)

    def _recv_text(self):
        """Read single bytes until at least one whole character is decoded.

        Raises:
            EOFError: the peer closed the connection before more text arrived.
        """
        while True:
            data = self.tcp_socket.recv(1)
            if not data:
                # recv() returning b'' means the peer closed the connection;
                # without this check the callers would loop forever.
                raise EOFError('connection closed by peer')
            text = self._decode(data)
            if text:
                return text

    def recvline(self):
        """Return received text up to and including the next newline."""
        return self.recvuntil('\n')

    def recv_n(self, n):
        """Receive at most ``n`` bytes and return them decoded as UTF-8.

        Returns an empty string when the peer has closed the connection, or
        when the received bytes end in the middle of a multi-byte character
        (the partial bytes are kept and prepended to the next read).
        """
        return self._decode(self.tcp_socket.recv(n))

    def recvuntil(self, want_end_str):
        """Read until the received text ends with ``want_end_str``.

        Returns:
            Everything read so far, including the terminator.

        Raises:
            EOFError: the connection closed before the terminator was seen.
        """
        current_str = ''
        while True:
            current_str += self._recv_text()
            if current_str.endswith(want_end_str):
                return current_str

    def recvuntil_re(self, want_re_str):
        """Read until the regex ``want_re_str`` matches the received text.

        Returns:
            A two-item list ``[text, match]`` holding all text read so far
            and the ``re.Match`` object of the first successful search.

        Raises:
            EOFError: the connection closed before the pattern matched.
        """
        pattern = re.compile(want_re_str)
        current_str = ''
        while True:
            current_str += self._recv_text()
            s = pattern.search(current_str)
            if s:
                return [current_str, s]

    def send(self, send_str):
        """Encode ``send_str`` as UTF-8 and send all of it."""
        final_bytes = send_str.encode('utf8')
        # sendall() retries until every byte is written; send() may stop early.
        self.tcp_socket.sendall(final_bytes)

    def sendline(self, send_str):
        """Send ``send_str`` followed by a newline."""
        self.send(send_str + '\n')

    def interact(self):
        """Bridge the terminal and the socket until the peer disconnects.

        One thread copies received text to stdout, another forwards each
        line typed on stdin to the socket. Returns once receiving ends.
        """
        def recv_loop():
            try:
                while True:
                    c = self._recv_text()
                    # print() output shows up late on the console, so write
                    # to stdout directly and flush after every chunk.
                    sys.stdout.write(c)
                    sys.stdout.flush()
            except (EOFError, OSError):
                # Peer closed the connection or the socket failed: stop.
                return

        def send_loop():
            try:
                while True:
                    send_str = input()
                    self.sendline(send_str)
            except (EOFError, OSError):
                # stdin was closed or the socket is no longer writable.
                return

        recv_thread = Thread(target=recv_loop)
        # Daemon thread: it is normally blocked in input(), which cannot be
        # interrupted, so it must not keep interact() or the process alive.
        send_thread = Thread(target=send_loop, daemon=True)

        recv_thread.start()
        send_thread.start()

        recv_thread.join()



# Automated client: answer the server's two "input N: " prompts by echoing
# the number N back, then print the server's verdict line.
si = SocketInteraction('127.0.0.1', 9999)
for i in range(2):
    # content is [text_received_so_far, regex_match]; group(1) is the number.
    content = si.recvuntil_re(r'input (\d+): ')
    print(content[0])
    num = content[1].group(1)
    print(repr(num))
    si.sendline(num)
content = si.recvline()
print(content)

python---socket自动化交互

标签:elf   sys   自动   out   +=   最大连接数   return   action   绑定   

原文地址:https://www.cnblogs.com/-rvy-/p/13197534.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!