From 1075122e3ed4dea0fdc1d3606d02b7c7ccd43474 Mon Sep 17 00:00:00 2001 From: wujuan Date: Tue, 2 Dec 2025 10:38:06 +0800 Subject: [PATCH] [hotfix-#1960][kafka] KafkaSyncConverter construct parameter transmission error --- .../kafka/converter/KafkaSyncConverter.java | 9 +++++---- .../kafka/converter/KafkaSyncKeyConverter.java | 11 ++++++----- .../connector/kafka/sink/KafkaSinkFactory.java | 13 +++++++++---- .../connector/kafka/source/KafkaSourceFactory.java | 5 ++++- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java index 22000c3b37..8b9247100d 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java @@ -47,6 +47,7 @@ import com.dtstack.chunjun.util.MapUtil; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CollectionUtil; import org.apache.commons.collections.CollectionUtils; @@ -87,8 +88,8 @@ public class KafkaSyncConverter /** kafka sink out fields */ protected List outList; - public KafkaSyncConverter(KafkaConfig kafkaConfig, List keyTypeList) { - super(null, kafkaConfig); + public KafkaSyncConverter(KafkaConfig kafkaConfig, RowType rowType, List keyTypeList) { + super(rowType, kafkaConfig); this.kafkaConfig = kafkaConfig; this.outList = keyTypeList; this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage()); @@ -99,8 +100,8 @@ public KafkaSyncConverter(KafkaConfig kafkaConfig, List keyTypeList) { } } - public KafkaSyncConverter(KafkaConfig kafkaConfig) { - super(null, kafkaConfig); + public KafkaSyncConverter(KafkaConfig kafkaConfig, RowType rowType) { + super(rowType, kafkaConfig); this.commonConfig = this.kafkaConfig = kafkaConfig; this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage()); if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) { diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java index 6b866b1512..4361c7d7ec 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java @@ -25,6 +25,7 @@ import com.dtstack.chunjun.util.MapUtil; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CollectionUtil; import org.apache.commons.lang3.StringUtils; @@ -41,15 +42,15 @@ */ public class KafkaSyncKeyConverter extends KafkaSyncConverter { - private final PartitionStrategy partitionStrategy; + private PartitionStrategy partitionStrategy; - public KafkaSyncKeyConverter(KafkaConfig kafkaConf, List keyTypeList) { - super(kafkaConf, keyTypeList); + public KafkaSyncKeyConverter(KafkaConfig kafkaConf, RowType rowType, List keyTypeList) { + super(kafkaConf, rowType, keyTypeList); this.partitionStrategy = PartitionStrategy.fromValue(kafkaConf.getPartitionStrategy()); } - public KafkaSyncKeyConverter(KafkaConfig kafkaConf) { - this(kafkaConf, null); + public KafkaSyncKeyConverter(KafkaConfig kafkaConf, RowType rowType) { + this(kafkaConf, rowType, null); } @Override diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java index 586a7291f0..54ef6e97e0 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java @@ -29,12 +29,14 @@ import com.dtstack.chunjun.converter.RawTypeMapper; import com.dtstack.chunjun.sink.SinkFactory; import com.dtstack.chunjun.util.GsonUtil; +import com.dtstack.chunjun.util.TableUtil; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Preconditions; @@ -96,6 +98,9 @@ protected DataStreamSink createOutput( "when kafka sink dataCompelOrder set true , Parallelism must 1."); } + RowType rowType = + TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply); + RowSerializationSchema rowSerializationSchema; if (!CollectionUtil.isNullOrEmpty(kafkaConfig.getPartitionAssignColumns())) { Preconditions.checkState( @@ -115,15 +120,15 @@ protected DataStreamSink createOutput( kafkaConfig, new CustomerFlinkPartition<>(), new KafkaSyncKeyConverter( - kafkaConfig, kafkaConfig.getPartitionAssignColumns()), - new KafkaSyncConverter(kafkaConfig)); + kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns()), + new KafkaSyncConverter(kafkaConfig, rowType)); } else { rowSerializationSchema = new RowSerializationSchema( kafkaConfig, new CustomerFlinkPartition<>(), - new KafkaSyncKeyConverter(kafkaConfig), - new KafkaSyncConverter(kafkaConfig)); + new KafkaSyncKeyConverter(kafkaConfig, rowType), + new KafkaSyncConverter(kafkaConfig, rowType)); } KafkaProducer kafkaProducer = diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactory.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactory.java index 2281872031..6b40c6730e 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactory.java @@ -118,7 +118,10 @@ public DataStream createSource() { RowType rowType = TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply); DynamicKafkaDeserializationSchema deserializationSchema = - new RowDeserializationSchema(kafkaConfig, new KafkaSyncConverter(kafkaConfig)); + new RowDeserializationSchema( + kafkaConfig, + new KafkaSyncConverter( + kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns())); KafkaConsumerWrapper consumer = new KafkaConsumerWrapper(topics, deserializationSchema, props); switch (kafkaConfig.getMode()) {