sqlalchemy 源码分析之create_engine引擎的创建

引擎(Engine)是 SQLAlchemy 的核心,无论是使用 SQL Core 还是 ORM,都需要依赖引擎的创建。为此,我们来研究一下引擎是如何创建的。

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://root:x@127.0.0.1/test',
                       echo=True,  # 设置为True,则输出sql语句
                       pool_size=5,  # 数据库连接池中保持的连接数
                       max_overflow=10,  # 连接池最大溢出容量,该容量+pool_size=最大容量。超出会阻塞等待,等待时间为pool_timeout参数值,默认30秒

                       pool_recycle=7200  # 连接回收(重连)周期,单位为秒
                       )

create_engine 创建引擎对象,源代码如下:

 

class PlainEngineStrategy(DefaultEngineStrategy):
    """Engine strategy that builds an ordinary ``base.Engine``.

    All construction logic is inherited from ``DefaultEngineStrategy``;
    this subclass only supplies the strategy name and the engine class.
    """

    # Class that the inherited ``create()`` instantiates as the engine.
    engine_cls = base.Engine
    # Key under which this strategy is selected ("plain" is the default
    # strategy name used by ``create_engine``).
    name = "plain"


# The instance is created for its side effect only and the result is
# discarded.
# NOTE(review): presumably the strategy base class registers each new
# instance under ``name`` in the strategies registry -- confirm against
# ``EngineStrategy.__init__``.
PlainEngineStrategy()

  create_engine 有一个参数 strategy(策略),一般情况下默认是 'plain'。create_engine 会根据这个参数,从已注册的策略(strategies.strategies)中取出对应的策略实例。我们看看默认的策略 'plain' 对应的类是哪个?

default_strategy = "plain"


def create_engine(*args, **kwargs):
    """Create an engine by delegating to the selected engine strategy.

    The optional ``strategy`` keyword (default: ``default_strategy``) is
    removed from ``kwargs`` and used as the key into
    ``strategies.strategies``; all remaining positional and keyword
    arguments are forwarded unchanged to that strategy's ``create()``.

    :raises KeyError: if no strategy is registered under the given name.
    """
    strategy_name = kwargs.pop("strategy", default_strategy)
    return strategies.strategies[strategy_name].create(*args, **kwargs)

 

可以看到,默认策略对应的是 PlainEngineStrategy() 的实例。接下来回到创建方法 strategy.create(*args, **kwargs),具体看看引擎是怎么创建的。PlainEngineStrategy 自身没有定义 create,实际调用的是父类 DefaultEngineStrategy 的 create 方法。

  

class DefaultEngineStrategy(EngineStrategy):
    """Base class for built-in strategies."""


    def create(self, name_or_url, **kwargs):
        """Build and return an engine for ``name_or_url``.

        The steps are, in order: parse the URL, resolve the dialect class,
        obtain the DBAPI, instantiate the dialect, assemble the connection
        arguments, create (or adopt) a connection pool, instantiate
        ``self.engine_cls`` and finally register the connect-time event
        listeners.

        Every keyword argument is popped from ``kwargs`` by the component
        that consumes it (dialect, DBAPI, pool, engine), so the order of
        the steps below matters.

        :param name_or_url: value handed to ``url.make_url``.
        :param kwargs: configuration options; ``kwargs`` is mutated.
        :return: the newly created engine instance.
        :raises TypeError: if any keyword argument is left unconsumed.
        """
        # create url.URL object
        u = url.make_url(name_or_url)


        # Plugins receive callbacks further down: dialect kwargs, pool
        # kwargs and engine-created.
        plugins = u._instantiate_plugins(kwargs)


        # Drop any remaining plugin entries from the URL query and from
        # kwargs; a leftover "plugins" key would otherwise trip the
        # unconsumed-kwargs check below.
        u.query.pop("plugin", None)
        kwargs.pop("plugins", None)


        # The entrypoint may differ from the final dialect class; both are
        # notified once the engine exists (see the end of this method).
        entrypoint = u._get_entrypoint()
        dialect_cls = entrypoint.get_dialect_cls(u)


        # With "_coerce_config" set, every popped value whose key appears in
        # dialect_cls.engine_config_types is passed through the converter
        # registered there. Otherwise values are popped as-is.
        # NOTE(review): presumably used for string-valued configuration
        # (e.g. values read from a config file) -- confirm with the caller.
        if kwargs.pop("_coerce_config", False):


            def pop_kwarg(key, default=None):
                value = kwargs.pop(key, default)
                if key in dialect_cls.engine_config_types:
                    value = dialect_cls.engine_config_types[key](value)
                return value


        else:
            pop_kwarg = kwargs.pop


        dialect_args = {}
        # consume dialect arguments from kwargs
        for k in util.get_cls_kwargs(dialect_cls):
            if k in kwargs:
                dialect_args[k] = pop_kwarg(k)


        # The caller may supply the DBAPI directly via "module"; otherwise
        # dialect_cls.dbapi() is called with whichever of its keyword
        # arguments are present in kwargs.
        dbapi = kwargs.pop("module", None)
        if dbapi is None:
            dbapi_args = {}
            for k in util.get_func_kwargs(dialect_cls.dbapi):
                if k in kwargs:
                    dbapi_args[k] = pop_kwarg(k)
            dbapi = dialect_cls.dbapi(**dbapi_args)


        dialect_args["dbapi"] = dbapi


        # Give plugins a chance to inspect/alter the dialect arguments
        # before the dialect is instantiated.
        for plugin in plugins:
            plugin.handle_dialect_kwargs(dialect_cls, dialect_args)


        # create dialect
        dialect = dialect_cls(**dialect_args)


        # assemble connection arguments
        (cargs, cparams) = dialect.create_connect_args(u)
        # User-supplied "connect_args" override the URL-derived parameters.
        cparams.update(pop_kwarg("connect_args", {}))
        cargs = list(cargs)  # allow mutability


        # look for existing pool or create
        pool = pop_kwarg("pool", None)
        if pool is None:


            # Default connection factory for the pool: the first do_connect
            # event handler that returns a non-None connection wins;
            # otherwise fall back to dialect.connect(). cargs/cparams are
            # passed to the handlers as the same mutable objects used for
            # the fallback call.
            def connect(connection_record=None):
                if dialect._has_events:
                    for fn in dialect.dispatch.do_connect:
                        connection = fn(
                            dialect, connection_record, cargs, cparams
                        )
                        if connection is not None:
                            return connection
                return dialect.connect(*cargs, **cparams)


            # A user-supplied "creator" replaces the default factory.
            creator = pop_kwarg("creator", connect)


            poolclass = pop_kwarg("poolclass", None)
            if poolclass is None:
                poolclass = dialect_cls.get_pool_class(u)
            pool_args = {"dialect": dialect}


            # consume pool arguments from kwargs, translating a few of
            # the arguments
            # (keys: pool constructor argument names; values: the
            # corresponding create_engine() keyword names)
            translate = {
                "logging_name": "pool_logging_name",
                "echo": "echo_pool",
                "timeout": "pool_timeout",
                "recycle": "pool_recycle",
                "events": "pool_events",
                "use_threadlocal": "pool_threadlocal",
                "reset_on_return": "pool_reset_on_return",
                "pre_ping": "pool_pre_ping",
                "use_lifo": "pool_use_lifo",
            }
            for k in util.get_cls_kwargs(poolclass):
                tk = translate.get(k, k)
                if tk in kwargs:
                    pool_args[k] = pop_kwarg(tk)


            for plugin in plugins:
                plugin.handle_pool_kwargs(poolclass, pool_args)


            pool = poolclass(creator, **pool_args)
        else:
            # A pool object was passed in: a _DBProxy is asked for the pool
            # matching these connection arguments, anything else is used
            # directly (the "pool = pool" branch is a no-op).
            if isinstance(pool, poollib.dbapi_proxy._DBProxy):
                pool = pool.get_pool(*cargs, **cparams)
            else:
                pool = pool


            # Attach the dialect to the externally created pool.
            pool._dialect = dialect


        # create engine.
        engineclass = self.engine_cls
        engine_args = {}
        for k in util.get_cls_kwargs(engineclass):
            if k in kwargs:
                engine_args[k] = pop_kwarg(k)


        # Internal flag: when False, none of the connect-time listeners
        # below are registered.
        _initialize = kwargs.pop("_initialize", True)


        # all kwargs should be consumed
        if kwargs:
            raise TypeError(
                "Invalid argument(s) %s sent to create_engine(), "
                "using configuration %s/%s/%s.  Please check that the "
                "keyword arguments are appropriate for this combination "
                "of components."
                % (
                    ",".join("'%s'" % k for k in kwargs),
                    dialect.__class__.__name__,
                    pool.__class__.__name__,
                    engineclass.__name__,
                )
            )


        engine = engineclass(pool, dialect, u, **engine_args)


        if _initialize:
            # If the dialect provides an on-connect callable, run it for
            # the pool's "first_connect" and "connect" events.
            do_on_connect = dialect.on_connect()
            if do_on_connect:


                def on_connect(dbapi_connection, connection_record):
                    # Use the object's "_sqla_unwrap" attribute when it has
                    # one, else the connection itself; skip if that is None.
                    conn = getattr(
                        dbapi_connection, "_sqla_unwrap", dbapi_connection
                    )
                    if conn is None:
                        return
                    do_on_connect(conn)


                event.listen(pool, "first_connect", on_connect)
                event.listen(pool, "connect", on_connect)


            # On the pool's first connection: wrap it in a Connection with
            # events disabled and empty execution options, let the dialect
            # initialize itself from it, then roll back.
            def first_connect(dbapi_connection, connection_record):
                c = base.Connection(
                    engine, connection=dbapi_connection, _has_events=False
                )
                c._execution_options = util.immutabledict()
                dialect.initialize(c)
                dialect.do_rollback(c.connection)


            # once=True: the listener is meant to fire a single time.
            event.listen(pool, "first_connect", first_connect, once=True)


        # Notify the dialect class, the entrypoint (only when it is a
        # different object) and every plugin that the engine now exists.
        dialect_cls.engine_created(engine)
        if entrypoint is not dialect_cls:
            entrypoint.engine_created(engine)


        for plugin in plugins:
            plugin.engine_created(engine)


        return engine

  

我们逐一分析:

u = url.make_url(name_or_url)  # 这个方法解析传入的数据库连接的uri信息,符合条件最终返回一个URL对象
plugins = u._instantiate_plugins(kwargs)  # 插件初始化
entrypoint = u._get_entrypoint()  # 根据传入url中的数据库类型(mysql)和驱动库(pymysql),来注册插件,返回方言类
dialect_cls = entrypoint.get_dialect_cls(u)  # 获取Dialect类

这里需要说明下:Dialect(方言类)的作用是用来定义数据库和DBAPI的行为。

if kwargs.pop("_coerce_config", False):

            def pop_kwarg(key, default=None):
                value = kwargs.pop(key, default)
                if key in dialect_cls.engine_config_types:
                    value = dialect_cls.engine_config_types[key](value)
                return value

        else:
            pop_kwarg = kwargs.pop

        dialect_args = {}
        # consume dialect arguments from kwargs
        for k in util.get_cls_kwargs(dialect_cls):
            if k in kwargs:
                dialect_args[k] = pop_kwarg(k)

  这段代码比较简单:先根据 _coerce_config 决定取参数时是否需要做类型转换,然后从 kwargs 中取出方言类构造函数能够接受的参数,组成创建方言所需要的参数 dialect_args(后面还会加入 dbapi)。

 

dbapi = kwargs.pop("module", None)
if dbapi is None:
    dbapi_args = {}
    for k in util.get_func_kwargs(dialect_cls.dbapi):
        if k in kwargs:
            dbapi_args[k] = pop_kwarg(k)
    dbapi = dialect_cls.dbapi(**dbapi_args)


dialect_args["dbapi"] = dbapi

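  这段代码则是获取 DBAPI 对象:如果调用方没有通过 module 参数直接传入,就调用 dialect_cls.dbapi(...) 来获取(通常是导入对应的数据库驱动模块,例如 pymysql),并把它放入 dialect_args 中。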

# create dialect
dialect = dialect_cls(**dialect_args)

  

  以上代码使用前面准备好的 dialect_args 实例化方言(Dialect)。
        (cargs, cparams) = dialect.create_connect_args(u)
        cparams.update(pop_kwarg("connect_args", {}))
        cargs = list(cargs)  # allow mutability

  

以上代码组装创建数据库连接所需要的参数(cargs、cparams),用户传入的 connect_args 会合并到 cparams 中。
pool = pop_kwarg("pool", None)
if pool is None:


    def connect(connection_record=None):
        if dialect._has_events:
            for fn in dialect.dispatch.do_connect:
                connection = fn(
                    dialect, connection_record, cargs, cparams
                )
                if connection is not None:
                    return connection
        return dialect.connect(*cargs, **cparams)


    creator = pop_kwarg("creator", connect)


    poolclass = pop_kwarg("poolclass", None)
    if poolclass is None:
        poolclass = dialect_cls.get_pool_class(u)
    pool_args = {"dialect": dialect}


    # consume pool arguments from kwargs, translating a few of
    # the arguments
    translate = {
        "logging_name": "pool_logging_name",
        "echo": "echo_pool",
        "timeout": "pool_timeout",
        "recycle": "pool_recycle",
        "events": "pool_events",
        "use_threadlocal": "pool_threadlocal",
        "reset_on_return": "pool_reset_on_return",
        "pre_ping": "pool_pre_ping",
        "use_lifo": "pool_use_lifo",
    }
    for k in util.get_cls_kwargs(poolclass):
        tk = translate.get(k, k)
        if tk in kwargs:
            pool_args[k] = pop_kwarg(tk)


    for plugin in plugins:
        plugin.handle_pool_kwargs(poolclass, pool_args)


    pool = poolclass(creator, **pool_args)
else:
    if isinstance(pool, poollib.dbapi_proxy._DBProxy):
        pool = pool.get_pool(*cargs, **cparams)
    else:
        pool = pool
pool._dialect = dialect

  以上代码创建连接池:如果没有传入现成的 pool,就根据 poolclass 创建;未指定 poolclass 时由方言的 get_pool_class 决定,默认创建 pool.QueuePool。

# create engine.
engineclass = self.engine_cls
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
    if k in kwargs:
        engine_args[k] = pop_kwarg(k)


_initialize = kwargs.pop("_initialize", True)


# all kwargs should be consumed
if kwargs:
    raise TypeError(
        "Invalid argument(s) %s sent to create_engine(), "
        "using configuration %s/%s/%s.  Please check that the "
        "keyword arguments are appropriate for this combination "
        "of components."
        % (
            ",".join("'%s'" % k for k in kwargs),
            dialect.__class__.__name__,
            pool.__class__.__name__,
            engineclass.__name__,
        )
    )


engine = engineclass(pool, dialect, u, **engine_args)

  从上面可以看出来,引擎的核心是连接池和方言:连接池负责连接的维护,方言负责数据库的行为。

 

if _initialize:
    do_on_connect = dialect.on_connect()
    if do_on_connect:


        def on_connect(dbapi_connection, connection_record):
            conn = getattr(
                dbapi_connection, "_sqla_unwrap", dbapi_connection
            )
            if conn is None:
                return
            do_on_connect(conn)


        event.listen(pool, "first_connect", on_connect)
        event.listen(pool, "connect", on_connect)


    def first_connect(dbapi_connection, connection_record):
        c = base.Connection(
            engine, connection=dbapi_connection, _has_events=False
        )
        c._execution_options = util.immutabledict()
        dialect.initialize(c)
        dialect.do_rollback(c.connection)


    event.listen(pool, "first_connect", first_connect, once=True)

  

以上代码为连接池注册事件监听:第一次建立连接时(first_connect)用该连接初始化方言;如果方言提供了 on_connect 回调,则在每次建立新连接时(connect)执行它。
dialect_cls.engine_created(engine)
if entrypoint is not dialect_cls:
    entrypoint.engine_created(engine)


for plugin in plugins:
    plugin.engine_created(engine)

  

总结:   create_engine 通过传入的 URI 和相关参数创建一个 Engine,该引擎包含了方言(Dialect)和连接池(Pool)。Dialect 如中文名翻译"方言"一样:MySQL、Oracle、PostgreSQL 等不同的数据库会有不同的行为,Dialect 就是用来封装不同数据库的行为,并通过对应的接口调用 DBAPI 进行操作。而 Pool 作为数据库连接池,用来管理数据库连接:通过维护一个连接池,控制池子的大小、连接的数量和生命周期,减少数据库连接的频繁创建与销毁,提高查询等操作的效率。
上一篇:python中一颗*args和2颗**kwargs的区别


下一篇:❥单例模式❥