标签:
一、单表
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5) Base = declarative_base() # 单表 class Test(Base): __tablename__ = ‘test‘ nid = Column(Integer, primary_key=True,autoincrement=True) name = Column(String(32))
二、一对多
1.创建表 创建数据
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5) Base = declarative_base() # 单表 class Test(Base): __tablename__ = ‘test‘ nid = Column(Integer, primary_key=True,autoincrement=True) name = Column(String(32)) # 一对多 class Group(Base): __tablename__ = ‘group‘ nid = Column(Integer, primary_key=True,autoincrement=True) caption = Column(String(32)) class User(Base): __tablename__ = ‘user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) group_id = Column(Integer, ForeignKey(‘group.nid‘)) #主动创建外键约束 def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) init_db() #创建表 Session = sessionmaker(bind=engine) session = Session() session.add(Group(caption=‘dba‘)) #外键添加group数据 session.add(Group(caption=‘ddd‘)) session.commit() session.add_all([ User(username=‘alex1‘,group_id=1), #键添加用户 User(username=‘alex2‘,group_id=2) ]) session.commit()
数据库显示结果
mysql> show tables;
+---------------+
| Tables_in_s13 |
+---------------+
| group |
| user |
+---------------+
mysql> select *from `group`;
+-----+---------+
| nid | caption |
+-----+---------+
| 1 | dba |
| 2 | ddd |
+-----+---------+
mysql> select *from user;
+-----+----------+----------+
| nid | username | group_id |
+-----+----------+----------+
| 1 | alex1 | 1 |
| 2 | alex2 | 2 |
+-----+----------+----------+
2.查询表(普通查询)
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5) Base = declarative_base() # 一对多 class Group(Base): __tablename__ = ‘group‘ nid = Column(Integer, primary_key=True,autoincrement=True) caption = Column(String(32)) class User(Base): __tablename__ = ‘user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) group_id = Column(Integer, ForeignKey(‘group.nid‘)) #主动创建外键约束
# 只是获取用户 ret = session.query(User).filter(User.username == ‘alex1‘).all() print(ret) ret = session.query(User).all() obj = ret[0] print(ret) print(obj) print(obj.nid) print(obj.username) print(obj.group_id) #以下为查询执行情况 [1 - alex1: 1] [1 - alex1: 1, 2 - alex2: 2] 1 - alex1: 1 1 alex1 1 #联表查询 ret = session.query(User.username).all() print(ret) sql = session.query(User,Group).join(Group, isouter=True) print(sql) ret = session.query(User,Group).join(Group, isouter=True).all() print(ret) sql = session.query(User.username,Group.caption).join(Group, isouter=True) print(sql)
ret = session.query(User.username,Group.caption).join(Group, isouter=True).all() print(ret) #以下为查询执行情况 [(‘alex1‘,), (‘alex2‘,)] SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id, "group".nid AS group_nid, "group".caption AS group_caption FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id [(1 - alex1: 1, <s1.Group object at 0x000001A74F870978>), (2 - alex2: 2, <s1.Group object at 0x000001A74F870A58>)] SELECT "user".username AS user_username, "group".caption AS group_caption FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id [(‘alex1‘, ‘dba‘), (‘alex2‘, ‘ddd‘)]
3.优化联表查询
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5) Base = declarative_base() # 一对多 class Group(Base): __tablename__ = ‘group‘ nid = Column(Integer, primary_key=True,autoincrement=True) caption = Column(String(32)) class User(Base): __tablename__ = ‘user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) group_id = Column(Integer, ForeignKey(‘group.nid‘)) #主动创建外键约束 # 与生成表结构无关,仅用于查询方便 group = relationship("Group", backref=‘uuu‘) # backref yi def __repr__(self): #返回格式 temp = "%s - %s: %s" %(self.nid, self.username, self.group_id) return temp #优化: User类下添加一行 group = relationship("Group", backref=‘uuu‘) #1 # 原始方式 # ret = session.query(User.username,Group.caption).join(Group, isouter=True).all() # 新方式(正向查询) ret = session.query(User).all() #查询 for obj in ret: # obj代指user表的没一行数据 # obj.group代指group对象, print(obj.nid,obj.username,obj.group_id, obj.group,obj.group.nid,obj.group.caption) #以下为执行结果:
1 alex1 1 <s1.Group object at 0x0000029BB37BE2B0> 1 dba
2 alex2 2 <s1.Group object at 0x0000029BB37BE470> 2 ddd #2 # 原始方式 # ret = session.query(User.username,Group.caption).join(Group, isouter=True).filter(Group.caption == ‘DBA‘).all() # 新方式(反向查询) obj = session.query(Group).filter(Group.caption == ‘DBA‘).first() print(obj.nid) print(obj.caption) print(obj.uuu) sql = session.query(Group).filter(Group.caption == ‘DBA‘) print(sql) sql = session.query(Group).get(1) print(sql) #以下为执行结果: 1
dba
[1 - alex1: 1]
SELECT "group".nid AS group_nid, "group".caption AS group_caption
FROM "group"
WHERE "group".caption = :caption_1
<s1.Group object at 0x0000024A4A84CB70>
三、多对多
1.创建表 创建数据
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5) Base = declarative_base() # 多对多 class HostToHostUser(Base): __tablename__ = ‘host_to_host_user‘ nid = Column(Integer, primary_key=True,autoincrement=True) host_id = Column(Integer ,ForeignKey(‘host.nid‘)) host_user_id = Column(Integer, ForeignKey(‘host_user.nid‘)) class Host(Base): # metaclass,Host.table对象 __tablename__ = ‘host‘ nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) class HostUser(Base): __tablename__ = ‘host_user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) init_db() #创建表 Session = sessionmaker(bind=engine) session = Session() session.add_all([ #插入数据 Host(hostname=‘c1‘,port=‘22‘,ip=‘1.1.1.1‘), Host(hostname=‘c2‘,port=‘22‘,ip=‘1.1.1.2‘), Host(hostname=‘c3‘,port=‘22‘,ip=‘1.1.1.3‘), Host(hostname=‘c4‘,port=‘22‘,ip=‘1.1.1.4‘), Host(hostname=‘c5‘,port=‘22‘,ip=‘1.1.1.5‘), ]) session.commit() session.add_all([ HostUser(username=‘root‘), HostUser(username=‘db‘), HostUser(username=‘nb‘), HostUser(username=‘sb‘), ]) session.commit() session.add_all([ HostToHostUser(host_id=1,host_user_id=1), HostToHostUser(host_id=1,host_user_id=2), HostToHostUser(host_id=1,host_user_id=3), HostToHostUser(host_id=2,host_user_id=2), HostToHostUser(host_id=2,host_user_id=4), HostToHostUser(host_id=2,host_user_id=3), ]) session.commit() #以下为创建的数据 mysql> select *from `host`; +-----+----------+------+---------+ | nid | hostname | port | ip | +-----+----------+------+---------+ | 1 | c1 | 22 | 1.1.1.1 | | 2 | c2 | 22 | 1.1.1.2 | | 3 | c3 | 22 | 1.1.1.3 | | 4 | c4 | 22 | 1.1.1.4 | | 5 | c5 | 22 | 1.1.1.5 | +-----+----------+------+---------+ mysql> select *from `host_user`; +-----+----------+ | nid | username | +-----+----------+ | 1 | root | | 2 | db | | 3 | nb | | 4 | sb | +-----+----------+ mysql> select *from `host_to_host_user`; +-----+---------+--------------+ | nid | host_id | host_user_id | +-----+---------+--------------+ | 1 | 1 | 1 | | 2 | 1 | 2 | | 3 | 1 | 3 | | 4 | 2 | 2 | | 5 | 2 | 4 | | 6 | 2 | 3 | +-----+---------+--------------+
2.查询表(普通查询)
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5) Base = declarative_base() # 多对多 class HostToHostUser(Base): __tablename__ = ‘host_to_host_user‘ nid = Column(Integer, primary_key=True,autoincrement=True) host_id = Column(Integer ,ForeignKey(‘host.nid‘)) host_user_id = Column(Integer, ForeignKey(‘host_user.nid‘)) class Host(Base): # metaclass,Host.table对象 __tablename__ = ‘host‘ nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) host_user = relationship(‘HostUser‘, secondary=lamda:HostToHostUser.__table__, backref=‘h‘) #secondary指定通过哪个中间表关联
来进行反向查询 class HostUser(Base): __tablename__ = ‘host_user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) #init_db() #创建表 Session = sessionmaker(bind=engine) session = Session() # 1 host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first() host_obj.nid session.query() session.query(Host.nid).filter(Host.hostname == ‘c1‘) # 2,所有用户ID host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all() print(host_2_host_user) # [(1,), (2,), (3,)] r = zip(*host_2_host_user) #\print(list(r)[0]) # [1,2,3] # 3、根据用户ID找到所有用户 users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all() print(users) #以下为执行结果: [(1,), (2,), (3,)] [(‘root‘,), (‘db‘,), (‘nb‘,)]
3.优化联表查询
#!/usr/bin/env python
# -*- coding:utf-8 -*- # Author:wuwenyu from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,Table from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5) Base = declarative_base() class Host(Base): __tablename__ = ‘host‘ nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) host_user = relationship(‘HostUser‘, secondary=lambda:HostToHostUser.__table__, backref=‘h‘) def __repr__(self): temp = "%s - %s: %s" % (self.nid, self.username, self.group_id) return temp class HostUser(Base): __tablename__ = ‘host_user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) class HostToHostUser(Base): __tablename__ = ‘host_to_host_user‘ nid = Column(Integer, primary_key=True,autoincrement=True) host_id = Column(Integer,ForeignKey(‘host.nid‘)) host_user_id = Column(Integer,ForeignKey(‘host_user.nid‘)) host = relationship("Host", backref=‘h‘) # hostUser = relationship("Host_user", backref=‘u‘) #init_db() #创建表 Session = sessionmaker(bind=engine) session = Session() #原始的方式 #obj=session.query(HostUser.name).filter(HostUser.nid.in_(session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == session.query(Host.nid).filter(Host.hostname == ‘c1‘)))) #新方式(反向查询) host_obj = session.query(Host).filter(Host.hostname==‘c1‘).first() print(host_obj.nid) print(host_obj.hostname) # 第三表对应的对象
#类HostToHostUser 添加 :host = relationship("Host", backref=‘h‘) 则可以使用
print(host_obj.h) # # 循环获取的第三表对应的对象 # for item in host_obj.h: # print(item.host_user,item.host_user.nid,item.host_user.username)
#Host 类中添加:host_user = relationship(‘HostUser‘, secondary=lambda:HostToHostUser.__table__, backref=‘h‘) 可以使用
print(host_obj.host_user) for item in host_obj.host_user: print(item.username) #以下为查询结果 1 c1 [<s1.HostToHostUser object at 0x000002315C9FCA58>, <s1.HostToHostUser object at 0x000002315CA140F0>, <s1.HostToHostUser object at 0x000002315CA14160>] [<s1.HostUser object at 0x000002315CA14710>, <s1.HostUser object at 0x000002315CA14780>, <s1.HostUser object at 0x000002315CA147F0>] root db nb
#paramiko 登陆 代码 +tab键 #python2适用
#encoding:utf-8 import paramiko import sys import os import socket import select import getpass import termios import tty # from paramiko.py3compat import u tran = paramiko.Transport((‘127.0.0.01, 22,)) tran.start_client() tran.auth_password(‘root‘, ‘123‘) # 打开一个通道 chan = tran.open_session() # 获取一个终端 chan.get_pty() # 激活器 chan.invoke_shell() # 获取原tty属性 oldtty = termios.tcgetattr(sys.stdin) try: # 为tty设置新属性 # 默认当前tty设备属性: # 输入一行回车,执行 # CTRL+C 进程退出,遇到特殊字符,特殊处理。 # 这是为原始模式,不认识所有特殊符号 # 放置特殊字符应用在当前终端,如此设置,将所有的用户输入均发送到远程服务器 tty.setraw(sys.stdin.fileno()) chan.settimeout(0.0) while True: # 监视 用户输入 和 远程服务器返回数据(socket) # 阻塞,直到句柄可读 r, w, e = select.select([chan, sys.stdin], [], [], 1) if chan in r: try: x = chan.recv(1024) if len(x) == 0: print(‘\r\n*** EOF\r\n‘) break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break chan.send(x) finally: # 重新设置终端属性 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) chan.close() tran.close()
#完美 #python2适用
#encoding:utf-8 import paramiko import sys import os import socket import getpass import time from paramiko.py3compat import u # windows does not have termios... try: import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan): if has_termios: posix_shell(chan) else: windows_shell(chan) def posix_shell(chan): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) log = open(‘handle.log‘, ‘a+‘) flag = False temp_list = [] while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write(‘\r\n*** EOF\r\n‘) break if flag: if x.startswith(‘\r\n‘): pass else: temp_list.append(x) flag = False #print(x) sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) import json if len(x) == 0: break if x == ‘\t‘: flag = True else: temp_list.append(x) if x == ‘\r‘: print temp_list if ‘‘.join(temp_list) == "su -\r": print("aaaaaaaaaaaaa") date = time.ctime() log.write("%s wuwenyui %s \n" % (‘‘.join(temp_list),str(date))) log.flush() temp_list=[] chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘) sys.stdout.flush() break sys.stdout.write(data) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass def run(): tran = paramiko.Transport((‘127.0.0.1‘, 1715,)) tran.start_client() tran.auth_password(‘root‘, ‘123‘) # 打开一个通道 chan = tran.open_session() # 获取一个终端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name_
#增加输入iput 以及日志 #python2适用
#encoding:utf-8 import paramiko import sys import os import socket import select import getpass import termios import tty from paramiko.py3compat import u # windows does not have termios... try: import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan): if has_termios: posix_shell(chan) else: windows_shell(chan) def posix_shell(chan): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) log = open(‘handle.log‘, ‘a+‘) flag = False temp_list = [] while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write(‘\r\n*** EOF\r\n‘) break if flag: if x.startswith(‘\r\n‘): pass else: temp_list.append(x) flag = False sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) import json if len(x) == 0: break if x == ‘\t‘: flag = True else: temp_list.append(x) if x == ‘\r‘: log.write(‘‘.join(temp_list)) log.flush() temp_list=[] chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘) sys.stdout.flush() break sys.stdout.write(data) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass def run(): default_username = getpass.getuser() username = raw_input(‘Username [%s]: ‘ % default_username) if len(username) == 0: username = default_username hostname = raw_input(‘Hostname: ‘) print (hostname) if len(hostname) == 0: print(‘*** Hostname required.‘) sys.exit(1) tran = paramiko.Transport((hostname, 1715,)) #tran = paramiko.Transport((hostname, 1715,)) tran.start_client() default_auth = "p" auth = raw_input(‘Auth by (p)assword or (r)sa key[%s] ‘ % default_auth) if len(auth) == 0: auth = default_auth if auth == ‘r‘: default_path = os.path.join(os.environ[‘HOME‘], ‘.ssh‘, ‘id_rsa‘) path = raw_input(‘RSA key [%s]: ‘ % default_path) if len(path) == 0: path = default_path try: key = paramiko.RSAKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass(‘RSA key password: ‘) key = paramiko.RSAKey.from_private_key_file(path, password) tran.auth_publickey(username, key) else: pw = getpass.getpass(‘Password for %s@%s: ‘ % (username, hostname)) print(pw) tran.auth_password(username, pw) # 打开一个通道 chan = tran.open_session() # 获取一个终端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name__ == ‘__main__‘: run()
标签:
原文地址:http://www.cnblogs.com/wudalang/p/5723229.html