diff --git a/core/src/Aggregator/Aggregator.ts b/core/src/Aggregator/Aggregator.ts new file mode 100644 index 000000000..5cf3b7912 --- /dev/null +++ b/core/src/Aggregator/Aggregator.ts @@ -0,0 +1,11 @@ +import type { Effect } from "../Effect"; +import type { DomainEvent } from "../event-types"; +import type { Stack } from "../Stack"; + +export interface Aggregator { + getStack(): Stack; + dispatchEvent(event: DomainEvent): void; + subscribeChanges: ( + listener: (effects: Effect[], stack: Stack) => void, + ) => () => void; +} diff --git a/core/src/Aggregator/SyncAggregator.ts b/core/src/Aggregator/SyncAggregator.ts new file mode 100644 index 000000000..9f21e1ebe --- /dev/null +++ b/core/src/Aggregator/SyncAggregator.ts @@ -0,0 +1,124 @@ +import { produceEffects } from "produceEffects"; +import { aggregate } from "../aggregate"; +import type { Effect } from "../Effect"; +import type { DomainEvent } from "../event-types"; +import type { Stack } from "../Stack"; +import type { Publisher } from "../utils/Publisher/Publisher"; +import type { Aggregator } from "./Aggregator"; + +export class SyncAggregator implements Aggregator { + private events: DomainEvent[]; + private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>; + private autoUpdateTask: DynamicallyScheduledTask; + private previousStack: Stack; + + constructor( + events: DomainEvent[], + changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>, + ) { + this.events = events; + this.changePublisher = changePublisher; + this.autoUpdateTask = new DynamicallyScheduledTask(async () => { + this.updateStack(); + }); + this.previousStack = this.computeStack(); + } + + getStack(): Stack { + return this.previousStack; + } + + dispatchEvent(event: DomainEvent): void { + this.events.push(event); + this.updateStack(); + } + + subscribeChanges( + listener: (effects: Effect[], stack: Stack) => void, + ): () => void { + return this.changePublisher.subscribe(({ effects, stack }) => { + listener(effects, stack); + }); + } + + private computeStack(): Stack { + return aggregate(this.events, Date.now()); + } + + private predictUpcomingTransitionStateUpdate(): { + event: DomainEvent; + timestamp: number; + } | null { + const activeActivities = this.previousStack.activities.filter( + (activity) => + activity.transitionState === "enter-active" || + activity.transitionState === "exit-active", + ); + const mostRecentlyActivatedActivity = activeActivities.sort( + (a, b) => a.estimatedTransitionEnd - b.estimatedTransitionEnd, + )[0]; + + return mostRecentlyActivatedActivity + ? { + event: + mostRecentlyActivatedActivity.exitedBy ?? + mostRecentlyActivatedActivity.enteredBy, + timestamp: mostRecentlyActivatedActivity.estimatedTransitionEnd, + } + : null; + } + + private updateStack(): void { + const previousStack = this.previousStack; + const currentStack = this.computeStack(); + const effects = produceEffects(previousStack, currentStack); + + if (effects.length > 0) { + this.changePublisher.publish({ effects, stack: currentStack }); + + this.previousStack = currentStack; + + const upcomingTransitionStateUpdate = + this.predictUpcomingTransitionStateUpdate(); + + if (upcomingTransitionStateUpdate) { + this.autoUpdateTask.schedule(upcomingTransitionStateUpdate.timestamp); + } + } + } +} + +class DynamicallyScheduledTask { + private task: () => Promise; + private scheduleId: number | null; + + constructor(task: () => Promise) { + this.task = task; + this.scheduleId = null; + } + + schedule(timestamp: number): void { + if (this.scheduleId !== null) { + clearTimeout(this.scheduleId); + this.scheduleId = null; + } + + const timeoutId = setTimeout( + () => { + if (this.scheduleId !== timeoutId) return; + + this.scheduleId = null; + + if (Date.now() < timestamp) { + this.schedule(timestamp); + return; + } + + this.task(); + }, + Math.max(0, timestamp - Date.now()), + ); + + this.scheduleId = timeoutId; + } +} diff --git a/core/src/Stack.ts b/core/src/Stack.ts index fe2c0dfc2..4bb242db0 100644 --- a/core/src/Stack.ts +++ b/core/src/Stack.ts @@ -1,4 +1,3 @@ -import type { BaseDomainEvent } from "event-types/_base"; import type { DomainEvent, PoppedEvent, @@ -29,6 +28,7 @@ export type Activity = { id: string; name: string; transitionState: ActivityTransitionState; + estimatedTransitionEnd: number; params: { [key: string]: string | undefined; }; diff --git a/core/src/activity-utils/makeActivitiesReducer.ts b/core/src/activity-utils/makeActivitiesReducer.ts index 13cbfdc2f..aef15ede9 100644 --- a/core/src/activity-utils/makeActivitiesReducer.ts +++ b/core/src/activity-utils/makeActivitiesReducer.ts @@ -25,8 +25,9 @@ export function makeActivitiesReducer({ * Push new activity to activities */ Pushed(activities: Activity[], event: PushedEvent): Activity[] { - const isTransitionDone = - now - (resumedAt ?? event.eventDate) >= transitionDuration; + const estimatedTransitionEnd = + (resumedAt ?? event.eventDate) + transitionDuration; + const isTransitionDone = estimatedTransitionEnd <= now; const transitionState: ActivityTransitionState = event.skipEnterActiveState || isTransitionDone @@ -37,7 +38,7 @@ export function makeActivitiesReducer({ return [ ...activities.slice(0, reservedIndex), - makeActivityFromEvent(event, transitionState), + makeActivityFromEvent(event, transitionState, estimatedTransitionEnd), ...activities.slice(reservedIndex + 1), ]; }, @@ -46,8 +47,9 @@ export function makeActivitiesReducer({ * Replace activity at reservedIndex with new activity */ Replaced(activities: Activity[], event: ReplacedEvent): Activity[] { - const isTransitionDone = - now - (resumedAt ?? event.eventDate) >= transitionDuration; + const estimatedTransitionEnd = + (resumedAt ?? event.eventDate) + transitionDuration; + const isTransitionDone = estimatedTransitionEnd <= now; const reservedIndex = findNewActivityIndex(activities, event); @@ -60,7 +62,7 @@ export function makeActivitiesReducer({ return [ ...activities.slice(0, reservedIndex), - makeActivityFromEvent(event, transitionState), + makeActivityFromEvent(event, transitionState, estimatedTransitionEnd), ...activities.slice(reservedIndex + 1), ]; }, diff --git a/core/src/activity-utils/makeActivityFromEvent.ts b/core/src/activity-utils/makeActivityFromEvent.ts index 58a74d3e7..615729db1 100644 --- a/core/src/activity-utils/makeActivityFromEvent.ts +++ b/core/src/activity-utils/makeActivityFromEvent.ts @@ -4,11 +4,13 @@ import type { Activity, ActivityTransitionState } from "../Stack"; export function makeActivityFromEvent( event: PushedEvent | ReplacedEvent, transitionState: ActivityTransitionState, + estimatedTransitionEnd: number, ): Activity { return { id: event.activityId, name: event.activityName, transitionState, + estimatedTransitionEnd, params: event.activityParams, context: event.activityContext, steps: [ diff --git a/core/src/activity-utils/makeActivityReducer.ts b/core/src/activity-utils/makeActivityReducer.ts index fac85e2d7..8adbffdae 100644 --- a/core/src/activity-utils/makeActivityReducer.ts +++ b/core/src/activity-utils/makeActivityReducer.ts @@ -30,15 +30,16 @@ export function makeActivityReducer(context: { ...activity, exitedBy: event, transitionState: "exit-done", + estimatedTransitionEnd: context.resumedAt ?? event.eventDate, }), /** * Change transition state to exit-done or exit-active depending on skipExitActiveState */ Popped: (activity: Activity, event: PoppedEvent): Activity => { - const isTransitionDone = - context.now - (context.resumedAt ?? event.eventDate) >= - context.transitionDuration; + const estimatedTransitionEnd = + (context.resumedAt ?? event.eventDate) + context.transitionDuration; + const isTransitionDone = estimatedTransitionEnd <= context.now; const transitionState: ActivityTransitionState = event.skipExitActiveState || isTransitionDone @@ -49,6 +50,7 @@ export function makeActivityReducer(context: { ...activity, exitedBy: event, transitionState, + estimatedTransitionEnd, params: transitionState === "exit-done" ? activity.steps[0].params diff --git a/core/src/aggregate.ts b/core/src/aggregate.ts index d8db38d98..0a2972d36 100644 --- a/core/src/aggregate.ts +++ b/core/src/aggregate.ts @@ -83,6 +83,7 @@ export function aggregate(inputEvents: DomainEvent[], now: number): Stack { id: activity.id, name: activity.name, transitionState: activity.transitionState, + estimatedTransitionEnd: activity.estimatedTransitionEnd, params: activity.params, steps, enteredBy: activity.enteredBy, diff --git a/core/src/makeCoreStore.ts b/core/src/makeCoreStore.ts index cbdc99aaa..0aff33e35 100644 --- a/core/src/makeCoreStore.ts +++ b/core/src/makeCoreStore.ts @@ -1,19 +1,14 @@ -import isEqual from "react-fast-compare"; -import { aggregate } from "./aggregate"; +import { ExclusiveTaskQueue } from "utils/TaskQueue/ExclusiveTaskQueue"; +import type { Aggregator } from "./Aggregator/Aggregator"; +import { SyncAggregator } from "./Aggregator/SyncAggregator"; import type { DomainEvent, PushedEvent, StepPushedEvent } from "./event-types"; import { makeEvent } from "./event-utils"; import type { StackflowActions, StackflowPlugin } from "./interfaces"; -import { produceEffects } from "./produceEffects"; -import type { Stack } from "./Stack"; import { divideBy, once } from "./utils"; import { makeActions } from "./utils/makeActions"; +import { QueuingPublisher } from "./utils/Publisher/QueuingPublisher"; import { triggerPostEffectHooks } from "./utils/triggerPostEffectHooks"; -const SECOND = 1000; - -// 60FPS -const INTERVAL_MS = SECOND / 60; - export type MakeCoreStoreOptions = { initialEvents: DomainEvent[]; initialContext?: any; @@ -76,39 +71,23 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore { options.handlers?.onInitialActivityNotFound?.(); } - const events: { value: DomainEvent[] } = { - value: [...initialRemainingEvents, ...initialPushedEvents], - }; + const aggregator: Aggregator = new SyncAggregator( + [...initialRemainingEvents, ...initialPushedEvents], + new QueuingPublisher(new ExclusiveTaskQueue()), + ); - const stack = { - value: aggregate(events.value, new Date().getTime()), - }; + aggregator.subscribeChanges((effects) => { + triggerPostEffectHooks(effects, pluginInstances, actions); + }); const actions: StackflowActions = { getStack() { - return stack.value; + return aggregator.getStack(); }, dispatchEvent(name, params) { const newEvent = makeEvent(name, params); - const nextStackValue = aggregate( - [...events.value, newEvent], - new Date().getTime(), - ); - - events.value.push(newEvent); - setStackValue(nextStackValue); - const interval = setInterval(() => { - const nextStackValue = aggregate(events.value, new Date().getTime()); - - if (!isEqual(stack.value, nextStackValue)) { - setStackValue(nextStackValue); - } - - if (nextStackValue.globalTransitionState === "idle") { - clearInterval(interval); - } - }, INTERVAL_MS); + aggregator.dispatchEvent(newEvent); }, push: () => {}, replace: () => {}, @@ -120,12 +99,6 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore { resume: () => {}, }; - const setStackValue = (nextStackValue: Stack) => { - const effects = produceEffects(stack.value, nextStackValue); - stack.value = nextStackValue; - triggerPostEffectHooks(effects, pluginInstances, actions); - }; - // Initialize action methods after actions object is fully created Object.assign( actions, @@ -145,7 +118,7 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore { }); }); }), - pullEvents: () => events.value, + pullEvents: () => aggregator.getStack().events, subscribe(listener) { storeListeners.push(listener); diff --git a/core/src/produceEffects.ts b/core/src/produceEffects.ts index 02331b203..348097ac1 100644 --- a/core/src/produceEffects.ts +++ b/core/src/produceEffects.ts @@ -7,14 +7,14 @@ import { omit } from "./utils"; export function produceEffects(prevOutput: Stack, nextOutput: Stack): Effect[] { const output: Effect[] = []; - const somethingChanged = !isEqual(prevOutput, nextOutput); - - if (somethingChanged) { - output.push({ - _TAG: "%SOMETHING_CHANGED%", - }); + if (isEqual(prevOutput, nextOutput)) { + return []; } + output.push({ + _TAG: "%SOMETHING_CHANGED%", + }); + const isPaused = prevOutput.globalTransitionState !== "paused" && nextOutput.globalTransitionState === "paused"; diff --git a/core/src/utils/Mutex.ts b/core/src/utils/Mutex.ts new file mode 100644 index 000000000..f58c0825c --- /dev/null +++ b/core/src/utils/Mutex.ts @@ -0,0 +1,24 @@ +export class Mutex { + private latestlyBookedSession: Promise = Promise.resolve(); + + acquire(): Promise<{ release: () => void }> { + return new Promise((resolveSessionHandle) => { + this.latestlyBookedSession = this.latestlyBookedSession.then( + () => + new Promise((resolveSession) => + resolveSessionHandle({ release: () => resolveSession() }), + ), + ); + }); + } + + async runExclusively(thunk: () => Promise): Promise { + const { release } = await this.acquire(); + + try { + return await thunk(); + } finally { + release(); + } + } +} diff --git a/core/src/utils/Publisher/Publisher.ts b/core/src/utils/Publisher/Publisher.ts new file mode 100644 index 000000000..fc2150426 --- /dev/null +++ b/core/src/utils/Publisher/Publisher.ts @@ -0,0 +1,4 @@ +export interface Publisher { + publish(value: T): void; + subscribe(subscriber: (value: T) => void): () => void; +} diff --git a/core/src/utils/Publisher/QueuingPublisher.ts b/core/src/utils/Publisher/QueuingPublisher.ts new file mode 100644 index 000000000..f778fb989 --- /dev/null +++ b/core/src/utils/Publisher/QueuingPublisher.ts @@ -0,0 +1,42 @@ +import type { TaskQueue } from "../TaskQueue/TaskQueue"; +import type { Publisher } from "./Publisher"; + +export class QueuingPublisher implements Publisher { + private taskQueue: TaskQueue; + private subscribers: ((value: T) => void)[] = []; + private errorHandler: (error: unknown) => void; + + constructor( + taskQueue: TaskQueue, + options?: { errorHandler?: (error: unknown) => void }, + ) { + this.taskQueue = taskQueue; + this.errorHandler = options?.errorHandler ?? (() => {}); + } + + publish( + value: T, + options?: { onPublishError?: (error: unknown) => void }, + ): void { + const subscribers = this.subscribers.slice(); + const publishTask = this.taskQueue.enqueue(async () => { + for (const subscriber of subscribers) { + try { + subscriber(value); + } catch (error) { + options?.onPublishError?.(error); + } + } + }); + + publishTask.finished.catch(this.errorHandler); + } + + subscribe(subscriber: (value: T) => void): () => void { + this.subscribers.push(subscriber); + + return () => { + this.subscribers = this.subscribers.filter((s) => s !== subscriber); + }; + } +} diff --git a/core/src/utils/TaskQueue/ExclusiveTaskQueue.ts b/core/src/utils/TaskQueue/ExclusiveTaskQueue.ts new file mode 100644 index 000000000..68962b86e --- /dev/null +++ b/core/src/utils/TaskQueue/ExclusiveTaskQueue.ts @@ -0,0 +1,12 @@ +import { Mutex } from "utils/Mutex"; +import type { QueuedTask, TaskQueue } from "./TaskQueue"; + +export class ExclusiveTaskQueue implements TaskQueue { + private taskRunLock: Mutex = new Mutex(); + + enqueue(task: () => Promise): QueuedTask { + return { + finished: this.taskRunLock.runExclusively(task), + }; + } +} diff --git a/core/src/utils/TaskQueue/TaskQueue.ts b/core/src/utils/TaskQueue/TaskQueue.ts new file mode 100644 index 000000000..9dd4eb6cc --- /dev/null +++ b/core/src/utils/TaskQueue/TaskQueue.ts @@ -0,0 +1,7 @@ +export interface TaskQueue { + enqueue(task: () => Promise): QueuedTask; +} + +export interface QueuedTask { + finished: Promise; +} diff --git a/core/tsconfig.json b/core/tsconfig.json index 6ce58df83..170b23f9b 100644 --- a/core/tsconfig.json +++ b/core/tsconfig.json @@ -4,5 +4,5 @@ "baseUrl": "./src", "outDir": "./dist" }, - "exclude": ["./dist"] + "exclude": ["./dist", "./src/**/*.spec.ts"] }