Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private boolean datastreamPipelineMode = true;

@Config(key = "ozone.client.datastream.sync.size",
defaultValue = "0B",
type = ConfigType.SIZE,
description = "The minimum size of written data before forcing the datanodes " +
"in the pipeline to flush the pending data to underlying storage." +
" If set to zero or negative, the client will not force the datanodes to flush.",
tags = ConfigTag.CLIENT)
private int dataStreamSyncSize = 0;

@Config(key = "ozone.client.stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
Expand Down Expand Up @@ -570,6 +579,10 @@ public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}

public int getDataStreamSyncSize() {
return dataStreamSyncSize;
}

public void setHBaseEnhancementsAllowed(boolean isHBaseEnhancementsEnabled) {
this.hbaseEnhancementsAllowed = isHBaseEnhancementsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
private final DataStreamOutput out;
private CompletableFuture<DataStreamReply> dataStreamCloseReply;
private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
private static final long SYNC_SIZE = 0; // TODO: disk sync is disabled for now
private final long syncSize;
private long syncPosition = 0;
private StreamBuffer currentBuffer;
private XceiverClientMetrics metrics;
Expand All @@ -157,6 +157,7 @@ public BlockDataStreamOutput(
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.isDatastreamPipelineMode = config.isDatastreamPipelineMode();
this.syncSize = config.getDataStreamSyncSize();
this.blockID = new AtomicReference<>(blockID);
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
Expand Down Expand Up @@ -647,9 +648,8 @@ public boolean isClosed() {
}

private boolean needSync(long position) {
if (SYNC_SIZE > 0) {
// TODO: or position >= fileLength
if (position - syncPosition >= SYNC_SIZE) {
if (syncSize > 0) {
if (position - syncPosition >= syncSize) {
syncPosition = position;
return true;
}
Expand Down