码迷,mamicode.com
首页 > 数据库 > 详细

sqlalchemy 、paramiko

时间:2016-08-06 01:53:08      阅读:473      评论:0      收藏:0      [点我收藏+]

标签:

 

sqlalchemy

一、单表

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
# Engine for the MySQL database "s13" on 127.0.0.1:3307, accessed through the
# PyMySQL driver. max_overflow=5 is passed to the connection pool (per the
# SQLAlchemy docs: connections allowed beyond the configured pool size).
engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5)

# Declarative base class; every mapped model below inherits from it.
Base = declarative_base()
# 单表
class Test(Base):
    """Single-table example mapped to the ``test`` table."""

    # The table name must be a string literal; a bare ``test`` is an undefined name.
    __tablename__ = 'test'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))

 

二、一对多

  1.创建表 创建数据

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
# Connection to the local MySQL server (database "s13", port 3307) using the
# PyMySQL driver; max_overflow=5 is forwarded to the connection pool.
engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5)

# Base class for the declarative models defined in this listing.
Base = declarative_base()
# 单表
class Test(Base):
    """Single-table example mapped to the ``test`` table."""

    # Quoted: ``__tablename__`` has to be a string, not a bare identifier.
    __tablename__ = 'test'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))
# 一对多
class Group(Base):
    """The "one" side of the one-to-many example; mapped to the ``group`` table."""

    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(32))
class User(Base):
    """The "many" side of the one-to-many example; mapped to the ``user`` table."""

    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    # Explicit foreign-key constraint; the target is given as the string
    # "<table>.<column>".
    group_id = Column(Integer, ForeignKey('group.nid'))
def init_db():
    """Create every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)

def drop_db():
    """Drop every table registered on ``Base.metadata`` through ``engine``."""
    metadata = Base.metadata
    metadata.drop_all(bind=engine)

init_db()                                                   # create the tables
Session = sessionmaker(bind=engine)
session = Session()

# Insert and commit the groups first: the users below refer to them by
# group_id (1 and 2).
session.add(Group(caption='dba'))
session.add(Group(caption='ddd'))
session.commit()

# Insert the users, each pointing at one of the groups above.
session.add_all([
    User(username='alex1', group_id=1),
    User(username='alex2', group_id=2),
])
session.commit()

数据库显示结果
mysql> show tables;
+---------------+
| Tables_in_s13 |
+---------------+
| group         |
| user          |
+---------------+

mysql> select *from `group`;
+-----+---------+
| nid | caption |
+-----+---------+
|   1 | dba     |
|   2 | ddd     |
+-----+---------+

mysql> select *from user;
+-----+----------+----------+
| nid | username | group_id |
+-----+----------+----------+
|   1 | alex1    |        1 |
|   2 | alex2    |        2 |
+-----+----------+----------+

 

 

 

      2.查询表(普通查询)

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
# MySQL engine (PyMySQL driver, database "s13" on 127.0.0.1:3307);
# max_overflow=5 is a connection-pool setting.
engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5)

# Declarative base shared by the Group and User models below.
Base = declarative_base()

# 一对多
class Group(Base):
    """Group model for the one-to-many query examples (``group`` table)."""

    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(32))
class User(Base):
    """User model for the one-to-many query examples (``user`` table)."""

    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    # Explicit foreign-key constraint to group.nid.
    group_id = Column(Integer, ForeignKey('group.nid'))

    def __repr__(self):
        """Return ``"<nid> - <username>: <group_id>"``.

        The printed results shown after the queries in this listing
        (e.g. ``1 - alex1: 1``) rely on this representation.
        """
        return "%s - %s: %s" % (self.nid, self.username, self.group_id)


# Session used by the queries below.
session = sessionmaker(bind=engine)()

# Fetch users only.
ret = session.query(User).filter(User.username == 'alex1').all()
print(ret)
ret = session.query(User).all()
obj = ret[0]
print(ret)
print(obj)
print(obj.nid)
print(obj.username)
print(obj.group_id)
# Output of the statements above:
# [1 - alex1: 1]
# [1 - alex1: 1, 2 - alex2: 2]
# 1 - alex1: 1
# 1
# alex1
# 1

# Joined queries (LEFT OUTER JOIN because of isouter=True).
ret = session.query(User.username).all()
print(ret)
sql = session.query(User, Group).join(Group, isouter=True)
print(sql)
ret = session.query(User, Group).join(Group, isouter=True).all()
print(ret)
sql = session.query(User.username, Group.caption).join(Group, isouter=True)
print(sql)
ret = session.query(User.username, Group.caption).join(Group, isouter=True).all()
print(ret)
# Output of the statements above:
# [('alex1',), ('alex2',)]
# SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id, "group".nid AS group_nid, "group".caption AS group_caption FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id
# [(1 - alex1: 1, <s1.Group object at 0x000001A74F870978>), (2 - alex2: 2, <s1.Group object at 0x000001A74F870A58>)]
# SELECT "user".username AS user_username, "group".caption AS group_caption FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id
# [('alex1', 'dba'), ('alex2', 'ddd')]







 

  3.优化联表查询

 

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
# Engine bound to MySQL database "s13" at 127.0.0.1:3307 via PyMySQL.
engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5)
# Declarative base for the models in this listing.
Base = declarative_base()

# 一对多
class Group(Base):
    """Group model (``group`` table); users reach it through ``User.group``."""

    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(32))
class User(Base):
    """User model (``user`` table) with an ORM relationship to ``Group``."""

    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    # Explicit foreign-key constraint to group.nid.
    group_id = Column(Integer, ForeignKey('group.nid'))
    # Does not affect the generated table structure; it only makes querying
    # easier. The backref adds a ``uuu`` attribute on Group for the reverse
    # lookup (group -> its users).
    group = relationship("Group", backref='uuu')

    def __repr__(self):
        """Return ``"<nid> - <username>: <group_id>"`` for readable printing."""
        return "%s - %s: %s" % (self.nid, self.username, self.group_id)

#优化: 在 User 类下添加一行:
#     group = relationship("Group", backref='uuu')

# Session used by the queries below.
session = sessionmaker(bind=engine)()

# 1
# Original approach:
# ret = session.query(User.username, Group.caption).join(Group, isouter=True).all()
# New approach (forward lookup through User.group):
ret = session.query(User).all()
for obj in ret:
    # obj is one row of the user table;
    # obj.group is the related Group object.
    print(obj.nid, obj.username, obj.group_id, obj.group, obj.group.nid, obj.group.caption)

# Output of the loop above:
# 1 alex1 1 <s1.Group object at 0x0000029BB37BE2B0> 1 dba
# 2 alex2 2 <s1.Group object at 0x0000029BB37BE470> 2 ddd

# 2
# Original approach:
# ret = session.query(User.username, Group.caption).join(Group, isouter=True).filter(Group.caption == 'DBA').all()
# New approach (reverse lookup through the ``uuu`` backref):
obj = session.query(Group).filter(Group.caption == 'DBA').first()
print(obj.nid)
print(obj.caption)
print(obj.uuu)
sql = session.query(Group).filter(Group.caption == 'DBA')
print(sql)
sql = session.query(Group).get(1)
print(sql)

# Output of the statements above:
# 1
# dba
# [1 - alex1: 1]
# SELECT "group".nid AS group_nid, "group".caption AS group_caption
# FROM "group"
# WHERE "group".caption = :caption_1
# <s1.Group object at 0x0000024A4A84CB70>

 

 

三、多对多

   1.创建表 创建数据

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
# MySQL engine for database "s13" (127.0.0.1:3307, PyMySQL driver);
# max_overflow=5 is passed through to the connection pool.
engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5)

# Declarative base for the many-to-many models below.
Base = declarative_base()

# 多对多
class HostToHostUser(Base):
    """Association table linking hosts to host users (many-to-many)."""

    __tablename__ = 'host_to_host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)

    # Foreign keys are given as "<table>.<column>" strings, so the referenced
    # models may be declared later in the module.
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))

class Host(Base):  # the declarative metaclass builds the Table object for this class
    """A managed host, mapped to the ``host`` table."""

    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))
class HostUser(Base):
    """A login account on a host, mapped to the ``host_user`` table."""

    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


def init_db():
    """Issue CREATE TABLE for all models declared on ``Base``."""
    tables = Base.metadata
    tables.create_all(bind=engine)

def drop_db():
    """Issue DROP TABLE for all models declared on ``Base``."""
    tables = Base.metadata
    tables.drop_all(bind=engine)

init_db()                                                   # create the tables
Session = sessionmaker(bind=engine)
session = Session()

# Insert the hosts. ``port`` is declared as String(32), so it is passed as text.
session.add_all([
    Host(hostname='c1', port='22', ip='1.1.1.1'),
    Host(hostname='c2', port='22', ip='1.1.1.2'),
    Host(hostname='c3', port='22', ip='1.1.1.3'),
    Host(hostname='c4', port='22', ip='1.1.1.4'),
    Host(hostname='c5', port='22', ip='1.1.1.5'),
])
session.commit()

# Insert the host users.
session.add_all([
    HostUser(username='root'),
    HostUser(username='db'),
    HostUser(username='nb'),
    HostUser(username='sb'),
])
session.commit()

# Link hosts to users through the association table; the ids refer to the
# rows committed above.
session.add_all([
    HostToHostUser(host_id=1, host_user_id=1),
    HostToHostUser(host_id=1, host_user_id=2),
    HostToHostUser(host_id=1, host_user_id=3),
    HostToHostUser(host_id=2, host_user_id=2),
    HostToHostUser(host_id=2, host_user_id=4),
    HostToHostUser(host_id=2, host_user_id=3),
])
session.commit()




#以下为创建的数据


mysql> select *from `host`;
+-----+----------+------+---------+
| nid | hostname | port | ip      |
+-----+----------+------+---------+
|   1 | c1       | 22   | 1.1.1.1 |
|   2 | c2       | 22   | 1.1.1.2 |
|   3 | c3       | 22   | 1.1.1.3 |
|   4 | c4       | 22   | 1.1.1.4 |
|   5 | c5       | 22   | 1.1.1.5 |
+-----+----------+------+---------+


mysql> select *from `host_user`;
+-----+----------+
| nid | username |
+-----+----------+
|   1 | root     |
|   2 | db       |
|   3 | nb       |
|   4 | sb       |
+-----+----------+


mysql> select *from `host_to_host_user`;
+-----+---------+--------------+
| nid | host_id | host_user_id |
+-----+---------+--------------+
|   1 |       1 |            1 |
|   2 |       1 |            2 |
|   3 |       1 |            3 |
|   4 |       2 |            2 |
|   5 |       2 |            4 |
|   6 |       2 |            3 |
+-----+---------+--------------+

 

  2.查询表(普通查询)

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
# Engine for MySQL database "s13" on 127.0.0.1:3307 (PyMySQL driver).
engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5)

# Declarative base for HostToHostUser, Host and HostUser.
Base = declarative_base()

# 多对多
class HostToHostUser(Base):
    """Association table between ``host`` and ``host_user`` rows."""

    __tablename__ = 'host_to_host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)

    # String targets ("<table>.<column>") are resolved by SQLAlchemy, so the
    # Host and HostUser classes can be declared after this one.
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))

class Host(Base):  # the declarative metaclass builds the Table object for this class
    """A managed host (``host`` table) with a many-to-many link to HostUser."""

    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    # ``secondary`` names the association table used for the join. The target
    # is given by name because HostUser is declared further down, and the
    # backref adds an ``h`` attribute on HostUser for the reverse lookup.
    host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='h')

class HostUser(Base):
    """A login account (``host_user`` table)."""

    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))

#init_db()                                                   # create the tables (not re-run here)
Session = sessionmaker(bind=engine)
session = Session()

# 1. Look up the host named 'c1'.
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()

# 2. Collect the ids of all users linked to that host.
host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()

print(host_2_host_user)
# [(1,), (2,), (3,)]
# Flatten the one-column rows into a plain list. A list (unlike a zip
# iterator) can be reused and is simply empty when no rows matched.
user_ids = [row[0] for row in host_2_host_user]
# [1, 2, 3]

# 3. Fetch the users with those ids.
users = session.query(HostUser.username).filter(HostUser.nid.in_(user_ids)).all()
print(users)


#以下为执行结果:
[(1,), (2,), (3,)]
[(root,), (db,), (nb,)]

 

  3.优化联表查询

    #!/usr/bin/env python

# -*- coding:utf-8 -*-
# Author:wuwenyu

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,Table
from sqlalchemy.orm import sessionmaker, relationship
# Engine for MySQL database "s13" (127.0.0.1:3307) through PyMySQL.
engine = create_engine("mysql+pymysql://root:mysql@127.0.0.1:3307/s13", max_overflow=5)
# Base class for the declarative models below.
Base = declarative_base()
class Host(Base):
    """A managed host (``host`` table) with a many-to-many link to HostUser."""

    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))
    # HostUser is declared after this class, so it is referenced by name; the
    # lambda likewise defers the lookup of the association table. The backref
    # adds an ``h`` attribute on HostUser.
    host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='h')

    def __repr__(self):
        """Return ``"<nid> - <hostname>: <ip>"``.

        Uses columns that exist on Host; the previous ``username`` and
        ``group_id`` attributes belong to the User model and are not
        defined on this class.
        """
        return "%s - %s: %s" % (self.nid, self.hostname, self.ip)

class HostUser(Base):
    """A login account (``host_user`` table)."""

    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


class HostToHostUser(Base):
    """Association table between ``host`` and ``host_user`` rows."""

    __tablename__ = 'host_to_host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))
    # The backref adds an ``h`` attribute on Host that lists the association
    # rows belonging to that host.
    host = relationship("Host", backref='h')
#init_db()                                                   # create the tables (not re-run here)
Session = sessionmaker(bind=engine)
session = Session()

# Original approach: nested sub-queries.
# obj = session.query(HostUser.username).filter(HostUser.nid.in_(session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == session.query(Host.nid).filter(Host.hostname == 'c1'))))

# New approach (reverse lookup through the relationships).
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
print(host_obj.nid)
print(host_obj.hostname)

# Association-table rows for this host. Available because HostToHostUser
# declares: host = relationship("Host", backref='h')
print(host_obj.h)
# Iterating over those association rows would look like this; it needs a
# ``host_user`` relationship on HostToHostUser, which this listing does not
# define:
# for item in host_obj.h:
#     print(item.host_user, item.host_user.nid, item.host_user.username)

# Users of this host. Available because Host declares:
# host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='h')
print(host_obj.host_user)
for item in host_obj.host_user:
    print(item.username)

#以下为查询结果

1
c1
[<s1.HostToHostUser object at 0x000002315C9FCA58>, <s1.HostToHostUser object at 0x000002315CA140F0>, <s1.HostToHostUser object at 0x000002315CA14160>]

[<s1.HostUser object at 0x000002315CA14710>, <s1.HostUser object at 0x000002315CA14780>, <s1.HostUser object at 0x000002315CA147F0>]
root
db
nb

 

 

 

 paramiko 

#paramiko 登录代码(支持 tab 键补全) #python2适用

#encoding:utf-8
import paramiko
import sys
import os
import socket
import select
import getpass
import termios
import tty
# from paramiko.py3compat import u
 
# Connect to the SSH server and authenticate with a password.
tran = paramiko.Transport(('127.0.0.1', 22,))
chan = None
oldtty = None
try:
    tran.start_client()
    tran.auth_password('root', '123')

    # Open a channel, request a pseudo-terminal, and start a shell on it.
    chan = tran.open_session()
    chan.get_pty()
    chan.invoke_shell()

    # Remember the current tty attributes so they can be restored on exit.
    oldtty = termios.tcgetattr(sys.stdin)

    # By default the local tty is line-buffered and handles special keys
    # itself (e.g. CTRL+C ends the process). Raw mode disables that handling
    # so every keystroke is forwarded to the remote server unchanged.
    tty.setraw(sys.stdin.fileno())
    chan.settimeout(0.0)

    while True:
        # Watch both the user's input and the data coming back from the
        # remote server; wake up at least once per second.
        r, w, e = select.select([chan, sys.stdin], [], [], 1)
        if chan in r:
            try:
                x = chan.recv(1024)
                if len(x) == 0:
                    print('\r\n*** EOF\r\n')
                    break
                sys.stdout.write(x)
                sys.stdout.flush()
            except socket.timeout:
                pass
        if sys.stdin in r:
            x = sys.stdin.read(1)
            if len(x) == 0:
                break
            chan.send(x)

finally:
    # Restore the terminal and release the connection even if the session
    # ended with an error.
    if oldtty is not None:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
    if chan is not None:
        chan.close()
    tran.close()

 

#完美  #python2适用

#encoding:utf-8
import paramiko
import sys
import os
import socket
import getpass
import time
from paramiko.py3compat import u

# windows does not have termios...
# Probe for the POSIX terminal modules; record the outcome in has_termios.
try:
    import termios
    import tty
except ImportError:
    has_termios = False
else:
    has_termios = True


def interactive_shell(chan):
    """Run an interactive session on ``chan`` with the platform's shell loop.

    Uses ``posix_shell`` when the ``termios`` module was importable and
    ``windows_shell`` otherwise.
    """
    shell = posix_shell if has_termios else windows_shell
    shell(chan)


def posix_shell(chan):
    """Relay between the local terminal and ``chan``, logging typed commands.

    Puts stdin into raw/cbreak mode, forwards every keystroke to the channel
    and echoes whatever the channel returns. Typed characters are collected
    and, on each carriage return, appended to ``handle.log`` together with a
    timestamp. After a tab keystroke the next chunk received from the server
    is added to the collected text as well, unless it starts with CRLF.

    The original tty attributes are restored and the log file is closed when
    the loop ends, including on error.
    """
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    log = None
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        log = open('handle.log', 'a+')
        flag = False      # True right after the user pressed tab
        temp_list = []    # characters of the command being typed
        while True:
            r, w, e = select.select([chan, sys.stdin], [], [])
            if chan in r:
                try:
                    x = u(chan.recv(1024))
                    if len(x) == 0:
                        sys.stdout.write('\r\n*** EOF\r\n')
                        break
                    if flag:
                        # First server chunk after a tab keystroke: record it
                        # unless it starts with CRLF.
                        if not x.startswith('\r\n'):
                            temp_list.append(x)
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)

                if len(x) == 0:
                    break

                if x == '\t':
                    flag = True
                else:
                    temp_list.append(x)
                if x == '\r':
                    # Debug output of the collected command.
                    print(temp_list)
                    if ''.join(temp_list) == "su -\r":
                        print("aaaaaaaaaaaaa")
                    date = time.ctime()
                    log.write("%s  wuwenyui %s \n" % (''.join(temp_list), str(date)))
                    log.flush()
                    temp_list = []
                chan.send(x)

    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
        if log is not None:
            log.close()


def windows_shell(chan):
    """Line-buffered relay between stdin/stdout and ``chan``.

    A background thread copies everything received from the channel to
    stdout until the channel reports EOF (an empty read). The calling thread
    reads stdin one character at a time and sends it to the channel until
    stdin is exhausted.
    """
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        """Copy data from ``sock`` to stdout until EOF."""
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            # On Python 3 the channel yields bytes while stdout expects text;
            # on Python 2 bytes is str, so this branch is skipped.
            if isinstance(data, bytes) and not isinstance(data, str):
                data = data.decode('utf-8', 'replace')
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass


def run():
    """Connect to the SSH server, authenticate, and start an interactive shell.

    The transport (and the channel, once opened) is closed when the session
    ends, including when connecting or authenticating fails.
    """
    tran = paramiko.Transport(('127.0.0.1', 1715,))
    chan = None
    try:
        tran.start_client()
        tran.auth_password('root', '123')

        # Open a channel, request a pseudo-terminal, and start a shell on it.
        chan = tran.open_session()
        chan.get_pty()
        chan.invoke_shell()

        interactive_shell(chan)
    finally:
        if chan is not None:
            chan.close()
        tran.close()


# Script entry point (the original line was cut off after ``if __name_``).
if __name__ == '__main__':
    run()

 

 

 

#增加输入 input 以及日志 #python2适用

 

#encoding:utf-8
import paramiko
import sys
import os
import socket
import select
import getpass
import termios
import tty
from paramiko.py3compat import u

# windows does not have termios...
# Record in has_termios whether the POSIX terminal modules can be imported;
# interactive_shell() reads this flag to choose the shell loop.
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False


def interactive_shell(chan):
    """Dispatch ``chan`` to the shell loop that matches the platform.

    ``posix_shell`` is used when ``has_termios`` is true, ``windows_shell``
    otherwise.
    """
    if not has_termios:
        windows_shell(chan)
        return
    posix_shell(chan)


def posix_shell(chan):
    """Relay between the local terminal and ``chan``, logging typed commands.

    Puts stdin into raw/cbreak mode, forwards every keystroke to the channel
    and echoes whatever the channel returns. Typed characters are collected
    and, on each carriage return, appended to ``handle.log``. After a tab
    keystroke the next chunk received from the server is added to the
    collected text as well, unless it starts with CRLF.

    The original tty attributes are restored and the log file is closed when
    the loop ends, including on error.
    """
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    log = None
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        log = open('handle.log', 'a+')
        flag = False      # True right after the user pressed tab
        temp_list = []    # characters of the command being typed
        while True:
            r, w, e = select.select([chan, sys.stdin], [], [])
            if chan in r:
                try:
                    x = u(chan.recv(1024))
                    if len(x) == 0:
                        sys.stdout.write('\r\n*** EOF\r\n')
                        break
                    if flag:
                        # First server chunk after a tab keystroke: record it
                        # unless it starts with CRLF.
                        if not x.startswith('\r\n'):
                            temp_list.append(x)
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)

                if len(x) == 0:
                    break

                if x == '\t':
                    flag = True
                else:
                    temp_list.append(x)
                if x == '\r':
                    log.write(''.join(temp_list))
                    log.flush()
                    temp_list = []
                chan.send(x)

    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
        if log is not None:
            log.close()


def windows_shell(chan):
    """Line-buffered relay between stdin/stdout and ``chan``.

    A background thread prints everything received from the channel until
    the channel reports EOF (an empty read); the calling thread sends stdin
    to the channel one character at a time until stdin is exhausted.
    """
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        """Copy data from ``sock`` to stdout until EOF."""
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            # Python 3 channels yield bytes while stdout expects text; on
            # Python 2 bytes is str, so no decoding happens there.
            if isinstance(data, bytes) and not isinstance(data, str):
                data = data.decode('utf-8', 'replace')
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass


def run():
    """Prompt for connection details, authenticate, and start a shell.

    Asks for the username (defaulting to the current user), the hostname
    (required; the process exits with status 1 when it is empty) and the
    authentication method: password, or an RSA private key that may itself
    be passphrase-protected. Connects to port 1715 of the given host.

    The transport (and the channel, once opened) is closed when the session
    ends, including when authentication fails.
    """
    # ``raw_input`` only exists on Python 2; fall back to ``input`` on Python 3.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    default_username = getpass.getuser()
    username = read_line('Username [%s]: ' % default_username)
    if len(username) == 0:
        username = default_username

    hostname = read_line('Hostname: ')
    print(hostname)
    if len(hostname) == 0:
        print('*** Hostname required.')
        sys.exit(1)

    tran = paramiko.Transport((hostname, 1715,))
    chan = None
    try:
        tran.start_client()

        default_auth = "p"
        auth = read_line('Auth by (p)assword or (r)sa key[%s] ' % default_auth)
        if len(auth) == 0:
            auth = default_auth

        if auth == 'r':
            default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
            path = read_line('RSA key [%s]: ' % default_path)
            if len(path) == 0:
                path = default_path
            try:
                key = paramiko.RSAKey.from_private_key_file(path)
            except paramiko.PasswordRequiredException:
                # The key file is encrypted: ask for its passphrase and retry.
                password = getpass.getpass('RSA key password: ')
                key = paramiko.RSAKey.from_private_key_file(path, password)
            tran.auth_publickey(username, key)
        else:
            # The password is deliberately not echoed or printed.
            pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
            tran.auth_password(username, pw)

        # Open a channel, request a pseudo-terminal, and start a shell on it.
        chan = tran.open_session()
        chan.get_pty()
        chan.invoke_shell()

        interactive_shell(chan)
    finally:
        if chan is not None:
            chan.close()
        tran.close()


# Script entry point; '__main__' must be a string literal.
if __name__ == '__main__':
    run()

 

sqlalchemy 、paramiko

标签:

原文地址:http://www.cnblogs.com/wudalang/p/5723229.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!