引擎是sqlalchemy的核心,不管是 sql core 还是orm的使用都需要依赖引擎的创建,为此我们研究下,引擎是如何创建的。
1 from sqlalchemy import create_engine 2 engine = create_engine('mysql+pymysql://root:x@127.0.0.1/test', 3 echo=True, # 设置为True,则输出sql语句 4 pool_size=5, # 数据库连接池初始化的容量 5 max_overflow=10, # 连接池最大溢出容量,该容量+初始容量=最大容量。超出会堵塞等待,等待时间为timeout参数值默认30 6 7 pool_recycle=7200 # 重连周期 8 )
create_engine 创建引擎对象,源代码如下:
class PlainEngineStrategy(DefaultEngineStrategy): """Strategy for configuring a regular Engine.""" name = "plain" engine_cls = base.Engine PlainEngineStrategy()
这里有个参数 strategy:策略,一般情况默认是'plain',通过参数动态去实例策略类。我们看看对应默认的策略'plain'对应的类是哪个?
default_strategy = "plain" def create_engine(*args, **kwargs): strategy = kwargs.pop("strategy", default_strategy) strategy = strategies.strategies[strategy] return strategy.create(*args, **kwargs)
可以看到是PlainEngineStrategy(),接下来回到创建方法 strategy.create(*args, **kwargs),具体看看怎么创建的。 其实调用了父类DefaultEngineStrategy的方法create。
class DefaultEngineStrategy(EngineStrategy): """Base class for built-in strategies.""" def create(self, name_or_url, **kwargs): # create url.URL object u = url.make_url(name_or_url) plugins = u._instantiate_plugins(kwargs) u.query.pop("plugin", None) kwargs.pop("plugins", None) entrypoint = u._get_entrypoint() dialect_cls = entrypoint.get_dialect_cls(u) if kwargs.pop("_coerce_config", False): def pop_kwarg(key, default=None): value = kwargs.pop(key, default) if key in dialect_cls.engine_config_types: value = dialect_cls.engine_config_types[key](value) return value else: pop_kwarg = kwargs.pop dialect_args = {} # consume dialect arguments from kwargs for k in util.get_cls_kwargs(dialect_cls): if k in kwargs: dialect_args[k] = pop_kwarg(k) dbapi = kwargs.pop("module", None) if dbapi is None: dbapi_args = {} for k in util.get_func_kwargs(dialect_cls.dbapi): if k in kwargs: dbapi_args[k] = pop_kwarg(k) dbapi = dialect_cls.dbapi(**dbapi_args) dialect_args["dbapi"] = dbapi for plugin in plugins: plugin.handle_dialect_kwargs(dialect_cls, dialect_args) # create dialect dialect = dialect_cls(**dialect_args) # assemble connection arguments (cargs, cparams) = dialect.create_connect_args(u) cparams.update(pop_kwarg("connect_args", {})) cargs = list(cargs) # allow mutability # look for existing pool or create pool = pop_kwarg("pool", None) if pool is None: def connect(connection_record=None): if dialect._has_events: for fn in dialect.dispatch.do_connect: connection = fn( dialect, connection_record, cargs, cparams ) if connection is not None: return connection return dialect.connect(*cargs, **cparams) creator = pop_kwarg("creator", connect) poolclass = pop_kwarg("poolclass", None) if poolclass is None: poolclass = dialect_cls.get_pool_class(u) pool_args = {"dialect": dialect} # consume pool arguments from kwargs, translating a few of # the arguments translate = { "logging_name": "pool_logging_name", "echo": "echo_pool", "timeout": "pool_timeout", "recycle": "pool_recycle", "events": "pool_events", "use_threadlocal": "pool_threadlocal", "reset_on_return": "pool_reset_on_return", "pre_ping": "pool_pre_ping", "use_lifo": "pool_use_lifo", } for k in util.get_cls_kwargs(poolclass): tk = translate.get(k, k) if tk in kwargs: pool_args[k] = pop_kwarg(tk) for plugin in plugins: plugin.handle_pool_kwargs(poolclass, pool_args) pool = poolclass(creator, **pool_args) else: if isinstance(pool, poollib.dbapi_proxy._DBProxy): pool = pool.get_pool(*cargs, **cparams) else: pool = pool pool._dialect = dialect # create engine. engineclass = self.engine_cls engine_args = {} for k in util.get_cls_kwargs(engineclass): if k in kwargs: engine_args[k] = pop_kwarg(k) _initialize = kwargs.pop("_initialize", True) # all kwargs should be consumed if kwargs: raise TypeError( "Invalid argument(s) %s sent to create_engine(), " "using configuration %s/%s/%s. Please check that the " "keyword arguments are appropriate for this combination " "of components." % ( ",".join("'%s'" % k for k in kwargs), dialect.__class__.__name__, pool.__class__.__name__, engineclass.__name__, ) ) engine = engineclass(pool, dialect, u, **engine_args) if _initialize: do_on_connect = dialect.on_connect() if do_on_connect: def on_connect(dbapi_connection, connection_record): conn = getattr( dbapi_connection, "_sqla_unwrap", dbapi_connection ) if conn is None: return do_on_connect(conn) event.listen(pool, "first_connect", on_connect) event.listen(pool, "connect", on_connect) def first_connect(dbapi_connection, connection_record): c = base.Connection( engine, connection=dbapi_connection, _has_events=False ) c._execution_options = util.immutabledict() dialect.initialize(c) dialect.do_rollback(c.connection) event.listen(pool, "first_connect", first_connect, once=True) dialect_cls.engine_created(engine) if entrypoint is not dialect_cls: entrypoint.engine_created(engine) for plugin in plugins: plugin.engine_created(engine) return engine
我们逐一分析: u = url.make_url(name_or_url) # 这个方法解析传入的数据库连接的uri信息,符合条件最终返回一个URL对象 plugins = u._instantiate_plugins(kwargs) # 插件初始化, entrypoint = u._get_entrypoint() # 根据传入url中的数据库类型(mysql)和驱动库(pymysql),来注册插件,返回方言类 dialect_cls = entrypoint.get_dialect_cls(u) # 获取Dialect类 这里需要说明下Dialect(方言类)的作用是用来定义数据库和DBapi的行为
if kwargs.pop("_coerce_config", False): def pop_kwarg(key, default=None): value = kwargs.pop(key, default) if key in dialect_cls.engine_config_types: value = dialect_cls.engine_config_types[key](value) return value else: pop_kwarg = kwargs.pop dialect_args = {} # consume dialect arguments from kwargs for k in util.get_cls_kwargs(dialect_cls): if k in kwargs: dialect_args[k] = pop_kwarg(k)
这段代码没啥,创建出方言所需要的完整参数dialect_args
dbapi = kwargs.pop("module", None) if dbapi is None: dbapi_args = {} for k in util.get_func_kwargs(dialect_cls.dbapi): if k in kwargs: dbapi_args[k] = pop_kwarg(k) dbapi = dialect_cls.dbapi(**dbapi_args) dialect_args["dbapi"] = dbapi
这段代码 则是实例化dbpai对象。
# create dialect dialect = dialect_cls(**dialect_args)
开始实例化方言
(cargs, cparams) = dialect.create_connect_args(u) cparams.update(pop_kwarg("connect_args", {})) cargs = list(cargs) # allow mutability
创建连接所需要的参数
pool = pop_kwarg("pool", None) if pool is None: def connect(connection_record=None): if dialect._has_events: for fn in dialect.dispatch.do_connect: connection = fn( dialect, connection_record, cargs, cparams ) if connection is not None: return connection return dialect.connect(*cargs, **cparams) creator = pop_kwarg("creator", connect) poolclass = pop_kwarg("poolclass", None) if poolclass is None: poolclass = dialect_cls.get_pool_class(u) pool_args = {"dialect": dialect} # consume pool arguments from kwargs, translating a few of # the arguments translate = { "logging_name": "pool_logging_name", "echo": "echo_pool", "timeout": "pool_timeout", "recycle": "pool_recycle", "events": "pool_events", "use_threadlocal": "pool_threadlocal", "reset_on_return": "pool_reset_on_return", "pre_ping": "pool_pre_ping", "use_lifo": "pool_use_lifo", } for k in util.get_cls_kwargs(poolclass): tk = translate.get(k, k) if tk in kwargs: pool_args[k] = pop_kwarg(tk) for plugin in plugins: plugin.handle_pool_kwargs(poolclass, pool_args) pool = poolclass(creator, **pool_args) else: if isinstance(pool, poollib.dbapi_proxy._DBProxy): pool = pool.get_pool(*cargs, **cparams) else: pool = pool pool._dialect = dialect
创建连接池,默认创建pool.QueuePool
# create engine. engineclass = self.engine_cls engine_args = {} for k in util.get_cls_kwargs(engineclass): if k in kwargs: engine_args[k] = pop_kwarg(k) _initialize = kwargs.pop("_initialize", True) # all kwargs should be consumed if kwargs: raise TypeError( "Invalid argument(s) %s sent to create_engine(), " "using configuration %s/%s/%s. Please check that the " "keyword arguments are appropriate for this combination " "of components." % ( ",".join("'%s'" % k for k in kwargs), dialect.__class__.__name__, pool.__class__.__name__, engineclass.__name__, ) ) engine = engineclass(pool, dialect, u, **engine_args)
从上面可以看出来,引擎的核心是连接池和方言,连接池负责连接的维护,方言负责数据的行为。
if _initialize: do_on_connect = dialect.on_connect() if do_on_connect: def on_connect(dbapi_connection, connection_record): conn = getattr( dbapi_connection, "_sqla_unwrap", dbapi_connection ) if conn is None: return do_on_connect(conn) event.listen(pool, "first_connect", on_connect) event.listen(pool, "connect", on_connect) def first_connect(dbapi_connection, connection_record): c = base.Connection( engine, connection=dbapi_connection, _has_events=False ) c._execution_options = util.immutabledict() dialect.initialize(c) dialect.do_rollback(c.connection) event.listen(pool, "first_connect", first_connect, once=True)
第一次初始化连接并进行监听
dialect_cls.engine_created(engine) if entrypoint is not dialect_cls: entrypoint.engine_created(engine) for plugin in plugins: plugin.engine_created(engine)
总结: create_engine通过传入的URI和相关参数,创建一个Engine,该引擎包含了方言(Dialect)和Pool,Dialect如中文名翻译一样,方言:作为不同的数据库Mysql,Oracle,PostgreSQL等,会有不同的行为,Dialect就是用来操作不同数据库的行为,对应接口调用dbapi操作。 而Pool作为数据库连接池,用来管理数据库连接,通过维护一个连接池,池子的大小,数量和生命周期,减少数据库连接的频繁切换,提高查询等操作效率。