Checkpointing lets you persist workflow state after every node execution. This enables:
- Resuming interrupted runs — restart after a crash without losing completed work
- Human-in-the-loop — pause, inspect, modify state, then resume
- Auditing — inspect state at any step in the run history
Checkpointer types
| Class | Where state lives | Best for |
|---|---|---|
| MemoryCheckpointer | RAM | Dev, testing, short sessions |
| RDSCheckpointer | .rds files on disk | Long runs, multi-session |
| SQLiteCheckpointer | SQLite database | Concurrent threads, query history |
How checkpointing actually works
Understanding the mechanics saves debugging time.
Saving: After each node executes, the runner calls
checkpointer$save(thread_id, step, state$snapshot()). Step
1 is saved after the first node completes, step 2 after the second, and
so on. The snapshot is a plain named list of all channel values at that
moment.
Resuming: When invoke() is called again with a known thread_id, the runner:
- Creates a fresh state from the schema defaults.
- Loads the latest checkpoint: checkpointer$load_latest(thread_id).
- Calls state$restore(snapshot), which writes the saved values directly into the state, bypassing reducers.
- Sets start_step to the last saved step number, so subsequent saves don’t overwrite history.
- Starts executing from the entry node (the first node in the graph). There is no automatic node-skipping; execution always begins at the start.
The practical implication: nodes that had already completed before the
interruption re-execute on resume, but they find the state already
populated. Nodes designed to overwrite
their output channel will replace it with a (likely identical) new
value. Nodes that accumulate into an append reducer will
add to the already-restored list. See the Idempotent node patterns section
for how to handle this.
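The restore-then-re-execute sequence can be modelled in a few lines of base R. This is a toy sketch, not the package's code: the two reducer functions, the fetch node and the plain-list state are hypothetical stand-ins chosen to mirror the behaviour described above (restore bypasses reducers, then the entry node runs again).

```r
# Toy model (hypothetical, base R only): mimics save / restore / re-execute.
append_reducer    <- function(old, new) c(old, list(new))  # accumulate
overwrite_reducer <- function(old, new) new                # replace

reducers <- list(status = overwrite_reducer, log = append_reducer)
defaults <- list(status = "", log = list())

# Normal execution path: route a node's update through the channel reducers
apply_update <- function(state, update) {
  for (ch in names(update)) {
    state[[ch]] <- reducers[[ch]](state[[ch]], update[[ch]])
  }
  state
}

# A node with no guard against re-execution
fetch <- function(state) list(status = "fetched", log = "fetched")

# Run 1: the node completes and its snapshot is saved as step 1
state <- apply_update(defaults, fetch(defaults))
checkpoints <- list(`1` = state)

# Run 2 (resume): fresh defaults, then the snapshot is written in directly,
# bypassing reducers ...
state <- defaults
state[names(checkpoints[["1"]])] <- checkpoints[["1"]]
# ... and execution restarts at the entry node, so fetch runs again
state <- apply_update(state, fetch(state))

state$status       # overwrite channel: same value as before
#> [1] "fetched"
length(state$log)  # append channel: the entry is now duplicated
#> [1] 2
```

The overwrite channel ends up unchanged, while the append channel gains a second copy of the same entry, which is exactly the asymmetry the idempotent patterns below are meant to handle.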
Thread IDs — your crash recovery handle
Every checkpointed run is identified by a thread_id
string that you supply. There is no auto-generated ID — you own it.
Strategy: make thread IDs meaningful and predictable from context you already have:
# Good: derive from the input data
thread_id <- paste0("report-", format(Sys.Date(), "%Y%m%d"))
thread_id <- paste0("file-", tools::file_path_sans_ext(basename(input_file)))
thread_id <- paste0("user-", user_id, "-task-", task_id)
# Also fine for interactive sessions
thread_id <- "my-experiment-v2"

If a crash happens and you didn’t store the ID, you can recover it:
cp <- rds_checkpointer(path = "checkpoints/")
# See every thread that has at least one checkpoint
cp$list_threads()
#> [1] "file-report_2024_q1" "file-report_2024_q2" "user-42-task-7"

The SQLite checkpointer offers the same method. Because its storage is an ordinary SQLite file, you can also open the database with any SQLite client:
cp <- sqlite_checkpointer(path = "workflow.sqlite")
cp$list_threads()

Basic usage
Pass a checkpointer to compile() and a
thread_id to invoke().
cp <- memory_checkpointer()
schema <- workflow_state(
counter = list(default = 0L),
log = list(default = list(), reducer = reducer_append())
)
runner <- state_graph(schema) |>
add_node("inc", function(state, config) {
n <- state$get("counter")
list(counter = n + 1L, log = paste0("incremented to ", n + 1L))
}) |>
add_edge(START, "inc") |>
add_edge("inc", END) |>
compile(checkpointer = cp)
runner$invoke(list(counter = 0L), config = list(thread_id = "demo"))
#> WorkflowState with 2 channel(s):
#> counter: int 1
#> log: List of 1 $ : chr "incremented to 1"
# Retrieve the last checkpoint
runner$get_state("demo")
#> $counter
#> [1] 1
#>
#> $log
#> $log[[1]]
#> [1] "incremented to 1"

Crash scenario: what exactly happens
Here is a step-by-step walkthrough of a crash and recovery using an RDS checkpointer.
cp <- rds_checkpointer(path = "checkpoints/")
schema <- workflow_state(
status = list(default = ""),
result = list(default = "")
)
runner <- state_graph(schema) |>
add_node("fetch", function(state, config) {
# Imagine this reads from a database or API
list(status = "fetched", result = "raw data")
}) |>
add_node("process", function(state, config) {
# This is the expensive LLM call that might time out
result <- config$agents$llm$chat(state$get("result"))
list(status = "processed", result = result)
}) |>
add_node("save", function(state, config) {
# Write result to disk / database
writeLines(state$get("result"), "output.txt")
list(status = "saved")
}) |>
add_edge(START, "fetch") |>
add_edge("fetch", "process") |>
add_edge("process", "save") |>
add_edge("save", END) |>
compile(agents = list(llm = my_agent), checkpointer = cp) # my_agent: an LLM agent created beforehand

Run 1 — partial success, then crash:
tryCatch(
runner$invoke(list(), config = list(thread_id = "job-99")),
error = function(e) message("Run failed: ", conditionMessage(e))
)
# "fetch" completed → checkpoint saved: step=1, status="fetched"
# "process" threw a network timeout → no checkpoint written for step 2
# Run failed: ...

At this point the checkpoint directory contains:
checkpoints/
job-99/
step_1.rds ← state after "fetch" completed
Inspect the checkpoint to understand where things stand:
cp$list_threads()
#> [1] "job-99"
cp$load_latest("job-99")
#> $step
#> [1] 1
#>
#> $state
#> $state$status
#> [1] "fetched"
#>
#> $state$result
#> [1] "raw data"

Run 2 — resume with the same thread_id:
# No initial_state needed — it comes from the checkpoint
runner$invoke(config = list(thread_id = "job-99", verbose = TRUE))
# Resuming from checkpoint at step 1.
# [1] fetch done.
# [2] process done.
# [3] save done.

Notice that fetch re-executes. With
reducer_overwrite() channels (the default for scalars),
this just replaces status and result with the
same values, which is harmless. The next section covers cases where
re-execution is not harmless.
Idempotent node patterns
Because execution always restarts from the first node, nodes that do expensive or stateful work should guard against redundant re-execution.
Pattern 1 — skip if output is already set (overwrite channels):
fetch_node <- function(state, config) {
if (nzchar(state$get("result"))) return(list()) # already fetched
raw <- httr2::request("https://api.example.com/data") |>
httr2::req_perform() |>
httr2::resp_body_string()
list(result = raw, status = "fetched")
}

This node returns an empty list (no update) when result
is already populated, so it is a no-op on resume.
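The effect of the guard can be checked without any network access. The sketch below is a hypothetical base-R stand-in: a plain list plays the role of the state object, and a call counter replaces the httr2 request, so you can see how many times the expensive work actually runs across a first run and a resume.

```r
# Hypothetical stand-in for the expensive request in fetch_node
calls <- 0L
expensive_fetch <- function() {
  calls <<- calls + 1L
  "raw data"
}

# Same guard as fetch_node, applied to a plain list instead of a state object
fetch_guarded <- function(state) {
  if (nzchar(state$result)) return(list())  # already fetched: no update
  list(result = expensive_fetch(), status = "fetched")
}

# Overwrite semantics: each returned channel replaces the stored value;
# an empty update changes nothing
apply_overwrite <- function(state, update) {
  for (ch in names(update)) state[[ch]] <- update[[ch]]
  state
}

state <- list(result = "", status = "")
state <- apply_overwrite(state, fetch_guarded(state))  # first run: fetches
state <- apply_overwrite(state, fetch_guarded(state))  # resume: guard skips
calls
#> [1] 1
```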
Pattern 2 — guard with a status flag:
process_node <- function(state, config) {
if (state$get("status") %in% c("processed", "saved")) return(list()) # skip even if "save" has since updated status
answer <- config$agents$llm$chat(state$get("result"))
list(result = answer, status = "processed")
}

Pattern 3 — be careful with reducer_append():
Append reducers accumulate on every call. If a node appends to a messages channel and is re-run on resume, it will append again to the already-restored list:
# This node is NOT safe to re-run after checkpoint restore
log_node <- function(state, config) {
list(messages = "step complete") # appends every time
}
# Safe version — guard with a marker in the messages list
log_node_safe <- function(state, config) {
msgs <- state$get("messages")
if (any(vapply(msgs, identical, logical(1L), "step complete"))) return(list())
list(messages = "step complete")
}

Alternatively, use reducer_overwrite() for channels that
nodes compute fresh and do not need history.
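To see the difference on a restored channel, the sketch below calls both nodes against a hypothetical mock state object (a list with a get() closure) and a hand-written append step that only imitates the behaviour described for reducer_append(). Neither the mock nor append_update() is part of the package.

```r
# Hypothetical mock: just enough of a state object to call the nodes above
mock_state <- function(values) list(get = function(ch) values[[ch]])

log_node <- function(state, config) {
  list(messages = "step complete")  # appends every time
}
log_node_safe <- function(state, config) {
  msgs <- state$get("messages")
  if (any(vapply(msgs, identical, logical(1L), "step complete"))) return(list())
  list(messages = "step complete")
}

# Imitation of append semantics: add the new message unless the update is empty
append_update <- function(msgs, update) {
  if (is.null(update$messages)) msgs else c(msgs, list(update$messages))
}

restored <- list("step complete")  # messages channel right after restore
st <- mock_state(list(messages = restored))

length(append_update(restored, log_node(st, NULL)))       # unsafe version
#> [1] 2
length(append_update(restored, log_node_safe(st, NULL)))  # safe version
#> [1] 1
```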
RDS checkpointer — survives across R sessions
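schema <- workflow_state(
counter = list(default = 0L),
status = list(default = "")
)
cp <- rds_checkpointer(path = "~/my_checkpoints")
runner <- state_graph(schema) |>
add_node("step1", function(s, cfg) {
# Skip if step1 already ran, even if step2 has since updated status
if (s$get("status") %in% c("step1-done", "step2-done")) return(list())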
Sys.sleep(2) # simulate slow work
list(counter = s$get("counter") + 1L, status = "step1-done")
}) |>
add_node("step2", function(s, cfg) {
if (s$get("status") == "step2-done") return(list())
list(counter = s$get("counter") * 2L, status = "step2-done")
}) |>
add_edge(START, "step1") |>
add_edge("step1", "step2") |>
add_edge("step2", END) |>
compile(checkpointer = cp)
# Session 1 — crashes after step1
runner$invoke(list(counter = 5L, status = ""), config = list(thread_id = "job-42"))
# Session 2 (new R process) — re-create runner, then resume
runner$invoke(config = list(thread_id = "job-42"))
# step1 finds status == "step1-done" → returns list(), no-op
# step2 runs fresh → counter = 6 * 2 = 12

Files are written as ~/my_checkpoints/job-42/step_1.rds,
step_2.rds, etc.
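For intuition about this layout, here is a hypothetical toy checkpointer that writes one .rds file per step under path/thread_id/. It only imitates the save(), list_threads() and load_latest() methods used in this article and may differ from how rds_checkpointer() is really implemented; it writes to tempdir(), so it is safe to run.

```r
# Hypothetical toy checkpointer using a <path>/<thread_id>/step_<n>.rds layout;
# it is not the package's rds_checkpointer().
toy_rds_checkpointer <- function(path) {
  dir.create(path, recursive = TRUE, showWarnings = FALSE)
  step_file <- function(thread_id, step) {
    file.path(path, thread_id, sprintf("step_%d.rds", step))
  }
  steps_of <- function(thread_id) {
    files <- list.files(file.path(path, thread_id), pattern = "^step_[0-9]+\\.rds$")
    as.integer(sub("^step_([0-9]+)\\.rds$", "\\1", files))
  }
  list(
    save = function(thread_id, step, snapshot) {
      dir.create(file.path(path, thread_id), showWarnings = FALSE)
      saveRDS(snapshot, step_file(thread_id, step))
    },
    list_threads = function() {
      list.dirs(path, full.names = FALSE, recursive = FALSE)
    },
    load_latest = function(thread_id) {
      steps <- steps_of(thread_id)
      if (length(steps) == 0L) return(NULL)  # unknown thread: nothing saved
      last <- max(steps)  # numeric max, so step_10 wins over step_9
      list(step = last, state = readRDS(step_file(thread_id, last)))
    }
  )
}

toy_cp <- toy_rds_checkpointer(file.path(tempdir(), "toy_checkpoints"))
toy_cp$save("job-42", 1L, list(counter = 6L, status = "step1-done"))
toy_cp$save("job-42", 2L, list(counter = 12L, status = "step2-done"))
toy_cp$list_threads()
toy_cp$load_latest("job-42")$state$counter
#> [1] 12
```

Taking the numeric maximum of the parsed step numbers, rather than the last file name in listing order, matters because list.files() sorts names as text.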
SQLite checkpointer — multiple threads
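cp <- sqlite_checkpointer(path = "workflow.sqlite")
# runner must be compiled with this checkpointer: rebuild the step1/step2 graph
# from the previous section, ending in compile(checkpointer = cp)
# Run two independent threads against the same compiled runner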
runner$invoke(list(counter = 1L), config = list(thread_id = "thread-A"))
runner$invoke(list(counter = 100L), config = list(thread_id = "thread-B"))
cp$list_threads()
#> [1] "thread-A" "thread-B"
# Inspect specific steps
cp$load_step(thread_id = "thread-A", step = 1L)
cp$load_latest("thread-B")

Human-in-the-loop
Use update_state() to manually edit checkpointed state
before resuming. The runner treats the edit as one additional checkpoint
step and resumes from the modified state.
review_schema <- workflow_state(
task = list(default = ""),
draft = list(default = ""),
approved = list(default = FALSE),
feedback = list(default = "")
)
library(ellmer)
writer <- agent("writer", chat_anthropic(),
instructions = "Write a short paragraph on the given task.")
cp <- memory_checkpointer()
runner <- state_graph(review_schema) |>
add_node("write", function(state, config) {
# Skip if we already have a draft that was not rejected
if (nzchar(state$get("draft")) && !nzchar(state$get("feedback"))) return(list())
task <- if (nzchar(state$get("feedback"))) {
paste0(
"Original draft:\n", state$get("draft"),
"\n\nFeedback:\n", state$get("feedback"),
"\n\nPlease revise."
)
} else {
state$get("task")
}
list(draft = config$agents$writer$chat(task), feedback = "")
}) |>
add_node("check_approval", function(state, config) {
list() # gate node — routes based on human-set `approved` flag
}) |>
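add_conditional_edge(
"check_approval",
# Both routes end the run: approval finishes it, otherwise the thread parks
# for human review. Revision happens on the next invoke(), which restarts at
# "write". Routing back to "write" here would loop forever, because "write"
# skips when a draft exists and there is no feedback.
routing_fn = function(s) if (isTRUE(s$get("approved"))) "done" else "wait",
route_map = list(done = END, wait = END)
) |>
add_edge(START, "write") |>
add_edge("write", "check_approval") |>
compile(agents = list(writer = writer), checkpointer = cp)
# Step 1: run. "write" produces a draft, then check_approval ends the run
# because approved is FALSE by default, so the thread waits for review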
runner$invoke(
list(task = "Explain gradient descent."),
config = list(thread_id = "review-1")
)
# Step 2: human reads the draft
draft <- runner$get_state("review-1")$draft
cat(draft)
# Step 3a: human is not satisfied — inject feedback and resume
runner$update_state("review-1", list(
feedback = "Too abstract. Use a concrete analogy.",
approved = FALSE
))
runner$invoke(config = list(thread_id = "review-1"))
# Step 3b: human approves — mark it and resume to finish
runner$update_state("review-1", list(approved = TRUE, feedback = ""))
runner$invoke(config = list(thread_id = "review-1"))
runner$get_state("review-1")$draft

What update_state() does internally:
1. Loads the latest checkpoint for the thread.
2. Applies the updates to the snapshot (no reducers; direct replacement).
3. Saves a new checkpoint at last_step + 1.
When invoke() is next called, it loads this new
checkpoint and resumes with your edits in place.
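The three steps can be sketched with an in-memory store. This is an illustration under assumptions, not the package's code: the store is a hypothetical list of snapshots per thread in which list position equals step number, and the draft text is a placeholder.

```r
# Hypothetical sketch of the three update_state() steps listed above.
# store: list(thread_id = list(snapshot_step_1, snapshot_step_2, ...))
update_state_sketch <- function(store, thread_id, updates) {
  history   <- store[[thread_id]]
  last_step <- length(history)
  snapshot  <- history[[last_step]]                # 1. load latest checkpoint
  snapshot[names(updates)] <- updates              # 2. direct replacement
  store[[thread_id]][[last_step + 1L]] <- snapshot # 3. save at last_step + 1
  store
}

store <- list(`review-1` = list(
  list(task = "Explain gradient descent.", draft = "Draft v1",  # placeholder
       approved = FALSE, feedback = "")
))
store <- update_state_sketch(store, "review-1",
                             list(feedback = "Too abstract. Use a concrete analogy."))

length(store[["review-1"]])           # a new step was appended
#> [1] 2
store[["review-1"]][[2]]$draft        # untouched channels carry over
#> [1] "Draft v1"
```

Because the edit is stored as a new step rather than overwriting the old one, the pre-edit snapshot stays available for auditing.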
Step callbacks
The on_step callback fires after each node, giving you a
lightweight way to observe progress without a full checkpointer.
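Conceptually, on_step is a function the runner calls once per node, after that node's update has been applied. A hypothetical toy runner in base R (plain-list state, overwrite semantics, fixed linear node order) shows where the call happens; it is not the package's runner, and its callback receives a plain list rather than a state object.

```r
# Hypothetical toy runner: apply each node's update, then call on_step.
run_linear <- function(nodes, state, on_step = NULL) {
  for (name in names(nodes)) {
    update <- nodes[[name]](state)
    state[names(update)] <- update               # overwrite semantics
    if (!is.null(on_step)) on_step(name, state)  # fires after the update
  }
  state
}

nodes <- list(
  a = function(s) list(n = s$n + 1L),
  b = function(s) list(n = s$n * 10L)
)
seen <- integer(0)
final <- run_linear(nodes, list(n = 3L),
                    on_step = function(node, state) seen <<- c(seen, state$n))
seen
#> [1]  4 40
```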
schema <- workflow_state(n = list(default = 0L))
runner <- state_graph(schema) |>
add_node("a", function(s, cfg) list(n = s$get("n") + 1L)) |>
add_node("b", function(s, cfg) list(n = s$get("n") * 10L)) |>
add_edge(START, "a") |>
add_edge("a", "b") |>
add_edge("b", END) |>
compile()
runner$invoke(
list(n = 3L),
config = list(
verbose = TRUE,
on_step = function(node, state) {
cat(sprintf(" after %-10s n = %d\n", node, state$get("n")))
}
)
)
#> [1] a done.
#>  after a          n = 4
#> [2] b done.
#>  after b          n = 40
#> WorkflowState with 1 channel(s):
#> n: int 40

Resilient batch pipelines
When processing many files, two failure modes matter:
- Within a run: the API call in node 3 fails after nodes 1 and 2 already ran. Without a checkpoint you re-run the expensive steps from scratch.
- Across runs: the pipeline crashes on item 7 of 20. You want to skip items 1–6 on retry.
Combine a checkpointer (within-run resilience) with a done-file tracker (across-run resilience). Design each node to be idempotent so re-execution on resume is harmless:
library(ellmer)
schema <- workflow_state(
file_path = list(default = ""),
file_content = list(default = ""),
llm_result = list(default = ""),
status = list(default = "")
)
cp <- rds_checkpointer(path = "checkpoints/")
runner <- state_graph(schema) |>
add_node("read_file", function(s, cfg) {
if (nzchar(s$get("file_content"))) return(list()) # already read
list(
file_content = readLines(s$get("file_path"), warn = FALSE) |> paste(collapse = "\n"),
status = "read"
)
}) |>
add_node("call_llm", function(s, cfg) {
if (s$get("status") %in% c("processed", "saved")) return(list()) # already processed
result <- cfg$agents$llm$chat(s$get("file_content"))
list(llm_result = result, status = "processed")
}) |>
add_node("save_result", function(s, cfg) {
if (s$get("status") == "saved") return(list()) # already saved
dir.create("output", showWarnings = FALSE) # ensure the output folder exists
out <- file.path("output", basename(s$get("file_path")))
writeLines(s$get("llm_result"), out)
list(status = "saved")
}) |>
add_edge(START, "read_file") |>
add_edge("read_file", "call_llm") |>
add_edge("call_llm", "save_result") |>
add_edge("save_result", END) |>
compile(agents = list(llm = my_agent), checkpointer = cp)
files <- list.files("source", pattern = "\\.txt$", full.names = TRUE)
done_file <- "checkpoints/done.rds"
done <- if (file.exists(done_file)) readRDS(done_file) else character(0)
for (f in setdiff(files, done)) {
thread_id <- paste0("file-", tools::file_path_sans_ext(basename(f)))
message("Processing: ", basename(f))
tryCatch({
runner$invoke(
list(file_path = f),
config = list(thread_id = thread_id)
)
done <- c(done, f)
saveRDS(done, done_file)
}, error = function(e) {
message(" FAILED: ", conditionMessage(e), " — will retry next run")
})
}

Failure and recovery walkthrough:
| Event | What happens |
|---|---|
| read_file completes | Checkpoint saved: step=1, status="read" |
| call_llm hits a timeout | No checkpoint for step 2; the loop catches the error |
| Next run starts | done.rds does not contain this file → it is retried |
| invoke() called with same thread_id | Checkpoint loaded, state restored: status="read" |
| read_file re-runs | Guard fires: file_content already set → returns list(), no-op |
| call_llm re-runs | Guard does not fire: status is "read", not "processed" → the LLM call runs for real |
| Success | done.rds updated; the file is skipped on all future runs |
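The across-run half of this design can be exercised on its own. This sketch is hypothetical and self-contained: process_item() stands in for runner$invoke(), the file names are placeholders, and a flag makes one item fail only on the first run.

```r
# Hypothetical simulation of the done-file tracker across two runs.
done_file <- file.path(tempdir(), "done_demo.rds")
unlink(done_file)                  # start from a clean slate
items <- c("a.txt", "b.txt", "c.txt")
flaky <- TRUE                      # b.txt fails only while this is TRUE
attempts <- character(0)

process_item <- function(f) {      # stand-in for runner$invoke()
  attempts <<- c(attempts, f)
  if (f == "b.txt" && flaky) stop("timeout")
}

run_batch <- function() {
  done <- if (file.exists(done_file)) readRDS(done_file) else character(0)
  for (f in setdiff(items, done)) {
    tryCatch({
      process_item(f)
      done <- c(done, f)           # only recorded after success
      saveRDS(done, done_file)
    }, error = function(e) message("FAILED: ", f, " (", conditionMessage(e), ")"))
  }
  invisible(done)
}

run_batch()   # a.txt and c.txt succeed, b.txt fails
flaky <- FALSE
run_batch()   # only b.txt is attempted again
attempts
```

The second run touches only the failed item; inside that item, the checkpointer then decides which nodes still have real work to do.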
Listing and loading specific steps
cp <- sqlite_checkpointer(path = "workflow.sqlite")
# All threads
cp$list_threads()
# Load state at a specific step (useful for auditing)
cp$load_step(thread_id = "job-42", step = 2L)
# Load the most recent state
cp$load_latest("job-42")
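For auditing, it often helps to see which channels changed from one step to the next. The helper below is hypothetical and works on plain snapshot lists. It assumes you have already pulled the state list for each step (for load_latest() that is the $state element shown earlier), and the example snapshots are made up to match the step1/step2 graph above.

```r
# Hypothetical audit helper: for each step after the first, list the channels
# whose values differ from the previous step.
changed_channels <- function(snapshots) {
  out <- list()
  for (i in seq_along(snapshots)[-1]) {
    prev <- snapshots[[i - 1L]]
    cur  <- snapshots[[i]]
    chs  <- union(names(prev), names(cur))
    same <- vapply(chs, function(ch) identical(prev[[ch]], cur[[ch]]), logical(1L))
    out[[as.character(i)]] <- chs[!same]
  }
  out
}

snaps <- list(
  list(counter = 6L,  status = "step1-done"),
  list(counter = 12L, status = "step2-done"),
  list(counter = 12L, status = "step2-done")
)
res <- changed_channels(snaps)
res
```

A step with an empty result, like step 3 here, is a hint that a node was a guarded no-op or that update_state() was called without changing anything.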