Commit e272eee

feat: async Python client for non-blocking sandbox operations (#510)
* feat: add async Python sandbox client

  Add AsyncSandboxClient and supporting async modules so users can await sandbox creation, command execution, and file I/O without blocking the event loop. This is needed for async frameworks like FastAPI, aiohttp, and async agent orchestrators.

  Key design decisions:
  - Explicit retry logic matching sync client's status-code retries
  - asyncio.Lock guards on shared state for coroutine safety
  - BaseException handling to catch CancelledError during create
  - try/finally on all K8s watch streams to prevent leaked connections
  - Required connection_config (no LocalTunnel support in async)
  - Lazy __getattr__ import with actionable ImportError message

  Optional deps: pip install k8s-agent-sandbox[async]

  Supersedes #256.

* fix: clear sandbox registry on close, set httpx default timeout

  - Clear _active_connection_sandboxes after closing connections to prevent stale references
  - Set httpx.AsyncClient timeout to 60s matching sync client behavior (httpx defaults to 5s)

  Made-with: Cursor

* remove internal plan doc from PR

  Made-with: Cursor

* fix: address code review feedback

  - Remove dead retry branch in async connector (retryable status codes are handled before raise_for_status, so the except handler never sees them)
  - Narrow except BaseException to except (Exception, CancelledError) to avoid catching KeyboardInterrupt/SystemExit
  - Replace logging.basicConfig() with logging.getLogger(__name__) so library code doesn't hijack the root logger
  - Use built-in generics (list, dict, tuple) instead of typing imports since project requires Python 3.10+
  - Replace __getattr__ lazy import with global try/except pattern
  - Add test for close() clearing the sandbox registry

  Made-with: Cursor

* fix: address second round of review feedback

  - Make fallback AsyncSandboxClient a class so isinstance() works
  - Use logger = logging.getLogger(__name__) in async_connector and async_k8s_helper (was still using root logger)
  - Add None event guard to resolve_sandbox_name watch loop to prevent TypeError crash on None events
  - Document no-atexit behavior in AsyncSandboxClient docstring

  Made-with: Cursor

* fix: add transport-level retries for connection errors

  Add AsyncHTTPTransport(retries=3) so connection-level failures (TCP, DNS) are retried at the transport layer, matching the sync client's urllib3.Retry behavior. The manual retry loop handles status-code retries (500/502/503/504) which the transport doesn't cover.

  Made-with: Cursor

* fix: retry watch streams on premature connection drops

  Watch streams can close prematurely due to network blips or API server restarts, causing an erroneous TimeoutError even when only seconds have elapsed. Track a deadline and re-establish the watch with remaining time instead of raising immediately. Fixed in all 6 watch methods (3 sync + 3 async): resolve_sandbox_name, wait_for_sandbox_ready, wait_for_gateway_ip.

  Made-with: Cursor
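The deadline-tracking watch retry described in the commit message can be sketched roughly as follows. This is an illustration of the idea only, not the code from this commit: `wait_for_event`, `open_watch`, `is_ready`, `reconnect_pause`, and `make_flaky_watch` are hypothetical names, and the sketch assumes the stream returned by `open_watch` honors the remaining time it is given.

```python
import asyncio
import time
from typing import AsyncIterator, Callable, Optional


async def wait_for_event(
    open_watch: Callable[[float], AsyncIterator[Optional[dict]]],
    is_ready: Callable[[dict], bool],
    timeout: float,
    reconnect_pause: float = 0.01,  # assumption: short pause to avoid a hot loop
) -> dict:
    """Return the first event accepted by is_ready, re-opening dropped streams."""
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no matching event within {timeout}s")
        stream = open_watch(remaining)  # each new watch gets only the time left
        try:
            async for event in stream:
                if event is None:  # skip empty events instead of crashing on them
                    continue
                if is_ready(event):
                    return event
        finally:
            # Always release the stream, whether we returned, raised, or it ended.
            aclose = getattr(stream, "aclose", None)
            if aclose is not None:
                await aclose()
        # The stream ended early: pause briefly, then re-check the deadline.
        await asyncio.sleep(reconnect_pause)


def make_flaky_watch():
    """Hypothetical fake: the first stream drops early, the second delivers."""
    opened = []

    def open_watch(remaining: float):
        opened.append(remaining)
        attempt = len(opened)

        async def events():
            if attempt == 1:
                yield None
                yield {"ready": False}
                return  # simulated premature close
            yield {"ready": True}

        return events()

    return open_watch, opened
```

The key point is that a stream ending is not treated as a timeout. Only the deadline decides when to give up, and every reconnect is handed the time that is left rather than the full timeout.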
1 parent 4eceb03 commit e272eee

13 files changed

Lines changed: 1589 additions & 81 deletions

clients/python/agentic-sandbox-client/README.md

Lines changed: 34 additions & 0 deletions
@@ -189,6 +189,40 @@ client = SandboxClient(
sandbox = client.create_sandbox(template="node-sandbox-template", namespace="default")
```

### 5. Async Client

For async applications (FastAPI, aiohttp, async agent orchestrators), use the `AsyncSandboxClient`.
Install the async extras first:

```bash
pip install k8s-agent-sandbox[async]
```

The async client requires an explicit connection config; `LocalTunnel` mode is not supported
because it relies on a synchronous `kubectl port-forward` subprocess. Use `DirectConnection` or
`GatewayConnection` instead.

```python
import asyncio
from k8s_agent_sandbox import AsyncSandboxClient
from k8s_agent_sandbox.models import SandboxDirectConnectionConfig

async def main():
    config = SandboxDirectConnectionConfig(
        api_url="http://sandbox-router-svc.default.svc.cluster.local:8080"
    )

    async with AsyncSandboxClient(connection_config=config) as client:
        sandbox = await client.create_sandbox(
            template="python-sandbox-template",
            namespace="default",
        )
        result = await sandbox.commands.run("echo 'Hello from async!'")
        print(result.stdout)

asyncio.run(main())
```

## Testing

A test script is included to verify the full lifecycle (Creation -> Execution -> File I/O -> Cleanup).
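To illustrate why the README's async section matters for frameworks that share one event loop, here is a minimal sketch of running several awaitable operations concurrently with `asyncio.gather`. `FakeSandbox`, `run_everywhere`, and `demo` are hypothetical stand-ins with simulated latency. They are not part of `k8s_agent_sandbox` and make no claims about its behavior.

```python
import asyncio
import time


class FakeSandbox:
    """Hypothetical stand-in for a sandbox handle; not the library's class."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def run(self, command: str) -> str:
        # Simulate network latency without blocking the event loop.
        await asyncio.sleep(0.1)
        return f"{self.name}: {command}"


async def run_everywhere(names: list[str], command: str) -> list[str]:
    sandboxes = [FakeSandbox(name) for name in names]
    # gather() runs the coroutines concurrently and preserves input order.
    return await asyncio.gather(*(s.run(command) for s in sandboxes))


def demo() -> tuple[list[str], float]:
    """Run three simulated commands and report how long the batch took."""
    start = time.perf_counter()
    results = asyncio.run(run_everywhere(["a", "b", "c"], "echo hi"))
    return results, time.perf_counter() - start
```

Because each simulated call yields to the event loop while it waits, the three calls overlap and the batch takes roughly the time of one call rather than three.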

clients/python/agentic-sandbox-client/k8s_agent_sandbox/__init__.py

Lines changed: 12 additions & 0 deletions
@@ -20,3 +20,15 @@
    SandboxPortForwardError,
    SandboxRequestError,
)


try:
    from .async_sandbox_client import AsyncSandboxClient
except ImportError:
    class AsyncSandboxClient:  # type: ignore[no-redef]
        """Placeholder that raises ImportError when async extras are missing."""
        def __init__(self, *args, **kwargs):
            raise ImportError(
                "AsyncSandboxClient requires the 'async' extras. "
                "Install with: pip install k8s-agent-sandbox[async]"
            )
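The placeholder-class pattern in this hunk can be tried in isolation. The sketch below is an assumption-laden illustration: `_hypothetical_async_backend`, `AsyncThing`, and `describe` are made-up names, and the import is expected to fail so that the fallback branch runs.

```python
try:
    # Hypothetical optional dependency name, chosen so the import fails here.
    import _hypothetical_async_backend  # noqa: F401

    class AsyncThing:
        """Real implementation, available only when the optional dependency is."""

except ImportError:

    class AsyncThing:  # type: ignore[no-redef]
        """Placeholder that fails at construction time, not at import time."""

        def __init__(self, *args, **kwargs):
            raise ImportError(
                "AsyncThing requires optional extras that are not installed."
            )


def describe(obj: object) -> str:
    # isinstance() needs a class on the right-hand side; the placeholder is one.
    return "async" if isinstance(obj, AsyncThing) else "other"
```

Keeping the fallback a class means that importing the package and calling `isinstance()` against the name both keep working without the extras. The `ImportError`, with its install hint, only appears when someone tries to construct the client.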
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
# Copyright 2026 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)

from .async_k8s_helper import AsyncK8sHelper
from .exceptions import SandboxRequestError
from .models import (
    SandboxConnectionConfig,
    SandboxDirectConnectionConfig,
    SandboxGatewayConnectionConfig,
    SandboxLocalTunnelConnectionConfig,
)

RETRYABLE_STATUS_CODES = {500, 502, 503, 504}
MAX_RETRIES = 5
BACKOFF_FACTOR = 0.5


class AsyncSandboxConnector:
    """
    Async connector for communicating with a Sandbox over HTTP using httpx.

    Supports DirectConnection and GatewayConnection modes. LocalTunnel mode
    is not supported because it relies on a long-running subprocess; use the
    sync SandboxConnector for local development.
    """

    def __init__(
        self,
        sandbox_id: str,
        namespace: str,
        connection_config: SandboxConnectionConfig,
        k8s_helper: AsyncK8sHelper,
    ):
        if isinstance(connection_config, SandboxLocalTunnelConnectionConfig):
            raise ValueError(
                "AsyncSandboxConnector does not support SandboxLocalTunnelConnectionConfig. "
                "Use SandboxDirectConnectionConfig or SandboxGatewayConnectionConfig instead. "
                "For local development, use the synchronous SandboxClient."
            )

        self.id = sandbox_id
        self.namespace = namespace
        self.connection_config = connection_config
        self.k8s_helper = k8s_helper

        self._base_url: str | None = None
        transport = httpx.AsyncHTTPTransport(retries=3)
        self.client = httpx.AsyncClient(
            transport=transport, timeout=httpx.Timeout(60.0)
        )

    async def _resolve_base_url(self) -> str:
        if self._base_url:
            return self._base_url

        if isinstance(self.connection_config, SandboxDirectConnectionConfig):
            self._base_url = self.connection_config.api_url
        elif isinstance(self.connection_config, SandboxGatewayConnectionConfig):
            ip_address = await self.k8s_helper.wait_for_gateway_ip(
                self.connection_config.gateway_name,
                self.connection_config.gateway_namespace,
                self.connection_config.gateway_ready_timeout,
            )
            self._base_url = f"http://{ip_address}"
        else:
            raise ValueError(
                f"AsyncSandboxConnector does not support {type(self.connection_config).__name__}."
            )

        return self._base_url

    async def send_request(self, method: str, endpoint: str, **kwargs) -> httpx.Response:
        base_url = await self._resolve_base_url()
        url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"

        headers = kwargs.pop("headers", {}).copy()
        headers["X-Sandbox-ID"] = self.id
        headers["X-Sandbox-Namespace"] = self.namespace
        headers["X-Sandbox-Port"] = str(self.connection_config.server_port)

        last_response: httpx.Response | None = None
        for attempt in range(MAX_RETRIES + 1):
            try:
                response = await self.client.request(
                    method, url, headers=headers, **kwargs
                )
                if response.status_code in RETRYABLE_STATUS_CODES and attempt < MAX_RETRIES:
                    delay = BACKOFF_FACTOR * (2 ** attempt)
                    logger.warning(
                        f"Retryable status {response.status_code} from {url}, "
                        f"attempt {attempt + 1}/{MAX_RETRIES + 1}, retrying in {delay:.1f}s"
                    )
                    last_response = response
                    await asyncio.sleep(delay)
                    continue
                response.raise_for_status()
                return response
            except httpx.HTTPStatusError as e:
                logger.error(f"Request to sandbox failed: {e}")
                raise SandboxRequestError(
                    f"Failed to communicate with the sandbox at {url}.",
                    status_code=e.response.status_code,
                    response=e.response,
                ) from e
            except httpx.HTTPError as e:
                logger.error(f"Request to sandbox failed: {e}")
                self._base_url = None
                raise SandboxRequestError(
                    f"Failed to communicate with the sandbox at {url}.",
                    status_code=None,
                    response=None,
                ) from e

        logger.error(f"All {MAX_RETRIES + 1} attempts failed for {url}")
        raise SandboxRequestError(
            f"Failed to communicate with the sandbox at {url} after {MAX_RETRIES + 1} attempts.",
            status_code=last_response.status_code if last_response else None,
            response=last_response,
        )

    async def close(self):
        await self.client.aclose()
        self._base_url = None
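To make the retry schedule in `send_request` concrete, the sketch below replays its control flow over a scripted list of status codes without any network calls or sleeping. The three constants are copied from the file above. `backoff_delays` and `simulate` are hypothetical helpers written for this illustration, not part of the connector.

```python
RETRYABLE_STATUS_CODES = {500, 502, 503, 504}
MAX_RETRIES = 5
BACKOFF_FACTOR = 0.5


def backoff_delays() -> list[float]:
    """Delays slept between attempts: one per retry, doubling each time."""
    return [BACKOFF_FACTOR * (2 ** attempt) for attempt in range(MAX_RETRIES)]


def simulate(statuses: list[int]) -> tuple[int, float, int]:
    """Replay scripted status codes through the same branch structure.

    Returns (final status, total seconds that would be slept, requests made).
    If the script is shorter than the attempt count, its last status repeats.
    """
    slept = 0.0
    for attempt in range(MAX_RETRIES + 1):
        status = statuses[min(attempt, len(statuses) - 1)]
        if status in RETRYABLE_STATUS_CODES and attempt < MAX_RETRIES:
            slept += BACKOFF_FACTOR * (2 ** attempt)
            continue
        # Success, a non-retryable error, or the final attempt all end here.
        return status, slept, attempt + 1
    raise AssertionError("unreachable: the final attempt always returns")


print(backoff_delays())            # [0.5, 1.0, 2.0, 4.0, 8.0]
print(simulate([503, 503, 200]))   # (200, 1.5, 3)
print(simulate([503]))             # (503, 15.5, 6)
```

With these constants a persistently failing endpoint is tried six times with 15.5 seconds of backoff in total. On the last attempt a retryable status is no longer retried. In the real method it reaches `raise_for_status()` and surfaces as a `SandboxRequestError`.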
