A Look at rocketmq-mysql's BinlogPositionManager

This article takes a look at the BinlogPositionManager class in rocketmq-mysql.


BinlogPositionManager

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {
    private Logger logger = LoggerFactory.getLogger(BinlogPositionManager.class);

    private DataSource dataSource;
    private Config config;

    private String binlogFilename;
    private Long nextPosition;

    public BinlogPositionManager(Config config, DataSource dataSource) {
        this.config = config;
        this.dataSource = dataSource;
    }

    public void initBeginPosition() throws Exception {

        if (config.startType == null || config.startType.equals("DEFAULT")) {
            initPositionDefault();

        } else if (config.startType.equals("NEW_EVENT")) {
            initPositionFromBinlogTail();

        } else if (config.startType.equals("LAST_PROCESSED")) {
            initPositionFromMqTail();

        } else if (config.startType.equals("SPECIFIED")) {
            binlogFilename = config.binlogFilename;
            nextPosition = config.nextPosition;

        }

        if (binlogFilename == null || nextPosition == null) {
            throw new Exception("binlogFilename | nextPosition is null.");
        }
    }

    //......

    public String getBinlogFilename() {
        return binlogFilename;
    }

    public Long getPosition() {
        return nextPosition;
    }
}

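  • BinlogPositionManager exposes initBeginPosition, getBinlogFilename and getPosition. initBeginPosition picks a strategy based on config.startType: DEFAULT (or null) runs initPositionDefault, NEW_EVENT runs initPositionFromBinlogTail, LAST_PROCESSED runs initPositionFromMqTail, and SPECIFIED copies binlogFilename and nextPosition from the config. If either field is still null afterwards (for example because startType matched none of the four values), it throws an Exception.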
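The dispatch described above can be sketched on its own. This is a minimal illustration, not code from rocketmq-mysql: the StartTypeSketch class, its StartType enum and the describe helper are hypothetical names introduced here. One behavioural difference is worth noting: the original silently skips an unknown startType and only fails at the later null check, whereas this sketch rejects it immediately.

```java
final class StartTypeSketch {

    /** Hypothetical enum mirroring the four string values checked by initBeginPosition. */
    enum StartType { DEFAULT, NEW_EVENT, LAST_PROCESSED, SPECIFIED }

    /** Maps the raw config value to a start type; null falls back to DEFAULT, as in the original. */
    static StartType parse(String raw) {
        if (raw == null) {
            return StartType.DEFAULT;
        }
        // Unlike the original if/else chain, an unknown value fails here
        // with IllegalArgumentException instead of falling through.
        return StartType.valueOf(raw);
    }

    /** Names the action the original method takes for each start type. */
    static String describe(StartType type) {
        switch (type) {
            case DEFAULT:
                return "initPositionDefault";
            case NEW_EVENT:
                return "initPositionFromBinlogTail";
            case LAST_PROCESSED:
                return "initPositionFromMqTail";
            case SPECIFIED:
                return "copy binlogFilename and nextPosition from config";
            default:
                throw new AssertionError(type);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(parse(null)));
        System.out.println(describe(parse("LAST_PROCESSED")));
    }
}
```

Whether failing fast on an unknown value is preferable depends on how the configuration is produced; the original's behaviour of deferring to the null check is kept in the listing above.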

initPositionDefault

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {

    //......

    private void initPositionDefault() throws Exception {

        try {
            initPositionFromMqTail();
        } catch (Exception e) {
            logger.error("Init position from mq error.", e);
        }

        if (binlogFilename == null || nextPosition == null) {
            initPositionFromBinlogTail();
        }

    }

    //......

}

  • initPositionDefault first tries initPositionFromMqTail, logging and swallowing any exception it throws. If binlogFilename or nextPosition is still null afterwards, it falls back to initPositionFromBinlogTail.
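The try-then-fall-back shape of initPositionDefault can be shown in isolation. The sketch below is an illustration under assumptions: FallbackSketch and firstAvailable are hypothetical names, and an empty Optional stands in for "binlogFilename or nextPosition is still null". As in the original, a failure of the primary source is only logged, while a failure of the fallback propagates to the caller.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

final class FallbackSketch {

    /**
     * Tries the primary source first; if it throws or yields nothing,
     * uses the fallback source instead.
     */
    static <T> T firstAvailable(Callable<Optional<T>> primary,
                                Callable<Optional<T>> fallback) throws Exception {
        Optional<T> result = Optional.empty();
        try {
            result = primary.call();
        } catch (Exception e) {
            // Mirrors the original: log the failure and carry on.
            System.err.println("primary source failed: " + e);
        }
        if (result.isPresent()) {
            return result.get();
        }
        // Exceptions from the fallback are not caught, as in the original.
        return fallback.call()
                .orElseThrow(() -> new IllegalStateException("no position available"));
    }

    public static void main(String[] args) throws Exception {
        String position = FallbackSketch.<String>firstAvailable(
                () -> { throw new IllegalStateException("mq unreachable"); },
                () -> Optional.of("position from binlog tail"));
        System.out.println(position);
    }
}
```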

initPositionFromBinlogTail

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {

    //......

    private void initPositionFromBinlogTail() throws SQLException {
        String sql = "SHOW MASTER STATUS";

        // Note: conn is never assigned in the code as quoted, so the finally
        // block below does not close the connection obtained in the try block.
        Connection conn = null;
        ResultSet rs = null;

        try {
            Connection connection = dataSource.getConnection();
            rs = connection.createStatement().executeQuery(sql);

            while (rs.next()) {
                binlogFilename = rs.getString("File");
                nextPosition = rs.getLong("Position");
            }

        } finally {

            if (conn != null) {
                conn.close();
            }
            if (rs != null) {
                rs.close();
            }
        }

    }

    //......

}

  • initPositionFromBinlogTail runs SHOW MASTER STATUS and takes the current binlogFilename and nextPosition from the File and Position columns of the result.
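In the quoted method the conn variable is declared but never assigned, so the connection obtained from the DataSource is not closed in the finally block. One possible way to write the same query with try-with-resources is sketched below. It is an illustration rather than the project's code: BinlogTailSketch, Position and readTail are hypothetical names, and the result is returned in a small holder object instead of being stored in fields.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;

final class BinlogTailSketch {

    /** Hypothetical value holder; the original stores these in two fields instead. */
    static final class Position {
        final String binlogFilename;
        final long nextPosition;

        Position(String binlogFilename, long nextPosition) {
            this.binlogFilename = binlogFilename;
            this.nextPosition = nextPosition;
        }
    }

    /** Reads the current binlog tail, or returns null if the query yields no row. */
    static Position readTail(DataSource dataSource) throws SQLException {
        // Resources are closed in reverse order (rs, stmt, conn),
        // whether the body returns normally or throws.
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                return new Position(rs.getString("File"), rs.getLong("Position"));
            }
            return null; // no row returned
        }
    }
}
```

A caller would pass the same DataSource that BinlogPositionManager receives in its constructor and treat a null result the way the original treats unset fields.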

initPositionFromMqTail

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java

public class BinlogPositionManager {

    //......

    private void initPositionFromMqTail() throws Exception {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
        consumer.setNamesrvAddr(config.mqNamesrvAddr);
        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
        consumer.start();

        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);
        MessageQueue queue = queues.iterator().next();

        if (queue != null) {
            Long offset = consumer.maxOffset(queue);
            if (offset > 0)
                offset--;

            PullResult pullResult = consumer.pull(queue, "*", offset, 100);

            if (pullResult.getPullStatus() == PullStatus.FOUND) {
                MessageExt msg = pullResult.getMsgFoundList().get(0);
                String json = new String(msg.getBody(), "UTF-8");

                JSONObject js = JSON.parseObject(json);
                binlogFilename = (String) js.get("binlogFilename");
                nextPosition = js.getLong("nextPosition");
            }
        }

    }

    //......

}

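  • initPositionFromMqTail creates a DefaultMQPullConsumer, fetches the message queues of config.mqTopic and takes the first one. It reads consumer.maxOffset(queue), steps back by one when the offset is greater than 0, and pulls from there. If the pull status is FOUND, it parses the first returned message (which, given that offset, should be the most recent message in the queue) as JSON and sets binlogFilename and nextPosition from it.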
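The offset arithmetic is the part of this method that is easiest to misread, so here is a small in-memory sketch of it. It uses no RocketMQ classes: QueueTailSketch, tailOffset and lastMessage are hypothetical names, a List stands in for the message queue, and the list size plays the role of consumer.maxOffset(queue), on the assumption (implied by the decrement in the original) that the max offset points one past the newest message.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class QueueTailSketch {

    /** Offset to pull from so that the newest message is the first one returned. */
    static long tailOffset(long maxOffset) {
        return maxOffset > 0 ? maxOffset - 1 : 0;
    }

    /** Returns the newest message of an in-memory "queue", or empty if it holds none. */
    static Optional<String> lastMessage(List<String> queue) {
        long maxOffset = queue.size(); // stand-in for consumer.maxOffset(queue)
        if (maxOffset == 0) {
            return Optional.empty();   // corresponds to a pull that finds nothing
        }
        return Optional.of(queue.get((int) tailOffset(maxOffset)));
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("first", "second", "newest");
        System.out.println(lastMessage(queue).orElse("<none>"));
    }
}
```

In the original, the body of that newest message is then parsed as JSON to recover binlogFilename and nextPosition, which is how the position written for the last processed event is picked up again on restart.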

Summary

BinlogPositionManager exposes initBeginPosition, getBinlogFilename and getPosition. initBeginPosition picks a strategy based on config.startType: DEFAULT runs initPositionDefault, NEW_EVENT runs initPositionFromBinlogTail, LAST_PROCESSED runs initPositionFromMqTail, and SPECIFIED uses the binlogFilename and nextPosition given in the config.

doc

  • BinlogPositionManager

 
