diff --git a/ChangeLog.md b/ChangeLog.md index 112b2cc6b..d46175e13 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -12,6 +12,7 @@ Blob: - Added support for sealing append blobs. (issue #810) - Added support for delegation sas with version of 2015-07-05. - Fix issue on SQL: Delete a container with blob, then create container/blob with same name, and delete container will fail. (issue #2563) +- Fixed hang in blob operations when a client disconnects before the OperationQueue processes the request. (issue #2575) Table: diff --git a/src/common/persistence/FSExtentStore.ts b/src/common/persistence/FSExtentStore.ts index 31eedb31a..f2521b13f 100644 --- a/src/common/persistence/FSExtentStore.ts +++ b/src/common/persistence/FSExtentStore.ts @@ -518,6 +518,18 @@ export default class FSExtentStore implements IExtentStore { let count: number = 0; let wsEnd = false; + if (!rs.readable) { + this.logger.debug( + `FSExtentStore:streamPipe() Readable stream is not readable, rejecting streamPipe.`, + contextId + ); + reject( + new Error( + `FSExtentStore:streamPipe() Readable stream is not readable.` + )); + return; + } + rs.on("data", data => { count += data.length; if (!ws.write(data)) { diff --git a/tests/blob/fsStore.test.ts b/tests/blob/fsStore.test.ts index b0aee39bb..a912f2d3c 100644 --- a/tests/blob/fsStore.test.ts +++ b/tests/blob/fsStore.test.ts @@ -47,4 +47,25 @@ describe("FSExtentStore", () => { let readable3 = await store.readExtent(extent3); assert.strictEqual(await readIntoString(readable3), "Test"); }); + + it("should handle garbage collected input stream during appendExtent @loki", async () => { + const store = new FSExtentStore(metadataStore, DEFAULT_BLOB_PERSISTENCE_ARRAY, logger); + await store.init(); + + const stream1 = Readable.from("Test", { objectMode: false }); + + // From manual testing express.js it seems that if the request is aborted + // before it is handled/listeners are set up, the stream is destroyed. + // This simulates that behavior. + stream1.destroy(); + + // Then we check that appendExtent handles the destroyed stream + // gracefully/does not hang. + try { + await store.appendExtent(stream1); + assert.fail("Expected an error to be thrown due to destroyed stream"); + } catch (err) { + assert.deepStrictEqual(err.message, "FSExtentStore:streamPipe() Readable stream is not readable."); + } + }); });