diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 9732ae6c42..eb2fbb682d 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -14,4 +14,7 @@
-
\ No newline at end of file
+
+
+
+
diff --git a/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java b/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java
index 99601ebe0c..4bef79932c 100644
--- a/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java
+++ b/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java
@@ -64,6 +64,7 @@
import static com.dtstack.chunjun.constants.CDCConstantValue.DATABASE;
import static com.dtstack.chunjun.constants.CDCConstantValue.LSN;
import static com.dtstack.chunjun.constants.CDCConstantValue.OP_TIME;
+import static com.dtstack.chunjun.constants.CDCConstantValue.PRIMARY_KEY;
import static com.dtstack.chunjun.constants.CDCConstantValue.SCHEMA;
import static com.dtstack.chunjun.constants.CDCConstantValue.TABLE;
import static com.dtstack.chunjun.constants.CDCConstantValue.TS;
@@ -262,6 +263,14 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exception
List<CanalEntry.Column> beforeList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterList = rowData.getAfterColumnsList();
+ String beforePrimaryKeys = extractPrimaryKeys(beforeList);
+ String afterPrimaryKeys = extractPrimaryKeys(afterList);
+
+ columnRowData.addHeader(PRIMARY_KEY);
+ columnRowData.addField(
+ new StringColumn(
+ !beforePrimaryKeys.isEmpty() ? beforePrimaryKeys : afterPrimaryKeys));
+
List<AbstractBaseColumn> beforeColumnList = new ArrayList<>(beforeList.size());
List<String> beforeHeaderList = new ArrayList<>(beforeList.size());
List<AbstractBaseColumn> afterColumnList = new ArrayList<>(afterList.size());
@@ -314,6 +323,13 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce
return result;
}
+ private String extractPrimaryKeys(List<CanalEntry.Column> columns) {
+ return columns.stream()
+ .filter(CanalEntry.Column::getIsKey)
+ .map(CanalEntry.Column::getName)
+ .collect(Collectors.joining(","));
+ }
+
/**
* Parse CanalEntry.Column
*
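
For orientation, a standalone sketch of what the new extractPrimaryKeys helper
yields (column names are invented; CanalEntry is canal's protobuf model, so
columns are built via newBuilder()). Note that Collectors.joining returns ""
rather than null when no key columns exist, which is why the caller above
checks isEmpty():

    import com.alibaba.otter.canal.protocol.CanalEntry;

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ExtractPrimaryKeysSketch {
        public static void main(String[] args) {
            List<CanalEntry.Column> columns =
                    Arrays.asList(
                            CanalEntry.Column.newBuilder().setName("id").setIsKey(true).build(),
                            CanalEntry.Column.newBuilder().setName("name").build(),
                            CanalEntry.Column.newBuilder().setName("tenant_id").setIsKey(true).build());

            // Same logic as the patch: keep key columns, join their names with ","
            String primaryKeys =
                    columns.stream()
                            .filter(CanalEntry.Column::getIsKey)
                            .map(CanalEntry.Column::getName)
                            .collect(Collectors.joining(","));

            System.out.println(primaryKeys); // id,tenant_id ("" when no key columns)
        }
    }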
diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java
index 88ab31c963..c331507302 100644
--- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java
+++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java
@@ -20,6 +20,7 @@
import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.connector.kafka.enums.StartupMode;
+import com.dtstack.chunjun.connector.kafka.sink.PartitionStrategy;
import lombok.Getter;
import lombok.Setter;
@@ -45,6 +46,9 @@ public class KafkaConfig extends CommonConfig {
private String topic;
/** kafka topics */
private List<String> topics;
+
+ /** CDC data is mapped by database.schema.table and written to the corresponding topic. */
+ private String tableIdToTopicMapping;
/** 默认需要一个groupId */
private String groupId = UUID.randomUUID().toString().replace("-", "");
/** kafka启动模式 */
@@ -64,6 +68,9 @@ public class KafkaConfig extends CommonConfig {
/** kafka sink partition columns */
private List<String> partitionAssignColumns;
+ // CDC: partition records by their primary key
+ private String partitionStrategy = PartitionStrategy.ALL_TO_ZERO.toString();
+
private String deserialization = "default";
/** deserialization settings; each deserialization type has its own configuration */
@@ -82,7 +89,7 @@ public class KafkaConfig extends CommonConfig {
public String getOffset() {
if (offset == null) {
- return null;
+ return offset;
}
return offset.toLowerCase(Locale.ENGLISH);
}
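
How the new option is consumed downstream (a sketch only; PartitionStrategy
and its fromValue lookup are added later in this patch, and the accessors come
from lombok's @Getter/@Setter on KafkaConfig):

    KafkaConfig kafkaConfig = new KafkaConfig();
    // default "all-to-zero": every record is routed to partition 0
    PartitionStrategy byDefault =
            PartitionStrategy.fromValue(kafkaConfig.getPartitionStrategy());
    // opt in to primary-key based partitioning for CDC data
    kafkaConfig.setPartitionStrategy("hash-by-key");
    PartitionStrategy byKey =
            PartitionStrategy.fromValue(kafkaConfig.getPartitionStrategy());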
diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java
index 4eb87c90b6..22000c3b37 100644
--- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java
+++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java
@@ -47,7 +47,6 @@
import com.dtstack.chunjun.util.MapUtil;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CollectionUtil;
import org.apache.commons.collections.CollectionUtils;
@@ -71,20 +70,28 @@
import static com.dtstack.chunjun.connector.kafka.option.KafkaOptions.DEFAULT_CODEC;
+/**
+ * @author chuixue
+ * @create 2021-06-07 15:51
+ * @description
+ */
public class KafkaSyncConverter
extends AbstractRowConverter<ConsumerRecord<byte[], byte[]>, Object, byte[], String> {
/** source kafka msg decode */
- private final IDecode decode;
+ protected final IDecode decode;
+ /** sink json Decoder */
+ protected final JsonDecoder jsonDecoder;
/** kafka Conf */
- private final KafkaConfig kafkaConfig;
+ protected final KafkaConfig kafkaConfig;
/** kafka sink out fields */
- private List<String> outList;
+ protected List<String> outList;
- public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig, List<String> keyTypeList) {
- super(rowType, kafkaConfig);
+ public KafkaSyncConverter(KafkaConfig kafkaConfig, List<String> keyTypeList) {
+ super(null, kafkaConfig);
this.kafkaConfig = kafkaConfig;
this.outList = keyTypeList;
+ this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
this.decode = new JsonDecoder(kafkaConfig.isAddMessage());
} else {
@@ -92,9 +99,10 @@ public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig, List<String> keyTypeList) {
}
}
- public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig) {
- super(rowType, kafkaConfig);
+ public KafkaSyncConverter(KafkaConfig kafkaConfig) {
+ super(null, kafkaConfig);
this.commonConfig = this.kafkaConfig = kafkaConfig;
+ this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
this.decode = new JsonDecoder(kafkaConfig.isAddMessage());
} else {
@@ -162,11 +170,11 @@ public RowData toInternal(ConsumerRecord<byte[], byte[]> input) throws Exception
result = new ColumnRowData(fieldConfList.size());
}
for (int i = 0; i < fieldConfList.size(); i++) {
- FieldConfig fieldConfig = fieldConfList.get(i);
- Object value = map.get(fieldConfig.getName());
+ FieldConfig fieldConf = fieldConfList.get(i);
+ Object value = map.get(fieldConf.getName());
AbstractBaseColumn baseColumn =
(AbstractBaseColumn) toInternalConverters.get(i).deserialize(value);
- result.addField(assembleFieldProps(fieldConfig, baseColumn));
+ result.addField(assembleFieldProps(fieldConf, baseColumn));
}
}
return result;
@@ -174,6 +182,11 @@ public RowData toInternal(ConsumerRecord<byte[], byte[]> input) throws Exception
@Override
public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
+ Map<String, Object> map = getExternalMap(rowData);
+ return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8);
+ }
+
+ protected Map<String, Object> getExternalMap(RowData rowData) {
Map<String, Object> map;
int arity = rowData.getArity();
ColumnRowData row = (ColumnRowData) rowData;
@@ -187,11 +200,24 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
Object value;
if (object instanceof TimestampColumn) {
value = ((TimestampColumn) object).asTimestampStr();
+ } else if (row.getField(i).getData() == null) {
+ value = null;
} else {
- value = org.apache.flink.util.StringUtils.arrayAwareToString(row.getField(i));
+ value = row.getField(i).asString();
}
map.put(kafkaConfig.getTableFields().get(i), value);
}
+
+ // get partition key value
+ if (!CollectionUtil.isNullOrEmpty(outList)) {
+ Map<String, Object> keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3);
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ if (outList.contains(entry.getKey())) {
+ keyPartitionMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ map = keyPartitionMap;
+ }
} else {
String[] headers = row.getHeaders();
if (Objects.nonNull(headers) && headers.length >= 1) {
@@ -199,10 +225,37 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
map = new HashMap<>(headers.length >> 1);
for (String header : headers) {
AbstractBaseColumn val = row.getField(header);
- if (null == val) {
+ if (null == val || val instanceof NullColumn) {
map.put(header, null);
} else {
- map.put(header, val.getData());
+ // Timestamp must be rendered in yyyy-MM-dd HH:mm:ss.SSSSSS format
+ if (val instanceof TimestampColumn) {
+ map.put(header, timestampToStringByNanosPrecision(val.asTimestamp()));
+ } else if (val instanceof MapColumn) {
+ Object data = val.getData();
+ if (data instanceof Map) {
+ Map<String, Object> maps = (Map<String, Object>) data;
+ LinkedHashMap<String, Object> datas = new LinkedHashMap<>();
+ maps.forEach(
+ (k, v) -> {
+ if (v instanceof Timestamp) {
+ datas.put(
+ k,
+ timestampToStringByNanosPrecision(
+ (Timestamp) v));
+ } else {
+ datas.put(k, v);
+ }
+ });
+ map.put(header, datas);
+ } else {
+ throw new RuntimeException(
+ "MapColumn data is not Map,map column data type is "
+ + data.getClass());
+ }
+ } else {
+ map.put(header, val.getData());
+ }
}
}
if (Arrays.stream(headers)
@@ -227,19 +280,7 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
map = decode.decode(String.join(",", values));
}
}
-
- // get partition key value
- if (!CollectionUtil.isNullOrEmpty(outList)) {
- Map<String, Object> keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3);
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- if (outList.contains(entry.getKey())) {
- keyPartitionMap.put(entry.getKey(), entry.getValue());
- }
- }
- map = keyPartitionMap;
- }
-
- return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8);
+ return map;
}
@Override
@@ -300,4 +341,12 @@ protected IDeserializationConverter createInternalConverter(String type) {
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}
+
+ public String timestampToStringByNanosPrecision(Timestamp t) {
+ if (t.getNanos() == 0) {
+ return new TimestampColumn(t, 0).asTimestampStr();
+ } else {
+ return t.toString();
+ }
+ }
}
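
For reference, the helper's two paths in one short sketch (this assumes
TimestampColumn with precision 0 renders without a fractional part, as the
nanos check above suggests; converter is any KafkaSyncConverter instance):

    Timestamp whole = Timestamp.valueOf("2024-01-01 12:00:00");
    Timestamp micros = Timestamp.valueOf("2024-01-01 12:00:00.123456");
    // nanos == 0: formatted via TimestampColumn, no fractional part expected
    converter.timestampToStringByNanosPrecision(whole);
    // nanos != 0: falls back to java.sql.Timestamp#toString()
    converter.timestampToStringByNanosPrecision(micros); // "2024-01-01 12:00:00.123456"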
diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java
new file mode 100644
index 0000000000..6b866b1512
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.kafka.converter;
+
+import com.dtstack.chunjun.connector.kafka.conf.KafkaConfig;
+import com.dtstack.chunjun.connector.kafka.sink.PartitionStrategy;
+import com.dtstack.chunjun.element.ColumnRowData;
+import com.dtstack.chunjun.element.column.MapColumn;
+import com.dtstack.chunjun.util.MapUtil;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author wujuan
+ * @create 2025-12-01 15:51
+ * @description
+ */
+public class KafkaSyncKeyConverter extends KafkaSyncConverter {
+
+ private final PartitionStrategy partitionStrategy;
+
+ public KafkaSyncKeyConverter(KafkaConfig kafkaConf, List<String> keyTypeList) {
+ super(kafkaConf, keyTypeList);
+ this.partitionStrategy = PartitionStrategy.fromValue(kafkaConf.getPartitionStrategy());
+ }
+
+ public KafkaSyncKeyConverter(KafkaConfig kafkaConf) {
+ this(kafkaConf, null);
+ }
+
+ @Override
+ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
+
+ Map<String, Object> map = getExternalMap(rowData);
+
+ int arity = rowData.getArity();
+ ColumnRowData row = (ColumnRowData) rowData;
+
+ if (kafkaConfig.getTableFields() != null
+ && kafkaConfig.getTableFields().size() >= arity
+ && !(row.getField(0) instanceof MapColumn)) {
+ // get partition key value
+ if (!CollectionUtil.isNullOrEmpty(outList)) {
+ Map<String, Object> keyPartitionMap = new LinkedHashMap<>(1);
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ if (outList.contains(entry.getKey())) {
+ keyPartitionMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ map = keyPartitionMap;
+ }
+ } else {
+ // CDC kafka key handler
+ // partition.strategy hash-by-key / all-to-zero
+ if (partitionStrategy != null
+ && partitionStrategy.equals(PartitionStrategy.HASH_BY_KEY)) {
+ Map<String, Object> keyPartitionMap = new LinkedHashMap<>(1);
+ if (map.get("message") instanceof Map) {
+ // CDC data in nested (non-paved) layout
+ Map<String, Object> message = (Map<String, Object>) map.get("message");
+ String type = (String) message.get("type");
+ String primaryKey = (String) message.get("primary_key");
+
+ if (StringUtils.isNotEmpty(primaryKey)) {
+ String[] primaryKeys = primaryKey.split(",");
+ switch (type) {
+ case "DELETE":
+ {
+ Map<String, Object> before =
+ (Map<String, Object>) message.get("before");
+ for (String key : primaryKeys) {
+ keyPartitionMap.put(key, before.get(key));
+ }
+ map = keyPartitionMap;
+ }
+ break;
+ case "INSERT":
+ case "UPDATE":
+ {
+ Map<String, Object> after =
+ (Map<String, Object>) message.get("after");
+ for (String key : primaryKeys) {
+ keyPartitionMap.put(key, after.get(key));
+ }
+ map = keyPartitionMap;
+ }
+ break;
+ default:
+ }
+ }
+
+ } else {
+ // CDC data in paved layout: pavingData = true
+ String primaryKey = (String) map.get("primary_key");
+ String type = (String) map.get("type");
+ if (StringUtils.isNotEmpty(primaryKey)) {
+ String[] primaryKeys = primaryKey.split(",");
+ String prefix;
+ switch (type) {
+ case "INSERT":
+ case "UPDATE":
+ prefix = "after_";
+ break;
+ case "DELETE":
+ default:
+ prefix = "before_";
+ break;
+ }
+
+ for (String key : primaryKeys) {
+ keyPartitionMap.put(key, map.get(prefix + key));
+ }
+ map = keyPartitionMap;
+ }
+ }
+ }
+ }
+ return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8);
+ }
+}
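
To make the key path concrete, a self-contained sketch of the reduction this
converter performs for a paved (pavingData = true) UPDATE record; the record
values are invented:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class HashByKeySketch {
        public static void main(String[] args) {
            // Paved CDC record as produced upstream (illustrative values).
            Map<String, Object> map = new LinkedHashMap<>();
            map.put("type", "UPDATE");
            map.put("primary_key", "id");
            map.put("before_id", 1);
            map.put("after_id", 1);
            map.put("after_name", "chunjun");

            // Same reduction as KafkaSyncKeyConverter#toExternal:
            // INSERT/UPDATE read "after_" columns, DELETE reads "before_".
            String type = (String) map.get("type");
            String prefix = "DELETE".equals(type) ? "before_" : "after_";
            Map<String, Object> key = new LinkedHashMap<>();
            for (String k : ((String) map.get("primary_key")).split(",")) {
                key.put(k, map.get(prefix + k));
            }
            System.out.println(key); // {id=1} -> serialized key {"id":1}
        }
    }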
diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java
index d052a05ac8..586a7291f0 100644
--- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java
+++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java
@@ -22,20 +22,19 @@
import com.dtstack.chunjun.connector.kafka.conf.KafkaConfig;
import com.dtstack.chunjun.connector.kafka.converter.KafkaRawTypeMapping;
import com.dtstack.chunjun.connector.kafka.converter.KafkaSyncConverter;
+import com.dtstack.chunjun.connector.kafka.converter.KafkaSyncKeyConverter;
import com.dtstack.chunjun.connector.kafka.enums.StartupMode;
import com.dtstack.chunjun.connector.kafka.partitioner.CustomerFlinkPartition;
import com.dtstack.chunjun.connector.kafka.serialization.RowSerializationSchema;
import com.dtstack.chunjun.converter.RawTypeMapper;
import com.dtstack.chunjun.sink.SinkFactory;
import com.dtstack.chunjun.util.GsonUtil;
-import com.dtstack.chunjun.util.TableUtil;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
@@ -46,8 +45,12 @@
import java.util.Properties;
+/**
+ * Date: 2021/04/07 Company: www.dtstack.com
+ *
+ * @author tudou
+ */
public class KafkaSinkFactory extends SinkFactory {
-
protected KafkaConfig kafkaConfig;
public KafkaSinkFactory(SyncConfig config) {
@@ -65,6 +68,11 @@ public KafkaSinkFactory(SyncConfig config) {
kafkaConfig.setTopic(kafkaConfig.getTopics().get(0));
}
super.initCommonConf(kafkaConfig);
+ if (StringUtils.isNotBlank(kafkaConfig.getTopic())
+ && StringUtils.isNotBlank(kafkaConfig.getTableIdToTopicMapping())) {
+ throw new IllegalArgumentException(
+ "topic and tableIdToTopicMapping can not be set at the same time");
+ }
}
@Override
@@ -89,8 +97,6 @@ protected DataStreamSink createOutput(
}
RowSerializationSchema rowSerializationSchema;
- RowType rowType =
- TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);
if (!CollectionUtil.isNullOrEmpty(kafkaConfig.getPartitionAssignColumns())) {
Preconditions.checkState(
!CollectionUtil.isNullOrEmpty(kafkaConfig.getTableFields()),
@@ -108,21 +114,21 @@ protected DataStreamSink createOutput(
new RowSerializationSchema(
kafkaConfig,
new CustomerFlinkPartition<>(),
- new KafkaSyncConverter(
- rowType, kafkaConfig, kafkaConfig.getPartitionAssignColumns()),
- new KafkaSyncConverter(rowType, kafkaConfig));
+ new KafkaSyncKeyConverter(
+ kafkaConfig, kafkaConfig.getPartitionAssignColumns()),
+ new KafkaSyncConverter(kafkaConfig));
} else {
rowSerializationSchema =
new RowSerializationSchema(
kafkaConfig,
new CustomerFlinkPartition<>(),
- null,
- new KafkaSyncConverter(rowType, kafkaConfig));
+ new KafkaSyncKeyConverter(kafkaConfig),
+ new KafkaSyncConverter(kafkaConfig));
}
KafkaProducer kafkaProducer =
new KafkaProducer(
- kafkaConfig.getTopic(),
+ StringUtils.isEmpty(kafkaConfig.getTopic()) ? "" : kafkaConfig.getTopic(),
rowSerializationSchema,
props,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
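
The guard added above makes the two routing modes mutually exclusive; a sketch
of the check in isolation (setters come from lombok, and the mapping value is
left as a placeholder since its format is not defined in this patch):

    KafkaConfig conf = new KafkaConfig();
    conf.setTopic("orders");
    conf.setTableIdToTopicMapping("...");
    // both set -> KafkaSinkFactory now throws IllegalArgumentException:
    // "topic and tableIdToTopicMapping can not be set at the same time"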
diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/PartitionStrategy.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/PartitionStrategy.java
new file mode 100644
index 0000000000..5d80e4e41d
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/PartitionStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.kafka.sink;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/** Partition Strategy for sending {@link ProducerRecord} to kafka partition. */
+public enum PartitionStrategy {
+
+ /** All {@link ProducerRecord} will be sent to partition 0. */
+ ALL_TO_ZERO("all-to-zero"),
+
+ /** {@link ProducerRecord} will be sent to specific partition by primary key. */
+ HASH_BY_KEY("hash-by-key");
+
+ private final String value;
+
+ PartitionStrategy(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ // Custom lookup that resolves a strategy from its config value
+ public static PartitionStrategy fromValue(String value) {
+ for (PartitionStrategy strategy : PartitionStrategy.values()) {
+ if (strategy.value.equals(value)) {
+ return strategy;
+ }
+ }
+ return ALL_TO_ZERO;
+ }
+}
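
Lookup behavior in short (sketch): unknown or missing values fall back to the
safe default instead of failing the job:

    PartitionStrategy.fromValue("hash-by-key"); // HASH_BY_KEY
    PartitionStrategy.fromValue("bogus");       // ALL_TO_ZERO (fallback)
    PartitionStrategy.fromValue(null);          // ALL_TO_ZERO (equals(null) is false)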
diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java b/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java
index d44e346e39..a49473a0a2 100644
--- a/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java
+++ b/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java
@@ -31,4 +31,5 @@ public class CDCConstantValue {
public static final String AFTER = "after";
public static final String SCN = "scn";
public static final String LSN = "lsn";
+ public static final String PRIMARY_KEY = "primary_key";
}
diff --git a/chunjun-examples/json/binlog/binlog_kafka_partition_by_key.json b/chunjun-examples/json/binlog/binlog_kafka_partition_by_key.json
new file mode 100644
index 0000000000..aaed50a050
--- /dev/null
+++ b/chunjun-examples/json/binlog/binlog_kafka_partition_by_key.json
@@ -0,0 +1,50 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "parameter": {
+ "schema": "wujuan",
+ "username": "zhiqiang",
+ "password": "lizhiqiang",
+ "cat": "insert,delete,update",
+ "jdbcUrl": "jdbc:mysql://localhost:3306/wujuan?useSSL=false",
+ "host": "localhost",
+ "port": 3306,
+
+ "table": [],
+ "splitUpdate": true,
+ "pavingData": true
+ },
+ "name": "binlogreader"
+ },
+ "writer": {
+ "parameter": {
+ "producerSettings": {
+ "zookeeper.connect": "",
+ "bootstrap.servers": "localhost:9092"
+ },
+ "dataCompelOrder": false,
+ "partitionStrategy" : "hash-by-key",
+ "topic": "wujuan4"
+ },
+ "name": "kafkawriter",
+ "type": 26
+ }
+ }
+ ],
+ "setting": {
+ "restore": {
+ "isRestore": true,
+ "isStream": true
+ },
+ "errorLimit": {},
+ "speed": {
+ "readerChannel": 1,
+ "writerChannel": 1,
+ "bytes": -1048576,
+ "channel": 1
+ }
+ }
+ }
+}