Skip to content

Conversation

@jypma
Copy link
Contributor

@jypma jypma commented Mar 18, 2025

Relates to #1801

This commit introduces Task, a data structure that represents a recipe, or program, for producing a value of type T (or failing with an exception). It is similar in semantics to RunnableGraph[T], but intended as first-class building block.

It has the following properties:

  • A task can have resources associated to it, which are guaranteed to be released if the task is cancelled or fails
  • Tasks can be forked so multiple ones can run concurrently
  • Such forked tasks can be cancelled

A Task can be created from a RunnableGraph which has a KillSwitch, by connecting a Source and a Sink through a KillSwitch, or by direct lambda functions.

Open discussion points and TODOs (in order of highest architectural impact):

  • Current cancellation of a graph through KillSwitch doesn't communicate back a trigger when the graph is actually done cancelling. However, since we also have the Future[T] that the graph's sink materializes to, that's enough to synchronize on.
  • Resource safety, which can then guarantee cleanup even under cancellation signals.
  • Background processing wth cancellation (fibers, forkDaemon, forkResource). No child fibers (yet).
  • Add many more combinators (plain zip, andThen, before, raceAll)
  • Add more combinators (asResource, ...)
  • Add .delay() and general scheduling by hooking in the scheduling features of Materializer
  • Sink.forEachTask and Flow.mapTask
  • Resource.ofAutoCloseable
  • Task.never()
  • Task.runAsMain (or perhaps Application base classes / traits), which runs a task as main(), capturing Ctrl+C to interrupt the task (and, hence, clean up resources).
  • Move to task module. Don't depend on stream. Streams should depend on task, so we can use tasks directly in stream combinators.
  • Scala DSL

Needing an eye from reviewers:

  • More test cases to exercise typical edge cases and concurrency race conditions

To be handled in later pull requests:

  • TestClock (implementation of Clock for unit tests, where tests can advance the clock, so time-driven code can run deterministically and faster than realtime)
  • Fun Task-friendly concurrency primitives like a queue, mutex, and a mutable ref with stream subscription support.
  • (Potentially) Another monad Effect[E,A] which introduces modelled errors (in addition to exceptions). This can be easily built on top of a TaskDef[Either[E,A]].

Fixes #1801 .

@jypma jypma force-pushed the jyp-task branch 2 times, most recently from b9621d1 to e24af06 Compare March 19, 2025 12:14
@jypma jypma force-pushed the jyp-task branch 2 times, most recently from ce8e407 to b7ba3a9 Compare March 28, 2025 16:51
@jypma
Copy link
Contributor Author

jypma commented Apr 1, 2025

I'm getting closer to a first well-connected base with things like resource safety, interruptability and time somewhat properly present. I think we should have a discussion on what to do with the interruptability of graphs, though:

  • Pekko's KillSwitch only has a void method to stop a graph. You won't know when it's actually stopped, unless you also have a Sink with a future that completes when the graph is actually dead.
  • Probably as a side result of this, pekko-connectors-kafka has its own DrainingControl concept, which does promise to shut down the graph (even cleanly).

Is this something to discuss and look at now? I suppose it could be done later, by putting interruptability into a type class when creating a GraphDef, but it'd be nicer to stick to first-level concepts (like KillSwitch).

@pjfanning
Copy link
Member

@jypma if you thinking copying some of the pekko-connectors-kafka code into your task package helps, go ahead. Let's keep any additions specific to the task package for now and we can later discuss if it is worth taking some of reusable bits out and making them available as public APIs or to be used by other Pekko classes.

@jypma jypma force-pushed the jyp-task branch 2 times, most recently from 0172b96 to f6dc3c5 Compare April 6, 2025 18:38
@jypma
Copy link
Contributor Author

jypma commented Apr 6, 2025

I've decided to just introduce a RunningGraph interface to represent a graph that will eventually complete (with a Task), and has the ability to be interrupted (with another Task). That covers both KillSwitch (where interruption is immediate, but followed by awaiting the sink's completion stage) and DrainingControl (where interruption has a clear "done" signal).

@jypma jypma force-pushed the jyp-task branch 5 times, most recently from 9010cc4 to a7b04d0 Compare April 11, 2025 07:30
This commit introduces Task, a data structure that represents a
recipe, or program, for producing a value of type T (or failing with
an exception). It is similar in semantics to RunnableGraph[T], but
intended as first-class building block.

It has the following properties:
- A task can have resources associated to it, which are guaranteed to
be released if the task is cancelled or fails
- Tasks can be forked so multiple ones can run concurrently
- Such forked tasks can be cancelled

A Task can be created from a RunnableGraph which has a KillSwitch, by
connecting a Source and a Sink through a KillSwitch, or by direct
lambda functions.
@He-Pin He-Pin closed this Aug 27, 2025
@He-Pin He-Pin reopened this Aug 27, 2025
@He-Pin
Copy link
Member

He-Pin commented Sep 22, 2025

@jypma Seems there are some conflicts. I would like to learn more once this is ready

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Task monad based on Materializer

3 participants