Python SQLAlchemy基本操作和常用技巧包含大量实例,非常好python

http://www.makaidong.com/%E8%84%9A%E6%9C%AC%E4%B9%8B%E5%AE%B6/28053.shtml

"Python SQLAlchemy基本操作和常用技巧包含大量实例,非常好python":关键词python sqlalchemy 基本 作和 常用 技巧 包含 大量 实例 非常好 python

首先说下,由于最新的 0.8 版还是开发版本,因此我使用的是 0.79 版,api 也许会有些不同。
因为我是搭配 mysql innodb 使用,所以使用其他其他数据库 的也不能完全照搬本文。

接着就从安装开始介绍吧,以 debian/ubuntu 为例(请确保有管理员权限):
1.mysql

复制代码 代码如下:
apt-get install mysql-server
apt-get install mysql-client
apt-get install libmysqlclient15-dev

2.python-mysqldb

复制代码 代码如下:
apt-get install python-mysqldb

3.easy_install

复制代码 代码如下:
wget http://peak.telecommunity.com/dist/ez_setup.py

python ez_setup.py
4.mysql-python

复制代码 代码如下:
easy_install mysql-python

5.sqlalchemy

复制代码 代码如下:
easy_install sqlalchemy

如果是用其他操作系统,遇到问题就 google 一下吧。我是在 mac os x 上开发的,途中也遇到些问题,不过当时没记下来……
值得一提的是我用了 mysql-python 来连 mysql,因为不支持异步调用,所以和 tornado 不是很搭。不过性能其实很好,因此以后再去研究下其他方案吧……

装好后就可以开始使用了:

复制代码 代码如下:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

db_connect_string = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'
engine = create_engine(db_connect_string, echo=true)
db_session = sessionmaker(bind=engine)
session = db_session()

这里的 db_connect_string 就是连接其他数据库 的路径。“mysql+mysqldb”指定了使用 mysql-python 来连接,“root”和“123”分别是用户名和密码,“localhost”是其他数据库 的域名,“ooxx”是使用的其他数据库 名(可省略),“charset”指定了连接时使用的字符集(可省略)。
create_engine() 会返回一个其他数据库 引擎,echo 参数为 true 时,会显示每条执行的 sql 语句,生产环境下可关闭。
sessionmaker() 会生成一个其他数据库 会话类。这个类的实例可以当成一个其他数据库 连接,它同时还记录了一些查询的数据,并决定什么时候执行 sql 语句。由于 sqlalchemy 自己维护了一个其他数据库 连接池(默认 5 个连接),因此初始化一个会话的开销并不大。对 tornado 而言,可以在 basehandler 的 initialize() 里初始化:

复制代码 代码如下:
class basehandler(tornado.web.requesthandler):
    def initialize(self):
        self.session = models.db_session()

def on_finish(self):
        self.session.close()

对其他 web 服务器开发来说,可以使用 sqlalchemy.orm.scoped_session,它能保证每个线程获得的 session 对象都是唯一的。不过 tornado 本身就是单线程的,如果使用了异步方式,就可能会出现问题,因此我并没使用它。

拿到 session 后,就可以执行 sql 了:

复制代码 代码如下:
session.execute('create database abc')
print session.execute('show database s').fetchall()
session.execute('use abc')
# 建 user 表的开发过程
print session.execute('select * from user where id = 1').first()
print session.execute('select * from user where id = :id', {'id': 1}).first()

不过这和直接使用 mysql-python 没啥区别,所以就不介绍了;我还是喜欢 orm 的方式,这也是我采用 sqlalchemy 的唯一原因。

于是来定义一个表:

复制代码 代码如下:
from sqlalchemy import column
from sqlalchemy.types import char, integer, string
from sqlalchemy.ext.declarative import declarative_base

basemodel = declarative_base()

def init_db():
    basemodel.metadata.create_all(engine)

def drop_db():
    basemodel.metadata.drop_all(engine)

class user(basemodel):
    __tablename__ = 'user'

id = column(integer, primary_key=true)
    name = column(char(30)) # or column(string(30))

init_db()

declarative_base() 创建了一个 basemodel 类,这个类的子类可以自动与一个表关联。
以 user 类为例,它的 __tablename__ 属性就是其他数据库 中该表的名称,它有 id 和 name 这两个字段,分别为整型和 30 个定长字符。column 还有一些其他的参数,我就不解释了。
最后,basemodel.metadata.create_all(engine) 会找到 basemodel 的所有子类,并在其他数据库 中建立这些表;drop_all() 则是删除这些表。

接着就开始使用这个表吧:

复制代码 代码如下:
from sqlalchemy import func, or_, not_

user = user(name='a')
session.add(user)
user = user(name='b')
session.add(user)
user = user(name='a')
session.add(user)
user = user()
session.add(user)
session.commit()

query = session.query(user)
print query # 显示sql 语句
print query.statement # 同上
for user in query: # 遍历时查询
    print user.name
print query.all() # 返回的是一个类似列表的对象
print query.first().name # 记录不存在时,first() 会返回 none
# print query.one().name # 不存在,或有多行记录时会抛出异常
print query.filter(user.id == 2).first().name
print query.get(2).name # 以主键获取,等效于上句
print query.filter('id = 2').first().name # 支持字符串

query2 = session.query(user.name)
print query2.all() # 每行是个元组
print query2.limit(1).all() # 最多返回 1 条记录
print query2.offset(1).all() # 从第 2 条记录开始返回
print query2.order_by(user.name).all()
print query2.order_by('name').all()
print query2.order_by(user.name.desc()).all()
print query2.order_by('name desc').all()
print session.query(user.id).order_by(user.name.desc(), user.id).all()

print query2.filter(user.id == 1).scalar() # 如果有记录,返

此文来自: 马开东博客 转载请注明出处 网址:http://www.makaidong.com

回第一条记录的第一个元素
print session.query('id').select_from(user).filter('id = 1').scalar()
print query2.filter(user.id > 1, user.name != 'a').scalar() # and
query3 = query2.filter(user.id > 1) # 多次拼接的 filter 也是 and
query3 = query3.filter(user.name != 'a')
print query3.scalar()
print query2.filter(or_(user.id == 1, user.id == 2)).all() # or
print query2.filter(user.id.in_((1, 2))).all() # in

query4 = session.query(user.id)
print query4.filter(user.name == none).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(user.name == none)).all() # not
print query4.filter(user.name != none).all()

print query4.count()
print session.query(func.count('*')).select_from(user).scalar()
print session.query(func.count('1')).select_from(user).scalar()
print session.query(func.count(user.id)).scalar()
print session.query(func.count('*')).filter(user.id > 0).scalar() # filter() 中包含 user,因此不需要指定表
print session.query(func.count('*')).filter(user.name == 'a').limit(1).scalar() == 1 # 可以用 limit() 限制 count() 的返回数
print session.query(func.sum(user.id)).scalar()
print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该其他数据库 支持
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(user.name)).filter(user.id == 1).scalar()

query.filter(user.id == 1).update({user.name: 'c'})
user = query.get(1)
print user.name

user.name = 'd'
session.flush() # 写其他数据库 ,但并不提交
print query.get(1).name

session.delete(user)
session.flush()
print query.get(1)

session.rollback()
print query.get(1).name
query.filter(user.id == 1).delete()
session.commit()
print query.get(1)

增删改查都涉及到了,自己看看输出的 sql 语句就知道了,于是基础知识就介绍到此了。

下面开始介绍一些进阶的知识。

如何批量插入大批数据?

可以使用非 orm 的方式:

复制代码 代码如下:
session.execute(
    user.__table__.insert(),
    [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()

上面我批量插入了 10000 条记录,半秒内就执行完了;而 orm 方式会花掉很长时间。

如何让执行的 sql 语句增加前缀?

使用 query 对象的 prefix_with() 开发方法

复制代码 代码如下:
session.query(user.name).prefix_with('high_priority').all()
session.execute(user.__table__.insert().prefix_with('ignore'), {'id': 1, 'name': '1'})

如何替换一个已有主键的记录?

使用 session.merge() 开发方法 替代 session.add(),其实就是 select + update:

复制代码 代码如下:
user = user(id=1, name='ooxx')
session.merge(user)
session.commit()

或者使用
mysql 的 insert … on duplicate key update,需要用到 @compiles
装饰器,有点难懂,自己搜索看吧:《sqlalchemy on duplicate key update》 和
sqlalchemy_mysql_ext。

如何使用无符号整数?

可以使用 mysql 的方言:

复制代码 代码如下:
from sqlalchemy.dialects.mysql import integer

id = column(integer(unsigned=true), primary_key=true)

模型的属性名需要和表的字段名不一样怎么办?

开发时遇到过一个奇怪的需求,有个其他系统开发的表里包含了一个“from”字段,这在 python 里是关键字,于是只能这样处理了:

复制代码 代码如下:
from_ = column('from', char(10))

如何获取字段的长度?

column 会生成一个很复杂的对象,想获取长度比较麻烦,这里以 user.name 为例:

复制代码 代码如下:
user.name.property.columns[0].type.length

如何指定使用 innodb,以及使用 utf-8 编码?

最简单的方式就是修改其他数据库 的默认配置。如果非要在代码里指定的话,可以这样:

复制代码 代码如下:
class user(basemodel):
    __table_args__ = {
        'mysql_engine': 'innodb',
        'mysql_charset': 'utf8'
    }

mysql 5.5 开始支持存储 4 字节的 utf-8 编码的字符了,ios 里自带的 emoji(如

上一篇:unity shader在小米2s上的问题


下一篇:clientX,screenX,pageX,offsetX的异同 【转载】