2020.07.07

MySQL Binlog Connector for Java


Hello, this is N.M. from the Next Generation System Laboratory.

What is MySQL’s binlog?

MySQL’s binlog (binary log) contains events describing anything that changes the state of the database, including inserts, updates and deletes. MySQL generates a binlog if it is started with the --log-bin option (in MySQL 8.0 and later, binary logging is enabled by default). The binlog is commonly used for replicating the state of a MySQL database.

Events can be further divided into header and data parts.
  • The header contains information on the type of event, the server that generated it and so on.
  • The data part contains information specific to the event such as specific data for modification.
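To make the header part concrete, here is a minimal sketch that decodes the fixed 19-byte header of a v4 event (timestamp, type code, server ID, event length, next position, flags, all little-endian) using only the JDK. The class and field names are made up for illustration and are not part of any library.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustrative holder for the fixed 19-byte header of a v4 binlog event. */
final class BinlogEventHeaderSketch {
    static final int HEADER_LENGTH = 19;

    final long timestampSeconds; // seconds since the Unix epoch
    final int typeCode;          // numeric event type, e.g. 4 = Rotate
    final long serverId;         // server_id of the server that wrote the event
    final long eventLength;      // header + data, in bytes
    final long nextPosition;     // offset of the next event in the log
    final int flags;

    private BinlogEventHeaderSketch(long timestampSeconds, int typeCode, long serverId,
                                    long eventLength, long nextPosition, int flags) {
        this.timestampSeconds = timestampSeconds;
        this.typeCode = typeCode;
        this.serverId = serverId;
        this.eventLength = eventLength;
        this.nextPosition = nextPosition;
        this.flags = flags;
    }

    /** Decodes the first 19 bytes of an event; all fields are unsigned little-endian. */
    static BinlogEventHeaderSketch parse(byte[] bytes) {
        if (bytes.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need at least " + HEADER_LENGTH + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int type = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long length = buf.getInt() & 0xFFFFFFFFL;
        long nextPos = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new BinlogEventHeaderSketch(timestamp, type, serverId, length, nextPos, flags);
    }
}
```

The masks (`& 0xFFFFFFFFL` and so on) are there because Java has no unsigned integer types, while the header fields are unsigned.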
MySQL creates multiple binlog files, rotating to a new log file as needed. The .index file lists the current binlog files:
...
HOSTNAME-bin.0000101
HOSTNAME-bin.0000102
HOSTNAME-bin.0000103
...
HOSTNAME-bin.index
At the end of each log is a special type of event, the Rotate Event, which contains the name of the next log file.

There are two basic binlog formats: row format and statement format. Row format records the rows that were changed, while statement format records the actual SQL statements. (A third, mixed format switches between the two.)

There are several versions of the binary log file format:
  • v1: Used in MySQL 3.23
  • v2: Early MySQL 4.0.x versions (OBSOLETE, NO LONGER SUPPORTED)
  • v3: Used in MySQL 4.0.2 through 4.1
  • v4: Used in MySQL 5.0 and up
Programs that process the binary log must be able to account for each of the supported binary log formats. In v4 logs, the format in use is described by the Format Description Event at the start of each log file.

As the name suggests, the binlog is binary. MySQL provides a tool called mysqlbinlog which converts the binlog to text format:
mysqlbinlog --user=<MYSQL_USER> --base64-output=decode-rows -v -R --start-position=<BINLOG_START_POS> --stop-position=<BINLOG_STOP_POS> <BINLOG_FILENAME>


This tool can process a file or stream from a remote server. If you stream binlog from a remote server for a long enough time, you will receive Rotate Events indicating output has been rotated to a new file. The new file name will be in the Rotate Event data.
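Because binlog files are binary, a program that reads one directly (rather than through mysqlbinlog) can first check that the file starts with the 4-byte binlog magic number, 0xFE followed by the ASCII characters "bin". The following is a small sketch of that check using only the JDK; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

final class BinlogMagicSketch {
    // A binlog file begins with the 4 bytes 0xFE 'b' 'i' 'n'.
    static final byte[] MAGIC = {(byte) 0xFE, 'b', 'i', 'n'};

    /** Returns true if the given bytes start with the binlog magic number. */
    static boolean hasBinlogMagic(byte[] head) {
        return head.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(head, MAGIC.length), MAGIC);
    }

    /** Reads the first 4 bytes of a file and checks them (requires Java 9+ for readNBytes). */
    static boolean looksLikeBinlogFile(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            byte[] head = new byte[MAGIC.length];
            int read = in.readNBytes(head, 0, head.length);
            return read == MAGIC.length && hasBinlogMagic(head);
        }
    }
}
```

The first event in the file starts immediately after these 4 bytes, which is why position 4 is the usual starting position for a binlog file.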

How about connecting from Java?

You have two options: either run the command above from within your Java program (and parse the output for the various types of events yourself), or use a Java implementation of the mysqlbinlog tool, such as Mysql Binlog Connector for Java (MBCJ).
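For comparison, the first option might look roughly like the sketch below, which assembles the same mysqlbinlog arguments shown earlier and streams the tool's text output line by line. The class and method names are hypothetical, it assumes mysqlbinlog is on the PATH, and parsing the text into events is left out entirely, which is the part that makes this approach laborious.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MysqlbinlogCommandSketch {

    /** Builds the argument list for the mysqlbinlog invocation shown earlier. */
    static List<String> buildCommand(String user, long startPos, long stopPos, String binlogFile) {
        List<String> cmd = new ArrayList<>();
        cmd.add("mysqlbinlog");
        cmd.add("--user=" + user);
        cmd.add("--base64-output=decode-rows");
        cmd.add("-v");
        cmd.add("-R"); // read from a remote server instead of a local file
        cmd.add("--start-position=" + startPos);
        cmd.add("--stop-position=" + stopPos);
        cmd.add(binlogFile);
        return cmd;
    }

    /** Runs the command, passes each output line to the handler, and returns the exit code. */
    static int run(List<String> command, Consumer<String> lineHandler)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectErrorStream(true) // merge stderr into stdout
                .start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lineHandler.accept(line);
            }
        }
        return process.waitFor();
    }
}
```

Passing the arguments as a list avoids shell quoting problems, since ProcessBuilder does not go through a shell.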

MBCJ handles V4 of the binlog format, which means it can handle binlogs from MySQL 5.0 and up, and it works with both row format and statement format.

Key features include:
  • automatic binlog filename/position | GTID resolution
  • resumable disconnects
  • pluggable failover strategies
  • binlog_checksum=CRC32 support (for MySQL 5.6.2+ users)
  • secure communication over TLS
  • JMX-friendly
  • real-time stats
  • availability in Maven Central
  • no third-party dependencies
  • test suite over different versions of MySQL releases
Just like the mysqlbinlog tool, MBCJ can process binlog files directly or connect to servers remotely.

To process files it uses the BinaryLogFileReader class. An example of using this class:
File binlogFile = ...
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
try {
    for (Event event; (event = reader.readEvent()) != null; ) {
        ...
    }
} finally {
    reader.close();
}
 

You can process the binlog as a stream with the BinaryLogClient class. An example of using this class:
BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "password");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new EventListener() {

    @Override
    public void onEvent(Event event) {
        ...
    }
});
client.connect();
 

In this post we will focus on the BinaryLogClient. You can specify the file name and the position in the file to start processing from, as shown below:
client.setBinlogFilename(filename);
client.setBinlogPosition(position);
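To resume from where a previous run stopped, the filename and position need to be stored somewhere between runs. The sketch below keeps them in a small properties file using only the JDK. The class name, property keys and file layout are assumptions made for illustration; they are not something MBCJ provides.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical checkpoint holding the binlog coordinates to resume from. */
final class BinlogCheckpointSketch {
    final String filename;
    final long position;

    BinlogCheckpointSketch(String filename, long position) {
        this.filename = filename;
        this.position = position;
    }

    Properties toProperties() {
        Properties p = new Properties();
        p.setProperty("binlog.filename", filename);
        p.setProperty("binlog.position", Long.toString(position));
        return p;
    }

    static BinlogCheckpointSketch fromProperties(Properties p) {
        return new BinlogCheckpointSketch(
                p.getProperty("binlog.filename"),
                Long.parseLong(p.getProperty("binlog.position")));
    }

    /** Writes the checkpoint to the given file, replacing any previous content. */
    void save(Path file) throws IOException {
        try (Writer w = Files.newBufferedWriter(file)) {
            toProperties().store(w, "binlog checkpoint");
        }
    }

    /** Reads a checkpoint previously written by save(). */
    static BinlogCheckpointSketch load(Path file) throws IOException {
        Properties p = new Properties();
        try (Reader r = Files.newBufferedReader(file)) {
            p.load(r);
        }
        return fromProperties(p);
    }
}
```

On startup, the loaded `filename` and `position` would be passed to `client.setBinlogFilename(...)` and `client.setBinlogPosition(...)` before connecting. How often to save the checkpoint (every event, every transaction, on a timer) is a trade-off between I/O cost and how many events may be reprocessed after a crash.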
 

Each binlog event is represented by an Event object, which consists of an EventHeader and an EventData.

The EventHeader class contains a reference to the EventType enum class. You can use this to determine the type of event and handle it appropriately as shown below:
private final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<>();

...

public BinaryLogClient.EventListener getEventListener() {
    return event -> {
  
        EventHeader eventHeader = event.getHeader();
        switch (eventHeader.getEventType()) {
            case TABLE_MAP: {
                TableMapEventData tableMapEvent = event.getData();
                tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
                break;
            }
            case WRITE_ROWS:
            case EXT_WRITE_ROWS: { // MySQL 5.6 and later typically emit the EXT_ variant
                WriteRowsEventData eventData = event.getData();
                String tableName = tableMapEventByTableId.get(eventData.getTableId()).getTable();
                String schema = tableMapEventByTableId.get(eventData.getTableId()).getDatabase();
                log.info("WRITE_ROWS Event for schema: {}, table: {}", schema, tableName);
                processedCount.incrementAndGet();
                break;
            }
            case UPDATE_ROWS:
            case EXT_UPDATE_ROWS: { // MySQL 5.6 and later typically emit the EXT_ variant
                UpdateRowsEventData eventData = event.getData();
                String tableName = tableMapEventByTableId.get(eventData.getTableId()).getTable();
                String schema = tableMapEventByTableId.get(eventData.getTableId()).getDatabase();
                log.info("UPDATE_ROWS Event for schema: {}, table: {}", schema, tableName);
                processedCount.incrementAndGet();
                break;
            }
            case DELETE_ROWS:
            case EXT_DELETE_ROWS: { // MySQL 5.6 and later typically emit the EXT_ variant
                DeleteRowsEventData eventData = event.getData();
                String tableName = tableMapEventByTableId.get(eventData.getTableId()).getTable();
                String schema = tableMapEventByTableId.get(eventData.getTableId()).getDatabase();
                log.info("DELETE_ROWS Event for schema: {}, table: {}", schema, tableName);
                processedCount.incrementAndGet();
                break;
            }
        }
    };
}
 

In row format, each row event is preceded by a TABLE_MAP event describing the table it affects. The code above uses this to populate the tableMapEventByTableId map, which is then used to look up the schema and table name when the following row event (WRITE_ROWS, UPDATE_ROWS or DELETE_ROWS) arrives.

MBCJ contains deserializers for all the main Event types, but you can override the default deserializers with your own.

How do I make sense of the data?

Because binlog data does not contain column names for each event, this information needs to be obtained from somewhere else. Given the table name of an event, which can be obtained with the code shown above, there are two possible choices: query the MySQL database for the column names, or use the JDBC Connector metadata. Because both techniques involve a round trip to the database, you may want to cache the results if performance is important.

Querying the mysql database for column names for a table:
select COLUMN_NAME, ORDINAL_POSITION, COLUMN_DEFAULT, IS_NULLABLE,
DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, CHARACTER_OCTET_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE,
CHARACTER_SET_NAME, COLLATION_NAME 
from INFORMATION_SCHEMA.COLUMNS
where TABLE_SCHEMA = <TABLE_SCHEMA> and TABLE_NAME = <TABLE_NAME>
order by ORDINAL_POSITION;
 

You can use the JDBC Connector’s metadata to achieve the same thing:
Connection connection = ...
DatabaseMetaData metaData = connection.getMetaData();
// With MySQL Connector/J's default settings a database is exposed as a JDBC catalog,
// so the database name is passed as the catalog and the schema pattern is left null.
String catalog = connection.getCatalog();
// try-with-resources closes each ResultSet even if an exception is thrown
try (ResultSet tableResultSet = metaData.getTables(catalog, null, null, new String[]{"TABLE"})) {
    while (tableResultSet.next()) {
        String tableName = tableResultSet.getString("TABLE_NAME");
        try (ResultSet columnResultSet = metaData.getColumns(catalog, null, tableName, null)) {
            while (columnResultSet.next()) {
                String columnName = columnResultSet.getString("COLUMN_NAME");
                ...
            }
        }
    }
}
Rather than querying MySQL on every event, cache this data if performance is important.
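One way to cache the column names is a small map keyed by schema and table that calls the database only on a miss. The sketch below uses only the JDK; the class name is hypothetical, and the `loader` function stands in for whichever lookup you choose (the INFORMATION_SCHEMA query or the JDBC metadata call).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/** Hypothetical cache of column names per table, loaded lazily on first use. */
final class ColumnNameCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // (schema, table) -> column names ordered by ORDINAL_POSITION
    private final BiFunction<String, String, List<String>> loader;

    ColumnNameCacheSketch(BiFunction<String, String, List<String>> loader) {
        this.loader = loader;
    }

    /** Returns the cached column names, calling the loader only on a cache miss. */
    List<String> columnsOf(String schema, String table) {
        return cache.computeIfAbsent(key(schema, table), k -> loader.apply(schema, table));
    }

    /** Drops a cached entry, for example after the table's definition has changed. */
    void invalidate(String schema, String table) {
        cache.remove(key(schema, table));
    }

    private static String key(String schema, String table) {
        return schema + "." + table;
    }
}
```

A cache like this goes stale when a table is altered, so some invalidation strategy is needed; the sketch only offers a manual `invalidate` and leaves the trigger (for instance, reacting to DDL seen in the binlog) open.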

When ordered by ORDINAL_POSITION, the columns appear in the same order as the values in the row arrays held by the EventData classes for WRITE_ROWS and UPDATE_ROWS events.

Column values may be obtained by specifying the column index (starting at zero), as shown below for WRITE_ROWS:
WriteRowsEventData writeRowsEventData = ...

// Get the first column, which is an ID for our schema: 
Long id = (Long) writeRowsEventData.getRows().get(0)[0];

int INDEX_OF_VARCHAR_COL = ...
String someString = String.valueOf(writeRowsEventData.getRows().get(0)[INDEX_OF_VARCHAR_COL]);
or for UPDATE_ROWS Events:
// beforeAfter is a Map.Entry whose key is the row before the update
// and whose value is the row after the update
Map.Entry<Serializable[], Serializable[]> beforeAfter = updateRowsEventData.getRows().get(0);

// Get the first column, which is an ID for our schema, from the after values:
Long id = (Long) beforeAfter.getValue()[0];

int INDEX_OF_VARCHAR_COL = ...
String someStringBeforeUpdate = String.valueOf(beforeAfter.getKey()[INDEX_OF_VARCHAR_COL]);
String someStringAfterUpdate = String.valueOf(beforeAfter.getValue()[INDEX_OF_VARCHAR_COL]);
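Putting the two pieces together, the ordered column names can be paired with a row array to get values by name instead of by index. The following is a minimal JDK-only sketch with hypothetical class and method names. It assumes the row contains a value for every column, and it assumes any byte[] value is UTF-8 text (which is how string columns arrive if CHAR_AND_BINARY_AS_BYTE_ARRAY is enabled); real code would check the column's DATA_TYPE and character set first, since binary columns are byte[] too.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class RowMappingSketch {

    /** Pairs column names (in ORDINAL_POSITION order) with the values of one row. */
    static Map<String, Object> toNamedRow(List<String> columnNames, Serializable[] row) {
        if (columnNames.size() != row.length) {
            throw new IllegalArgumentException(
                    "expected " + columnNames.size() + " values but row has " + row.length);
        }
        Map<String, Object> named = new LinkedHashMap<>(); // keeps column order
        for (int i = 0; i < row.length; i++) {
            named.put(columnNames.get(i), normalize(row[i]));
        }
        return named;
    }

    /** Assumption for this sketch: every byte[] is UTF-8 text; other values pass through. */
    static Object normalize(Serializable value) {
        return value instanceof byte[]
                ? new String((byte[]) value, StandardCharsets.UTF_8)
                : value;
    }

    /** Returns the after-values of the columns that differ between the two rows. */
    static Map<String, Object> changedColumns(List<String> columnNames,
                                              Serializable[] before, Serializable[] after) {
        Map<String, Object> beforeRow = toNamedRow(columnNames, before);
        Map<String, Object> afterRow = toNamedRow(columnNames, after);
        Map<String, Object> changed = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : afterRow.entrySet()) {
            if (!Objects.equals(beforeRow.get(e.getKey()), e.getValue())) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }
}
```

For an UPDATE_ROWS entry, `changedColumns(columnNames, beforeAfter.getKey(), beforeAfter.getValue())` would then report only the columns that were actually modified.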
 

Summary

We covered an introduction to the MySQL binlog and how to access it using a pure Java client that understands binlog format V4.

 

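The Next Generation System Laboratory is looking for architects who can support integration across the whole group. If you have experience in infrastructure design and construction, or are interested in the Next Generation System Laboratory, please apply via the list of open positions.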

 

We look forward to your applications.