Azkaban: Customizing Email Content and Format by Modifying the Source Code

Azkaban exposes an interface for generating the content of alert emails, azkaban.executor.mail.MailCreator.
It looks like this:

public interface MailCreator {
  // Builds the email sent the first time a failure occurs
  boolean createFirstErrorMessage(ExecutableFlow flow,
      EmailMessage message, String azkabanName, String scheme,
      String clientHostname, String clientPortNumber);

  // Builds the email sent when the flow has failed
  boolean createErrorEmail(ExecutableFlow flow, List<ExecutableFlow> pastExecutions,
      EmailMessage message, String azkabanName, String scheme, String clientHostname,
      String clientPortNumber, String... reasons);

  // Builds the email sent when the flow has succeeded
  boolean createSuccessEmail(ExecutableFlow flow, EmailMessage message,
      String azkabanName, String scheme, String clientHostname,
      String clientPortNumber);

  // Builds the alert email sent when updating the flow status has failed
  boolean createFailedUpdateMessage(List<ExecutableFlow> flows, Executor executor,
      ExecutorManagerException updateException, EmailMessage message,
      String azkabanName, String scheme, String clientHostname,
      String clientPortNumber);
}

By default, Azkaban's emails are generated by azkaban.executor.mail.DefaultMailCreator.
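In other words, to customize the format and content of the emails we need to implement the MailCreator interface.
We can simply model our class on DefaultMailCreator and put our own logic into the key methods.
Here I register the custom mail creator under the name "simple":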

public class SimpleMailCreator implements MailCreator {
  public static final String SIMPLE_MAIL_CREATOR = "simple";
  private static final HashMap<String, MailCreator> registeredCreators = new HashMap<>();
  private static final SimpleMailCreator simpleCreator;

  static {
    simpleCreator = new SimpleMailCreator();
    registerCreator(SIMPLE_MAIL_CREATOR , simpleCreator);
  }

  public static void registerCreator(final String name, final MailCreator creator) {
    registeredCreators.put(name, creator);
  }

  public static MailCreator getCreator(final String name) {
    MailCreator creator = registeredCreators.get(name);
    if (creator == null) {
      creator = simpleCreator;
    }
    return creator;
  }
  @Override
  public boolean createFirstErrorMessage(final ExecutableFlow flow,
      final EmailMessage message, final String azkabanName, final String scheme,
      final String clientHostname, final String clientPortNumber) {
      ...(写自己的逻辑)
  }

  @Override
  public boolean createErrorEmail(final ExecutableFlow flow, final List<ExecutableFlow>
      pastExecutions, final EmailMessage message, final String azkabanName, final String scheme,
      final String clientHostname, final String clientPortNumber, final String... reasons) {
      // ... (write your own logic here; return whether a mail was created)
  }

  @Override
  public boolean createSuccessEmail(final ExecutableFlow flow, final EmailMessage message,
      final String azkabanName, final String scheme, final String clientHostname,
      final String clientPortNumber) {
      // ... (write your own logic here; return whether a mail was created)
  }

  @Override
  public boolean createFailedUpdateMessage(final List<ExecutableFlow> flows,
      final Executor executor, final ExecutorManagerException updateException,
      final EmailMessage message, final String azkabanName,
      final String scheme, final String clientHostname, final String clientPortNumber) {
      // ... (write your own logic here; return whether a mail was created)
  }
}
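
As a rough illustration of what "your own logic" might look like, here is a minimal, self-contained sketch that builds a subject line and an HTML body for a failed flow. ErrorMailSketch and FlowSummary are hypothetical stand-ins, not Azkaban classes, and the execution URL layout is an assumption. In a real createErrorEmail the values would come from the ExecutableFlow, and the resulting strings would be written into the EmailMessage that is passed in.

```java
import java.util.Arrays;
import java.util.List;

final class ErrorMailSketch {

  // Hypothetical stand-in for the few ExecutableFlow fields used below.
  static final class FlowSummary {
    final String flowId;
    final int executionId;
    final List<String> failedJobs;

    FlowSummary(String flowId, int executionId, List<String> failedJobs) {
      this.flowId = flowId;
      this.executionId = executionId;
      this.failedJobs = failedJobs;
    }
  }

  // Subject line naming the flow and the Azkaban instance.
  static String subject(FlowSummary flow, String azkabanName) {
    return "Flow '" + flow.flowId + "' has failed on " + azkabanName;
  }

  // HTML body: a link to the execution page followed by the failed jobs.
  // The "/executor?execid=" path is an assumed URL layout.
  static String body(FlowSummary flow, String scheme, String host, String port) {
    String link = scheme + "://" + host + ":" + port
        + "/executor?execid=" + flow.executionId;
    StringBuilder html = new StringBuilder();
    html.append("<h2>Execution ").append(flow.executionId).append(" failed</h2>");
    html.append("<a href=\"").append(link).append("\">Execution link</a>");
    html.append("<ul>");
    for (String job : flow.failedJobs) {
      html.append("<li>").append(job).append("</li>");
    }
    html.append("</ul>");
    return html.toString();
  }

  public static void main(String[] args) {
    FlowSummary flow = new FlowSummary("daily_etl", 42, Arrays.asList("load_orders"));
    System.out.println(subject(flow, "azkaban-prod"));
    System.out.println(body(flow, "https", "azkaban.example.com", "8443"));
  }
}
```

Keeping the string building in small helper methods like these makes the layout easy to check without a running Azkaban server.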

With the custom MailCreator written, the next step is to find where MailCreator is actually invoked.

A search through the source shows that it is called from azkaban.utils.Emailer:

@Override
public void alertOnFirstError(final ExecutableFlow flow) {
  final EmailMessage message = this.messageCreator.createMessage();
  final MailCreator mailCreator = getMailCreator(flow);
  final boolean mailCreated = mailCreator.createFirstErrorMessage(flow, message, this.azkabanName,
      this.scheme, this.clientHostname, this.clientPortNumber);
  sendEmail(message, mailCreated,
      "first error email message for execution " + flow.getExecutionId());
}
@Override
public void alertOnError(final ExecutableFlow flow, final String... extraReasons) {
  final EmailMessage message = this.messageCreator.createMessage();
  final MailCreator mailCreator = getMailCreator(flow);
  List<ExecutableFlow> last72hoursExecutions = new ArrayList<>();

  if (flow.getStartTime() > 0) {
    final long startTime = flow.getStartTime() - Duration.ofHours(72).toMillis();
    try {
      last72hoursExecutions = this.executorLoader.fetchFlowHistory(flow.getProjectId(), flow
          .getFlowId(), startTime);
    } catch (final ExecutorManagerException e) {
      logger.error("unable to fetch past executions", e);
    }
  }

  final boolean mailCreated = mailCreator.createErrorEmail(flow, last72hoursExecutions, message,
      this.azkabanName, this.scheme, this.clientHostname, this.clientPortNumber, extraReasons);
  sendEmail(message, mailCreated, "error email message for execution " + flow.getExecutionId());
}

@Override
public void alertOnSuccess(final ExecutableFlow flow) {
  final EmailMessage message = this.messageCreator.createMessage();
  final MailCreator mailCreator = getMailCreator(flow);
  final boolean mailCreated = mailCreator.createSuccessEmail(flow, message, this.azkabanName,
      this.scheme, this.clientHostname, this.clientPortNumber);
  sendEmail(message, mailCreated, "success email message for execution " + flow.getExecutionId());
}

As the code above shows, whether the alert is for a success or a failure, every call to MailCreator goes through one key method, getMailCreator(flow):

private MailCreator getMailCreator(final ExecutableFlow flow) {
  final String name = flow.getExecutionOptions().getMailCreator();
  return getMailCreator(name);
}

private MailCreator getMailCreator(final String name) {
  final MailCreator mailCreator = DefaultMailCreator.getCreator(name);
  logger.debug("ExecutorMailer using mail creator:" + mailCreator.getClass().getCanonicalName());
  return mailCreator;
}
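
The lookup above can be sketched in isolation. The following is a minimal model of a name-based registry with a fallback, mirroring the getCreator pattern that SimpleMailCreator copies from DefaultMailCreator. It assumes DefaultMailCreator.getCreator behaves the same way. CreatorRegistrySketch and Creator are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class CreatorRegistrySketch {

  // Hypothetical one-method stand-in for azkaban.executor.mail.MailCreator.
  interface Creator {
    String describe();
  }

  private static final Map<String, Creator> REGISTERED = new HashMap<>();
  private static final Creator DEFAULT = () -> "default";

  static {
    REGISTERED.put("default", DEFAULT);
  }

  static void register(String name, Creator creator) {
    REGISTERED.put(name, creator);
  }

  // Unknown (or null) names silently fall back to the default creator.
  static Creator get(String name) {
    Creator creator = REGISTERED.get(name);
    return creator != null ? creator : DEFAULT;
  }

  public static void main(String[] args) {
    // Nothing is registered under "simple" yet, so the fallback is used.
    System.out.println(get("simple").describe());
    register("simple", () -> "simple");
    // After registration the same name resolves to the custom creator.
    System.out.println(get("simple").describe());
  }
}
```

Under this model a name that was never registered always ends up at the default creator, which is the behavior the next step works around.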

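As we can see, Azkaban always resolves the creator through DefaultMailCreator.getCreator(name), using the name stored in the flow's execution options, so a custom creator cannot be selected from the server configuration. That is not very convenient, so this is the point where we modify the source: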

Define a field mailCreatorType and read its value from the configuration file:

this.mailCreatorType = props.getString("mail.creator", "default");
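
To make the fallback behavior concrete, here is a small sketch that reads the same key with java.util.Properties as a stand-in for Azkaban's Props class. The mail.creator key is the one introduced in this post, not a stock Azkaban setting, and MailCreatorConfigSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class MailCreatorConfigSketch {

  // Reads "mail.creator", falling back to "default" when the key is absent.
  static String mailCreatorType(String propertiesText) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props.getProperty("mail.creator", "default").trim();
  }

  public static void main(String[] args) {
    // Key present: the configured value is used.
    System.out.println(mailCreatorType("mail.creator=simple\n"));
    // Key absent: the default value is used.
    System.out.println(mailCreatorType("mail.sender=noreply@example.com\n"));
  }
}
```

With this in place, a line such as mail.creator=simple in the server's properties file would select the custom creator, and leaving the key out keeps the stock behavior.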

Then define an overloaded method getMailCreator():

private MailCreator getMailCreator() {
    if ("simple".equals(mailCreatorType)) {
        return new SimpleMailCreator();
    } else {
        return new DefaultMailCreator();
    }
}
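
The method above creates a new creator object on every alert. One possible variation, sketched here with hypothetical stand-in types (EmailerSketch and MailBuilder are not Azkaban classes), is to resolve the creator once from the configured type and reuse it afterwards. Any value other than "simple" still falls through to the default, as in the original method.

```java
final class EmailerSketch {

  // Hypothetical one-method stand-in for azkaban.executor.mail.MailCreator.
  interface MailBuilder {
    String name();
  }

  private final MailBuilder mailCreator;

  // Resolve the creator once from the configured type, not on every alert.
  EmailerSketch(String mailCreatorType) {
    if ("simple".equals(mailCreatorType)) {
      this.mailCreator = () -> "simple";
    } else {
      this.mailCreator = () -> "default";
    }
  }

  // Always returns the instance chosen in the constructor.
  MailBuilder getMailCreator() {
    return this.mailCreator;
  }

  public static void main(String[] args) {
    System.out.println(new EmailerSketch("simple").getMailCreator().name());
    System.out.println(new EmailerSketch("anything-else").getMailCreator().name());
  }
}
```

Whether this is worth doing depends on how expensive the creator is to construct. For a stateless creator either approach behaves the same.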

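Next, in azkaban.utils.Emailer, edit the alert methods shown earlier (alertOnFirstError, alertOnError and alertOnSuccess) and replace each call to getMailCreator(flow) with our own getMailCreator().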
Finally, recompile, package, deploy and run.
