Skip to content

Commit ec63ecb

Browse files
committed
fix: finalize agent tasks when report enforcement fails
Change-Id: I994b83bb32473ca8cc04f0a165292848d2b15bc1 Signed-off-by: Thomas Kosiewski <tk@coder.com>
1 parent 636180e commit ec63ecb

File tree

2 files changed

+217
-35
lines changed

2 files changed

+217
-35
lines changed

src/node/services/taskService.test.ts

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { describe, it, expect } from "bun:test";
22

33
import type { MuxMessage, MuxToolPart } from "@/common/types/message";
4-
import { Ok } from "@/common/types/result";
4+
import { Err, Ok } from "@/common/types/result";
55
import { TaskService } from "./taskService";
66

77
function createTaskToolPart(params: {
@@ -257,4 +257,123 @@ describe("TaskService", () => {
257257
});
258258
});
259259
});
260+
261+
describe("onStreamEnd", () => {
262+
it("should finalize tasks when report enforcement resume fails", async () => {
263+
const parentWorkspaceId = "parent";
264+
const childWorkspaceId = "child";
265+
266+
const workspace = {
267+
id: childWorkspaceId,
268+
path: "/tmp/agent",
269+
name: "agent",
270+
projectName: "proj",
271+
projectPath: "/proj",
272+
createdAt: "2025-01-01T00:00:00.000Z",
273+
parentWorkspaceId,
274+
agentType: "research",
275+
taskStatus: "running",
276+
taskModel: "openai:gpt-5-codex",
277+
};
278+
279+
const projects = new Map([
280+
[
281+
"/proj",
282+
{
283+
workspaces: [workspace],
284+
},
285+
],
286+
]);
287+
288+
let idCounter = 0;
289+
const config = {
290+
generateStableId: () => `id-${idCounter++}`,
291+
getTaskSettings: () => ({
292+
maxParallelAgentTasks: 3,
293+
maxTaskNestingDepth: 3,
294+
}),
295+
listWorkspaceConfigs: () => [{ projectPath: "/proj", workspace }],
296+
getWorkspaceConfig: (id: string) => {
297+
if (id !== childWorkspaceId) {
298+
return null;
299+
}
300+
301+
return { projectPath: "/proj", workspace };
302+
},
303+
editConfig: (edit: (cfg: unknown) => unknown) => {
304+
edit({ projects });
305+
},
306+
};
307+
308+
const histories = new Map<string, MuxMessage[]>([
309+
[
310+
childWorkspaceId,
311+
[
312+
{
313+
id: "assistant-1",
314+
role: "assistant",
315+
parts: [{ type: "text", text: "partial output" }],
316+
metadata: {
317+
historySequence: 1,
318+
},
319+
},
320+
],
321+
],
322+
[parentWorkspaceId, []],
323+
]);
324+
325+
const historyService = {
326+
getHistory: (workspaceId: string) => Ok(histories.get(workspaceId) ?? []),
327+
appendToHistory: (workspaceId: string, message: MuxMessage) => {
328+
const list = histories.get(workspaceId) ?? [];
329+
list.push(message);
330+
histories.set(workspaceId, list);
331+
return Ok(undefined);
332+
},
333+
};
334+
335+
const partialService = {
336+
readPartial: () => null,
337+
writePartial: () => Ok(undefined),
338+
};
339+
340+
const removed: string[] = [];
341+
const workspaceService = {
342+
emitChatEvent: (_workspaceId: string, _event: unknown) => undefined,
343+
emitWorkspaceMetadata: (_workspaceId: string) => undefined,
344+
resumeStream: () => Err({ type: "api_key_not_found", provider: "openai" }),
345+
remove: (workspaceId: string, _force?: boolean) => {
346+
removed.push(workspaceId);
347+
return Ok(undefined);
348+
},
349+
};
350+
351+
const aiService = {
352+
on: () => undefined,
353+
};
354+
355+
const service = new TaskService(
356+
config as never,
357+
historyService as never,
358+
partialService as never,
359+
workspaceService as never,
360+
aiService as never
361+
);
362+
363+
await (service as unknown as { onStreamEnd: (id: string) => Promise<void> }).onStreamEnd(
364+
childWorkspaceId
365+
);
366+
367+
expect(workspace.taskStatus).toBe("reported");
368+
expect(removed).toEqual([childWorkspaceId]);
369+
370+
const parentHistory = histories.get(parentWorkspaceId) ?? [];
371+
expect(parentHistory).toHaveLength(1);
372+
373+
const reportText = parentHistory[0].parts.find((p) => p.type === "text")?.text;
374+
expect(reportText).toBeDefined();
375+
expect(reportText).toContain("Mux was unable to resume this agent task");
376+
expect(reportText).toContain("partial output");
377+
});
378+
});
260379
});

src/node/services/taskService.ts

Lines changed: 97 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,42 @@ export class TaskService {
189189
}
190190
}
191191

192+
private async finalizeAgentTaskWithoutReport(
193+
workspaceId: string,
194+
reportMarkdown: string
195+
): Promise<void> {
196+
const workspaceConfig = this.config.getWorkspaceConfig(workspaceId);
197+
if (!workspaceConfig) {
198+
log.error("Failed to finalize agent task without report: unknown workspace", {
199+
workspaceId,
200+
});
201+
return;
202+
}
203+
204+
// If this isn't a properly-parented task workspace, at least mark it complete so it doesn't
205+
// consume a scheduler slot indefinitely.
206+
if (!workspaceConfig.workspace.parentWorkspaceId) {
207+
await this.updateTaskWorkspace(workspaceId, { taskStatus: "reported" });
208+
await this.maybeCleanupReportedWorkspace(workspaceId);
209+
await this.queueScheduling();
210+
return;
211+
}
212+
213+
try {
214+
await this.handleAgentReport(workspaceId, { reportMarkdown });
215+
} catch (error: unknown) {
216+
// Ensure a failed report doesn't leave the queue stuck.
217+
log.error("Failed to finalize agent task without agent_report", {
218+
workspaceId,
219+
error,
220+
});
221+
222+
await this.updateTaskWorkspace(workspaceId, { taskStatus: "reported" });
223+
await this.maybeCleanupReportedWorkspace(workspaceId);
224+
await this.queueScheduling();
225+
}
226+
}
227+
192228
async createAgentTask(params: CreateAgentTaskParams): Promise<{ childWorkspaceId: string }> {
193229
const preset = getAgentPreset(params.agentType);
194230

@@ -437,53 +473,80 @@ export class TaskService {
437473
await this.updateTaskWorkspace(workspaceId, { taskStatus: "awaiting_report" });
438474

439475
const preset = getAgentPreset(agentType);
440-
if (!preset) {
441-
return;
442-
}
443-
444-
// Force a report-only follow-up.
445-
const requirePolicy: ToolPolicy = [{ action: "require", regex_match: "^agent_report$" }];
446-
447-
const nudgeMessage = createMuxMessage(
448-
this.config.generateStableId(),
449-
"user",
450-
"You must now call agent_report with your final reportMarkdown. Do not do anything else.",
451-
{ synthetic: true }
452-
);
476+
if (preset) {
477+
// Force a report-only follow-up.
478+
const requirePolicy: ToolPolicy = [{ action: "require", regex_match: "^agent_report$" }];
479+
480+
const nudgeMessage = createMuxMessage(
481+
this.config.generateStableId(),
482+
"user",
483+
"You must now call agent_report with your final reportMarkdown. Do not do anything else.",
484+
{ synthetic: true }
485+
);
453486

454-
const appendResult = await this.historyService.appendToHistory(workspaceId, nudgeMessage);
455-
if (!appendResult.success) {
456-
throw new Error(appendResult.error);
457-
}
487+
const appendResult = await this.historyService.appendToHistory(workspaceId, nudgeMessage);
488+
if (!appendResult.success) {
489+
log.error("Failed to append agent_report enforcement message", {
490+
workspaceId,
491+
error: appendResult.error,
492+
});
493+
} else {
494+
this.workspaceService.emitChatEvent(workspaceId, {
495+
...nudgeMessage,
496+
type: "message",
497+
} satisfies WorkspaceChatMessage);
498+
}
458499

459-
this.workspaceService.emitChatEvent(workspaceId, {
460-
...nudgeMessage,
461-
type: "message",
462-
} satisfies WorkspaceChatMessage);
500+
const model = config.workspace.taskModel ?? DEFAULT_MODEL;
501+
const resumeResult = await this.workspaceService.resumeStream(workspaceId, {
502+
model,
503+
mode: "agent",
504+
additionalSystemInstructions: preset.systemPrompt,
505+
toolPolicy: requirePolicy,
506+
});
507+
if (resumeResult.success) {
508+
return;
509+
}
463510

464-
const model = config.workspace.taskModel ?? DEFAULT_MODEL;
465-
const resumeResult = await this.workspaceService.resumeStream(workspaceId, {
466-
model,
467-
mode: "agent",
468-
additionalSystemInstructions: preset.systemPrompt,
469-
toolPolicy: requirePolicy,
470-
});
471-
if (!resumeResult.success) {
472511
log.error("Failed to resume agent task for report enforcement", {
473512
workspaceId,
474513
error: resumeResult.error,
475514
});
515+
516+
const fallbackReport = await this.buildFallbackReportFromHistory(workspaceId);
517+
const reportMarkdown = [
518+
"Mux was unable to resume this agent task to collect a final agent_report.",
519+
"",
520+
"Resume error:",
521+
"```",
522+
this.formatErrorForReport(resumeResult.error),
523+
"```",
524+
...(fallbackReport
525+
? ["", "Best-effort output extracted from the task history:", "", fallbackReport]
526+
: [
527+
"",
528+
"Mux could not extract any assistant text from the task history (best-effort fallback).",
529+
]),
530+
].join("\n");
531+
532+
await this.finalizeAgentTaskWithoutReport(workspaceId, reportMarkdown);
533+
return;
476534
}
477-
return;
535+
536+
log.error("Agent task ended without agent_report, but no preset exists for enforcement", {
537+
workspaceId,
538+
agentType,
539+
});
540+
// Fall through to best-effort extraction.
478541
}
479542

480543
// Second failure: fall back to best-effort report extraction.
481544
const fallbackReport = await this.buildFallbackReportFromHistory(workspaceId);
482-
if (!fallbackReport) {
483-
return;
484-
}
545+
const reportMarkdown =
546+
fallbackReport ??
547+
"Mux did not receive an agent_report for this task and could not extract any assistant text from the task history.";
485548

486-
await this.handleAgentReport(workspaceId, { reportMarkdown: fallbackReport });
549+
await this.finalizeAgentTaskWithoutReport(workspaceId, reportMarkdown);
487550
}
488551

489552
private async tryResolveParentTaskToolCall(params: {

0 commit comments

Comments
 (0)