建模杂谈系列88 项目元数据实践6-PM2实现过程回顾

说明

建模杂谈系列84 项目元数据实践5的设计基础上进行了实践。

如果按无脑方式来组织这个应用花费的时间是1,那么本次花费的时间是1.5左右。时耗比:1.5

总的说起来,结果还是比较满意的。有几点体会比较强:

  • 1 时间花的比想象的要多一些。大概花了4天左右进行开发,有边做边完善设计的因素,但是还是比想象中长。特别是有的地方感觉繁琐。
  • 2 基本上没有其他调试,一次成功。这个是设计应该有的效果,但是当进入细节开发时,对于结果是不确定的,也没有多余的时间修改。
  • 3 提升的潜力巨大。或者说还需要做的事还比较多,有以下几方面:
    • 1 使用redis管理过程数据。过程数据有多个制程/流程,如果要手动的声明要监督哪些过程数据会比较麻烦。在每个proc开发的过程中将数据随时存入redis。
    • 2 流程主动汇报。将一些流程相关的细节存入redis(相当于内存runtime),和之前的runtime相互呼应,持久化在这个文件夹。
    • 3 使用图库管理关系。一类关系是流程配置文件定义的流(proc的依赖);另一种是流程(Process)和过程(Procedure)发生变化时,其继承关系。

btw,我把这个服务放在了「猫扇-4650G」机器上,稳定在了(1200转,46.5度)的样子,挺安静的,蹲在边上听了一会风扇也没啥声音。回头看看服务充分运行时机器的表现。
建模杂谈系列88 项目元数据实践6-PM2实现过程回顾

内容

按照反过来的顺序梳理一下本次实践的内容

1 运行

当各部分都配置好了之后,需要启动三个docker。

1.1 基础:input和分发

这个容器的作用是从消息队列中获取数据,并且分发到各功能流程。消息队列在公网服务器,运行的主机在本地。

docker run -d --name='m4_pm2_ner_base'\
               --restart=always\
               -v /etc/localtime:/etc/localtime  \
               -v /etc/timezone:/etc/timezone\
               -e "LANG=C.UTF-8"\
               -v /opt/m4_pm2测试:/workspace\
               registry.cn-hangzhou.aliyuncs.com/YOURIMAGE \
               sh -c "python forever_run_part1_base.py"

1.2 处理和回传

这个容器负责运行各功能流程,并将结果送到目标数据库

docker run -d --name='m4_pm2_ner_process'\
               --restart=always\
               -v /etc/localtime:/etc/localtime  \
               -v /etc/timezone:/etc/timezone\
               -e "LANG=C.UTF-8"\
               -v /opt/m4_pm2测试:/workspace\
               registry.cn-hangzhou.aliyuncs.com/YOURIMAGE \
               sh -c "python forever_run_part2_processing.py"

1.3 巡查

docker run -d --name='m4_pm2_ner_patrol'\
               --restart=always\
               -v /etc/localtime:/etc/localtime  \
               -v /etc/timezone:/etc/timezone\
               -e "LANG=C.UTF-8"\
               -v /opt/m4_pm2测试:/workspace\
               registry.cn-hangzhou.aliyuncs.com/andy08008/pytorch_jupyter:v7 \
               sh -c "python forever_run_part3_patrol.py"

forever_run是通过APScheduler控制的无限循环的链式函数。模板是这样的:

import pandas as pd 
import funcs as fs
import os 
from apscheduler.schedulers.blocking import BlockingScheduler  
from apscheduler.schedulers.background import  BackgroundScheduler
from datetime import datetime, timedelta
import time 

# 给到某个时间datetime 字符串(默认为当前)以及偏移的周、日、时、分、秒。默认为1s。返回新的时间日期字符串str或datetime类型dt。
def make_dt_bias(cur_dt = None, str_format = '%Y-%m-%d %H:%M:%S', 
                 b_weeks = 0, b_days=0, b_hours =0 , b_minutes=0, 
                 b_seconds=1, b_milliseconds=0, b_microseconds = 0,
                 return_type = 'dt'):
    if cur_dt is None:
        cur_dt = datetime.now()
    else:
        cur_dt = datetime.strptime(cur_dt, str_format)
    
    cur_dt = cur_dt + timedelta(weeks=b_weeks, days=b_days, 
                                hours=b_hours, minutes=b_minutes,  
                                seconds=b_seconds, milliseconds=b_milliseconds,
                                microseconds=b_microseconds)
    if return_type =='dt':
        return cur_dt
    else:
        return datetime.strftime(cur_dt, str_format)
    
from jinja2 import Template

# 根据字典生成shell命令中的关键字变量
def gen_jinja_shell_kw_from_dict(kw_dict = None):
    the_tmp = '{%for k,v in obj.items()%} {%if v%} --{{k}}={{v}} {%endif%} {%endfor%}'
    the_tmp1 = Template(the_tmp)
    return the_tmp1.render(obj=kw_dict)


# 链式函数 
def chain_func(cur_func_name = None, func_order_dict = None , sche = None ):
    if cur_func_name is not None:
        print('[I] >>> Chain Started At: ', datetime.now() )
        cur_func = func_order_dict[cur_func_name]['cur_func']
        cur_func_kw = func_order_dict[cur_func_name]['cur_func_kw']



        if cur_func_kw is None:
            res = cur_func()
        else:
            res = cur_func(**cur_func_kw)

        print('go next step')
        
        next_func =  func_order_dict[cur_func_name].get('next_func')
        next_date_bias_kw = func_order_dict[cur_func_name].get('next_date_bias_kw')
        if next_func is None:
            print('[I] Done & Quit')

        # 本函数处理逻辑
        if next_date_bias_kw is None:
            next_dt = make_dt_bias()
        else:
            next_dt = make_dt_bias(**next_date_bias_kw)

        sche.add_job(chain_func, 'date', run_date = next_dt, kwargs = {'cur_func_name':next_func , 'func_order_dict':func_order_dict, 'sche':sche})

    else:
        print('[I] Chain is Empty')
        
        
# 1 查询当前最新的数据状况
def test_hello(para1=None):
    print('>> hello', para1)
#     return os.system('python ./some_folder/some.py -p %s --prefix=%s' % (pname,prefix) )




# 待执行的函数字典
func_order_dict = {}
func_order_dict['step1'] = {'cur_func':test_hello, 
                            'cur_func_kw':{'para1':'YoYo, Check it Now'
                                           }, 
                            'next_func':'step1' , 'next_date_bias_kw':None }

# 'next_date_bias_kw':{'b_seconds':60} 偏移60秒执行下一步, None默认偏移1秒


if __name__ =='__main__':

    # 执行态
    sche = BlockingScheduler()
    print(datetime.now())
    next_dt = make_dt_bias()
    print(next_dt)
    sche.add_job(chain_func,'date',run_date =next_dt,
    kwargs={'cur_func_name':'step1','func_order_dict':func_order_dict,'sche':sche})
    print('[S] starting chain')
    sche.start()

以核心的流程处理容器为例,核心的执行函数是

def processing_data(para1 = None):
    print('>>> running ', para1)

    res0 = os.system('python entry_runtime_p_000_v_000.py')
    print('>>> handling (p=process)f(f=function)_000', res0)

    res1 = os.system('python entry_runtime_p_001_v_000.py')
    print('>>> handling (p=process)f(f=function)_001', res1)


    res2 = os.system('python entry_runtime_p_002_v_000.py')
    print('>>> handling (p=process)f(f=function)_002', res2)
    return True

主要是通过执行某个py文件来达到一个目的。每个py文件可以视为是一种集成,这种集成必须保证覆盖了一个流程(process)的所有部分,但又不必一一对应。某些瓶颈流程可以启动多个容器。

Note: 我发现这次的设计里我忘记给每个流程加上shard控制,因此是不能很好的并行的。当然,实践的这个实际应用也不需要我去并行。只是考虑到proc的通用性,这块的缺失下次补上。 另外,每个并行实际上会启动新的容器,如果是手工操作的话会浪费不少时间,所以我计划把这部分结合到Agent中,由Agent按计划发起docker命令。Agent本身是以PM2方式构造的,不需要并行。

2 功能(流程)

2.1 entry_runtime 运行

一个entry_runtime对应实际执行的proc集合,可以只是流程(Process)的一部分。

这次一个entry_runtime恰好对应了一个process。

entry_runtime_p_002_v_000.py

import os 

# 1 公司名称的预处理
res = os.system('python ./sche_001/pys/proc_002_v_000.py --sname=sche_001 --prname=002 --pr_version=000')
# 每个流程总体现了 A -> B 
print('获取已经生成的子任务,预处理(公司名)后存盘','opr status:',res )


# 2 人名
res = os.system('python ./sche_001/pys/proc_003_v_000.py --sname=sche_001 --prname=002 --pr_version=000')
print('获取已经生成的子任务,人名','opr status:',res )


# 3 预测公司名
res = os.system('python ./sche_001/pys/proc_004_v_000.py --sname=sche_001 --prname=002 --pr_version=000')
print('预测公司名','opr status:',res )


# 4 预测人名
res = os.system('python ./sche_001/pys/proc_004_v_001.py --sname=sche_001 --prname=002 --pr_version=000')
print('预测人名','opr status:',res )


# 5 合并任务
res = os.system('python ./sche_001/pys/proc_005_v_000.py --sname=sche_001 --prname=002 --pr_version=000')
print('合并公司名和人名','opr status:',res )


# 6 检测任务碎片是否完整
res = os.system('python ./sche_001/pys/proc_006_v_001.py --sname=sche_001 --prname=002 --pr_version=000')
print('检查任务碎片是否完整','opr status:',res )


# 7 提交任务
res = os.system('python ./sche_001/pys/proc_007_v_000.py --sname=sche_001 --prname=002 --pr_version=000')
print('任务提交到数据库','opr status:',res )

# 8 删除文件
res = os.system('python ./sche_001/pys/proc_008_v_001.py --sname=sche_001 --prname=002 --pr_version=000')
print('删除数据','opr status:',res )

首先,整体上看起来是很清晰的。每个功能一个py文件。
这样做还有一个优点,每个py文件是以shell命令的方式执行的(由os代劳),这样文件在运行过程中会占用资源,一旦运行完毕,这些资源会随之释放。
还有一种我不知道算优点还是缺点:在运行过程中py文件发生改变,在下一次执行时会立即生效。
除此之外,这种运行方式也使得整体流程不会因为某一段逻辑的失败而完全中断。

其次,这种方式在复用上是容易的。因为002功能是在000001的基础上做了一个合并,在开发时仅仅修改了一下配置文件,增加了一个,并轻微修改两个原有流程就完成了。

过程中思考最多的还是结构,而不是功能,我觉得neo4j可以帮忙大幅提升PM2的效率

2.2 流程配置文件

一个异构流程对应一个功能, p_002 = f_002

002功能对应的配置文件放在./sche_001/process_config/p_002_v_000/main.conf,由于使用PM2代为管理,事实上不需要去记住这样的地址。某种程度上,PM2也是一种代理。

# 公司名识别的预处理
[proc_002_v_000]
last_proc =  proc_001_v_000
ner_task = company
rule_set = NerPreProcessing_company
# 每个短句列表的最大长度,考虑内存/显存
max_ss_list_len = 2000
ss_len_min = 4
ss_len_max = 100
intever_type = left_close
data_type = dict_list
output_mode = persistent

run_mode=auto
# 每次差集流动100个子任务
per_handle_task_num=100


# 人名识别的预处理
[proc_003_v_000]
last_proc =  proc_001_v_000
ner_task = person
rule_set = NerPreProcessing_person
# 每个短句列表的最大长度,考虑内存/显存
max_ss_list_len = 2000
ss_len_min = 2
ss_len_max = 100
intever_type = left_close
data_type = dict_list
output_mode = persistent


run_mode=auto
# 每次差集流动100个子任务
per_handle_task_num=100


# 公司名的模式识别
[proc_004_v_000]
last_proc=proc_002_v_000
model_path = model_v0
rule_set=Ner_model_predicting
allow_fetch_num = 10

# 在预处理的文件中已经指定要做什么...
# task_type = company
run_mode=auto


# 人名的模式识别
[proc_004_v_001]
last_proc=proc_003_v_000
model_path = model_v0
rule_set=Ner_model_predicting
allow_fetch_num = 10

# 在预处理的文件中已经指定要做什么...
# task_type = company
run_mode=auto


# 将两部分的任务进行合并
[proc_005_v_000]
last_proc1=proc_002_v_000
last_proc1_amend=predict
last_proc2=proc_003_v_000
last_proc2_amend=predict

run_mode=auto
per_handle_task_num=100
# 输入在本流程步
# cur_proc_to=schema_output


# 根据任务的parts收集各子任务,完成后进行合并,送往output
[proc_006_v_001]
last_proc=proc_005_v_000

per_handle_task_num=100

run_mode=auto
cur_proc_to=schema_output

[proc_007_v_000]
output_folder = schema_output
per_handle_task_num=100


run_mode = auto
result_db_name = result_db
target_table = t_biz_entity_parse
max_id_query = select max(id) from %s
insert_batch_num = 1000


[proc_008_v_001]
# 无前序依赖, 增加了一些要清除的文件夹

# auto 会有一个容忍率,deliverd和数据的数量在一定范围内视为完成而进行删除
# force则只要fetch就可以删除
run_mode=auto
tolerance = 0.1
output_folder = schema_output

# 要检查删除的各文件夹
del_input = schema_input
del_proc1 = proc_001_v_000
del_proc2 = proc_002_v_000
del_proc2_amend =  predict
del_proc3 = proc_003_v_000
del_proc3_amend =  predict
del_proc5 = proc_005_v_000

del_output = schema_output

这个配置文件基本上就是p_000, p_001两个流程加起来,再增加一个流程。

关于过程(Procedure)的调整

有一些流程可以被复用,但是要略微做修改,所以我们可以把流程复制后,将版本号的末位改一下,再对应的做微调就好了。

三个版本号大致对应:

  • 第一位:大版本变化,调用参数和运行方式可能都有变化
  • 第二位:中变化,参数可能变了。
  • 第三位:小变化,可以继续使用。

这里又引入另一个问题,ProcedureProcess都是很灵活的,可以根据实际情况进行微调。微调的基础通常类似于fork,这个类似于git,我觉得通过neo4j来进行管理会比较好。

一个配置文件可以视为是某个子图的筛选语句

Addtional: PM(Project Meta)和RS(RuleSet)

PM和RS都是为了实现可靠,灵活的目的设计的

PM和RS的不同点是:

  • 1 PM的颗粒度比较大。一个RS可能只是一个流程的一部分。
  • 2 是否持久化。PM是有持久化的,RS一般就是内存运行。
  • 3 PM更松散,RS更集中。PM中的某个过程失败,并不会使整个流程完全瘫痪。而RS如果依赖没读到直接就Fail。

这两者有独立存在的必要。这个就简单理解为通信五层协议为啥要独立存在吧。

两者也有可以互相借鉴的地方。PM的结构分的比RS清晰,大概是因为有很多持久化的部分,因此更直观。RS的调用集成度更高。

3 过程模板

具体的过程就不一一描述了,大致可以列出这样一个模板
proc_template.py

# ==================  A 导入的固定部分   ==================
import os
import sys

# 将当前路径加入,以便调用funcs
basedir = os.path.abspath(os.path.dirname(__file__))
# 回退两层到(/workspace/)
basedir = basedir[:basedir.rfind('/')]
basedir = basedir[:basedir.rfind('/')]
print(basedir)
if basedir not in sys.path:
    sys.path.append(basedir)



# ==================  B 常用包以及临时函数 General Package and Temporary Function  ==================
import os
import pandas as pd 
import numpy as np 
import time
import funcs as fs
import requests as req
from functools import partial


# ===== b 这个可以定义一些临时函数

def temp():
    pass


# ==================  C 过程关键字参数 Procedure Keyword Args   ==================
import argparse
def get_arg():
    parser = argparse.ArgumentParser(description='Customized Arguments')
#     parser.add_argument('-p','--pkl', default='Meta')

    # 制程名
    parser.add_argument('--sname')
    # 流程名
    parser.add_argument('--prname')
    # 流程版本
    parser.add_argument('--pr_version')
    # 是否强制运行
    parser.add_argument('--run_mode')
    # 当前的片shard
    parser.add_argument('--shard')
    # 最大分片
    parser.add_argument('--max_shard')

    # 准备解析参数
    args = parser.parse_args()

    sname = args.sname
    prname = args.prname
    pr_version = args.pr_version
    run_mode = args.run_mode
    shard = args.shard
    max_shard = args.max_shard
    
    return sname,prname,pr_version,run_mode,shard,max_shard

# ==================  D 运行命令示例 Run Command  ==================
# 运行:python ./sche_001/pys/proc_template.py --sname=sche_001 --prname=001 --pr_version=000


if __name__ == '__main__':
    
    # ==================  E 运行参数获取 Runtime Parameter   ==================
    sname,prname,pr_version,run_mode, shard,max_shard = get_arg()
    # ====== e 制程和流程名称必须给,其他的可以不写(采用默认的值)

    # 版本号可以不给
    pr_version = pr_version or '000'
    # 当前片可以不给:要给就要匹配最大片同时给。例如, 最大片为2时,就要有0,1两个片
    shard = shard or 0 
    # 最大片可以不给
    max_shard = max_shard or 1

    # 获取当前流程名
    cur_proc =  os.path.basename(__file__)
    cur_proc1 = cur_proc[:cur_proc.find('.')]
    print('>>> Current Process: %s ' % cur_proc1)

    # ====== e debug时可以这么给(在项目根目录下启动ipython)
    # import funcs as fs
    # sname = 'sche_001'
    # prname = '000'
    # pr_version = '000'
    # cur_proc1 = 'proc_002_v_000'


    # ==================  F 配置参数获取 Process Configs   ==================
    # 初始化当前的pm2
    pm2 = fs.PM2(sname,prname, pr_version)


    # ====== f 基础配置文件和当前过程的配置文件
    base_conf_dict = pm2.get_base_conf()
    proc_conf_dict = pm2.get_process_conf()[cur_proc1]


    # ====== f 以下是常用的一些配置
    # f1 差集流动数量
    task_per_time = proc_conf_dict['task_per_time']
    # f2 上一步(from)
    last_proc = proc_conf_dict['last_proc']
    # f3 结果存放位置(to,默认是当前流程,当前过程的文件夹下面)
    result_to_process = proc_conf_dict['result_to_process']
    result_to_proc = proc_conf_dict['result_to_proc']

    # ==================  G  变量获取 Variables Fetch   ==================
    # ====== g 根据从F步获得的配置参数,获取下一步核心逻辑所需要的变量
    # 略...


    # ==================  H  差集检查 Gap Check   ==================
    # ====== h 根据主任务/子任务两种方式进行差集的检查,auto情况下,具有差集才会执行
    # ====== h 这步相当于, if exists, then execute【存在差集才执行】
    
    # ====== H 此步相当于是对可能要执行的候选集进行了一轮筛选


    # >>>>>>>>>>>>>>>>>>  对每一个符合差集检查结果的任务循环 For Every Possible Task  >>>>>>>>>>>>>>>>>>

    # ==================  I  元数据交互 Meta Interact   ==================
    # ====== i 检查每一个主任务/子任务的元数据【存在元数据才执行】
    # ====== i 有时候需要对元数据进行进一步的判断再决定是否执行【判定元数据再执行】
    # ====== i 标记本步的数据在交付后是否要删除
    # ====== i 标记其他以任务为中心的元数据
    


    # ==================  J  核心逻辑 Core Logics   ==================
    # ====== j 这一步可以理解为是一个RS(RuleSet),给到input后会生成固定的output(内存中)
    # some ruleset


    # ==================  K  元数据更新 Meta Updating   ==================
    # ====== k 根据核心逻辑的执行状态,标记任务元数据
    # ====== k 将一些运行时的状态,暂存到redis(持久化的位置是runtime,这里的持久化不是redis自带的,而是自己指定)

这么看起来,一个过程步也还是比较繁琐的。后续通过PM, 结合Web交互可以做进一步的增强:在确保逻辑完整严密的情况下,尽量减少靠人来记忆和管理这个长链条。

以上就是本次实现的主要逻辑,总体上来说还是不错。再进行一次优化,时耗比应该就会接近1了。下一次还是基于单个PM进行优化, 通过规范化模板减少在花关系梳理的精力。再往后,达到多个项目的联合管理,跨制程,跨流程协作可以把时耗比降到0.7以下。

PM最大的价值是在保证质量的前提下,进行可线性扩张的效率提升。

目前看来,PM是可以实现几乎0联合调试的,通过不断细化和规范过程步模板,我们可以把功能的实现以众包的方式实现。假设原来一个项目,一个人完成需要1个月。如果一个人花10天可以定义好各流程以及组成的过程步,将需要处理的过程步发给30个人,2天完成后进行合并。就算加上培训和校正的开销3天,那么只需要15天就可以完成项目。效果和一个人完成的几乎无差别

更有意思的是,因为设计是少数人实现的,所以可以做好通盘规划。可变参数会抽象出来,交给遗传算法进行大规模搜索。也就是说,功能得到了极大的增强。

而且这些流程、过程未来是可以复用的,成本将会随着时间的推移剧烈下降

4 其他

4.1 一些想法

目前的精力主要放在A-goes->B的goes上,其实A和B是我们关心的结果

从图的观点上看,有点和边。目前我们主要只考虑边,而点是被忽略的。这是因为在大部分的情况下,流程固定了,对应的点也就固定了。这样简单,但不够灵活

对于点的描述,或者说数据(可以想象为一个pkl文件)至少可以分为结构和内容两部分。

例如,我们有十条文本要处理。那么这十条文本可以通过列表(结构)来存储,每个文本(内容)都是一个字符(数据类型)。

如果我们抽象去描述这种数据,例如ListOfStr,我们要做的操作是对这十条文本进行清洗,那么就可以描述为:

A(ListOfStr) -[Clean Process]->B(ListOfStr)

这样就非常清晰了。当然,未来进一步地,我们可以用更精细的分类去表示这些类型,用对象管理,想不起来的时候甚至可以sample一个例子看看。

4.2 接下来做什么

4.2.1 数据库PM项目

计划建立一个「数据库交互」项目(DBM), 里面使用PM2对各种数据库管理方法进行规范。首先是完成neo4j,这样就可以很快的应用图库来加强PM2。

每种不同的数据库就是不同的schema, 通过process来容纳各总组合操作。这个项目比较特殊,因为按道理本来应该使用RS来完成这些功能(纯内存操作)。但之前做的几个对象LittleMongo,LittleNeo4j这些感觉下来,有些模式没想清楚。

所以希望通过PM项目非常清晰的表达这些操作,成熟后再封装为RS。

4.2.2 使用Redis加强过程管理

有一些实时统计,如果通过手工的方式逐个声明会比较费事。在过程中如果使用PM将内容实时的更新到Redis中,会很方便:

  • 1 因为在内存中,不同的流程要使用很简单,也很快
  • 2 只需要按逻辑从数据库中取数,不需要时候再想
  • 3 每个需要被统计的过程都可以直接写进去

同样,增加Redis会有一个点要注意:加大了系统的复杂性。原本PM不依赖其他的任何系统(文件系统是默认的),所以在使用Redis加强的时候,要考虑可选项。有Redis更好,没有Redis也可以用

上一篇:数据结构 -- 001 绪论


下一篇:【计题01组001号】LeetCode刷题笔记001