标签:split closed 复用 客户 received tor attribute encoding [1]
作业需求:
1. 用户登陆
2. 上传/下载文件
3. 不同用户家目录不同
4. 查看当前目录下文件
5. 充分使用面向对象知识
思维图:
待补充()
思维分析:
1.用户登陆保存到文件对比用户名密码
2.上传用json序列化文件名,文件路径,文件大小传给服务器端。根据得到的字段内容操作上传动作
3.下载代码和上传基本可以互换,因为文件名都一样所以传一个文件大小即可
4.查看当前目录下文件,调用cd命令,既然能分解get 和put动作就可以看cd动作
5.添加了LINUX和Windows不同系统的命令,路径,判断,命令有cd,ls,mkdir,等
6.添加了少部分的异常处理
7.添加了 IO多路复用SocketServer
8.未完成日志,md5校验的功能
README:
作者:yaobin
版本:简单Ftp示例版本 v0.1
开发环境: python3.6
程序介绍:
1. 用户登陆
2. 上传/下载文件
3. 不同用户家目录不同
4. 查看当前目录下文件
5. 充分使用面向对象知识
使用说明:
1.可以在Linux和Windows都可以运行
2.Linux调用了cd,mkdir,ls,rm,命令
3.Windows调用了cd,md,dir,del,命令
On Linux,Windows
Client: Python3 ./Ftp_Client.py
put 上传
get 下载
mkdir 创建目录
ls 查看文件信息
rm 删除文件
drm 删除目录
Server:Python3 ./Ftp_Server.py
文件目录结构:
├─简单Ftp #程序执行目录
│ README
│ __init__.py
│
├─bin
│ Ftp_Client.py #客户端程序
│ Ftp_Server.py #服务器端程序
│ __init__.py
│
├─conf
│ │ setting.py #配置文件
│ │ __init__.py
│ │
│ └─__pycache__
│ setting.cpython-36.pyc
│ __init__.cpython-36.pyc
│
├─core
│ │ auth.py #用户验证逻辑交互
│ │ commands.py #命令逻辑交互
│ │ ftp_client.py #sock_客户端逻辑交互
│ │ ftp_server.py #sock_服务端逻辑交互
│ │ logger.py #日志逻辑交互---未完成
│ │ main.py #客户端程序
│ │ __init__.py
│ │
│ └─__pycache__
│ auth.cpython-36.pyc
│ commands.cpython-36.pyc
│ ftp_client.cpython-36.pyc
│ ftp_server.cpython-36.pyc
│ main.cpython-36.pyc
│ __init__.cpython-36.pyc
│
├─db
│ │ __init__.py
│ │
│ ├─colin #用户目录
│ │ │ colin.bak
│ │ │ colin.dat #用户账号密码文件
│ │ │ colin.dir
│ │ │ __init__.py
│ │ │
│ │ └─colin #用户ftp家目录
│ │ │ __init__.py
│ │ │
│ │ └─aaa
│ ├─pub #ftp程序模拟pub目录
│ │ FTP作业.7z
│ │ socket通信client.py
│ │ __init__.py
│ │ 选课系统.png
│ │
│ └─user_path #用户路径文件,判断用户进入系统后在哪里处理
│ path
│
├─logs #日志未完成
│ access.log
│ transmission.log
│ __init__.py
│
├─src
│ │ auth_class.py #用户验证类
│ │ linux_cmd_class.py #linux命令类
│ │ server_class.py #server_socket类
│ │ windows_cmd_class.py #server命令类
│ │ __init__.py
│ │
│ └─__pycache__
│ auth_class.cpython-36.pyc
│ linux_cmd_class.cpython-36.pyc
│ server_class.cpython-36.pyc
│ windows_cmd_class.cpython-36.pyc
│ __init__.cpython-36.pyc
│
└─test #测试
args_test.py
__init__.py
FTP程序代码:
bin/Ftp_client.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) #添加环境变量
from core.main import Admin
if __name__ == ‘__main__‘:
Admin.run(‘‘)
bin/Ftp_Server.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) #添加环境变量
from core import ftp_server
if __name__ == ‘__main__‘:
ftp_server.socketserver.ThreadingTCPServer((‘127.0.0.1‘, 62000), ftp_server.Ftp_server).serve_forever()
conf/setting.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import logging
import os,sys,platform,json
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) #添加环境变量
os_res = platform.system()
if os_res == ‘Windows‘:
data_path = os.path.join(BASE_DIR + ‘\db‘)
ftp_path = os.path.join(BASE_DIR + ‘\db\pub‘)
path_file = os.path.join(BASE_DIR + ‘\db\\user_path\path‘)
else:
data_path = os.path.join(BASE_DIR + ‘/db‘)
ftp_path = os.path.join(BASE_DIR + ‘\db\pub‘)
path_file = os.path.join(BASE_DIR + ‘/db/user_path/path‘)
with open(path_file, ‘r‘, encoding=‘utf-8‘)as f:
file = f.readlines()
file_object=file[0]
core/auth.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys,shelve
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) #添加环境变量
from conf import setting
from src.auth_class import Auth
from core.commands import Commands
class Auth_ftp(object):
def __init__(self,username,user_passwd):
self.user_data = {}
self.username = username #用户名
self.username_passwd = user_passwd #密码
os_res = setting.platform.system()
if os_res == ‘Windows‘: #用户密码文件
self.passwd_data_path = os.path.join(setting.data_path
+ ‘\\‘+ username + ‘\\‘+ username + ‘.‘ + ‘dat‘) #Windows用户密码文件路径
self.passwd_data = os.path.join(setting.data_path + ‘\\‘ + username+ ‘\\‘ + username) # Windows用户密码文件
self.file_object = os.path.join(setting.data_path + ‘\\‘ + self.username)
else:
self.passwd_data_path = os.path.join(setting.data_path + ‘/‘ + username + ‘/‘ + username + ‘.‘ + ‘dat‘) # Linux用户密码文件路径
self.passwd_data = os.path.join(setting.data_path + ‘/‘ + username + ‘/‘ + username) # Linux用户密码文件
self.file_object = os.path.join(setting.data_path + ‘/‘ + username)
user_obj = (self.username,self.username_passwd,self.passwd_data) # ‘‘‘用户名作为key,把用户名,密码,目录,写到字典中方便调用‘‘‘
self.user_data[self.username] = user_obj
‘‘‘验证用户是否存在‘‘‘
def auth_user_passwd(self):
‘‘‘判断文件路径是否存在然后返回用户是否存在‘‘‘
os_res = os.path.exists(self.passwd_data_path)
if os_res !=False:
user_file = shelve.open(self.passwd_data)
if self.user_data[self.username][0] in user_file and user_file[self.username][1] == self.username_passwd:
print("Welcome,%s,您的身份验证成功"%self.username)
user_file.close()
else:
return False
else:
return True
def add_user_passwd(self):
res = os.path.exists(self.file_object)
if res != True:
Commands(self.file_object).mkdir() #用户账号密码文件
Commands(self.passwd_data).mkdir() #用户上传下载目录
user_file = shelve.open(self.passwd_data)
user_file.update(self.user_data)
print("用户创建成功")
else:
print("账号信息出现问题,请联系管理员....")
core/commands.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys,platform
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) #添加环境变量
from conf import setting
from src.linux_cmd_class import L_commands
from src.windows_cmd_class import W_commands
class Commands(object):
def __init__(self,file_object):
self.file_object=file_object
def cd1(self):
if setting.os_res == ‘Windows‘:
res = W_commands(self.file_object).cd1()
return res
elif setting.os_res == ‘Linux‘:
res = L_commands(self.file_object).cd1()
return res
else:
print(‘不支持此操作系统‘)
‘‘‘进入目录‘‘‘
def cd(self):
if setting.os_res ==‘Windows‘:
res= W_commands(self.file_object).cd()
return res
elif setting.os_res == ‘Linux‘:
res=L_commands(self.file_object).cd()
return res
else:
print(‘不支持此操作系统‘)
‘‘‘创建目录‘‘‘
def mkdir(self):
if setting.os_res ==‘Windows‘:
res= W_commands(self.file_object).mkdir()
return res
elif setting.os_res == ‘Linux‘:
res=L_commands(self.file_object).mkdir()
return res
else:
print(‘不支持此操作系统‘)
‘‘‘删除文件‘‘‘
def rm(self):
if setting.os_res == ‘Windows‘:
res=W_commands(self.file_object).rm()
return res
elif setting.os_res == ‘Linux‘:
res=L_commands(self.file_object).rm()
return res
else:
print(‘不支持此操作系统‘)
‘‘‘删除目录‘‘‘
def drm(self):
if setting.os_res == ‘Windows‘:
res=W_commands(self.file_object).drm()
return res
elif setting.os_res == ‘Linux‘:
res=L_commands(self.file_object).drm()
return res
else:
print(‘不支持此操作系统‘)
core/ftp_client.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os, sys
import platform, socket
import time,hashlib,json
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) # 添加环境变量
from conf import setting
from core.commands import Commands
from conf import setting
# from src.server_class import Server_class
‘‘‘定义client连接的协议和方式后面需要补充‘‘‘
‘‘‘创建连接输入IP地址和密码‘‘‘
class Ftp_client(object):
def link(self):
try:
os.chdir(setting.file_object) #直接进入FTP指定的用户目录
self.sending_msg_list = []
self.ip_addr = ‘127.0.0.1‘
self.ip_port = 62000
self.client = socket.socket()
self.client.connect((self.ip_addr, self.ip_port))
while True:
self.sending_msg = None
self.data = self.client.recv(1024)
print("[+]Server>>>recv: %s" % self.data.decode())
self.menu()
sending_msg = input(‘请输入命令>>>:‘)
self.sending_msg_list = sending_msg.split()
if len(sending_msg) == 0:
data_header = {
‘test1‘: {
‘action‘: ‘‘,
‘file_name‘: ‘‘,
‘size‘: 0
}
}
self.client.send(json.dumps(data_header).encode())
elif len(sending_msg) >= 2 :
if setting.os_res == ‘Windows‘:
try :
new_path = self.sending_msg_list[1].encode(‘utf-8‘)
self.res_new = self.sending_msg_list[1].strip().split(‘\\‘) # 截取获得文件名
self.file_name1 = self.res_new[-1]
except IndexError:
pass
elif setting.os_res == ‘Linux‘:
try:
self.res_new = self.sending_msg_list[1].strip().split(‘/‘)
self.file_name1 = self.res_new[-1]
except IndexError:
pass
if self.sending_msg_list[0] == "put":
try:
self.put(self.sending_msg_list[1])
except IndexError:
self.client.send(‘put‘.encode())
if self.sending_msg_list[0] == "get":
try:
self.get(self.file_name1)
except IndexError and ValueError:
self.client.send(‘get‘.encode())
elif self.sending_msg_list[0] == "exit":
break
else:#cd ls rm drm mkdir 命令等
try:
data_header = {
‘test1‘: {
‘action‘: self.sending_msg_list[0],
‘file_name‘: self.file_name1,
‘size‘: 0
}
}
self.client.send(json.dumps(data_header).encode())
except AttributeError:
data_header = {
‘test1‘: {
‘action‘: self.sending_msg_list[0],
‘file_name‘: ‘‘,
‘size‘: 0
}
}
self.client.send(json.dumps(data_header).encode())
except ConnectionResetError and ConnectionAbortedError:
print("[+]Server is Down ....Try to Reconnect......")
self.link()
def get(self,file_name):
data_header = {
‘test1‘: {
‘action‘: ‘get‘,
‘file_name‘: file_name,
‘size‘: 0
}
}
#这里没做同名文件判断,下一次补充
self.client.send(json.dumps(data_header).encode()) #发送get请求
print(os.getcwd())
self.data = self.client.recv(1024) #拿到size
self.client.send(b‘come on‘)
file_size = int(self.data.decode())
def file_tr():
file_object = open(file_name, ‘wb‘)
received_size = 0
while received_size < file_size:
recv_data = self.client.recv(1024)
file_object.write(recv_data)
received_size += len(recv_data) # 规定多少但不一定收到那么多
print(file_size, received_size)
else:
print(‘[+]Client:File Recv Successful‘)
file_object.close()
if os.path.exists(file_name) == False: # 判断本地目录文件是否存在
file_tr()
else:
print(‘文件已经存在将要覆盖‘)
file_tr()
def put(self,file_name):
if os.path.exists(file_name)== True: #判断文件路径# 是否存不存在
if os.path.isfile(file_name):
file_obj = open(file_name, "rb")
data_header = {
‘test1‘: {
‘action‘: ‘put‘,
‘file_name‘: self.file_name1,
‘size‘: os.path.getsize(self.sending_msg_list[1].encode())
}
}
self.client.send(json.dumps(data_header).encode())
for line in file_obj:
self.client.send(line)
file_obj.close()
print("[+]-----send file down-----")
else:
print(‘[+]file is no valid.‘)
self.client.send(‘cmd‘.encode())
else:
print(‘[+] File Not Found‘)
data_header = {
‘test1‘: {
‘action‘: ‘aaaa‘,
‘file_name‘: ‘‘,
‘size‘: 0
}
}
self.client.send(json.dumps(data_header).encode())
def menu(self):
menu = ‘‘‘
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
帮 助 请注意windows和linux的路径稍有不同
查看文件 ls(file) eg: ls /tmp/Python3.6.3/README
进入目录 cd eg: cd /tmp/python
创建目录 mkdir(dir) eg: mkdir /tmp/python
删除文件 rm eg: rm /tmp/python/README
删除目录 drm eg: drm /tmp/python
上 传 put eg: put /tmp/python/README
下 载 get eg: get /tmp/python/README
登 出 exit
默认上传文件夹 用户目录下用用户名的命名的文件夹
默认下载文件夹 db/pub
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
‘‘‘
print(menu)
core/ftp_server.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys,time
import subprocess,subprocess,json
import socketserver,socket,traceback
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from core.main import Admin
from core.auth import Auth_ftp
from core.commands import Commands
from conf import setting
from src.server_class import Server_class
class Ftp_server(socketserver.BaseRequestHandler):
#都懵比了很多代码重复了、等着重新回炉下~~~
def parsecmd(self,data):
data = json.loads(data.decode())
file_action = data[‘test1‘][‘action‘]
file_path = data[‘test1‘][‘file_name‘]
file_size = int(data[‘test1‘][‘size‘])
file_obj = setting.file_object
print(‘from ip : %s information : %s‘ % (self.client_address[0], self.data.decode()))
try:
if file_action == ‘put‘:
os.chdir(file_obj) # 默认用用户的目录作为上传目录
def file_tr():
self.request.send(b‘recv data‘)
file_object = open(file_path, ‘wb‘)
received_size = 0
while received_size < file_size:
recv_data = self.request.recv(1024)
file_object.write(recv_data)
received_size += len(recv_data) # 规定多少但不一定收到那么多
print(file_size, received_size)
else:
print(‘[+]File Recv Successful‘)
file_object.close()
if os.path.exists(file_path) == False: # 判断文件是否存在
file_tr()
else:
self.request.send(b‘[+] this file[%s] will cover ....‘ % file_path.encode())
file_tr()
elif file_action == ‘get‘:
os.chdir(setting.ftp_path) # get对象工作的目录pub
# # get文件首先需要其他目录,或者更改名字 ,然后进行getget的地址应该是自己家目录的地址
# # 下载文件时候客户端在用户目录下面去下载
#判断size不同os的路径不同
if setting.os_res == ‘Windows‘:
file_size = os.path.getsize(setting.ftp_path+‘\\‘+file_path)
if setting.os_res == ‘Linux‘:
file_size = os.path.getsize(setting.ftp_path + ‘/‘ + file_path)
try:
if os.path.exists(file_path) == True: # 判断文件路径# 是否存在
if os.path.isfile(file_path):
file_obj = open(file_path, "rb")
#直接copy 客户端put的代码就可以,这里self.conn.send 改为request.send
self.request.send(str(file_size).encode())
self.request.recv(1024)
for line in file_obj:
self.request.send(line)
file_obj.close()
self.request.send(b"-b:bash:[+]Server[%s]---send file down-----"%file_path.encode())
else:
print(‘[+]file is no valid.‘)
self.request.send(b"-b:bash:[+]Server[%s]---file is no valid.-----" % file_path.encode())
#这里不写上下俩条的话客户端就会卡住、处于一直等待中
else:
self.request.send(b"-b:bash:[+]Server[%s]---file is no valid.-----" % file_path.encode())
# except json.decoder.JSONDecodeError:
# self.request.send(b‘-b:bash:[+]what are you doing???‘)
except:
pass
elif file_action== ‘cd‘:
if os.path.exists(file_path) == True:
res = Commands(file_path).cd()
self.request.send(b‘-bash: [%s] [%s]: ‘ % (file_action.encode(), file_path.encode()))
else:
self.request.send(b‘-bash:Directory Exitis‘)
elif file_action == ‘mkdir‘:
os.chdir(file_obj)
# 文件夹mkdir命令处理,如果是windows默认应该是gbk生成的bytes类型‘‘‘
if os.path.exists(file_path) == True:# 进入要传递目录
self.request.send(b‘-bash: directory exitis ‘)
else:
res = Commands(file_path).mkdir()
self.request.send(b‘-bash: [%s] [%s]:‘ % (file_action.encode(), file_path.encode()))
elif file_action == ‘ls‘: #测试利用json返回命令结果、反正挺好玩的!!!!用subporcess 用的挺high的
res = os.listdir()
res = json.dumps(res)
self.request.send(res.encode())
# ‘‘‘文件删除命令处理, 如果是windows默认应该是gbk生成的bytes类型‘‘‘
elif file_action == ‘rm‘:
os.chdir(file_obj)
if os.path.isfile(file_path) == True:
res = Commands(file_path).rm()
self.request.send(b‘-bash: [%s] [%s]:‘ % (file_action.encode(), file_path.encode()))
else:
self.request.send(b‘-bash: [%s]: Not file ‘ % file_path.encode())
# 目录删除命令处理, 如果是windows默认应该是gbk生成的bytes类型‘‘‘
elif file_action == ‘drm‘:
os.chdir(file_obj) # 进入要传递目录
if os.path.isdir(file_path) == True: # 判断是否是目录
Commands(file_path).drm()
self.request.send(b‘-bash: %s: Delete OK ‘%file_path.encode())
else:
self.request.send(b‘-bash: [%s]: No such File or Directory ‘ %file_path.encode())
else:
self.request.send(b‘-bash:Command or File Not Found ‘)
except Exception and UnicodeDecodeError:
traceback.print_exc()
def handle(self):
print("[+] Server is running on port:62000", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
while True:
try: # 调整一下socket.socket()的位置每次重新连接都生成新的socket实例,避免因为意外而导致socket断开连接
print("[+] Connect success -> %s at ", self.client_address, time.strftime("%Y%m%d %H:%M:%S", time.localtime()))
self.request.send(b‘\033[34;1mWelcome,-bash version 0.0.1-release \033[0m ‘)
while True:
self.data = self.request.recv(1024)
data = self.data
self.parsecmd(data)
if not self.data:
print("[+]Error: Client is lost")
break
except socket.error:
print("[+]Error get connect error")
break
continue
socketserver.ThreadingTCPServer((‘127.0.0.1‘, 62000), Ftp_server).serve_forever()
core/main.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys,json
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR) #添加环境变量
from conf import setting
from core.auth import Auth_ftp
from core.commands import Commands
from core.ftp_client import Ftp_client
class Admin(object):
def run(self):
exit_flag = False
print(‘欢迎来到简单FTP程序,登陆程序后server会在本地自动启动‘)
menu = u‘‘‘
\033[32;1m
1.登陆
2.注册
3.退出\033[0m
‘‘‘
while not exit_flag:
print(menu)
user_option = input(‘请输入您的操作,输入q退出>>>:‘).strip()
‘‘‘登陆‘‘‘
if user_option == ‘1‘:
user_name = input(‘请输入用户名>>>:‘).strip()
user_passwd = input(‘请输入您的密码>>>:‘).strip()
file_object = (Auth_ftp(user_name, user_passwd).passwd_data) #传入路径变量
res = Auth_ftp(user_name,user_passwd).auth_user_passwd()
if res == None:
with open(setting.path_file, ‘w‘,encoding=‘utf-8‘) as f:
f.write(file_object);f.close()
Ftp_client().link()
elif res == False:
print(‘%s用户密码不正确‘ % user_name)
else:
print(‘请先注册‘)
elif user_option == ‘2‘:
user_name = input(‘请输入用户名>>>:‘).strip()
user_passwd = input(‘请输入您的密码>>>:‘).strip()
user_res = Auth_ftp(user_name, user_passwd).auth_user_passwd()
if user_res == None:
print("%s用户不需要注册"%user_name) #不需要注册但是很是应该能进入ftp进行操作
file_object = (Auth_ftp(user_name, user_passwd).passwd_data)
with open(setting.path_file, ‘w‘,encoding=‘utf-8‘) as f:
f.write(file_object);f.close()
Ftp_client().link()
elif user_res == False:
print("%已注册用户,密码不正确" % user_name)
elif user_res == True:
Auth_ftp(user_name, user_passwd).add_user_passwd() #创建用户名密码文件等
file_object = (Auth_ftp(user_name, user_passwd).passwd_data)
with open(setting.path_file, ‘w‘,encoding=‘utf-8‘) as f:
f.write(file_object);f.close()
Ftp_client().link()
else:
sys.exit(‘异常退出‘)
elif user_option == ‘q‘ or user_option == ‘3‘:
sys.exit()
else:
print(‘输入的选项不正确,请重新输入‘)
#Admin().run()
db
logs
src/auth_class.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
class Auth(object):
‘‘‘用户验证类类为用户、密码家目录‘‘‘
def __init__(self,name,passwd):
self.name = name
self.passwd = passwd
src/linux_cmd.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
class Auth(object):
‘‘‘用户验证类类为用户、密码家目录‘‘‘
def __init__(self,name,passwd):
self.name = name
self.passwd = passwd
src/windows_cmd.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os,sys,subprocess
class L_commands(object):
def __init__(self,file_object):
self.cwd = os.getcwd() # 获取当前目录
self.file_object = [file_object] #列表文件 #取0得到文件名/或目录路径
self.directory_object=[file_object] #目录文件
self.file = self.file_object[0]
self.directory = self.file_object[0]
def cd1(self):
os.chdir(self.directory_object[0])
‘‘‘进入目录‘‘‘
def cd(self):
res = subprocess.Popen([‘cd %s‘ % self.directory], shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.msg = res.stdout.read()
if len(self.msg) == 0:
self.msg = res.stderr.read()
return self.msg
‘‘‘创建目录‘‘‘
def mkdir(self):
res = subprocess.call([‘mkdir -p %s‘ % self.directory], shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.msg = res.stdout.read()
if len(self.msg) == 0:
self.msg = res.stderr.read()
return self.msg
‘‘‘删除文件‘‘‘
def rm(self):
res = subprocess.call([‘rm -f %s‘ % self.directory], shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.msg = res.stdout.read()
if len(self.msg) == 0:
self.msg = res.stderr.read()
return self.msg
‘‘‘删除目录‘‘‘
def drm(self):
res = subprocess.call([‘rm -rf %s‘ % self.directory], shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.msg = res.stdout.read()
if len(self.msg) == 0:
self.msg = res.stderr.read()
return self.msg
src/server_class.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
‘‘‘ftpserver类‘‘‘
import os,socket
class Server_class(object):
def __init__(self,ip_addr,port):
self.ip_addr = ip_addr
self.port = port
self.server = socket.socket()
程序运行部分截图:
用户验证:
系统命令:
上传和下载:
Put
Get
标签:split closed 复用 客户 received tor attribute encoding [1]
原文地址:http://www.cnblogs.com/sean-yao/p/7772159.html