码迷,mamicode.com
首页 > 数据库 > 详细

SQLAlchemy模型使用

时间:2016-08-03 01:33:07      阅读:394      评论:0      收藏:0      [点我收藏+]

标签:

一、SQLAlchemy初始化和创建连接

1. SQLAlchemy初始化

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column
from sqlalchemy.types import String, Integer
from sqlalchemy.ext.declarative import declarative_base
#导入相应的模块
engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", max_overflow=5) #创建数据库连接,max_overflow指定最大连接数
DBSession = sessionmaker(engine)#创建DBSession类型
session = DBSession()#创建session对象

BaseModel = declarative_base()#创建对象的基类

class User(BaseModel): #定义User对象
    __tablename__ = user‘ #创建表,指定表名称

#指定表的结构 id = Column(String, primary_key=True) username = Column(String, index=True) class Session(BaseModel): __tablename__ = session id = Column(String, primary_key=True) user = Column(String, index=True) ip = Column(String)
BaseModel.metadata.create_all(engine)#创建表,执行所有BaseModel类的子类
session.commit()#提交,必须
 

2. SQLAlchemy创建连接

SQLAlchemy 的连接创建是 Lazy 的方式, 即在需要使用时才会去真正创建. 之前做的工作, 全是"定义".连接的定义是在 engine 中做的.

2.1. Engine

engine 的定义包含了三部分的内容, 一是具体数据库类型的实现, 二是连接池, 三是策略(即engine 自己的实现).

所谓的数据库类型即是 MYSQL , Postgresql , SQLite 这些不同的数据库.

一般创建 engine 是使用 create_engine 方法:

engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test")

参数字符串的各部分的意义:

"mysql+pymysql://mysql:123456@10.0.0.8:3306/test"

对于这个字符串, SQLAlchemy 提供了工具可用于处理它:

 1 # -*- coding: utf-8 -*-
 2 
 3 from sqlalchemy import create_engine
 4 from sqlalchemy.engine.url import make_url
 5 from sqlalchemy.engine.url import URL
 6 
 7 s = postgresql://test@localhost:5432/bbcustom
 8 url = make_url(s)
 9 s = URL(drivername=postgresql, username=test, password="",host="localhost", port=5432, database="bbcustom")
10 
11 engine = create_engine(url)
12 engine = create_engine(s)
13 
14 print engine.execute(select id from "user").fetchall()

create_engine 函数有很多的控制参数, 这个后面再详细说.

2.2. Engine的策略

create_engine 的调用, 实际上会变成 strategy.create 的调用. 而 strategy 就是 engine 的实现细节. strategy 可以在 create_engine 调用时通过 strategy 参数指定, 目前官方的支持有三种:

  • plain, 默认的
  • threadlocal, 连接是线程局部的
  • mock, 所有的 SQL 语句的执行会使用指定的函数

mock 这个实现, 会把所有的 SQL 语句的执行交给指定的函数来做, 这个函数是由create_engine 的 executor 参数指定:

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 from sqlalchemy.ext.declarative import declarative_base
 6 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
 7 from sqlalchemy.orm import sessionmaker, relationship
 8 from sqlalchemy import create_engine

9 def f(sql,*args,**kwargs): 10 print(sql,args,kwargs) 11 s="mysql+pymysql://mysql:123456@10.0.0.8:3306/test" 12 engin=create_engine(s,strategy=mock,executor=f) 13 print(engin.execute(select id from "user"))

2.3. 各数据库实现

各数据库的实现在 SQLAlchemy 中分成了两个部分, 一是数据库的类型, 二是具体数据库中适配的客户端实现. 比如对于 Postgresql 的访问, 可以使用 psycopg2 , 也可以使用 pg8000 :

1 s = postgresql+psycopg2://test@localhost:5432/bbcustom
2 s = postgresql+pg8000://test@localhost:5432/bbcustom
3 engine = create_engine(s)

具体的适配工作, 是需要在代码中实现一个 Dialect 类来完成的. 官方的实现在 dialects 目录下.

获取具体的 Dialect 的行为, 则是前面提到的 URL 对象的 get_dialect 方法. create_engine 时你单传一个字符串, SQLAlchemy 自己也会使用 make_url 得到一个 URL 的实例).

2.4. 连接池

SQLAlchemy 支持连接池, 在 create_engine 时添加相关参数即可使用.

  • pool_size 连接数
  • max_overflow 最多多几个连接
  • pool_recycle 连接重置周期
  • pool_timeout 连接超时时间

连接池效果:

# -*- coding: utf-8 -*-

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column
from sqlalchemy.types import String, Integer
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", pollsize=2,max_overflow=0)
DBSession = sessionmaker(engine)
session = DBSession()
BaseModel = declarative_base()


from threading import Thread
def f():
    print (engine.execute(show databases).fetchall())


p = []
for i in range(3):
    p.append(Thread(target=f))

for t in p:
    t.start()

 

3.SQLAlchemy 模型使用

3.1. 模型定义

对于 Table 的定义, 本来是直接的实例化调用, 通过 declarative 的包装, 可以像"定义类"这样的更直观的方式来完成.

user = Table(user,
    Column(user_id, Integer, primary_key = True),
    Column(user_name, String(16), nullable = False),
    Column(email_address, String(60)),
    Column(password, String(20), nullable = False)
)                                                                                     
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 from sqlalchemy.ext.declarative import declarative_base
 4 from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
 5 from sqlalchemy.orm import sessionmaker, relationship
 6 from sqlalchemy import create_engine
 7 
 8 engine = create_engine("mysql+pymysql://mysql:123456@10.0.0.8:3306/test", max_overflow=5) #创建数据库连接
 9 DBSession=sessionmaker(engine)#创建了一个自定义了的 Session类
10 session=DBSession()
11 Base = declarative_base()#创建对象的基类
12 
13 
14 class Blog(Base):
15     __tablename__ = blog
16 
17     id = Column(Integer, primary_key=True)
18     title = Column(String(64), server_default=‘‘, nullable=False)
19     text = Column(String, server_default=‘‘, nullable=False)
20     user = Column(String(32), index=True, server_default=‘‘, nullable=False)
21     create = Column(String(32), index=True, server_default=0, nullable=False)
22 
23 
24 class User(Base):
25     __tablename__ = user
26 
27     id = Column(Integer, primary_key=True)
28     name = Column(String(32), server_default=‘‘, nullable=False)
29     username = Column(String(32), index=True, server_default=‘‘, nullable=False)
30     password = Column(String(64), server_default=‘‘, nullable=False)
31 
32 
33 def init_db():
34     Base.metadata.create_all(engine)
35 
36 def drop_db():
37     Base.metadata.drop_all(engine)
38 
39 if __name__ == __main__:
40     init_db()
41     #drop_db()
42     #Base.metadata.tables[‘user‘].create(engine, checkfirst=True)
43     #Base.metadata.tables[‘user‘].drop(engine, checkfirst=False)
44     #pass

3.2. 创建

1 session = Session()
2 session.add(User(id=1)) #创建一个
3 session.add(Blog(id=1))
4 session.add_all([ User(id=2),Blog(id=2)]) #创建多个
5 session.commit()

执行的顺序并不一定会和代码顺序一致, SQLAlchemy 自己会整合逻辑再执行.

3.3. 查询

SQLAlchemy 实现的查询非常强大, 写起来有一种随心所欲的感觉.

查询的结果, 有几种不同的类型, 这个需要注意, 像是:

  • instance
  • instance of list
  • keyed tuple of list
  • value of list

基本查询:

 1 session.query(User).filter_by(username=abc).all()#查询用户User创建表中username字段等于abc的内容
 2 session.query(User).filter(User.username==abc).all()
#平时使用的时候,两者区别主要就是当使用filter的时候条件之间是使用“==",fitler_by使用的是"="

3 session.query(Blog).filter(Blog.create >= 0).all() #查询Blog对象中创建表中create字段大于等于0的所有内容 4 session.query(Blog).filter(Blog.create >= 0).first() 5 session.query(Blog).filter(Blog.create >= 0 | Blog.title == A).first()#条件或 6 session.query(Blog).filter(Blog.create >= 0 & Blog.title == A).first()#条件与 7 session.query(Blog).filter(Blog.create >= 0).offset(1).limit(1).scalar()# offset(N)从第N条开始返回 limit(N)最多返回 N 条记录 scalar() 如果有记录 8 session.query(User).filter(User.username == abc).scalar() 9 session.query(User.id).filter(User.username == abc).scalar() 10 session.query(Blog.id).filter(Blog.create >= 0).all() 11 session.query(Blog.id).filter(Blog.create >= 0).all()[0].id 12 dict(session.query(Blog.id, Blog.title).filter(Blog.create >= 0).all()) 13 session.query(Blog.id, Blog.title).filter(Blog.create >= 0).first().title #记录不存在时,first() 会返回 None 14 session.query(User.id).order_by(id desc).all()#对结果集进行排序 15 session.query(User.id).order_by(id).first() 16 session.query(User.id).order_by(User.id).first() 17 session.query(User.id).order_by(-User.id).first() 18 session.query(id, username).select_from(User).all()#与 19 session.query(User).get(16e19a64d5874c308421e1a835b01c69)# 以主键获取

多表查询:

session.query(Blog, User).filter(Blog.user == User.id).first().User.username
session.query(Blog, User.id, User.username).filter(Blog.user == User.id).first().id
session.query(Blog.id,User.id,User.username).filter(Blog.user == User.id).first().keys()

条件查询:

from sqlalchemy import or_, not_

session.query(User).filter(or_(User.id == ‘‘,
                               User.id == 1)).all()
session.query(User).filter(not_(User.id == 1)).all()
session.query(User).filter(User.id.in_([1])).all()
session.query(User).filter(User.id.like(1%)).all()
session.query(User).filter(User.id.startswith(1)).all()
dir(User.id)

函数:

from sqlalchemy import func
session.query(func.count(1)).select_from(User).scalar()
session.query(func.count(1), func.max(User.username)).select_from(User).first()
session.query(func.count(1)).select_from(User).scalar()
session.query(func.md5(User.username)).select_from(User).all()
session.query(func.current_timestamp()).scalar()
session.query(User).count()

3.4. 修改

还是通常的两种方式:

1 session.query(User).filter(User.username == abc).update({name: 123})
2 session.commit()
3 
4 user=session.query(User).filter_by(username=abc).scalar()
5 user.name = 223
6 session.commit()

如果涉及对属性原值的引用, 则要考虑 synchronize_session 这个参数.

  • ‘evaluate‘ 默认值, 会同时修改当前 session 中的对象属性.
  • ‘fetch‘ 修改前, 会先通过 select 查询条目的值.
  • False 不修改当前 session 中的对象属性.

在默认情况下, 因为会有修改当前会话中的对象属性, 所以如果语句中有 SQL 函数, 或者"原值引用", 那是无法完成的操作, 自然也会报错, 比如:

from sqlalchemy import func
session.query(User).update({User.name: func.trim(123 )})
session.query(User).update({User.name: User.name + x})

这种情况下, 就不能要求 SQLAlchemy 修改当前 session 的对象属性了, 而是直接进行数据库的交互, 不管当前会话值:

session.query(User).update({User.name: User.name + x}, synchronize_session=False)

是否修改当前会话的对象属性, 涉及到当前会话的状态. 如果当前会话过期, 那么在获取相关对象的属性值时, SQLAlchemy 会自动作一次数据库查询, 以便获取正确的值:

user = session.query(User).filter_by(username=abc).scalar()
print user.name
session.query(User).update({User.name: new}, synchronize_session=False)
print (user.name)
session.commit()
print (user.name)

执行了 update 之后, 虽然相关对象的实际的属性值已变更, 但是当前会话中的对象属性值并没有改变. 直到 session.commit() 之后, 当前会话变成"过期"状态, 再次获取 user.name 时, SQLAlchemy 通过 user 的 id 属性, 重新去数据库查询了新值. (如果 user 的 id 变了呢? 那就会出事了啊.)

synchronize_session 设置成 ‘fetch‘ 不会有这样的问题, 因为在做 update 时已经修改了当前会话中的对象了.

不管 synchronize_session 的行为如何, commit 之后 session 都会过期, 再次获取相关对象值时, 都会重新作一次查询.

3.5. 删除

session.query(User).filter_by(username=abc).delete()

user = session.query(User).filter_by(username=abc).first()
session.delete(user)

删除同样有像修改一样的 synchronize_session 参数的问题, 影响当前会话的状态.

3.6. JOIN

SQLAlchemy 可以很直观地作 join 的支持:

1 r = session.query(Blog, User).join(User, Blog.user == User.id).all()
2 for blog, user in r:
3     print (blog.id, blog.user, user.id)

4 r = session.query(Blog, User.name, User.username).join(User, Blog.user == User.id).all()
5 print (r)

 

SQLAlchemy模型使用

标签:

原文地址:http://www.cnblogs.com/panwenbin-logs/p/5731265.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!