一、关于SQLAlchemy
的安装
-
pip install SQLAlchemy
安装 - 如果上面的方式安装不成功的情况可以使用下面的方法
- 百度下载
window
或者linux
下面对应的sqlalchemy
的版本下载地址 - 解压下载的压缩包
- 进去该目录下使用
python setup.py install
- 测试安装是否成功
- 百度下载
二、开发基本的配置(以tornado
开发为参考)
- 1、新建一个包取名为
models
-
2、在
__init__.py
文件中写上基本的配置#!/usr/bin/env python # encoding: utf-8 #引入基本的包 from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker # 连接数据库的数据 HOSTNAME = '127.0.0.1' PORT = '3306' DATABASE = 'tornadotest' USERNAME = 'root' PASSWORD = 'root' # DB_URI的格式:dialect(mysql/sqlite)+driver://username:password@host:port/database?charset=utf8 DB_URI = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(USERNAME,PASSWORD,HOSTNAME,PORT,DATABASE) # 创建引擎 engine = create_engine(DB_URI, echo=False ) # sessionmaker生成一个session类 Session = sessionmaker(bind=engine) dbSession = Session()
-
3、在
models
包下创建一个实体的类(取名User.py
)# coding=utf-8 from datetime import datetime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey from models import engine from models import dbSession from sqlalchemy.orm import relationship Base = declarative_base(engine) # 定义好一些属性,与user表中的字段进行映射,并且这个属性要属于某个类型 class User(Base): __tablename__ = 'user1' id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(50), nullable=False) password = Column(String(100)) createtime = Column(DateTime, default=datetime.now) last_login = Column(DateTime) loginnum = Column(Integer, default=0) _locked = Column(Boolean, default=False, nullable=False) #可以在类里面写别的方法,类似查询方法 @classmethod def all(cls): return dbSession.query(cls).all()
-
4、在
tornado
的.py
文件中# coding:utf8 import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web from pycket.session import SessionMixin from tornado.options import define, options from models import dbSession from models.User import User1 import creat_tables #单独写一个文件用来创建表的 define("port", default=8000, help="run tornado service", type=int) define("tables", default=False, group="application", help="creat tables", type=bool) .... .... if __name__ == "__main__": tornado.options.parse_command_line() if options.tables: creat_tables.run() app = tornado.web.Application(handlers=[ (r"/", IndexHandle), (r"/test01", Test01) ], **settings) app.listen(options.port) tornado.httpserver.HTTPServer(app) tornado.ioloop.IOLoop.instance().start()
-
5、创建
creat_tables.py
文件,里面的代码如下:#coding=utf-8 from models import engine from models.User import Base #将创建好的User类,映射到数据库的users表中 def run(): print '------------create_all-------------' Base.metadata.create_all(engine) print '------------create_end-------------'
-
6、运行代码创建表
- 前提是在代码的服务器上创建了数据库
- 进入对应的工作空间
- 直接运行
python 项目文件名称 --tables[这个地方的名字由这个地方决定:define("tables", default=False, group="application", help="creat tables", type=bool)]
三、关于sqlalchemy
的基本操作
-
1、基本的配置
class BaseHandle(tornado.web.RequestHandler, SessionMixin): def initialize(self): self.db = dbSession print "-----------initialize方法---------" def get_current_user(self): username = self.session.get("username") return username if username else None def on_finish(self): self.db.close() print "-------------on_finish方法----------------"
-
2、查询数据(在创建数据的实体类中新增类方法)
@classmethod def all(cls): return dbSession.query(cls).all() @classmethod def by_id(cls, id): return dbSession.query(cls).filter_by(id=id).first() @classmethod def by_name(cls, name): return dbSession.query(cls).filter_by(username=name).first()
使用方式
# 测试sqlalchemy的test视图(all返回的是一个`list`,但是`first`返回的是一个对象) class Test01(BaseHandle): def get(self): # ===============写sql语句start==============# aa = self.db.query(User1).filter(User1.id == 1).all() print aa #或者直接使用定义好的方法 print User.all(); # ===============写sql语句end==============# self.write("我是测试数据")
-
3、关于增加数据(注意关于增删改的操作要进行
commit
提交)user = User() user.username = xxx user.password = xxx self.db.add(user) self.db.commit()
-
4、关于修改数据
#先查询出,然后重新赋值
-
5、查询
User1
表中全部数据user = self.db.query(User1).all()
-
6、查询某些字段
user = self.db.query(User1.username,User1.password).all()
-
7、根据条件查询返回一个对象
user = self.db.query(User1).filter(User1.password == "123").first()
-
8、查询计算总条数
user = self.db.query(User1).count()
-
9、使用
limit
分页查找user = self.db.query(User1).limit(3).all()
-
10、
slice
切片操作user = self.db.query(User1).slice(1,3).all()
-
11、使用
like
进行模糊查询user = self.db.query(User1).filter(User1.username.like("zhan%")).all()
-
12、
and_
的使用(需要先导包from sqlalchemy import text, and_,func
)user = self.db.query(User1).filter(and_(User1.username.like("zhan%"), User1.password.like("2%"))).all()
-
13、查找大于多少的值
user = self.db.query(User1).filter(User1.loginnum >= 3).all()
-
14、排序
desc
的使用user = self.db.query(User1).order_by(User1.loginnum.desc()).all()
-
15、
filter_by()
使用关键字进行条件查询,不需要用类名指定字段名user = self.db.query(User).filter_by(username='zhansan').all() user = self.db.query(User).filter_by(password='222').all()
-
16、
update()
更新操作,相当于执行了add
操作self.dbSession.query(User).filter(User.id==1) .update({User.username:"aaa"}) self.dbSession.commit()
-
17、
delete
删除数据aa = self.db.query(User).filter(User.id==3).first() self.db.delete(aa)
四、关于sqlalchemy
中使用mysql
的约束
- 1、主键约束:
primary key
不能为空且不能重复 - 2、主键的自动增长:
auto_increment
- 3、唯一约束:
unique
- 4、非空约束:
not null
- 5、外键约束:
foreign key
五、使用execute
批量创建数据
self.db.execute(表名.__table__.insert(), [
{'字段名1': randint(1, 100), '字段名2':'aaa','字段名3': randint(1, 100)}
for i in xrange(500)
])
#提交数据
self.db.commit()
六、关于sqlalchemy
的一对多的查询(重点也是常用的)
- 1、在一对多的处理中主要使用是一个表的一个字段是别的表某一列中指定的值(所谓的外键约束)
- 2、在一对多的关系表中子表中使用外键约束[
ForeignKey(主表.列名)
]注意点主表与子表关联的列的属性要一致 - 3、父与子表中互相查询使用
relationship
-
4、新建一个学生表(
student
)和一个班级表(classes
)from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey #导入relationship from sqlalchemy.orm import relationship #新创建一个学生表的类(子表) class Student(Base): __tablename__ = "student" id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(100), nullable=False) create_time = Column(DateTime, default=datetime.now) # 创建一个外键约束(在子表中创建外键约束,只能是父表中可枚举的值) class_id = Column(Integer, ForeignKey('classess.id')) """ 使用relationship()反向查找 理解: 在student表中可以通过classes查找到classess表里面的数据 在classess表里面可以通过students查找到student表里面的数据 """ classes = relationship("Classess", backref="students") # 创建一个班级表的类(父表) class Classess(Base): __tablename__ = "classess" id = Column(Integer, primary_key=True, autoincrement=True) classname = Column(String(100), nullable=False) createtime = Column(DateTime, default=datetime.now)
-
4、批量插入数据
#班级表中批量插入 self.db.execute(Classess.__table__.insert(), [ {'classname': str(i) + 'class'} for i in xrange(1, 4) ]) self.db.commit() #学生表中批量插入 self.db.execute(Student.__table__.insert(), [ {'username': 'username' + str(i), 'class_id': randint(1, 3)} for i in xrange(20) ]) self.db.commit()
-
5、关于一对多的查询的使用
#查询语句跟之前介绍的一样的 #通过学生查询到该学生的班级 aa = self.db.query(Student).filter_by(id=1).first() print aa.classes.classname #查询班级下所有的学生 bb = self.db.query(Classess).filter_by(id=2).first() for stu in bb.students: print stu.username
七、关于sqlalchemy
的一对一的使用(扩展一张表的信息)
-
1、新建一个用户表和用户扩展表
# 创建一个用户表的类 class User1(Base): __tablename__ = 'user1' id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(50), nullable=False) password = Column(String(100)) createtime = Column(DateTime, default=datetime.now) last_login = Column(DateTime) loginnum = Column(Integer, default=0) _locked = Column(Boolean, default=False, nullable=False) #添加一个字段关联到子表 user1Ext = relationship("User1Ext",uselist=False) #创建一个用户表的扩展表 class User1Ext(Base): __tablename__ = "user1ext" id = Column(Integer,primary_key=True) sex = Column(String(10)) age = Column(Integer) #扩展表当做是一个子表添加一个外键约束 user_id = Column(Integer,ForeignKey("user1.id"),unique=True) #添加一个与user关联的字段 user = relationship("User1",uselist=False)
-
2、添加数据
- 用户表
-
用户扩展表
#根据用户表查询到用户扩展表 aa = self.db.query(User1).filter_by(id=17).first() print aa.user1Ext.sex #根据用户扩展表查询用户信息 bb = self.db.query(User1Ext).filter_by(id=1).first() print bb.user.username
- 用户表
八、多对多的查询(需要一张中间表,然后根据一对多的方式查询)
案例:一个学生可以有多个学科,一个学科有多个学生
-
1、创建一张学生表的试视图类
# 创建学生表 class Student(Base): __tablename__ = "student" id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(100), nullable=False) createtime = Column(DateTime, default=datetime.now) # 新增关联到学科表的字段(secondary表示第二张表的意思,StudentToScore是中间表) student_core = relationship("Core", secondary=StudentToScore.__table__)
-
2、创建一个学科表的视图类
# 创建学科表 class Core(Base): __tablename__ = "score" id = Column(Integer, primary_key=True, autoincrement=True) corename = Column(String(100), nullable=False) core = Column(Integer) # 新增关联到学生表的字段(secondary表示第二张表的意思,StudentToScore是中间表) student_core = relationship("Student", secondary=StudentToScore.__table__)
-
3、创建一个中间表来关联学生表与学科表
# 创建中间表 class StudentToScore(Base): __tablename__ = "student_to_score" student_id = Column(Integer, ForeignKey("student.id"), primary_key=True) score_id = Column(Integer, ForeignKey("score.id"), primary_key=True)
-
4、总结上面创建的三张表
- 1、多对多的关系必须创建一个中间表作为桥梁
- 2、中间表是这两张表的子表,通过外键约束到对应,然后通过双主键约束
- 3、在各自的表中加上
relationship()
去关联对应的表
-
5、批量创建数据(注意先插入学生表与学科表数据后插入中间表的)
#批量插入学生表数据 self.db.execute(Student.__table__.insert(), [ {'username': 'username' + str(i + 1),'class_id': randint(1, 3)} for i in xrange(20) ]) self.db.commit() #批量插入学科表数据 self.db.execute(Core.__table__.insert(), [ {'corename': 'coursename' + str(i + 1),'score': randint(60, 80)} for i in xrange(5) ]) #批量插入中间表数据 self.db.execute(StudentToScore.__table__.insert(), [ {'student_id': randint(1, 20),'score_id': randint(1, 5)} for i in xrange(10)]) self.db.commit()
-
6、测试代码
#查询学生表id=1的学科 aa = self.db.query(Student).filter_by(id=1).first() for a in aa.student_core: print a.corename #查询学科id=1下全部的学生 bb = self.db.query(Core).filter_by(id=1).first() for b in bb.student_score: print b.username