azkaban 为邮件告警的邮件内容提供了一个接口azkaban.executor.mail.MailCreator
如下:
public interface MailCreator {
//生成第一次失败时发送的邮件内容
boolean createFirstErrorMessage(ExecutableFlow flow,
EmailMessage message, String azkabanName, String scheme,
String clientHostname, String clientPortNumber);
//生成任务失败时发送的邮件内容
boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
EmailMessage message, String azkabanName, String scheme, String clientHostname,
String clientPortNumber, String... reasons);
//生成任务成功时发送的邮件信息
boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
String azkabanName, String scheme, String clientHostname,
String clientPortNumber);
//生成更新任务状态失败时邮件告警信息
boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
ExecutorManagerException updateException, EmailMessage message,
String azkabanName, String scheme, String clientHostname,
String clientPortNumber);
}
azkaban默认的邮件是azkaban.executor.mail.DefaultMailCreator
生成的:
也就是说我们要自定义修改邮件格式内容 就需要实现一下这个MailCreator接口:
这里我们完全可以仿照DefaultMailCreator写一个,只是在关键方法内写我们自己的逻辑:
我这里把自定义的mailCreator名称定义为“simple”
public class SimpleMailCreator implements MailCreator {
public static final String SIMPLE_MAIL_CREATOR = "simple";
private static final HashMap<String, MailCreator> registeredCreators = new HashMap<>();
private static final SimpleMailCreator simpleCreator;
static {
simpleCreator = new SimpleMailCreator();
registerCreator(SIMPLE_MAIL_CREATOR , simpleCreator);
}
public static void registerCreator(final String name, final MailCreator creator) {
registeredCreators.put(name, creator);
}
public static MailCreator getCreator(final String name) {
MailCreator creator = registeredCreators.get(name);
if (creator == null) {
creator = simpleCreator;
}
return creator;
}
@Override
public boolean createFirstErrorMessage(final ExecutableFlow flow,
final EmailMessage message, final String azkabanName, final String scheme,
final String clientHostname, final String clientPortNumber) {
...(写自己的逻辑)
}
@Override
public boolean createErrorEmail(final ExecutableFlow flow, final List<ExecutableFlow>
pastExecutions, final EmailMessage message, final String azkabanName, final String scheme,
final String clientHostname, final String clientPortNumber, final String... reasons) {
...(写自己的逻辑)
}
@Override
public boolean createSuccessEmail(final ExecutableFlow flow, final EmailMessage message,
final String azkabanName, final String scheme, final String clientHostname,
final String clientPortNumber) {
...(写自己的逻辑)
}
@Override
public boolean createFailedUpdateMessage(final List<ExecutableFlow> flows,
final Executor executor, final ExecutorManagerException updateException,
final EmailMessage message, final String azkabanName,
final String scheme, final String clientHostname, final String clientPortNumber) {
...(写自己的逻辑)
}
}
自定义的MailCreator写好后,我们需要找到在哪里调用MailCreator的代码位置:
经过一番查找后,看到在azkaban.utils.Emailer
处调用了MailCreator:
@Override
public void alertOnFirstError(final ExecutableFlow flow) {
final EmailMessage message = this.messageCreator.createMessage();
final MailCreator mailCreator = getMailCreator(flow);
final boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, this.azkabanName,
this.scheme, this.clientHostname, this.clientPortNumber);
sendEmail(message, mailCreated,
"first error email message for execution " + flow.getExecutionId());
}
@Override
public void alertOnError(final ExecutableFlow flow, final String... extraReasons) {
final EmailMessage message = this.messageCreator.createMessage();
final MailCreator mailCreator = getMailCreator(flow);
List<ExecutableFlow> last72hoursExecutions = new ArrayList<>();
if (flow.getStartTime() > 0) {
final long startTime = flow.getStartTime() - Duration.ofHours(72).toMillis();
try {
last72hoursExecutions = this.executorLoader.fetchFlowHistory(flow.getProjectId(), flow
.getFlowId(), startTime);
} catch (final ExecutorManagerException e) {
logger.error("unable to fetch past executions", e);
}
}
final boolean mailCreated = mailCreator.createErrorEmail(flow, last72hoursExecutions, message,
this.azkabanName, this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
sendEmail(message, mailCreated, "error email message for execution " + flow.getExecutionId());
}
@Override
public void alertOnSuccess(final ExecutableFlow flow) {
final EmailMessage message = this.messageCreator.createMessage();
final MailCreator mailCreator = getMailCreator(flow);
final boolean mailCreated = mailCreator.createSuccessEmail(flow, message, this.azkabanName,
this.scheme, this.clientHostname, this.clientPortNumber);
sendEmail(message, mailCreated, "success email message for execution " + flow.getExecutionId());
}
从以上代码中我们可以看出来,不管成功还是失败告警,调用MailCreator
都有一个关键的方法getMailCreator(flow)
:
private MailCreator getMailCreator(final ExecutableFlow flow) {
final String name = flow.getExecutionOptions().getMailCreator();
return getMailCreator(name);
}
private MailCreator getMailCreator(final String name) {
final MailCreator mailCreator = DefaultMailCreator.getCreator(name);
logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
return mailCreator;
}
可以看出Azkaban每次都会去获取默认的MailCreator ,这一点很不友好,所以这点我们就需要修改源码:
定义一个变量mailCreatorType
,然后这个变量从配置文件中获取:
this.mailCreatorType = props.getString("mail.creator", "deault");
然后定义重载方法getMailCreator()
:
private MailCreator getMailCreator() {
if ("simple".equals(mailCreatorType)) {
return new SimpleMailCreator();
} else {
return new DefaultMailCreator();
}
}
然后修改azkaban.utils.Emailer
中的如下方法,将getMailCreator(flow)
替换为我们自己写的 getMailCreator()
即可:
然后重新编译,打包,并部署运行即可。