背景: 现有基于anaconda3开发的引擎脚本,用于动态识别处理数据;
场景: 现在这个引擎是服务化,通过网络请求来调用; 我们的调用端是一个容器化的服务,因为引擎的入参出参数据量大,所以现在的服务化方案需要修改,本地化此引擎来减少网络传输的时间,达到优化的目的;
方案思考:
方案一: 将引擎集成到服务镜像中,这样可以本地调用;
方案二: 将引擎打成单独的镜像,放到容器的宿主机上,将存储共同挂载到相同目录,保证容器与引擎容器共享存储,再通过flask暴露接口来启动这个容器;
方案结论:
-
舍弃了方案一,因为引擎太大了,单独的镜像达到快3G,若集成到服务镜像中,会导致服务镜像过大;且在相同容器中运行,并发性能也有一定的瓶颈(对于服务容器有资源限制);
-
在方案二中更加的灵活,可以将引擎本地化模块独立开来;因为容器化,所以对于数据处理效率更高,并发性能也更好(可以同时运行多个容器);
实践:
-
首先我们需要将引擎打成镜像
FROM continuumio/anaconda3:latest COPY /guanlian_v4.0_for_engine_local /engine ENTRYPOINT ["python", "/engine/process_bm_local.py"]
-
首先基于anaconda3作为基础镜像;
-
然后将引擎脚本复制到容器中;
-
使用ENTRYPOINT主要是为了传参,因为我们需要将入参文件的路径传进去,引擎才能知道位置来进行处理;而在引擎运行处理完毕后就会关闭当前容器。
通过命令打成镜像
docker build -t engine:v1 .
-
-
启动容器进行功能测试
docker run -v /home/test:/engine_data -it engine:v1 /engine_data/in.txt
查看log,引擎运行正常,最后输出结果:
finish in.txt in.txt_guanlian.log in.txt_out.json
验证得知容器运行OK
-
建立一个接口,来执行docker启动命令,我们使用flask(简单)
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: yxhe # Date: 2021/6/9/0009 21:38 # ---------------------------------------------------------- import os import threading from flask import Flask, request app = Flask(__name__) @app.route('/engine_gl/execute', methods=['POST']) def translate(): folder = request.args.get("folder") file_name = request.args.get("iname") base_work_dir = request.args.get("workDir") in_path = base_work_dir + '/' + folder thread = threading.Thread(target=exe, args=(in_path, file_name, )) thread.start() return 'ok' def exe(in_path, file_name): os.system("docker run -v " + in_path + ":/engine_data -i engine:v1 /engine_data/" + file_name) os.system("touch " + in_path + "/finish") if __name__ == '__main__': app.run(host="0.0.0.0", port=8080, debug=False)
-
os.system() 方法会阻塞主线程,导致当前请求时间过长。最终可能请求超时;所以这里使用线程异步处理这段逻辑;
-
异步之后,调用方就无法知道程序什么时候跑完;因为需要进文件夹再拿取输出的文件数据;所以这边写入了finish文件来标识输出文件完毕,可以获取;
-
这里遇见一个问题,脚本运行的时候
nohup python test.py &
通过postman调用的时候,发现报错如下:
the input device is not a TTY
这个错误是因为我们启动docker命令的时候添加了-t配置,去掉即可
-
-
服务内部封装本地化调用代码;
@Component @Slf4j public class AssociateEngine { @Value("${engine.associate.mount}") private String path; @Value("${engine.associate.host-mount}") private String host_mount; @Value("${engine.associate.url}") private String engine_url; @Value("${engine.associate.polling-times}") private int polling_times; @Value("${engine.associate.polling-interval}") private int polling_interval; // 30s @Value("${engine.associate.file-in-name}") private String in_name; @Value("${engine.associate.file-out-suffix}") private String out_suffix; public String execute(String json) { try { // 首先创建一个工作目录文件夹 String folder = String.valueOf(System.currentTimeMillis()); String workDir = createWorkDir(this.splicingPath(true, path, folder)); // 将输入参数json写入文件中 writeStringToFile(json, this.splicingPath(false, workDir, in_name)); log.info("本地化引擎,数据已写入容器{}, 宿主机:{}", this.splicingPath(false, workDir, in_name), this.splicingPath(false, host_mount, folder, in_name)); // 调用Python脚本中的api,触发docker容器运行 invoke(engine_url + "?folder=%s&iname=%s&workDir=%s", folder, in_name, this.splicingPath(true, host_mount)); // 这里是一个轮询,来获取执行结果,以查询到finish文件为结束;或者以轮询次数超过最大限制次数为结束 String pollingResultPath = pollingResult(workDir, in_name); // 获取到本地化引擎输出的文件结果目录路径 log.info("本地化引擎,获取引擎执行结果{}", pollingResultPath); if (pollingResultPath == null) { log.error("引擎调用失败!"); return null; } // 读取输出的文件结果并返回 return readEngineResult(pollingResultPath); } catch (Exception e) { e.printStackTrace(); log.error("引擎报错~ ,errorInfo: {}", e.getMessage(), e); } return null; } private String readEngineResult(String pollingResultPath) throws IOException { FileInputStream fin = new FileInputStream(pollingResultPath); InputStreamReader reader = new InputStreamReader(fin); BufferedReader buffReader = new BufferedReader(reader); StringBuilder stringBuilder = new StringBuilder(); String line; while((line = buffReader.readLine())!=null){ stringBuilder.append(line); } buffReader.close(); return stringBuilder.toString(); } private String pollingResult(String workDir, String inName) throws InterruptedException { int times = 1; // 轮询 for(;;) { File file = new File(workDir); File[] fs = file.listFiles(); if (fs == null) { log.error("文件目录错误,目录为空!"); return null; } for (File f : fs) { if (f.getName().equals("finish")) { return this.splicingPath(false, workDir, inName + out_suffix); } } log.info("本地化引擎,获取引擎执行结果第{}次失败。。。", times); if (times >= polling_times) { log.error("轮询时间到,未查询到正确结果!!"); return null; } times++; Thread.sleep(polling_interval); } } private String createWorkDir(String path) { File file = new File(path); if (!file.exists()) { boolean mkdirs = file.mkdirs(); if (mkdirs) { return path; } } return path; } private void writeStringToFile(String json, String filePath) throws IOException { OutputStream out = new FileOutputStream(filePath); out.write(json.getBytes()); out.flush(); out.close(); } private void invoke(String url, String folder, String inName, String hostMount) { HttpClientUtil.post(String.format(url, folder, inName, hostMount)); } private String splicingPath(boolean includeFinish, String... paths) { StringBuilder stringBuilder = new StringBuilder(); for (String path : paths) { stringBuilder.append(path); if (!path.endsWith(File.separator)) { stringBuilder.append(File.separator); } } String finalPath = stringBuilder.toString(); if (includeFinish) { return finalPath; } return finalPath.substring(0, finalPath.length()-1); } }
结论
至此解决引擎本地化的问题;