Workflow message passing - Java SDK
A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values. Temporal Clients use messages to read Workflow state and control execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Java SDK.
Write message handlers
Follow these guidelines when writing your message handlers:
- Message handlers are defined as methods on the Workflow class, using one of the three annotations: @QueryMethod, @SignalMethod, and @UpdateMethod.
- The parameters and return values of handlers and the main Workflow function must be serializable.
- Prefer a single class with multiple fields over using multiple input parameters. A class allows you to add fields without changing the calling signature.
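The last guideline can be illustrated with a minimal plain-Java sketch that does not use the SDK. GreetInput, GreetHandler, and the punctuation field are hypothetical names chosen for illustration. The point is that a field added later, with a default value, leaves the handler signature and existing callers unchanged:

```java
// Hypothetical input class: one object carries every handler argument.
class GreetInput {
    public String name;
    // Field added in a later version; the default keeps older callers working.
    public String punctuation = "!";

    public GreetInput() {}

    public GreetInput(String name) {
        this.name = name;
    }
}

// Stand-in for a message handler: its signature stays the same as GreetInput grows.
class GreetHandler {
    static String greet(GreetInput input) {
        return "Hello, " + input.name + input.punctuation;
    }

    public static void main(String[] args) {
        // A caller that only knows about `name` still works unchanged.
        System.out.println(greet(new GreetInput("Ada")));

        // A newer caller sets the added field without needing a new method signature.
        GreetInput custom = new GreetInput("Ada");
        custom.punctuation = "?";
        System.out.println(greet(custom));
    }
}
```

Had greet taken a bare String name, adding the punctuation argument would have changed the calling signature for every sender.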
Query handlers
A Query is a synchronous operation that retrieves state from a Workflow Execution:
public class MessagePassingIntro {

  public enum Language {
    CHINESE,
    ENGLISH,
    FRENCH,
    SPANISH,
    PORTUGUESE,
  }

  public static class GetLanguagesInput {
    public boolean includeUnsupported;

    public GetLanguagesInput() {
      this.includeUnsupported = false;
    }

    public GetLanguagesInput(boolean includeUnsupported) {
      this.includeUnsupported = includeUnsupported;
    }
  }

  @WorkflowInterface
  public interface GreetingWorkflow {
    ...
    // Use the @QueryMethod annotation to define a Query handler in the
    // Workflow interface.
    @QueryMethod
    List<Language> getLanguages(GetLanguagesInput input);
  }

  public static class GreetingWorkflowImpl implements GreetingWorkflow {
    ...
    @Override
    public List<Language> getLanguages(GetLanguagesInput input) {
      // The Query handler returns a value: it must not mutate the Workflow state
      // or perform blocking operations.
      if (input.includeUnsupported) {
        return Arrays.asList(Language.values());
      } else {
        return new ArrayList<>(greetings.keySet());
      }
    }
  }
}
- A Query handler must not modify Workflow state.
- You can't perform blocking operations such as executing an Activity in a Query handler.
- The Query annotation accepts an argument (name) as described in the API reference docs for @QueryMethod.
Signal handlers
A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:
public class MessagePassingIntro {

  public static class ApproveInput {
    private String name;

    public ApproveInput() {}

    public ApproveInput(String name) {
      this.name = name;
    }
  }

  @WorkflowInterface
  public interface GreetingWorkflow {
    ...
    // Use the @SignalMethod annotation to define a Signal handler in the
    // Workflow interface.
    @SignalMethod
    void approve(ApproveInput input);
  }

  public static class GreetingWorkflowImpl implements GreetingWorkflow {
    ...
    @Override
    public void approve(ApproveInput input) {
      // The Signal handler mutates the Workflow state but cannot return a value.
      // approvedForRelease and approverName are assumed to be fields declared
      // in the elided part of this class.
      this.approvedForRelease = true;
      this.approverName = input.name;
    }
  }
}
- The handler should not return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.
- The Signal annotation accepts arguments (name and unfinishedPolicy) as described in the API reference docs for @SignalMethod.
- Signal (and Update) handlers can be blocking. This allows you to use Activities, Child Workflows, durable Workflow.sleep Timers, Workflow.await, and more. See Blocking handlers and Workflow message passing for guidelines on safely using blocking Signal and Update handlers.
Update handlers and validators
An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:
public class MessagePassingIntro {

  @WorkflowInterface
  public interface GreetingWorkflow {
    ...
    // Use the @UpdateMethod annotation to define an Update handler in the
    // Workflow interface.
    @UpdateMethod
    Language setLanguage(Language language);

    // Update validators are optional
    @UpdateValidatorMethod(updateName = "setLanguage")
    void setLanguageValidator(Language language);
  }

  public static class GreetingWorkflowImpl implements GreetingWorkflow {
    ...
    @Override
    public Language setLanguage(Language language) {
      // The Update handler can mutate the Workflow state and return a value.
      Language previousLanguage = this.language;
      this.language = language;
      return previousLanguage;
    }

    @Override
    public void setLanguageValidator(Language language) {
      // The Update validator performs validation but cannot mutate the Workflow state.
      if (!greetings.containsKey(language)) {
        throw new IllegalArgumentException("Unsupported language: " + language);
      }
    }
  }
}
- The Update annotation accepts arguments (name and unfinishedPolicy) as described in the API reference docs for @UpdateMethod.
- About validators:
  - Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you can skip them.
  - Define an Update validator with the @UpdateValidatorMethod annotation. Use the updateName argument when declaring the validator to connect it to its Update. The validator must return void and accept the same argument types as the handler.
- Accepting and rejecting Updates with validators:
  - To reject an Update, throw an exception of any type in the validator.
  - Without a validator, Updates are always accepted.
- Validators and Event History:
  - The WorkflowExecutionUpdateAccepted event is written into the History whether the acceptance was automatic or programmatic.
  - When a validator throws an error, the Update is rejected, the Update is not run, and WorkflowExecutionUpdateAccepted won't be added to the Event History. The caller receives an "Update failed" error.
- Use getCurrentUpdateInfo to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.
- Update (and Signal) handlers can be blocking, letting them use Activities, Child Workflows, durable Workflow.sleep Timers, Workflow.await conditions, and more. See Blocking handlers and Workflow message passing for safe usage guidelines.
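As a rough illustration of the deduplication idea mentioned above, here is a plain-Java sketch of the bookkeeping only. It does not use the SDK: the Update ID is passed in as a String, whereas inside a Workflow it would come from getCurrentUpdateInfo. DedupingLanguageState and its methods are hypothetical names, and surviving Continue-As-New would additionally require carrying the recorded IDs into the next run, which this sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of handler state that applies each Update ID at most once.
class DedupingLanguageState {
    private String language = "ENGLISH";
    // Results of Updates that were already applied, keyed by Update ID.
    private final Map<String, String> resultsByUpdateId = new HashMap<>();

    // Applies the Update once per ID; a repeated ID returns the recorded result
    // without changing state again.
    String setLanguage(String updateId, String newLanguage) {
        if (resultsByUpdateId.containsKey(updateId)) {
            return resultsByUpdateId.get(updateId);
        }
        String previous = language;
        language = newLanguage;
        resultsByUpdateId.put(updateId, previous);
        return previous;
    }

    String currentLanguage() {
        return language;
    }

    public static void main(String[] args) {
        DedupingLanguageState state = new DedupingLanguageState();
        System.out.println(state.setLanguage("update-1", "FRENCH"));
        // A retried delivery with the same ID does not change the state a second time.
        System.out.println(state.setLanguage("update-1", "FRENCH"));
        System.out.println(state.currentLanguage());
    }
}
```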
Send messages
To send Queries, Signals, or Updates, you call methods on a client-side object that implements your WorkflowInterface, often called the "WorkflowStub."
Use newWorkflowStub to obtain the WorkflowStub.
For example:
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkflowOptions workflowOptions =
    WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build();
// Create the Workflow client stub. It is used to start the Workflow Execution.
GreetingWorkflow workflow = client.newWorkflowStub(GreetingWorkflow.class, workflowOptions);
// Start the Workflow asynchronously by calling its getGreetings Workflow method.
WorkflowClient.start(workflow::getGreetings);
To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.
Send a Query
To send a Query to a Workflow Execution, call a Query method on a WorkflowStub created in Client code:
List<Language> languages = workflow.getLanguages(new GetLanguagesInput(false));
System.out.println("Supported languages: " + languages);
- Sending a Query doesn't add events to a Workflow's Event History.
- You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.
- A Worker must be online and polling the Task Queue to process a Query.
Send a Signal
You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven't closed.
Send a Signal from a Client
To send a Signal from Client code, call a Signal method on the WorkflowStub:
workflow.approve(new ApproveInput("Me"));
- The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.
- The WorkflowExecutionSignaled Event appears in the Workflow's Event History.
Send a Signal from a Workflow
A Workflow can send a Signal to another Workflow, known as an External Signal.
Use Workflow.newExternalWorkflowStub in your current Workflow to create an ExternalWorkflowStub for the other Workflow.
Call Signal methods on the external stub to Signal the other Workflow:
OtherWorkflow other = Workflow.newExternalWorkflowStub(OtherWorkflow.class, otherWorkflowID);
other.mySignalMethod();
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Signal-With-Start
Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running.
If there's a Workflow running with the given Workflow Id, it will be signaled.
If there isn't, a new Workflow will be started and immediately signaled.
To use Signal-With-Start, call signalWithStart and pass the name of your Signal with its arguments:
public static void signalWithStart() {
  // WorkflowStub is a client-side stub to a single Workflow instance
  WorkflowStub untypedWorkflowStub = client.newUntypedWorkflowStub("GreetingWorkflow",
      WorkflowOptions.newBuilder()
          .setWorkflowId(workflowId)
          .setTaskQueue(taskQueue)
          .build());
  // The first array holds the Signal arguments; the second holds the Workflow start arguments.
  untypedWorkflowStub.signalWithStart("setCustomer", new Object[] {customer2}, new Object[] {customer1});
  String greeting = untypedWorkflowStub.getResult(String.class);
}
Here's the WorkflowInterface for the previous example.
When using Signal-With-Start, the Signal handler (setCustomer) will be executed before the Workflow method (greet).
@WorkflowInterface
public interface GreetingWorkflow {
  @WorkflowMethod
  String greet(Customer customer);

  @SignalMethod
  void setCustomer(Customer customer);

  @QueryMethod
  Customer getCustomer();
}
Send an Update
An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.
A Client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.
- WorkflowExecutionUpdateAccepted is added to the Event History when the Worker confirms that the Update passed validation.
- WorkflowExecutionUpdateCompleted is added to the Event History when the Worker confirms that the Update has finished.
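The relationship between validation and these two events can be modeled in a few lines of plain Java. This is only a conceptual sketch: UpdateHistoryModel is a hypothetical class, the history list stands in for the Event History that the Temporal Server actually maintains, and nothing here calls the SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model: records which Update events would appear in the Event History.
class UpdateHistoryModel {
    final List<String> history = new ArrayList<>();
    private final Set<String> supported = new HashSet<>(Arrays.asList("ENGLISH", "FRENCH"));
    private String language = "ENGLISH";

    String setLanguage(String requested) {
        // Validator step: a rejection happens before anything is recorded.
        if (!supported.contains(requested)) {
            throw new IllegalArgumentException("Unsupported language: " + requested);
        }
        history.add("WorkflowExecutionUpdateAccepted");

        // Handler step: mutate state and produce the result.
        String previous = language;
        language = requested;
        history.add("WorkflowExecutionUpdateCompleted");
        return previous;
    }

    public static void main(String[] args) {
        UpdateHistoryModel model = new UpdateHistoryModel();
        model.setLanguage("FRENCH");
        try {
            model.setLanguage("KLINGON");
        } catch (IllegalArgumentException e) {
            // The rejected Update left no trace in the modeled history.
        }
        System.out.println(model.history);
    }
}
```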
To send an Update to a Workflow Execution, you can:
- Call the Update method on a WorkflowStub in Client code and wait for the Update to complete. This code fetches an Update result:
  Language previousLanguage = workflow.setLanguage(Language.CHINESE);
- Use startUpdate to receive a WorkflowUpdateHandle as soon as the Update is accepted or rejected.
  - Use this WorkflowUpdateHandle later to fetch your results.
  - Blocking Update handlers normally perform long-running asynchronous operations.
  - startUpdate only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.
  For example:
  WorkflowUpdateHandle<Language> handle =
      WorkflowStub.fromTyped(workflow)
          .startUpdate(
              "setLanguage", WorkflowUpdateStage.ACCEPTED, Language.class, Language.ENGLISH);
  previousLanguage = handle.getResultAsync().get();
  For more details, see the "Blocking handlers" section.
To obtain an Update handle, you can:
- Use startUpdate to start an Update and return the handle, as shown in the preceding example.
- Use getUpdateHandle to fetch a handle for an in-progress Update using the Update ID and Workflow ID.
You can use the WorkflowUpdateHandle to obtain information about the Update:
- getExecution(): Returns the Workflow Execution that this Update was sent to.
- getId(): Returns the Update's unique ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.
- getResultAsync(): Returns a CompletableFuture which can be used to wait for the Update to complete.
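Because getResultAsync() returns a standard CompletableFuture, the usual JDK composition methods apply to it. The sketch below uses only the JDK: a hand-completed future stands in for the one a WorkflowUpdateHandle would return, and UpdateResultDemo and describe are hypothetical names. It shows attaching a follow-up step instead of blocking on get():

```java
import java.util.concurrent.CompletableFuture;

class UpdateResultDemo {
    // Attaches a follow-up step that runs once the result becomes available.
    static CompletableFuture<String> describe(CompletableFuture<String> resultFuture) {
        return resultFuture.thenApply(previous -> "Previous language: " + previous);
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by getResultAsync(); completed by hand here.
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<String> described = describe(result);

        // Nothing has blocked so far; the follow-up runs when the result arrives.
        result.complete("ENGLISH");
        System.out.println(described.join());
    }
}
```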
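In real-world development, sometimes you may be unable to import Workflow Definition method signatures. When you don't have access to the Workflow Definition or it isn't written in Java, you can use non-type-safe APIs such as newUntypedWorkflowStub or WorkflowStub.fromTyped, both used earlier on this page, to obtain an untyped WorkflowStub.
With an untyped WorkflowStub, you pass method names as strings instead of method references, as in the signalWithStart and startUpdate calls shown earlier.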
Message handler patterns
This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.
For additional information, see Inject work into the main Workflow, and Ensuring your messages are processed exactly once.
Do blocking operations in handlers
Signal and Update handlers can block.
This allows you to use Workflow.await, Activities, Child Workflows, Workflow.sleep Timers, etc.
This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls.
It's essential to understand the things that could go wrong in order to use blocking handlers safely. See Workflow message passing for guidance on safe usage of blocking Signal and Update handlers, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.
The following code modifies the Update handler from earlier on in this page. The Update handler now makes a blocking call to execute an Activity:
public static class GreetingWorkflowImpl implements GreetingWorkflow {
  ...
  @Override
  public Language setLanguage(Language language) {
    if (!greetings.containsKey(language)) {
      String greeting = activity.greetingService(language);
      if (greeting == null) {
        // An Update validator cannot be blocking, so it cannot be used to check that the remote
        // greetingService supports the requested language. Throwing an ApplicationFailure
        // will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
        // added to history.
        throw ApplicationFailure.newFailure("Greeting service does not support: " + language, "GreetingFailure");
      }
      greetings.put(language, greeting);
    }
    Language previousLanguage = this.language;
    this.language = language;
    return previousLanguage;
  }
}
Although a Signal handler can also make blocking calls like this, using an Update handler allows the Client to receive a result or error once the Activity completes. This lets your Client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.
Add blocking wait conditions
Sometimes, blocking Signal or Update handlers need to meet certain conditions before they should continue.
You can use Workflow.await to prevent the code from proceeding until a condition is true.
You specify the condition by passing a function that returns true or false.
This is an important feature that helps you control your handler logic.
Here are two important use cases for Workflow.await:
- Waiting in a handler until it is appropriate to continue.
- Waiting in the main Workflow until all active handlers have finished.
Wait for conditions in handlers
It's common to use Workflow.await in a handler.
For example, suppose your Workflow class has an updateReadyToExecute method that indicates whether your Update handler should be allowed to start executing.
You can use Workflow.await in the handler to make the handler pause until the condition is met:
@Override
public String setLanguage(UpdateInput input) {
  Workflow.await(() -> this.updateReadyToExecute(input));
  ...
}
Remember: handlers can execute before the main Workflow method starts.
You can also use Workflow.await anywhere else in the handler to wait for a specific condition to become true.
This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become true.
Ensure your handlers finish before the Workflow completes
Workflow.await can ensure your handler completes before a Workflow finishes.
When your Workflow uses blocking Signal or Update handlers, your main Workflow method can return or Continue-as-New while a handler is still waiting on an async task, such as an Activity.
The Workflow completing may interrupt the handler before it finishes crucial work and cause Client errors when trying to retrieve Update results.
Use Workflow.await to wait for Workflow.isEveryHandlerFinished to return true to address this problem and allow your Workflow to end smoothly:
public class MyWorkflowImpl implements MyWorkflow {
  ...
  @Override
  public String run() {
    ...
    Workflow.await(() -> Workflow.isEveryHandlerFinished());
    return "workflow-result";
  }
}
By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions.
You can silence these warnings on a per-handler basis by passing the unfinishedPolicy argument to the @SignalMethod / @UpdateMethod annotation:
@WorkflowInterface
public interface MyWorkflow {
  ...
  @UpdateMethod(unfinishedPolicy = HandlerUnfinishedPolicy.ABANDON)
  void myUpdate();
}
See Finishing handlers before the Workflow completes for more information.
Use locks to prevent concurrent handler execution
Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:
public class DataWorkflowImpl implements DataWorkflow {
  ...
  @Override
  public void badSignalHandler() {
    Data data = activity.fetchData();
    this.x = data.x;
    // Bug!! If multiple instances of this method are executing concurrently, then
    // there may be times when the Workflow has this.x from one Activity execution and this.y from another.
    Workflow.sleep(Duration.ofSeconds(1));
    this.y = data.y;
  }
}
Coordinating access with WorkflowLock corrects this code.
Locking makes sure that only one handler instance can execute a specific section of code at any given time:
public class DataWorkflowImpl implements DataWorkflow {
  WorkflowLock lock = Workflow.newWorkflowLock();
  ...
  @Override
  public void safeSignalHandler() {
    // Acquire the lock before entering try, so that finally only releases a lock that is held.
    lock.lock();
    try {
      Data data = activity.fetchData();
      this.x = data.x;
      // The lock ensures that this.x and this.y always come from the same Activity execution,
      // even though the handler yields here.
      Workflow.sleep(Duration.ofSeconds(1));
      this.y = data.y;
    } finally {
      lock.unlock();
    }
  }
}
Message handler troubleshooting
When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:
- The Client can't contact the server: You'll receive a WorkflowServiceException whose cause is a StatusRuntimeException with a status of UNAVAILABLE (after some retries).
- The Workflow does not exist: You'll receive a WorkflowNotFoundException.
See Exceptions in message handlers for a non-Java-specific discussion of this topic.
Problems when sending a Signal
When using Signal, the above WorkflowExceptions are the only types of exception that will result from the request.
In contrast, for Queries and Updates, the client waits for a response from the Worker. If an issue occurs during the handler execution by the Worker, the Client may receive an exception.
Problems when sending an Update
When working with Updates, you may encounter these errors:
- No Workflow Workers are polling the Task Queue: Your request will be retried by the SDK Client indefinitely. You can impose a timeout by calling the CompletableFuture.get() method with a timeout parameter. This throws a java.util.concurrent.TimeoutException when the timeout expires.
- Update failed: You'll receive a WorkflowUpdateException. There are two ways this can happen:
  - The Update was rejected by an Update validator defined in the Workflow alongside the Update handler.
  - The Update failed after having been accepted.
  Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:
  - A failed Child Workflow
  - A failed Activity (if the Activity retries have been set to a finite number)
  - The Workflow author throwing ApplicationFailure
  - Any error listed in getFailWorkflowExceptionTypes (empty by default)
- The handler caused the Workflow Task to fail: A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:
  - If the request hasn't been accepted by the server, you receive a FAILED_PRECONDITION WorkflowServiceException.
  - If the request has been accepted, it is durable. Once the Workflow is healthy again after a code deploy, use a WorkflowUpdateHandle (https://javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/client/WorkflowUpdateHandle.html) to fetch the Update result.
- The Workflow finished while the Update handler execution was in progress: You'll receive a WorkflowServiceException with the message "workflow execution already completed". This happens, for example, because:
  - The Workflow was canceled or failed.
  - The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
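The timeout technique from the first item above relies only on the JDK's CompletableFuture API, so it can be sketched without the SDK. BoundedWait and awaitOrDefault are hypothetical names, and a never-completed future stands in for an Update that no Worker picks up:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {
    // Waits up to timeoutMillis for the future and returns fallback if the wait times out.
    static <T> T awaitOrDefault(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No result arrived in time, for example because no Worker is polling.
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("The awaited operation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // This future is never completed, so the bounded wait falls back after 100 ms.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitOrDefault(neverCompleted, 100, "no result yet"));
    }
}
```

Returning a fallback is one possible design; rethrowing the TimeoutException to the caller is equally reasonable when the caller needs to distinguish a timeout from a real result.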
Problems when sending a Query
When working with Queries, you may encounter these errors:
- There is no Workflow Worker polling the Task Queue: You'll receive a WorkflowServiceException whose cause is a StatusRuntimeException with a status of FAILED_PRECONDITION.
- Query failed: You'll receive a WorkflowQueryException if something goes wrong during a Query. Any exception in a Query handler will trigger this error. This differs from Signal and Update requests, where exceptions can lead to Workflow Task Failure instead.
- The handler caused the Workflow Task to fail: This would happen, for example, if the Query handler blocks the thread for too long without yielding.
Dynamic components
A dynamic Workflow, Activity, Signal, Update, or Query is a kind of unnamed item. Normally, these items are registered by name with the Worker and invoked at runtime. When an unregistered or unrecognized Workflow, Activity, or message request arrives with a recognized method signature, the Worker can use a pre-registered dynamic stand-in.
For example, you might send a request to start a Workflow named "MyUnknownWorkflow". After receiving a Workflow Task, the Worker may find that there are no registered Workflow Definitions of that type. It then checks to see if there's a registered dynamic Workflow. If the dynamic Workflow signature matches the incoming Workflow signature, the Worker invokes it just as it would invoke a statically named, non-dynamic version.
By registering dynamic versions of your Temporal components, the Worker can fall back to these alternate implementations for name mismatches.
Use dynamic elements judiciously and as a fallback mechanism, not a primary design. They can introduce long-term maintainability and debugging issues. Reserve dynamic invocation use for cases where a name is not or can't be known at compile time.
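The lookup order described above (named handler first, dynamic stand-in second) can be modeled in plain Java. This is a conceptual sketch only, not how the Worker is implemented: HandlerRegistry and its methods are hypothetical, and handlers are reduced to functions over a single String argument.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of name-based dispatch with a dynamic fallback.
class HandlerRegistry {
    private final Map<String, Function<String, String>> named = new HashMap<>();
    // Optional fallback that receives the unrecognized name along with the argument.
    private BiFunction<String, String, String> dynamicFallback;

    void register(String name, Function<String, String> handler) {
        named.put(name, handler);
    }

    void registerDynamic(BiFunction<String, String, String> handler) {
        dynamicFallback = handler;
    }

    String dispatch(String name, String arg) {
        Function<String, String> handler = named.get(name);
        if (handler != null) {
            return handler.apply(arg); // An explicitly registered name always wins.
        }
        if (dynamicFallback != null) {
            return dynamicFallback.apply(name, arg); // Fall back for unknown names.
        }
        throw new IllegalArgumentException("No handler registered for: " + name);
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.register("greet", arg -> "Hello, " + arg);
        registry.registerDynamic((name, arg) -> "dynamic:" + name + ":" + arg);
        System.out.println(registry.dispatch("greet", "Ada"));
        System.out.println(registry.dispatch("MyUnknownWorkflow", "Ada"));
    }
}
```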
Set a Dynamic Workflow
Use DynamicWorkflow to implement Workflow Types dynamically.
Register a Workflow implementation type that implements DynamicWorkflow to handle any Workflow Type that is not explicitly registered with the Worker.
The dynamic Workflow interface is implemented with the execute method. This method takes in EncodedValues that are inputs to the Workflow Execution.
These inputs can be specified by the Client when invoking the Workflow Execution.
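public class MyDynamicWorkflow implements DynamicWorkflow {
  @Override
  public Object execute(EncodedValues args) {
    // Look up which Workflow Type was requested and decode the first argument.
    // Reading a single String argument is an assumption made for illustration.
    String type = Workflow.getInfo().getWorkflowType();
    String input = args.get(0, String.class);
    return type + " received: " + input;
  }
}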
How to set a Dynamic Activity
To handle Activity types that do not have an explicitly registered handler, you can directly implement a dynamic Activity.
Use DynamicActivity to implement any number of Activity types dynamically.
When an Activity implementation that implements DynamicActivity is registered, it is called for any Activity type invocation that doesn't have an explicitly registered handler.
The dynamic Activity interface is implemented with the execute method, as shown in the following example.
// Dynamic Activity implementation
public static class DynamicGreetingActivityImpl implements DynamicActivity {
  @Override
  public Object execute(EncodedValues args) {
    String activityType = Activity.getExecutionContext().getInfo().getActivityType();
    return activityType
        + ": "
        + args.get(0, String.class)
        + " "
        + args.get(1, String.class)
        + " from: "
        + args.get(2, String.class);
  }
}
Use Activity.getExecutionContext() to get information about the Activity type that should be implemented dynamically.
How to set a Dynamic Signal
You can also implement Signal handlers dynamically. This is useful for library-level code and implementation of DSLs.
Use Workflow.registerListener(Object) to register an implementation of the DynamicSignalHandler in the Workflow implementation code.
Workflow.registerListener(
    (DynamicSignalHandler)
        (signalName, encodedArgs) -> name = encodedArgs.get(0, String.class));
When registered, any Signals sent to the Workflow without a defined handler will be delivered to the DynamicSignalHandler.
Note that you can register only one DynamicSignalHandler through Workflow.registerListener(Object) per Workflow Execution.
DynamicSignalHandler can be implemented in both regular and dynamic Workflow implementations.
How to set a Dynamic Query
You can also implement Query handlers dynamically. This is useful for library-level code and implementation of DSLs.
Use Workflow.registerListener(Object) to register an implementation of the DynamicQueryHandler in the Workflow implementation code.
// Return state without modifying it: a Query handler must not mutate Workflow state.
Workflow.registerListener(
    (DynamicQueryHandler)
        (queryName, encodedArgs) -> name);
When registered, any Queries sent to the Workflow without a defined handler will be delivered to the DynamicQueryHandler.
Note that you can register only one DynamicQueryHandler through Workflow.registerListener(Object) per Workflow Execution.
DynamicQueryHandler can be implemented in both regular and dynamic Workflow implementations.
How to set a Dynamic Update
You can also implement Update handlers dynamically. This is useful for library-level code and implementation of DSLs.
Workflow.registerListener(
    (DynamicUpdateHandler)
        (updateName, encodedArgs) -> encodedArgs.get(0, String.class));
When registered, any Updates sent to the Workflow without a defined handler will be delivered to the DynamicUpdateHandler.
Note that you can register only one DynamicUpdateHandler through Workflow.registerListener(Object) per Workflow Execution.
DynamicUpdateHandler can be implemented in both regular and dynamic Workflow implementations.