diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 488b658fce2..0002284b7f9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -84,6 +84,15 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private boolean datastreamPipelineMode = true; + @Config(key = "ozone.client.datastream.sync.size", + defaultValue = "0B", + type = ConfigType.SIZE, + description = "The minimum size of written data before forcing the datanodes " + + "in the pipeline to flush the pending data to underlying storage." + + " If set to zero or negative, the client will not force the datanodes to flush.", + tags = ConfigTag.CLIENT) + private int dataStreamSyncSize = 0; + @Config(key = "ozone.client.stream.buffer.increment", defaultValue = "0B", type = ConfigType.SIZE, @@ -570,6 +579,10 @@ public void setDatastreamPipelineMode(boolean datastreamPipelineMode) { this.datastreamPipelineMode = datastreamPipelineMode; } + public int getDataStreamSyncSize() { + return dataStreamSyncSize; + } + public void setHBaseEnhancementsAllowed(boolean isHBaseEnhancementsEnabled) { this.hbaseEnhancementsAllowed = isHBaseEnhancementsEnabled; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index e398c79ce8d..e570ba30885 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -131,7 +131,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private final DataStreamOutput out; private CompletableFuture dataStreamCloseReply; private List> futures = new ArrayList<>(); - private static final long SYNC_SIZE = 0; // TODO: disk sync is disabled for now + private final long syncSize; private long syncPosition = 0; private StreamBuffer currentBuffer; private XceiverClientMetrics metrics; @@ -157,6 +157,7 @@ public BlockDataStreamOutput( this.xceiverClientFactory = xceiverClientManager; this.config = config; this.isDatastreamPipelineMode = config.isDatastreamPipelineMode(); + this.syncSize = config.getDataStreamSyncSize(); this.blockID = new AtomicReference<>(blockID); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -647,9 +648,8 @@ public boolean isClosed() { } private boolean needSync(long position) { - if (SYNC_SIZE > 0) { - // TODO: or position >= fileLength - if (position - syncPosition >= SYNC_SIZE) { + if (syncSize > 0) { + if (position - syncPosition >= syncSize) { syncPosition = position; return true; }