In production, DataX is started through datax.py, as follows:
$ python datax.py job/{YOUR_JOB.json}
This article walks through the source code of datax.py.
We start from the main entry point and follow the program's execution flow step by step.
if __name__ == "__main__":
    printCopyright()  # print the copyright notice
    parser = getOptionParser()  # build the option parser
    options, args = parser.parse_args(sys.argv[1:])  # parse the command-line arguments
    if options.reader is not None and options.writer is not None:
        # Both a reader and a writer were given: print a sample JSON job template for them and exit
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])
    startCommand = buildStartCommand(options, args)  # build the job command
    # print startCommand
    child_process = subprocess.Popen(startCommand, shell=True)  # start the Java child process that actually runs the job
    register_signal()
    (stdout, stderr) = child_process.communicate()
    sys.exit(child_process.returncode)
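I added comments to the code, so the steps are easy to follow. There are four main steps:
- 1. Print the DataX copyright notice
- 2. Build the option parser and parse the arguments
- 3. Build the start command
- 4. Start the Java child process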
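The last of these steps, launching a child process and passing its exit code back to the caller, can be sketched in a few lines of Python 3. This is only an illustration, not the DataX code: `run_job` is a hypothetical helper, and a trivial Python child stands in for the real Java command.

```python
import subprocess
import sys


def run_job(command):
    """Start `command` through the shell, wait for it, and return its exit code."""
    # shell=True mirrors the Popen call in datax.py: the command is a single string.
    child = subprocess.Popen(command, shell=True)
    # communicate() blocks until the child exits; with no pipes it returns (None, None).
    child.communicate()
    return child.returncode


# Hypothetical stand-in for the java command: a child that exits with status 3.
stand_in = '"%s" -c "import sys; sys.exit(3)"' % sys.executable
exit_code = run_job(stand_in)
```

Because the launcher exits with the child's return code, a scheduler that invokes datax.py can tell a failed job from a successful one by the exit status alone.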
The sections below go through these four steps in turn.
Printing the copyright notice
This step is trivial, so we skip it.
Building the option parser and parsing the arguments
def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)
    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader",type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer",type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)
    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser
The parser is built with OptionParser from Python's standard optparse module. The usage string tells us that the command takes the form
datax.py [options] job-url-or-path
The individual options are then registered in the code that follows through add_option. For example, we can run something like:
python datax.py -r txtfilereader -w txtfilewriter
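Note that the example above does not specify a JSON file. When both a reader and a writer are given, datax.py prints a JSON job template for those plugins, together with pointers to their documentation on GitHub, instead of running a job. That is handled by this piece of code: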
if options.reader is not None and options.writer is not None:
    generateJobConfigTemplate(options.reader,options.writer)
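To make the two invocation modes concrete, here is a small Python 3 sketch using the same optparse API. `build_parser` is a hypothetical, cut-down parser that registers only three of the options shown above; it is not the full DataX parser.

```python
from optparse import OptionParser


def build_parser():
    """A cut-down parser with three of the options from getOptionParser."""
    parser = OptionParser(usage="usage: %prog [options] job-url-or-path")
    parser.add_option("-r", "--reader", dest="reader", type="string")
    parser.add_option("-w", "--writer", dest="writer", type="string")
    parser.add_option("--jobid", dest="jobid", action="store", default="-1")
    return parser


parser = build_parser()

# Template mode: both -r and -w are given, so no job file is needed.
options, args = parser.parse_args(["-r", "streamreader", "-w", "streamwriter"])
template_mode = options.reader is not None and options.writer is not None

# Job mode: exactly one positional argument remains after option parsing.
job_options, job_args = parser.parse_args(["--jobid", "7", "job/job.json"])
```

In template mode the positional list `args` is empty, which is why the reader/writer check in the main block has to come before the `len(args) != 1` check.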
Building the start command
def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print 'local ip: ', getLocalIp()
    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
    if options.mode:
        commandMap["mode"] = options.mode
    # jobResource may be a URL or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]
    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params
    if options.jobid:
        commandMap["jobid"] = options.jobid
    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource
    return Template(ENGINE_COMMAND).substitute(**commandMap)
The flow is:
- Assemble the JVM parameters
- Assemble the DataX parameters (the JSON job to run, logging, and so on)
- Assemble the java command
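The final assembly step relies on string.Template from the standard library. The Python 3 sketch below shows the mechanism only. ENGINE_COMMAND itself is not shown in this article, so `ENGINE_COMMAND_SKETCH` is an assumed, shortened stand-in, and the values in `command_map` are made up for illustration.

```python
from string import Template

# Assumed, shortened stand-in for ENGINE_COMMAND (the real constant is not shown here).
ENGINE_COMMAND_SKETCH = (
    "java -server ${jvm} ${params} com.alibaba.datax.core.Engine "
    "-mode ${mode} -jobid ${jobid} -job ${job}"
)

# Illustrative values; in datax.py they come from the parsed options and arguments.
command_map = {
    "jvm": "-Xms1g -Xmx1g -Dloglevel=info",
    "params": "-Dlog.file.name=job_json",
    "mode": "standalone",
    "jobid": "-1",
    "job": "/tmp/job.json",
}

# substitute() raises KeyError if a placeholder has no matching key.
command = Template(ENGINE_COMMAND_SKETCH).substitute(**command_map)
print(command)
```

Since substitute() fails on a missing key, every placeholder in the template must be present in the map. In buildStartCommand, `mode` and `jobid` are always set in practice because both options have non-empty defaults.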
You can use
print startCommand
to print the final command and inspect it. It is an ordinary java command, similar to the following:
# java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\xxxx\github\DataX\target\datax\datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\xxxx\github\DataX\target\datax\datax -Dlogback.configurationFile=D:\xxxx\github\DataX\target\datax\datax/conf/logback.xml -classpath D:\xxxx\github\DataX\target\datax\datax/lib/* -Dlog.file.name=x\datax\job\job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\xxxx\github\DataX\target\datax\datax\job\job.json
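The `-Dlog.file.name` value in that output can be reproduced by hand. The sketch below wraps the expression from buildStartCommand in a hypothetical helper, `log_file_name`, and applies it to the Windows path from the command above and to a made-up POSIX path.

```python
def log_file_name(job_resource):
    """Last 20 characters of the job path, with '/' and '.' replaced by '_'."""
    return job_resource[-20:].replace('/', '_').replace('.', '_')


# Made-up POSIX path: every separator in the 20-character tail becomes '_'.
posix_name = log_file_name("/opt/datax/job/job.json")

# Windows path from the sample command: only '/' and '.' are replaced,
# so the backslashes are left in place.
windows_name = log_file_name(r"D:\xxxx\github\DataX\target\datax\datax\job\job.json")
```

This explains why the log file name in the sample command still contains backslashes: the expression only replaces forward slashes and dots.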