From a658533e746a33c8b00d0d9b6bd5c8a99fbcdb0d Mon Sep 17 00:00:00 2001 From: dujie Date: Mon, 1 Dec 2025 20:03:13 +0800 Subject: [PATCH] [feature-#1955]jdbcConf adds default optimization parameters --- .github/workflows/build.yml | 5 ---- .../converter/ClickhouseRawTypeConverter.java | 1 + .../connector/jdbc/dialect/JdbcDialect.java | 14 ++++++++++ .../connector/jdbc/sink/JdbcSinkFactory.java | 1 + .../jdbc/source/JdbcSourceFactory.java | 1 + .../jdbc/table/JdbcDynamicTableFactory.java | 2 ++ .../connector/mysql/dialect/MysqlDialect.java | 26 +++++++++++++++++++ .../mysql/sink/MysqlSinkFactory.java | 2 -- .../mysql/source/MysqlSourceFactory.java | 2 -- .../mysql/table/MysqlDynamicTableFactory.java | 20 -------------- .../mysqld/source/MysqldSourceFactory.java | 2 -- .../converter/OracleRawTypeMapper.java | 2 +- .../postgresql/dialect/PostgresqlDialect.java | 12 +++++++++ .../sqlserver/dialect/SqlserverDialect.java | 14 ++++++++++ .../sybase/source/SybaseSourceFactory.java | 2 -- 15 files changed, 72 insertions(+), 34 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 57af918a56..87dd0cfee9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -71,11 +71,6 @@ jobs: distribution: 'temurin' cache: 'maven' - name: add dependencies - run: | - wget http://nexus.dev.dtstack.cn/nexus/content/repositories/dtstack-release/com/esen/jdbc/gbase/8.3.81.53/gbase-8.3.81.53.jar - wget https://cdn.gbase.cn/products/27/czrl6z38BvTfEQS4uyQcS/gbasedbtjdbc_3.5.1.jar - ./mvnw install:install-file -DgroupId=com.esen.jdbc -DartifactId=gbase -Dversion=8.3.81.53 -Dpackaging=jar -Dfile=./gbase-8.3.81.53.jar - ./mvnw install:install-file -DgroupId=com.gbasedbt.jdbc.Driver -DartifactId=gbasedbt -Dversion=3.5.1_1_d0c87a -Dpackaging=jar -Dfile=./gbasedbtjdbc_3.5.1.jar - name: build project run: | ./mvnw clean package -Dmaven.test.skip --no-snapshot-updates diff --git a/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/converter/ClickhouseRawTypeConverter.java b/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/converter/ClickhouseRawTypeConverter.java index 0a9a93c91d..7df22a66fa 100644 --- a/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/converter/ClickhouseRawTypeConverter.java +++ b/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/converter/ClickhouseRawTypeConverter.java @@ -40,6 +40,7 @@ public class ClickhouseRawTypeConverter { public static DataType apply(TypeConfig type) { switch (type.getType().toUpperCase(Locale.ENGLISH)) { case "BOOLEAN": + case "BOOL": return DataTypes.BOOLEAN(); case "TINYINT": case "INT8": diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java index a6bf554b09..9817033ad1 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java @@ -525,4 +525,18 @@ default Function, TypeConfig> typeBuilder() { default TableIdentify getTableIdentify(String confSchema, String confTable) { return new TableIdentify(null, confSchema, confTable, this::quoteIdentifier, false); } + + /** + * Add additional parameters to jdbc properties,for reader only. + * + * @param jdbcConf jdbc datasource configuration + */ + default void putReaderExtParam(JdbcConfig jdbcConf) {} + + /** + * Add additional parameters to jdbc properties,for writer only. + * + * @param jdbcConf jdbc datasource configuration + */ + default void putWriterExtParam(JdbcConfig jdbcConf) {} } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcSinkFactory.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcSinkFactory.java index b1087bf3e6..d961f5eb1f 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/JdbcSinkFactory.java @@ -96,6 +96,7 @@ public JdbcSinkFactory(SyncConfig syncConfig, JdbcDialect jdbcDialect) { @Override public DataStreamSink createSink(DataStream dataSet) { + jdbcDialect.putWriterExtParam(jdbcConfig); JdbcOutputFormatBuilder builder = getBuilder(); initColumnInfo(); builder.setJdbcConf(jdbcConfig); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java index cc41ef113a..229022232a 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java @@ -105,6 +105,7 @@ protected Class getConfClass() { @Override public DataStream createSource() { + jdbcDialect.putReaderExtParam(jdbcConfig); initColumnInfo(); initRestoreConfig(); initPollingConfig(); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java index fbba0b2e2b..a854607758 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -185,6 +185,7 @@ protected JdbcConfig getSinkConnectionConfig( jdbcConfig.setUniqueKey(keyFields); resetTableInfo(jdbcConfig); + getDialect().putWriterExtParam(jdbcConfig); return jdbcConfig; } @@ -257,6 +258,7 @@ protected JdbcConfig getSourceConnectionConfig(ReadableConfig readableConfig) { if (StringUtils.isBlank(jdbcConfig.getCustomSql())) { resetTableInfo(jdbcConfig); } + getDialect().putReaderExtParam(jdbcConfig); return jdbcConfig; } diff --git a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/dialect/MysqlDialect.java b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/dialect/MysqlDialect.java index 50483b8133..c764e7f5b1 100644 --- a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/dialect/MysqlDialect.java +++ b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/dialect/MysqlDialect.java @@ -20,6 +20,7 @@ import com.dtstack.chunjun.config.TypeConfig; import com.dtstack.chunjun.connector.jdbc.conf.TableIdentify; +import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; import com.dtstack.chunjun.connector.mysql.converter.MysqlRawTypeConverter; @@ -37,6 +38,7 @@ import java.util.Arrays; import java.util.Locale; import java.util.Optional; +import java.util.Properties; import java.util.function.Function; import java.util.stream.Collectors; @@ -166,4 +168,28 @@ public Function, TypeConfig> typeBuilder() { return typeConfig; }); } + + @Override + public void putWriterExtParam(JdbcConfig jdbcConf) { + Properties properties = jdbcConf.getProperties(); + if (properties == null) { + properties = new Properties(); + } + properties.putIfAbsent("useCursorFetch", "true"); + properties.putIfAbsent("rewriteBatchedStatements", "true"); + properties.put("tinyInt1isBit", "false"); + jdbcConf.setProperties(properties); + } + + @Override + public void putReaderExtParam(JdbcConfig jdbcConf) { + Properties properties = jdbcConf.getProperties(); + if (properties == null) { + properties = new Properties(); + } + properties.putIfAbsent("useCursorFetch", "true"); + properties.putIfAbsent("rewriteBatchedStatements", "true"); + properties.put("tinyInt1isBit", "false"); + jdbcConf.setProperties(properties); + } } diff --git a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/sink/MysqlSinkFactory.java b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/sink/MysqlSinkFactory.java index 57e20c4e90..24803d4332 100644 --- a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/sink/MysqlSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/sink/MysqlSinkFactory.java @@ -22,14 +22,12 @@ import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat; import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder; import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory; -import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect; public class MysqlSinkFactory extends JdbcSinkFactory { public MysqlSinkFactory(SyncConfig syncConfig) { super(syncConfig, new MysqlDialect()); - JdbcUtil.putExtParam(jdbcConfig); } @Override diff --git a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/source/MysqlSourceFactory.java b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/source/MysqlSourceFactory.java index 1f188f1297..4f4373e67a 100644 --- a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/source/MysqlSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/source/MysqlSourceFactory.java @@ -22,7 +22,6 @@ import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat; import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder; import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; -import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -42,7 +41,6 @@ public MysqlSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) && jdbcConfig.getFetchSize() == 0) { jdbcConfig.setFetchSize(1000); } - JdbcUtil.putExtParam(jdbcConfig); } @Override diff --git a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/table/MysqlDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/table/MysqlDynamicTableFactory.java index 184dd9a35a..2c7afb995f 100644 --- a/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/table/MysqlDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-mysql/src/main/java/com/dtstack/chunjun/connector/mysql/table/MysqlDynamicTableFactory.java @@ -18,19 +18,14 @@ package com.dtstack.chunjun.connector.mysql.table; -import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat; import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder; import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat; import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder; import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory; -import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.catalog.ResolvedSchema; - public class MysqlDynamicTableFactory extends JdbcDynamicTableFactory { // 默认是Mysql流式拉取 @@ -54,21 +49,6 @@ protected int getDefaultFetchSize() { return DEFAULT_FETCH_SIZE; } - @Override - protected JdbcConfig getSourceConnectionConfig(ReadableConfig readableConfig) { - JdbcConfig jdbcConfig = super.getSourceConnectionConfig(readableConfig); - JdbcUtil.putExtParam(jdbcConfig); - return jdbcConfig; - } - - @Override - protected JdbcConfig getSinkConnectionConfig( - ReadableConfig readableConfig, ResolvedSchema schema) { - JdbcConfig jdbcConfig = super.getSinkConnectionConfig(readableConfig, schema); - JdbcUtil.putExtParam(jdbcConfig); - return jdbcConfig; - } - @Override protected JdbcInputFormatBuilder getInputFormatBuilder() { return new JdbcInputFormatBuilder(new JdbcInputFormat()); diff --git a/chunjun-connectors/chunjun-connector-mysqld/src/main/java/com/dtstack/chunjun/connector/mysqld/source/MysqldSourceFactory.java b/chunjun-connectors/chunjun-connector-mysqld/src/main/java/com/dtstack/chunjun/connector/mysqld/source/MysqldSourceFactory.java index 9373c9f883..07b2ab0088 100644 --- a/chunjun-connectors/chunjun-connector-mysqld/src/main/java/com/dtstack/chunjun/connector/mysqld/source/MysqldSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-mysqld/src/main/java/com/dtstack/chunjun/connector/mysqld/source/MysqldSourceFactory.java @@ -24,7 +24,6 @@ import com.dtstack.chunjun.connector.jdbc.source.distribute.DistributedJdbcInputFormat; import com.dtstack.chunjun.connector.jdbc.source.distribute.DistributedJdbcInputFormatBuilder; import com.dtstack.chunjun.connector.jdbc.source.distribute.DistributedJdbcSourceFactory; -import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect; import com.dtstack.chunjun.connector.mysqld.utils.MySqlDataSource; import com.dtstack.chunjun.throwable.ChunJunRuntimeException; @@ -57,7 +56,6 @@ public class MysqldSourceFactory extends DistributedJdbcSourceFactory { public MysqldSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) { super(syncConfig, env, new MysqlDialect()); - JdbcUtil.putExtParam(jdbcConfig); } @Override diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeMapper.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeMapper.java index 9e4c103431..8641fc9737 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeMapper.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeMapper.java @@ -75,7 +75,7 @@ public static DataType apply(TypeConfig type) { if (type.getType().contains("TIME ZONE")) { return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(); } else { - return DataTypes.TIMESTAMP(); + return type.toTimestampDataType(0); } } throw new UnsupportedTypeException(type); diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java index 82ece6dfd7..7577b03760 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java @@ -19,6 +19,7 @@ package com.dtstack.chunjun.connector.postgresql.dialect; import com.dtstack.chunjun.config.CommonConfig; +import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; @@ -36,6 +37,7 @@ import java.sql.ResultSet; import java.util.Arrays; import java.util.Optional; +import java.util.Properties; import java.util.stream.Collectors; public class PostgresqlDialect implements JdbcDialect { @@ -160,4 +162,14 @@ public String getCopyStatement( return String.format( COPY_SQL_TEMPL, tableLocation, fieldsExpression, fieldDelimiter, nullVal); } + + @Override + public void putWriterExtParam(JdbcConfig jdbcConf) { + Properties properties = jdbcConf.getProperties(); + if (properties == null) { + properties = new Properties(); + } + properties.putIfAbsent("reWriteBatchedInserts", "true"); + jdbcConf.setProperties(properties); + } } diff --git a/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/dialect/SqlserverDialect.java b/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/dialect/SqlserverDialect.java index ef1bfc48f9..cee203be0b 100644 --- a/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/dialect/SqlserverDialect.java +++ b/chunjun-connectors/chunjun-connector-sqlserver/src/main/java/com/dtstack/chunjun/connector/sqlserver/dialect/SqlserverDialect.java @@ -20,6 +20,7 @@ import com.dtstack.chunjun.config.CommonConfig; import com.dtstack.chunjun.config.TypeConfig; +import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; import com.dtstack.chunjun.connector.jdbc.source.JdbcInputSplit; import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; @@ -55,6 +56,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -333,4 +335,16 @@ public Function, TypeConfig> typeBuilder() { return typeConfig; }); } + + @Override + public void putWriterExtParam(JdbcConfig jdbcConf) { + Properties properties = jdbcConf.getProperties(); + if (properties == null) { + properties = new Properties(); + } + properties.putIfAbsent("enablePrepareOnFirstPreparedStatementCall", "true"); + properties.putIfAbsent("disableStatementPooling", "false"); + properties.putIfAbsent("statementPoolingCacheSize", "50"); + jdbcConf.setProperties(properties); + } } diff --git a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java index 83c4926f4e..8552576446 100644 --- a/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-sybase/src/main/java/com/dtstack/chunjun/connector/sybase/source/SybaseSourceFactory.java @@ -20,7 +20,6 @@ import com.dtstack.chunjun.config.SyncConfig; import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; -import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; import com.dtstack.chunjun.connector.sybase.dialect.SybaseDialect; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -36,6 +35,5 @@ public SybaseSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env && jdbcConfig.getFetchSize() == 0) { jdbcConfig.setFetchSize(1000); } - JdbcUtil.putExtParam(jdbcConfig); } }