标签:xen pytho 类型 发送 learning while protocol The 数据
硬件:XENA Valkyrie 或 Vantage主机,测试板卡不限,本方法适用于其100M~400G所有速率端口
环境配置:Python 3
实现功能:
1.控制流量仪进行流量测试,预定配置的流量发送,报文统计,丢包率,延迟等信息的统计
2.单个Python脚本配合不同的JSON对象文件,来实现不同端口数量,不同拓扑模型下的流量测试
3.在发包测试前及运行过程中都对发包端口的链路状态进行监控100M/1G/2.5G/5G/10G
4.嵌入ARP探测,获取DUT的MAC信息,并记录在测试日志中
5.统计端口流量的Tx,Rx信息,丢包率,延迟信息
设计思路:main文件控制主流程,嵌入Statistics模块进行数据统计,CreateClass模块进行命令预定义
运行效果:
主要实现代码:
1 import threading, time, json, random, queue, types, sys, socket
2 from binascii import hexlify
3 from TestUtilsL23 import XenaScriptTools
4 from CreateClass import StreamCreate
5 from Statistics import CollectTestresult
6
7 def BuildDict(ports, configvalue, watching, watching_2):
8 #Generate ports Dictionary
9 a = 1
10 for port in ports:
11 ip_address_b = configvalue[‘IP‘][0:10]
12 ip_address_c = ip_address_b + str(a)
13 mac_a = configvalue[‘mac‘][0:10]
14 if a > 9:
15 mac_address = mac_a + str(a)
16 else:
17 mac_address = mac_a + ‘0‘ + str(a)
18 ip_address_a = hexlify(socket.inet_aton(ip_address_c)).decode()
19 mac_ip = [str(mac_address), str(ip_address_a)]
20 watching[port] = mac_ip
21 a += 1
22
23 watching_2[‘type_eth‘] = ‘FFFF‘,
24 watching_2[‘type_vlan‘] = ‘8100‘,
25 watching_2[‘type_ip‘] = ‘08004500006A000000007F11BA2D‘,
26 watching_2[‘type_udp‘] = ‘00000000‘,
27 watching_2[‘type_tcp‘] = ‘000000000000000050000000E5A40000‘,
28 watching_2[‘tcpid‘] = [hex(int(configvalue[‘tcpPort‘][0]))[2:].zfill(4) + hex(int(configvalue[‘tcpPort‘][1]))[2:].zfill(4)]
29
30
31 def PrintMes(ports, configvalue, ip_address):
32 localtime = time.asctime( time.localtime(time.time()) )
33 print("\n##############################################")
34 print(" 测试配置 ")
35 print("##############################################\n")
36 print("测试时间 : " + str(localtime))
37 print("设备IP地址 : " + ip_address)
38 print("测试端口 : " + str(ports))
39 print("测试模式 : " + configvalue[‘Stype‘])
40 print("持续时间 : " + configvalue[‘testtime‘])
41 print("报文类型 : " + configvalue[‘header_type‘])
42 print("报文长度 : " + configvalue[‘packet‘])
43 if configvalue[‘Stype‘] == ‘Aggregation‘:
44 print("上行VLAN : " + str(configvalue[‘uvlan‘]))
45 print("下行VLAN : " + str(configvalue[‘uvlan‘]))
46 print("WAN速率 : " + str(configvalue[‘wanrate‘]))
47 print("LAN速率 : " + str(configvalue[‘lanrate‘]))
48 else:
49 print("VLAN : " + str(configvalue[‘uvlan‘]))
50 print("速率设置 : " + str(configvalue[‘wanrate‘]))
51 print("端口速率 : " + str(configvalue[‘portrate‘]))
52 print("学习时间 : " + configvalue[‘learntime‘])
53 print("丢包率设置 : " + configvalue[‘threshold‘])
54 if configvalue[‘snlearnenable‘] == ‘1‘:
55 print("自动学习SN : Enable")
56 else:
57 print("自动学习SN : Disable")
58 if configvalue[‘portspeedcheck‘] == ‘1‘:
59 print("端口速率检测 : Enable" )
60 else:
61 print("端口速率检测 : Disable" )
62
63
64 def Maclearning(xm, ports, configvalue):
65 if configvalue[‘snlearnenable‘] == ‘1‘:
66 xm.SendExpectOK(ports [1] + " PS_CREATE [0]")
67 xm.SendExpectOK(ports [1] + " ps_tpldid [0] 0")
68 xm.SendExpectOK(ports [1] + " PS_PACKETLENGTH [0] FIXED 70 1518")
69 xm.SendExpectOK(ports [1] + " PS_HEADERPROTOCOL [0] ETHERNET ARP")
70 learnIP_hex = hexlify(socket.inet_aton(configvalue[‘learnIP‘])).decode()
71 xm.SendExpectOK(ports [1] + " PS_PACKETHEADER [0] 0xFFFFFFFFFFFF00000000000108060001080006040001000000000001C0A80164FFFFFFFFFFFF" + learnIP_hex)
72 xm.SendExpectOK(ports [1] + " PS_RATEPPS [0] 2")
73 xm.SendExpectOK(ports [1] + " PS_ENABLE [0] on")
74 xm.SendExpectOK(ports [1] + " P_TRAFFIC ON")
75 xm.SendExpectOK(ports [1] + " P_CAPTURE ON")
76 time.sleep(1)
77 xm.SendExpectOK(ports [1] + " P_CAPTURE OFF")
78 xm.SendExpectOK(ports [1] + " P_TRAFFIC OFF")
79 SN = xm.Send(ports[1] + " PC_PACKET [0] ?")
80 SN = SN.split(‘ ‘)[-1]
81 xm.SendExpectOK(ports [1] + " PS_DELETE [0]")
82 Serial = str(SN)
83 if SN == ‘<BADINDEX>‘:
84 print ("\nTest result :Failed;[ARP learning failed!!]")
85 sys.exit()
86 print ("DUT SN : " + str(Serial[16:28]))
87
88
89 def PortSpeed(xm, ports, PSC, configvalue):
90 if configvalue[‘PRcheck‘] == ‘1‘:
91 xm.PortSyncCheck(ports)
92 i = 0
93 for port in ports:
94 PS = xm.Send(port + ‘ p_speed ?‘)
95 PS = PS.split(‘ ‘)[-1]
96 PSC.append(str(PS))
97 if configvalue[‘portspeedcheck‘] == ‘1‘:
98 if str(PS) != configvalue[‘portrate‘][i]:
99 print (‘Test Result:Failed;[‘ + port + ‘ 端口速率不匹配,请检查配置...]‘)
100 sys.exit()
101 i += 1
102
103
104 def runtest(xm, ports, configvalue):
105 xm.PortTrafficStart(ports)
106 time.sleep(float(configvalue[‘learntime‘]))
107 xm.PortTrafficStop(ports)
108 time.sleep(0.5)
109 xm.Statisticsclear(ports)
110 time.sleep(0.5)
111 xm.Porttimelimit(ports, configvalue[‘testtime‘])
112 time.sleep(0.2)
113 xm.PortTrafficStart(ports)
114 count = 0
115 print ("\n开始打流,请耐心等候, 测试时间约为" + configvalue[‘testtime‘] + "秒......\n")
116 while True:
117 print (">>>>>>>>>", end=‘‘)
118 xm.Send(ports[0] + " P_TRAFFIC ?")
119 sys.stdout.flush()
120 count += 1
121 time.sleep(1)
122 if count > int(configvalue[‘testtime‘]):
123 print (‘\n\n测试结束,正在收集测试数据......‘)
124 break
125 print()
126 xm.PortTrafficStop(ports)
127 time.sleep(1)
128
129
130 def main(argv):
131 a = sys.argv
132 configvalue = {}
133 filename = str(a[1])
134 with open(filename,‘rb‘) as f:
135 configs=json.loads(f.read())
136
137 ip_address = configs.get(‘ip_address‘)
138 ports = configs.get(‘ports‘, ‘‘)
139 configvalue[‘uvlan‘] = configs.get(‘uvlan‘, ‘‘)
140 configvalue[‘dvlan‘] = configs.get(‘dvlan‘, ‘‘)
141 configvalue[‘tcpPort‘] = configs.get(‘tcpUdpPort‘, ‘‘)
142 configvalue[‘packet‘] = configs.get(‘packet‘, ‘‘)
143 configvalue[‘testtime‘] = configs.get(‘testtime‘, ‘‘)
144 configvalue[‘threshold‘] = configs.get(‘threshold‘, ‘‘)
145 configvalue[‘header_type‘] = configs.get(‘headertype‘, ‘‘)
146 configvalue[‘learntime‘] = configs.get(‘learntime‘, ‘‘)
147 configvalue[‘payload‘] = configs.get(‘payload‘, ‘‘)
148 configvalue[‘wanrate‘] = configs.get(‘wanrate‘, ‘‘)
149 configvalue[‘lanrate‘] = configs.get(‘lanrate‘, ‘‘)
150 configvalue[‘portrate‘] = configs.get(‘portrate‘, ‘‘)
151 configvalue[‘PRcheck‘] = configs.get(‘PRcheck‘, ‘‘)
152 configvalue[‘IP‘] = configs.get(‘IP‘, ‘‘)
153 configvalue[‘mac‘] = configs.get(‘mac‘, ‘‘)
154 configvalue[‘snlearnenable‘] = configs.get(‘snlernenable‘, ‘‘)
155 configvalue[‘learnIP‘] = configs.get(‘learnIP‘, ‘‘)
156 configvalue[‘Stype‘] = configs.get(‘Stype‘, ‘‘)
157 configvalue[‘portspeedcheck‘] = configs.get(‘PScheck‘, ‘‘)
158
159 PSC = []
160 FCS = []
161 watching = {}
162 watching_2 = {}
163 testresult = {}
164
165 PrintMes(ports, configvalue, ip_address)
166
167 xm = XenaScriptTools(ip_address)
168 xm.LogonSetOwner("xena", "python_test_1")
169 xm.PortRelinquish(ports)
170 xm.PortReserve(ports)
171 xm.PortTrafficStop(ports)
172 xm.StreamsDelete(ports)
173 sc = StreamCreate(xm, ports, watching, watching_2, configvalue)
174 cr = CollectTestresult(xm, ports, PSC, configvalue, testresult, FCS)
175
176
177 BuildDict(ports, configvalue, watching, watching_2)
178 PortSpeed(xm, ports, PSC, configvalue)
179 if configvalue[‘snlearnenable‘] == ‘1‘:
180 Maclearning(xm, ports, configvalue)
181 if configvalue[‘Stype‘] == ‘Loopback‘:
182 sc.Loopback(xm, ports, watching, watching_2, configvalue)
183 runtest(xm, ports, configvalue)
184 cr.LoopbackStatistics(xm, ports, testresult)
185 if configvalue[‘Stype‘] == ‘Eachother‘:
186 sc.Eachother(xm, ports, watching, watching_2, configvalue)
187 runtest(xm, ports, configvalue)
188 cr.EachotherStatistics(xm, ports, testresult)
189 if configvalue[‘Stype‘] == ‘Aggregation‘:
190 sc.Aggregation(xm, ports, watching, watching_2, configvalue)
191 runtest(xm, ports, configvalue)
192 cr.AggregationStatistics(xm, ports, testresult)
193 cr.FCSGetValue(xm, ports, FCS)
194 cr.OutputStatistics(ports, PSC, configvalue, testresult, FCS)
195 xm.PortRelease(ports)
196
197 if __name__ == ‘__main__‘:
198 sys.exit(main(sys.argv))
JSON对象配置文件
{
"##常用配置修改":"",
"ports": ["0/0","0/1","0/2","0/3","0/4","0/5"],
"packet": "FIXED 64 1518",
"testtime": "10",
"##注意:dvlan和lanrate只有在Aggregation模式生效,其他模式仅需uvlan和wanrate":"",
"##其中vlan设置为-1则表示不添加vlan":"",
"uvlan": "-1",
"dvlan": "-1",
"wanrate": ["100", "100", "100","100", "100", "100"],
"lanrate": ["21", "22", "23","24"],
"portrate": ["1000","1000","1000","1000","1000","1000"],
"##Stype为测试模式,分别可以配置为:Loopback, Eachother, Aggregation":"",
"##其中Loopback为环回测试,Eachother为两两互打测试,Aggregation为汇聚测试":"",
"Stype":"Eachother",
"##headertype为报文类型,分别可以配置为:TCP, UDP, IP, Ethernet":"",
"headertype": "Ethernet",
"##threshold为丢包率设置":"",
"threshold": "0",
"##不常用配置修改":"",
"##发送学习报文的时间":"",
"learntime": "3",
"##payload类型分别有: Pattern, Random":"",
"payload": "Pattern",
"##PRcheck为使能端口状态检测,1为开启,0为关闭;使能后端口为连接的时候会报错":"",
"PRcheck": "1",
"##PScheck为检测端口速率是否匹配,1为开启,0为关闭":"",
"PScheck":"1",
"tcpUdpPort": ["1024", "2048"],
"IP": "192.168.163.1",
"mac": "000000033333",
"##学习DUT的MAC地址":"",
"snlernenable": "0",
"learnIP": "192.168.2.1",
"##测试仪IP地址":"",
"ip_address": "192.168.1.200"
}
案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型
标签:xen pytho 类型 发送 learning while protocol The 数据
原文地址:https://www.cnblogs.com/xena/p/11579033.html