From 910f6b00282ee794e70a4c9164e1784f7f8234e6 Mon Sep 17 00:00:00 2001 From: Eric Lee Date: Thu, 4 Dec 2025 12:09:28 -0800 Subject: [PATCH] quick-watch-for-cli --- .../execute-bulk-operation.test.ts | 184 +++++++++++++++++- .../bulk-operations/execute-bulk-operation.ts | 42 +++- .../watch-bulk-operation.test.ts | 114 ++++++++++- .../bulk-operations/watch-bulk-operation.ts | 81 ++++++-- 4 files changed, 399 insertions(+), 22 deletions(-) diff --git a/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.test.ts b/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.test.ts index 253bb61494..f4ac7eb5a6 100644 --- a/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.test.ts +++ b/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.test.ts @@ -1,7 +1,7 @@ import {executeBulkOperation} from './execute-bulk-operation.js' import {runBulkOperationQuery} from './run-query.js' import {runBulkOperationMutation} from './run-mutation.js' -import {watchBulkOperation} from './watch-bulk-operation.js' +import {watchBulkOperation, shortBulkOperationPoll} from './watch-bulk-operation.js' import {downloadBulkOperationResults} from './download-bulk-operation-results.js' import {validateApiVersion} from '../graphql/common.js' import {BulkOperationRunQueryMutation} from '../../api/graphql/bulk-operations/generated/bulk-operation-run-query.js' @@ -67,6 +67,7 @@ describe('executeBulkOperation', () => { beforeEach(() => { vi.mocked(ensureAuthenticatedAdminAsApp).mockResolvedValue(mockAdminSession) + vi.mocked(shortBulkOperationPoll).mockResolvedValue(createdBulkOperation) }) afterEach(() => { @@ -305,7 +306,7 @@ describe('executeBulkOperation', () => { }) }) - test('waits for operation to finish and renders success when watch is provided and operation finishes with COMPLETED status', async () => { + test('uses watchBulkOperation (not quickWatchBulkOperation) when watch flag is true', async () => { const query = '{ products { edges { node { id } } } }' const initialResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { bulkOperation: createdBulkOperation, @@ -320,7 +321,9 @@ describe('executeBulkOperation', () => { vi.mocked(runBulkOperationQuery).mockResolvedValue(initialResponse) vi.mocked(watchBulkOperation).mockResolvedValue(completedOperation) - vi.mocked(downloadBulkOperationResults).mockResolvedValue('{"id":"gid://shopify/Product/123"}') + vi.mocked(downloadBulkOperationResults).mockResolvedValue( + '{"data":{"products":{"edges":[{"node":{"id":"gid://shopify/Product/123"}}],"userErrors":[]}},"__lineNumber":0}', + ) await executeBulkOperation({ organization: mockOrganization, @@ -330,6 +333,13 @@ describe('executeBulkOperation', () => { watch: true, }) + expect(watchBulkOperation).toHaveBeenCalledWith( + mockAdminSession, + createdBulkOperation.id, + expect.any(Object), + expect.any(Function), + ) + expect(shortBulkOperationPoll).not.toHaveBeenCalled() expect(renderSuccess).toHaveBeenCalledWith( expect.objectContaining({ headline: expect.stringContaining('Bulk operation succeeded:'), @@ -370,10 +380,64 @@ describe('executeBulkOperation', () => { expect(downloadBulkOperationResults).not.toHaveBeenCalled() }) + test('uses quickWatchBulkOperation (not watchBulkOperation) when watch flag is false', async () => { + const query = '{ products { edges { node { id } } } }' + const mockResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { + bulkOperation: createdBulkOperation, + userErrors: [], + } + + vi.mocked(runBulkOperationQuery).mockResolvedValue(mockResponse) + vi.mocked(shortBulkOperationPoll).mockResolvedValue(createdBulkOperation) + + await executeBulkOperation({ + organization: mockOrganization, + remoteApp: mockRemoteApp, + storeFqdn, + query, + watch: false, + }) + + expect(shortBulkOperationPoll).toHaveBeenCalledWith(mockAdminSession, createdBulkOperation.id) + expect(watchBulkOperation).not.toHaveBeenCalled() + }) + + test('renders info message when quickWatchBulkOperation returns RUNNING status', async () => { + const query = '{ products { edges { node { id } } } }' + const runningOperation = { + ...createdBulkOperation, + status: 'RUNNING' as const, + objectCount: '50', + } + const mockResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { + bulkOperation: createdBulkOperation, + userErrors: [], + } + + vi.mocked(runBulkOperationQuery).mockResolvedValue(mockResponse) + vi.mocked(shortBulkOperationPoll).mockResolvedValue(runningOperation) + + await executeBulkOperation({ + organization: mockOrganization, + remoteApp: mockRemoteApp, + storeFqdn, + query, + watch: false, + }) + + expect(renderSuccess).toHaveBeenCalledWith( + expect.objectContaining({ + headline: 'Bulk operation is running.', + body: ['Monitor its progress with:\n', {command: expect.stringContaining('shopify app bulk status')}], + }), + ) + }) + test('writes results to file when --output-file flag is provided', async () => { const query = '{ products { edges { node { id } } } }' const outputFile = '/tmp/results.jsonl' - const resultsContent = '{"id":"gid://shopify/Product/123"}\n{"id":"gid://shopify/Product/456"}' + const resultsContent = + '{"data":{"productCreate":{"product":{"id":"gid://shopify/Product/123"},"userErrors":[]}},"__lineNumber":0}\n{"data":{"productCreate":{"product":{"id":"gid://shopify/Product/456"},"userErrors":[]}},"__lineNumber":1}' const initialResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { bulkOperation: createdBulkOperation, @@ -404,7 +468,8 @@ describe('executeBulkOperation', () => { test('writes results to stdout when --output-file flag is not provided', async () => { const query = '{ products { edges { node { id } } } }' - const resultsContent = '{"id":"gid://shopify/Product/123"}\n{"id":"gid://shopify/Product/456"}' + const resultsContent = + '{"data":{"productCreate":{"product":{"id":"gid://shopify/Product/123"},"userErrors":[]}},"__lineNumber":0}\n{"data":{"productCreate":{"product":{"id":"gid://shopify/Product/456"},"userErrors":[]}},"__lineNumber":1}' const initialResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { bulkOperation: createdBulkOperation, @@ -537,4 +602,113 @@ describe('executeBulkOperation', () => { expect(validateApiVersion).not.toHaveBeenCalled() }) + + test('renders warning when completed operation results contain userErrors', async () => { + const query = '{ products { edges { node { id } } } }' + const resultsWithErrors = '{"data":{"productUpdate":{"userErrors":[{"message":"invalid input"}]}},"__lineNumber":0}' + + const initialResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { + bulkOperation: createdBulkOperation, + userErrors: [], + } + const completedOperation = { + ...createdBulkOperation, + status: 'COMPLETED' as const, + url: 'https://example.com/download', + objectCount: '1', + } + + vi.mocked(runBulkOperationQuery).mockResolvedValue(initialResponse) + vi.mocked(watchBulkOperation).mockResolvedValue(completedOperation) + vi.mocked(downloadBulkOperationResults).mockResolvedValue(resultsWithErrors) + + await executeBulkOperation({ + organization: mockOrganization, + remoteApp: mockRemoteApp, + storeFqdn, + query, + watch: true, + }) + + expect(renderWarning).toHaveBeenCalledWith( + expect.objectContaining({ + headline: 'Bulk operation completed with errors.', + body: 'Check results for error details.', + }), + ) + expect(renderSuccess).not.toHaveBeenCalled() + }) + + test('renders success when completed operation results have no userErrors', async () => { + const query = '{ products { edges { node { id } } } }' + const resultsWithoutErrors = '{"data":{"productUpdate":{"product":{"id":"123"},"userErrors":[]}},"__lineNumber":0}' + + const initialResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { + bulkOperation: createdBulkOperation, + userErrors: [], + } + const completedOperation = { + ...createdBulkOperation, + status: 'COMPLETED' as const, + url: 'https://example.com/download', + objectCount: '1', + } + + vi.mocked(runBulkOperationQuery).mockResolvedValue(initialResponse) + vi.mocked(watchBulkOperation).mockResolvedValue(completedOperation) + vi.mocked(downloadBulkOperationResults).mockResolvedValue(resultsWithoutErrors) + + await executeBulkOperation({ + organization: mockOrganization, + remoteApp: mockRemoteApp, + storeFqdn, + query, + watch: true, + }) + + expect(renderSuccess).toHaveBeenCalledWith( + expect.objectContaining({ + headline: expect.stringContaining('Bulk operation succeeded'), + }), + ) + expect(renderWarning).not.toHaveBeenCalled() + }) + + test('renders warning when results written to file contain userErrors', async () => { + const query = '{ products { edges { node { id } } } }' + const outputFile = '/tmp/results.jsonl' + const resultsWithErrors = '{"data":{"productUpdate":{"userErrors":[{"message":"invalid input"}]}},"__lineNumber":0}' + + const initialResponse: BulkOperationRunQueryMutation['bulkOperationRunQuery'] = { + bulkOperation: createdBulkOperation, + userErrors: [], + } + const completedOperation = { + ...createdBulkOperation, + status: 'COMPLETED' as const, + url: 'https://example.com/download', + objectCount: '1', + } + + vi.mocked(runBulkOperationQuery).mockResolvedValue(initialResponse) + vi.mocked(watchBulkOperation).mockResolvedValue(completedOperation) + vi.mocked(downloadBulkOperationResults).mockResolvedValue(resultsWithErrors) + + await executeBulkOperation({ + organization: mockOrganization, + remoteApp: mockRemoteApp, + storeFqdn, + query, + watch: true, + outputFile, + }) + + expect(writeFile).toHaveBeenCalledWith(outputFile, resultsWithErrors) + expect(renderWarning).toHaveBeenCalledWith( + expect.objectContaining({ + headline: 'Bulk operation completed with errors.', + body: `Results written to ${outputFile}. Check file for error details.`, + }), + ) + }) }) diff --git a/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.ts b/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.ts index b140c2ae52..7c4f3df067 100644 --- a/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.ts +++ b/packages/app/src/cli/services/bulk-operations/execute-bulk-operation.ts @@ -1,6 +1,6 @@ import {runBulkOperationQuery} from './run-query.js' import {runBulkOperationMutation} from './run-mutation.js' -import {watchBulkOperation, type BulkOperation} from './watch-bulk-operation.js' +import {watchBulkOperation, shortBulkOperationPoll, type BulkOperation} from './watch-bulk-operation.js' import {formatBulkOperationStatus} from './format-bulk-operation-status.js' import {downloadBulkOperationResults} from './download-bulk-operation-results.js' import {extractBulkOperationId} from './bulk-operation-status.js' @@ -104,7 +104,8 @@ export async function executeBulkOperation(input: ExecuteBulkOperationInput): Pr await renderBulkOperationResult(operation, outputFile) } } else { - await renderBulkOperationResult(createdOperation, outputFile) + const operation = await shortBulkOperationPoll(adminSession, createdOperation.id) + await renderBulkOperationResult(operation, outputFile) } } else { renderWarning({ @@ -136,17 +137,39 @@ async function renderBulkOperationResult(operation: BulkOperation, outputFile?: customSections, }) break + case 'RUNNING': + renderSuccess({ + headline: 'Bulk operation is running.', + body: statusCommandHelpMessage(operation.id), + customSections, + }) + break case 'COMPLETED': if (operation.url) { const results = await downloadBulkOperationResults(operation.url) + const hasUserErrors = resultsContainUserErrors(results) if (outputFile) { await writeFile(outputFile, results) - renderSuccess({headline, body: [`Results written to ${outputFile}`], customSections}) } else { - renderSuccess({headline, customSections}) outputResult(results) } + + if (hasUserErrors) { + renderWarning({ + headline: 'Bulk operation completed with errors.', + body: outputFile + ? `Results written to ${outputFile}. Check file for error details.` + : 'Check results for error details.', + customSections, + }) + } else { + renderSuccess({ + headline, + body: outputFile ? [`Results written to ${outputFile}`] : undefined, + customSections, + }) + } } else { renderSuccess({headline, customSections}) } @@ -157,6 +180,17 @@ async function renderBulkOperationResult(operation: BulkOperation, outputFile?: } } +function resultsContainUserErrors(results: string): boolean { + const lines = results.trim().split('\n') + + return lines.some((line) => { + const parsed = JSON.parse(line) + if (!parsed.data) return false + const result = Object.values(parsed.data)[0] as {userErrors?: unknown[]} | undefined + return result?.userErrors !== undefined && result.userErrors.length > 0 + }) +} + function validateGraphQLDocument(graphqlOperation: string, variablesJsonl?: string): void { validateSingleOperation(graphqlOperation) diff --git a/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.test.ts b/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.test.ts index ffdbb64546..c573fd8fa2 100644 --- a/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.test.ts +++ b/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.test.ts @@ -1,4 +1,9 @@ -import {watchBulkOperation} from './watch-bulk-operation.js' +import { + watchBulkOperation, + shortBulkOperationPoll, + QUICK_WATCH_POLL_INTERVAL_MS, + QUICK_WATCH_TIMEOUT_MS, +} from './watch-bulk-operation.js' import {formatBulkOperationStatus} from './format-bulk-operation-status.js' import {adminRequestDoc} from '@shopify/cli-kit/node/api/admin' import {sleep} from '@shopify/cli-kit/node/system' @@ -165,3 +170,110 @@ describe('watchBulkOperation', () => { }) }) }) + +describe('quickWatchBulkOperation', () => { + const mockAdminSession = {token: 'test-token', storeFqdn: 'test.myshopify.com'} + const operationId = 'gid://shopify/BulkOperation/123' + + const createdOperation = { + id: operationId, + status: 'CREATED', + objectCount: '0', + url: null, + } + + const runningOperation = { + id: operationId, + status: 'RUNNING', + objectCount: '50', + url: null, + } + + const completedOperation = { + id: operationId, + status: 'COMPLETED', + objectCount: '100', + url: 'https://example.com/download', + } + + const failedOperation = { + id: operationId, + status: 'FAILED', + objectCount: '25', + url: null, + errorCode: 'INTERNAL_SERVER_ERROR', + } + + beforeEach(() => { + vi.mocked(sleep).mockResolvedValue() + }) + + test('returns immediately when operation is already completed', async () => { + vi.mocked(adminRequestDoc).mockResolvedValue({bulkOperation: completedOperation}) + + const result = await shortBulkOperationPoll(mockAdminSession, operationId) + + expect(result).toEqual(completedOperation) + expect(adminRequestDoc).toHaveBeenCalledTimes(1) + }) + + test('returns immediately when operation has failed', async () => { + vi.mocked(adminRequestDoc).mockResolvedValue({bulkOperation: failedOperation}) + + const result = await shortBulkOperationPoll(mockAdminSession, operationId) + + expect(result).toEqual(failedOperation) + expect(adminRequestDoc).toHaveBeenCalledTimes(1) + }) + + test.each(['FAILED', 'CANCELED', 'EXPIRED'])( + 'returns when operation reaches %s status within timeout', + async (status) => { + const terminalOperation = { + id: operationId, + status, + objectCount: '25', + url: null, + } + + vi.mocked(adminRequestDoc) + .mockResolvedValueOnce({bulkOperation: runningOperation}) + .mockResolvedValueOnce({bulkOperation: terminalOperation}) + + const result = await shortBulkOperationPoll(mockAdminSession, operationId) + + expect(result).toEqual(terminalOperation) + }, + ) + + test('polls multiple times before returning terminal status', async () => { + vi.mocked(adminRequestDoc) + .mockResolvedValueOnce({bulkOperation: createdOperation}) + .mockResolvedValueOnce({bulkOperation: runningOperation}) + .mockResolvedValueOnce({bulkOperation: completedOperation}) + + const result = await shortBulkOperationPoll(mockAdminSession, operationId) + + expect(result).toEqual(completedOperation) + expect(adminRequestDoc).toHaveBeenCalledTimes(3) + expect(sleep).toHaveBeenCalledWith(QUICK_WATCH_POLL_INTERVAL_MS / 1000) + }) + + test('returns latest state when timeout is reached without terminal status', async () => { + const originalDateNow = Date.now + let mockTime = 0 + vi.spyOn(Date, 'now').mockImplementation(() => { + const currentTime = mockTime + mockTime += QUICK_WATCH_TIMEOUT_MS + 1 + return currentTime + }) + + vi.mocked(adminRequestDoc).mockResolvedValue({bulkOperation: runningOperation}) + + const result = await shortBulkOperationPoll(mockAdminSession, operationId) + + expect(result.status).toBe('RUNNING') + + vi.spyOn(Date, 'now').mockImplementation(originalDateNow) + }) +}) diff --git a/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.ts b/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.ts index 7fdbd9361b..273c7383b1 100644 --- a/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.ts +++ b/packages/app/src/cli/services/bulk-operations/watch-bulk-operation.ts @@ -16,8 +16,31 @@ const REGULAR_POLL_INTERVAL_SECONDS = 5 const INITIAL_POLL_COUNT = 10 const API_VERSION = '2026-01' +export const QUICK_WATCH_TIMEOUT_MS = 3000 +export const QUICK_WATCH_POLL_INTERVAL_MS = 300 + export type BulkOperation = NonNullable +export async function shortBulkOperationPoll(adminSession: AdminSession, operationId: string): Promise { + const startTime = Date.now() + const poller = pollBulkOperation({ + adminSession, + operationId, + pollIntervalSeconds: QUICK_WATCH_POLL_INTERVAL_MS / 1000, + }) + + let latestOperationState: BulkOperation | undefined + + do { + // eslint-disable-next-line no-await-in-loop + const {value, done} = await poller.next() + latestOperationState = value + if (done) return latestOperationState + } while (Date.now() - startTime < QUICK_WATCH_TIMEOUT_MS) + + return latestOperationState +} + export async function watchBulkOperation( adminSession: AdminSession, operationId: string, @@ -27,7 +50,15 @@ export async function watchBulkOperation( return renderSingleTask({ title: outputContent`Polling bulk operation...`, task: async (updateStatus) => { - const poller = pollBulkOperation(adminSession, operationId, abortSignal) + const poller = pollBulkOperation({ + adminSession, + operationId, + pollIntervalSeconds: REGULAR_POLL_INTERVAL_SECONDS, + initialPollIntervalSeconds: INITIAL_POLL_INTERVAL_SECONDS, + initialPollCount: INITIAL_POLL_COUNT, + useAdaptivePolling: true, + abortSignal, + }) while (true) { // eslint-disable-next-line no-await-in-loop @@ -44,11 +75,28 @@ export async function watchBulkOperation( }) } -async function* pollBulkOperation( - adminSession: AdminSession, - operationId: string, - abortSignal: AbortSignal, -): AsyncGenerator { +interface PollBulkOperationOptions { + adminSession: AdminSession + operationId: string + pollIntervalSeconds: number + /** When true, polls faster initially then slows to pollIntervalSeconds */ + useAdaptivePolling?: boolean + /** Poll interval in seconds for initial fast polls (default: 1) */ + initialPollIntervalSeconds?: number + /** Number of fast polls before switching to regular interval (default: 10) */ + initialPollCount?: number + abortSignal?: AbortSignal +} + +async function* pollBulkOperation({ + adminSession, + operationId, + pollIntervalSeconds, + useAdaptivePolling = false, + initialPollIntervalSeconds = INITIAL_POLL_INTERVAL_SECONDS, + initialPollCount = INITIAL_POLL_COUNT, + abortSignal, +}: PollBulkOperationOptions): AsyncGenerator { let pollCount = 0 while (true) { @@ -61,19 +109,28 @@ async function* pollBulkOperation( const latestOperationState = response.bulkOperation - if (TERMINAL_STATUSES.includes(latestOperationState.status) || abortSignal.aborted) { + if (TERMINAL_STATUSES.includes(latestOperationState.status) || abortSignal?.aborted) { return latestOperationState } else { yield latestOperationState } pollCount++ + let pollInterval = pollIntervalSeconds + if (useAdaptivePolling && pollCount <= initialPollCount) { + pollInterval = initialPollIntervalSeconds + } - // Use shorter interval for the first 10 polls, then switch to regular interval - const pollInterval = pollCount <= INITIAL_POLL_COUNT ? INITIAL_POLL_INTERVAL_SECONDS : REGULAR_POLL_INTERVAL_SECONDS - - // eslint-disable-next-line no-await-in-loop - await Promise.race([sleep(pollInterval), new Promise((resolve) => abortSignal.addEventListener('abort', resolve))]) + if (abortSignal) { + // eslint-disable-next-line no-await-in-loop + await Promise.race([ + sleep(pollInterval), + new Promise((resolve) => abortSignal.addEventListener('abort', resolve)), + ]) + } else { + // eslint-disable-next-line no-await-in-loop + await sleep(pollInterval) + } } }