This article is aimed at data scientists and R developers who want to move beyond one-off LLM calls toward production-grade, multi-agent workflows. It covers three things the code examples alone cannot show: when AI automation is worth building, how to frame the business case, and how to deploy the result in a real organisation.
Part 1: Should you automate this with AI?
Not every repetitive task is a good candidate for LLM automation. The decision depends on the nature of the work, not just its volume.
The right problem shape
AI agents add value when tasks share all three of the following properties:
1. The input is unstructured or highly variable. If the input is a fixed-schema database query, write a SQL query. If the input is a free-text email, contract, or analyst note - something where the meaning depends on context and phrasing - an LLM is the right parser.
2. The logic is too complex or too brittle to express as rules. Rule-based systems break the moment a new document type appears, a new regulation is passed, or an edge case arrives that nobody anticipated. LLMs generalise across variation. If your current automation is a 500-line if/else tree that breaks every quarter, it’s a candidate for replacement.
3. The output quality can be verified, even if it can’t be predicted. A human or a downstream system must be able to confirm whether the output is correct. Pure “black box” decisions with no verification loop are high-risk regardless of technology.
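The cheapest verification loop is often a deterministic check on the model's output before it enters the pipeline - for a classifier, confirming the label belongs to the closed label set. A minimal sketch (the label set matches the document-processing example later in this article; the NA-means-human-review convention is an illustrative choice, not an API):

```r
# Guard against off-list classifier output before trusting it.
valid_labels <- c("contract", "report", "invoice", "correspondence")

verify_label <- function(raw) {
  label <- trimws(tolower(raw))
  # NA signals "could not verify" - route the item to a human reviewer
  if (label %in% valid_labels) label else NA_character_
}

verify_label("  Report ")               # "report"
verify_label("I think this is a memo")  # NA -> human review
```

Checks like this are what make point 3 workable: the LLM's output is unpredictable, but its validity is mechanically testable.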
When NOT to use AI
| Situation | Why AI is the wrong tool |
|---|---|
| Structured data with deterministic logic | SQL, dplyr, or a simple function will be faster, cheaper, and auditable |
| Sub-100 ms latency requirement | LLM inference is 1-30 seconds per call. Use a rules engine or a pre-trained classifier |
| Regulated output with zero error tolerance | LLMs hallucinate. Build AI as a draft layer, not the final decision |
| Problem you haven’t solved manually yet | Automate an understood process, not an undefined one |
| Volume < 50 items/week | Manual processing is faster to build and cheaper to run |
The cost-benefit frame
Before building, estimate the economics. A rough model:
Hours saved per week = (items/week) x (minutes/item saved) / 60
Annual value = hours saved x fully-loaded hourly rate x 52
LLM cost per run = (input tokens x input price/1k) + (output tokens x output price/1k)
Annual LLM cost = cost/run x items/week x 52
A contract review pipeline that saves a paralegal 4 hours per week, at a fully-loaded rate of €60/hour, delivers €12,480/year in reclaimed time. If the pipeline costs €0.05 per contract and you process 100 contracts/week, the annual LLM cost is €260 - a 48:1 return before accounting for quality improvements or headcount scaling.
The bar for automation is lower than most teams think. A pipeline that is 80% accurate but runs in seconds costs a fraction of manual processing, and the remaining 20% can be routed to a human reviewer.
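As a sketch, the model above translates directly into a small R helper. The figures below are the contract-review example from the text; everything else is an assumption to replace with your own numbers:

```r
# Back-of-envelope ROI for an LLM automation candidate.
automation_roi <- function(items_per_week, minutes_saved_per_item,
                           hourly_rate, cost_per_run) {
  hours_saved  <- items_per_week * minutes_saved_per_item / 60
  annual_value <- hours_saved * hourly_rate * 52
  annual_cost  <- cost_per_run * items_per_week * 52
  list(annual_value = annual_value,
       annual_cost  = annual_cost,
       return_ratio = annual_value / annual_cost)
}

# Contract review: 100 contracts/week, 2.4 minutes saved each (= 4 h/week),
# EUR 60/hour fully loaded, EUR 0.05 per run.
automation_roi(100, 2.4, 60, 0.05)
# annual_value 12480, annual_cost 260, return_ratio 48
```

Running this before writing any pipeline code takes five minutes and kills bad projects early.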
The human-in-the-loop question
Most production AI workflows are not fully autonomous. A more useful framing is: where does a human add the most value, and where does AI add the most value?
- AI is better at: processing volume, consistency, availability, first-pass triage
- Humans are better at: judgment calls, novel edge cases, accountability, stakeholder relationships
A well-designed puppeteeR workflow puts humans at the decision boundary, not at the processing layer. Agents do the reading, extracting, drafting, and classifying. A human approves, overrides, or escalates.
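One concrete way to put the human at the decision boundary is a confidence gate: agents process every item, but anything below a threshold is routed to a reviewer instead of being auto-sent. A minimal sketch of the routing function (the 0.9 threshold and the route names are illustrative assumptions, not part of any API):

```r
# Routing function for a conditional edge: auto-handle only when the
# agent is confident; everything else lands in the human review queue.
route_by_confidence <- function(confidence, threshold = 0.9) {
  if (confidence >= threshold) "auto_send" else "human_review"
}

route_by_confidence(0.97)  # "auto_send"
route_by_confidence(0.62)  # "human_review"
```

The threshold becomes a business dial: lower it as trust in the pipeline grows, raise it for high-stakes categories.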
Part 2: Production use cases
The following examples show the full picture: the business problem, the pipeline design, the R code, and what the organisation actually gains.
1. Document processing pipeline
Business problem: An investment firm receives 200-400 analyst reports, regulatory filings, and earnings releases per week. Analysts spend 2-3 hours daily reading and tagging documents before they can start actual analysis. At a fully-loaded cost of €80/hour, that is €32,000-48,000 per year in reading time per analyst.
What the pipeline does: Each incoming document is classified by type, key facts are extracted, and a two-sentence executive summary is generated. The structured output lands in a database. Analysts open a dashboard, see pre-tagged summaries, and read only what is relevant to them.
Time saved: 70-90% reduction in initial triage time. Analysts review the AI summary in 20 seconds instead of reading 10 pages in 15 minutes.
Pattern: Linear chain - classify -> extract -> summarise.
library(puppeteeR)
library(ellmer)
schema <- workflow_state(
document = list(default = ""),
category = list(default = ""),
key_facts = list(default = ""),
summary = list(default = ""),
log = list(default = list(), reducer = reducer_append())
)
classifier <- agent("classifier", chat_anthropic(),
instructions = "Classify the document as: contract, report, invoice, or correspondence.
Return ONLY the category word.")
extractor <- agent("extractor", chat_anthropic(),
instructions = "Extract the 5 most important facts from this document as a numbered list.")
summariser <- agent("summariser", chat_anthropic(),
instructions = "Write a 2-sentence executive summary based on the category and key facts.")
runner <- state_graph(schema) |>
add_node("classify", function(state, config) {
cat <- config$agents$classifier$chat(state$get("document"))
list(category = trimws(tolower(cat)), log = "classified")
}) |>
add_node("extract", function(state, config) {
facts <- config$agents$extractor$chat(state$get("document"))
list(key_facts = facts, log = "extracted")
}) |>
add_node("summarise", function(state, config) {
prompt <- sprintf("Category: %s\n\nFacts:\n%s",
state$get("category"), state$get("key_facts"))
list(summary = config$agents$summariser$chat(prompt), log = "summarised")
}) |>
add_edge(START, "classify") |>
add_edge("classify", "extract") |>
add_edge("extract", "summarise") |>
add_edge("summarise", END) |>
compile(agents = list(
classifier = classifier,
extractor = extractor,
summariser = summariser
))
# In production: wrap in a loop over incoming documents
documents <- read_incoming_documents() # your ingestion function
results <- lapply(documents, function(doc) {
runner$invoke(list(document = doc$text))
})
# Write structured output to database
summaries <- data.frame(
id = vapply(documents, `[[`, character(1), "id"),
category = vapply(results, function(r) r$get("category"), character(1)),
summary = vapply(results, function(r) r$get("summary"), character(1))
)
DBI::dbWriteTable(con, "document_summaries", summaries, append = TRUE)

Why puppeteeR? State is shared - the summariser sees both the category and the extracted facts without re-reading the document. The log channel gives a full audit trail of which steps ran and in what order.
2. Email and ticket triage
Business problem: A SaaS company’s support inbox receives 500 tickets per day. 35% are routine queries answerable by template, 10% are billing disputes requiring empathy and authority to refund, 5% are urgent outages, and 50% are low-priority or spam. Two support agents currently spend 60% of their time routing and drafting - work that adds no value beyond what routing rules could do.
What the pipeline does: Each email is classified and routed to the appropriate specialist agent, which drafts a reply. Drafts are held in a review queue where a human approves or edits before sending. Spam is discarded automatically.
What the organisation gains: - Support agents spend time on complex issues, not sorting - First-response time drops from hours to minutes (drafts ready instantly) - Draft quality is consistent regardless of which human agent eventually sends it
Pattern: Classify -> conditional route to specialist -> human review queue.
schema <- workflow_state(
email = list(default = ""),
classification = list(default = ""),
draft_reply = list(default = ""),
confidence = list(default = 0)
)
classifier <- agent("classifier", chat_anthropic(),
instructions = "Classify this support email as: urgent, billing, technical, or spam.
Return ONLY the label.")
support <- agent("support", chat_anthropic(),
instructions = "Draft a helpful, professional reply to this support email.
Use a warm but efficient tone. Do not promise specific timelines.")
billing <- agent("billing", chat_anthropic(),
instructions = "Draft a reply about billing. Be empathetic.
Offer one concrete resolution path. Escalate refunds > €200 to 'billing@company.com'.")
urgent <- agent("urgent", chat_anthropic(),
instructions = "This is an urgent issue. Acknowledge immediately, confirm the team is
investigating, and provide an incident reference number format: INC-YYYYMMDD-XXX.")
runner <- state_graph(schema) |>
add_node("classify", function(state, config) {
label <- config$agents$classifier$chat(state$get("email"))
list(classification = trimws(tolower(label)))
}) |>
add_node("urgent_reply", function(state, config) {
list(draft_reply = config$agents$urgent$chat(state$get("email")))
}) |>
add_node("support_reply", function(state, config) {
list(draft_reply = config$agents$support$chat(state$get("email")))
}) |>
add_node("billing_reply", function(state, config) {
list(draft_reply = config$agents$billing$chat(state$get("email")))
}) |>
add_node("discard", function(state, config) {
list(draft_reply = "(spam - no reply sent)")
}) |>
add_edge(START, "classify") |>
add_conditional_edge("classify",
routing_fn = function(state) {
cl <- state$get("classification")
if (cl == "urgent") "urgent"
else if (cl == "billing") "billing"
else if (cl == "spam") "spam"
else "support"
},
route_map = list(
urgent = "urgent_reply",
billing = "billing_reply",
support = "support_reply",
spam = "discard"
)
) |>
add_edge("urgent_reply", END) |>
add_edge("support_reply", END) |>
add_edge("billing_reply", END) |>
add_edge("discard", END) |>
compile(agents = list(
classifier = classifier,
urgent = urgent,
support = support,
billing = billing
))
# In production: poll inbox via API, run pipeline, push to review queue
new_tickets <- fetch_new_tickets(inbox_api) # your function
for (ticket in new_tickets) {
result <- runner$invoke(list(email = ticket$body))
push_to_review_queue(
ticket_id = ticket$id,
category = result$get("classification"),
draft_reply = result$get("draft_reply")
)
}

3. Automated report generation with human approval
Business problem: A financial services team publishes a weekly market commentary. Writing the first draft takes 3 hours: pulling data, summarising market moves, writing narrative, formatting. Editing takes 1 hour. The work is high-stakes - errors embarrass the firm - but the structure is predictable enough to template.
What the pipeline does: Agents research, draft, and edit the report. A human reviews the final draft and either approves it for publication or sends it back for revision with notes. The loop repeats until approved. Checkpointing means the session can be closed and resumed without restarting the pipeline from scratch.
What the organisation gains: - Draft time: 3 hours -> 4 minutes (agent research + writing) - Editor time: 1 hour -> 20 minutes (reviewing AI draft vs. writing from scratch) - Consistency: structure and formatting are always correct; humans focus on factual accuracy
Pattern: Research -> draft -> edit -> human-in-the-loop approval -> publish or revise.
schema <- workflow_state(
topic = list(default = ""),
analysis = list(default = ""),
draft = list(default = ""),
final = list(default = ""),
revision_notes = list(default = ""),
approved = list(default = FALSE)
)
analyst <- agent("analyst", chat_anthropic(),
instructions = "You are a senior market analyst. Summarise key market moves, macro data,
and sector performance for the given topic. Be factual and cite specific numbers.")
writer <- agent("writer", chat_anthropic(),
instructions = "Write a 3-paragraph market commentary based on the analysis provided.
Use a professional, confident tone. No jargon.")
editor <- agent("editor", chat_anthropic(),
instructions = "Polish the commentary for clarity, flow, and concision. Preserve all facts.
Return only the improved text.")
cp <- rds_checkpointer(path = "checkpoints/") # survives session restarts
runner <- state_graph(schema) |>
add_node("analyse", function(state, config) {
analysis <- config$agents$analyst$chat(
paste("Analyse this topic:", state$get("topic"),
"\nFocus on: key movers, macro context, sector rotation.")
)
list(analysis = analysis)
}) |>
add_node("draft", function(state, config) {
notes <- state$get("revision_notes")
prompt <- if (nchar(notes) > 0) {
paste("Analysis:\n", state$get("analysis"),
"\n\nRevision notes from editor:\n", notes,
"\n\nWrite a revised 3-paragraph commentary addressing the notes.")
} else {
paste("Write a 3-paragraph commentary based on:\n", state$get("analysis"))
}
list(draft = config$agents$writer$chat(prompt))
}) |>
add_node("edit", function(state, config) {
prompt <- paste("Polish this commentary:\n", state$get("draft"))
list(final = config$agents$editor$chat(prompt))
}) |>
add_node("review", function(state, config) {
cat("\n=== DRAFT FOR REVIEW ===\n")
cat(state$get("final"))
cat("\n========================\n")
decision <- readline("Approve? (y/n): ")
if (decision == "y") {
list(approved = TRUE, revision_notes = "")
} else {
notes <- readline("Revision notes: ")
list(approved = FALSE, revision_notes = notes)
}
}) |>
add_edge(START, "analyse") |>
add_edge("analyse", "draft") |>
add_edge("draft", "edit") |>
add_edge("edit", "review") |>
add_conditional_edge("review",
routing_fn = function(s) if (isTRUE(s$get("approved"))) "publish" else "revise",
route_map = list(publish = END, revise = "draft")
) |>
compile(
agents = list(analyst = analyst, writer = writer, editor = editor),
checkpointer = cp,
termination = max_turns(20L)
)
result <- runner$invoke(
list(topic = "European equity markets - week of 2024-Q4"),
config = list(thread_id = "weekly-report-2024-Q4-W1")
)

Why checkpointing? Each agent call costs time and money. If the session crashes after “edit” but before “review”, the pipeline resumes from the checkpoint - the analyst’s and writer’s work is not repeated. The thread_id identifies the checkpoint so you can resume a specific run.
4. Code review assistant
Business problem: Engineering teams delay code reviews because senior engineers are the bottleneck. A PR can wait 2-3 days, slowing delivery. An AI first-pass catches the obvious issues - SQL injection, unvectorised loops, missing documentation - so the human reviewer focuses on architecture and intent rather than style policing.
What the pipeline does: A manager agent coordinates three specialist reviewers (security, performance, style). Each reads the code and reports findings. The manager synthesises a structured review report that lands in the PR as a comment.
What the organisation gains: - PRs arrive at human review pre-screened; reviewers catch more because they spend less time on mechanical checks - Junior developers get instant, specific feedback on every PR, not just when a senior is available - Style and security checks are applied consistently, not depending on who reviews
Pattern: Supervisor delegates to specialist reviewers, synthesises findings.
manager <- agent("manager", chat_anthropic(),
instructions = "You coordinate a code review. Available specialists:
'security' (security vulnerabilities),
'performance' (speed and memory),
'style' (readability and conventions).
Delegate to each specialist in turn, then reply 'DONE'.")
team <- supervisor_workflow(
manager = manager,
workers = list(
security = agent("security", chat_anthropic(),
instructions = "Review for security issues: SQL injection,
exposure of secrets, unsafe use of eval() or system().
Be specific: quote the offending line and explain the risk."),
performance = agent("performance", chat_anthropic(),
instructions = "Review for performance: unvectorised loops that
could use vapply/lapply, unnecessary copies of large objects,
missing indexing. Quote the line and suggest the fix."),
style = agent("style", chat_anthropic(),
instructions = "Review for R style: snake_case naming, pipe usage,
function length (>30 lines is a smell), missing roxygen docs.
Be constructive.")
),
max_rounds = 6L
)
# In production: triggered by a GitHub Actions webhook or Plumber API endpoint
code <- readLines("R/my_function.R") |> paste(collapse = "\n")
result <- team$invoke(list(messages = list(code)))
# Post the synthesised review back to the PR
review_text <- paste(
vapply(result$get("messages"), as.character, character(1)),
collapse = "\n\n"
)
post_pr_comment(pr_number = 42, body = review_text)  # your GitHub API call

5. Multi-turn data analysis
Business problem: A data science team runs the same exploratory analysis repeatedly across different datasets: form a hypothesis, test it, interpret results, refine. The first 80% of this work - picking the right statistical test, writing the boilerplate, interpreting p-values - is mechanical. The last 20% - “so what does this mean for the business?” - is where humans add value.
What the pipeline does: A statistician agent proposes a hypothesis and the R code to test it. Results (which a human runs, or which are executed via eval() in a controlled environment) are fed back to an interpreter agent that assesses the findings. The loop continues until the analyst signals the analysis is complete.
What the organisation gains: - Junior analysts can run rigorous multi-step analyses with senior-level statistical guidance - The hypothesis-test-interpret loop runs in minutes rather than days - The full analytical chain is recorded in state, producing an automatic audit trail
schema <- workflow_state(
data_description = list(default = ""),
hypothesis = list(default = ""),
r_code = list(default = ""),
result = list(default = ""),
conclusion = list(default = ""),
messages = list(default = list(), reducer = reducer_append())
)
analyst <- agent("analyst", chat_anthropic(),
instructions = "You are a statistician. Given a dataset description and any prior findings,
propose ONE specific hypothesis and provide the exact R code to test it.
Format: HYPOTHESIS: ... CODE: ```r ... ```")
interpreter <- agent("interpreter", chat_anthropic(),
instructions = "Given a hypothesis and statistical output, state clearly whether the
hypothesis is supported, the effect size, and the business implication.
When 3 or more hypotheses have been tested, reply 'CONCLUDE: <summary>'.")
runner <- state_graph(schema) |>
add_node("hypothesise", function(state, config) {
prompt <- paste(
"Dataset:", state$get("data_description"),
"\nPrior conclusions:", state$get("conclusion")
)
response <- config$agents$analyst$chat(prompt)
list(hypothesis = response, messages = response)
}) |>
add_node("interpret", function(state, config) {
prompt <- sprintf("Hypothesis tested:\n%s\n\nR output:\n%s",
state$get("hypothesis"), state$get("result"))
conclusion <- config$agents$interpreter$chat(prompt)
list(conclusion = conclusion, messages = conclusion)
}) |>
add_edge(START, "hypothesise") |>
add_edge("hypothesise", "interpret") |>
add_conditional_edge("interpret",
routing_fn = function(s) {
if (grepl("CONCLUDE", s$get("conclusion"), fixed = TRUE)) "done" else "continue"
},
route_map = list(done = END, continue = "hypothesise")
) |>
compile(
agents = list(analyst = analyst, interpreter = interpreter),
termination = max_turns(10L) | cost_limit(2.00)
)

Part 3: Deploying to production
Writing the pipeline is 20% of the work. The other 80% is making it reliable, observable, and accessible to the people who need it. This section covers the four most common deployment patterns for puppeteeR workflows in an R environment.
Architecture overview
Every production deployment has the same three layers:
[Trigger] -> [puppeteeR Pipeline] -> [Output destination]
| Layer | Options |
|---|---|
| Trigger | HTTP request (Plumber), schedule (cron / Posit Connect), manual (Shiny), file drop (watchdog) |
| Pipeline | puppeteeR + ellmer, running in an R process |
| Output | Database, file, email, Slack, API call, Shiny UI, return value |
The pipeline itself does not change between deployments. Only the trigger and output wiring differ.
Pattern A: Shiny app (interactive, human-in-the-loop)
Best for: workflows where a business user triggers the pipeline, reviews intermediate output, and makes approval decisions - the report generation and email triage examples above.
library(shiny)
# Build the runner once at app startup, not per request
runner <- build_report_runner() # returns a compiled GraphRunner
ui <- fluidPage(
textAreaInput("topic", "Report topic"),
actionButton("run", "Generate draft"),
verbatimTextOutput("draft_output"),
actionButton("approve", "Approve and publish"),
actionButton("revise", "Request revision"),
textAreaInput("revision_notes", "Revision notes")
)
server <- function(input, output, session) {
  result <- reactiveVal(NULL)
  observeEvent(input$run, {
    r <- runner$invoke(list(topic = input$topic),
                       config = list(thread_id = paste0("report-", Sys.time())))
    result(r)
  })
  # Render reactively rather than assigning output inside the observer
  output$draft_output <- renderText({
    req(result())
    result()$get("final")
  })
  observeEvent(input$approve, {
    # publish the approved draft
    publish_report(result()$get("final"))
    showNotification("Published.")
  })
}
shinyApp(ui, server)

Infrastructure: Deploy on Posit Connect, shinyapps.io, or a Docker container. The R process is long-running; Shiny handles concurrent users with separate sessions.
Pattern B: Plumber REST API (system-to-system)
Best for: integrations where another system (a Python backend, a webhook, a CI runner) calls your pipeline programmatically and gets a structured response.
library(plumber)
# Build runner once at startup
runner <- build_document_runner()
#* Classify and summarise a document
#* @post /summarise
#* @param document:str The document text
#* @serializer json
function(document) {
result <- runner$invoke(list(document = document))
list(
category = result$get("category"),
key_facts = result$get("key_facts"),
summary = result$get("summary")
)
}
#* Health check
#* @get /health
function() list(status = "ok")

Run with:
pr <- plumber::plumb("plumber.R")
pr$run(host = "0.0.0.0", port = 8000)

Infrastructure: Docker + any container platform (AWS ECS, Azure Container Apps, Google Cloud Run). A typical Dockerfile:
FROM rocker/r-ver:4.4.0
RUN install2.r plumber ellmer puppeteeR
COPY plumber.R /app/plumber.R
CMD ["Rscript", "-e", "plumber::plumb('/app/plumber.R')$run(host='0.0.0.0', port=8000)"]
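Once the container is up, any system can call the endpoint over HTTP. From R, a client-side sketch using httr2 (the host and port are assumed to match the Dockerfile above; the document text is a placeholder):

```r
library(httr2)

# POST a document to the /summarise endpoint defined in plumber.R
resp <- request("http://localhost:8000/summarise") |>
  req_body_json(list(document = "Q3 revenue rose 12% year on year ...")) |>
  req_perform() |>
  resp_body_json()

resp$category  # classifier output, e.g. "report"
resp$summary   # the two-sentence executive summary
```

The same call works from curl, Python, or a CI runner - the point of Pattern B is that the contract is plain JSON over HTTP.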
Pattern C: Scheduled batch job (unattended, high volume)
Best for: overnight processing, weekly reports, recurring batch jobs where no human interaction is required during the run.
# batch_runner.R - run via cron or Posit Connect scheduled report
library(puppeteeR)
library(DBI)
library(RSQLite)
con <- dbConnect(RSQLite::SQLite(), "data/production.db")
runner <- build_document_runner()
documents <- dbGetQuery(con, "SELECT id, text FROM documents WHERE processed = 0")
results <- lapply(seq_len(nrow(documents)), function(i) {
doc <- documents[i, ]
tryCatch({
r <- runner$invoke(list(document = doc$text))
data.frame(
id = doc$id,
category = r$get("category"),
summary = r$get("summary"),
processed = 1L,
error = NA_character_
)
}, error = function(e) {
data.frame(
id = doc$id, category = NA_character_, summary = NA_character_,
processed = 0L, error = conditionMessage(e)
)
})
})
output <- do.call(rbind, results)
dbWriteTable(con, "document_summaries", output, append = TRUE)
dbDisconnect(con)

Infrastructure options:
- Posit Connect: schedule as a parameterised R Markdown or Quarto report
- GitHub Actions: trigger on schedule (on: schedule: - cron: '0 6 * * 1')
- Linux cron: Rscript batch_runner.R in a cron job
Pattern D: Event-driven (webhook / message queue)
Best for: real-time integrations where events arrive asynchronously - new support tickets, PR merge events, file uploads.
The simplest approach is a Plumber endpoint (Pattern B) called by the upstream system’s webhook. For higher reliability, put a message queue (e.g. AWS SQS, RabbitMQ) between the event source and the R worker:
[Webhook source]
|
v
[Message queue] <-- decouples: events don't drop if the R worker is busy
|
v
[R worker process running Plumber]
|
v
[Output: database / email / Slack]
The R worker polls the queue and processes messages one at a time. This is consistent with R’s single-threaded model and avoids concurrency issues.
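A minimal polling worker might look like the sketch below. receive_message(), delete_message(), and push_to_review_queue() are hypothetical placeholders for whichever queue SDK you use (e.g. paws for SQS):

```r
# Simplified single-threaded queue worker: poll, process, acknowledge.
# The queue helpers are placeholders - substitute your SDK's equivalents.
poll_queue <- function(runner, queue, interval = 5) {
  repeat {
    msg <- receive_message(queue)   # assumed to return NULL when queue is empty
    if (is.null(msg)) {
      Sys.sleep(interval)           # back off before polling again
      next
    }
    tryCatch({
      result <- runner$invoke(list(email = msg$body))
      push_to_review_queue(ticket_id = msg$id,
                           draft_reply = result$get("draft_reply"))
      delete_message(queue, msg)    # ack only after successful processing
    }, error = function(e) {
      # leave the message on the queue so it is retried on the next poll
      message("Processing failed: ", conditionMessage(e))
    })
  }
}
```

Acknowledging the message only after a successful run is the detail that makes the queue worth having: a crash mid-processing means the message is redelivered, not lost.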
Observability: knowing what your pipeline is doing
A pipeline running in production needs three things:
1. Logging - use the log channel in your state schema to accumulate a step-by-step trail:
schema <- workflow_state(
...
log = list(default = list(), reducer = reducer_append())
)
# In each node:
add_node("classify", function(state, config) {
result <- config$agents$classifier$chat(state$get("document"))
list(
category = result,
log = list(step = "classify", ts = Sys.time(), chars = nchar(result))
)
})
# After invoke():
log_entries <- result$get("log")

2. Cost tracking - check spend after each run:
result <- runner$invoke(list(document = text))
report <- runner$cost_report()
# Log to your monitoring system:
log_metric("llm_cost_usd", sum(report$cost), tags = list(pipeline = "doc-processing"))

3. Error handling - wrap invoke() in tryCatch() and route failures to your alerting system. Failed runs should be retried or flagged for manual processing, not silently dropped.
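A retry helper for point 3 might look like this (max_attempts and the exponential backoff are illustrative defaults, not a puppeteeR feature):

```r
# Retry invoke() with exponential backoff; re-raise after the final attempt
# so the caller can flag the item for manual processing.
invoke_with_retry <- function(runner, input, max_attempts = 3, base_wait = 2) {
  for (attempt in seq_len(max_attempts)) {
    result <- tryCatch(runner$invoke(input), error = identity)
    if (!inherits(result, "error")) return(result)
    if (attempt == max_attempts) stop(result)
    Sys.sleep(base_wait ^ attempt)  # wait 2s, 4s, 8s ... between attempts
  }
}
```

Transient failures (rate limits, timeouts) resolve themselves on retry; only persistent errors reach the alerting system.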
Summary: choosing the right pattern
| You need… | Use |
|---|---|
| A business user to run and review the pipeline | Shiny app (Pattern A) |
| Another system to call your pipeline | Plumber API (Pattern B) |
| Unattended overnight or weekly processing | Scheduled batch job (Pattern C) |
| Real-time event-triggered processing | Webhook + Plumber or message queue (Pattern D) |
| Agents to work in parallel within one node | future package inside a single node |
| Workflows that can survive session restarts | rds_checkpointer() or sqlite_checkpointer() |
When puppeteeR fits (and when it doesn’t)
| Scenario | Fits? | Reason |
|---|---|---|
| Multi-step pipelines where each step feeds the next | Yes | State management + checkpointing |
| Conditional routing based on content | Yes | Conditional edges |
| Human approval mid-workflow | Yes | Sequential execution is a feature here |
| Long-running pipelines that may crash | Yes | Checkpointing enables resume |
| Repetitive batch processing of many documents | Yes | Wrap invoke() in a loop |
| Real-time API serving < 100 ms latency | No | LLM inference is 1-30 s; use a rules engine |
| Pure single-call LLM inference | No | Call ellmer directly |
| True parallel LLM calls across cores | Partial | Use future inside a node |