Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/src/Aggregator/Aggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import type { Stack } from "../Stack";

export interface Aggregator {
getStack(): Stack;
dispatchEvent(event: DomainEvent): void;
subscribeChanges: (
listener: (effects: Effect[], stack: Stack) => void,
) => () => void;
}
124 changes: 124 additions & 0 deletions core/src/Aggregator/SyncAggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { produceEffects } from "produceEffects";
import { aggregate } from "../aggregate";
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import type { Stack } from "../Stack";
import type { Publisher } from "../utils/Publisher/Publisher";
import type { Aggregator } from "./Aggregator";

export class SyncAggregator implements Aggregator {
private events: DomainEvent[];
private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
private autoUpdateTask: DynamicallyScheduledTask;
private previousStack: Stack;

constructor(
events: DomainEvent[],
changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>,
) {
this.events = events;
this.changePublisher = changePublisher;
this.autoUpdateTask = new DynamicallyScheduledTask(async () => {
this.updateStack();
});
this.previousStack = this.computeStack();
}

getStack(): Stack {
return this.previousStack;
}

dispatchEvent(event: DomainEvent): void {
this.events.push(event);
this.updateStack();
}

subscribeChanges(
listener: (effects: Effect[], stack: Stack) => void,
): () => void {
return this.changePublisher.subscribe(({ effects, stack }) => {
listener(effects, stack);
});
}

private computeStack(): Stack {
return aggregate(this.events, Date.now());
}

private predictUpcomingTransitionStateUpdate(): {
event: DomainEvent;
timestamp: number;
} | null {
const activeActivities = this.previousStack.activities.filter(
(activity) =>
activity.transitionState === "enter-active" ||
activity.transitionState === "exit-active",
);
const mostRecentlyActivatedActivity = activeActivities.sort(
(a, b) => a.estimatedTransitionEnd - b.estimatedTransitionEnd,
)[0];

return mostRecentlyActivatedActivity
? {
event:
mostRecentlyActivatedActivity.exitedBy ??
mostRecentlyActivatedActivity.enteredBy,
timestamp: mostRecentlyActivatedActivity.estimatedTransitionEnd,
}
: null;
}

private updateStack(): void {
const previousStack = this.previousStack;
const currentStack = this.computeStack();
const effects = produceEffects(previousStack, currentStack);

if (effects.length > 0) {
this.changePublisher.publish({ effects, stack: currentStack });

this.previousStack = currentStack;

const upcomingTransitionStateUpdate =
this.predictUpcomingTransitionStateUpdate();

if (upcomingTransitionStateUpdate) {
this.autoUpdateTask.schedule(upcomingTransitionStateUpdate.timestamp);
}
}
}
}

class DynamicallyScheduledTask {
private task: () => Promise<void>;
private scheduleId: number | null;

constructor(task: () => Promise<void>) {
this.task = task;
this.scheduleId = null;
}

schedule(timestamp: number): void {
if (this.scheduleId !== null) {
clearTimeout(this.scheduleId);
this.scheduleId = null;
}

const timeoutId = setTimeout(
() => {
if (this.scheduleId !== timeoutId) return;

this.scheduleId = null;

if (Date.now() < timestamp) {
this.schedule(timestamp);
return;
}

this.task();
},
Math.max(0, timestamp - Date.now()),
);

this.scheduleId = timeoutId;
}
}
2 changes: 1 addition & 1 deletion core/src/Stack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { BaseDomainEvent } from "event-types/_base";
import type {
DomainEvent,
PoppedEvent,
Expand Down Expand Up @@ -29,6 +28,7 @@ export type Activity = {
id: string;
name: string;
transitionState: ActivityTransitionState;
estimatedTransitionEnd: number;
params: {
[key: string]: string | undefined;
};
Expand Down
14 changes: 8 additions & 6 deletions core/src/activity-utils/makeActivitiesReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ export function makeActivitiesReducer({
* Push new activity to activities
*/
Pushed(activities: Activity[], event: PushedEvent): Activity[] {
const isTransitionDone =
now - (resumedAt ?? event.eventDate) >= transitionDuration;
const estimatedTransitionEnd =
(resumedAt ?? event.eventDate) + transitionDuration;
const isTransitionDone = estimatedTransitionEnd <= now;

const transitionState: ActivityTransitionState =
event.skipEnterActiveState || isTransitionDone
Expand All @@ -37,7 +38,7 @@ export function makeActivitiesReducer({

return [
...activities.slice(0, reservedIndex),
makeActivityFromEvent(event, transitionState),
makeActivityFromEvent(event, transitionState, estimatedTransitionEnd),
...activities.slice(reservedIndex + 1),
];
},
Expand All @@ -46,8 +47,9 @@ export function makeActivitiesReducer({
* Replace activity at reservedIndex with new activity
*/
Replaced(activities: Activity[], event: ReplacedEvent): Activity[] {
const isTransitionDone =
now - (resumedAt ?? event.eventDate) >= transitionDuration;
const estimatedTransitionEnd =
(resumedAt ?? event.eventDate) + transitionDuration;
const isTransitionDone = estimatedTransitionEnd <= now;

const reservedIndex = findNewActivityIndex(activities, event);

Expand All @@ -60,7 +62,7 @@ export function makeActivitiesReducer({

return [
...activities.slice(0, reservedIndex),
makeActivityFromEvent(event, transitionState),
makeActivityFromEvent(event, transitionState, estimatedTransitionEnd),
...activities.slice(reservedIndex + 1),
];
},
Expand Down
2 changes: 2 additions & 0 deletions core/src/activity-utils/makeActivityFromEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import type { Activity, ActivityTransitionState } from "../Stack";
export function makeActivityFromEvent(
event: PushedEvent | ReplacedEvent,
transitionState: ActivityTransitionState,
estimatedTransitionEnd: number,
): Activity {
return {
id: event.activityId,
name: event.activityName,
transitionState,
estimatedTransitionEnd,
params: event.activityParams,
context: event.activityContext,
steps: [
Expand Down
8 changes: 5 additions & 3 deletions core/src/activity-utils/makeActivityReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ export function makeActivityReducer(context: {
...activity,
exitedBy: event,
transitionState: "exit-done",
estimatedTransitionEnd: context.resumedAt ?? event.eventDate,
}),

/**
* Change transition state to exit-done or exit-active depending on skipExitActiveState
*/
Popped: (activity: Activity, event: PoppedEvent): Activity => {
const isTransitionDone =
context.now - (context.resumedAt ?? event.eventDate) >=
context.transitionDuration;
const estimatedTransitionEnd =
(context.resumedAt ?? event.eventDate) + context.transitionDuration;
const isTransitionDone = estimatedTransitionEnd <= context.now;

const transitionState: ActivityTransitionState =
event.skipExitActiveState || isTransitionDone
Expand All @@ -49,6 +50,7 @@ export function makeActivityReducer(context: {
...activity,
exitedBy: event,
transitionState,
estimatedTransitionEnd,
params:
transitionState === "exit-done"
? activity.steps[0].params
Expand Down
1 change: 1 addition & 0 deletions core/src/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export function aggregate(inputEvents: DomainEvent[], now: number): Stack {
id: activity.id,
name: activity.name,
transitionState: activity.transitionState,
estimatedTransitionEnd: activity.estimatedTransitionEnd,
params: activity.params,
steps,
enteredBy: activity.enteredBy,
Expand Down
55 changes: 14 additions & 41 deletions core/src/makeCoreStore.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import isEqual from "react-fast-compare";
import { aggregate } from "./aggregate";
import { ExclusiveTaskQueue } from "utils/TaskQueue/ExclusiveTaskQueue";
import type { Aggregator } from "./Aggregator/Aggregator";
import { SyncAggregator } from "./Aggregator/SyncAggregator";
import type { DomainEvent, PushedEvent, StepPushedEvent } from "./event-types";
import { makeEvent } from "./event-utils";
import type { StackflowActions, StackflowPlugin } from "./interfaces";
import { produceEffects } from "./produceEffects";
import type { Stack } from "./Stack";
import { divideBy, once } from "./utils";
import { makeActions } from "./utils/makeActions";
import { QueuingPublisher } from "./utils/Publisher/QueuingPublisher";
import { triggerPostEffectHooks } from "./utils/triggerPostEffectHooks";

const SECOND = 1000;

// 60FPS
const INTERVAL_MS = SECOND / 60;

export type MakeCoreStoreOptions = {
initialEvents: DomainEvent[];
initialContext?: any;
Expand Down Expand Up @@ -76,39 +71,23 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
options.handlers?.onInitialActivityNotFound?.();
}

const events: { value: DomainEvent[] } = {
value: [...initialRemainingEvents, ...initialPushedEvents],
};
const aggregator: Aggregator = new SyncAggregator(
[...initialRemainingEvents, ...initialPushedEvents],
new QueuingPublisher(new ExclusiveTaskQueue()),
);

const stack = {
value: aggregate(events.value, new Date().getTime()),
};
aggregator.subscribeChanges((effects) => {
triggerPostEffectHooks(effects, pluginInstances, actions);
});

const actions: StackflowActions = {
getStack() {
return stack.value;
return aggregator.getStack();
},
dispatchEvent(name, params) {
const newEvent = makeEvent(name, params);
const nextStackValue = aggregate(
[...events.value, newEvent],
new Date().getTime(),
);

events.value.push(newEvent);
setStackValue(nextStackValue);

const interval = setInterval(() => {
const nextStackValue = aggregate(events.value, new Date().getTime());

if (!isEqual(stack.value, nextStackValue)) {
setStackValue(nextStackValue);
}

if (nextStackValue.globalTransitionState === "idle") {
clearInterval(interval);
}
}, INTERVAL_MS);
aggregator.dispatchEvent(newEvent);
},
push: () => {},
replace: () => {},
Expand All @@ -120,12 +99,6 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
resume: () => {},
};

const setStackValue = (nextStackValue: Stack) => {
const effects = produceEffects(stack.value, nextStackValue);
stack.value = nextStackValue;
triggerPostEffectHooks(effects, pluginInstances, actions);
};

// Initialize action methods after actions object is fully created
Object.assign(
actions,
Expand All @@ -145,7 +118,7 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
});
});
}),
pullEvents: () => events.value,
pullEvents: () => aggregator.getStack().events,
subscribe(listener) {
storeListeners.push(listener);

Expand Down
12 changes: 6 additions & 6 deletions core/src/produceEffects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import { omit } from "./utils";
export function produceEffects(prevOutput: Stack, nextOutput: Stack): Effect[] {
const output: Effect[] = [];

const somethingChanged = !isEqual(prevOutput, nextOutput);

if (somethingChanged) {
output.push({
_TAG: "%SOMETHING_CHANGED%",
});
if (isEqual(prevOutput, nextOutput)) {
return [];
}

output.push({
_TAG: "%SOMETHING_CHANGED%",
});

const isPaused =
prevOutput.globalTransitionState !== "paused" &&
nextOutput.globalTransitionState === "paused";
Expand Down
24 changes: 24 additions & 0 deletions core/src/utils/Mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
export class Mutex {
private latestlyBookedSession: Promise<void> = Promise.resolve();

acquire(): Promise<{ release: () => void }> {
return new Promise((resolveSessionHandle) => {
this.latestlyBookedSession = this.latestlyBookedSession.then(
() =>
new Promise((resolveSession) =>
resolveSessionHandle({ release: () => resolveSession() }),
),
);
});
}

async runExclusively<T>(thunk: () => Promise<T>): Promise<T> {
const { release } = await this.acquire();

try {
return await thunk();
} finally {
release();
}
}
}
4 changes: 4 additions & 0 deletions core/src/utils/Publisher/Publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface Publisher<T> {
publish(value: T): void;
subscribe(subscriber: (value: T) => void): () => void;
}
Loading
Loading