标签:
内容目录:
ORM架构SQLalchemy
Paramiko
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
1、创建表
# 单表 class Test(Base): __tablename__ = ‘test‘ nid = Column(Integer, primary_key=True,autoincrement=True) name = Column(String(32)) # 一对多 class Group(Base): __tablename__ = ‘group‘ nid = Column(Integer, primary_key=True,autoincrement=True) caption = Column(String(32)) class User(Base): __tablename__ = ‘user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) group_id = Column(Integer, ForeignKey(‘group.nid‘)) group = relationship("Group", backref=‘uuu‘) def __repr__(self): temp = "%s - %s: %s" %(self.nid, self.username, self.group_id) return temp #多对多 class Host(Base): # metaclass,Host.table对象 __tablename__ = ‘host‘ nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) # host_user = relationship(‘HostUser‘, secondary=HostToHostUser, backref=‘h‘) host_user = relationship(‘HostUser‘, secondary=HostToHostUser.__table__, backref=‘h‘) class HostUser(Base): __tablename__ = ‘host_user‘ nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) def init_table(): Base.metadata.create_all(engine) def drop_table(): Base.metadata.drop_all(engine) init_table() Session = sessionmaker(bind=engine) session = Session()
2、操作表
增加操作
# 方法1 session.add(Group(caption=‘DBA‘)) session.add(Group(caption=‘SA‘)) session.commit() # 方法2 session.add_all([ User(username=‘jabe1‘,group_id = 1), User(username=‘jabe2‘,group_id = 2) ]) session.commit()
删除表数据操作
session.query(Users).filter(Users.id > 2).delete() session.commit()
修改表记录操作
session.query(Users).filter(Users.id > 2).update({"name" : "099"}) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") session.commit()
查询操作
#条件查询 ret = session.query(User).filter(User.username == ‘jabe1‘).all() print(ret) #单表查询 ret = session.query(User).all() obj = ret[0] print(ret) print(obj) print(obj.nid) print(obj.username) print(obj.group_id) #关联查询 sql = session.query(User).join(Group,isouter=True) print(sql) ret = session.query(User).join(Group,isouter=True).all() print(ret) # sql = session.query(User.username,Group.caption).join(Group,isouter=True) # print(sql) # ret = session.query(User.username,Group.caption).join(Group,isouter=True).all() # print(ret) #原始方式(查询用户名称和用户组) # ret = session.query(User.username,Group.caption).join(Group,isouter=True).all() # print(ret) #新方式正向查询 # ret = session.query(User).all() # for obj in ret: # #obj代指user表每一行数据 # #obj.group 代指group对象 # print(obj.nid,obj.username,obj.group_id,obj.group,obj.group.nid,obj.group.caption) #原始方式(查询所有属于DBA组的用户) ret = session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption == ‘DBA‘).all() print(ret) #新方式(反向查询) obj = session.query(Group).filter(Group.caption == ‘DBA‘).first() print(obj.nid) print(obj.caption) rint(obj.uuu)
附加
# 条件 ret = session.query(Users).filter_by(name=‘alex‘).all() ret = session.query(Users).filter(Users.id > 1, Users.name == ‘eric‘).all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == ‘eric‘).all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name=‘eric‘))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == ‘eric‘)).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == ‘eric‘)).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == ‘eric‘, Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like(‘e%‘)).all() ret = session.query(Users).filter(~Users.name.like(‘e%‘)).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()
堡垒机
堡垒机执行流程:
注:配置.brashrc实现ssh登陆后自动执行脚本,如:/usr/bin/python /tmp/s13/day13/s7.py
练习代码:
import paramiko import sys import os import socket import getpass from paramiko.py3compat import u # windows does not have termios... try: import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan): if has_termios: posix_shell(chan) else: windows_shell(chan) def posix_shell(chan): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) log = open(‘handle.log‘, ‘a+‘, encoding=‘utf-8‘) flag = False temp_list = [] while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write(‘\r\n*** EOF\r\n‘) break if flag: if x.startswith(‘\r\n‘): pass else: temp_list.append(x) flag = False sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) import json if len(x) == 0: break if x == ‘\t‘: flag = True else: temp_list.append(x) if x == ‘\r‘: log.write(‘‘.join(temp_list)) log.flush() temp_list.clear() chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write(‘\r\n*** EOF ***\r\n\r\n‘) sys.stdout.flush() break sys.stdout.write(data) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass def run(): default_username = getpass.getuser() username = input(‘Username [%s]: ‘ % default_username) if len(username) == 0: username = default_username hostname = input(‘Hostname: ‘) if len(hostname) == 0: print(‘*** Hostname required.‘) sys.exit(1) tran = paramiko.Transport((hostname, 22,)) tran.start_client() default_auth = "p" auth = input(‘Auth by (p)assword or (r)sa key[%s] ‘ % default_auth) if len(auth) == 0: auth = default_auth if auth == ‘r‘: default_path = os.path.join(os.environ[‘HOME‘], ‘.ssh‘, ‘id_rsa‘) path = input(‘RSA key [%s]: ‘ % default_path) if len(path) == 0: path = default_path try: key = paramiko.RSAKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass(‘RSA key password: ‘) key = paramiko.RSAKey.from_private_key_file(path, password) tran.auth_publickey(username, key) else: pw = getpass.getpass(‘Password for %s@%s: ‘ % (username, hostname)) tran.auth_password(username, pw) # 打开一个通道 chan = tran.open_session() # 获取一个终端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name__ == ‘__main__‘: run()
参考url:http://www.cnblogs.com/wupeiqi/articles/5699254.html
python运维开发(十三)----SQLalchemy和paramiko续
标签:
原文地址:http://www.cnblogs.com/Jabe/p/5727590.html