diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/DynamicKafkaSerializationSchema.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/DynamicKafkaSerializationSchema.java index 2715bbeb47..787086b47c 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/DynamicKafkaSerializationSchema.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/DynamicKafkaSerializationSchema.java @@ -26,9 +26,7 @@ import com.dtstack.chunjun.metrics.BaseMetric; import com.dtstack.chunjun.metrics.RowSizeCalculator; import com.dtstack.chunjun.restore.FormatState; -import com.dtstack.chunjun.throwable.ChunJunRuntimeException; import com.dtstack.chunjun.util.JsonUtil; -import com.dtstack.chunjun.util.ReflectionUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.LongCounter; @@ -52,8 +50,6 @@ import javax.annotation.Nullable; import java.io.Serializable; -import java.lang.reflect.InvocationTargetException; -import java.util.Objects; import java.util.Properties; /** @@ -365,21 +361,9 @@ private void initStatisticsAccumulator() { numWriteCounter = runtimeContext.getLongCounter(Metrics.NUM_WRITES); snapshotWriteCounter = runtimeContext.getLongCounter(Metrics.SNAPSHOT_WRITES); bytesWriteCounter = runtimeContext.getLongCounter(Metrics.WRITE_BYTES); - try { - durationCounter = - (LongMaximum) - Objects.requireNonNull( - ReflectionUtils.getDeclaredMethod( - runtimeContext, - "getAccumulator", - String.class, - Class.class)) - .invoke( - runtimeContext, - Metrics.WRITE_DURATION, - LongMaximum.class); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new ChunJunRuntimeException(e); + durationCounter = new LongMaximum(0); + if (runtimeContext.getAccumulator(Metrics.WRITE_DURATION) == null) { + runtimeContext.addAccumulator(Metrics.WRITE_DURATION, durationCounter); } outputMetric = new BaseMetric(runtimeContext); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java b/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java index e377c52b32..91db5ef1ba 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java @@ -64,7 +64,7 @@ public AccumulatorCollector(StreamingRuntimeContext context, List metric valueAccumulatorMap = new HashMap<>(metricNames.size()); for (String metricName : metricNames) { valueAccumulatorMap.put( - metricName, new ValueAccumulator(context.getLongCounter(metricName), 0)); + metricName, new ValueAccumulator(context.getAccumulator(metricName), 0)); } scheduledExecutorService = @@ -179,6 +179,6 @@ public long getLocalAccumulatorValue(String name) { if (valueAccumulator == null) { return 0; } - return valueAccumulator.getLocal().getLocalValue(); + return (long) valueAccumulator.getLocal().getLocalValue(); } } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/ValueAccumulator.java b/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/ValueAccumulator.java index 664f42da71..816de6f8d0 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/ValueAccumulator.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/metrics/ValueAccumulator.java @@ -17,7 +17,7 @@ */ package com.dtstack.chunjun.metrics; -import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.accumulators.Accumulator; import lombok.AllArgsConstructor; import lombok.Getter; @@ -27,6 +27,6 @@ @Getter @Setter public class ValueAccumulator { - private final LongCounter local; + private final Accumulator local; private long global; }