标签:values count load doc express upd orm 分享 语言
一、对象关系映射(ORM)
ORM的英文全称是Object Relational Mapping,即对象关系映射。简单来说,对Python这类面向对象的语言而言一切皆对象,而我们使用的数据库却是关系型的;为了保持一致的使用习惯,ORM在编程语言的对象模型和数据库的关系模型之间建立映射关系,这样在操作数据库时就可以直接使用编程语言的对象模型,而不必直接编写SQL语句。
优点:
缺点:
二、SQLAlchemy
SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
在Python中,最有名的ORM框架是SQLAlchemy,用户包括OpenStack、Dropbox等知名公司或应用。
Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
注:支持连接MySQL、Oracle等数据库
安装:
pip install SQLAlchemy
pip install pymysql
#由于mysqldb依然不支持py3,所以这里我们用pymysql与sqlalchemy交互
步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Step 1: run raw SQL strings directly through an Engine."""
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# Literal values embedded in the statement (plain ASCII quotes for SQL strings).
engine.execute("INSERT INTO ts_test (a, b) VALUES ('2', 'v1')")

# Positional placeholders; two parameter tuples are passed for the one statement.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%s, %s)",
    ((555, "v1"), (666, "v1"),)
)

# Named placeholders filled from keyword arguments.
engine.execute(
    "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
    id=999, name="v1"
)

result = engine.execute('select * from ts_test')
result.fetchall()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Step 1 (continued): run statements inside a transaction."""
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# Transaction scoped by the engine: the block body runs on `conn`.
with engine.begin() as conn:
    conn.execute("insert into table (x, y, z) values (1, 2, 3)")
    conn.execute("my_special_procedure(5)")

conn = engine.connect()
# Transaction started on an explicitly acquired connection.
with conn.begin():
    conn.execute("some statement", {'x': 5, 'y': 10})
注:查看数据库连接:show status like 'Threads%';
步骤二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Step 2: describe tables with Table/Column and create them via MetaData."""
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()

user = Table(
    'user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

color = Table(
    'color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

# Emit CREATE TABLE for every table registered on `metadata`.
metadata.create_all(engine)
# metadata.clear()
# metadata.remove()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Step 2 (continued): build statements with the SQL Expression Language."""
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()

user = Table(
    'user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

color = Table(
    'color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
)

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

conn = engine.connect()

# Builds the statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(), {'id': 7, 'name': 'seven'})
conn.close()

# Further examples (the select() ones also need `from sqlalchemy import select`):
# sql = user.insert().values(id=123, name='wu')
# conn.execute(sql)
# conn.close()

# sql = user.delete().where(user.c.id > 1)

# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == 'jack').values(name='ed')

# sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)

# result = conn.execute(sql)
# print(result.fetchall())
# conn.close()
注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Step 3: declarative ORM mapping plus a Session for CRUD operations."""
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

Base = declarative_base()


class User(Base):
    """Mapped class for the `users` table."""
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


# Finds every subclass of Base and creates the matching tables in the database.
# Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# ########## Create ##########
# u = User(id=2, name='sb')
# session.add(u)
# session.add_all([
#     User(id=3, name='sb'),
#     User(id=4, name='sb')
# ])
# session.commit()

# ########## Delete ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()

# ########## Update ##########
# (the keys must be mapped columns of User; `cluster_id` is not one)
# session.query(User).filter(User.id > 2).update({'name': '099'})
# session.commit()

# ########## Read ##########
# ret = session.query(User).filter_by(name='sb').first()

# ret = session.query(User).filter_by(name='sb').all()
# print(ret)

# ret = session.query(User).filter(User.name.in_(['sb', 'bb'])).all()
# print(ret)

# ret = session.query(User.name.label('name_label')).all()
# print(ret, type(ret))

# ret = session.query(User).order_by(User.id).all()
# print(ret)

# ret = session.query(User).order_by(User.id)[1:3]
# print(ret)
# session.commit()
1、基本使用
CREATE TABLE user (
    id INTEGER NOT NULL AUTO_INCREMENT,
    name VARCHAR(32),
    password VARCHAR(64),
    PRIMARY KEY (id)
)
# Create the table structure
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

# The password is "zyw@123"; the "@" must be URL-encoded as %40, otherwise it
# is confused with the separator between the credentials and the host.
# echo=True prints detailed information about what the engine executes.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8", echo=True)

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))


Base.metadata.create_all(engine)  # create the table structure
# 打印输出
# 2016-10-26 08:42:02,619 INFO sqlalchemy.engine.base.Engine SHOW VARIABLES LIKE ‘sql_mode‘
# 2016-10-26 08:42:02,619 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,622 INFO sqlalchemy.engine.base.Engine SELECT DATABASE()
# 2016-10-26 08:42:02,622 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,624 INFO sqlalchemy.engine.base.Engine show collation where `Charset` = ‘utf8‘ and `Collation` = ‘utf8_bin‘
# 2016-10-26 08:42:02,624 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,649 INFO sqlalchemy.engine.base.Engine SELECT CAST(‘test plain returns‘ AS CHAR(60)) AS anon_1
# 2016-10-26 08:42:02,649 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,651 INFO sqlalchemy.engine.base.Engine SELECT CAST(‘test unicode returns‘ AS CHAR(60)) AS anon_1
# 2016-10-26 08:42:02,651 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,652 INFO sqlalchemy.engine.base.Engine SELECT CAST(‘test collated returns‘ AS CHAR CHARACTER SET utf8)
# COLLATE utf8_bin AS anon_1
# 2016-10-26 08:42:02,652 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,655 INFO sqlalchemy.engine.base.Engine DESCRIBE `user`
# 2016-10-26 08:42:02,655 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,657 INFO sqlalchemy.engine.base.Engine ROLLBACK
# 2016-10-26 08:42:02,660 INFO sqlalchemy.engine.base.Engine
# CREATE TABLE user (
# id INTEGER NOT NULL AUTO_INCREMENT,
# name VARCHAR(32),
# password VARCHAR(64),
# PRIMARY KEY (id)
# )
#
#
# 2016-10-26 08:42:02,660 INFO sqlalchemy.engine.base.Engine {}
# 2016-10-26 08:42:02,904 INFO sqlalchemy.engine.base.Engine COMMIT
mysql> desc user;
+----------+-------------+------+-----+---------+----------------+
| Field    | Type        | Null | Key | Default | Extra          |
+----------+-------------+------+-----+---------+----------------+
| id       | int(11)     | NO   | PRI | NULL    | auto_increment |
| name     | varchar(32) | YES  |     | NULL    |                |
| password | varchar(64) | YES  |     | NULL    |                |
+----------+-------------+------+-----+---------+----------------+
3 rows in set (0.01 sec)
2、创建数据
最基本的表我们创建好了,那我们开始用orm创建一条数据试试
# Insert a row through the ORM
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Integer, Column
from sqlalchemy.orm import sessionmaker

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))


# Base.metadata.create_all(engine)  # create the table structure

Session_class = sessionmaker(bind=engine)  # a session factory (a class), not an instance
Session = Session_class()                  # the actual session instance

user_obj = User(name="lzl", password="123456")  # the object to be stored
print(user_obj.name, user_obj.id)  # nothing inserted yet: id is still None

Session.add(user_obj)  # stage the object in the session; it is written later in one go
print(user_obj.name, user_obj.id)  # still not inserted at this point

Session.commit()  # only now is the row actually written
# Query a single row
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Integer, Column
from sqlalchemy.orm import sessionmaker

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))


Session_class = sessionmaker(bind=engine)  # a session factory (a class), not an instance
Session = Session_class()                  # the actual session instance

my_user = Session.query(User).filter_by(name="lzl").first()
print(my_user)  # my_user is a mapped object here
# <__main__.User object at 0x03EFC6D0>

print(my_user.id, my_user.name, my_user.password)
# Query with several chained filter conditions
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Integer, Column
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))

    def __repr__(self):
        """Return a readable form showing name and password."""
        return "<User(name='%s', password='%s')>" % (self.name, self.password)


Session_class = sessionmaker(bind=engine)  # a session factory (a class), not an instance
Session = Session_class()                  # the actual session instance

# Two filter() calls are chained on the same query: id > 3 and id < 8.
objs = Session.query(User).filter(User.id > 3).filter(User.id < 8).all()
print(objs)
# Counting rows
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Integer, Column
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))

    def __repr__(self):
        """Return a readable form showing name and password."""
        return "<User(name='%s', password='%s')>" % (self.name, self.password)


Session_class = sessionmaker(bind=engine)  # a session factory (a class), not an instance
Session = Session_class()                  # the actual session instance

# NOTE(review): the snippet originally ended here without any counting query.
# The line below is a minimal reconstruction - confirm the intended filter.
print(Session.query(User).count())
# Grouping
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Integer, Column
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))

    def __repr__(self):
        """Return a readable form showing name and password."""
        return "<User(name='%s', password='%s')>" % (self.name, self.password)


Session_class = sessionmaker(bind=engine)  # a session factory (a class), not an instance
Session = Session_class()                  # the actual session instance

# Selects count(name) together with name, grouped by name.
print(Session.query(func.count(User.name), User.name).group_by(User.name).all())
# Modify a row
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Integer, Column
from sqlalchemy.orm import sessionmaker

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))

    # __repr__ (rather than __str__) matches the other snippets; print() uses
    # it as well because str() falls back to repr().
    def __repr__(self):
        """Return a readable form showing name and password."""
        return "<User(name='%s', password='%s')>" % (self.name, self.password)


Session_class = sessionmaker(bind=engine)  # a session factory (a class), not an instance
Session = Session_class()                  # the actual session instance

my_user = Session.query(User).filter_by(name="ljb").first()
print(my_user)
# <User(name='ljb', password='123456')>

my_user.name = "lijiabao"  # change the attribute on the loaded object
Session.commit()           # write the change to the database
# Rollback
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Integer, Column
from sqlalchemy.orm import sessionmaker

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()  # ORM base class


class User(Base):
    """Mapped class for the `user` table."""
    __tablename__ = "user"  # table name
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    password = Column(String(64))

    def __repr__(self):
        """Return a readable form showing name and password."""
        return "<User(name='%s', password='%s')>" % (self.name, self.password)


Session_class = sessionmaker(bind=engine)  # a session factory (a class), not an instance
Session = Session_class()                  # the actual session instance

my_user = Session.query(User).filter_by(id=1).first()
my_user.name = "Jack"
print(my_user)
# <User(name='Jack', password='123456')>

Rain_user = User(name="Rain", password="12345")
Session.add(Rain_user)

# At this point the session shows both the modified and the newly added row.
print(Session.query(User).filter(User.name.in_(["Jack", "Rain"])).all())
# [<User(name='Jack', password='123456')>, <User(name='Rain', password='12345')>]

Session.rollback()

# Querying again shows that the uncommitted changes are gone.
print(Session.query(User).filter(User.name.in_(["Jack", "Rain"])).all())
我们先创建个study_record表与student进行关联
# Foreign-key association
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Column, Integer, ForeignKey, DATE
from sqlalchemy.orm import sessionmaker, relationship

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.0.59/lzl",
                       encoding="utf-8")

Base = declarative_base()


class Student(Base):
    """Mapped class for the `student` table."""
    __tablename__ = "student"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    register_date = Column(DATE, nullable=False)

    def __repr__(self):
        """Return the student's id and name."""
        return "<%s name:%s>" % (self.id, self.name)


class StudyRecord(Base):
    """Mapped class for the `study_record` table; each row points at a student."""
    __tablename__ = "study_record"
    id = Column(Integer, primary_key=True)
    day = Column(Integer, nullable=False)
    status = Column(String(32), nullable=False)
    stu_id = Column(Integer, ForeignKey("student.id"))  # references student.id
    # "Student" is the related class; backref adds Student.my_study_record.
    my_student = relationship("Student", backref="my_study_record")

    def __repr__(self):
        """Return the related student's name with this record's day and status."""
        # StudyRecord has no `name` column; the name comes from the related student.
        return "<name:%s day:%s status:%s>" % (self.my_student.name, self.day, self.status)


Base.metadata.create_all(engine)

Session_class = sessionmaker(bind=engine)
session = Session_class()

# s1 was referenced in add_all() below but never defined (NameError).
# NOTE(review): name and date are placeholders chosen to match the later
# query for "ljb" - confirm the intended values.
s1 = Student(name="ljb", register_date="2016-10-26")
s2 = Student(name="alex", register_date="2015-10-26")
s3 = Student(name="eric", register_date="2014-10-26")
s4 = Student(name="rain", register_date="2013-10-26")

r1 = StudyRecord(day=1, status="YES", stu_id=1)
r2 = StudyRecord(day=2, status="No", stu_id=1)
r3 = StudyRecord(day=3, status="YES", stu_id=1)
r4 = StudyRecord(day=1, status="YES", stu_id=2)

session.add_all([s1, s2, s3, s4, r1, r2, r3, r4])
session.commit()
注:my_student = relationship("Student", backref="my_study_record") 这一行很关键:它允许你在student表的对象上,通过backref指定的my_study_record属性,反向查出该学生在study_record表里的所有关联记录
查询:
# Query through the foreign-key relationship
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import String, Column, Integer, ForeignKey, DATE
from sqlalchemy.orm import sessionmaker, relationship

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.0.59/lzl",
                       encoding="utf-8")

Base = declarative_base()


class Student(Base):
    """Mapped class for the `student` table."""
    __tablename__ = "student"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    register_date = Column(DATE, nullable=False)

    def __repr__(self):
        """Return the student's id and name."""
        return "<id:%s name:%s>" % (self.id, self.name)


class StudyRecord(Base):
    """Mapped class for the `study_record` table; each row points at a student."""
    __tablename__ = "study_record"
    id = Column(Integer, primary_key=True)
    day = Column(Integer, nullable=False)
    status = Column(String(32), nullable=False)
    stu_id = Column(Integer, ForeignKey("student.id"))  # references student.id
    # "Student" is the related class; backref adds Student.my_study_record.
    my_student = relationship("Student", backref="my_study_record")

    def __repr__(self):
        """Return the related student's name with this record's day and status."""
        return "<name:%s day:%s status:%s>" % (self.my_student.name, self.day, self.status)


Base.metadata.create_all(engine)

Session_class = sessionmaker(bind=engine)
session = Session_class()

stu_obj = session.query(Student).filter(Student.name == "ljb").first()
print(stu_obj)
# first() yields None when no row matches, so only follow the backref
# when a student was actually found.
if stu_obj is not None:
    print(stu_obj.my_study_record)
下表中,Customer表有2个字段都关联了Address表,首先先创建表结构
# Several foreign keys pointing at the same table
from sqlalchemy import create_engine
from sqlalchemy import Integer, String, Column, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8", echo=True)

Base = declarative_base()


class Customer(Base):
    """Mapped class for `customer`; it references `address` through two columns."""
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    billing_address_id = Column(Integer, ForeignKey("address.id"))
    shipping_address_id = Column(Integer, ForeignKey("address.id"))

    # foreign_keys is required: with two FKs to the same table each
    # relationship must say which column it uses.
    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])


class Address(Base):
    """Mapped class for the `address` table."""
    __tablename__ = 'address'
    id = Column(Integer, primary_key=True)
    street = Column(String(32))
    city = Column(String(32))
    state = Column(String(32))


Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

a1 = Address(street="Tiantongyuan", city="ChangPing", state="BJ")
a2 = Address(street="Wudaokou", city="HaiDian", state="BJ")
a3 = Address(street="Yanjiao", city="LangFang", state="HB")
session.add_all([a1, a2, a3])

# Link customers to the Address objects themselves instead of hard-coding
# primary-key values (1, 2, 3), which are only right on an empty table.
c1 = Customer(name="lzl", billing_address=a1, shipping_address=a2)
c2 = Customer(name="Alex", billing_address=a3, shipping_address=a3)

session.add_all([c1, c2])
session.commit()
class Customer(Base):
    """Mapped class for `customer`; it references `address` through two columns."""
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    billing_address_id = Column(Integer, ForeignKey("address.id"))
    shipping_address_id = Column(Integer, ForeignKey("address.id"))

    # foreign_keys is required: with two FKs to the same table each
    # relationship must say which column it uses.
    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

    def __repr__(self):
        """Return the customer name with the billing and shipping streets."""
        return "<name:%s billing_add:%s shipping_add:%s>" % (
            self.name, self.billing_address.street, self.shipping_address.street)


# Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()

# "lzl" is one of the customers inserted by the table-creation example above;
# the name queried before ("ljb") is never inserted there.
cus_obj = session.query(Customer).filter_by(name="lzl").first()
print(cus_obj)
现在来设计一个能描述“图书”与“作者”的关系的表结构,需求是
#一本书可以有多个作者,一个作者又可以出版多本书
"""Many-to-many: books and authors linked through an association table."""
from sqlalchemy import Table, Column, Integer, String, DATE, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# "@" inside the password is URL-encoded as %40 so the URL parses correctly.
engine = create_engine("mysql+pymysql://root:zyw%40123@192.168.20.219/lzl",
                       encoding="utf-8")

Base = declarative_base()

# Association table. It is not written to directly in this script; rows are
# maintained through the `secondary` relationship below.
book_m2m_author = Table(
    'book_m2m_author', Base.metadata,
    Column('book_id', Integer, ForeignKey('books.id')),
    Column('author_id', Integer, ForeignKey('authors.id')),
)


class Book(Base):
    """Mapped class for the `books` table."""
    __tablename__ = 'books'
    id = Column(Integer, primary_key=True)
    name = Column(String(64))
    pub_date = Column(DATE)
    # Related Author objects are resolved via book_m2m_author (secondary);
    # backref gives Author.books for the reverse direction.
    authors = relationship('Author', secondary=book_m2m_author, backref='books')

    def __repr__(self):
        """Return the book name."""
        return self.name


class Author(Base):
    """Mapped class for the `authors` table."""
    __tablename__ = 'authors'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))

    def __repr__(self):
        """Return the author name."""
        return self.name


Base.metadata.create_all(engine)

# Create the data
Session = sessionmaker(bind=engine)
session = Session()

b1 = Book(name="learn python with Alex", pub_date="2014-05-02")
b2 = Book(name="learn linux with Alex", pub_date="2015-05-02")
b3 = Book(name="learn go with Alex", pub_date="2016-05-02")

a1 = Author(name="Alex")
a2 = Author(name="Jack")
a3 = Author(name="Rain")

# The key step: establish the associations by assigning to Book.authors.
b1.authors = [a1, a3]
b3.authors = [a1, a2, a3]

session.add_all([b1, b2, b3, a1, a2, a3])
session.commit()

author_obj = session.query(Author).filter_by(name="Alex").first()
print(author_obj, author_obj.books)

book_obj = session.query(Book).filter_by(id=2).first()
print(book_obj, book_obj.authors)
9、多对多删除
删除数据时不用管book_m2m_author表,sqlalchemy会自动帮你把对应的关联数据删除
通过书删除作者
# `session` is the Session instance created in the many-to-many example
# above (the snippet previously used an undefined name `s`).
author_obj = session.query(Author).filter_by(name="Jack").first()

# "learn go with Alex" is the book that has Jack among its authors in the
# data created above; the previously queried title does not exist there.
book_obj = session.query(Book).filter_by(name="learn go with Alex").first()

book_obj.authors.remove(author_obj)  # remove one author from a book
session.commit()
直接删除作者
删除作者时,会把这个作者跟所有书的关联关系数据也自动删除
# `session` is the Session instance created in the many-to-many example
# above (the snippet previously used an undefined name `s`).
author_obj = session.query(Author).filter_by(name="Alex").first()
# print(author_obj.name, author_obj.books)
session.delete(author_obj)
session.commit()
标签:values count load doc express upd orm 分享 语言
原文地址:http://www.cnblogs.com/Jeb15/p/6284916.html