From 78600c0b0ff08493ff5744caf383fb648dc0054b Mon Sep 17 00:00:00 2001 From: dujie Date: Mon, 1 Dec 2025 08:30:54 +0800 Subject: [PATCH] [Feature-#1948][doris] performance and resource optimization --- .../chunjun-connector-doris/pom.xml | 55 +++ .../connector/doris/buffer/BufferFlusher.java | 13 + .../connector/doris/buffer/BufferPool.java | 197 +++++++++++ .../connector/doris/buffer/BufferPools.java | 202 +++++++++++ .../connector/doris/buffer/DorisSinkOP.java | 38 ++ .../DorisStreamLoadFailedException.java | 50 +++ .../connector/doris/buffer/IBufferPool.java | 9 + .../buffer/RetryableStreamLoadWriter.java | 47 +++ .../doris/buffer/StreamLoadWriter.java | 330 ++++++++++++++++++ .../converter/DorisHttpSqlConverter.java | 198 +++++++++-- .../converter/DorisHttpSyncConverter.java | 227 ++++++++++++ .../doris/converter/DorisRawTypeMapper.java | 5 + .../connector/doris/options/DorisConfig.java | 13 + .../connector/doris/options/DorisKeys.java | 8 + .../connector/doris/options/DorisOptions.java | 12 + .../connector/doris/options/LoadConfig.java | 5 + .../doris/sink/DorisDynamicTableSink.java | 25 +- .../doris/sink/DorisHttpOutputFormat.java | 150 ++++---- .../sink/DorisHttpOutputFormatBuilder.java | 12 +- .../doris/sink/DorisSinkFactory.java | 111 ++++-- .../doris/table/DorisDynamicTableFactory.java | 22 +- .../sink/format/BaseRichOutputFormat.java | 9 +- chunjun-examples/json/doris/mysql_doris.json | 129 ------- chunjun-examples/json/doris/stream_doris.json | 59 ++++ chunjun-examples/sql/doris/stream-doris.sql | 24 +- chunjun-local-test/pom.xml | 7 + 26 files changed, 1670 insertions(+), 287 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferFlusher.java create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPool.java create mode 100644 
chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPools.java create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisSinkOP.java create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisStreamLoadFailedException.java create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/IBufferPool.java create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/RetryableStreamLoadWriter.java create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/StreamLoadWriter.java create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSyncConverter.java delete mode 100644 chunjun-examples/json/doris/mysql_doris.json create mode 100644 chunjun-examples/json/doris/stream_doris.json diff --git a/chunjun-connectors/chunjun-connector-doris/pom.xml b/chunjun-connectors/chunjun-connector-doris/pom.xml index ff06083942..3c1cc82579 100644 --- a/chunjun-connectors/chunjun-connector-doris/pom.xml +++ b/chunjun-connectors/chunjun-connector-doris/pom.xml @@ -38,6 +38,24 @@ + + com.fasterxml.jackson.core + jackson-databind + 2.9.10.1 + + + + org.apache.derby + derby + 10.10.2.0 + + + + io.airlift + slice + 0.41 + + com.dtstack.chunjun chunjun-connector-mysql @@ -51,6 +69,43 @@ org.apache.maven.plugins maven-antrun-plugin + + org.apache.maven.plugins + maven-shade-plugin + 3.1.0 + + + package + + shade + + + false + + + + + + com.fasterxml.jackson.core + shade.dorisbatch.com.fasterxml.jackson.core + + + com.fasterxml.jackson.databind + shade.dorisbatch.com.fasterxml.jackson.databind + + + com.fasterxml.jackson.annotation + 
shade.dorisbatch.com.fasterxml.jackson.annotation + + + org.apache.http + shade.core.org.apache.http + + + + + + diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferFlusher.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferFlusher.java new file mode 100644 index 0000000000..175aeb4170 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferFlusher.java @@ -0,0 +1,13 @@ +package com.dtstack.chunjun.connector.doris.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +public interface BufferFlusher { + void write(InputStream inputStream, int length) throws Exception; + + void write(Supplier supplier, int length) throws Exception; + + void close() throws IOException; +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPool.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPool.java new file mode 100644 index 0000000000..411bd2bed7 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPool.java @@ -0,0 +1,197 @@ +package com.dtstack.chunjun.connector.doris.buffer; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import static java.nio.charset.StandardCharsets.UTF_8; + +class BufferPool implements IBufferPool { + private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class); + private static final byte OPEN_BRACKET = 0x5B; // [ + private static final byte COMMA = 0x2C; // , + private static 
final byte CLOSE_BRACKET = 0x5D; // ] + + private final int capacity; + private volatile int position; + private volatile BufferState state; + private final ReentrantLock writeLock = new ReentrantLock(); + private final int bufferId; + private final AtomicInteger writeCount = new AtomicInteger(0); + private final BufferFlusher flusher;; + private final Consumer numWriterCount; + private final Consumer bytesWriterCount; + + private Slice slice; + + public enum BufferState { + AVAILABLE, // 可用状态 + WRITING, // 正在写入 + FLUSHING, // 正在刷写 + FULL // 已满待刷写 + } + + public BufferPool( + int capacity, + int bufferId, + BufferFlusher flusher, + Consumer numWriterCount, + Consumer bytesWriterCount) { + this.capacity = capacity; + this.slice = Slices.allocate(capacity); + this.position = 0; + this.bufferId = bufferId; + this.flusher = flusher; + this.numWriterCount = numWriterCount; + this.bytesWriterCount = bytesWriterCount; + this.reset(); + } + + /** + * 写入数据到缓冲区 + * + * @param data 要写入的数据 + * @return 实际写入的字节数,-1表示缓冲区已满 + */ + public int write(byte[] data) { + writeLock.lock(); + try { + return write(Slices.wrappedBuffer(data)); + } finally { + writeLock.unlock(); + } + } + + /** + * 写入数据到缓冲区 + * + * @param data 要写入的数据 + * @return 实际写入的字节数,-1表示缓冲区已满 + */ + public int write(String data) { + writeLock.lock(); + try { + return write(Slices.copiedBuffer(data, UTF_8)); + } finally { + writeLock.unlock(); + } + } + + @Override + public void flush() throws Exception { + writeLock.lock(); + try { + if (position > 1) { + state = BufferState.FLUSHING; + replaceLastCommonToClosingBracket(); + LOG.info( + "小缓冲区 {} 刷写数据,数据总量 {} MB,条数:{}", + bufferId, + (int) Math.ceil((double) position / (1024 * 1024)), + writeCount.get()); + numWriterCount.accept((long) writeCount.get()); + bytesWriterCount.accept((long) position); + Slice output = slice.slice(0, position); + LOG.info("开始异步刷写缓冲区 {},数据大小:{} 字节", this.bufferId, position); + // 执行实际的刷写操作 + flusher.write(output::getInput, position); + 
LOG.info("缓冲区 {} 刷写完成,数据大小:{} 字节", this.bufferId, position); + // 刷写完成后重置缓冲区并放回可用队列 + this.reset(); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public void shutdown() throws Exception { + this.flush(); + this.slice.clear(); + this.slice = null; + } + + /** + * 写入数据到缓冲区 + * + * @param data 要写入的数据 + * @return 实际写入的字节数,-1表示缓冲区已满 + */ + public int write(Slice data) { + writeLock.lock(); + try { + if (state != BufferState.WRITING && state != BufferState.AVAILABLE) { + return -1; // 缓冲区不可写 + } + + int availableSpace = capacity - position; + if (availableSpace <= 0) { + state = BufferState.FULL; + return -1; // 缓冲区已满 + } + + if (data.length() + 1 + position > capacity) { + state = BufferState.FULL; + return -1; // 数据无法存入缓冲区 + } + + state = BufferState.WRITING; + int bytesToWrite = data.length(); + slice.setBytes(position, data); + position += bytesToWrite; + slice.setByte(position, COMMA); + position += 1; + writeCount.incrementAndGet(); + + if (position >= capacity) { + state = BufferState.FULL; + } + + return bytesToWrite; + } finally { + writeLock.unlock(); + } + } + + // [{},{}, => [{},{}] + private void replaceLastCommonToClosingBracket() { + if (state == BufferState.FLUSHING) { + slice.setByte(position - 1, CLOSE_BRACKET); + } + } + + /** 刷写完成后重置缓冲区 */ + public void reset() { + writeLock.lock(); + try { + slice.setByte(0, OPEN_BRACKET); + position = 1; + writeCount.set(0); + state = BufferState.AVAILABLE; + LOG.info("SmallBuffer " + bufferId + " 已重置为可用状态"); + } finally { + writeLock.unlock(); + } + } + + public BufferState getState() { + return state; + } + + public int getBufferId() { + return bufferId; + } + + public int getPosition() { + return position; + } + + public int getCapacity() { + return capacity; + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPools.java 
b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPools.java new file mode 100644 index 0000000000..c8611e4d3b --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/BufferPools.java @@ -0,0 +1,202 @@ +package com.dtstack.chunjun.connector.doris.buffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +/** 主缓冲池类 */ +public class BufferPools implements IBufferPool { + private static final Logger LOG = LoggerFactory.getLogger(BufferPools.class); + + private final List bufferPools; + private final BlockingQueue availableBuffers; + private volatile BufferPool currentBuffer; + private final ExecutorService flushExecutor; + private final ReentrantLock poolLock = new ReentrantLock(); + private final AtomicInteger writeCount = new AtomicInteger(0); + private final Consumer errorHandler; + private final int flusherSize; + + public BufferPools( + int poolSize, + int bufferCapacity, + BufferFlusher flusher, + Consumer errorHandler, + boolean keepOrder, + Consumer numWriterCount, + Consumer bytesWriterCount) { + this.bufferPools = new ArrayList<>(poolSize); + this.availableBuffers = new ArrayBlockingQueue<>(poolSize); + this.flusherSize = keepOrder ? 
1 : poolSize; + this.flushExecutor = Executors.newFixedThreadPool(flusherSize); + this.errorHandler = errorHandler; + + // 初始化所有小缓冲池 + for (int i = 0; i < poolSize; i++) { + BufferPool buffer = + new BufferPool(bufferCapacity, i, flusher, numWriterCount, bytesWriterCount); + bufferPools.add(buffer); + availableBuffers.offer(buffer); + } + + LOG.info("BufferPool 初始化完成,包含 " + poolSize + " 个缓冲区"); + } + + /** 写入数据到缓冲池 */ + public int write(String data) { + if (data == null || "".equalsIgnoreCase(data.trim())) { + return 0; + } + + poolLock.lock(); + try { + // 如果当前没有活跃的缓冲区,选择一个新的 + if (currentBuffer == null || currentBuffer.getState() == BufferPool.BufferState.FULL) { + selectNewBuffer(); + } + + int written = currentBuffer.write(data); + if (written > 0) { + writeCount.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "写入 " + + written + + " 字节到缓冲区 " + + currentBuffer.getBufferId() + + ",总写入次数:" + + writeCount.get()); + } + + // 如果当前缓冲区已满,异步刷写并选择新缓冲区 + if (currentBuffer.getState() == BufferPool.BufferState.FULL) { + scheduleFlush(currentBuffer); + currentBuffer = null; // 清空当前缓冲区引用 + } + return written; + } else { + // 当前缓冲区已满,尝试选择新的缓冲区 + if (currentBuffer.getState() == BufferPool.BufferState.FULL) { + scheduleFlush(currentBuffer); + currentBuffer = null; + return write(data); // 递归调用重试 + } + return -1; + } + } finally { + poolLock.unlock(); + } + } + + /** 选择新的可用缓冲区 */ + private boolean selectNewBuffer() { + try { + LOG.info("选择新的缓冲区..."); + BufferPool newBuffer = availableBuffers.take(); + if (newBuffer != null) { + currentBuffer = newBuffer; + LOG.info( + "选择缓冲区 {} 作为当前写入目标,当前空闲缓冲区数量:{}", + newBuffer.getBufferId(), + availableBuffers.size()); + return true; + } + return false; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** 安排异步刷写 */ + private void scheduleFlush(BufferPool buffer) { + flushExecutor.submit( + () -> { + try { + buffer.flush(); + availableBuffers.offer(buffer); + LOG.info("缓冲区 " + 
buffer.getBufferId() + " 刷写完成并重新进入候选状态"); + } catch (Exception e) { + LOG.error("刷写缓冲区 " + buffer.getBufferId() + " 时发生错误:" + e.getMessage(), e); + // 即使出错也要重置缓冲区 + buffer.reset(); + availableBuffers.offer(buffer); + errorHandler.accept(e); + } + }); + } + + /** 主动刷写当前缓冲区 */ + @Override + public void flush() { + poolLock.lock(); + try { + if (currentBuffer != null && currentBuffer.getPosition() > 1) { + LOG.info("主动刷写当前缓冲区 {}", currentBuffer.getBufferId()); + scheduleFlush(currentBuffer); + currentBuffer = null; + } else { + LOG.debug("当前缓冲区为空或无数据,跳过刷写"); + } + } finally { + poolLock.unlock(); + } + } + + /** 获取缓冲池状态信息 */ + public void printStatus() { + LOG.info("\n=== BufferPool 状态 ==="); + LOG.info("可用缓冲区数量:" + availableBuffers.size()); + LOG.info("当前活跃缓冲区:" + (currentBuffer != null ? currentBuffer.getBufferId() : "无")); + LOG.info("总写入次数:" + writeCount.get()); + + for (BufferPool buffer : bufferPools) { + LOG.info( + "缓冲区 " + + buffer.getBufferId() + + " - 状态:" + + buffer.getState() + + ",位置:" + + buffer.getPosition() + + "/" + + buffer.getCapacity()); + } + LOG.info("========================\n"); + } + + /** 关闭缓冲池 */ + public void shutdown() { + LOG.info("刷写..."); + scheduleFlush(currentBuffer); + LOG.info("正在关闭 BufferPool..."); + flushExecutor.shutdown(); + try { + if (!flushExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS)) { + flushExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + flushExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + currentBuffer = null; + for (BufferPool bufferPool : bufferPools) { + try { + bufferPool.shutdown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + bufferPools.clear(); + availableBuffers.clear(); + LOG.info("BufferPool 已关闭"); + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisSinkOP.java 
b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisSinkOP.java new file mode 100644 index 0000000000..bcdb068c31 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisSinkOP.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.doris.buffer; + +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; + +import org.apache.flink.types.RowKind; + +/** Doris sink operator. 
*/ +public class DorisSinkOP { + public static final String COLUMN_KEY = "__DORIS_DELETE_SIGN__"; + + public static String parse(RowKind kind) { + if (RowKind.INSERT.equals(kind) || RowKind.UPDATE_AFTER.equals(kind)) { + return "0"; + } + if (RowKind.DELETE.equals(kind) || RowKind.UPDATE_BEFORE.equals(kind)) { + return "1"; + } + throw new ChunJunRuntimeException("Unsupported row kind."); + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisStreamLoadFailedException.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisStreamLoadFailedException.java new file mode 100644 index 0000000000..efe5ec82fd --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/DorisStreamLoadFailedException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.doris.buffer; + +import java.io.IOException; +import java.util.Map; + +public class DorisStreamLoadFailedException extends IOException { + + static final long serialVersionUID = 1L; + + private final Map response; + private boolean reCreateLabel; + + public DorisStreamLoadFailedException(String message, Map response) { + super(message); + this.response = response; + } + + public DorisStreamLoadFailedException( + String message, Map response, boolean reCreateLabel) { + super(message); + this.response = response; + this.reCreateLabel = reCreateLabel; + } + + public Map getFailedResponse() { + return response; + } + + public boolean needReCreateLabel() { + return reCreateLabel; + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/IBufferPool.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/IBufferPool.java new file mode 100644 index 0000000000..f3d5a14cbe --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/IBufferPool.java @@ -0,0 +1,9 @@ +package com.dtstack.chunjun.connector.doris.buffer; + +public interface IBufferPool { + int write(String data); + + void flush() throws Exception; + + void shutdown() throws Exception; +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/RetryableStreamLoadWriter.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/RetryableStreamLoadWriter.java new file mode 100644 index 0000000000..d7e0448698 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/RetryableStreamLoadWriter.java @@ -0,0 +1,47 @@ +package com.dtstack.chunjun.connector.doris.buffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +public class RetryableStreamLoadWriter implements BufferFlusher { + private static final Logger LOG = LoggerFactory.getLogger(RetryableStreamLoadWriter.class); + private final int retries; + private final StreamLoadWriter streamLoadWriter; + + public RetryableStreamLoadWriter(StreamLoadWriter streamLoadWriter, int retries) { + this.retries = retries; + this.streamLoadWriter = streamLoadWriter; + } + + @Override + public void write(InputStream inputStream, int length) throws Exception { + streamLoadWriter.write(inputStream, length); + } + + @Override + public void write(Supplier supplier, int length) throws Exception { + for (int retry = 1; retry <= retries; retry++) { + try (InputStream inputStream = supplier.get()) { + streamLoadWriter.write(inputStream, length); + break; + } catch (Exception e) { + LOG.warn("Stream load failed on attempt {}. Error: {}", retry, e.getMessage()); + if (retry == retries) { + LOG.error("All {} retries failed. 
Throwing exception.", retries); + throw e; + } else { + Thread.sleep(5000L); + } + } + } + } + + @Override + public void close() throws IOException { + streamLoadWriter.close(); + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/StreamLoadWriter.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/StreamLoadWriter.java new file mode 100644 index 0000000000..7e9c57b4c1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/buffer/StreamLoadWriter.java @@ -0,0 +1,330 @@ +package com.dtstack.chunjun.connector.doris.buffer; + +import com.dtstack.chunjun.connector.doris.options.DorisConfig; + +import com.alibaba.fastjson.JSON; +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpEntity; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static 
com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_SOCKET_TIMEOUT_MS_DEFAULT; + +public class StreamLoadWriter implements BufferFlusher { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(StreamLoadWriter.class); + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + private static final int ERROR_LOG_MAX_LENGTH = 3000; + + private final DorisConfig dorisConf; + private long pos; + + private final String database; + private final String table; + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(("yyyyMMdd_HHmmss")); + private HttpClientBuilder httpclientBuilder; + + public StreamLoadWriter(DorisConfig dorisConf) { + this.dorisConf = dorisConf; + this.database = dorisConf.getDatabase(); + this.table = dorisConf.getTable(); + int socketTimeout = + dorisConf.getLoadConfig().getSocketTimeOutMs() == null + ? DORIS_SOCKET_TIMEOUT_MS_DEFAULT + : dorisConf.getLoadConfig().getSocketTimeOutMs(); + + int connectTimeout = + dorisConf.getLoadConfig().getRequestConnectTimeoutMs() == null + ? 
DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT + : dorisConf.getLoadConfig().getRequestConnectTimeoutMs(); + + RequestConfig config = + RequestConfig.custom() + .setSocketTimeout(socketTimeout) + .setConnectTimeout(connectTimeout) + .build(); + this.httpclientBuilder = + HttpClients.custom() + .setDefaultRequestConfig(config) + .setRedirectStrategy( + new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + } + + @Override + public void write(InputStream inputStream, int length) throws Exception { + String host = getAvailableHost(); + if (null == host) { + throw new IOException("None of the hosts in `load_url` could be connected."); + } + String lable = initBatchLabel(); + String loadUrl = host + "/api/" + database + "/" + table + "/_stream_load"; + LOG.info("Start to join batch data: label[{}].", lable); + Map loadResult = doHttpPut(loadUrl, lable, inputStream, length); + final String keyStatus = "Status"; + if (null == loadResult || !loadResult.containsKey(keyStatus)) { + throw new IOException( + "Unable to flush data to StarRocks: unknown result status, usually caused by: 1.authorization or permission related problems. 2.Wrong column_separator or row_delimiter. 
3.Column count exceeded the limitation."); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Stream Load response: \n%s\n", JSON.toJSONString(loadResult))); + } + if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { + Map logMap = new HashMap<>(); + if (loadResult.containsKey("ErrorURL")) { + logMap.put("streamLoadErrorLog", getErrorLog((String) loadResult.get("ErrorURL"))); + } + throw new DorisStreamLoadFailedException( + String.format( + "Failed to flush data to StarRocks, Error " + "response: \n%s\n%s\n", + JSON.toJSONString(loadResult), JSON.toJSONString(logMap)), + loadResult); + } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { + LOG.error(String.format("Stream Load response: \n%s\n", JSON.toJSONString(loadResult))); + // has to block-checking the state to get the final result + checkLabelState(host, lable); + } + } + + @Override + public void write(Supplier supplier, int length) throws Exception { + try (InputStream inputStream = supplier.get()) { + write(inputStream, length); + } + } + + private Map doHttpPut( + String loadUrl, String label, InputStream inputStream, int length) throws IOException { + LOG.info( + "Executing stream load to: '{}', size: '{}', thread: {}", + loadUrl, + length, + Thread.currentThread().getId()); + HttpPut httpPut = new HttpPut(loadUrl); + + httpPut.setHeader( + "columns", + dorisConf.getColumn().stream() + .map(i -> String.format("`%s`", i.getName())) + .collect(Collectors.joining(","))); + + if (!httpPut.containsHeader("timeout")) { + httpPut.setHeader("timeout", "60"); + } + + if (dorisConf.isEnableDelete()) { + httpPut.setHeader("hidden_columns", DorisSinkOP.COLUMN_KEY); + } + + httpPut.setHeader("Expect", "100-continue"); + httpPut.setHeader("ignore_json_size", "true"); + httpPut.setHeader("strip_outer_array", "true"); + httpPut.setHeader("partial_update", String.valueOf(dorisConf.isPartialUpdate())); + httpPut.setHeader("strict_mode", String.valueOf(dorisConf.isStrictMode())); + 
httpPut.setHeader("format", "json"); + httpPut.setHeader("label", label); + httpPut.setHeader( + "Authorization", + getBasicAuthHeader(dorisConf.getUsername(), dorisConf.getPassword())); + httpPut.setEntity(new InputStreamEntity(inputStream, length)); + httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + + try (CloseableHttpClient httpclient = httpclientBuilder.build()) { + CloseableHttpResponse resp = httpclient.execute(httpPut); + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) return null; + Map result = + (Map) JSON.parse(EntityUtils.toString(respEntity)); + resp.close(); + return result; + } + } + + private void checkLabelState(String host, String label) throws IOException { + int idx = 0; + while (true) { + try { + TimeUnit.SECONDS.sleep(Math.min(++idx, 5)); + } catch (InterruptedException ex) { + break; + } + HttpGet httpGet = + new HttpGet( + host + + "/api/" + + dorisConf.getDatabase() + + "/get_load_state?label=" + + label); + httpGet.setHeader( + "Authorization", + getBasicAuthHeader(dorisConf.getUsername(), dorisConf.getPassword())); + httpGet.setHeader("Connection", "close"); + + try (CloseableHttpClient httpclient = httpclientBuilder.build()) { + CloseableHttpResponse resp = httpclient.execute(httpGet); + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + throw new DorisStreamLoadFailedException( + String.format( + "Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s].\n", + label), + null); + } + Map result = + (Map) JSON.parse(EntityUtils.toString(respEntity)); + String labelState = (String) result.get("state"); + if (null == labelState) { + throw new DorisStreamLoadFailedException( + String.format( + "Failed to flush data to StarRocks, Error " + + "could not get the final state of label[%s]. 
response[%s]\n", + label, EntityUtils.toString(respEntity)), + null); + } + LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); + resp.close(); + switch (labelState) { + case LAEBL_STATE_VISIBLE: + case LAEBL_STATE_COMMITTED: + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + throw new DorisStreamLoadFailedException( + String.format( + "Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", + label, labelState), + null, + true); + case RESULT_LABEL_UNKNOWN: + default: + throw new DorisStreamLoadFailedException( + String.format( + "Failed to flush data to StarRocks, Error " + + "label[%s] state[%s]\n", + label, labelState), + null); + } + } + } + } + + public String initBatchLabel() { + String formatDate = LocalDateTime.now().format(dateTimeFormatter); + return String.format( + "flinkx_connector_%s_%s", + formatDate, UUID.randomUUID().toString().replaceAll("-", "")); + } + + private String getErrorLog(String errorUrl) { + if (errorUrl == null || !errorUrl.startsWith("http")) { + return null; + } + try { + HttpGet httpGet = new HttpGet(errorUrl); + try (CloseableHttpClient httpclient = httpclientBuilder.build()) { + CloseableHttpResponse resp = httpclient.execute(httpGet); + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + return null; + } + String errorLog = EntityUtils.toString(respEntity); + if (errorLog != null && errorLog.length() > ERROR_LOG_MAX_LENGTH) { + errorLog = errorLog.substring(0, ERROR_LOG_MAX_LENGTH); + } + resp.close(); + return errorLog; + } + } catch (Exception e) { + LOG.warn("Failed to get error log.", e); + return "Failed to get error log: " + e.getMessage(); + } + } + + private HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (200 != code) { + LOG.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + 
LOG.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + private String getAvailableHost() { + List hostList = dorisConf.getFeNodes(); + long tmp = pos + hostList.size(); + for (; pos < tmp; pos++) { + String host = "http://" + hostList.get((int) (pos % hostList.size())); + if (tryHttpConnection(host)) { + return host; + } + } + return null; + } + + private boolean tryHttpConnection(String host) { + try { + URL url = new URL(host); + HttpURLConnection co = (HttpURLConnection) url.openConnection(); + co.setConnectTimeout(dorisConf.getLoadConfig().getHttpCheckTimeoutMs()); + co.connect(); + co.disconnect(); + return true; + } catch (Exception e1) { + LOG.warn("Failed to connect to address:{}", host, e1); + return false; + } + } + + public void close() throws IOException {} + + private static String getBasicAuthHeader(String username, String password) { + String auth = username + ":" + password; + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encodedAuth); + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSqlConverter.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSqlConverter.java index 71b203c886..34156d8f9f 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSqlConverter.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSqlConverter.java @@ -18,24 +18,50 @@ package com.dtstack.chunjun.connector.doris.converter; +import com.dtstack.chunjun.config.CommonConfig; +import com.dtstack.chunjun.connector.doris.buffer.DorisSinkOP; +import com.dtstack.chunjun.connector.doris.options.DorisConfig; import com.dtstack.chunjun.converter.AbstractRowConverter; +import 
com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.column.BigDecimalColumn; +import com.dtstack.chunjun.element.column.BooleanColumn; +import com.dtstack.chunjun.element.column.ByteColumn; +import com.dtstack.chunjun.element.column.DoubleColumn; +import com.dtstack.chunjun.element.column.FloatColumn; +import com.dtstack.chunjun.element.column.IntColumn; +import com.dtstack.chunjun.element.column.LongColumn; +import com.dtstack.chunjun.element.column.ShortColumn; +import com.dtstack.chunjun.element.column.SqlDateColumn; +import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; -public class DorisHttpSqlConverter - extends AbstractRowConverter { +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; - private static final String NULL_VALUE = "\\N"; +public class DorisHttpSqlConverter + extends AbstractRowConverter, LogicalType> { - private static final long serialVersionUID = -2636292632781799617L; + private final boolean enableDelete; + private final List columnList; + public static final String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss"; - public DorisHttpSqlConverter(RowType rowType) { - super(rowType); + public DorisHttpSqlConverter(RowType rowType, CommonConfig conf) { + super(rowType, conf); + 
this.columnList = rowType.getFieldNames(); for (int i = 0; i < rowType.getFieldCount(); i++) { toInternalConverters.add( wrapIntoNullableInternalConverter( @@ -44,40 +70,154 @@ public DorisHttpSqlConverter(RowType rowType) { wrapIntoNullableExternalConverter( createExternalConverter(fieldTypes[i]), fieldTypes[i])); } + this.enableDelete = ((DorisConfig) conf).isEnableDelete(); } @Override - public RowData toInternal(RowData input) { - return null; + protected ISerializationConverter> wrapIntoNullableExternalConverter( + ISerializationConverter> ISerializationConverter, + LogicalType type) { + return (rowData, index, output) -> { + if (rowData == null + || rowData.isNullAt(index) + || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) { + output.put(columnList.get(index), null); + } else { + ISerializationConverter.serialize(rowData, index, output); + } + }; + } + + @Override + public RowData toInternal(RowData input) throws Exception { + throw new UnsupportedOperationException("Not support toInternal converter"); } @Override - public String[] toExternal(RowData rowData, String[] joiner) throws Exception { - for (int index = 0; index < fieldTypes.length; index++) { - toExternalConverters.get(index).serialize(rowData, index, joiner); + public Map toExternal(RowData rowData, Map output) + throws Exception { + for (int index = 0; index < toExternalConverters.size(); index++) { + toExternalConverters.get(index).serialize(rowData, index, output); + } + if (enableDelete) { + output.put(DorisSinkOP.COLUMN_KEY, DorisSinkOP.parse(rowData.getRowKind())); } - return joiner; + + return output; } @Override - protected ISerializationConverter wrapIntoNullableExternalConverter( - ISerializationConverter ISerializationConverter, LogicalType type) { - return ((rowData, index, joiner) -> { - if (rowData == null - || rowData.isNullAt(index) - || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) { - joiner[index] = NULL_VALUE; - } else { - ISerializationConverter.serialize(rowData, index, 
joiner); - } - }); + protected IDeserializationConverter createInternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return val -> null; + case BOOLEAN: + return val -> new BooleanColumn((Boolean) val); + case TINYINT: + return val -> new ByteColumn((byte) val); + case SMALLINT: + return val -> new ShortColumn((short) val); + case INTEGER: + return val -> new IntColumn((int) val); + case BIGINT: + return val -> new LongColumn((long) val); + case FLOAT: + return val -> new FloatColumn((float) val); + case DOUBLE: + return val -> new DoubleColumn((double) val); + case DECIMAL: + return val -> new BigDecimalColumn((BigDecimal) val); + case CHAR: + case VARCHAR: + return val -> new StringColumn((String) val); + case DATE: + return val -> new SqlDateColumn(Date.valueOf((String) val)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + return val -> + new TimestampColumn(Timestamp.valueOf((String) val), timestampPrecision); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } } @Override - protected ISerializationConverter createExternalConverter(LogicalType type) { - return (rowData, index, joiner) -> { - Object value = ((GenericRowData) rowData).getField(index); - joiner[index] = "".equals(value.toString()) ? NULL_VALUE : value.toString(); - }; + protected ISerializationConverter> createExternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getBoolean(index) ? 
1 : 0); + case TINYINT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getByte(index)); + case SMALLINT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getShort(index)); + case INTEGER: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getInt(index)); + case BIGINT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getLong(index)); + case FLOAT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getFloat(index)); + case DOUBLE: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getDouble(index)); + case DECIMAL: + final int decimalPrecision = ((DecimalType) type).getPrecision(); + final int decimalScale = ((DecimalType) type).getScale(); + return (rowData, index, map) -> + map.put( + columnList.get(index), + rowData.getDecimal(index, decimalPrecision, decimalScale) + .toBigDecimal()); + case CHAR: + case VARCHAR: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getString(index).toString()); + + case DATE: + return (rowData, index, map) -> + map.put( + columnList.get(index), + Date.valueOf(LocalDate.ofEpochDay(rowData.getInt(index))) + .toString()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (rowData, index, map) -> { + final int timestampPrecision = ((TimestampType) type).getPrecision(); + map.put( + columnList.get(index), + rowData.getTimestamp(index, timestampPrecision) + .toTimestamp() + .toString()); + }; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return (rowData, index, map) -> { + final int localP = ((LocalZonedTimestampType) type).getPrecision(); + map.put( + columnList.get(index), + rowData.getTimestamp(index, localP).toTimestamp().toString()); + }; + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + public static String addStrForNum(String str, int strLength, String appendStr) { + int strLen = str.length(); + if (strLen < strLength) { + 
while (strLen < strLength) { + StringBuffer sb = new StringBuffer(); + sb.append(str).append(appendStr); + str = sb.toString(); + strLen = str.length(); + } + } + return str; } } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSyncConverter.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSyncConverter.java new file mode 100644 index 0000000000..c67cd0796e --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpSyncConverter.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.doris.converter; + +import com.dtstack.chunjun.config.CommonConfig; +import com.dtstack.chunjun.connector.doris.buffer.DorisSinkOP; +import com.dtstack.chunjun.connector.doris.options.DorisConfig; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.IDeserializationConverter; +import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.element.column.BigDecimalColumn; +import com.dtstack.chunjun.element.column.BooleanColumn; +import com.dtstack.chunjun.element.column.ByteColumn; +import com.dtstack.chunjun.element.column.DoubleColumn; +import com.dtstack.chunjun.element.column.FloatColumn; +import com.dtstack.chunjun.element.column.IntColumn; +import com.dtstack.chunjun.element.column.LongColumn; +import com.dtstack.chunjun.element.column.ShortColumn; +import com.dtstack.chunjun.element.column.SqlDateColumn; +import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; + +public class DorisHttpSyncConverter + extends AbstractRowConverter, LogicalType> { + + private final boolean enableDelete; + private final List columnList; + public static final String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss"; + + public DorisHttpSyncConverter(RowType rowType, CommonConfig conf) { + super(rowType, conf); + this.columnList = rowType.getFieldNames(); + 
for (int i = 0; i < rowType.getFieldCount(); i++) { + toInternalConverters.add( + wrapIntoNullableInternalConverter( + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( + wrapIntoNullableExternalConverter( + createExternalConverter(fieldTypes[i]), fieldTypes[i])); + } + this.enableDelete = ((DorisConfig) conf).isEnableDelete(); + } + + @Override + protected ISerializationConverter> wrapIntoNullableExternalConverter( + ISerializationConverter> ISerializationConverter, + LogicalType type) { + return (rowData, index, output) -> { + if (rowData == null + || rowData.isNullAt(index) + || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) { + output.put(columnList.get(index), null); + } else { + ISerializationConverter.serialize(rowData, index, output); + } + }; + } + + @Override + public RowData toInternal(RowData input) throws Exception { + throw new UnsupportedOperationException("Not support toInternal converter"); + } + + @Override + public Map toExternal(RowData rowData, Map output) + throws Exception { + for (int index = 0; index < toExternalConverters.size(); index++) { + toExternalConverters.get(index).serialize(rowData, index, output); + } + if (enableDelete) { + output.put(DorisSinkOP.COLUMN_KEY, DorisSinkOP.parse(rowData.getRowKind())); + } + return output; + } + + @Override + protected IDeserializationConverter createInternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return val -> null; + case BOOLEAN: + return val -> new BooleanColumn((Boolean) val); + case TINYINT: + return val -> new ByteColumn((byte) val); + case SMALLINT: + return val -> new ShortColumn((short) val); + case INTEGER: + return val -> new IntColumn((int) val); + case BIGINT: + return val -> new LongColumn((long) val); + case FLOAT: + return val -> new FloatColumn((float) val); + case DOUBLE: + return val -> new DoubleColumn((double) val); + case DECIMAL: + return val -> new BigDecimalColumn((BigDecimal) val); + case CHAR: + case 
VARCHAR: + return val -> new StringColumn((String) val); + case DATE: + return val -> new SqlDateColumn(Date.valueOf((String) val)); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + return val -> + new TimestampColumn(Timestamp.valueOf((String) val), timestampPrecision); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + @Override + protected ISerializationConverter> createExternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return (rowData, index, map) -> + map.put( + columnList.get(index), + ((ColumnRowData) rowData).getField(index).asBoolean() ? 1 : 0); + case TINYINT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getByte(index)); + case SMALLINT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getShort(index)); + case INTEGER: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getInt(index)); + case BIGINT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getLong(index)); + case FLOAT: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getFloat(index)); + case DOUBLE: + return (rowData, index, map) -> + map.put(columnList.get(index), rowData.getDouble(index)); + case DECIMAL: + return (rowData, index, map) -> + map.put( + columnList.get(index), + ((ColumnRowData) rowData).getField(index).asBigDecimal()); + case CHAR: + case VARCHAR: + return (rowData, index, map) -> + map.put( + columnList.get(index), + ((ColumnRowData) rowData).getField(index).asString()); + case DATE: + return (rowData, index, map) -> + map.put( + columnList.get(index), + ((ColumnRowData) rowData).getField(index).asSqlDate().toString()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + 
final String formatStr; + if (timestampPrecision > 0) { + formatStr = + addStrForNum( + DATETIME_FORMAT_SHORT + ".", + DATETIME_FORMAT_SHORT.length() + 1 + timestampPrecision, + "S"); + } else { + formatStr = DATETIME_FORMAT_SHORT; + } + return (rowData, index, map) -> { + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(formatStr); + map.put( + columnList.get(index), + dateTimeFormatter.format( + ((ColumnRowData) rowData) + .getField(index) + .asTimestamp() + .toLocalDateTime())); + }; + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + public static String addStrForNum(String str, int strLength, String appendStr) { + int strLen = str.length(); + if (strLen < strLength) { + while (strLen < strLength) { + StringBuffer sb = new StringBuffer(); + sb.append(str).append(appendStr); + str = sb.toString(); + strLen = str.length(); + } + } + return str; + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRawTypeMapper.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRawTypeMapper.java index ff6a1e7475..76c37667be 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRawTypeMapper.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRawTypeMapper.java @@ -53,6 +53,7 @@ public static DataType apply(TypeConfig type) { case "DECIMAL UNSIGNED": case "NUMERIC": case "DECIMALV2": + case "DECIMALV3": return type.toDecimalDataType(); case "REAL": case "DOUBLE": @@ -68,8 +69,11 @@ public static DataType apply(TypeConfig type) { case "JSON": case "ENUM": case "SET": + case "LARGEINT": + case "ARRAY": return DataTypes.STRING(); case "DATE": + case "DATEV2": return DataTypes.DATE(); case "TIME": return DataTypes.TIME(); @@ -77,6 +81,7 @@ public static 
DataType apply(TypeConfig type) { return DataTypes.INTERVAL(DataTypes.YEAR()); case "TIMESTAMP": case "DATETIME": + case "DATETIMEV2": return DataTypes.TIMESTAMP(0); case "TINYBLOB": case "BLOB": diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfig.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfig.java index da28e4ce5e..e90d87ac9f 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfig.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfig.java @@ -58,6 +58,19 @@ public class DorisConfig extends JdbcConfig { private Properties loadProperties; + private int poolSize = 4; + + // MB + private int memorySizePerPool = 64; + // set partial_update to true to enable the partial-update feature; row-mode partial update is the default + private boolean partialUpdate = false; + + private boolean keepOrder = false; + + private boolean strictMode = false; + + private boolean enableDelete = false; + public String serializeToString() { try { String optionsJson = GsonUtil.GSON.toJson(this); diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java index 6813879710..bbcf36b9f4 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java @@ -48,6 +48,8 @@ public final class DorisKeys { public static final String REQUEST_CONNECT_TIMEOUT_MS_KEY = "requestConnectTimeoutMs"; + public static final String SOCKET_TIMEOUT_MS_KEY = "socketTimeOutMs"; + public static final String
REQUEST_READ_TIMEOUT_MS_KEY = "requestReadTimeoutMs"; public static final String REQUEST_QUERY_TIMEOUT_S_KEY = "requestQueryTimeoutS"; @@ -86,8 +88,14 @@ public final class DorisKeys { public static final Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; + // stream load + public static final Integer HTTP_CHECK_TIMEOUT_DEFAULT = 10 * 1000; + public static final Integer DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600; + // default checkpoint timeout is 5min + public static final Integer DORIS_SOCKET_TIMEOUT_MS_DEFAULT = 4 * 60 * 1000; + public static final String DORIS_TABLET_SIZE = "doris.request.tablet.size"; public static final Integer DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java index 8641a294c7..3d4f3a1fe3 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java @@ -85,6 +85,12 @@ public class DorisOptions { .defaultValue(DorisKeys.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) .withDescription(""); + public static final ConfigOption HTTP_CHECK_TIMEOUT_MS = + ConfigOptions.key("httpCheckTimeoutS") + .intType() + .defaultValue(DorisKeys.HTTP_CHECK_TIMEOUT_DEFAULT) + .withDescription(""); + public static final ConfigOption REQUEST_QUERY_TIMEOUT_SEC = ConfigOptions.key("requestQueryTimeoutS") .intType() @@ -150,4 +156,10 @@ public class DorisOptions { .intType() .defaultValue(DorisKeys.DORIS_BATCH_SIZE_DEFAULT) .withDescription(""); + + public static final ConfigOption SINK_ENABLE_DELETE = + ConfigOptions.key("sink.enable-delete") + .booleanType() + .defaultValue(false) + .withDescription("whether to enable the delete 
function"); } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfig.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfig.java index 307a673a6a..c937262d72 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfig.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfig.java @@ -41,6 +41,11 @@ public class LoadConfig implements Serializable { private Integer requestQueryTimeoutS; + private Integer socketTimeOutMs; + + /** Timeout duration of the HTTP connection when checking the connectivity with Doris */ + private Integer httpCheckTimeoutMs; + private Integer requestRetries; private Integer requestBatchSize; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java index 819638cf2b..f4acf740dc 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java @@ -44,6 +44,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; public class DorisDynamicTableSink extends JdbcDynamicTableSink { @@ -84,10 +85,10 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { private DorisHttpOutputFormatBuilder httpBuilder(RowType rowType, DorisConfig dorisConfig) { DorisHttpOutputFormatBuilder builder = new DorisHttpOutputFormatBuilder(); - builder.setColumns(tableSchema.getColumnNames()); - builder.setConfig(dorisConfig); + dorisConfig.setBatchSize(1);
// batchSize is set to 1; flushing relies on the buffer pool overflow mechanism. builder.setDorisOptions(dorisConfig); - builder.setRowConverter(new DorisHttpSqlConverter(rowType)); + dorisConfig.setColumn(getFieldConfigFromSchema()); + builder.setRowConverter(new DorisHttpSqlConverter(rowType, dorisConfig)); return builder; } @@ -135,6 +136,24 @@ public DynamicTableSink copy() { return new DorisDynamicTableSink(physicalSchema, dorisConfig); } + private List getFieldConfigFromSchema() { + + return physicalSchema.getColumns().stream() + .map( + tableColumn -> { + FieldConfig fieldConfig = new FieldConfig(); + fieldConfig.setName(tableColumn.getName()); + fieldConfig.setType( + TypeConfig.fromString( + tableColumn + .getDataType() + .getLogicalType() + .asSummaryString())); + return fieldConfig; + }) + .collect(Collectors.toList()); + } + @Override public String asSummaryString() { return "doris sink"; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java index 0b495ad9f6..5bb93f42f8 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java @@ -18,49 +18,64 @@ package com.dtstack.chunjun.connector.doris.sink; +import com.dtstack.chunjun.connector.doris.buffer.BufferFlusher; +import com.dtstack.chunjun.connector.doris.buffer.BufferPools; +import com.dtstack.chunjun.connector.doris.buffer.IBufferPool; +import com.dtstack.chunjun.connector.doris.buffer.RetryableStreamLoadWriter; +import com.dtstack.chunjun.connector.doris.buffer.StreamLoadWriter; import com.dtstack.chunjun.connector.doris.options.DorisConfig; -import com.dtstack.chunjun.connector.doris.rest.Carrier; -import
com.dtstack.chunjun.connector.doris.rest.DorisLoadClient; -import com.dtstack.chunjun.connector.doris.rest.DorisStreamLoad; import com.dtstack.chunjun.sink.format.BaseRichOutputFormat; -import com.dtstack.chunjun.throwable.WriteRecordException; +import com.dtstack.chunjun.throwable.NoRestartException; +import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.table.data.RowData; +import com.alibaba.fastjson.JSON; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_RETRIES_DEFAULT; /** use DorisStreamLoad to write data into doris */ @Slf4j public class DorisHttpOutputFormat extends BaseRichOutputFormat { private static final long serialVersionUID = 992571748616683426L; - private DorisConfig options; - private DorisLoadClient client; - /** cache carriers * */ - private final Map carrierMap = new HashMap<>(); - - private List columns; - - public void setOptions(DorisConfig options) { - this.options = options; - } + @Setter private DorisConfig dorisConf; - public void setColumns(List columns) { - this.columns = columns; - } + private IBufferPool pools; + private BufferFlusher writer; + private volatile AtomicReference flushException; @Override public void open(int taskNumber, int numTasks) throws IOException { - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(options); - dorisStreamLoad.replaceBackend(); - client = new DorisLoadClient(dorisStreamLoad, options.isNameMapped(), options); super.open(taskNumber, numTasks); + this.flushException = new AtomicReference<>(); + + this.writer = new RetryableStreamLoadWriter(new StreamLoadWriter(dorisConf), getRetrys()); + this.pools = + new BufferPools( + 
dorisConf.getPoolSize(), + dorisConf.getMemorySizePerPool() * 1024 * 1024, + writer, + throwable -> flushException.set(throwable), + dorisConf.isKeepOrder(), + new Consumer() { + @Override + public synchronized void accept(Long num) { + numWriteCounter.add(num); + } + }, + new Consumer() { + @Override + public synchronized void accept(Long size) { + bytesWriteCounter.add(size); + } + }); } @Override @@ -69,58 +84,67 @@ protected void openInternal(int taskNumber, int numTasks) { } @Override - protected void closeInternal() {} + protected void closeInternal() { + try { + pools.shutdown(); + pools = null; + writer.close(); + writer = null; + } catch (Exception e) { + throw new RuntimeException(e); + } + checkFlushException(); + } + // Override this method; otherwise the numWriteCounter and bytesWriteCounter metrics are counted twice @Override - protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException { + protected void writeSingleRecord(RowData rowData, LongCounter numWriteCounter) { + writeSingleRecordInternal(rowData); + } + + @Override + protected void writeSingleRecordInternal(RowData rowData) { + checkFlushException(); try { - client.process(rowData, columns, rowConverter); + Object map = rowConverter.toExternal(rowData, new HashMap<>(columnNameList.size())); + String json = JSON.toJSONString(map); + pools.write(json); } catch (Exception e) { - throw new WriteRecordException("", e, 0, rowData); + throw new RuntimeException(e); } } @Override protected void writeMultipleRecordsInternal() throws Exception { - int size = rows.size(); - for (int i = 0; i < size; i++) { - client.process(rows.get(i), i, carrierMap, columns, rowConverter); - } - if (!carrierMap.isEmpty()) { - Set keys = carrierMap.keySet(); - for (String key : keys) { - try { - Carrier carrier = carrierMap.get(key); - client.flush(carrier); - Set indexes = carrier.getRowDataIndexes(); - List removeList = new ArrayList<>(indexes.size()); - // Add the amount of data written successfully.
- numWriteCounter.add(indexes.size()); - for (int index : indexes) { - removeList.add(rows.get(index)); - } - // Remove RowData from rows after a successful write - // to prevent multiple writes. - rows.removeAll(removeList); - } finally { - carrierMap.remove(key); - } - } - } + throw new RuntimeException( + "DorisHttpOutputFormat does not support writeMultipleRecordsInternal"); } @Override - protected synchronized void writeRecordInternal() { - if (flushEnable.get()) { - try { - writeMultipleRecordsInternal(); - } catch (Exception e) { - // 批量写异常转为单条写 - rows.forEach(item -> writeSingleRecord(item, numWriteCounter)); - } finally { - // Data is either recorded dirty data or written normally - rows.clear(); + protected synchronized void writeRecordInternal() {} + + private int getRetrys() { + + if (dorisConf.getLoadConfig().getRequestRetries() == null) { + return DORIS_REQUEST_RETRIES_DEFAULT; + } else { + return dorisConf.getLoadConfig().getRequestRetries(); + } + } + + private void checkFlushException() { + Throwable throwable = flushException.get(); + if (throwable != null) { + StackTraceElement[] stack = Thread.currentThread().getStackTrace(); + for (StackTraceElement stackTraceElement : stack) { + log.info( + stackTraceElement.getClassName() + + "." 
+ + stackTraceElement.getMethodName() + + " line:" + + stackTraceElement.getLineNumber()); } + throw new NoRestartException("Writing records to Doris failed.", throwable); } } } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java index 56ca74a4e4..373773027a 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java @@ -19,11 +19,8 @@ package com.dtstack.chunjun.connector.doris.sink; import com.dtstack.chunjun.connector.doris.options.DorisConfig; -import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; import com.dtstack.chunjun.sink.format.BaseRichOutputFormatBuilder; -import java.util.List; - public class DorisHttpOutputFormatBuilder extends BaseRichOutputFormatBuilder { @@ -32,13 +29,8 @@ public DorisHttpOutputFormatBuilder() { } public void setDorisOptions(DorisConfig options) { - JdbcConfig jdbcConfig = options.setToJdbcConf(); - format.setOptions(options); - format.setConfig(jdbcConfig); - } - - public void setColumns(List columns) { - format.setColumns(columns); + super.setConfig(options); + format.setDorisConf(options); } @Override diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java index 0983d64210..ca66970a8f 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java +++ 
b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java @@ -21,6 +21,7 @@ import com.dtstack.chunjun.config.OperatorConfig; import com.dtstack.chunjun.config.SyncConfig; import com.dtstack.chunjun.config.TypeConfig; +import com.dtstack.chunjun.connector.doris.converter.DorisHttpSyncConverter; import com.dtstack.chunjun.connector.doris.converter.DorisRawTypeMapper; import com.dtstack.chunjun.connector.doris.options.DorisConfig; import com.dtstack.chunjun.connector.doris.options.LoadConfig; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Properties; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.BATCH_SIZE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DESERIALIZE_ARROW_ASYNC_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DESERIALIZE_QUEUE_SIZE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_BATCH_SIZE_DEFAULT; @@ -62,7 +64,9 @@ import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_RETRIES_DEFAULT; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_SOCKET_TIMEOUT_MS_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.EXEC_MEM_LIMIT_KEY; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.HTTP_CHECK_TIMEOUT_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_OPTIONS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_BATCH_SIZE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_CONNECT_TIMEOUT_MS_KEY; @@ -70,6 +74,7 @@ import static 
com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_READ_TIMEOUT_MS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_RETRIES_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_TABLET_SIZE_KEY; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.SOCKET_TIMEOUT_MS_KEY; public class DorisSinkFactory extends SinkFactory { private final DorisConfig options; @@ -96,59 +101,101 @@ public DorisSinkFactory(SyncConfig syncConfig) { LoadConfig loadConfig = LoadConfig.builder() .requestTabletSize( - (int) - properties.getOrDefault( - REQUEST_TABLET_SIZE_KEY, Integer.MAX_VALUE)) + Integer.parseInt( + properties + .getOrDefault( + REQUEST_TABLET_SIZE_KEY, Integer.MAX_VALUE) + .toString())) .requestConnectTimeoutMs( - (int) - properties.getOrDefault( - REQUEST_CONNECT_TIMEOUT_MS_KEY, - DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)) + Integer.parseInt( + properties + .getOrDefault( + REQUEST_CONNECT_TIMEOUT_MS_KEY, + DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) + .toString())) + .socketTimeOutMs( + Integer.parseInt( + properties + .getOrDefault( + SOCKET_TIMEOUT_MS_KEY, + DORIS_SOCKET_TIMEOUT_MS_DEFAULT) + .toString())) .requestReadTimeoutMs( - (int) - properties.getOrDefault( - REQUEST_READ_TIMEOUT_MS_KEY, - DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)) + Integer.parseInt( + properties + .getOrDefault( + REQUEST_READ_TIMEOUT_MS_KEY, + DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) + .toString())) + .httpCheckTimeoutMs( + Integer.parseInt( + properties + .getOrDefault( + REQUEST_READ_TIMEOUT_MS_KEY, + HTTP_CHECK_TIMEOUT_DEFAULT) + .toString())) .requestQueryTimeoutS( - (int) - properties.getOrDefault( - REQUEST_QUERY_TIMEOUT_S_KEY, - DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)) + Integer.parseInt( + properties + .getOrDefault( + REQUEST_QUERY_TIMEOUT_S_KEY, + DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) + .toString())) .requestRetries( - (int) - properties.getOrDefault( - REQUEST_RETRIES_KEY, DORIS_REQUEST_RETRIES_DEFAULT)) 
+ Integer.parseInt( + properties + .getOrDefault( + REQUEST_RETRIES_KEY, + DORIS_REQUEST_RETRIES_DEFAULT) + .toString())) .requestBatchSize( - (int) - properties.getOrDefault( - REQUEST_BATCH_SIZE_KEY, DORIS_BATCH_SIZE_DEFAULT)) + Integer.parseInt( + properties + .getOrDefault( + REQUEST_BATCH_SIZE_KEY, + DORIS_BATCH_SIZE_DEFAULT) + .toString())) .execMemLimit( - (long) - properties.getOrDefault( - EXEC_MEM_LIMIT_KEY, DORIS_EXEC_MEM_LIMIT_DEFAULT)) + Long.parseLong( + properties + .getOrDefault( + EXEC_MEM_LIMIT_KEY, + DORIS_EXEC_MEM_LIMIT_DEFAULT) + .toString())) .deserializeQueueSize( - (int) - properties.getOrDefault( - DESERIALIZE_QUEUE_SIZE_KEY, - DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)) + Integer.parseInt( + properties + .getOrDefault( + DESERIALIZE_QUEUE_SIZE_KEY, + DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) + .toString())) .deserializeArrowAsync( - (boolean) - properties.getOrDefault( - DESERIALIZE_ARROW_ASYNC_KEY, - DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)) + Boolean.parseBoolean( + properties + .getOrDefault( + DESERIALIZE_ARROW_ASYNC_KEY, + DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) + .toString())) .build(); options.setColumn(syncConfig.getWriter().getFieldList()); options.setLoadProperties(properties); options.setLoadConfig(loadConfig); + options.setBatchSize(parameter.getIntVal(BATCH_SIZE_KEY, DORIS_BATCH_SIZE_DEFAULT)); super.initCommonConf(options); } @Override public DataStreamSink createSink(DataStream dataSet) { if (options.getFeNodes() != null) { + options.setBatchSize(1); // batchSize 设置为 1,刷新依靠缓冲池溢出机制 DorisHttpOutputFormatBuilder builder = new DorisHttpOutputFormatBuilder(); builder.setDorisOptions(options); + builder.setRowConverter( + new DorisHttpSyncConverter( + TableUtil.createRowType(options.getColumn(), getRawTypeMapper()), + options), + useAbstractBaseColumn); return createOutput(dataSet, builder.finish()); } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java 
b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java index 0151118417..5d27d1b9bc 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java @@ -26,6 +26,7 @@ import com.dtstack.chunjun.connector.doris.source.DorisInputFormat; import com.dtstack.chunjun.connector.doris.source.DorisInputFormatBuilder; import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig; +import com.dtstack.chunjun.connector.jdbc.config.SinkConnectionConfig; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; import com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource; import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder; @@ -43,6 +44,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -54,8 +56,10 @@ import static com.dtstack.chunjun.connector.jdbc.options.JdbcLookupOptions.DRUID_PREFIX; import static com.dtstack.chunjun.connector.jdbc.options.JdbcLookupOptions.VERTX_PREFIX; import static com.dtstack.chunjun.connector.jdbc.options.JdbcLookupOptions.getLibConfMap; +import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_ALL_REPLACE; import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_POST_SQL; import static com.dtstack.chunjun.connector.jdbc.options.JdbcSinkOptions.SINK_PRE_SQL; +import static com.dtstack.chunjun.table.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL; /** declare doris table factory info. 
*/ public class DorisDynamicTableFactory extends JdbcDynamicTableFactory @@ -84,8 +88,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { // 3.封装参数 ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema(); - DorisConfig ftpConfig = getConfByOptions(config); - return new DorisDynamicTableSink(resolvedSchema, ftpConfig); + DorisConfig dorisConfig = getConfByOptions(config); + return new DorisDynamicTableSink(resolvedSchema, dorisConfig); } @Override @@ -179,12 +183,22 @@ private static DorisConfig getConfByOptions(ReadableConfig config) { dorisConfig.setPassword(config.get(DorisOptions.PASSWORD)); } + dorisConfig.setEnableDelete(config.get(DorisOptions.SINK_ENABLE_DELETE)); + + SinkConnectionConfig conf = new SinkConnectionConfig(); + conf.setJdbcUrl(url); + conf.setTable(Arrays.asList(tableName)); + conf.setSchema(schema); + conf.setAllReplace(config.get(SINK_ALL_REPLACE)); + LoadConfig loadConfig = getLoadConfig(config); dorisConfig.setLoadConfig(loadConfig); dorisConfig.setLoadProperties(new Properties()); dorisConfig.setMaxRetries(config.get(DorisOptions.MAX_RETRIES)); dorisConfig.setMode(config.get(DorisOptions.WRITE_MODE)); dorisConfig.setBatchSize(config.get(DorisOptions.BATCH_SIZE)); + dorisConfig.setFlushIntervalMills(config.get(SINK_BUFFER_FLUSH_INTERVAL)); + dorisConfig.setConnection(Collections.singletonList(conf)); return dorisConfig; } @@ -194,6 +208,7 @@ private static LoadConfig getLoadConfig(ReadableConfig config) { .requestTabletSize(config.get(DorisOptions.REQUEST_TABLET_SIZE)) .requestConnectTimeoutMs(config.get(DorisOptions.REQUEST_CONNECT_TIMEOUT_MS)) .requestReadTimeoutMs(config.get(DorisOptions.REQUEST_READ_TIMEOUT_MS)) + .httpCheckTimeoutMs(config.get(DorisOptions.HTTP_CHECK_TIMEOUT_MS)) .requestQueryTimeoutS(config.get(DorisOptions.REQUEST_QUERY_TIMEOUT_SEC)) .requestRetries(config.get(DorisOptions.REQUEST_RETRIES)) .requestBatchSize(config.get(DorisOptions.REQUEST_BATCH_SIZE)) @@ -242,7 +257,8 @@ 
public Set> optionalOptions() { DorisOptions.LINE_DELIMITER, DorisOptions.MAX_RETRIES, DorisOptions.WRITE_MODE, - DorisOptions.BATCH_SIZE) + DorisOptions.BATCH_SIZE, + DorisOptions.SINK_ENABLE_DELETE) .collect(Collectors.toSet()); options.addAll(optionalOptions); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java index 8975ce7ebb..b0221a909a 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseRichOutputFormat.java @@ -134,6 +134,10 @@ public abstract class BaseRichOutputFormat extends RichOutputFormat /** 存储用于批量写入的数据行数 */ protected transient List rows; + + /** 批量提交字节数 */ + protected long rowsBytes; + /** 存储用于批量写入的数据字节数 */ protected transient long batchMaxByteSize; /** 数据类型转换器 */ @@ -305,6 +309,7 @@ public synchronized void writeRecord(RowData rowData) { writeSingleRecord(rowData, numWriteCounter); size = 1; } else { + rowsBytes += rowSizeCalculator.getObjectSize(rowData); rows.add(rowData); if (rows.size() >= batchSize) { writeRecordInternal(); @@ -313,7 +318,6 @@ public synchronized void writeRecord(RowData rowData) { } } updateDuration(); - bytesWriteCounter.add(rowSizeCalculator.getObjectSize(rowData)); if (checkpointEnabled) { snapshotWriteCounter.add(size); } @@ -489,6 +493,7 @@ protected void writeSingleRecord(RowData rowData, LongCounter numWriteCounter) { try { writeSingleRecordInternal(rowData); numWriteCounter.add(1L); + bytesWriteCounter.add(rowSizeCalculator.getObjectSize(rowData)); } catch (WriteRecordException e) { long globalErrors = accumulatorCollector.getAccumulatorValue(Metrics.NUM_ERRORS, false); @@ -508,12 +513,14 @@ protected synchronized void writeRecordInternal() { try { writeMultipleRecordsInternal(); numWriteCounter.add(rows.size()); + bytesWriteCounter.add(rowsBytes); } catch (Exception e) { // 
批量写异常转为单条写 rows.forEach(item -> writeSingleRecord(item, numWriteCounter)); } finally { // Data is either recorded dirty data or written normally rows.clear(); + rowsBytes = 0; } } } diff --git a/chunjun-examples/json/doris/mysql_doris.json b/chunjun-examples/json/doris/mysql_doris.json deleted file mode 100644 index 6a1a811a61..0000000000 --- a/chunjun-examples/json/doris/mysql_doris.json +++ /dev/null @@ -1,129 +0,0 @@ -{ - "job": { - "content": [ - { - "reader": { - "parameter": { - "username": "username", - "password": "password", - "connection": [ - { - "jdbcUrl": [ - "jdbc:mysql://127.0.0.1:3306/shitou?useUnicode=true&characterEncoding=utf8" - ], - "table": [ - "expamle_tbl" - ] - } - ], - "column": [ - { - "name": "user_id", - "type": "bigint" - }, - { - "name": "date", - "type": "date" - }, - { - "name": "timestamp", - "type": "timestamp" - }, - { - "name": "city", - "type": "varchar" - }, - { - "name": "age", - "type": "smallint" - }, - { - "name": "sex", - "type": "tinyint" - }, - { - "name": "cost", - "type": "bigint" - }, - { - "name": "max_dwell_time", - "type": "int" - }, - { - "name": "min_dwell_time", - "type": "int" - } - ], - "customSql": "", - "where": "", - "queryTimeOut": 1000, - "requestAccumulatorInterval": 2 - }, - "name": "mysqlreader" - }, - "writer": { - "parameter": { - "batchSize": 1024, - "maxRetries": 3, - "feNodes": ["127.0.0.1:8030"], - "column": [ - { - "name": "user_id", - "type": "largeint" - }, - { - "name": "date", - "type": "date" - }, - { - "name": "timestamp", - "type": "timestamp" - }, - { - "name": "city", - "type": "varchar" - }, - { - "name": "age", - "type": "smallint" - }, - { - "name": "sex", - "type": "tinyint" - }, - - { - "name": "cost", - "type": "bigint" - }, - { - "name": "max_dwell_time", - "type": "int" - }, - { - "name": "min_dwell_time", - "type": "int" - } - - ], - "username": "username", - "password": "password", - "database": "doris", - "table": "expamle_tbl", - "fieldDelimiter": "\t" - }, - "name": 
"dorisbatchwriter" - } - } - ], - "setting": { - "restore": { - "isRestore": false, - "isStream": true - }, - "speed": { - "channel": 1 - } - } - } -} diff --git a/chunjun-examples/json/doris/stream_doris.json b/chunjun-examples/json/doris/stream_doris.json new file mode 100644 index 0000000000..a11e8a2dba --- /dev/null +++ b/chunjun-examples/json/doris/stream_doris.json @@ -0,0 +1,59 @@ +{ + "job" : { + "content" : [ { + "reader":{ + "parameter":{ + "column":[ + { + "name":"int_data", + "type":"int" + }, + { + "name":"string_data", + "type":"string" + } + ], + "sliceRecordCount":[ + "1000000" + ] + }, + "name":"streamreader" + }, + "writer" : { + "parameter" : { + "column" : [ { + "customConverterType" : "INT", + "name" : "id", + "isPart" : false, + "type" : "INT", + "key" : "id" + }, { + "customConverterType" : "VARCHAR", + "name" : "name", + "isPart" : false, + "type" : "VARCHAR", + "key" : "name" + } ], + "database" : "test_1121", + "table" : "test_002", + "username" : "root", + "loadOptions":{"requestConnectTimeoutMs": "20000"}, + "feNodes" : [ "172.16.124.70:18030" ] + }, + "name" : "doriswriter" + } + } ], + "setting" : { + "restore" : { + }, + "errorLimit" : { + }, + "speed" : { + "readerChannel" : 1, + "writerChannel" : 1, + "bytes" : 0, + "channel" : 1 + } + } + } +} diff --git a/chunjun-examples/sql/doris/stream-doris.sql b/chunjun-examples/sql/doris/stream-doris.sql index 27d024f3a0..1157271ce7 100644 --- a/chunjun-examples/sql/doris/stream-doris.sql +++ b/chunjun-examples/sql/doris/stream-doris.sql @@ -1,11 +1,7 @@ CREATE TABLE test_kafka ( id int, - name VARCHAR, - message VARCHAR, - age int, - address VARCHAR, - proc_time AS PROCTIME() + name VARCHAR ) WITH ( 'connector' = 'stream-x', 'number-of-rows' = '1000', -- 输入条数,默认无限 @@ -15,29 +11,23 @@ CREATE TABLE test_kafka CREATE TABLE doris_out_test ( id int, - name VARCHAR, - message VARCHAR, - age int, - address VARCHAR + name VARCHAR ) WITH ( 'password' = '', 'connector' = 'doris-x', 
'sink.buffer-flush.interval' = '1000', 'sink.all-replace' = 'false', 'sink.buffer-flush.max-rows' = '100', - 'schema' = 'tiezhu', - 'table-name' = 'doris_2', + 'schema' = 'test_1121', + 'table-name' = 'test_002', 'sink.parallelism' = '1', --- 'feNodes' = 'doris_fe:8030', - 'url' = 'jdbc:mysql://doris_fe:9030', + 'feNodes' = '172.16.124.70:18030', +-- 'url' = 'jdbc:mysql://doris_fe:9030', 'username' = 'root' ); insert into doris_out_test select id as id, - name as name, - message as message, - age as age, - address as address + name as name from test_kafka; diff --git a/chunjun-local-test/pom.xml b/chunjun-local-test/pom.xml index 22f564d18b..e43ba75795 100644 --- a/chunjun-local-test/pom.xml +++ b/chunjun-local-test/pom.xml @@ -51,6 +51,13 @@ chunjun-connector-stream ${project.version} + + + + com.dtstack.chunjun + chunjun-connector-doris + ${project.version} +