文章目录
azkaban任务日志处理方式
azkaban 将任务的运行日志存储在 MySQL 表中,根据任务的 `exec_id`、`name`、`attempt` 即可获取对应任务的日志。当日志文件比较大时,日志会按照 50KB 的规格进行分段存储,每一段日志在表中插入一条记录,详细内容如下文。
azkaban日志mysql表信息
mysql表的schema如下:
-- Stores job execution logs; each row holds one chunk of a log.
-- A large log is split into several rows that share (exec_id, name, attempt)
-- and differ in start_byte.
CREATE TABLE `execution_logs` (
-- Execution id the log belongs to.
`exec_id` int(11) NOT NULL,
-- Name the log is filed under for this execution.
`name` varchar(640) NOT NULL,
-- Attempt (retry) number of the run that produced the log.
`attempt` int(11) NOT NULL,
-- Numeric code of the encoding applied to `log` (plain or gzip).
`enc_type` tinyint(4) DEFAULT NULL,
-- Byte offset at which this chunk starts within the whole log.
`start_byte` int(11) NOT NULL,
-- Byte offset at which this chunk ends within the whole log.
`end_byte` int(11) DEFAULT NULL,
-- Chunk content, encoded as indicated by `enc_type`.
`log` longblob,
-- Millisecond timestamp recorded when the chunk was inserted.
`upload_time` bigint(20) DEFAULT NULL,
-- A chunk is uniquely identified by its execution, name, attempt and start offset.
PRIMARY KEY (`exec_id`,`name`,`attempt`,`start_byte`),
KEY `ex_log_attempt` (`exec_id`,`name`,`attempt`),
KEY `ex_log_index` (`exec_id`,`name`),
KEY `ex_log_upload_time` (`upload_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- exec_id 任务的执行id
- name 任务flow的名称
- attempt 重试次数
- enc_type 日志编码类型(plain/gzip)
- start_byte 该分段在原始(未压缩)日志中的起始字节偏移
- end_byte 该分段在原始(未压缩)日志中的结束字节偏移,即 start_byte 加上该分段的字节数
- log 日志文件内容
- upload_time 日志上传时间
源码中逻辑处理流程
/**
 * Uploads the content of the given log files as a sequence of fixed-size chunks.
 *
 * <p>The files are read in array order into a single 50 KB buffer. The buffer is
 * not reset between files, so the files are treated as one continuous byte stream:
 * a chunk may contain the tail of one file followed by the head of the next. Each
 * time the buffer becomes full it is handed to {@code uploadLogPart}; whatever is
 * left in the buffer after the last file is uploaded as a final, shorter chunk.
 *
 * @param transOperator operator used to run the inserts
 * @param execId execution id the log belongs to
 * @param name name the log is stored under
 * @param attempt attempt number of the run that produced the log
 * @param files log files to upload, in order
 * @param encType encoding to apply to each chunk
 * @throws SQLException if writing a chunk fails, or if reading a file fails (the
 *     {@link IOException} is wrapped)
 */
private void uploadLogFile(final DatabaseTransOperator transOperator, final int execId,
    final String name,
    final int attempt, final File[] files, final EncodingType encType)
    throws SQLException {
  // Logs larger than one chunk are split; each full chunk becomes its own row.
  final int chunkSize = 50 * 1024;
  final byte[] chunk = new byte[chunkSize];
  // Number of bytes currently buffered in `chunk`, carried across files.
  int filled = 0;
  // Offset of the first buffered byte within the concatenated log.
  int offset = 0;
  try {
    for (final File logFile : files) {
      final BufferedInputStream in =
          new BufferedInputStream(new FileInputStream(logFile));
      try {
        int read;
        // Always read into the free tail of the buffer; a negative count means end of file.
        while ((read = in.read(chunk, filled, chunkSize - filled)) >= 0) {
          filled += read;
          if (filled == chunkSize) {
            // Buffer is full: flush it and start a new chunk.
            uploadLogPart(transOperator, execId, name, attempt, offset,
                offset + chunkSize, encType, chunk, chunkSize);
            filled = 0;
            offset += chunkSize;
          }
        }
      } finally {
        IOUtils.closeQuietly(in);
      }
    }
    // Upload whatever did not fill a whole chunk.
    if (filled > 0) {
      uploadLogPart(transOperator, execId, name, attempt, offset, offset + filled,
          encType, chunk, filled);
    }
  } catch (final SQLException e) {
    logger.error("Error writing log part.", e);
    throw new SQLException("Error writing log part", e);
  } catch (final IOException e) {
    logger.error("Error chunking.", e);
    throw new SQLException("Error chunking", e);
  }
}
/**
 * Inserts one log chunk as a row of {@code execution_logs}.
 *
 * <p>The first {@code length} bytes of {@code buffer} form the chunk. With
 * {@code EncodingType.GZIP} those bytes are passed through
 * {@code GZIPUtils.gzipBytes}; otherwise they are stored as they are, copied into
 * a right-sized array when the buffer is only partially filled.
 *
 * <p>Note that {@code endByte} is accepted but not read: the value written to the
 * {@code end_byte} column is {@code startByte + length}.
 *
 * @param transOperator operator used to run the insert
 * @param execId execution id the log belongs to
 * @param name name the log is stored under
 * @param attempt attempt number of the run that produced the log
 * @param startByte offset of the chunk's first byte, stored in {@code start_byte}
 * @param endByte currently unused by this method
 * @param encType encoding to apply to the chunk
 * @param buffer array holding the chunk bytes starting at index 0
 * @param length number of valid bytes in {@code buffer}
 * @throws SQLException if the insert fails
 * @throws IOException declared for the encoding step
 */
private void uploadLogPart(final DatabaseTransOperator transOperator, final int execId,
    final String name,
    final int attempt, final int startByte, final int endByte,
    final EncodingType encType,
    final byte[] buffer, final int length)
    throws SQLException, IOException {
  final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
      + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
      + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";

  // Choose the bytes that end up in the `log` column.
  final byte[] payload;
  if (encType == EncodingType.GZIP) {
    payload = GZIPUtils.gzipBytes(buffer, 0, length);
  } else if (length < buffer.length) {
    // Partially filled buffer: store only the valid prefix.
    payload = Arrays.copyOf(buffer, length);
  } else {
    payload = buffer;
  }

  final int storedEndByte = startByte + length;
  transOperator.update(INSERT_EXECUTION_LOGS, execId, name, attempt,
      encType.getNumVal(), startByte, storedEndByte, payload,
      DateTime.now().getMillis());
}