puppeteeR provides two ways to observe a running graph:
| Mechanism | Granularity | How to use |
|---|---|---|
on_step |
One callback per node, after it finishes | config = list(on_step = fn) |
on_event |
Two callbacks per node - before and after | config = list(on_event = fn) |
stream() |
One yielded value per node, after it finishes |
runner$stream() generator |
on_step and stream() exist for simple
progress tracking. on_event is the richer replacement: it
fires twice per node and carries structured data, making it suitable for
logging, timing, dashboards, and debugging.
The event object
Every on_event call receives a single list with four
fields:
| Field | Type | Description |
|---|---|---|
type |
character |
"node_start" or "node_end"
|
node |
character | Name of the node being executed |
iteration |
integer | Loop counter (1-based) |
data |
list or NULL |
NULL on node_start;
list(updates = <named list>) on
node_end
|
data$updates on node_end is exactly the
named list that the node function returned - the raw state updates
before reducers have been applied.
Basic usage
schema <- workflow_state(
result = list(default = "")
)
runner <- state_graph(schema) |>
add_node("prepare", function(state, config) list(result = "prepared")) |>
add_node("finalise", function(state, config) {
list(result = paste(state$get("result"), "and finalised"))
}) |>
add_edge(START, "prepare") |>
add_edge("prepare", "finalise") |>
add_edge("finalise", END) |>
compile()
runner$invoke(config = list(
on_event = function(event) {
cat(sprintf("[%s] %s (iter %d)\n", event$type, event$node, event$iteration))
}
))
#> [node_start] prepare (iter 1)
#> [node_end] prepare (iter 1)
#> [node_start] finalise (iter 2)
#> [node_end] finalise (iter 2)
#> WorkflowState with 1 channel(s):
#> result: chr "prepared and finalised"Timing nodes
Because node_start fires before execution and
node_end fires after, you can measure wall-clock time for
each node without modifying any node function:
timings <- list()
start_times <- list()
runner$invoke(config = list(
on_event = function(event) {
if (event$type == "node_start") {
start_times[[event$node]] <<- proc.time()[["elapsed"]]
} else {
elapsed <- proc.time()[["elapsed"]] - start_times[[event$node]]
timings[[event$node]] <<- elapsed
}
}
))
#> WorkflowState with 1 channel(s):
#> result: chr "prepared and finalised"
str(timings)
#> List of 2
#> $ prepare : num 0.005
#> $ finalise: num 0.001Inspecting state updates
data$updates on a node_end event lets you
log exactly what each node changed without adding print()
calls inside node functions:
schema2 <- workflow_state(
count = list(default = 0L),
status = list(default = "idle")
)
runner2 <- state_graph(schema2) |>
add_node("step_a", function(state, config) list(count = 1L, status = "running")) |>
add_node("step_b", function(state, config) list(count = 2L, status = "done")) |>
add_edge(START, "step_a") |>
add_edge("step_a", "step_b") |>
add_edge("step_b", END) |>
compile()
runner2$invoke(config = list(
on_event = function(event) {
if (event$type == "node_end") {
cat(sprintf(" %s returned: %s\n",
event$node,
paste(names(event$data$updates), collapse = ", ")
))
}
}
))
#> step_a returned: count, status
#> step_b returned: count, status
#> WorkflowState with 2 channel(s):
#> count: int 2
#> status: chr "done"Building an execution log
Collect events into a data frame for post-run analysis:
log_entries <- list()
runner2$invoke(config = list(
on_event = function(event) {
log_entries[[length(log_entries) + 1L]] <<- data.frame(
type = event$type,
node = event$node,
iteration = event$iteration,
stringsAsFactors = FALSE
)
}
))
#> WorkflowState with 2 channel(s):
#> count: int 2
#> status: chr "done"
do.call(rbind, log_entries)
#> type node iteration
#> 1 node_start step_a 1
#> 2 node_end step_a 1
#> 3 node_start step_b 2
#> 4 node_end step_b 2Combining on_event with on_step
on_step is kept for backwards compatibility and works
alongside on_event. Both can be supplied in the same config
- on_event fires first (before and after the node), then
on_step fires once after the node completes:
runner$invoke(config = list(
on_event = function(event) cat("event:", event$type, event$node, "\n"),
on_step = function(node, state) cat("step: ", node, "\n")
))
#> event: node_start prepare
#> event: node_end prepare
#> step: prepare
#> event: node_start finalise
#> event: node_end finalise
#> step: finalise
#> WorkflowState with 1 channel(s):
#> result: chr "prepared and finalised"Current limitation - tool-level events
on_event currently emits only node-level events. Two
additional event types are planned but not yet available:
| Planned type | Meaning |
|---|---|
tool_call |
An LLM inside the node called a tool |
tool_result |
The tool returned a result to the LLM |
These sub-node events require ellmer to expose an intercept hook on
ellmer::Chat so puppeteeR can observe individual tool
invocations without wrapping the Chat object. That hook does not exist
yet. Once it is added upstream, tool_call and
tool_result events will be emitted automatically without
any change to the on_event API.
Until then, if you need visibility into tool calls, the simplest workaround is to wrap the tool function itself:
See also
-
vignette("checkpointing")- persisting state between runs -
vignette("retry-policy")- retrying nodes that fail transiently -
?GraphRunner- full reference for$invoke()and$stream()