码迷,mamicode.com
首页 > 编程语言 > 详细

python3实现的rtsp客户端脚本

时间:2018-05-25 21:18:41      阅读:1122      评论:0      收藏:0      [点我收藏+]

标签:针对   eve   err   ast   ref   1.0   serve   art   sleep   

一、说明

此客户端使用python3编写

此客户端实现RTSP的OPTIONS, DESCRIBE, SETUP , PLAY, GET_PARAMETER,TEARDOWN方法,未实现SET_PARAMETER方法(因为不懂怎么才能生成这个请求没拦截到数据包)

RTSP有且只有以上7种请求,而且基本是对同一URI依次执行以上7种请求;或者说RTSP就是“7种请求”+“1个URL”,如果想做渗透测试也就针对这“7+1”做参数溢出测试,没很多可测的

此客户端自动依次执行请求:OPTIONS--DESCRIBE(2次)--SETUP(2次)--PLAY--GET_PARAMETER(5次)--TEARDOWN

RTSP有Basic和Digest两种验证方法,此客户端实现Digest方法

总体执行效果如下图所示

技术分享图片

 

二、客户端源代码

技术分享图片
import socket
import hashlib
import time

global var_dict
var_dict = {
    server_username: admin,
    server_password: admin,
    server_ip: 192.168.220.128,
    server_port: 554,
    server_url: /chIP=1&streamType=main/,
    cseq: 2,
    user_agent: LibVLC/3.0.2 (LIVE555 Streaming Media v2016.11.28),
    buffer_len: 1024
}


def gen_response_value(url,public_method,realm,nonce):
    frist_pre_md5_value = hashlib.md5((var_dict[server_username] + : + realm + : + var_dict[server_password]).encode()).hexdigest()
    first_post_md5_value = hashlib.md5((public_method+: + url).encode()).hexdigest()
    response_value = hashlib.md5((frist_pre_md5_value + : + nonce + : + first_post_md5_value).encode()).hexdigest()
    return response_value


def gen_options_header():
    global var_dict
    str_options_header = OPTIONS rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[
        server_url] +  RTSP/1.0\r\n
    str_options_header += CSeq:  + str(var_dict[cseq]) + \r\n
    str_options_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_options_header += \r\n
    return str_options_header


def gen_describe_header():
    global var_dict
    str_describe_header = DESCRIBE rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[
        server_url] +  RTSP/1.0\r\n
    str_describe_header += CSeq:  + str(var_dict[cseq] + 1) + \r\n
    str_describe_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_describe_header += Accept: application/sdp\r\n
    str_describe_header += \r\n
    return str_describe_header


def gen_describe_auth_header(url,realm,nonce):
    global var_dict
    public_method = DESCRIBE
    response_value = gen_response_value(url,public_method,realm,nonce)
    str_describe_auth_header = DESCRIBE rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[server_url] +  RTSP/1.0\r\n
    str_describe_auth_header += CSeq:  + str(var_dict[cseq] + 2) + \r\n
    str_describe_auth_header += Authorization: Digest username="+var_dict[server_username]+", realm="+realm+", nonce="+nonce+", uri="+url+", response="+response_value+"\r\n
    str_describe_auth_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_describe_auth_header += Accept: application/sdp\r\n
    str_describe_auth_header += \r\n
    return str_describe_auth_header


def gen_setup_header(url, realm, nonce):
    global var_dict
    public_method = SETUP
    response_value = gen_response_value(url, public_method, realm, nonce)
    str_setup_header  = SETUP rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[server_url] + /trackID=0 RTSP/1.0\r\n
    str_setup_header += CSeq:  + str(var_dict[cseq] + 3) + \r\n
    str_setup_header += Authorization: Digest username="+var_dict[server_username]+", realm="+realm+", nonce="+nonce+", uri="+url+", response="+response_value+"\r\n
    str_setup_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_setup_header += Transport: RTP/AVP;unicast;client_port=50166-50167\r\n
    str_setup_header += \r\n
    return str_setup_header


def gen_setup_session_header(url, realm, nonce,session):
    global var_dict
    public_method = SETUP
    response_value = gen_response_value(url, public_method, realm, nonce)
    str_setup_session_header = SETUP rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[
        server_url] + /trackID=1 RTSP/1.0\r\n
    str_setup_session_header += CSeq:  + str(var_dict[cseq] + 4) + \r\n
    str_setup_session_header += Authorization: Digest username="+var_dict[server_username]+", realm="+realm+", nonce="+nonce+", uri="+url+", response="+response_value+"\r\n
    str_setup_session_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_setup_session_header += Transport: RTP/AVP;unicast;client_port=50168-50169\r\n
    str_setup_session_header += Session: +session+\r\n
    str_setup_session_header += \r\n
    return str_setup_session_header


def gen_play_header(url, realm, nonce,session):
    global var_dict
    public_method = PLAY
    response_value = gen_response_value(url, public_method, realm, nonce)
    str_play_header = PLAY rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[
        server_url] +  RTSP/1.0\r\n
    str_play_header += CSeq:  + str(var_dict[cseq] + 5) + \r\n
    str_play_header += Authorization: Digest username="+var_dict[server_username]+", realm="+realm+", nonce="+nonce+", uri="+url+", response="+response_value+"\r\n
    str_play_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_play_header += Session: +session+\r\n
    str_play_header += Range: npt=0.000-\r\n
    str_play_header += \r\n
    return str_play_header


def gen_get_parameter_header(url, realm, nonce, session,count):
    global var_dict
    public_method = GET_PARAMETER
    response_value = gen_response_value(url, public_method, realm, nonce)
    str_get_parameter_header = GET_PARAMETER rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[
        server_url] +  RTSP/1.0\r\n
    str_get_parameter_header += CSeq:  + str(var_dict[cseq] + 6+int(count)) + \r\n
    str_get_parameter_header += Authorization: Digest username=" + var_dict[
        server_username] + ", realm=" + realm + ", nonce=" + nonce + ", uri=" + url + ", response=" + response_value + "\r\n
    str_get_parameter_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_get_parameter_header += Session:  + session + \r\n
    str_get_parameter_header += \r\n
    return str_get_parameter_header


def gen_teardown_header(url, realm, nonce, session):
    global var_dict
    public_method = TEARDOWN
    response_value = gen_response_value(url, public_method, realm, nonce)
    str_teardown_header = TEARDOWN rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[server_url] +  RTSP/1.0\r\n
    str_teardown_header += CSeq:  + str(var_dict[cseq] + 11) + \r\n
    str_teardown_header += Authorization: Digest username=" + var_dict[
        server_username] + ", realm=" + realm + ", nonce=" + nonce + ", uri=" + url + ", response=" + response_value + "\r\n
    str_teardown_header += User-Agent:  + var_dict[user_agent] + \r\n
    str_teardown_header += Session:  + session + \r\n
    str_teardown_header += \r\n
    return str_teardown_header


socket_send = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_send.settimeout(5)
socket_send.connect((var_dict[server_ip], var_dict[server_port]))

url = rtsp:// + var_dict[server_ip] + : + str(var_dict[server_port]) + var_dict[server_url]

print(now start to check options operation)
str_options_header = gen_options_header()
socket_send.send(str_options_header.encode())
msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
if 200 OK in msg_recv:
    print(OPTIONS request is OK)
else:
    print(OPTIONS request is BAD)
str_describe_header = gen_describe_header()
socket_send.send(str_describe_header.encode())
msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
if msg_recv.find(401 Unauthorized) == -1:
    msg_recv_dict = msg_recv.split(\r\n)
    print(first DESCRIBE request occur error: )
    print(msg_recv_dict[0])
else:
    print(first DESCRIBE is ok,now we will execute second DESCRIBE for auth)
    realm_pos = msg_recv.find(realm)
    realm_value_begin_pos = msg_recv.find(", realm_pos)+1
    realm_value_end_pos = msg_recv.find(", realm_pos + 8)
    realm_value = msg_recv[realm_value_begin_pos:realm_value_end_pos]
    nonce_pos = msg_recv.find(nonce)
    nonce_value_begin_pos = msg_recv.find(", nonce_pos)+1
    nonce_value_end_pos = msg_recv.find(", nonce_pos + 8)
    nonce_value = msg_recv[nonce_value_begin_pos:nonce_value_end_pos]
    str_describe_header = gen_describe_auth_header(url, realm_value, nonce_value)
    socket_send.send(str_describe_header.encode())
    msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
    if msg_recv.find(200 OK) == -1:
        msg_recv_dict = msg_recv.split(\r\n)
        print(second DESCRIBE request occur error: )
        print(msg_recv_dict[0])
    else:
        print(second DESCRIBE is ok,now we will execute first SETUP for session)
        str_setup_header = gen_setup_header(url, realm_value, nonce_value)
        socket_send.send(str_setup_header.encode())
        msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
        if msg_recv.find(200 OK) == -1:
            msg_recv_dict = msg_recv.split(\r\n)
            print(first SETUP request occur error: )
            print(msg_recv_dict[0])
        else:
            print(first SETUP is ok,now we will execute second SETUP)
            session_pos = msg_recv.find(Session)
            session_value_begin_pos = msg_recv.find( ,session_pos+8)+1
            session_value_end_pos = msg_recv.find(;,session_pos+8)
            session_value = msg_recv[session_value_begin_pos:session_value_end_pos]
            str_setup_session_header = gen_setup_session_header(url, realm_value, nonce_value,session_value)
            socket_send.send(str_setup_session_header.encode())
            msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
            if msg_recv.find(200 OK) == -1:
                msg_recv_dict = msg_recv.split(\r\n)
                print(first SETUP request occur error: )
                print(msg_recv_dict[0])
            else:
                print(second SETUP is ok, now we wil execute PLAY)
                str_play_header = gen_play_header(url, realm_value, nonce_value, session_value)
                socket_send.send(str_play_header.encode())
                msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
                if msg_recv.find(200 OK) == -1:
                    msg_recv_dict = msg_recv.split(\r\n)
                    print(PLAY request occur error: )
                    print(msg_recv_dict[0])
                else:
                    print(PLAY is ok, we will execute GET_PARAMETER every 10 seconds and 5 times total)
                    for i in range(5):
                        str_get_parameter_header = gen_get_parameter_header(url, realm_value, nonce_value, session_value,str(i))
                        socket_send.send(str_get_parameter_header.encode())
                        msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
                        msg_recv_dict = msg_recv.split(\r\n)
                        print(str(i)+*10:+msg_recv_dict[0])
                        time.sleep(10)
                    print(now we will execute TEARDOWN to disconnect with server)
                    str_teardown_header = gen_teardown_header(url, realm_value, nonce_value, session_value)
                    socket_send.send(str_teardown_header.encode())
                    msg_recv = socket_send.recv(var_dict[buffer_len]).decode()
                    print(msg_recv)
                    print(program execute finished, thank you)


socket_send.close()
View Code

 

参考:

https://www.cnblogs.com/MikeZhang/archive/2012/10/29/rtspTcpClient_DSS_20121029.html

https://blog.csdn.net/joeblackzqq/article/details/22383005

https://docs.python.org/3/library/index.html

 

python3实现的rtsp客户端脚本

标签:针对   eve   err   ast   ref   1.0   serve   art   sleep   

原文地址:https://www.cnblogs.com/lsdb/p/9090198.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!