Skip to main content

Events Reference

Moco supports event-driven workflows through two complementary statements: emit_event (send) and wait_for (receive). Events enable coordination between concurrent workflows, signal state machine transitions, and implement multi-agent patterns.

For full workflowspec context, see the Workflowspec Reference.


emit_event

Sends an event to the event bus.

- emit_event:
input_data:
topic: notification_events
data:
type: order_created
order_id: "{{ order_id }}"
timestamp: "{{ __sys_info__.timestamp }}"

Parameters

ParameterTypeRequiredDescription
topicstringYesEvent topic
dataobjectYesEvent payload
target_workflow_idstringNoRoute event to a specific workflow
metadataobjectNoAdditional event metadata

Targeting a Specific Workflow

Use target_workflow_id to send events directly to another workflow instance rather than broadcasting on the topic:

- emit_event:
input_data:
topic: child_events
target_workflow_id: "{{ parent_workflow_id }}"
data:
event_name: child_complete
result: "{{ processing_result }}"

The target workflow must be listening on the same topic with a matching wait_for or state machine transition.

Event Metadata

- emit_event:
input_data:
topic: analytics_events
data:
action: page_view
page: /products
metadata:
priority: low
source: web_app

wait_for

Waits until an event matching the filter arrives, or until the timeout expires.

- wait_for:
event:
topic: order_events
match_expression: "{{ event.data.get('order_id') == order_id }}"
timeout_sec: 60
output_name: received_event

Parameters

ParameterTypeRequiredDescription
event.topicstringYesEvent topic to subscribe to
event.match_expressionexpressionNoPython expression that must be truthy for an event to match
timeout_secintegerYesMaximum wait time in seconds
output_namestringNoVariable to store the received event

Event Object Structure

Inside match_expression, the event variable has this structure:

{
"data": {...}, # Event payload (from emit_event data)
"topic": "...", # Event topic
"source_workflow_id": "...", # Workflow that emitted the event
"metadata": {...} # Event metadata
}

Filtering Events

Use match_expression to select only events that meet specific criteria:

- wait_for:
event:
topic: payment_events
match_expression: >
{{ event.data.get('transaction_id') == transaction_id and
event.data.get('status') == 'completed' }}
timeout_sec: 120
output_name: payment_event

Timeout Only

Omit event to use wait_for as a simple delay:

- wait_for:
timeout_sec: 30

State Machine Events

Events are the primary trigger mechanism for state machines. A state's on_enter callback typically does work and emits an event that drives the next transition:

states:
- name: processing
on_enter:
sequence:
elements:
- activity:
type: process-payment
input_data:
order: "{{ order_data }}"
output_name: payment_result
- emit_event:
input_data:
topic: order_events
data:
event_name: "{{ 'payment_complete' if payment_result.success else 'payment_failed' }}"

transitions:
- from_state: processing
to_state: completed
trigger:
event_name: payment_complete

- from_state: processing
to_state: failed
trigger:
event_name: payment_failed

Multi-Agent Pattern

Coordinate multiple child workflows using events.

Parent workflow — starts child workflows and collects results:

wfspec_name: parent-orchestrator
wfspec_version: 1.0.0

context:
child_workflow_ids: []

body:
sequence:
elements:
- iteration:
iter_type: parallel
input_data: "{{ agent_configs }}"
body:
sequence:
elements:
- workflow:
wfspec:
name: child-agent
version: 1.0.0
child_mode: async
execute_options:
workflow_id: "child-{{ iter_item.agent_id }}"
input_data:
config: "{{ iter_item }}"
parent_workflow_id: "{{ __sys_info__.workflow_id }}"
output_name: child_info
- transform:
output_data:
- _tmp: "{{ child_workflow_ids.append(child_info.workflow_id) }}"

- iteration:
iter_type: sequence
input_data: "{{ child_workflow_ids }}"
body:
wait_for:
event:
topic: child_events
match_expression: >
{{ event.data.get('event_name') == 'complete' and
event.source_workflow_id == iter_item }}
timeout_sec: 300
output_name: child_result

Child workflow — does work and signals the parent:

wfspec_name: child-agent
wfspec_version: 1.0.0

input_data:
config:
parent_workflow_id:

body:
sequence:
elements:
- activity:
type: process-data
input_data:
config: "{{ config }}"
output_name: result

- emit_event:
input_data:
topic: child_events
target_workflow_id: "{{ parent_workflow_id }}"
data:
event_name: complete
result: "{{ result }}"

Next Steps