Workflow message passing - Go SDK

A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming Queries and Updates, and via Signal channels. Temporal Clients use messages to read Workflow state and control its execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Go SDK.

Handle messages

info

The code that follows is part of a working message passing sample.

Follow these guidelines when writing message handlers:

  • Values sent in messages, and the return values of message handlers and the main Workflow function, must be serializable.
  • Prefer using a single struct over multiple input parameters. This allows you to add fields without changing the calling signature.
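To illustrate the second guideline, here is a minimal sketch in plain Go. It assumes JSON payload encoding, which is what the Go SDK's default data converter uses for ordinary structs. The `ApproveInputV1` and `ApproveInputV2` types, the `Reason` field, and the `decodeAsV2` helper are hypothetical names used only for this illustration. A payload written against the older struct still decodes into the newer one, with the added field left at its zero value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ApproveInputV1 is a hypothetical first version of a Signal input.
type ApproveInputV1 struct {
	Name string
}

// ApproveInputV2 adds a field. Senders that still use V1 keep working,
// because the missing field decodes to its zero value.
type ApproveInputV2 struct {
	Name   string
	Reason string
}

// decodeAsV2 decodes a JSON payload into the newer struct version.
func decodeAsV2(payload []byte) (ApproveInputV2, error) {
	var in ApproveInputV2
	err := json.Unmarshal(payload, &in)
	return in, err
}

func main() {
	// Encode with the old struct, decode with the new one.
	oldPayload, err := json.Marshal(ApproveInputV1{Name: "alice"})
	if err != nil {
		panic(err)
	}
	in, err := decodeAsV2(oldPayload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Name=%q Reason=%q\n", in.Name, in.Reason) // Name="alice" Reason=""
}
```

Had the handler taken `Name` as a bare string parameter instead, adding `Reason` would have changed the handler signature for every caller.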

Query handlers

A Query is a synchronous operation that retrieves state from a Workflow Execution:

type Language string

const Chinese Language = "chinese"
const English Language = "english"
const French Language = "french"
const Spanish Language = "spanish"
const Portuguese Language = "portuguese"

const GetLanguagesQuery = "GetLanguages"

type GetLanguagesInput struct {
	IncludeUnsupported bool
}

func GreetingWorkflow(ctx workflow.Context) (string, error) {
	...
	greeting := map[Language]string{English: "Hello", Chinese: "你好,世界"}
	err := workflow.SetQueryHandler(ctx, GetLanguagesQuery, func(input GetLanguagesInput) ([]Language, error) {
		// 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
		if input.IncludeUnsupported {
			return []Language{Chinese, English, French, Spanish, Portuguese}, nil
		} else {
			// Range over map is a nondeterministic operation.
			// It is OK to have a non-deterministic operation in a query function.
			//workflowcheck:ignore
			return maps.Keys(greeting), nil
		}
	})
	...
}
  • Use SetQueryHandler to set a Query Handler that listens for a Query by name.
  • The handler must be a function that returns two values, a serializable result and an error.
  • You can't perform async operations such as executing an Activity in a Query handler.
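Map iteration order in Go is randomized, so the Query above can return the supported languages in a different order on each call. That is acceptable for a Query, but callers may prefer a stable result. The following is a minimal sketch of one option in plain Go; `sortedLanguages` is a hypothetical helper, not part of the sample or the SDK.

```go
package main

import (
	"fmt"
	"sort"
)

type Language string

// sortedLanguages returns the keys of the greeting map in ascending order,
// so repeated calls produce the same slice regardless of map iteration order.
func sortedLanguages(greeting map[Language]string) []Language {
	langs := make([]Language, 0, len(greeting))
	for lang := range greeting {
		langs = append(langs, lang)
	}
	sort.Slice(langs, func(i, j int) bool { return langs[i] < langs[j] })
	return langs
}

func main() {
	greeting := map[Language]string{"english": "Hello", "chinese": "你好,世界"}
	fmt.Println(sortedLanguages(greeting)) // [chinese english]
}
```

The helper still ranges over a map, so a static checker may flag it inside Workflow code even though the sorted result is stable; the `//workflowcheck:ignore` comment shown in the handler above would likely still be needed.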

Signal Channels

A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow. Handle Signal messages by receiving them from their channel:

const ApproveSignal = "approve"

type ApproveInput struct {
	Name string
}

func GreetingWorkflow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	approverName := ""
	...
	// Block until the language is approved
	var approveInput ApproveInput
	workflow.GetSignalChannel(ctx, ApproveSignal).Receive(ctx, &approveInput)
	approverName = approveInput.Name
	logger.Info("Received approval", "Approver", approverName)
	...
}
  • Pass the Signal's name to GetSignalChannel to get the Signal Channel that listens for Signals of that type.

Alternatively, you might want the Workflow to proceed and still be capable of handling external Signals.

func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
	var signal MySignal
	signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
	// Receive Signals in a Workflow coroutine so the main function can keep going.
	workflow.Go(ctx, func(ctx workflow.Context) {
		for {
			selector := workflow.NewSelector(ctx)
			selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
				c.Receive(ctx, &signal)
			})
			selector.Select(ctx)
		}
	})
	// You could now submit an activity; any signals will still be received while the activity is pending.
	// ...
	return nil
}

In the example above, the Workflow code uses workflow.GetSignalChannel to open a workflow.Channel for the Signal type (identified by the Signal name).

  • Before completing the Workflow or using Continue-As-New, make sure to do an asynchronous drain on the Signal channel. Otherwise, the Signals will be lost. The batch sliding window sample contains an example:
reportCompletionChannel := workflow.GetSignalChannel(ctx, "ReportCompletion")
// Drain signals async
for {
	var recordId int
	ok := reportCompletionChannel.ReceiveAsync(&recordId)
	if !ok {
		break
	}
	s.recordCompletion(ctx, recordId)
}
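The drain loop relies on `ReceiveAsync` returning immediately with `false` once nothing is buffered. As a rough analogy only, the same shape in plain Go uses `select` with a `default` case. This sketch does not use the Temporal SDK, and native Go channels must not be used inside Workflow code; the `drain` helper is a hypothetical name.

```go
package main

import "fmt"

// drain removes every value currently buffered in ch without blocking
// and returns the values in arrival order.
func drain(ch <-chan int) []int {
	var got []int
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				// Channel closed: nothing more will ever arrive.
				return got
			}
			got = append(got, v)
		default:
			// Nothing buffered right now: stop instead of waiting.
			return got
		}
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	fmt.Println(drain(ch)) // [1 2]
}
```

The point of the pattern is that the loop ends as soon as the buffer is empty, so it can run right before the Workflow returns or continues as new without risking an indefinite wait.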

Update handlers and validators

An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an error if something goes wrong:

type Language string

const SetLanguageUpdate = "set-language"

func GreetingWorkflow(ctx workflow.Context) error {
	language := English
	greeting := map[Language]string{English: "Hello", Chinese: "你好,世界"}

	err := workflow.SetUpdateHandlerWithOptions(ctx, SetLanguageUpdate, func(ctx workflow.Context, newLanguage Language) (Language, error) {
		// 👉 An Update handler can mutate the Workflow state and return a value.
		var previousLanguage Language
		previousLanguage, language = language, newLanguage
		return previousLanguage, nil
	}, workflow.UpdateHandlerOptions{
		Validator: func(ctx workflow.Context, newLanguage Language) error {
			if _, ok := greeting[newLanguage]; !ok {
				// 👉 In an Update validator you return any error to reject the Update.
				return fmt.Errorf("%s unsupported language", newLanguage)
			}
			return nil
		},
	})
	...
}
  • Register an Update handler for a given name using either workflow.SetUpdateHandler or workflow.SetUpdateHandlerWithOptions.

  • The handler must be a function that accepts a workflow.Context as its first parameter.

  • The function can return either a serializable value with an error or just an error.

  • About validators:

    • Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you don't need a validator.
    • To set a validator, pass the validator function in the workflow.UpdateHandlerOptions when calling workflow.SetUpdateHandlerWithOptions. The validator must be a function that accepts the same argument types as the handler and returns a single value of type error.
  • Accepting and rejecting Updates with validators:

    • To reject an Update you must return an error or panic in the validator. The Workflow's WorkflowPanicPolicy determines how panics are handled inside the Handler function.
    • Without a validator, Updates are always accepted.
  • Validators and Event History:

    • The WorkflowExecutionUpdateAccepted event is written into History whether the acceptance was automatic or due to a validator function returning without an error or panic.
    • When a validator returns an error, the Update is rejected and WorkflowExecutionUpdateAccepted won't be added to the Event History. The caller receives an "Update failed" error.
  • Use workflow.GetCurrentUpdateInfo to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.

  • Update handlers can use Activities, Child Workflows, durable workflow.Sleep Timers, workflow.Await conditions, and more. See Blocking handlers and Workflow message passing for safe usage guidelines.
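The deduplication idea mentioned above can be sketched in plain Go. This is a minimal sketch under stated assumptions, not SDK code: `processedUpdates` and `applyOnce` are hypothetical names, and in a real Workflow the ID would come from `workflow.GetCurrentUpdateInfo` while the set of seen IDs would be handed to the next run as Workflow input when using Continue-As-New.

```go
package main

import "fmt"

// processedUpdates records the IDs of Updates that have already been applied.
type processedUpdates map[string]bool

// applyOnce runs apply only the first time a given Update ID is seen.
// It reports whether apply was actually executed.
func (p processedUpdates) applyOnce(updateID string, apply func()) bool {
	if p[updateID] {
		return false
	}
	p[updateID] = true
	apply()
	return true
}

func main() {
	seen := processedUpdates{}
	counter := 0
	increment := func() { counter++ }

	fmt.Println(seen.applyOnce("update-1", increment)) // true
	fmt.Println(seen.applyOnce("update-1", increment)) // false
	fmt.Println(counter)                               // 1
}
```

Because the map holds only strings and booleans, it is serializable and could be carried across runs as part of the Workflow input.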

Send messages

To send Queries, Signals, or Updates, you call methods on a Temporal Client. To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.

Send a Query

Queries are sent from a Temporal Client.

Use Client.QueryWorkflow or Client.QueryWorkflowWithOptions.

// ...
supportedLangResult, err := temporalClient.QueryWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.GetLanguagesQuery, message.GetLanguagesInput{IncludeUnsupported: false})
if err != nil {
	log.Fatalf("Unable to query workflow: %v", err)
}
var supportedLang []message.Language
err = supportedLangResult.Get(&supportedLang)
if err != nil {
	log.Fatalf("Unable to get query result: %v", err)
}
log.Println("Supported languages:", supportedLang)
// ...
  • Sending a Query doesn’t add events to a Workflow's Event History.

  • You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not supported.

  • A Worker must be online and polling the Task Queue to process a Query.

Send a Signal

You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.

Send a Signal from a Client

Use Client.SignalWorkflow.

Pass in both the Workflow Id and Run Id to uniquely identify the Workflow Execution. If only the Workflow Id is supplied (provide an empty string as the Run Id param), the Workflow Execution that is running receives the Signal.

// ...
err = temporalClient.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.ApproveSignal, message.ApproveInput{Name: ""})
if err != nil {
	log.Fatalf("Unable to signal workflow: %v", err)
}
// ...
  • The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.

  • The WorkflowExecutionSignaled Event appears in the Workflow's Event History.

Sending a Signal from a Workflow

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.

// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
	...
	signal := MySignal{
		Message: "Some important data",
	}
	err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signal).Get(ctx, nil)
	if err != nil {
		// ...
	}
	// ...
}

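When an External Signal is sent:

  • A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
  • A WorkflowExecutionSignaled Event appears in the recipient's Event History.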

Signal-With-Start

Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.

If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

Use the Client.SignalWithStartWorkflow API to start a Workflow Execution (if not already running) and pass it the Signal at the same time.

Because the Workflow Execution might not exist, this API does not take a Run ID as a parameter.

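// ...
signal := MySignal{
	Message: "Some important data",
}
// SignalWithStartWorkflow also takes start options, the Workflow to start, and its arguments.
// The Task Queue name and param are placeholders for your own values.
workflowOptions := client.StartWorkflowOptions{
	TaskQueue: "your-task-queue",
}
_, err = temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal, workflowOptions, YourWorkflowDefinition, param)
if err != nil {
	log.Fatalln("Error sending the Signal", err)
}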

Send an Update

An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.

A Client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.

  • WorkflowExecutionUpdateAccepted is added to the Event History when the Worker confirms that the Update passed validation.
  • WorkflowExecutionUpdateCompleted is added to the Event History when the Worker confirms that the Update has finished.

Use the Client.UpdateWorkflow API to send an Update to a Workflow Execution.

You must provide the Workflow Id, but specifying a Run Id is optional. If you supply only the Workflow Id (and provide an empty string as the Run Id param), the running Workflow Execution receives the Update.

You must provide a WaitForStage when calling UpdateWorkflow(). This parameter controls what stage the update must reach before a handle is returned to the caller. If WaitForStage is set to WorkflowUpdateStageCompleted the handle is returned after the update completes; if WaitForStage is set to WorkflowUpdateStageAccepted the handle is returned after the Update is accepted (i.e. after the validator has run, if there is a validator).

updateHandle, err := temporalClient.UpdateWorkflow(context.Background(), client.UpdateWorkflowOptions{
	WorkflowID:   we.GetID(),
	RunID:        we.GetRunID(),
	UpdateName:   message.SetLanguageUpdate,
	WaitForStage: client.WorkflowUpdateStageAccepted,
	Args:         []interface{}{message.Chinese},
})
if err != nil {
	log.Fatalf("Unable to update workflow: %v", err)
}
var previousLang message.Language
err = updateHandle.Get(context.Background(), &previousLang)
if err != nil {
	log.Fatalf("Unable to get update result: %v", err)
}

Message handler patterns

This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.

tip

For additional information, see Inject work into the main Workflow, Ensuring your messages are processed exactly once, and this sample demonstrating safe blocking message handling.

Blocking handlers

Signal and Update handlers can block. This allows you to use Activities, Child Workflows, durable workflow.Sleep Timers, workflow.Await conditions, etc. This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls.

It's essential to understand the things that could go wrong in order to use blocking handlers safely. See Workflow message passing for guidance on safe usage of blocking Signal and Update handlers, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.

The following code modifies the Update handler from earlier on in this page. The Update handler now makes a blocking call to execute an Activity:

func GreetingWorkflow(ctx workflow.Context) error {
	language := English
	greeting := map[Language]string{English: "Hello", Chinese: "你好,世界"}

	err := workflow.SetUpdateHandler(ctx, SetLanguageUpdate, func(ctx workflow.Context, newLanguage Language) (Language, error) {
		if _, ok := greeting[newLanguage]; !ok {
			ao := workflow.ActivityOptions{
				StartToCloseTimeout: 10 * time.Second,
			}
			ctx = workflow.WithActivityOptions(ctx, ao)

			// Use a distinct name so the result does not shadow the greeting map.
			var newGreeting string
			err := workflow.ExecuteActivity(ctx, CallGreetingService, newLanguage).Get(ctx, &newGreeting)
			if err != nil {
				// Language is a string type, so its zero value is "".
				return "", err
			}
			greeting[newLanguage] = newGreeting
		}
		var previousLanguage Language
		previousLanguage, language = language, newLanguage
		return previousLanguage, nil
	})
	...
}

Add blocking wait conditions

Sometimes, blocking Signal or Update handlers need to meet certain conditions before they should continue. You can use workflow.Await to prevent the code from proceeding until a condition is true. You specify the condition by passing a function that returns true or false. This is an important feature that helps you control your handler logic.

Here are three important use cases for workflow.Await:

  • Waiting until a specific Update has arrived.
  • Waiting in a handler until it is appropriate to continue.
  • Waiting in the main Workflow until all active handlers have finished.

err = workflow.SetUpdateHandler(ctx, "UpdateHandler", func(ctx workflow.Context, input UpdateInput) error {
	// Block until updateUnblockedFunc returns true; Await returns an error if the context is canceled.
	if err := workflow.Await(ctx, updateUnblockedFunc); err != nil {
		return err
	}
	...
})

This is necessary if your Update handlers require something in the main Workflow function to be done first, since an Update handler can execute concurrently with the main Workflow function.

You can also use workflow.Await anywhere else in the handler to wait for a specific condition to become true. This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become true.

Ensure your handlers finish before the Workflow completes

workflow.Await can ensure your handler completes before a Workflow finishes. When your Workflow uses blocking Update handlers, your main Workflow method can return or Continue-as-New while a handler is still waiting on an async task, such as an Activity. The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying to retrieve Update results. To address this problem and allow your Workflow to end smoothly, use workflow.Await to wait until AllHandlersFinished returns true:

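func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
	...
	// Wait for every Signal and Update handler execution to finish before returning.
	err = workflow.Await(ctx, func() bool {
		return workflow.AllHandlersFinished(ctx)
	})
	if err != nil {
		return err
	}
	return nil
}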

By default, your Worker will log a warning if you allow your Workflow Execution to finish with unfinished Update handler executions. You can silence these warnings on a per-handler basis by setting the UnfinishedPolicy field on the workflow.UpdateHandlerOptions struct:

err = workflow.SetUpdateHandlerWithOptions(ctx, UpdateHandlerName, UpdateFunc, workflow.UpdateHandlerOptions{
	UnfinishedPolicy: workflow.HandlerUnfinishedPolicyAbandon,
})

See Finishing handlers before the Workflow completes for more information.

Use workflow.Mutex to prevent concurrent handler execution

See Message handler concurrency.

Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:

// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
	...
	err := workflow.SetUpdateHandler(ctx, "BadUpdateHandler", func(ctx workflow.Context) error {
		ao := workflow.ActivityOptions{
			StartToCloseTimeout: 10 * time.Second,
		}
		ctx = workflow.WithActivityOptions(ctx, ao)

		var result Data
		err := workflow.ExecuteActivity(ctx, FetchData, name).Get(ctx, &result)
		if err != nil {
			return err
		}
		x = result.x
		// 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
		// there may be times when the Workflow has x from one Activity execution and y from another.
		err = workflow.Sleep(ctx, time.Second)
		if err != nil {
			return err
		}
		y = result.y
		return nil
	})
	...
}

Coordinating access with workflow.Mutex corrects this code. Locking makes sure that only one handler instance can execute a specific section of code at any given time:

func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
	...
	// One mutex shared by every execution of the handler.
	mutex := workflow.NewMutex(ctx)
	err := workflow.SetUpdateHandler(ctx, "SafeUpdateHandler", func(ctx workflow.Context) error {
		err := mutex.Lock(ctx)
		if err != nil {
			return err
		}
		defer mutex.Unlock()
		ao := workflow.ActivityOptions{
			StartToCloseTimeout: 10 * time.Second,
		}
		ctx = workflow.WithActivityOptions(ctx, ao)

		var result Data
		err = workflow.ExecuteActivity(ctx, FetchData, name).Get(ctx, &result)
		if err != nil {
			return err
		}
		x = result.x
		err = workflow.Sleep(ctx, time.Second)
		if err != nil {
			return err
		}
		y = result.y
		return nil
	})
	...
}
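To see why the lock matters, the interleaving can be simulated deterministically in plain Go. This sketch is a simplified model under stated assumptions, not SDK code: each handler execution is split into the two steps on either side of its blocking call, and the hypothetical `run` function executes those steps in whatever order it is given. The `state`, `step`, and `handlerSteps` names are also hypothetical.

```go
package main

import "fmt"

// state mirrors the x and y variables shared by the handlers above.
type state struct{ x, y int }

// step is one uninterrupted slice of handler code between blocking calls.
type step func(s *state)

// handlerSteps models one handler execution: write x, block, then write y.
func handlerSteps(result state) [2]step {
	return [2]step{
		func(s *state) { s.x = result.x },
		func(s *state) { s.y = result.y },
	}
}

// run executes the given steps in order and returns the final state.
func run(schedule ...step) state {
	var s state
	for _, st := range schedule {
		st(&s)
	}
	return s
}

func main() {
	a := handlerSteps(state{x: 1, y: 1}) // first handler's Activity result
	b := handlerSteps(state{x: 2, y: 2}) // second handler's Activity result

	// Without a lock, the handlers may interleave at the blocking call.
	fmt.Println(run(a[0], b[0], b[1], a[1])) // {2 1}: x and y come from different results

	// With a lock, each handler finishes before the next one starts.
	fmt.Println(run(a[0], a[1], b[0], b[1])) // {2 2}
}
```

Holding the mutex for the whole handler body removes the first schedule from the set of possible executions, which is what makes the pair of writes behave as one unit.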

Troubleshooting

See Exceptions in message handlers for a non–Go-specific discussion of this topic.

When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:

  • The Client can't contact the server

  • The Workflow does not exist

Unlike Signals, for Queries and Updates, the Client waits for a response from the Worker. If an issue occurs during the handler execution by the Worker, the Client may receive an error.

Problems when sending an Update

  • There is no Workflow Worker polling the Task Queue

    Your request will be retried by the SDK Client until the calling context is cancelled.

  • Update failed.

    Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:

    • A failed Child Workflow
    • A failed Activity if the activity retries have been set to a finite number
    • The Workflow author returning an error
    • A panic in the handler, depending on the WorkflowPanicPolicy
  • The handler caused the Workflow Task to fail.

    A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:

    • If the request hasn't been accepted by the server, you receive a FAILED_PRECONDITION error.
    • If the request has been accepted, it is durable. Once the Workflow is healthy again after a code deploy, use a WorkflowUpdateHandle to fetch the Update result.
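  • The Workflow finished while the Update handler execution was in progress: You'll receive a ServiceError "workflow execution already completed".

    This happens when the Workflow finishes while the Update handler is still running, for example because:

    • The Workflow was canceled or failed.
    • The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to finish.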
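For the first problem in this list, where the request is retried until the calling context is cancelled, a caller can bound the total wait by passing a context with a timeout. The following is a simplified model in plain Go, not the SDK's retry code: `retryUntilDone`, `sendWithDeadline`, `timedOut`, and `errNoWorker` are hypothetical names, and the one-millisecond pause is an arbitrary choice for the sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoWorker stands in for "no Worker is polling the Task Queue".
var errNoWorker = errors.New("no worker polling")

// retryUntilDone keeps calling attempt until it succeeds or ctx is done.
func retryUntilDone(ctx context.Context, attempt func() error) error {
	for {
		if err := attempt(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Millisecond):
			// Pause briefly, then try again.
		}
	}
}

// sendWithDeadline bounds the whole retry loop with a timeout.
func sendWithDeadline(timeout time.Duration, attempt func() error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return retryUntilDone(ctx, attempt)
}

// timedOut reports whether err was caused by the context deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	err := sendWithDeadline(20*time.Millisecond, func() error { return errNoWorker })
	fmt.Println(timedOut(err)) // true
}
```

With the real Client, the equivalent step would be to create the context with `context.WithTimeout` and pass it to the call in place of `context.Background()`.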

Problems when sending a Query

  • There is no Workflow Worker polling the Task Queue

    You'll receive a ServiceError on which the status is FAILED_PRECONDITION.

  • Query failed. You'll receive a QueryFailed error. Any panic in a Query handler will trigger this error. This differs from Signal and Update, where panics can lead to Workflow Task Failure instead.

  • The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.