标签:elf sys 自动 out += 最大连接数 return action 绑定
socket 的自动化交互
假设服务端运行程序为:
# coding:utf8
# python3
import random
import socket
import sys
# 创建 socket 对象
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
port = 9999
# 绑定端口号
serversocket.bind((‘‘, port))
# 设置最大连接数,超过后排队
serversocket.listen(5)
while True:
print(‘等待连接...‘)
# 建立客户端连接
clientsocket, addr = serversocket.accept()
print("连接地址: ", addr)
first_num = str(random.randint(0, 10))
second_num = str(random.randint(0, 10))
clientsocket.send(‘input {}: ‘.format(first_num).encode(‘utf8‘))
first_input = clientsocket.recv(10).decode(‘utf8‘).strip()
clientsocket.send(‘input {}: ‘.format(second_num).encode(‘utf8‘))
second_input = clientsocket.recv(10).decode(‘utf8‘).strip()
print(first_input)
print(second_input)
if first_num == first_input and second_num == second_input:
clientsocket.send(‘Right!\n‘.encode(‘utf8‘))
else:
clientsocket.send(‘Wrong!\n‘.encode(‘utf8‘))
clientsocket.shutdown(socket.SHUT_RDWR)
clientsocket.close()
serversocket.close()
运行如下脚本即可达到自动化交互的目的:
# coding:utf-8
# python3
import sys
import re
import socket
from threading import Thread
class SocketInteraction:
tcp_socket = None
def __init__(self, host, port):
self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_socket.connect((host, port))
def __del__(self):
self.tcp_socket.close()
def recvline(self):
return self.recvuntil(‘\n‘)
def recv_n(self, n):
return self.tcp_socket.recv(n).decode(‘utf8‘)
def recvuntil(self, want_end_str):
current_str = ‘‘
while True:
current_str += self.recv_n(1)
if current_str.endswith(want_end_str):
return current_str
def recvuntil_re(self, want_re_str):
current_str = ‘‘
while True:
current_str += self.recv_n(1)
s = re.search(want_re_str, current_str)
if s:
return [current_str, s]
def send(self, send_str):
final_bytes = send_str.encode(‘utf8‘)
self.tcp_socket.send(final_bytes)
def sendline(self, send_str):
self.send(send_str+‘\n‘)
def interact(self):
def recv_loop():
while True:
c = self.recv_n(1)
# print 写到控制台会有延时,直接用系统io写
sys.stdout.write(c)
sys.stdout.flush()
def send_loop():
while True:
send_str = input()
self.sendline(send_str)
recv_thread = Thread(target=recv_loop)
send_thread = Thread(target=send_loop)
recv_thread.start()
send_thread.start()
recv_thread.join()
send_thread.join()
si = SocketInteraction(‘127.0.0.1‘, 9999)
for i in range(2):
content = si.recvuntil_re(r‘input (\d+): ‘)
print(content[0])
num = content[1].group(1)
print(repr(num))
si.sendline(num)
content = si.recvline()
print(content)
标签:elf sys 自动 out += 最大连接数 return action 绑定
原文地址:https://www.cnblogs.com/-rvy-/p/13197534.html