Merged
5 changes: 4 additions & 1 deletion .idea/vcs.xml

Some generated files are not rendered by default.
@@ -64,6 +64,7 @@
import static com.dtstack.chunjun.constants.CDCConstantValue.DATABASE;
import static com.dtstack.chunjun.constants.CDCConstantValue.LSN;
import static com.dtstack.chunjun.constants.CDCConstantValue.OP_TIME;
import static com.dtstack.chunjun.constants.CDCConstantValue.PRIMARY_KEY;
import static com.dtstack.chunjun.constants.CDCConstantValue.SCHEMA;
import static com.dtstack.chunjun.constants.CDCConstantValue.TABLE;
import static com.dtstack.chunjun.constants.CDCConstantValue.TS;
@@ -262,6 +263,14 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exception {
List<CanalEntry.Column> beforeList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterList = rowData.getAfterColumnsList();

String beforePrimaryKeys = extractPrimaryKeys(beforeList);
String afterPrimaryKeys = extractPrimaryKeys(afterList);

columnRowData.addHeader(PRIMARY_KEY);
// Collectors.joining(...) never returns null, so test for empty instead:
// on INSERT the before-image has no columns and the key names come from the after-image.
columnRowData.addField(
new StringColumn(
!beforePrimaryKeys.isEmpty() ? beforePrimaryKeys : afterPrimaryKeys));

List<AbstractBaseColumn> beforeColumnList = new ArrayList<>(beforeList.size());
List<String> beforeHeaderList = new ArrayList<>(beforeList.size());
List<AbstractBaseColumn> afterColumnList = new ArrayList<>(afterList.size());
@@ -314,6 +323,13 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exception {
return result;
}

/** Joins the names of all primary-key columns with "," (empty string when there is none). */
private String extractPrimaryKeys(List<CanalEntry.Column> columnList) {
return columnList.stream()
.filter(CanalEntry.Column::getIsKey)
.map(CanalEntry.Column::getName)
.collect(Collectors.joining(","));
}
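
For orientation, the extraction behaves like this (the column lists here are hypothetical, not part of this diff):

// given key columns "id" and "tenant_id" plus a non-key column "name":
extractPrimaryKeys(columns);                  // -> "id,tenant_id"
extractPrimaryKeys(Collections.emptyList()); // -> "" (empty, never null), hence the isEmpty() check above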

/**
* Parse a CanalEntry.Column
*
@@ -20,6 +20,7 @@
import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.connector.kafka.enums.StartupMode;
import com.dtstack.chunjun.connector.kafka.sink.PartitionStrategy;

import lombok.Getter;
import lombok.Setter;
@@ -45,6 +46,9 @@ public class KafkaConfig extends CommonConfig {
private String topic;
/** kafka topics */
private List<String> topics;

/** Maps CDC data by database.schema.table and writes it to the corresponding topic. */
private String tableIdToTopicMapping;
/** a groupId is required; defaults to a random UUID */
private String groupId = UUID.randomUUID().toString().replace("-", "");
/** kafka startup mode */
@@ -64,6 +68,9 @@ public class KafkaConfig extends CommonConfig {
/** kafka sink partition columns */
private List<String> partitionAssignColumns;

// cdc: which partition strategy to use, e.g. partitioning by primary key
private String partitionStrategy = PartitionStrategy.ALL_TO_ZERO.toString();

private String deserialization = "default";

/** configuration for the deserialization; each deserialization has its own settings */
@@ -82,7 +89,7 @@

public String getOffset() {
if (offset == null) {
return null;
return offset;
}
return offset.toLowerCase(Locale.ENGLISH);
}
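
A hedged sketch of how the two new CDC sink options could be consumed (the helper below is illustrative, not this PR's code; tableIdToTopicMapping similarly lets each database.schema.table route to its own topic):

// Pin everything to partition 0, or route by primary key so each row's changes stay ordered.
int choosePartition(KafkaConfig conf, String primaryKeyValue, int partitionCount) {
    if (PartitionStrategy.ALL_TO_ZERO.toString().equals(conf.getPartitionStrategy())) {
        return 0; // a single partition preserves the global change order
    }
    return Math.abs(primaryKeyValue.hashCode() % partitionCount); // same key -> same partition
}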
@@ -47,7 +47,6 @@
import com.dtstack.chunjun.util.MapUtil;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CollectionUtil;

import org.apache.commons.collections.CollectionUtils;
@@ -71,30 +70,39 @@

import static com.dtstack.chunjun.connector.kafka.option.KafkaOptions.DEFAULT_CODEC;

/**
* @author chuixue
* @create 2021-06-07 15:51
* @description
*/
public class KafkaSyncConverter
extends AbstractRowConverter<ConsumerRecord<byte[], byte[]>, Object, byte[], String> {

/** source kafka msg decode */
private final IDecode decode;
protected final IDecode decode;
/** sink json Decoder */
protected final JsonDecoder jsonDecoder;
/** kafka Conf */
private final KafkaConfig kafkaConfig;
protected final KafkaConfig kafkaConfig;
/** kafka sink out fields */
private List<String> outList;
protected List<String> outList;

public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig, List<String> keyTypeList) {
super(rowType, kafkaConfig);
public KafkaSyncConverter(KafkaConfig kafkaConfig, List<String> keyTypeList) {
super(null, kafkaConfig);
this.kafkaConfig = kafkaConfig;
this.outList = keyTypeList;
this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
this.decode = new JsonDecoder(kafkaConfig.isAddMessage());
} else {
this.decode = new TextDecoder();
}
}

public KafkaSyncConverter(RowType rowType, KafkaConfig kafkaConfig) {
super(rowType, kafkaConfig);
public KafkaSyncConverter(KafkaConfig kafkaConfig) {
super(null, kafkaConfig);
this.commonConfig = this.kafkaConfig = kafkaConfig;
this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
this.decode = new JsonDecoder(kafkaConfig.isAddMessage());
} else {
@@ -162,18 +170,23 @@ public RowData toInternal(ConsumerRecord<byte[], byte[]> input) throws Exception
result = new ColumnRowData(fieldConfList.size());
}
for (int i = 0; i < fieldConfList.size(); i++) {
FieldConfig fieldConfig = fieldConfList.get(i);
Object value = map.get(fieldConfig.getName());
FieldConfig fieldConf = fieldConfList.get(i);
Object value = map.get(fieldConf.getName());
AbstractBaseColumn baseColumn =
(AbstractBaseColumn) toInternalConverters.get(i).deserialize(value);
result.addField(assembleFieldProps(fieldConfig, baseColumn));
result.addField(assembleFieldProps(fieldConf, baseColumn));
}
}
return result;
}

@Override
public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
Map<String, Object> map = getExternalMap(rowData);
return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8);
}

protected Map<String, Object> getExternalMap(RowData rowData) {
Map<String, Object> map;
int arity = rowData.getArity();
ColumnRowData row = (ColumnRowData) rowData;
@@ -187,22 +200,62 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
Object value;
if (object instanceof TimestampColumn) {
value = ((TimestampColumn) object).asTimestampStr();
} else if (row.getField(i).getData() == null) {
value = null;
} else {
value = org.apache.flink.util.StringUtils.arrayAwareToString(row.getField(i));
value = row.getField(i).asString();
}
map.put(kafkaConfig.getTableFields().get(i), value);
}

// get partition key value
if (!CollectionUtil.isNullOrEmpty(outList)) {
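// keep only the configured partition-key fields; capacity (arity*4)/3 holds arity entries without rehashing at load factor 0.75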
Map<String, Object> keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3);
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (outList.contains(entry.getKey())) {
keyPartitionMap.put(entry.getKey(), entry.getValue());
}
}
map = keyPartitionMap;
}
} else {
String[] headers = row.getHeaders();
if (Objects.nonNull(headers) && headers.length >= 1) {
// cdc
map = new HashMap<>(headers.length >> 1);
for (String header : headers) {
AbstractBaseColumn val = row.getField(header);
if (null == val) {
if (null == val || val instanceof NullColumn) {
map.put(header, null);
} else {
map.put(header, val.getData());
// Timestamp must be converted to the yyyy-MM-dd HH:mm:ss.SSSSSS format
if (val instanceof TimestampColumn) {
map.put(header, timeStampTostringBynacosPrecision(val.asTimestamp()));
} else if (val instanceof MapColumn) {
Object data = val.getData();
if (data instanceof Map) {
Map<String, Object> maps = (Map<String, Object>) data;
LinkedHashMap<String, Object> datas = new LinkedHashMap<>();
maps.forEach(
(k, v) -> {
if (v instanceof Timestamp) {
datas.put(
k,
timeStampTostringBynacosPrecision(
(Timestamp) v));
} else {
datas.put(k, v);
}
});
map.put(header, datas);
} else {
throw new RuntimeException(
"MapColumn data is not a Map; actual data type is "
+ data.getClass());
}
} else {
map.put(header, val.getData());
}
}
}
if (Arrays.stream(headers)
@@ -227,19 +280,7 @@ public byte[] toExternal(RowData rowData, byte[] output) throws Exception {
map = decode.decode(String.join(",", values));
}
}

// get partition key value
if (!CollectionUtil.isNullOrEmpty(outList)) {
Map<String, Object> keyPartitionMap = new LinkedHashMap<>((arity << 2) / 3);
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (outList.contains(entry.getKey())) {
keyPartitionMap.put(entry.getKey(), entry.getValue());
}
}
map = keyPartitionMap;
}

return MapUtil.writeValueAsString(map).getBytes(StandardCharsets.UTF_8);
return map;
}
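
Returning the map from getExternalMap() instead of serializing it inline is what makes the payload customizable; a minimal sketch with a hypothetical subclass (not part of this PR):

public class KafkaCdcSyncConverter extends KafkaSyncConverter {
    public KafkaCdcSyncConverter(KafkaConfig kafkaConfig) {
        super(kafkaConfig);
    }

    @Override
    protected Map<String, Object> getExternalMap(RowData rowData) {
        Map<String, Object> map = super.getExternalMap(rowData);
        // e.g. drop routing-only metadata before toExternal() serializes the record;
        // the actual header name comes from CDCConstantValue.PRIMARY_KEY (value assumed here)
        map.remove("primary_key");
        return map;
    }
}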

@Override
@@ -300,4 +341,12 @@ protected IDeserializationConverter createInternalConverter(String type) {
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}

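/**
* Renders a Timestamp according to its nanosecond precision: with no nanos it is formatted
* via TimestampColumn at precision 0 (avoiding the trailing ".0" of Timestamp.toString()),
* otherwise Timestamp.toString() keeps the fractional seconds.
*/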
public String timeStampTostringBynacosPrecision(Timestamp t) {
if (t.getNanos() == 0) {
return new TimestampColumn(t, 0).asTimestampStr();
} else {
return t.toString();
}
}
}
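
Roughly what the two branches yield (the exact zero-precision shape depends on ChunJun's TimestampColumn; Timestamp.toString() is standard java.sql behavior):

Timestamp whole = Timestamp.valueOf("2024-01-02 03:04:05");        // getNanos() == 0
Timestamp fractional = Timestamp.valueOf("2024-01-02 03:04:05.123456");
timeStampTostringBynacosPrecision(whole);      // precision-0 string, no trailing ".0"
timeStampTostringBynacosPrecision(fractional); // "2024-01-02 03:04:05.123456"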