[源码解析] 并行分布式框架 Celery 之架构 (2)
文章目录
- [源码解析] 并行分布式框架 Celery 之架构 (2)
0x00 摘要
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
本系列将通过源码分析,和大家一起深入学习 Celery。本文是系列第二篇,继续探究 Celery 架构。
0x01 上文回顾
上文中,我们大致介绍了 Celery 的概念,用途和架构,现在回忆 Celery 的架构图如下:
+-----------+ +--------------+
| Producer | | Celery Beat |
+-------+---+ +----+---------+
| |
| |
v v
+-------------------------+
| Broker |
+------------+------------+
|
|
|
+-------------------------------+
| | |
v v v
+----+-----+ +----+------+ +-----+----+
| Exchange | | Exchange | | Exchange |
+----+-----+ +----+------+ +----+-----+
| | |
v v v
+-----+ +-------+ +-------+
|queue| | queue | | queue |
+--+--+ +---+---+ +---+---+
| | |
| | |
v v v
+---------+ +--------+ +----------+
| worker | | Worker | | Worker |
+-----+---+ +---+----+ +----+-----+
| | |
| | |
+-----------------------------+
|
|
v
+---+-----+
| backend |
+---------+
下面我们从几个方面继续分析。
0x02 worker的思考
当启动一个worker的时候,这个worker会与broker建立链接(tcp长链接),然后如果有数据传输,则会创建相应的channel, 这个连接可以有多个channel。然后,worker就会去borker的队列里面取相应的task来进行消费了,这也是典型的消费者生产者模式。
2.1 worker的模式
首先,我们思考下worker 的工作模式,即,这些并发的 worker 彼此之间是什么关系?是否需要一个master 来统一调控?为了更好的对比,我们先看看nginx的实现。
2.1.1 Nginx模式
nginx 后台进程包含一个master进程和多个worker进程。
- master进程主要用来管理worker进程,包含:接收来自外界的信号,向各worker进程发送信号,监控worker进程的运行状态,当worker进程退出后(异常情况下),会自动重新启动新的worker进程。
- worker进程则处理基本的网络事件。多个worker进程之间是对等的,他们同等竞争来自客户端的请求,各进程互相之间是独立的。一个请求 只可能在一个worker进程中处理,一个worker进程,不可能处理其它进程的请求。
worker进程之间是平等的,每个进程处理请求的机会也是一样的。一个连接请求过来,每个进程都有可能处理这个连接,怎么做到的呢?
- 首先,每个worker进程都是从master进程fork过来,在master进程里面,先建立好需要listen的socket(listenfd)之后,然后再fork出多个worker进程。
- 其次,所有worker进程的listenfd会在新连接到来时变得可读,为保证只有一个进程处理该连接,所有worker进程在注册listenfd读事件前抢accept_mutex,抢到互斥锁的那个进程注册listenfd读事件,在读事件里调用accept接受该连接。
- 最后,当一个worker进程在accept这个连接之后,就开始读取请求,解析请求,处理请求,产生数据后,再返回给客户端,最后才断开连接,这样一个完整的请求就是这样的了。
我们可以看到,一个请求完全由worker进程来处理,而且只在一个worker进程中处理。
2.1.2 Celery 模式
2.1.2.1 模式
与 Nginx不同,在 Celery 之中,没有 master 进程。所有的都是worker 进程。大家都在 redis 之上等待新任务。
但是,每一个worker内部,父进程和子进程内部,却又是 master - slave 模式,也就是我们常说的主从。
- master(就是父进程)负责任务的获取,分发,slaves 的管理(创建,增加,关闭,重启,丢弃等等),其他辅助模块的维护等等。
- slave(就是子进程)负责消费从调度器传递过来的任务。
worker内部 具体流程如下:
- 调度器首先预生成(prefork)一些工作进程,做为一个进程池(mutiprocessing-pool),之后通过事件驱动(select/poll/epoll)的方式,监听内核的事件(读、写、异常等等)。
- 如果master监听到就执行对应的回调,源源不断的从 中间人(broker)那里提取任务,并通过 管道(pipe)作为进程间通讯的方式,运用一系列的路由策略(round-robin、weight 等等)交给slave。
- slave工作进程 消费(ack)任务,再通过管道向调度器进行状态同步(sync),进程间通讯等等行为。
- 这个 workloop 其实很明显,就是监听读管道的数据(主进程从这个管道的另一端写),然后执行对应的回调,期间会调用 put 方法,往写管道同步状态(主进程可以从管道的另一端读这个数据)。
具体如下图:
+-------------+
| |
| Redis |
| |
+---+------+--+
^ ^
| |
| |
+-----------------------+ +--------------+
| |
| |
| |
+------------------+--------------------+ +-----------+--------+
| Worker 1 | | Worker n |
| | | |
| | | |
| Parent process | | Parent process |
| + | | + |
| | | | | |
| | | | | |
| +--------+------------+ | | | |
| | | | | | |
| | | | | | |
| v v | | v |
| subprocess 1 ... subprocess n | ... | subprocess |
| | | |
+---------------------------------------+ +--------------------+
2.1.2.2 交互
在 Celery 中,采用的是分布式的管理方式,每个节点之间都是通过广播/单播进行通信,从而达到协同效果。
在处理具体控制管理工作时候,worker 进程之间有交流,具体分为两种:
- 启动时候使用 Mingle 模块来互相交换信息。
- 运行状态下,通过 gossip 协议进行状态的共享。但是这个状态共享对于任务的分配和worker 的调度没有必然的联系,只是用来监控和响应控制台消息。因为假如有若干 worker,面对一个控制台消息,应该只有一个 worker 来响应其消息,所以就利用 gossip 协议选举出一个 leader,这个 leader 进行响应。
在处理具体业务工作时候,worker 之间没有交流。
当启动一个worker的时候,这个worker会与broker建立链接(tcp长链接),然后如果有数据传输,则会创建相应的channel,一个连接可以有多个channel。然后,worker就会去borker的队列里面取相应的task来进行消费了,这也是典型的消费者生产者模式。
以 redis 为例,底层 Kombu 事实上是使用 redis 的 BRPOP 功能来完成对具体 queue 中消息的读取。
如果多个 worker 同时去使用 brpop 获取 broker 消息,那么具体哪一个能够读取到消息,其实这就是有一个 竞争机制,因为redis 的单进程处理,所以只能有一个 worker 才能读到。
2.2 worker 组成
在 worker 文档中提到:worker主要由四部分组成的:task_pool, consumer, scheduler, mediator。
这四部分依赖下面两组数据结构工作。
- 就绪队列:那些 立刻就需要运行的task, 这些task到达worker的时候会被放到这个就绪队列中等待consumer执行。
- ETA:是那些有ETA参数,或是rate_limit参数的 task。这些 task 被放入 timer 队列中,timer 负责在条件合适的情况下,把这些 task 放入执行pool。
但是实际上,mediator 在代码中没有发现。也许是 mediator 成了默认功能而非组件。
2.2.1 task_pool
task_pool主要是用来存放的是一些worker。当启动了一个worker,并且提供并发参数的时候,会将一些worker放在这里面。
celery默认的并发方式是prefork,也就是多进程的方式,这里只是celery对multiprocessing.Pool
进行了轻量的改造,然后给了一个新的名字叫做prefork。
这个pool与多进程的进程池的区别就是这个task_pool只是存放一些运行的worker。
2.2.2 consumer
consumer也就是消费者, 主要是从broker那里接受一些message。然后将message转化为celery.worker.request.Request
的一个实例。并且在适当的时候,会把这个请求包装进Task中。
Task就是用装饰器 app_celery.task()
装饰的函数所生成的类,所以可以在自定义的任务函数中使用这个请求参数,获取一些关键的信息。
2.2.3 Scheduler
对于 Scheduler,可以从 Beat 和 Timer 两个方面讲述。
2.2.3.1 Beat
Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
其中枢部分就是 Scheduler,Service 是驱动部分,最后的承载实体就是 SchedulerEntry。
其内部主要数据结构是一个最小堆,它的作用就是承载了所有我们设置得定时任务,而最小堆的特性就是堆顶的元素是最小的,排序的依据是时间差值。celery 会先计算每个定时任务下一次执行的时间戳 - 当前时间戳,然后根据这个时间差值进行排序,毫无疑问,差值最小的就是下一次需要执行的任务。
在 Service 的 start 函数中,会调用 scheduler.tick(),从而在内部最小堆中获取下次一需要执行的任务。将 SchedulerEntry
转换为 Task
,发送到 redis 的队列中。
具体定义如下:
class Scheduler:
"""Scheduler for periodic tasks.
"""
Entry = ScheduleEntry
#: The schedule dict/shelve.
schedule = None
#: Maximum time to sleep between re-checking the schedule.
max_interval = DEFAULT_MAX_INTERVAL
#: How often to sync the schedule (3 minutes by default)
sync_every = 3 * 60
#: How many tasks can be called before a sync is forced.
sync_every_tasks = None
_last_sync = None
_tasks_since_sync = 0
持久化
在 Celery 中,定时任务的执行并不会因为我们重启了 Celery 而失效,反而在重启 Celery 之后,Celery 会根据上一次关闭之前的执行状态,重新计算新的执行周期,而这里计算的前提就是能够获取旧的执行信息,而在 Scheduler 中,这些信息都是默认保存在文件中的。
Celery 默认的存储是通过 Python 默认的 shelve 库实现的,shelve 是一个类似于字典对象的数据库,我们可以通过调用 sync
命令在磁盘和内存中同步数据。
2.2.3.2 Timer
文档中对于 Timer 的描述如下:
The timer schedules internal functions, like cleanup and internal monitoring,
but also it schedules ETA tasks and rate limited tasks.
If the scheduled tasks ETA has passed it is moved to the execution pool.
可以看到,其功能就是调度内部的函数,比如清理和监控,也调度ETA tasks and rate limited tasks。
对于清理,有比如 backend.process_cleanup 和 loader.on_process_cleanup。
2.3 初始化过程
worker初始化过程中,各个模块的执行顺序是由一个BluePrint类定义,并且根据各个模块之间的依赖进行排序执行。
Worker 的 start 方法中,其实就是执行了一个 self.blueprint 的 start 方法,这里面的 blueprint,是 celery 自己实现的一个 有向无环图(DAG)的数据结构,其功能简单描述下就是:根据命令行传入的不同参数,初始化不同的组件(step),并执行这些组件的初始化方法。其实就是一个对流程控制的面向对象的封装。
每个 Step 的具体的功能如下:
- Timer:用于执行定时任务的 Timer;
- Hub:Eventloop 的封装对象;
- Pool:构造各种执行池(线程/进程/协程);
- Autoscaler:用于自动增长或者 pool 中工作单元;
- StateDB:持久化 worker 重启区间的数据(只是重启);
- Autoreloader:用于自动加载修改过的代码;
- Beat:创建 Beat 进程,不过是以子进程的形式运行(不同于命令行中以 beat 参数运行);
0x03 Consumer的思考
Celery 使用 Consumer 来从 broker 获取消息。
3.1 组件
Consumer 的组件如下:
- 【1】Connection:管理和 broker 的 Connection 连接
- 【3】Events:用于发送监控事件
- 【2】Agent:
cell
actor - 【2】Mingle:不同 worker 之间同步状态用的
- 【1】Tasks:启动消息 Consumer
- 【3】Gossip:消费来自其他 worker 的事件
- 【1】Heart:发送心跳事件(consumer 的心跳)
- 【3】Control:远程命令管理服务
在参考文章 1: Worker 启动流程概述 中提到:
这里我对所有的 Bootstep 都做了标号处理,标号的大小说明了这些服务对于我们代码阅读的重要程序,1 最重要,3 最不紧要。对于 Consumer 来说,
1 是基本功能,这些功能组成了一个简单的非强壮的消息队列框架;
2 一般重要,可以实现一个高级一点的功能;
3 属于附加功能,同时也属于一点分布式的功能。
3.2 作用
因此,我们可以看到,celery Consumer 组件的概念远远要大于Kombu的Consumer,不只是从broker取得消息,也包括消息的消费,分发,监控,心跳等一系列功能。
可以说,除了消息循环引擎 被 hub 承担,多进程被 Pool,Autoscaler 承担,定时任务被 timer,beat 承担之外,其他主要功能都被 Consumer 承担。
0x04 高性能的思考
celery 的高性能主要靠两个方面来保证,一个是多进程,一个是事件驱动。此外在一些具体功能实现方面也确保了高性能的实现。
4.1 多进程
多进程可以良好的发挥每个核的计算能力。可以在一定程度上提升程序的并发能力,缓解 IO 的压力。
Celery 的方案叫做 prefork,也就是预生成。预生成指的是,主进程在执行具体的业务逻辑之前,先提前 fork 出来一堆子进程,并把他们存起来集中管理,形成一个进程池。平常的时候这些子进程都是 休眠(asleep) 状态,只有当主进程派发任务的时候,会唤醒(awake)其中的一个子进程,并通过进程间通讯的手段,向子进程传输相应的任务数据。
如前所述,每一个worker内部,父进程和子进程内部,是 master - slave 模式。
- master(就是父进程)负责任务的获取,分发,slaves 的管理(创建,增加,关闭,重启,丢弃等等),其他辅助模块的维护等等。
- 调度器首先预生成(prefork)一些工作进程(slave),做为一个进程池(mutiprocessing-pool),之后通过事件驱动(select/poll/epoll)的方式,监听内核的事件(读、写、异常等等)。
- slave(就是子进程)负责消费从调度器传递过来的任务。再通过管道向调度器进行状态同步(sync),进程间通讯等等行为。
4.2 事件驱动
Kombu内部使用了事件驱动。
Master 调度器是一个事件驱动模型,什么是事件驱动,其实就是它消灭了阻塞。
正常的单线程模型,一次只能拿一条消息,每一次都要走一条来和回的链路,并且需要一个 while True 的循环不断的去检测,这样无疑是非常低效且开销大的。
而事件驱动则不这样,他可以同时发送多个检测的信号,然后就直接挂起,等待内核进行提示,有提示再去执行对应的回调。这样既优雅的化解了单线程每次都要检测的 while True,又通过多次请求并发降低了重复链路。
以 epoll 为例:
-
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些。
-
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。
-
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
4.3 Task的实现
Task 承载的功能就是在 Celery 应用中,启动对应的消息消费者。
关于 Task 的实现,这就涉及问题:究竟是分发代码还是分发数据?
因为 Celery 是一个通用性功能,不是特定面对大数据,所以分发数据是不可避免的。
剩下问题就是是否需要分发代码?
Task 任务最基本的形式就是函数,任务发布最直接的想法就是client将要执行的相关函数代码打包,发布到broker。分布式计算框架spark就是使用这种方式。
4.3.1 分发代码
业界分发代码的代表是 Spark。Spark的思想比较简单:挪计算不挪数据。那怎么去描述这个计算呢?Spark 通过RDD封装一个针对数据对应关系记录,在这个封装之上来记录计算。这就涉及到两个最重要的问题:
- 如何拆分计算逻辑;
- 如何分发计算逻辑;
于是 Spark 把所有的计算逻辑划分为这两种类型:
- 能够分发到各个节点上并行执行的;
- 需要经过一定量的结果合并之后才能继续执行的;
然后把一个巨大的问题拆分成相对独立的子问题分发到各个机器上求解。
在实际提交时候,Spark把计算代码提交到每个工作节点上然后进行计算。
4.3.2 Celery 模式
2.0之前的celery也支持这种任务发布的方式。这种方式显而易见的一个坏处是传递给broker的数据量可能会比较大。解决的办法也很容易想到,就是把要发布的任务相关的代码,提前告诉worker。
这就是 全局集合 和 注解注册的作用。
@app.task(name='hello_task')
def hello():
print('hello')
其中的app是worker中的application,通过装饰器的方式,对任务函数注册。
app会维护一个字典,key是任务的名字,也就是这里的hello_task
,value是这个函数的内存地址。任务名必须唯一,但是任务名这个参数不是必须的,如果没有给这个参数,celery会自动根据包的路径和函数名生成一个任务名。
通过上面这种方式,client发布任务只需要提供任务名以及相关参数,不必提供任务相关代码:
# client端
app.send_task('hello_task')
这里需要注意:client发布任务后,任务会以一个消息的形式写入broker队列,带有任务名称等相关参数,等待worker获取。这里任务的发布,是完全独立于worker端的,即使worker没有启动,消息也会被写入队列。
这种方式也有显而易见的坏处,所有要执行的任务代码都需要提前在worker端注册好,client端和worker端的耦合变强了。
4.4 Prefetch
目前 Kombu QoS 只是支持 prefetch_count。
设置 prefetch_count 的目的是:
- Prefetch指的是一个Celery Worker节点,能够提前获取一些还还未被其他节点执行的任务,这样可以提高Worker节点的运行效率。
- 同时也可以通过设置Qos的prefetch count来控制consumer的流量,防止消费者从队列中一下拉取所有消息,从而导致击穿服务,导致服务崩溃或异常。
Kombu qos prefetch_count 是一个整数值N,表示的意思就是一个消费者最多只能一次拉取N条消息,一旦N条消息没有处理完,就不会从队列中获取新的消息,直到有消息被ack。
Kombu 中,会记录 prefetch_count的值,同时记录的还有该channel dirty (acked/rejected) 的消息个数。
4.5 Celery函数
Celery 还提供了一些工作流功能,其中某些功能可以让我们提高性能。比如 Chunks 功能。
任务块函数能够让你将需要处理的大量对象分为分成若干个任务块,如果你有一百万个对象,那么你可以创建 10 个任务块,每个任务块处理十万个对象。有些人可能会担心,分块处理会导致并行性能下降,实际上,由于避免了消息传递的开销,因此反而会大大的提高性能。
add_chunks_sig = add.chunks(zip(range(100), range(100)), 10)
result = add_chunks_sig.delay()
result.get()
0x05 分布式的思考
我们从负载均衡,容灾恢复,worke之间交互这三个角度来看看 Celery 如何实现分布式。
5.1 负载均衡
Celery 的负载均衡其实可以分为三个层次,而且是与 Kombu 高度耦合(本文 broker 以 Redis 为例)。
- 在 worker 决定 与 哪几个 queue 交互,有一个负载均衡;
- 在 worker 决定与 broker 交互,使用 brpop 获取消息时候有一个负载均衡;
- 在 worker 获得 broker 消息之后,内部 具体 调用 task 时候,worker 内部进行多进程分配时候,有一个负载均衡。
另外,Celery 还有一个 AutoScaler 组件,其作用 实际就是在线调节进程池大小。这也和缓解负载相关。
其主要逻辑大致如下图所示(后续文章中会有详细讲解):
+
Kombu | |Redis
|
BRPOP(keys) |
+------------------------------------------+ |
| Worker 1 | ------------------+ |
| | | |
+------------------------------------------+ | | queue 1 key
| |
| |
+------------------------------------------+ BRPOP(keys) | |
| Worker 2 | +---------------------------> queue 2 key
| | | |
+------------------------------------------+ | |
| |
+------------------------------------------+ | | queue 3 key
| Worker 3 | | |
| | | |
| +-----------+ | | |
| | queue 1 | | BRPOP(keys) | |
| | queue 2 | keys | | |
| | ...... | +--------+---------------------------------+ |
| | queue n | ^ | |
| +-----------+ | | |
| | | |
| + | |
| | |
| + round_robin_cycle | |
| | | |
+------------------------------------------+ |
| |
| fair_strategy |
| |
+-------+----------+----------------+ |
| | | |
v v v |
+-----+--------+ +------+-------+ +-----+--------+ |
| subprocess 1 | | subprocess 2 | | subprocess 3 | +
+--------------+ +--------------+ +--------------+
5.2 failover 容灾恢复
5.2.1 错误种类&失败维度
Celery 之中,错误主要有3种:
- 用户代码错误:错误可以直接返回应用,因为Celery无法知道如何处理;
- Broker错误:Celery可以根据负载平衡策略尝试下一个节点;
- 网络超时错误:Celery可以重试该请求;
从系统角度出发,几个最可能的失败维度如下:
- Broker失败;
- Worker —> Broker 这个链路会失败;
- Worker 节点会失败;
- Worker 中的多进程中,某一个进程本身失效;
- Worker 的某一个进程中,内部处理任务失败;
从实际处理看,broker可以使用 RabbitMQ,可以做 集群和故障转移;但这是涉及到整体系统设计的维度,所以本系列不做分析。
5.2.2 处理方法
依据错误级别,错误处理 分别有 重试 与 fallback选择 两种。
我们以 Worker —> Broker 维度为例来进行分析。此维度上主要关心的是:
- Broker 某一个节点失效;
- worker 与 Broker 之间网络失效;
在这个维度上,无论是 Celery 还是 Kombu 都做了努力,但是从根本来说,还是 Kombu 的努力。
5.2.2.1 重试
在 Celery 中,对于重试,有 broker_connection_max_retries 配置,就是最大重试次数。
当出现网络故障时候,Celery 会根据 broker_connection_max_retries 配置来进行重试。
在 Komub 中,同样做了 各种 重试 处理,比如 在 Connection.py 中有如下重试参数:
- max_retries:最大重试次数;
- errback (Callable):失败回调策略;
- callback (Callable):每次重试间隔的回调函数;
5.2.2.2 自动重试
自动重试是 kombu 的另外一种重试途径,比如在 kombu\connection.py 就有 autoretry,其基本套路是:
- 在调用fun时候,可以使用 autoretry 这个mapper 做包装。并且可以传入上次调用成功的 channel。
- 如果调用fun过程中失败,kombu 会自动进行try。
5.2.2.3 fallback
如果重试不解决问题,则会使用 fallback。比如 broker_failover_strategy 是 Celery 针对 broker Connection 来设置的策略。会自动映射到 kombu.connection.failover_strategies
。
Kombu 在配置 Connection的时候,可以设置多个 broker url,在连接 broker 的时候,kombu 自动会选取最健康的 broker 节点进行连接。
5.3 Worker之间的交互
前面提到,在处理具体控制管理工作时候,在运行状态下,worker 进程之间通过 gossip 协议进行状态的共享。
但是这个状态共享对于任务的分配和worker 的调度没有必然的联系,只是用来监控和响应控制台消息。因为假如有若干 worker,面对一个控制台消息,应该只有一个 worker 来响应其消息,所以就利用 gossip 协议选举出一个 leader,这个 leader 进行响应。
Gossip 协议跟其他协议一样,也有一些不可避免的缺陷,主要是两个:
1)消息的延迟
由于 Gossip 协议中,节点只会随机向少数几个节点发送消息,消息最终是通过多个轮次的散播而到达全网的,因此使用 Gossip 协议会造成不可避免的消息延迟。不适合用在对实时性要求较高的场景下。
2)消息冗余
Gossip 协议规定,节点会定期随机选择周围节点发送消息,而收到消息的节点也会重复该步骤,因此就不可避免的存在消息重复发送给同一节点的情况,造成了消息的冗余,同时也增加了收到消息的节点的处理压力。而且,由于是定期发送,因此,即使收到了消息的节点还会反复收到重复消息,加重了消息的冗余。
为什么用 gossip?可能因为是用 gossip 来处理管理功能,就是在 workers 之中选出一个 leader 来响应控制台的消息。这样就不需要对消息即时性有要求。
0x06 总结
通过以上的分析,大家应该对 Celery 的架构有了初步的了解。从下文开始,我们逐一分析 Celery 的几个方面,敬请期待。
0xFF 参考
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,可以扫描下面二维码(或者长按识别二维码)关注个人公众号)。