Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected void buildCommandResult() {
// For now, we will return -1 as the deleted count.When we update collections to use this class
// we can refactor to return the actual count for them.
// If there is error, we won't add this status.
if (tasks.errorTasks().isEmpty()) {
if (taskGroup.errorTasks().isEmpty()) {
resultBuilder.addStatus(CommandStatus.DELETED_COUNT, -1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private void buildNonPerDocumentResult() {

// Note: See DocRowIdentifer, it has an attribute that will be called for JSON serialization
List<DocRowIdentifer> insertedIds =
tasks.completedTasks().stream()
taskGroup.completedTasks().stream()
.map(InsertDBTask::docRowID)
.map(Optional::orElseThrow)
.toList();
Expand Down Expand Up @@ -102,17 +102,17 @@ private void buildPerDocumentResult() {
// kept using the same approach as InsertOperationPage to make comparison easy until we remove
// the old class

var results = new InsertionResult[tasks.size()];
var results = new InsertionResult[taskGroup.tasks().size()];

// Results array filled in order: first successful insertions
for (var task : tasks.completedTasks()) {
for (var task : taskGroup.completedTasks()) {
results[task.position()] =
new InsertionResult(task.docRowID().orElseThrow(), InsertionStatus.OK, null);
}

List<CommandResult.Error> seenErrors = new ArrayList<>();
// Second: failed insertions; output in order of insertion
for (var task : tasks.errorTasks()) {
for (var task : taskGroup.errorTasks()) {
var cmdError = resultBuilder.throwableToCommandError(task.failure().orElseThrow());

// We want to avoid adding the same error multiple times, so we keep track of the index:
Expand All @@ -129,7 +129,7 @@ private void buildPerDocumentResult() {

// And third, if any, skipped insertions; those that were not attempted (f.ex due
// to failure for ordered inserts)
for (var task : tasks.skippedTasks()) {
for (var task : taskGroup.skippedTasks()) {
results[task.position()] =
new InsertionResult(task.docRowID().orElseThrow(), InsertionStatus.SKIPPED, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ protected void buildCommandResult() {
addTaskWarningsToResult();
addTaskErrorsToResult();

var metadataAttempts = tasks.completedTasks();
var metadataAttempts = taskGroup.completedTasks();
if (metadataAttempts.size() > 1) {
throw new IllegalArgumentException("Only one attempt is expected for metadata commands");
}
if (!metadataAttempts.isEmpty()) {
if (showSchema) {
resultBuilder.addStatus(statusKey, tasks.getFirst().getSchema());
resultBuilder.addStatus(statusKey, taskGroup.tasks().getFirst().getSchema());
} else {
resultBuilder.addStatus(statusKey, tasks.getFirst().getNames());
resultBuilder.addStatus(statusKey, taskGroup.tasks().getFirst().getNames());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ protected void buildCommandResult() {
maybeAddSortedRowCount();
maybeAddSchema(CommandStatus.PROJECTION_SCHEMA);

tasks.completedTasks().stream()
taskGroup.completedTasks().stream()
.flatMap(task -> task.documents().stream())
.forEach(resultBuilder::addDocument);
}

protected void maybeAddSortedRowCount() {

var rowCounts =
tasks.completedTasks().stream()
taskGroup.completedTasks().stream()
.map(ReadDBTask::sortedRowCount)
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Accumulator<TaskT, SchemaT> accumulator(
protected void buildCommandResult() {
super.buildCommandResult();

resultBuilder.addStatus(CommandStatus.OK, tasks.allTasksCompleted() ? 1 : 0);
resultBuilder.addStatus(CommandStatus.OK, taskGroup.allTasksCompleted() ? 1 : 0);
}

public static class Accumulator<TaskT extends SchemaDBTask<SchemaT>, SchemaT extends SchemaObject>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected void buildCommandResult() {
super.buildCommandResult();

// truncate a table, set delete_count status as -1
if (tasks.errorTasks().isEmpty()) {
if (taskGroup.errorTasks().isEmpty()) {
resultBuilder.addStatus(CommandStatus.DELETED_COUNT, -1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected void buildCommandResult() {
// However - we do not know if an upsert happened :(
// NOTE when update collection uses operation attempt this will get more complex
// If there is error, we won't add this status.
if (tasks.errorTasks().isEmpty()) {
if (taskGroup.errorTasks().isEmpty()) {
resultBuilder.addStatus(CommandStatus.MATCHED_COUNT, 1);
resultBuilder.addStatus(CommandStatus.MODIFIED_COUNT, 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Operation<TableSchemaObject> createOperation(
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"createOperation() - zero embeddingActions, creating direct TaskOperation operation tasksAndDeferrables.taskGroup().size()={}",
tasksAndDeferrables.taskGroup().size());
tasksAndDeferrables.taskGroup().tasks().size());
}
// basic task, just wrap the tasks in an operation and go
return new TaskOperation<>(
Expand All @@ -58,7 +58,7 @@ Operation<TableSchemaObject> createOperation(
LOGGER.debug(
"createOperation() - creating CompositeTask Operation, embeddingActions.size()={}, tasksAndDeferrables.taskGroup().size()={}",
embeddingActions.size(),
tasksAndDeferrables.taskGroup().size());
tasksAndDeferrables.taskGroup().tasks().size());
}

var compositeBuilder = new CompositeTaskOperationBuilder<>(commandContext);
Expand All @@ -78,7 +78,7 @@ Operation<TableSchemaObject> createOperation(
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"createOperation() - created EmbeddingTasks embeddingTaskGroup.size={}",
embeddingTaskGroup.size());
embeddingTaskGroup.tasks().size());
}

// we want to run a group of embedding tasks and then a group of the other tasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public static <SchemaT extends TableBasedSchemaObject> Accumulator<SchemaT> accu
protected void buildCommandResult() {

// There should only be 1 rerankig task
if (tasks.completedTasks().size() != 1) {
if (taskGroup.completedTasks().size() != 1) {
throw new IllegalStateException(
"Expected exactly 1 completed RerankingTask, got " + tasks.completedTasks().size());
"Expected exactly 1 completed RerankingTask, got " + taskGroup.completedTasks().size());
}
var completedTask = tasks.completedTasks().getFirst();
var completedTask = taskGroup.completedTasks().getFirst();

// add any errors and warnings
super.buildCommandResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public CompositeTask(
super(position, schemaObject, retryPolicy);

this.innerTaskGroup = Objects.requireNonNull(innerTaskGroup, "innerTaskGroup cannot be null");
if (innerTaskGroup.isEmpty()) {
if (innerTaskGroup.tasks().isEmpty()) {
throw new IllegalArgumentException("innerTaskGroup cannot be empty");
}
// last task accumulator can be null, if this is an intermediate task
Expand Down Expand Up @@ -173,7 +173,7 @@ protected void onSuccess(CompositeTaskInnerPage<InnerTaskT, SchemaT> result) {
public Task<SchemaT> setSkippedIfReady() {
// make sure we pass this though to the inner tasks, the CompositeTask has been skipped
// so all inner tasks are also skipped
innerTaskGroup.forEach(Task::setSkippedIfReady);
innerTaskGroup.tasks().forEach(Task::setSkippedIfReady);
return super.setSkippedIfReady();
}

Expand All @@ -189,7 +189,7 @@ public List<WarningException> allWarnings() {

var allWarnings = new ArrayList<>(super.allWarnings());
allWarnings.addAll(
innerTaskGroup.stream().map(Task::allWarnings).flatMap(List::stream).toList());
innerTaskGroup.tasks().stream().map(Task::allWarnings).flatMap(List::stream).toList());

return allWarnings;
}
Expand All @@ -204,7 +204,7 @@ public List<WarningException> warningsExcludingSuppressed() {

var allWarnings = new ArrayList<>(super.warningsExcludingSuppressed());
allWarnings.addAll(
innerTaskGroup.stream()
innerTaskGroup.tasks().stream()
.map(Task::warningsExcludingSuppressed)
.flatMap(List::stream)
.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static <SchemaT extends SchemaObject> Accumulator<SchemaT> accumulator(
@Override
public CommandResult get() {

if (!tasks.errorTasks().isEmpty()) {
if (!taskGroup.errorTasks().isEmpty()) {
// we have some failed tasks, they are failed CompositeTask's that have lifted errors
// from their inner tasks
// the superclass will build a basic response with errors and warnings, that is what we need
Expand All @@ -43,9 +43,7 @@ public CommandResult get() {

// the last composite task is the one that will build the results of running all the composite
// tasks.
// TODO: AARON - need better guarantee the last task is the last task according to it's position
// etc
return tasks.getLast().lastTaskAccumulator().getResults().get();
return taskGroup.tasks().getLast().lastTaskAccumulator().getResults().get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
public abstract class DBTaskPage<TaskT extends DBTask<SchemaT>, SchemaT extends SchemaObject>
extends TaskPage<TaskT, SchemaT> {

protected DBTaskPage(TaskGroup<TaskT, SchemaT> tasks, CommandResultBuilder resultBuilder) {
super(tasks, resultBuilder);
protected DBTaskPage(TaskGroup<TaskT, SchemaT> taskGroup, CommandResultBuilder resultBuilder) {
super(taskGroup, resultBuilder);
}

/**
Expand All @@ -19,11 +19,11 @@ protected DBTaskPage(TaskGroup<TaskT, SchemaT> tasks, CommandResultBuilder resul
* have the _id or PK to report.
*/
protected void maybeAddSchema(CommandStatus statusKey) {
if (tasks.isEmpty()) {
if (taskGroup.tasks().isEmpty()) {
return;
}

tasks.stream()
taskGroup.tasks().stream()
.map(DBTask::schemaDescription)
.filter(Optional::isPresent)
.findFirst()
Expand Down
Loading
Loading