Human-in-the-Loop AI Agent

Last updated Oct 6, 2025

This recipe demonstrates how to build an AI agent that requires human approval before executing certain actions. This pattern is essential for AI systems that make high-stakes decisions, such as financial transactions, content publishing, or infrastructure changes.

The key challenge in human-in-the-loop systems is maintaining state while waiting for human input, which can take seconds, minutes, or even days. Temporal's durable execution ensures the workflow state persists reliably during these waiting periods, and Signals provide a clean mechanism for external systems to send approval decisions back to the running workflow.

This recipe highlights these key design decisions:

  • Using Temporal Signals to receive human approval decisions asynchronously
  • Implementing timeouts for approval requests to prevent workflows from waiting indefinitely
  • Structuring workflows to clearly separate AI decision-making from human oversight
  • Handling approval, rejection, and timeout scenarios gracefully
  • Maintaining a complete audit trail of both AI decisions and human approvals

Understanding the Pattern

In a human-in-the-loop AI agent, the workflow follows this pattern:

  1. AI Analysis: The LLM analyzes the request and proposes an action
  2. Approval Request: The workflow pauses and waits for human approval via Signal
  3. Human Decision: An external system (UI, API, etc.) sends approval/rejection via Signal
  4. Action Execution: If approved, the workflow executes the action; if rejected or timed out, it handles accordingly

The workflow remains running and durable throughout the approval waiting period, even if the worker restarts.
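The four steps above can be sketched in plain asyncio to show the control flow before any Temporal code appears. This is only an in-memory analogue: `ApprovalGate`, `Decision`, and `demo` are hypothetical names, and unlike a Temporal workflow, nothing here survives a process restart.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Decision:
    approved: bool


class ApprovalGate:
    """In-memory stand-in for the workflow's pause. NOT durable."""

    def __init__(self) -> None:
        self._decision: Optional[Decision] = None
        self._received = asyncio.Event()

    def submit(self, decision: Decision) -> None:
        # Plays the role of the Signal handler: store the decision, wake the waiter.
        self._decision = decision
        self._received.set()

    async def wait(self, timeout_seconds: float) -> str:
        # Plays the role of "wait for a condition, with a timeout".
        try:
            await asyncio.wait_for(self._received.wait(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            return "timeout"
        return "approved" if self._decision.approved else "rejected"


async def demo() -> List[str]:
    outcomes = []

    # Case 1: a reviewer responds shortly after the request is made.
    gate = ApprovalGate()
    asyncio.get_running_loop().call_later(0.01, gate.submit, Decision(approved=True))
    outcomes.append(await gate.wait(timeout_seconds=1.0))

    # Case 2: nobody responds, so the wait times out.
    outcomes.append(await ApprovalGate().wait(timeout_seconds=0.05))
    return outcomes
```

Running `asyncio.run(demo())` exercises both the approved path and the timeout path. The workflow later in this recipe has the same shape, with the durability supplied by Temporal.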

Create the Data Models

We define structured data models for approval requests and responses to ensure type safety and clear communication between the AI agent and human reviewers.

File: models/approval.py

from pydantic import BaseModel


class ProposedAction(BaseModel):
    """Action proposed by the AI agent for human review."""
    action_type: str
    description: str
    reasoning: str


class ApprovalRequest(BaseModel):
    """Request sent to human reviewer."""
    request_id: str
    proposed_action: ProposedAction
    context: str
    requested_at: str


class ApprovalDecision(BaseModel):
    """Decision received from human reviewer."""
    request_id: str
    approved: bool
    reviewer_notes: str | None = None
    decided_at: str

Create the LLM Activity

We create a generic activity for invoking the OpenAI API to analyze requests and propose actions. The activity returns the text output from the LLM, which we'll parse into our structured models in the workflow.

File: activities/openai_responses.py

from temporalio import activity
from openai import AsyncOpenAI
from dataclasses import dataclass


@dataclass
class OpenAIResponsesRequest:
    model: str
    instructions: str
    input: str


@activity.defn
async def create(request: OpenAIResponsesRequest) -> str:
    # Temporal best practice: Disable retry logic in OpenAI API client library.
    client = AsyncOpenAI(max_retries=0)

    resp = await client.responses.create(
        model=request.model,
        instructions=request.instructions,
        input=request.input,
        timeout=30,
    )

    return resp.output_text

Create the Action Execution Activity

This activity executes the approved action. In a real system, this might call external APIs, modify databases, or trigger other workflows. For this example, we'll simulate an action with a simple logging activity.

File: activities/execute_action.py

from temporalio import activity
from models.approval import ProposedAction


@activity.defn
async def execute_action(action: ProposedAction) -> str:
    """Execute the approved action.

    In a real system, this would call external APIs, modify databases,
    or trigger other workflows based on the action_type.
    """
    activity.logger.info(
        f"Executing action: {action.action_type}",
        extra={"action": action.model_dump()}
    )

    return f"Successfully executed: {action.action_type}"
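To illustrate what "based on the action_type" could look like, here is a minimal dispatch-table sketch. The handler functions and the `HANDLERS` mapping are hypothetical; the recipe itself only logs the action.

```python
from typing import Callable, Dict


# Hypothetical handlers. Real ones would call external APIs or databases.
def delete_test_data(description: str) -> str:
    return f"deleted test data ({description})"


def publish_content(description: str) -> str:
    return f"published content ({description})"


# Explicit allow-list of the actions this system is willing to run.
HANDLERS: Dict[str, Callable[[str], str]] = {
    "delete_test_data": delete_test_data,
    "publish_content": publish_content,
}


def dispatch(action_type: str, description: str) -> str:
    """Run the handler registered for action_type, or fail loudly."""
    handler = HANDLERS.get(action_type)
    if handler is None:
        # action_type is free text produced by the LLM, so never guess.
        raise ValueError(f"Unsupported action_type: {action_type!r}")
    return handler(description)
```

Because the LLM chooses `action_type` freely, an allow-list like this keeps an approved but unrecognized action from silently doing nothing or doing the wrong thing. Inside an activity, retrying an unknown action type will not help, so it is worth failing in a way that Temporal does not retry.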

Create the Notification Activity

This activity notifies external systems (like a UI or messaging system) that approval is needed. In a real implementation, this might send emails, Slack messages, or push notifications.

File: activities/notify_approval_needed.py

from temporalio import activity
from models.approval import ApprovalRequest


@activity.defn
async def notify_approval_needed(request: ApprovalRequest) -> None:
    """Notify external systems that human approval is needed.

    In a real system, this would send notifications via:
    - Email
    - Slack/Teams messages
    - Push notifications to mobile apps
    - Updates to approval queue UI
    """
    # Get workflow ID from activity context
    workflow_id = activity.info().workflow_id

    activity.logger.info(
        f"Approval needed for request: {request.request_id}",
        extra={"request": request.model_dump()}
    )

    # In a real implementation, you would call notification services here
    print(f"\n{'='*60}")
    print("APPROVAL REQUIRED")
    print(f"{'='*60}")
    print(f"Workflow ID: {workflow_id}")
    print(f"Request ID: {request.request_id}")
    print(f"Action: {request.proposed_action.action_type}")
    print(f"Description: {request.proposed_action.description}")
    print(f"Reasoning: {request.proposed_action.reasoning}")
    print("\nTo approve or reject, use the send_approval script:")
    print(f"  uv run python -m send_approval {workflow_id} {request.request_id} approve 'Looks good'")
    print(f"  uv run python -m send_approval {workflow_id} {request.request_id} reject 'Too risky'")
    print(f"{'='*60}\n")

Create the Human-in-the-Loop Workflow

The workflow orchestrates the entire human-in-the-loop process. It uses Temporal Signals to receive approval decisions and implements a timeout to prevent indefinite waiting.

Key features:

  • Signal Handler: Receives approval decisions asynchronously
  • Condition with Timeout: Waits for approval with a configurable timeout
  • State Management: Tracks approval status durably across worker restarts
  • Audit Trail: Logs all decisions for compliance and debugging

File: workflows/human_in_the_loop_workflow.py

from temporalio import workflow
from datetime import timedelta
from typing import Optional
import asyncio

with workflow.unsafe.imports_passed_through():
    from models.approval import ProposedAction, ApprovalRequest, ApprovalDecision
    from activities import openai_responses, execute_action, notify_approval_needed


@workflow.defn
class HumanInTheLoopWorkflow:
    def __init__(self):
        self.current_decision: Optional[ApprovalDecision] = None
        self.pending_request_id: Optional[str] = None

    @workflow.signal
    async def approval_decision(self, decision: ApprovalDecision):
        """Signal handler for receiving approval decisions from humans."""
        # Verify this decision is for the current pending request
        if decision.request_id == self.pending_request_id:
            self.current_decision = decision
            workflow.logger.info(
                f"Received approval decision: {'approved' if decision.approved else 'rejected'}",
                extra={"decision": decision.model_dump()}
            )
        else:
            workflow.logger.warning(
                f"Received decision for wrong request ID: {decision.request_id}, expected: {self.pending_request_id}"
            )

    @workflow.run
    async def run(self, user_request: str, approval_timeout_seconds: int = 300) -> str:
        """Execute an AI agent workflow with human-in-the-loop approval.

        Args:
            user_request: The user's request for the AI agent
            approval_timeout_seconds: How long to wait for approval (default: 5 minutes)

        Returns:
            Result of the workflow execution
        """
        workflow.logger.info(f"Starting human-in-the-loop workflow for request: {user_request}")

        # Step 1: AI analyzes the request and proposes an action
        proposed_action = await self._analyze_and_propose_action(user_request)

        workflow.logger.info(
            f"AI proposed action: {proposed_action.action_type}",
            extra={"proposed_action": proposed_action.model_dump()}
        )

        # Step 2: Request human approval
        approval_result = await self._request_approval(
            proposed_action,
            user_request,
            timeout_seconds=approval_timeout_seconds
        )

        # Step 3: Handle the approval result
        if approval_result == "approved":
            workflow.logger.info("Action approved, proceeding with execution")
            result = await workflow.execute_activity(
                execute_action.execute_action,
                proposed_action,
                start_to_close_timeout=timedelta(seconds=60),
            )
            return f"Action completed successfully: {result}"

        elif approval_result == "rejected":
            workflow.logger.info("Action rejected by human reviewer")
            reviewer_notes = self.current_decision.reviewer_notes or 'None provided'
            return f"Action rejected. Reviewer notes: {reviewer_notes}"

        else:  # timeout
            workflow.logger.warning("Approval request timed out")
            timeout_msg = f"Action cancelled: approval request timed out after {approval_timeout_seconds} seconds"
            return timeout_msg

    async def _analyze_and_propose_action(self, user_request: str) -> ProposedAction:
        """Use LLM to analyze request and propose an action."""
        system_instructions = """
        You are an AI assistant that analyzes user requests and proposes actions.
        For each request, you should:
        1. Determine what action needs to be taken
        2. Provide a clear description of the action
        3. Explain your reasoning for why this action addresses the request

        Be thorough and clear in your analysis.

        Respond with a JSON string in this structure:

        {
            "action_type": "A short name for the action (e.g., \\"delete_test_data\\")",
            "description": "A clear description of what the action will do",
            "reasoning": "Your explanation for why this action addresses the request"
        }
        """

        result = await workflow.execute_activity(
            openai_responses.create,
            openai_responses.OpenAIResponsesRequest(
                model="gpt-4o-mini",
                instructions=system_instructions,
                input=user_request,
            ),
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Parse the JSON output into our ProposedAction model
        return ProposedAction.model_validate_json(result)

    async def _request_approval(
        self,
        proposed_action: ProposedAction,
        context: str,
        timeout_seconds: int
    ) -> str:
        """Request human approval and wait for response.

        Returns:
            "approved", "rejected", or "timeout"
        """
        # Generate unique request ID using workflow's deterministic UUID
        self.current_decision = None
        self.pending_request_id = str(workflow.uuid4())

        # Create approval request
        approval_request = ApprovalRequest(
            request_id=self.pending_request_id,
            proposed_action=proposed_action,
            context=context,
            requested_at=workflow.now().isoformat(),
        )

        # Send notification to external systems
        await workflow.execute_activity(
            notify_approval_needed.notify_approval_needed,
            approval_request,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Wait for approval decision with timeout
        try:
            await workflow.wait_condition(
                lambda: self.current_decision is not None,
                timeout=timedelta(seconds=timeout_seconds),
            )

            # Decision received
            if self.current_decision.approved:
                return "approved"
            else:
                return "rejected"

        except asyncio.TimeoutError:
            # Timeout waiting for approval
            return "timeout"
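The workflow passes the raw LLM text straight to `model_validate_json`, which fails if the model wraps its JSON in a Markdown code fence or omits a field. As a hedged sketch, a small pure helper could normalize the text first. `extract_action_json`, `FENCE`, and `REQUIRED_KEYS` are hypothetical names and are not part of the recipe.

```python
import json
import re

# Three backticks, built programmatically to keep this snippet easy to embed.
FENCE = "`" * 3

# Matches text that is entirely one fenced block, with an optional "json" tag.
_FENCED = re.compile(
    r"^" + FENCE + r"(?:json)?\s*\n(.*?)\n?" + FENCE + r"\s*$",
    re.DOTALL,
)

# Field names taken from the ProposedAction model in this recipe.
REQUIRED_KEYS = {"action_type", "description", "reasoning"}


def extract_action_json(raw: str) -> dict:
    """Strip an optional Markdown fence, parse JSON, and check required keys."""
    text = raw.strip()
    match = _FENCED.match(text)
    if match:
        text = match.group(1)

    data = json.loads(text)  # raises ValueError (JSONDecodeError) on bad JSON
    if not isinstance(data, dict):
        raise ValueError("LLM output is not a JSON object")

    missing = REQUIRED_KEYS - set(data)
    if missing:
        raise ValueError(f"LLM output is missing keys: {sorted(missing)}")
    return data
```

The helper is deterministic and performs no I/O, so it could run either in the workflow before validation or inside the LLM activity, where a parsing failure would trigger a retry of the model call.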

Create the Worker

The worker runs the workflow and activities. It uses the pydantic_data_converter to handle serialization of our Pydantic models.

File: worker.py

import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from workflows.human_in_the_loop_workflow import HumanInTheLoopWorkflow
from activities.openai_responses import create
from activities.execute_action import execute_action
from activities.notify_approval_needed import notify_approval_needed
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    # Configure logging to see workflow.logger output
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    worker = Worker(
        client,
        task_queue="human-in-the-loop-task-queue",
        workflows=[HumanInTheLoopWorkflow],
        activities=[
            create,
            execute_action,
            notify_approval_needed,
        ],
    )

    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

Create the Workflow Starter

The starter script starts the workflow and waits for it to complete. Because the workflow pauses for human approval, the script blocks until a decision arrives or the approval request times out.

File: start_workflow.py

import asyncio
import sys
import uuid

from temporalio.client import Client

from workflows.human_in_the_loop_workflow import HumanInTheLoopWorkflow
from temporalio.contrib.pydantic import pydantic_data_converter


async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    default_request = "Delete all test data from the production database"
    user_request = sys.argv[1] if len(sys.argv) > 1 else default_request

    # Use a unique workflow ID so you can run multiple instances
    workflow_id = f"human-in-the-loop-{uuid.uuid4()}"

    print(f"Starting workflow with ID: {workflow_id}")
    print(f"User request: {user_request}")
    print("\nWorkflow will pause for approval. Watch the worker output for instructions.\n")

    # Submit the workflow for execution
    result = await client.execute_workflow(
        HumanInTheLoopWorkflow.run,
        args=[user_request, 300],  # user_request and approval timeout in seconds
        id=workflow_id,
        task_queue="human-in-the-loop-task-queue",
    )

    print("\nWorkflow completed!")
    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

Create a Signal Sender Script

This helper script makes it easy to send approval decisions to running workflows. In a real system, this would be replaced by a UI, API endpoint, or other integration.

File: send_approval.py

import asyncio
import sys
from datetime import datetime, timezone

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from models.approval import ApprovalDecision


async def main():
    if len(sys.argv) < 4:
        print("Usage: python -m send_approval <workflow_id> <request_id> <approve|reject> [notes]")
        print("\nExample:")
        example = "  python -m send_approval human-in-the-loop-123 abc-def-ghi approve 'Looks good'"
        print(example)
        sys.exit(1)

    workflow_id = sys.argv[1]
    request_id = sys.argv[2]
    decision = sys.argv[3].lower()
    notes = sys.argv[4] if len(sys.argv) > 4 else None

    if decision not in ["approve", "reject"]:
        print("Decision must be 'approve' or 'reject'")
        sys.exit(1)

    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    approval_decision = ApprovalDecision(
        request_id=request_id,
        approved=(decision == "approve"),
        reviewer_notes=notes,
        decided_at=datetime.now(timezone.utc).isoformat(),
    )

    # Get workflow handle and send signal
    handle = client.get_workflow_handle(workflow_id)
    await handle.signal("approval_decision", approval_decision)

    decision_type = 'approval' if approval_decision.approved else 'rejection'
    print(f"Sent {decision_type} signal to workflow {workflow_id}")


if __name__ == "__main__":
    asyncio.run(main())

Running

Start the Temporal Dev Server

temporal server start-dev

Install dependencies

From the recipe directory:

uv sync

Run the worker

First set the OPENAI_API_KEY environment variable, then:

uv run python -m worker

Start a workflow

In a new terminal:

uv run python -m start_workflow "Delete all test data from the production database"

The workflow will start, the AI will analyze the request, and then it will pause waiting for approval. The worker output will show instructions for sending the approval signal.

Send approval decision

Watch the worker output for the workflow ID and request ID, then send an approval:

uv run python -m send_approval <workflow-id> <request-id> approve "Verified this is safe to execute"

Or reject it:

uv run python -m send_approval <workflow-id> <request-id> reject "This looks too risky"

Alternatively, you can use the Temporal CLI directly:

temporal workflow signal \
  --workflow-id <workflow-id> \
  --name approval_decision \
  --input '{"request_id": "<request-id>", "approved": true, "reviewer_notes": "Approved", "decided_at": "2025-01-01T12:00:00Z"}'

Testing Timeout Behavior

To test the timeout behavior, start a workflow and simply don't send any approval signal. After the timeout period (default 300 seconds / 5 minutes), the workflow will automatically complete with a timeout result.

You can also set a shorter timeout for testing:

# In start_workflow.py, change the timeout parameter:
result = await client.execute_workflow(
    HumanInTheLoopWorkflow.run,
    args=[user_request, 30],  # 30 second timeout for testing
    id=workflow_id,
    task_queue="human-in-the-loop-task-queue",
)

Key Patterns and Best Practices

Durable Waiting

The workflow can wait for hours or days for human approval without occupying a worker while it waits. The workflow state is persisted, so even if the worker crashes and restarts, the workflow resumes from where it left off.

Signal Validation

The workflow validates that incoming approval signals match the current pending request ID. This prevents confusion if multiple approval requests are in flight or if stale approvals arrive.

Timeout Handling

Always implement timeouts for human approval requests. Without timeouts, workflows could wait indefinitely if approvals are forgotten or lost.

Audit Trail

Every decision (AI proposal, human approval/rejection, timeout) is logged with full context. This creates a complete audit trail for compliance and debugging.
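If log output alone is not enough for compliance purposes, one option is to also keep the audit events as structured records. The following is a minimal sketch under that assumption. `AuditEvent` and `AuditTrail` are hypothetical names, and the caller supplies the timestamp (inside a workflow, `workflow.now().isoformat()` as used earlier in this recipe).

```python
import json
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass(frozen=True)
class AuditEvent:
    timestamp: str  # ISO 8601 string supplied by the caller
    event: str      # e.g. "proposed", "approved", "rejected", "timeout"
    request_id: str
    detail: Optional[str] = None


class AuditTrail:
    """Append-only list of audit events, exportable as JSON Lines."""

    def __init__(self) -> None:
        self._events: List[AuditEvent] = []

    def record(self, event: AuditEvent) -> None:
        self._events.append(event)

    def to_json_lines(self) -> str:
        # One JSON object per line, keys sorted for stable output.
        return "\n".join(
            json.dumps(asdict(e), sort_keys=True) for e in self._events
        )
```

A trail like this could be held in workflow state and returned with the result, or handed to an activity that writes it to durable storage.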

Idempotency

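The workflow generates a unique request ID for each approval request, and the Signal handler ignores any decision whose ID does not match the pending request. A stale or misdirected decision therefore cannot approve a different request. Note that the handler as written does not reject a second decision that carries the same request ID.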
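One way to make repeated decisions harmless is a first-decision-wins rule. The sketch below shows that rule in isolation. `DecisionRecorder` and its return strings are hypothetical, and the recipe's Signal handler does not implement this check.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Decision:
    request_id: str
    approved: bool


class DecisionRecorder:
    """Accepts at most one decision for the pending request."""

    def __init__(self, pending_request_id: str) -> None:
        self.pending_request_id = pending_request_id
        self.decision: Optional[Decision] = None

    def record(self, decision: Decision) -> str:
        # Stale or misdirected decisions never touch the stored state.
        if decision.request_id != self.pending_request_id:
            return "ignored_wrong_request"
        # The first matching decision wins. Later ones are ignored.
        if self.decision is not None:
            return "ignored_duplicate"
        self.decision = decision
        return "recorded"
```

In the workflow, the same two checks would sit at the top of the `approval_decision` handler, so that a late "reject" cannot replace an "approve" that has already been accepted (or the other way around).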

Extensions

This basic pattern can be extended in many ways:

  • Multiple Approvers: Require approval from multiple people or implement voting
  • Escalation: Automatically escalate to higher authority if initial approver doesn't respond
  • Conditional Approval: Only require approval for high-risk actions
  • Approval Chains: Implement multi-stage approval workflows
  • Rich Notifications: Integrate with Slack, email, or custom UIs for approval requests
  • Query Handlers: Add query handlers to check approval status without modifying workflow state
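As an illustration of the Multiple Approvers extension, here is a minimal quorum sketch. `QuorumTracker` is a hypothetical helper. The rules it encodes (any rejection vetoes, one vote per reviewer) are assumptions chosen for the example, not something this recipe prescribes.

```python
from typing import Dict


class QuorumTracker:
    """Tracks reviewer votes until enough approvals arrive or someone rejects."""

    def __init__(self, required_approvals: int) -> None:
        if required_approvals < 1:
            raise ValueError("required_approvals must be at least 1")
        self.required_approvals = required_approvals
        self.votes: Dict[str, bool] = {}

    def vote(self, reviewer: str, approved: bool) -> None:
        # Only the first vote from each reviewer counts.
        self.votes.setdefault(reviewer, approved)

    def status(self) -> str:
        # Assumption: a single rejection vetoes the action.
        if any(not approved for approved in self.votes.values()):
            return "rejected"
        if sum(self.votes.values()) >= self.required_approvals:
            return "approved"
        return "pending"
```

In a workflow, the Signal handler would call `vote`, and the wait condition would become `tracker.status() != "pending"` in place of the single-decision check.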