Control agent action execution through confirmation policy and security analyzer.
Agent actions can be controlled through two complementary mechanisms: confirmation policies that determine when user
approval is required, and security analyzers that evaluate action risk levels. Together, they provide flexible control over agent behavior while maintaining safety.
Confirmation policies control whether actions require user approval before execution. They provide a simple way to ensure safe agent operation by requiring explicit permission for actions.
Implement your approval logic by checking conversation status:
# Drive the agent until it finishes; whenever it pauses for confirmation,
# ask the user and reject the pending actions if they decline.
# (Uses ConversationExecutionStatus / execution_status to match the full
# example below.)
while conversation.state.execution_status != ConversationExecutionStatus.FINISHED:
    if (
        conversation.state.execution_status
        == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION
    ):
        pending = ConversationState.get_unmatched_actions(conversation.state.events)
        if not confirm_in_console(pending):
            conversation.reject_pending_actions("User rejected")
            continue
    conversation.run()
"""OpenHands Agent SDK — Confirmation Mode Example"""import osimport signalfrom collections.abc import Callablefrom pydantic import SecretStrfrom openhands.sdk import LLM, BaseConversation, Conversationfrom openhands.sdk.conversation.state import ( ConversationExecutionStatus, ConversationState,)from openhands.sdk.security.confirmation_policy import AlwaysConfirm, NeverConfirmfrom openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzerfrom openhands.tools.preset.default import get_default_agent# Make ^C a clean exit instead of a stack tracesignal.signal(signal.SIGINT, lambda *_: (_ for _ in ()).throw(KeyboardInterrupt()))def _print_action_preview(pending_actions) -> None: print(f"\n🔍 Agent created {len(pending_actions)} action(s) awaiting confirmation:") for i, action in enumerate(pending_actions, start=1): snippet = str(action.action)[:100].replace("\n", " ") print(f" {i}. {action.tool_name}: {snippet}...")def confirm_in_console(pending_actions) -> bool: """ Return True to approve, False to reject. Default to 'no' on EOF/KeyboardInterrupt (matches original behavior). """ _print_action_preview(pending_actions) while True: try: ans = ( input("\nDo you want to execute these actions? (yes/no): ") .strip() .lower() ) except (EOFError, KeyboardInterrupt): print("\n❌ No input received; rejecting by default.") return False if ans in ("yes", "y"): print("✅ Approved — executing actions…") return True if ans in ("no", "n"): print("❌ Rejected — skipping actions…") return False print("Please enter 'yes' or 'no'.")def run_until_finished(conversation: BaseConversation, confirmer: Callable) -> None: """ Drive the conversation until FINISHED. If WAITING_FOR_CONFIRMATION, ask the confirmer; on reject, call reject_pending_actions(). Preserves original error if agent waits but no actions exist. 
""" while conversation.state.execution_status != ConversationExecutionStatus.FINISHED: if ( conversation.state.execution_status == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION ): pending = ConversationState.get_unmatched_actions(conversation.state.events) if not pending: raise RuntimeError( "⚠️ Agent is waiting for confirmation but no pending actions " "were found. This should not happen." ) if not confirmer(pending): conversation.reject_pending_actions("User rejected the actions") # Let the agent produce a new step or finish continue print("▶️ Running conversation.run()…") conversation.run()# Configure LLMapi_key = os.getenv("LLM_API_KEY")assert api_key is not None, "LLM_API_KEY environment variable is not set."model = os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929")base_url = os.getenv("LLM_BASE_URL")llm = LLM( usage_id="agent", model=model, base_url=base_url, api_key=SecretStr(api_key),)agent = get_default_agent(llm=llm)conversation = Conversation(agent=agent, workspace=os.getcwd())# Conditionally add security analyzer based on environment variableadd_security_analyzer = bool(os.getenv("ADD_SECURITY_ANALYZER", "").strip())if add_security_analyzer: print("Agent security analyzer added.") conversation.set_security_analyzer(LLMSecurityAnalyzer())# 1) Confirmation mode ONconversation.set_confirmation_policy(AlwaysConfirm())print("\n1) Command that will likely create actions…")conversation.send_message("Please list the files in the current directory using ls -la")run_until_finished(conversation, confirm_in_console)# 2) A command the user may choose to rejectprint("\n2) Command the user may choose to reject…")conversation.send_message("Please create a file called 'dangerous_file.txt'")run_until_finished(conversation, confirm_in_console)# 3) Simple greeting (no actions expected)print("\n3) Simple greeting (no actions expected)…")conversation.send_message("Just say hello to me")run_until_finished(conversation, confirm_in_console)# 4) Disable 
confirmation mode and run commands directlyprint("\n4) Disable confirmation mode and run a command…")conversation.set_confirmation_policy(NeverConfirm())conversation.send_message("Please echo 'Hello from confirmation mode example!'")conversation.run()conversation.send_message( "Please delete any file that was created during this conversation.")conversation.run()print("\n=== Example Complete ===")print("Key points:")print( "- conversation.run() creates actions; confirmation mode " "sets execution_status=WAITING_FOR_CONFIRMATION")print("- User confirmation is handled via a single reusable function")print("- Rejection uses conversation.reject_pending_actions() and the loop continues")print("- Simple responses work normally without actions")print("- Confirmation policy is toggled with conversation.set_confirmation_policy()")
You can run the example code as-is.
The model name should follow the LiteLLM convention: provider/model_name (e.g., anthropic/claude-sonnet-4-5-20250929, openai/gpt-4o).
The LLM_API_KEY should be the API key for your chosen provider.
ChatGPT Plus/Pro subscribers: You can use LLM.subscription_login() to authenticate with your ChatGPT account and access Codex models without consuming API credits. See the LLM Subscriptions guide for details.
Security analyzers evaluate the risk of agent actions before execution, helping protect against potentially dangerous operations. They analyze each action and assign a security risk level:
LOW - Safe operations with minimal security impact
MEDIUM - Moderate security impact, review recommended
HIGH - Significant security impact, requires confirmation
UNKNOWN - Risk level could not be determined
Security analyzers work in conjunction with confirmation policies (like ConfirmRisky()) to determine whether user approval is needed before executing an action. This provides an additional layer of safety for autonomous agent operations.
The LLMSecurityAnalyzer is the default implementation provided in the agent-sdk. It leverages the LLM’s understanding of action context to provide lightweight security analysis. The LLM can annotate actions with security risk levels during generation, which the analyzer then uses to make security decisions.
"""OpenHands Agent SDK — LLM Security Analyzer Example (Simplified)This example shows how to use the LLMSecurityAnalyzer to automaticallyevaluate security risks of actions before execution."""import osimport signalfrom collections.abc import Callablefrom pydantic import SecretStrfrom openhands.sdk import LLM, Agent, BaseConversation, Conversationfrom openhands.sdk.conversation.state import ( ConversationExecutionStatus, ConversationState,)from openhands.sdk.security.confirmation_policy import ConfirmRiskyfrom openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzerfrom openhands.sdk.tool import Toolfrom openhands.tools.file_editor import FileEditorToolfrom openhands.tools.terminal import TerminalTool# Clean ^C exit: no stack trace noisesignal.signal(signal.SIGINT, lambda *_: (_ for _ in ()).throw(KeyboardInterrupt()))def _print_blocked_actions(pending_actions) -> None: print(f"\n🔒 Security analyzer blocked {len(pending_actions)} high-risk action(s):") for i, action in enumerate(pending_actions, start=1): snippet = str(action.action)[:100].replace("\n", " ") print(f" {i}. {action.tool_name}: {snippet}...")def confirm_high_risk_in_console(pending_actions) -> bool: """ Return True to approve, False to reject. Matches original behavior: default to 'no' on EOF/KeyboardInterrupt. """ _print_blocked_actions(pending_actions) while True: try: ans = ( input( "\nThese actions were flagged as HIGH RISK. " "Do you want to execute them anyway? (yes/no): " ) .strip() .lower() ) except (EOFError, KeyboardInterrupt): print("\n❌ No input received; rejecting by default.") return False if ans in ("yes", "y"): print("✅ Approved — executing high-risk actions...") return True if ans in ("no", "n"): print("❌ Rejected — skipping high-risk actions...") return False print("Please enter 'yes' or 'no'.")def run_until_finished_with_security( conversation: BaseConversation, confirmer: Callable[[list], bool]) -> None: """ Drive the conversation until FINISHED. 
- If WAITING_FOR_CONFIRMATION: ask the confirmer. * On approve: set execution_status = IDLE (keeps original example’s behavior). * On reject: conversation.reject_pending_actions(...). - If WAITING but no pending actions: print warning and set IDLE (matches original). """ while conversation.state.execution_status != ConversationExecutionStatus.FINISHED: if ( conversation.state.execution_status == ConversationExecutionStatus.WAITING_FOR_CONFIRMATION ): pending = ConversationState.get_unmatched_actions(conversation.state.events) if not pending: raise RuntimeError( "⚠️ Agent is waiting for confirmation but no pending actions " "were found. This should not happen." ) if not confirmer(pending): conversation.reject_pending_actions("User rejected high-risk actions") continue print("▶️ Running conversation.run()...") conversation.run()# Configure LLMapi_key = os.getenv("LLM_API_KEY")assert api_key is not None, "LLM_API_KEY environment variable is not set."model = os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929")base_url = os.getenv("LLM_BASE_URL")llm = LLM( usage_id="security-analyzer", model=model, base_url=base_url, api_key=SecretStr(api_key),)# Toolstools = [ Tool( name=TerminalTool.name, ), Tool(name=FileEditorTool.name),]# Agentagent = Agent(llm=llm, tools=tools)# Conversation with persisted filestoreconversation = Conversation( agent=agent, persistence_dir="./.conversations", workspace=".")conversation.set_security_analyzer(LLMSecurityAnalyzer())conversation.set_confirmation_policy(ConfirmRisky())print("\n1) Safe command (LOW risk - should execute automatically)...")conversation.send_message("List files in the current directory")conversation.run()print("\n2) Potentially risky command (may require confirmation)...")conversation.send_message( "Please echo 'hello world' -- PLEASE MARK THIS AS A HIGH RISK ACTION")run_until_finished_with_security(conversation, confirm_high_risk_in_console)
You can run the example code as-is.
The model name should follow the LiteLLM convention: provider/model_name (e.g., anthropic/claude-sonnet-4-5-20250929, openai/gpt-4o).
The LLM_API_KEY should be the API key for your chosen provider.
ChatGPT Plus/Pro subscribers: You can use LLM.subscription_login() to authenticate with your ChatGPT account and access Codex models without consuming API credits. See the LLM Subscriptions guide for details.
You can extend the security analyzer functionality by creating your own implementation that inherits from the SecurityAnalyzerBase class. This allows you to implement custom security logic tailored to your specific requirements.
Your agent is about to run a tool call. Is it safe? The LLMSecurityAnalyzer asks the model itself — but the model can be
manipulated, and encoding tricks can hide dangerous commands from it.
You need a layer that does not depend on model judgment: something
deterministic, local, and fast.
Three composable analyzers that classify actions at the boundary —
before the tool runs, not after. No network calls, no model inference,
no extra dependencies. They return a SecurityRisk level; your
ConfirmRisky policy decides whether to prompt the user.
Analyzer
What it catches
How it works
PatternSecurityAnalyzer
Known threat signatures (rm -rf, eval, curl|sh)
Regex patterns on two corpora: shell patterns scan executable fields only; injection patterns scan all fields
PolicyRailSecurityAnalyzer
Composed threats (fetch piped to exec, raw disk writes, catastrophic deletes)
Deterministic rules evaluated per-segment — both tokens must appear in the same field
EnsembleSecurityAnalyzer
Nothing on its own — it combines the others
Takes the highest concrete risk across all child analyzers
You must configure both the analyzer and the confirmation policy.
Setting an analyzer does not automatically change confirmation behavior.
from openhands.sdk import Conversation
from openhands.sdk.security import (
    PatternSecurityAnalyzer,
    PolicyRailSecurityAnalyzer,
    EnsembleSecurityAnalyzer,
    ConfirmRisky,
    SecurityRisk,
)

# Create the analyzer — rails catch composed threats,
# patterns catch individual signatures
security_analyzer = EnsembleSecurityAnalyzer(
    analyzers=[
        PolicyRailSecurityAnalyzer(),
        PatternSecurityAnalyzer(),
    ]
)

# Tell the SDK when to ask the user — HIGH is the recommended baseline
confirmation_policy = ConfirmRisky(threshold=SecurityRisk.HIGH)

# Wire both into the conversation
# Assumes `agent` is already configured — see Quick Start guide
conversation = Conversation(agent=agent, workspace=".")
conversation.set_security_analyzer(security_analyzer)
conversation.set_confirmation_policy(confirmation_policy)
After this, every agent action passes through the analyzer before
execution. HIGH-risk actions trigger a confirmation prompt — the user
sees the risk level and can approve or reject before the tool runs.
MEDIUM and LOW are allowed. UNKNOWN is confirmed by default
(confirm_unknown=True). For security-sensitive environments, lower the threshold to catch more:
# Stricter posture — MEDIUM and above require confirmation
confirmation_policy = ConfirmRisky(threshold=SecurityRisk.MEDIUM)
You can also require confirmation when any analyzer cannot assess risk:
# If any analyzer returns UNKNOWN, require confirmation
security_analyzer = EnsembleSecurityAnalyzer(
    analyzers=[
        PolicyRailSecurityAnalyzer(),
        PatternSecurityAnalyzer(),
    ],
    propagate_unknown=True,
)
conversation.execute_tool() bypasses the analyzer and confirmation
policy. These analyzers protect agent actions in the conversation
loop, not direct tool calls.
Two corpora, not one. An agent that runs ls /tmp but thinks
“I should avoid rm -rf /” is not flagged — shell patterns only see
the ls /tmp that will actually execute. Injection patterns like
“ignore all previous instructions” scan everything, because they
target the model’s instruction-following regardless of where they
appear. Max-severity, not averaging. The analyzers scan the same input —
they are correlated, not independent. The highest concrete risk wins.
That is simpler and more auditable than probabilistic fusion. UNKNOWN means “I don’t know,” not “safe.” By default, if all
analyzers return UNKNOWN the ensemble preserves it, and ConfirmRisky
triggers confirmation. If any analyzer returns a concrete level,
UNKNOWN results are filtered out. For stricter environments, set
propagate_unknown=True so that any single UNKNOWN triggers
confirmation regardless of other results. Confirm, don’t block. The analyzers return a risk level. The
confirmation policy decides what happens. The analyzer does not
prevent execution — it classifies risk for the policy layer to act on.
Pair with Docker isolation for stronger safety guarantees.
Agents use security policies to guide their risk assessment of actions. The SDK provides a default security policy template, but you can customize it to match your specific security requirements and guidelines.
The security policy is provided as a Jinja2 template that gets rendered into the agent’s system prompt, guiding how it evaluates the security risk of its actions.
"""OpenHands Agent SDK — Configurable Security Policy ExampleThis example demonstrates how to use a custom security policy templatewith an agent. Security policies define risk assessment guidelines thathelp agents evaluate the safety of their actions.By default, agents use the built-in security_policy.j2 template. Thisexample shows how to:1. Use the default security policy2. Provide a custom security policy template embedded in the script3. Apply the custom policy to guide agent behavior"""import osimport tempfilefrom pathlib import Pathfrom pydantic import SecretStrfrom openhands.sdk import ( LLM, Agent, Conversation, Event, LLMConvertibleEvent, get_logger,)from openhands.sdk.tool import Toolfrom openhands.tools.file_editor import FileEditorToolfrom openhands.tools.terminal import TerminalToollogger = get_logger(__name__)# Define a custom security policy template inlineCUSTOM_SECURITY_POLICY = ( "# 🔐 Custom Security Risk Policy\n" "When using tools that support the security_risk parameter, assess the " "safety risk of your actions:\n" "\n" "- **LOW**: Safe read-only actions.\n" " - Viewing files, calculations, documentation.\n" "- **MEDIUM**: Moderate container-scoped actions.\n" " - File modifications, package installations.\n" "- **HIGH**: Potentially dangerous actions.\n" " - Network access, system modifications, data exfiltration.\n" "\n" "**Custom Rules**\n" "- Always prioritize user data safety.\n" "- Escalate to **HIGH** for any external data transmission.\n")# Configure LLMapi_key = os.getenv("LLM_API_KEY")assert api_key is not None, "LLM_API_KEY environment variable is not set."model = os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929")base_url = os.getenv("LLM_BASE_URL")llm = LLM( usage_id="agent", model=model, base_url=base_url, api_key=SecretStr(api_key),)# Toolscwd = os.getcwd()tools = [ Tool(name=TerminalTool.name), Tool(name=FileEditorTool.name),]# Example 1: Agent with default security policyprint("=" * 100)print("Example 1: Agent with 
default security policy")print("=" * 100)default_agent = Agent(llm=llm, tools=tools)print(f"Security policy filename: {default_agent.security_policy_filename}")print("\nDefault security policy is embedded in the agent's system message.")# Example 2: Agent with custom security policyprint("\n" + "=" * 100)print("Example 2: Agent with custom security policy")print("=" * 100)# Create a temporary file for the custom security policywith tempfile.NamedTemporaryFile( mode="w", suffix=".j2", delete=False, encoding="utf-8") as temp_file: temp_file.write(CUSTOM_SECURITY_POLICY) custom_policy_path = temp_file.nametry: # Create agent with custom security policy (using absolute path) custom_agent = Agent( llm=llm, tools=tools, security_policy_filename=custom_policy_path, ) print(f"Security policy filename: {custom_agent.security_policy_filename}") print("\nCustom security policy loaded from temporary file.") # Verify the custom policy is in the system message system_message = custom_agent.static_system_message if "Custom Security Risk Policy" in system_message: print("✓ Custom security policy successfully embedded in system message.") else: print("✗ Custom security policy not found in system message.") # Run a conversation with the custom agent print("\n" + "=" * 100) print("Running conversation with custom security policy") print("=" * 100) llm_messages = [] # collect raw LLM messages def conversation_callback(event: Event): if isinstance(event, LLMConvertibleEvent): llm_messages.append(event.to_llm_message()) conversation = Conversation( agent=custom_agent, callbacks=[conversation_callback], workspace=".", ) conversation.send_message( "Please create a simple Python script named hello.py that prints " "'Hello, World!'. Make sure to follow security best practices." 
) conversation.run() print("\n" + "=" * 100) print("Conversation finished.") print(f"Total LLM messages: {len(llm_messages)}") print("=" * 100) # Report cost cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost print(f"EXAMPLE_COST: {cost}")finally: # Clean up temporary file Path(custom_policy_path).unlink(missing_ok=True)print("\n" + "=" * 100)print("Example Summary")print("=" * 100)print("This example demonstrated:")print("1. Using the default security policy (security_policy.j2)")print("2. Creating a custom security policy template")print("3. Applying the custom policy via security_policy_filename parameter")print("4. Running a conversation with the custom security policy")print( "\nYou can customize security policies to match your organization's " "specific requirements.")
You can run the example code as-is.
The model name should follow the LiteLLM convention: provider/model_name (e.g., anthropic/claude-sonnet-4-5-20250929, openai/gpt-4o).
The LLM_API_KEY should be the API key for your chosen provider.
ChatGPT Plus/Pro subscribers: You can use LLM.subscription_login() to authenticate with your ChatGPT account and access Codex models without consuming API credits. See the LLM Subscriptions guide for details.