标签:
之前写过一篇博客介绍过sqlalchemy的基本用法,本篇博客主要介绍除增删改查以外SQLAlchemy对数据库表的操作,主要内容有单表操作、一对多操作、多对多操作。
单表操作的增删改查在上篇博客中已经详细介绍过,这里不再赘述,今天主要对数据库查询再详细介绍一下,下面我们先创建表并插入数据。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Declare the Group and User models, create their tables and insert the
# sample rows that every query example below runs against.
from sqlalchemy import and_, or_
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship

# The listing originally used `engine` without ever creating it.
# The URL below is only an example; substitute the URL of your own database.
engine = create_engine("sqlite:///sqlalchemy_demo.db")

Base = declarative_base()


class Group(Base):
    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(32))


class User(Base):
    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey('group.nid'))
    # Relationship to Group, used together with the ForeignKey above to make
    # lookups convenient; backref='uuu' adds a `uuu` attribute on Group.
    group = relationship("Group", backref='uuu')


def init_table():
    """Create the tables for every model declared on Base."""
    Base.metadata.create_all(engine)


def drop_table():
    """Drop the tables for every model declared on Base."""
    Base.metadata.drop_all(engine)


init_table()
Session = sessionmaker(bind=engine)
session = Session()

# Single-table operations: insert the groups first so the users can refer to them.
session.add(Group(caption='dba'))
session.add(Group(caption='dddd'))
session.commit()

session.add_all([
    User(username='jack1', group_id=1),
    User(username='jack2', group_id=1),
    User(username='jack1', group_id=2),
    User(username='jack1', group_id=1),
    # group_id=2 (the listing said 1): the join, forward-lookup and
    # reverse-lookup outputs later in the article all show this user in group 2.
    User(username='jack2', group_id=2),
])
session.commit()
|
1
2
3
4
5
6
7
8
9
|
# Fetch the nid of every user named jack1, written once with filter()
# and once with filter_by(); both produce the same rows.
ret1 = session.query(User.nid).filter(User.username == 'jack1').all()
print(ret1)
ret2 = session.query(User.nid).filter_by(username='jack1').all()
print(ret2)
# Output:
# [(1,), (3,), (4,)]
# [(1,), (3,), (4,)]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# nid of users whose nid is greater than 1 AND whose username is jack2.
# Several criteria passed to one filter() call are combined with AND.
ret1 = session.query(User.nid).filter(User.nid > 1, User.username == 'jack2').all()
print(ret1)
# Output: [(2,), (5,)]

# nid and username of users whose nid is between 1 and 3 and whose username is jack1.
ret2 = session.query(User.nid, User.username).filter(
    User.nid.between(1, 3), User.username == 'jack1').all()
print(ret2)
# Output: [(1, 'jack1'), (3, 'jack1')]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# Users whose nid is in the list [1, 3, 4].
ret = session.query(User.nid, User.username).filter(User.nid.in_([1, 3, 4])).all()
print(ret)
# Output: [(1, 'jack1'), (3, 'jack1'), (4, 'jack1')]

# Negation with ~: users whose nid is NOT in the list [1, 3, 4].
ret1 = session.query(User.nid, User.username).filter(~User.nid.in_([1, 3, 4])).all()
print(ret1)
# Output: [(2, 'jack2'), (5, 'jack2')]

# Users named jack1, selected by feeding a subquery to in_().
jack1_ids = session.query(User.nid).filter_by(username='jack1')
ret2 = session.query(User.nid, User.username).filter(User.nid.in_(jack1_ids)).all()
print(ret2)
# Output: [(1, 'jack1'), (3, 'jack1'), (4, 'jack1')]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# Users whose nid is greater than 3 AND whose username is jack1.
ret = session.query(User.nid, User.username).filter(
    and_(User.nid > 3, User.username == 'jack1')).all()
print(ret)
# Output: [(4, 'jack1')]

# Users whose nid is less than 2 OR whose username is jack1.
ret = session.query(User.nid, User.username).filter(
    or_(User.nid < 2, User.username == 'jack1')).all()
print(ret)
# Output (derived from the sample rows): [(1, 'jack1'), (3, 'jack1'), (4, 'jack1')]

# Users whose nid is less than 2, OR whose username is jack1 and nid is greater than 3.
ret = session.query(User.nid, User.username).filter(
    or_(User.nid < 2, and_(User.username == 'jack1', User.nid > 3))).all()
print(ret)
# Output: [(1, 'jack1'), (4, 'jack1')]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# Pattern match: every user whose username starts with the letter j.
ret = session.query(User.nid, User.username).filter(User.username.like('j%')).all()
# Negation with ~: every user whose username does NOT start with j.
ret1 = session.query(User.nid, User.username).filter(~User.username.like('j%')).all()
print(ret)
# Output: [(1, 'jack1'), (2, 'jack2'), (3, 'jack1'), (4, 'jack1'), (5, 'jack2')]
print(ret1)
# Output: []
|
1
2
3
4
5
|
# Slicing a query with [1:2] returns only the second row.
ret = session.query(User.nid, User.username)[1:2]
print(ret)
# Output: [(2, 'jack2')]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# Descending order by nid.
ret = session.query(User.nid, User.username).order_by(User.nid.desc()).all()
print(ret)
# Output: [(5, 'jack2'), (4, 'jack1'), (3, 'jack1'), (2, 'jack2'), (1, 'jack1')]

# Ascending order by nid.
ret1 = session.query(User.nid, User.username).order_by(User.nid.asc()).all()
print(ret1)
# Output: [(1, 'jack1'), (2, 'jack2'), (3, 'jack1'), (4, 'jack1'), (5, 'jack2')]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# func gives access to SQL functions such as max(), sum() and min().
from sqlalchemy.sql import func

# Group by the primary key: one group per row.
ret = session.query(User.nid, User.username).group_by(User.nid).all()
print(ret)
# Output: [(1, 'jack1'), (2, 'jack2'), (3, 'jack1'), (4, 'jack1'), (5, 'jack2')]

# Aggregate nid per username.
ret1 = session.query(
    func.max(User.nid),
    func.sum(User.nid),
    func.min(User.nid),
).group_by(User.username).all()
print(ret1)
# Output: [(4, Decimal('8'), 1), (5, Decimal('7'), 2)]
# (the exact numeric type returned for sum() depends on the database driver)

# Same aggregation, keeping only the groups whose smallest nid is greater than 1.
ret2 = session.query(
    func.max(User.nid),
    func.sum(User.nid),
    func.min(User.nid),
).group_by(User.username).having(func.min(User.nid) > 1).all()
print(ret2)
# Output: [(5, Decimal('7'), 2)]

# Printing the query itself (no .all()) shows the SQL it would run.
ret2 = session.query(
    func.max(User.nid),
    func.sum(User.nid),
    func.min(User.nid),
).group_by(User.username).having(func.min(User.nid) > 1)
print(ret2)
# Output:
# SELECT max("user".nid) AS max_1, sum("user".nid) AS sum_1, min("user".nid) AS min_1
# FROM "user" GROUP BY "user".username
# HAVING min("user".nid) > :min_2
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# UNION of two queries: duplicate rows are removed.
q1 = session.query(User.username).filter(User.nid > 2)
q2 = session.query(Group.caption).filter(Group.nid < 2)
ret = q1.union(q2).all()
print(ret)
# Output: [('jack1',), ('jack2',), ('dba',)]

# UNION ALL of the same two queries: duplicate rows are kept.
q1 = session.query(User.username).filter(User.nid > 2)
q2 = session.query(Group.caption).filter(Group.nid < 2)
ret = q1.union_all(q2).all()
print(ret)
# Output: [('jack1',), ('jack1',), ('jack2',), ('dba',)]
一对多的关系需要通过外键来进行约束,下面我们举例说明一对多的连表操作。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# Implicit join through filter(). The join condition must match the foreign
# key (User.group_id) with the group's primary key; the listing compared
# User.nid with Group.nid, which pairs unrelated primary keys.
ret = session.query(User.username, Group.caption).filter(
    User.group_id == Group.nid).all()
print(ret)
# Output (row order may vary):
# [('jack1', 'dba'), ('jack2', 'dba'), ('jack1', 'dddd'), ('jack1', 'dba'), ('jack2', 'dddd')]

# Explicit join(); isouter=True produces a LEFT OUTER JOIN.
sql1 = session.query(User).join(Group, isouter=True)
print(sql1)
# Output:
# SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id
# FROM "user" LEFT OUTER JOIN "group" ON "group".nid = "user".group_id

# Without isouter the result is a plain (inner) JOIN.
sql2 = session.query(User).join(Group)
print(sql2)
# Output:
# SELECT "user".nid AS user_nid, "user".username AS user_username, "user".group_id AS user_group_id
# FROM "user" JOIN "group" ON "group".nid = "user".group_id

# Joined query: usernames of everyone in the 'dba' group.
ret = session.query(User.username, Group.caption).join(
    Group, isouter=True).filter(Group.caption == 'dba').all()
print(ret)
# Output: [('jack1', 'dba'), ('jack2', 'dba'), ('jack1', 'dba')]
|
1
2
3
4
5
6
7
|
# First add a relationship attribute to the model class.
# NOTE: this repeats the User model from the setup listing to highlight the
# relationship line; it is not meant to be declared a second time on the same Base.
class User(Base):
    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey('group.nid'))
    # Relationship to Group for convenient lookups; the backref is a virtual
    # attribute (`uuu` on Group), not a real column in the table.
    group = relationship("Group", backref='uuu')
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# Forward lookup: from each User row to its Group through the relationship.
ret = session.query(User).all()
for obj in ret:
    # obj is one row of the user table; obj.group is the related Group object.
    # (The listing printed obj.group_id twice; the duplicate is removed.)
    print(obj.nid, obj.username, obj.group_id, obj.group,
          obj.group.nid, obj.group.caption)
# Output:
# 1 jack1 1 <__main__.Group object at 0x0000015D762F4630> 1 dba
# 2 jack2 1 <__main__.Group object at 0x0000015D762F4630> 1 dba
# 3 jack1 2 <__main__.Group object at 0x0000015D762F47F0> 2 dddd
# 4 jack1 1 <__main__.Group object at 0x0000015D762F4630> 1 dba
# 5 jack2 2 <__main__.Group object at 0x0000015D762F47F0> 2 dddd
反向查询:通过Group表查询User表
|
1
2
3
4
5
6
7
8
9
|
# Reverse lookup: from a Group to its users through the `uuu` backref.
obj = session.query(Group).filter(Group.caption == 'dba').first()
print(obj.nid)
print(obj.caption)
print(obj.uuu)
# Output:
# 1
# dba
# [<__main__.User object at 0x000002606096C5C0>, <__main__.User object at 0x000002606096C630>, <__main__.User object at 0x000002606096C6A0>]
我们可以看到上面的例子输出的为对象的列表,输出不太友好,为了达到自己想要的结果,我们可以进行自定义返回结果,请看下面代码,加入__repr__函数:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class User(Base):
    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey('group.nid'))
    # Relationship to Group, used together with the ForeignKey above to make
    # lookups convenient; backref='uuu' adds a `uuu` attribute on Group.
    group = relationship("Group", backref='uuu')

    def __repr__(self):
        """Return 'nid:username:group_id' so printed User objects are readable."""
        return '%s:%s:%s' % (self.nid, self.username, self.group_id)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Declare the Host / HostUser models and their association table, create
# the tables and insert the sample rows for the many-to-many examples.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship

# The listing originally used `engine` without ever creating it.
# The URL below is only an example; substitute the URL of your own database.
engine = create_engine("sqlite:///sqlalchemy_demo.db")

Base = declarative_base()


class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


# Association table. The two relationships allow forward and reverse
# navigation (for example inside a for loop) between hosts and users.
class HostToHostUser(Base):
    __tablename__ = 'host_to_host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))
    host = relationship("Host", backref='h')
    host_user = relationship("HostUser", backref='u')


def init_table():
    """Create the tables for every model declared on Base."""
    Base.metadata.create_all(engine)


def drop_table():
    """Drop the tables for every model declared on Base."""
    Base.metadata.drop_all(engine)


init_table()
Session = sessionmaker(bind=engine)
session = Session()

session.add_all([
    Host(hostname='c1', port='22', ip='1.1.1.1'),
    Host(hostname='c2', port='22', ip='1.1.1.2'),
    Host(hostname='c3', port='22', ip='1.1.1.3'),
    Host(hostname='c4', port='22', ip='1.1.1.4'),
    Host(hostname='c5', port='22', ip='1.1.1.5'),
])
session.commit()

session.add_all([
    HostUser(username='root'),
    HostUser(username='db'),
    HostUser(username='nb'),
    HostUser(username='sb'),
])
session.commit()

# Link hosts to users only after both sides have been committed.
session.add_all([
    HostToHostUser(host_id=1, host_user_id=1),
    HostToHostUser(host_id=1, host_user_id=2),
    HostToHostUser(host_id=1, host_user_id=3),
    HostToHostUser(host_id=2, host_user_id=2),
    HostToHostUser(host_id=2, host_user_id=4),
    HostToHostUser(host_id=2, host_user_id=3),
])
session.commit()
2,需求:获取主机1中所有的用户
方法一:通过一步一步取
|
1
2
3
4
5
6
7
8
9
10
11
12
|
# Step 1: fetch the host named c1 to obtain its nid.
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()

# Step 2: ids of the users linked to that host in the association table.
host_to_host_user = session.query(HostToHostUser.host_user_id).filter(
    HostToHostUser.host_id == host_obj.nid).all()

# The rows come back as 1-tuples such as [(1,), (2,), (3,)]; flatten them.
# A comprehension is used instead of list(zip(*rows))[0], which raises
# IndexError when no rows match.
user_ids = [row[0] for row in host_to_host_user]

# Step 3: usernames for those ids.
users = session.query(HostUser.username).filter(HostUser.nid.in_(user_ids)).all()
print(users)
# Output: [('root',), ('db',), ('nb',)]
方法二:通过子查询嵌套的方式
|
1
2
|
# The three steps merged into one statement with nested subqueries; it works
# but is hard to read.
# Fixes to the listing: the column is HostUser.username (HostUser has no
# `name` attribute), the innermost query is matched with in_() rather than
# compared with ==, and the query is now executed and printed.
users = session.query(HostUser.username).filter(
    HostUser.nid.in_(
        session.query(HostToHostUser.host_user_id).filter(
            HostToHostUser.host_id.in_(
                session.query(Host.nid).filter(Host.hostname == 'c1')
            )
        )
    )
).all()
print(users)
# Output: [('root',), ('db',), ('nb',)]
方法三:通过建立relationship的方式
1,对象
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Many-to-many lookup through relationship(secondary=...): the users of a
# host are read directly from an attribute on the Host object.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship

# The listing originally used `engine` without ever creating it.
# The URL below is only an example; point it at the database that already
# holds the host / host_user / host_to_host_user rows.
engine = create_engine("sqlite:///sqlalchemy_demo.db")

Base = declarative_base()


class HostToHostUser(Base):
    __tablename__ = 'host_to_host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))
    # `secondary` names the association table; backref='h' adds the reverse
    # attribute `h` on HostUser.
    host_user = relationship('HostUser', secondary=HostToHostUser.__table__, backref='h')
    # Alternative when the association class is declared BELOW this class:
    #     host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='h')
    # The lambda defers the lookup; without it the association class must be
    # declared first, as it is here.


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


def init_table():
    """Create the tables for every model declared on Base."""
    Base.metadata.create_all(engine)


def drop_table():
    """Drop the tables for every model declared on Base."""
    Base.metadata.drop_all(engine)


Session = sessionmaker(bind=engine)
session = Session()

# init_table() is not called here, so the tables and rows must already exist.
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
print(host_obj.host_user)
2,类
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# NOTE(review): this listing is identical to the one under "1,对象" above;
# the variant the article intended to show here could not be recovered.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship

# The listing originally used `engine` without ever creating it.
# The URL below is only an example; point it at the database that already
# holds the host / host_user / host_to_host_user rows.
engine = create_engine("sqlite:///sqlalchemy_demo.db")

Base = declarative_base()


class HostToHostUser(Base):
    __tablename__ = 'host_to_host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))


class Host(Base):
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))
    # The association table is passed as `secondary`; backref='h' adds the
    # reverse attribute `h` on HostUser.
    host_user = relationship('HostUser', secondary=HostToHostUser.__table__, backref='h')
    # If the association class were declared after this class, wrap it in a lambda:
    #     host_user = relationship('HostUser', secondary=lambda: HostToHostUser.__table__, backref='h')
    # The lambda is unnecessary here because the association class comes first.


class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))


def init_table():
    """Create the tables for every model declared on Base."""
    Base.metadata.create_all(engine)


def drop_table():
    """Drop the tables for every model declared on Base."""
    Base.metadata.drop_all(engine)


Session = sessionmaker(bind=engine)
session = Session()

# The tables are not created here; this expects existing, populated tables.
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
print(host_obj.host_user)
今天SQLAlchemy就介绍到这里,更多参考信息请参考:
标签:
原文地址:http://www.cnblogs.com/phennry/p/5731299.html