From e640362b874bc836dc48689b442e16cfce16c2f9 Mon Sep 17 00:00:00 2001 From: wujuan Date: Mon, 15 Sep 2025 16:12:59 +0800 Subject: [PATCH] [hotfix-#1954][kafka] binlog -> kafka support partition by, add partitionStrategy parameter, optional value all-to-zero or hash-by-key, default value all-to-zero --- .idea/vcs.xml | 5 +- .../binlog/converter/BinlogSyncConverter.java | 16 ++ .../connector/kafka/conf/KafkaConfig.java | 9 +- .../kafka/converter/KafkaSyncConverter.java | 103 +++++++++---- .../converter/KafkaSyncKeyConverter.java | 144 ++++++++++++++++++ .../kafka/sink/KafkaSinkFactory.java | 28 ++-- .../kafka/sink/PartitionStrategy.java | 51 +++++++ .../chunjun/constants/CDCConstantValue.java | 1 + .../binlog/binlog_kafka_partition_by_key.json | 50 ++++++ 9 files changed, 367 insertions(+), 40 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java create mode 100644 chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/PartitionStrategy.java create mode 100644 chunjun-examples/json/binlog/binlog_kafka_partition_by_key.json diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 9732ae6c42..eb2fbb682d 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -14,4 +14,7 @@ - \ No newline at end of file + + + + diff --git a/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java b/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java index 99601ebe0c..4bef79932c 100644 --- a/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java +++ b/chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/converter/BinlogSyncConverter.java @@ -64,6 +64,7 @@ import static com.dtstack.chunjun.constants.CDCConstantValue.DATABASE; import static com.dtstack.chunjun.constants.CDCConstantValue.LSN; import static com.dtstack.chunjun.constants.CDCConstantValue.OP_TIME; +import static com.dtstack.chunjun.constants.CDCConstantValue.PRIMARY_KEY; import static com.dtstack.chunjun.constants.CDCConstantValue.SCHEMA; import static com.dtstack.chunjun.constants.CDCConstantValue.TABLE; import static com.dtstack.chunjun.constants.CDCConstantValue.TS; @@ -262,6 +263,14 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce List beforeList = rowData.getBeforeColumnsList(); List afterList = rowData.getAfterColumnsList(); + String beforePrimaryKeys = extractPrimaryKeys(beforeList); + String afterPrimaryKeys = extractPrimaryKeys(afterList); + + columnRowData.addHeader(PRIMARY_KEY); + columnRowData.addField( + new StringColumn( + beforePrimaryKeys != null ? beforePrimaryKeys : afterPrimaryKeys)); + List beforeColumnList = new ArrayList<>(beforeList.size()); List beforeHeaderList = new ArrayList<>(beforeList.size()); List afterColumnList = new ArrayList<>(afterList.size()); @@ -314,6 +323,13 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce return result; } + private String extractPrimaryKeys(List beforeList) { + return beforeList.stream() + .filter(c -> c.getIsKey()) + .map(CanalEntry.Column::getName) + .collect(Collectors.joining(",")); + } + /** * 解析CanalEntry.Column * diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java index 88ab31c963..c331507302 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/conf/KafkaConfig.java @@ -20,6 +20,7 @@ import com.dtstack.chunjun.config.CommonConfig; import com.dtstack.chunjun.config.FieldConfig; import com.dtstack.chunjun.connector.kafka.enums.StartupMode; +import com.dtstack.chunjun.connector.kafka.sink.PartitionStrategy; import lombok.Getter; import lombok.Setter; @@ -45,6 +46,9 @@ public class KafkaConfig extends CommonConfig { private String topic; /** kafka topics */ private List topics; + + /** cdc数据根据database.schema.table进行映射,输出到对应的topic* */ + private String tableIdToTopicMapping; /** 默认需要一个groupId */ private String groupId = UUID.randomUUID().toString().replace("-", ""); /** kafka启动模式 */ @@ -64,6 +68,9 @@ public class KafkaConfig extends CommonConfig { /** kafka sink分区字段 */ private List partitionAssignColumns; + // cdc 用于根据主键进行分区 + private String partitionStrategy = PartitionStrategy.ALL_TO_ZERO.toString(); + private String deserialization = "default"; /** deserialization的配置信息,每个deserialization的配置信息是不一样的 * */ @@ -82,7 +89,7 @@ public class KafkaConfig extends CommonConfig { public String getOffset() { if (offset == null) { - return null; + return offset; } return offset.toLowerCase(Locale.ENGLISH); } diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java index 4eb87c90b6..22000c3b37 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java @@ -47,7 +47,6 @@ import com.dtstack.chunjun.util.MapUtil; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CollectionUtil; import org.apache.commons.collections.CollectionUtils; @@ -71,20 +70,28 @@ import static com.dtstack.chunjun.connector.kafka.option.KafkaOptions.DEFAULT_CODEC; +/** + * @author chuixue + * @create 2021-06-07 15:51 + * @description + */ public class KafkaSyncConverter extends AbstractRowConverter, Object, byte[], String> { /** source kafka msg decode */ - private final IDecode decode; + protected final IDecode decode; + /** sink json Decoder */ + protected final JsonDecoder jsonDecoder; /** kafka Conf */ - private final KafkaConfig kafkaConfig; + protected final KafkaConfig kafkaConfig; /** kafka sink out fields */ - private List outList; + protected List outList; - public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig, List keyTypeList) { - super(rowType, kafkaConfig); + public KafkaSyncConverter(KafkaConfig kafkaConfig, List keyTypeList) { + super(null, kafkaConfig); this.kafkaConfig = kafkaConfig; this.outList = keyTypeList; + this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage()); if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) { this.decode = new JsonDecoder(kafkaConfig.isAddMessage()); } else { @@ -92,9 +99,10 @@ public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig, List } } - public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig) { - super(rowType, kafkaConfig); + public KafkaSyncConverter(KafkaConfig kafkaConfig) { + super(null, kafkaConfig); this.commonConfig = this.kafkaConfig = kafkaConfig; + this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage()); if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) { this.decode = new JsonDecoder(kafkaConfig.isAddMessage()); } else { @@ -162,11 +170,11 @@ public RowData toInternal(ConsumerRecord input) throws Exception result = new ColumnRowData(fieldConfList.size()); } for (int i = 0; i < fieldConfList.size(); i++) { - FieldConfig fieldConfig = fieldConfList.get(i); - Object value = map.get(fieldConfig.getName()); + FieldConfig fieldConf = fieldConfList.get(i); + Object value = map.get(fieldConf.getName()); AbstractBaseColumn baseColumn = (AbstractBaseColumn) toInternalConverters.get(i).deserialize(value); - result.addField(assembleFieldProps(fieldConfig, baseColumn)); + result.addField(assembleFieldProps(fieldConf, baseColumn)); } } return result; @@ -174,6 +182,11 @@ public RowData toInternal(ConsumerRecord input) throws Exception @Override public byte[] toExternal(RowData rowData, byte[] output) throws Exception { + Map map = getExternalMap(rowData); + return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8); + } + + protected Map getExternalMap(RowData rowData) { Map map; int arity = rowData.getArity(); ColumnRowData row = (ColumnRowData) rowData; @@ -187,11 +200,24 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception { Object value; if (object instanceof TimestampColumn) { value = ((TimestampColumn) object).asTimestampStr(); + } else if (row.getField(i).getData() == null) { + value = null; } else { - value = org.apache.flink.util.StringUtils.arrayAwareToString(row.getField(i)); + value = row.getField(i).asString(); } map.put(kafkaConfig.getTableFields().get(i), value); } + + // get partition key value + if (!CollectionUtil.isNullOrEmpty(outList)) { + Map keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3); + for (Map.Entry entry : map.entrySet()) { + if (outList.contains(entry.getKey())) { + keyPartitionMap.put(entry.getKey(), entry.getValue()); + } + } + map = keyPartitionMap; + } } else { String[] headers = row.getHeaders(); if (Objects.nonNull(headers) && headers.length >= 1) { @@ -199,10 +225,37 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception { map = new HashMap<>(headers.length >> 1); for (String header : headers) { AbstractBaseColumn val = row.getField(header); - if (null == val) { + if (null == val || val instanceof NullColumn) { map.put(header, null); } else { - map.put(header, val.getData()); + // Timestamp需要转为yyyy-MM-dd hh:mm:ss.SSSSSS格式 + if (val instanceof TimestampColumn) { + map.put(header, timeStampTostringBynacosPrecision(val.asTimestamp())); + } else if (val instanceof MapColumn) { + Object data = val.getData(); + if (data instanceof Map) { + Map maps = (Map) data; + LinkedHashMap datas = new LinkedHashMap<>(); + maps.forEach( + (k, v) -> { + if (v instanceof Timestamp) { + datas.put( + k, + timeStampTostringBynacosPrecision( + (Timestamp) v)); + } else { + datas.put(k, v); + } + }); + map.put(header, datas); + } else { + throw new RuntimeException( + "MapColumn data is not Map,map column data type is " + + data.getClass()); + } + } else { + map.put(header, val.getData()); + } } } if (Arrays.stream(headers) @@ -227,19 +280,7 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception { map = decode.decode(String.join(",", values)); } } - - // get partition key value - if (!CollectionUtil.isNullOrEmpty(outList)) { - Map keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3); - for (Map.Entry entry : map.entrySet()) { - if (outList.contains(entry.getKey())) { - keyPartitionMap.put(entry.getKey(), entry.getValue()); - } - } - map = keyPartitionMap; - } - - return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8); + return map; } @Override @@ -300,4 +341,12 @@ protected IDeserializationConverter createInternalConverter(String type) { throw new UnsupportedOperationException("Unsupported type:" + type); } } + + public String timeStampTostringBynacosPrecision(Timestamp t) { + if (t.getNanos() == 0) { + return new TimestampColumn(t, 0).asTimestampStr(); + } else { + return t.toString(); + } + } } diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java new file mode 100644 index 0000000000..6b866b1512 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.kafka.converter; + +import com.dtstack.chunjun.connector.kafka.conf.KafkaConfig; +import com.dtstack.chunjun.connector.kafka.sink.PartitionStrategy; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.element.column.MapColumn; +import com.dtstack.chunjun.util.MapUtil; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.CollectionUtil; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * @author wujuan + * @create 2025-12-01 15:51 + * @description + */ +public class KafkaSyncKeyConverter extends KafkaSyncConverter { + + private final PartitionStrategy partitionStrategy; + + public KafkaSyncKeyConverter(KafkaConfig kafkaConf, List keyTypeList) { + super(kafkaConf, keyTypeList); + this.partitionStrategy = PartitionStrategy.fromValue(kafkaConf.getPartitionStrategy()); + } + + public KafkaSyncKeyConverter(KafkaConfig kafkaConf) { + this(kafkaConf, null); + } + + @Override + public byte[] toExternal(RowData rowData, byte[] output) throws Exception { + + Map map = getExternalMap(rowData); + + int arity = rowData.getArity(); + ColumnRowData row = (ColumnRowData) rowData; + + if (kafkaConfig.getTableFields() != null + && kafkaConfig.getTableFields().size() >= arity + && !(row.getField(0) instanceof MapColumn)) { + // get partition key value + if (!CollectionUtil.isNullOrEmpty(outList)) { + Map keyPartitionMap = new LinkedHashMap<>(1); + for (Map.Entry entry : map.entrySet()) { + if (outList.contains(entry.getKey())) { + keyPartitionMap.put(entry.getKey(), entry.getValue()); + } + } + map = keyPartitionMap; + } + } else { + // CDC kafka key handler + // partition.strategy hash-by-key / all-to-zero + if (partitionStrategy != null + && partitionStrategy.equals(PartitionStrategy.HASH_BY_KEY)) { + Map keyPartitionMap = new LinkedHashMap<>(1); + if (map.get("message") instanceof Map) { + // cdc 数据,折叠类型的数据 + Map message = (Map) map.get("message"); + String type = (String) message.get("type"); + String primaryKey = (String) (message).get("primary_key"); + + if (StringUtils.isNotEmpty(primaryKey)) { + String[] primaryKeys = primaryKey.split(","); + switch (type) { + case "DELETE": + { + Map before = + (Map) message.get("before"); + for (String key : primaryKeys) { + keyPartitionMap.put(key, before.get(key)); + } + map = keyPartitionMap; + } + break; + case "INSERT": + case "UPDATE": + { + Map after = + (Map) message.get("after"); + for (String key : primaryKeys) { + keyPartitionMap.put(key, after.get(key)); + } + map = keyPartitionMap; + } + break; + default: + } + } + + } else { + // cdc 数据,paving data. pavingData = true + String primaryKey = (String) map.get("primary_key"); + String type = (String) map.get("type"); + if (StringUtils.isNotEmpty(primaryKey)) { + String[] primaryKeys = primaryKey.split(","); + String prefix; + switch (type) { + case "INSERT": + case "UPDATE": + prefix = "after_"; + break; + case "DELETE": + default: + prefix = "before_"; + break; + } + + for (String key : primaryKeys) { + keyPartitionMap.put(key, map.get(prefix + key)); + } + map = keyPartitionMap; + } + } + } + } + return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8); + } +} diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java index d052a05ac8..586a7291f0 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java @@ -22,20 +22,19 @@ import com.dtstack.chunjun.connector.kafka.conf.KafkaConfig; import com.dtstack.chunjun.connector.kafka.converter.KafkaRawTypeMapping; import com.dtstack.chunjun.connector.kafka.converter.KafkaSyncConverter; +import com.dtstack.chunjun.connector.kafka.converter.KafkaSyncKeyConverter; import com.dtstack.chunjun.connector.kafka.enums.StartupMode; import com.dtstack.chunjun.connector.kafka.partitioner.CustomerFlinkPartition; import com.dtstack.chunjun.connector.kafka.serialization.RowSerializationSchema; import com.dtstack.chunjun.converter.RawTypeMapper; import com.dtstack.chunjun.sink.SinkFactory; import com.dtstack.chunjun.util.GsonUtil; -import com.dtstack.chunjun.util.TableUtil; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Preconditions; @@ -46,8 +45,12 @@ import java.util.Properties; +/** + * Date: 2021/04/07 Company: www.dtstack.com + * + * @author tudou + */ public class KafkaSinkFactory extends SinkFactory { - protected KafkaConfig kafkaConfig; public KafkaSinkFactory(SyncConfig config) { @@ -65,6 +68,11 @@ public KafkaSinkFactory(SyncConfig config) { kafkaConfig.setTopic(kafkaConfig.getTopics().get(0)); } super.initCommonConf(kafkaConfig); + if (StringUtils.isNotBlank(kafkaConfig.getTopic()) + && StringUtils.isNotBlank(kafkaConfig.getTableIdToTopicMapping())) { + throw new IllegalArgumentException( + "topic and tableIdToTopicMapping can not be set at the same time"); + } } @Override @@ -89,8 +97,6 @@ protected DataStreamSink createOutput( } RowSerializationSchema rowSerializationSchema; - RowType rowType = - TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply); if (!CollectionUtil.isNullOrEmpty(kafkaConfig.getPartitionAssignColumns())) { Preconditions.checkState( !CollectionUtil.isNullOrEmpty(kafkaConfig.getTableFields()), @@ -108,21 +114,21 @@ protected DataStreamSink createOutput( new RowSerializationSchema( kafkaConfig, new CustomerFlinkPartition<>(), - new KafkaSyncConverter( - rowType, kafkaConfig, kafkaConfig.getPartitionAssignColumns()), - new KafkaSyncConverter(rowType, kafkaConfig)); + new KafkaSyncKeyConverter( + kafkaConfig, kafkaConfig.getPartitionAssignColumns()), + new KafkaSyncConverter(kafkaConfig)); } else { rowSerializationSchema = new RowSerializationSchema( kafkaConfig, new CustomerFlinkPartition<>(), - null, - new KafkaSyncConverter(rowType, kafkaConfig)); + new KafkaSyncKeyConverter(kafkaConfig), + new KafkaSyncConverter(kafkaConfig)); } KafkaProducer kafkaProducer = new KafkaProducer( - kafkaConfig.getTopic(), + StringUtils.isEmpty(kafkaConfig.getTopic()) ? "" : kafkaConfig.getTopic(), rowSerializationSchema, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/PartitionStrategy.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/PartitionStrategy.java new file mode 100644 index 0000000000..5d80e4e41d --- /dev/null +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/PartitionStrategy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.kafka.sink; + +import org.apache.kafka.clients.producer.ProducerRecord; + +/** Partition Strategy for sending {@link ProducerRecord} to kafka partition. */ +public enum PartitionStrategy { + + /** All {@link ProducerRecord} will be sent to partition 0. */ + ALL_TO_ZERO("all-to-zero"), + + /** {@link ProducerRecord} will be sent to specific partition by primary key. */ + HASH_BY_KEY("hash-by-key"); + + private final String value; + + PartitionStrategy(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + // 添加自定义查找方法 + public static PartitionStrategy fromValue(String value) { + for (PartitionStrategy strategy : PartitionStrategy.values()) { + if (strategy.value.equals(value)) { + return strategy; + } + } + return ALL_TO_ZERO; + } +} diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java b/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java index d44e346e39..a49473a0a2 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/constants/CDCConstantValue.java @@ -31,4 +31,5 @@ public class CDCConstantValue { public static final String AFTER = "after"; public static final String SCN = "scn"; public static final String LSN = "lsn"; + public static final String PRIMARY_KEY = "primary_key"; } diff --git a/chunjun-examples/json/binlog/binlog_kafka_partition_by_key.json b/chunjun-examples/json/binlog/binlog_kafka_partition_by_key.json new file mode 100644 index 0000000000..aaed50a050 --- /dev/null +++ b/chunjun-examples/json/binlog/binlog_kafka_partition_by_key.json @@ -0,0 +1,50 @@ +{ + "job": { + "content": [ + { + "reader": { + "parameter": { + "schema": "wujuan", + "username": "zhiqiang", + "password": "lizhiqiang", + "cat": "insert,delete,update", + "jdbcUrl": "jdbc:mysql://localhost:3306/wujuan?useSSL=false", + "host": "localhost", + "port": 3306, + + "table": [], + "splitUpdate": true, + "pavingData": true + }, + "name": "binlogreader" + }, + "writer": { + "parameter": { + "producerSettings": { + "zookeeper.connect": "", + "bootstrap.servers": "localhost:9092" + }, + "dataCompelOrder": false, + "partitionStrategy" : "hash-by-key", + "topic": "wujuan4" + }, + "name": "kafkawriter", + "type": 26 + } + } + ], + "setting": { + "restore": { + "isRestore": true, + "isStream": true + }, + "errorLimit": {}, + "speed": { + "readerChannel": 1, + "writerChannel": 1, + "bytes": -1048576, + "channel": 1 + } + } + } +}