debezium深度解读单消息转换SMT

Kafka Connect是Apache Kafka®的一部分,在Kafka和其它系统之间提供可靠的、可扩展的分布式流式集成。Kafka Connect具有可用于许多系统的连接器,它是一个配置驱动的工具,不需要编码。

Kafka Connect API还提供了一个简单的接口,用于处理从源端通过数据管道到接收端的记录,该API称为单消息转换(SMT),顾名思义,当数据通过Kafka Connect连接器时,它可以对数据管道中的每条消息进行操作。

连接器分为源端或接收端,它们要么从Kafka上游的系统中提取数据,要么向Kafka的下游推送数据。这个转换可以配置为在任何一侧进行,源端连接器可以在写入Kafka主题之前对数据进行转换,接收端连接器也可以在将数据写入接收端之前对其进行转换。

转换的一些常见用途是:

  • 对字段重命名;
  • 掩蔽值;
  • 根据值将记录路由到主题;
  • 将时间戳转换或插入记录中;
  • 操作主键,例如根据字段的值设置主键。

Kafka自带了许多转换器,但是开发自定义的转换器也非常容易。

配置Kafka Connect的单消息转换

需要给转换器指定一个名字,该名字将用于指定该转换器的其他属性。例如,下面是JDBC源端利用RegexRouter转换器的配置片段,该转换器将固定字符串附加到要写入的主题的末尾:

1
2
3
4
5
6
7
8
9
10
11
{
"name": "jdbcSource",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

"transforms": "routeRecords",
"transforms.routeRecords.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeRecords.regex": "(.*)",
"transforms.routeRecords.replacement": "$1-test"
}
}

该转换器被命名为routeRecords,且在后续中用于传递属性。注意,上面的示例显示了RegexRouter的两个配置属性:正则表达式regex和匹配组引用replacement。此设置将从JDBC源端获取表名,并将其加上-test后缀。根据转换器的功能不同,也可能会有不同的配置属性,具体可以参见相关的文档。

执行多次转换

有时需要执行多次转换,Kafka Connect支持定义多个转化器,他们在配置中链接在一起。这些消息按照在transforms属性中定义的顺序执行转换。

下面的转换使用ValueToKey转换器将值转换为主键,并使用ExtractField转换器仅使用ID整数值作为主键:

1
2
3
4
5
“transforms”:”createKey,extractInt”,
“transforms.createKey.type”:”org.apache.kafka.connect.transforms.ValueToKey”,
“transforms.createKey.fields”:”c1”,
“transforms.extractInt.type”:”org.apache.kafka.connect.transforms.ExtractField$Key”,
“transforms.extractInt.field”:”c1”

注意,使用上述$Key符号,会指定此转换将作用于记录的Key,如果要针对记录的Value,需要在这里指定$Value。最后ConnectRecord看起来像这样:

1
2
3
key        value
------------------------------
null {"c1":{"int":100},"c2":{"string":"bar"}}

转换后:

1
2
3
key        value
------------------------------
100 {"c1":{"int":100},"c2":{"string":"bar"}}

单消息转换深入解读

下面深入地看下连接器如何处理数据。转换器被编译为JAR,并通过Connect工作节点的属性文件中的plugin.path属性,指定其可用于Kafka Connect,安装后就可以在连接器属性中配置转换。

配置和部署后,源端连接器将从上游系统接收记录,将其转换为ConnectRecord,然后将该记录传递给配置的转换器的apply()函数,然后等待返回记录。接收端连接器也是执行类似的过程,从Kafka主题读取并反序列化每个消息之后,将调用转换器的apply()方法,并将结果记录发送到目标系统。

如何开发单消息转换器

要开发将UUID插入每个记录的简单转换器,需要逐步执行以下的步骤。

apply方法是转换器的核心,这种转换支持带有模式和不带有模式的数据,因此每个都有一个转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}

private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);

final Map<String, Object> updatedValue = new HashMap<>(value);

updatedValue.put(fieldName, getRandomUuid());

return newRecord(record, null, updatedValue);
}

private R applyWithSchema(R record) {
final Struct value = requireStruct(operatingValue(record), PURPOSE);

Schema updatedSchema = schemaUpdateCache.get(value.schema());
if(updatedSchema == null) {
updatedSchema = makeUpdatedSchema(value.schema());
schemaUpdateCache.put(value.schema(), updatedSchema);
}

final Struct updatedValue = new Struct(updatedSchema);

for (Field field : value.schema().fields()) {
updatedValue.put(field.name(), value.get(field));
}

updatedValue.put(fieldName, getRandomUuid());

return newRecord(record, updatedSchema, updatedValue);
}

此转换器可以应用于记录的键或值,因此需要实现Key和Value子类,其扩展了InsertUuid类并实现apply方法调用的newRecord方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static class Key<R extends ConnectRecord<R>> extends InsertUuid<R> {

@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}

@Override
protected Object operatingValue(R record) {
return record.key();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}

}

public static class Value<R extends ConnectRecord<R>> extends InsertUuid<R> {

@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}

@Override
protected Object operatingValue(R record) {
return record.value();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}

}

该转换器仅改变了模式和值,但是要注意其可以操纵ConnectRecord的所有部分:Key、Value、Key和Value的模式、目标主题、目标分区和时间戳。

该转换器具有可选的参数,这些参数可以在运行时配置,并可以通过转换器类中重写的configure()方法访问:

1
2
3
4
5
6
7
8
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
fieldName = config.getString(ConfigName.UUID_FIELD_NAME);

schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
}

如上所示,该Transformation接口很简单,它实现了一个apply()方法来接收ConnectRecord然后再返回ConnectRecord,它可以选择通过configure()方法接收参数。

接下来,编译此JAR并将其放入Connect工作节点中plugin.path指定的路径中。注意需要将转换器所依赖的任何依赖项打包到它的路径中或编译为胖JAR。然后在连接器配置中调用它,如下所示(注意$Value内部类约定,以指示此转换应作用于记录的值):

1
2
3
transforms=insertuuid
transforms.insertuuid.type=kafka.connect.smt.InsertUuid$Value
transforms.insertuuid.uuid.field.name="uuid"

相关

kafka-connect-insert-uuid