tungsten抽取和应用mysql binlog

  • 首先举例说明 api的基本使用方式
首先进行配置 , 可以看到源数据库和目的数据库

TungstenProperties tp=new TungstenProperties();
tp.setString(ReplicatorConf.SERVICE_NAME,"test");
tp.setString(ReplicatorConf.ROLE,ReplicatorConf.ROLE_MASTER);
tp.setString(ReplicatorConf.PIPELINES,"master");
tp.setString(ReplicatorConf.PIPELINE_ROOT + ".master","extract");
tp.setString(ReplicatorConf.STAGE_ROOT + ".extract",SingleThreadStageTask.class.toString());
tp.setString(ReplicatorConf.STAGE_ROOT + ".extract.extractor","mysql");
tp.setString(ReplicatorConf.STAGE_ROOT + ".extract.applier","mysql"); String mysqlExtractor=ReplicatorConf.EXTRACTOR_ROOT + ".mysql";
tp.setString(ReplicatorConf.EXTRACTOR_ROOT,"mysql");
tp.setString(mysqlExtractor,MySQLExtractor.class.getName());
tp.setString(mysqlExtractor + ".binlog_dir","/var/lib/mysql");
tp.setString(mysqlExtractor + ".binlog_file_pattern","mysql-bin");
tp.setString(mysqlExtractor + ".host","localhost");
tp.setString(mysqlExtractor + ".user","tungsten");
tp.setString(mysqlExtractor + ".password","secret"); String mysqlApplier=ReplicatorConf.APPLIER_ROOT + ".mysql";
tp.setString(ReplicatorConf.APPLIER_ROOT,"mysql");
tp.setString(mysqlApplier,MySQLApplier.class.getName());
tp.setString(mysqlApplier + ".host","localhost");
tp.setString(mysqlApplier + ".port","3306");
tp.setString(mysqlApplier + ".user","tungsten");
tp.setString(mysqlApplier + ".password","secret");
进行抽取和应用的代码

    ReplicatorRuntime runtime = new ReplicatorRuntime(conf,
new MockOpenReplicatorContext(),
ReplicatorMonitor.getInstance());
runtime.configure(); MySQLExtractor extractor = getMySQLExtractor(runtime);
extractor.setStrictVersionChecking(false);
extractor.prepare(runtime); Applier applier = runtime.getApplier();
applier.prepare(runtime); for (int i = 0; i < 7; i++) //通常这里会循环下去,不断接受event,并且apply
{
DBMSEvent event = extractor.extract();
if (event != null)
{
applier.apply(event, i, true);
}
}
  • 接下来进行一些核心类和源码的分析
进去 extractor.extract() 方法,这个是核心的处理类,可以看到有很多event类型,包括:

AppendBlockLogEvent
TableMapLogEvent
UserVarLogEvent
XidLogEvent
StopLogEvent
RowsLogEvent
RotateLogEvent
BeginLoadQueryLogEvent
ExecuteLoadQueryLogEvent
QueryLogEvent

另外还可处理  begin commit rollback set (autocommit , sql_mode)等命令 以及 ddl语句。

可以获取 serverId , sessionId ,eventID 等信息。


private DBMSEvent extractEvent(BinlogReader position)
throws ReplicatorException, InterruptedException
{ while (true)
{
DBMSEvent dbmsEvent = null;
LogEvent logEvent = processFile(position);
if (logEvent == null)
{
logger.debug("Unknown binlog field, skipping");
continue;
} if (serverId == -1)
serverId = logEvent.serverId; if (startTime == null)
startTime = logEvent.getWhen(); if (logEvent instanceof RowsLogEvent)
{
fragSize += ((RowsLogEvent) logEvent).getEventSize();
} boolean doCommit = false;
boolean doRollback = false; boolean unsafeForBlockCommit = false; if (logEvent.getClass() == QueryLogEvent.class)
{
QueryLogEvent event = (QueryLogEvent) logEvent;
String queryString = event.getQuery();
StatementData statement = new StatementData(queryString); // Extract the charset name if it can be found.
String charsetName = event.getCharsetName();
if (charsetName != null)
statement.addOption(ReplOptionParams.JAVA_CHARSET_NAME,
event.getCharsetName());
if (logger.isDebugEnabled())
logger.debug("Query extracted: " + queryString
+ " charset=" + charsetName); // Parse for SQL metadata and add to the statement.
String query; if (!useBytesForStrings)
query = queryString;
else
{
// Translate only a few bytes, in order to eventually
// detect some keywords.
int len = Math.min(event.getQueryAsBytes().length, 200);
if (charsetName == null)
query = new String(event.getQueryAsBytes(), 0, len);
else
query = new String(event.getQueryAsBytes(), 0, len,
charsetName);
}
SqlOperation sqlOperation = sqlMatcher.match(query);
statement.setParsingMetadata(sqlOperation); // We must commit on DDLs and the like except for BEGIN or
// START TRANSACTION, since they start new transaction at
// the same time
doCommit = !inTransaction || sqlOperation.isAutoCommit();
int operation = sqlOperation.getOperation();
if (operation == SqlOperation.BEGIN)
{
inTransaction = true;
doCommit = false;
// This a a BEGIN statement : buffer session variables
// for following row events if any and skip it /* Adding statement options */
savedOptions.add(new ReplOption("autocommit", event
.getAutocommitFlag()));
savedOptions.add(new ReplOption("sql_auto_is_null",
event.getAutoIsNullFlag()));
savedOptions.add(new ReplOption("foreign_key_checks",
event.getForeignKeyChecksFlag()));
savedOptions.add(new ReplOption("unique_checks", event
.getUniqueChecksFlag()));
savedOptions.add(new ReplOption("sql_mode", event
.getSqlMode()));
。。。。。。 continue;
} if (operation == SqlOperation.COMMIT)
{
// This is a COMMIT statement : dropping it for now
// Temporary workaround for TREP-243
doCommit = true;
inTransaction = !autocommitMode;
}
else if (operation == SqlOperation.ROLLBACK)
{
doRollback = true;
inTransaction = !autocommitMode; }
else
{
// some optimization: it makes sense to check for
// 'CREATE DATABASE' only if we know that it is not
// regular DML - this is a fix for TREP-52 - attempt
// to use DB which hasn't been created yet.
boolean isCreateOrDropDB = sqlOperation.getObjectType() == SqlOperation.SCHEMA;
boolean prependUseDb = !(sqlOperation.isAutoCommit() && isCreateOrDropDB); if (sessionId == -1)
{
// first query in transaction
sessionId = event.getSessionId();
} if (prependUseDb)
{
statement.setDefaultSchema(event.getDefaultDb());
} if (isCreateOrDropDB)
statement.addOption(
StatementData.CREATE_OR_DROP_DB, ""); if (operation == SqlOperation.CREATE
|| operation == SqlOperation.DROP
|| operation == SqlOperation.ALTER
|| operation == SqlOperation.UNRECOGNIZED)
unsafeForBlockCommit = true; statement.setTimestamp(event.getWhen().getTime());
if (!useBytesForStrings)
{
statement.setQuery(queryString);
fragSize += queryString.length();
}
else
{
byte[] bytes = event.getQueryAsBytes();
statement.setQuery(bytes);
fragSize += bytes.length;
} /* Adding statement options */
statement.addOption("autocommit",
event.getAutocommitFlag());
statement.addOption("sql_auto_is_null",
event.getAutoIsNullFlag());
statement.addOption("foreign_key_checks",
event.getForeignKeyChecksFlag());
statement.addOption("unique_checks",
event.getUniqueChecksFlag());
。。。。。。
}
}
。。。。。。
else if (logEvent.getClass() == XidLogEvent.class)
{
logger.debug("Commit extracted: "
+ ((XidLogEvent) logEvent).getXid());
// If there's nothing to commit, just ignore.
// Should happen for InnoDB tables in AUTOCOMMIT mode.
if (!dataArray.isEmpty())
{
doCommit = true;
}
if (rowChangeData != null)
{
doCommit = true;
}
// It seems like there's always explicit COMMIT event if
// transaction is implicitely committed,
// but does transaction start implicitely?
inTransaction = !autocommitMode; if (!doCommit)
{
logger.debug("Clearing Table Map events");
tableEvents.clear();
tableEvents = new HashMap<Long, TableMapLogEvent>();
return new DBMSEmptyEvent(getDBMSEventId(position,
sessionId));
}
}
else if (logEvent.getClass() == StopLogEvent.class)
{
logger.debug("Stop event extracted: "); // TUC-166. MySQL writes a stop event and closes the log
// when the MySQL daemon shuts down cleanly. It does not
// always mean the server is stopped now because we could
// be reading an old log file. We therefore ignore them
// and reread which makes us treat the file like a binlog
// with a missing ROTATE_LOG event.
String stopEventId = getDBMSEventId(position, sessionId);
logger.info("Skipping over server stop event in log: "
+ stopEventId);
}
else if (logEvent.getClass() == RotateLogEvent.class)
{
String newBinlogFilename = ((RotateLogEvent) logEvent)
.getNewBinlogFilename();
logger.debug("Rotate log event: new binlog="
+ newBinlogFilename); // Slave relay logs have master rotate logs that we need
// to ignore. We detect these because they don't match the
// log file pattern.
if (MODE_SLAVE_RELAY.equals(binlogMode)
&& !newBinlogFilename
.startsWith(this.binlogFilePattern))
{
logger.info("Ignored superfluous master rotate log event: file="
+ newBinlogFilename);
}
else
{
// It's real so we need to rotate the log.
position.close();
position.setFileName(((RotateLogEvent) logEvent)
.getNewBinlogFilename());
position.open();
// Kick off an asynchronous scan for old relay logs.
if (useRelayLogs)
purgeRelayLogs(false);
}
}
else if (logEvent.getClass() == TableMapLogEvent.class)
{
logger.debug("got table map event");
// remember last table map event
TableMapLogEvent tableEvent = (TableMapLogEvent) logEvent;
tableEvents.put(tableEvent.getTableId(), tableEvent);
}
else if (logEvent instanceof RowsLogEvent)
{
if (logger.isDebugEnabled())
logger.debug("got rows log event - event size = "
+ ((RowsLogEvent) logEvent).getEventSize());
rowChangeData = new RowChangeData();
RowsLogEvent rowsEvent = (RowsLogEvent) logEvent;
TableMapLogEvent tableEvent = tableEvents.get(rowsEvent
.getTableId());
rowsEvent.processExtractedEvent(rowChangeData, tableEvent);
dataArray.add(rowChangeData);
foundRowsLogEvent = true;
}
else if (logEvent instanceof BeginLoadQueryLogEvent)
{
BeginLoadQueryLogEvent event = (BeginLoadQueryLogEvent) logEvent;
if (prefetchSchemaNameLDI)
{
if (loadDataSchemas == null)
loadDataSchemas = new HashMap<Integer, String>();
loadDataSchemas.put(Integer.valueOf(event.getFileID()),
event.getSchemaName());
}
dataArray.add(new LoadDataFileFragment(event.getFileID(),
event.getData(), event.getSchemaName()));
doFileFragment = true;
}
else if (logEvent instanceof AppendBlockLogEvent)
{
AppendBlockLogEvent event = (AppendBlockLogEvent) logEvent;
String schema = null;
if (prefetchSchemaNameLDI && loadDataSchemas != null)
schema = loadDataSchemas.get(Integer.valueOf(event
.getFileID()));
dataArray.add(new LoadDataFileFragment(event.getFileID(),
event.getData(), schema));
doFileFragment = true;
}
。。。。。。
catch (Exception e)
{
if (runtime.getExtractorFailurePolicy() == FailurePolicy.STOP)
throw new ExtractorException(
"Unexpected failure while extracting event " + position,
e);
else
logger.error("Unexpected failure while extracting event "
+ position, e); }
return null;
}
上面的方法中有个生成event的方法
,如下所示
// Reads the next log from the file.
private LogEvent processFile(BinlogReader position)
throws ReplicatorException, InterruptedException
{
try
{
// Open up the binlog if we have not done so already.
if (!position.isOpen())
{
position.open();
}
if (logger.isDebugEnabled())
logger.debug("extracting from pos, file: "
+ position.getFileName() + " pos: "
+ position.getPosition());
long indexCheckStart = System.currentTimeMillis(); // Read from the binlog.
while (position.available() == 0)
{
// TREP-301 - If we are waiting at the end of the file we
// must check that we are not reading a log file that is
// missing a log-rotate record.
if (System.currentTimeMillis() - indexCheckStart > INDEX_CHECK_INTERVAL)
{
BinlogIndex bi = new BinlogIndex(binlogDir,
binlogFilePattern, true);
File nextBinlog = bi.nextBinlog(position.getFileName());
if (nextBinlog != null)
{
// We are stuck at the tail of one binlog with more
// to follow. Generate and return fake log-rotate
// event.
logger.warn("Current log file appears to be missing log-rotate event: "
+ position.getFileName());
logger.info("Auto-generating log-rotate event for next binlog file: "
+ nextBinlog.getName());
return new RotateLogEvent(nextBinlog.getName());
} // Ensure relay logs are running.
assertRelayLogsEnabled(); // Update index check time.
indexCheckStart = System.currentTimeMillis();
} // Sleep for a while.
Thread.sleep(10);
} // We can assume a V4 format description as we don't support MySQL
// versions prior to 5.0.
FormatDescriptionLogEvent description_event = new FormatDescriptionLogEvent(
4); // Read from the log.
LogEvent event = LogEvent.readLogEvent(runtime, position,
description_event, parseStatements, useBytesForStrings,
prefetchSchemaNameLDI);
position.setEventID(position.getEventID() + 1); return event;
}
catch (IOException e)
{
throw new MySQLExtractException("Binlog file read error: file="
+ position.getFileName() + " offset="
+ position.getPosition(), e);
}
}

然后进入到 LogEvent.readLogEvent方法,深入代码内部,可以看到如下代码

    private static LogEvent readLogEvent(boolean parseStatements,
byte[] buffer, int eventLength,
FormatDescriptionLogEvent descriptionEvent,
boolean useBytesForString) throws ReplicatorException
{
LogEvent event = null; switch (buffer[MysqlBinlog.EVENT_TYPE_OFFSET])
{
case MysqlBinlog.QUERY_EVENT :
event = new QueryLogEvent(buffer, eventLength,
descriptionEvent, parseStatements, useBytesForString);
break;
case MysqlBinlog.LOAD_EVENT :
logger.warn("Skipping unsupported LOAD_EVENT");
// ev = new Load_log_event(buf, event_len, description_event);
break;
case MysqlBinlog.NEW_LOAD_EVENT :
logger.warn("Skipping unsupported NEW_LOAD_EVENT");
// ev = new Load_log_event(buf, event_len, description_event);
break;
case MysqlBinlog.ROTATE_EVENT :
event = new RotateLogEvent(buffer, eventLength,
descriptionEvent);
break;
case MysqlBinlog.SLAVE_EVENT : /* can never happen (unused event) */
logger.warn("Skipping unsupported SLAVE_EVENT");
// ev = new Slave_log_event(buf, event_len);
break;
case MysqlBinlog.CREATE_FILE_EVENT :
logger.warn("Skipping unsupported CREATE_FILE_EVENT");
// ev = new Create_file_log_event(buf, event_len,
// description_event);
break;
case MysqlBinlog.APPEND_BLOCK_EVENT :
if (logger.isDebugEnabled())
logger.debug("reading APPEND_BLOCK_EVENT");
event = new AppendBlockLogEvent(buffer, eventLength,
descriptionEvent);
break;
case MysqlBinlog.DELETE_FILE_EVENT :
if (logger.isDebugEnabled())
logger.debug("reading DELETE_FILE_EVENT");
event = new DeleteFileLogEvent(buffer, eventLength,
descriptionEvent);
break;
case MysqlBinlog.EXEC_LOAD_EVENT :
logger.warn("Skipping unsupported EXEC_LOAD_EVENT");
break;
case MysqlBinlog.START_EVENT_V3 :
/* this is sent only by MySQL <=4.x */
logger.warn("Skipping unsupported START_EVENT_V3");
break;
case MysqlBinlog.STOP_EVENT :
event = new StopLogEvent(buffer, eventLength, descriptionEvent);
break;
case MysqlBinlog.INTVAR_EVENT :
if (logger.isDebugEnabled())
logger.debug("extracting INTVAR_EVENT");
event = new IntvarLogEvent(buffer, eventLength,
descriptionEvent);
break;
case MysqlBinlog.XID_EVENT :
event = new XidLogEvent(buffer, eventLength, descriptionEvent);
break;
case MysqlBinlog.RAND_EVENT :
event = new RandLogEvent(buffer, eventLength, descriptionEvent);
break;
case MysqlBinlog.USER_VAR_EVENT :
event = new UserVarLogEvent(buffer, eventLength,
descriptionEvent);
break;
case MysqlBinlog.FORMAT_DESCRIPTION_EVENT :
event = new FormatDescriptionLogEvent(buffer, eventLength,
descriptionEvent);
break;
。。。。。。
default :
logger.warn("Skipping unrecognized binlog event type "
+ buffer[MysqlBinlog.EVENT_TYPE_OFFSET]);
}
return event;
}


上一篇:Docker桥接宿主机网络与配置固定IP地址


下一篇:Win8驱动测试模式