Skip to content

Add temporal-workflowstreams contrib module#2912

Open
brianstrauch wants to merge 2 commits into
masterfrom
workflow-streams
Open

Add temporal-workflowstreams contrib module#2912
brianstrauch wants to merge 2 commits into
masterfrom
workflow-streams

Conversation

@brianstrauch

Copy link
Copy Markdown
Member

What was changed

Adds a new temporal-workflowstreams contrib module: a durable, multi-topic pub/sub log hosted inside a workflow, mirroring the workflow streams contrib packages in the Go, Python, and TypeScript SDKs. All APIs are marked @Experimental.

Wire protocol (cross-SDK contract): external publishers send batches via the __temporal_workflow_stream_publish signal, subscribers long-poll via the __temporal_workflow_stream_poll update, and the __temporal_workflow_stream_offset query exposes the current offset. The JSON envelope field names and the per-item payload encoding (base64 of the serialized temporal.api.common.v1.Payload) match the other SDKs exactly, so a Java publisher or subscriber interoperates with a workflow written in any of them and vice versa.

Workflow side: WorkflowStream registers a typed listener (preferably from a @WorkflowInit constructor) and supports publisher dedup (publisher ID + monotonic sequence), ~1 MB poll response paging, truncation, and continue-as-new state carryover via WorkflowStreamState.

Client side: WorkflowStreamClient (also constructible from inside an activity) provides a batching background publisher with retry and sequence-based exactly-once delivery, plus a blocking subscription iterator that follows continue-as-new chains, recovers from truncation, and ends cleanly on terminal workflow states.

Core SDK change: like the Go SDK (isWorkflowStreamReservedName), the signal/update/query dispatchers and listener registration now permit handler names in the exact __temporal_workflow_stream_ sub-namespace, which is otherwise reserved under the __temporal_ prefix. Other __temporal_ names remain blocked.

Why?

Brings the Java SDK to parity with the other SDKs' experimental workflow streams support, enabling durable event streams whose cost scales with durable batches rather than message count.

Checklist

  1. Closes: n/a

  2. How was this tested:

    • PayloadWireTest — wire-format round-trips (base64-of-proto, the cross-SDK contract)
    • StreamPublisherTest — publish-path unit tests with an injected signal function (batching, sequence advancement, force flush, drain-on-close, flush timeout)
    • WorkflowStreamTest — workflow-side integration tests against the in-process test server (external publish, dedup, topic-filtered polls, truncation errors, @WorkflowInit construction, custom payload converters)
    • SubscribeTest — subscription integration tests (delivery and offset advancement, topic filtering, truncation reset, clean terminal end, continue-as-new follow across runs)
    • WorkflowStreamReservedNameTest — reserved-name exemption works end-to-end; other __temporal_ names still rejected
  3. Any docs updates needed: module README included (contrib/temporal-workflowstreams/README.md)

🤖 Generated with Claude Code

A durable, multi-topic pub/sub log hosted inside a workflow, mirroring
the workflow streams contrib packages in the Go, Python, and TypeScript
SDKs. External publishers send batches via a signal, subscribers
long-poll via an update, and a query exposes the current offset; the
wire protocol (handler names, JSON envelope field names, base64-of-proto
per-item payload encoding) matches the other SDKs exactly for
cross-language interop.

The workflow side registers a typed listener and supports publisher
dedup, ~1 MB poll response paging, truncation, and continue-as-new
state carryover. The client side provides a batching publisher with
retry and sequence-based exactly-once delivery, and a blocking
subscription iterator that follows continue-as-new chains and ends
cleanly on terminal workflow states.

Like the Go SDK, the core SDK now permits registering signal, update,
and query handlers in the __temporal_workflow_stream_ sub-namespace,
which is otherwise reserved.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@brianstrauch brianstrauch requested a review from a team as a code owner June 11, 2026 20:02
Mirrors the CODEOWNERS entries in sdk-go and sdk-python for their
workflow streams contrib packages.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@diwu-sf

diwu-sf commented Jun 26, 2026

Copy link
Copy Markdown

@brianstrauch we would like to use this, our set up is:

  • Temporary worker activity is using Python SDK, running the OpenAI agent sdk, events are published from this activity
  • Java spring boot server provide SSE stream to web client
  • Java need to be able to subscribe to the agent's LLM completion event stream to send to web client

Can you rebase this PR and get it fully merged?
Thanks

diwu-sf commented Jun 26, 2026

Copy link
Copy Markdown

One concern with the subscriber API: WorkflowStreamSubscription is currently a blocking Iterator, which means the application has to dedicate a driver thread to each active subscription.

The iterator loop is ergonomic:

try (WorkflowStreamSubscription sub = client.subscribe(options)) {
  for (WorkflowStreamItem item : sub) {
    ...
  }
}

but that caller thread is also what drives the long-poll loop. It blocks while waiting for poll update results, cooldowns, and terminal/continue-as-new handling. So the thread model becomes roughly:

one active workflow-stream subscription ~= one occupied driver thread

Could we expose a non-blocking subscriber API as the primary or additional API? For example:

WorkflowStreamSubscription<T> subscribe(
    WorkflowStreamSubscribeOptions<T> options,
    WorkflowStreamListener<T> listener);

interface WorkflowStreamListener<T> {
  CompletionStage<Void> onNext(WorkflowStreamItem<T> item);
  default void onError(Throwable failure) {}
  default void onCompleted() {}
}

With that shape, the implementation can run the blocking startUpdate(..., ACCEPTED) call on a shared executor, then wait for the long-poll result with getResultAsync() instead of keeping a caller thread parked for the duration of the poll. Many subscriptions can share the same executor/scheduler, and CompletionStage from onNext gives applications a backpressure boundary without blocking the poller thread.

The blocking iterator could still be useful as a convenience for synchronous consumers, but we would prefer not to make it the only subscriber interface, because it bakes in one driver thread per active subscription.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants