-
Notifications
You must be signed in to change notification settings - Fork 15.6k
Expand file tree
/
Copy pathutils.py
More file actions
271 lines (216 loc) · 9.08 KB
/
utils.py
File metadata and controls
271 lines (216 loc) · 9.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""Shared utility functions for hermes-agent."""
import json
import logging
import os
import stat
import tempfile
from pathlib import Path
from typing import Any, Union
from urllib.parse import urlparse
import yaml
logger = logging.getLogger(__name__)
TRUTHY_STRINGS = frozenset({"1", "true", "yes", "on"})
def is_truthy_value(value: Any, default: bool = False) -> bool:
"""Coerce bool-ish values using the project's shared truthy string set."""
if value is None:
return default
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in TRUTHY_STRINGS
return bool(value)
def env_var_enabled(name: str, default: str = "") -> bool:
"""Return True when an environment variable is set to a truthy value."""
return is_truthy_value(os.getenv(name, default), default=False)
def _preserve_file_mode(path: Path) -> "int | None":
"""Capture the permission bits of *path* if it exists, else ``None``."""
try:
return stat.S_IMODE(path.stat().st_mode) if path.exists() else None
except OSError:
return None
def _restore_file_mode(path: Path, mode: "int | None") -> None:
"""Re-apply *mode* to *path* after an atomic replace.
``tempfile.mkstemp`` creates files with 0o600 (owner-only). After
``os.replace`` swaps the temp file into place the target inherits
those restrictive permissions, breaking Docker / NAS volume mounts
that rely on broader permissions set by the user. Calling this
right after ``os.replace`` restores the original permissions.
"""
if mode is None:
return
try:
os.chmod(path, mode)
except OSError:
pass
def atomic_json_write(
path: Union[str, Path],
data: Any,
*,
indent: int = 2,
**dump_kwargs: Any,
) -> None:
"""Write JSON data to a file atomically.
Uses temp file + fsync + os.replace to ensure the target file is never
left in a partially-written state. If the process crashes mid-write,
the previous version of the file remains intact.
Args:
path: Target file path (will be created or overwritten).
data: JSON-serializable data to write.
indent: JSON indentation (default 2).
**dump_kwargs: Additional keyword args forwarded to json.dump(), such
as default=str for non-native types.
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
original_mode = _preserve_file_mode(path)
fd, tmp_path = tempfile.mkstemp(
dir=str(path.parent),
prefix=f".{path.stem}_",
suffix=".tmp",
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(
data,
f,
indent=indent,
ensure_ascii=False,
**dump_kwargs,
)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
_restore_file_mode(path, original_mode)
except BaseException:
# Intentionally catch BaseException so temp-file cleanup still runs for
# KeyboardInterrupt/SystemExit before re-raising the original signal.
try:
os.unlink(tmp_path)
except OSError:
pass
raise
def atomic_yaml_write(
path: Union[str, Path],
data: Any,
*,
default_flow_style: bool = False,
sort_keys: bool = False,
extra_content: str | None = None,
) -> None:
"""Write YAML data to a file atomically.
Uses temp file + fsync + os.replace to ensure the target file is never
left in a partially-written state. If the process crashes mid-write,
the previous version of the file remains intact.
Args:
path: Target file path (will be created or overwritten).
data: YAML-serializable data to write.
default_flow_style: YAML flow style (default False).
sort_keys: Whether to sort dict keys (default False).
extra_content: Optional string to append after the YAML dump
(e.g. commented-out sections for user reference).
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
original_mode = _preserve_file_mode(path)
fd, tmp_path = tempfile.mkstemp(
dir=str(path.parent),
prefix=f".{path.stem}_",
suffix=".tmp",
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=default_flow_style, sort_keys=sort_keys)
if extra_content:
f.write(extra_content)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
_restore_file_mode(path, original_mode)
except BaseException:
# Match atomic_json_write: cleanup must also happen for process-level
# interruptions before we re-raise them.
try:
os.unlink(tmp_path)
except OSError:
pass
raise
# ─── JSON Helpers ─────────────────────────────────────────────────────────────
def safe_json_loads(text: str, default: Any = None) -> Any:
"""Parse JSON, returning *default* on any parse error.
Replaces the ``try: json.loads(x) except (JSONDecodeError, TypeError)``
pattern duplicated across display.py, anthropic_adapter.py,
auxiliary_client.py, and others.
"""
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError, ValueError):
return default
# ─── Environment Variable Helpers ─────────────────────────────────────────────
def env_int(key: str, default: int = 0) -> int:
"""Read an environment variable as an integer, with fallback."""
raw = os.getenv(key, "").strip()
if not raw:
return default
try:
return int(raw)
except (ValueError, TypeError):
return default
def env_bool(key: str, default: bool = False) -> bool:
"""Read an environment variable as a boolean."""
return is_truthy_value(os.getenv(key, ""), default=default)
# ─── Proxy Helpers ────────────────────────────────────────────────────────────
_PROXY_ENV_KEYS = (
"HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY",
"https_proxy", "http_proxy", "all_proxy",
)
def normalize_proxy_url(proxy_url: str | None) -> str | None:
"""Normalize proxy URLs for httpx/aiohttp compatibility.
WSL/Clash-style environments often export SOCKS proxies as
``socks://127.0.0.1:PORT``. httpx rejects that alias and expects the
explicit ``socks5://`` scheme instead.
"""
candidate = str(proxy_url or "").strip()
if not candidate:
return None
if candidate.lower().startswith("socks://"):
return f"socks5://{candidate[len('socks://'):]}"
return candidate
def normalize_proxy_env_vars() -> None:
"""Rewrite supported proxy env vars to canonical URL forms in-place."""
for key in _PROXY_ENV_KEYS:
value = os.getenv(key, "")
normalized = normalize_proxy_url(value)
if normalized and normalized != value:
os.environ[key] = normalized
# ─── URL Parsing Helpers ──────────────────────────────────────────────────────
def base_url_hostname(base_url: str) -> str:
"""Return the lowercased hostname for a base URL, or ``""`` if absent.
Use exact-hostname comparisons against known provider hosts
(``api.openai.com``, ``api.x.ai``, ``api.anthropic.com``) instead of
substring matches on the raw URL. Substring checks treat attacker- or
proxy-controlled paths/hosts like ``https://api.openai.com.example/v1``
or ``https://proxy.test/api.openai.com/v1`` as native endpoints, which
leads to wrong api_mode / auth routing.
"""
raw = (base_url or "").strip()
if not raw:
return ""
parsed = urlparse(raw if "://" in raw else f"//{raw}")
return (parsed.hostname or "").lower().rstrip(".")
def base_url_host_matches(base_url: str, domain: str) -> bool:
"""Return True when the base URL's hostname is ``domain`` or a subdomain.
Safer counterpart to ``domain in base_url``, which is the substring
false-positive class documented on ``base_url_hostname``. Accepts bare
hosts, full URLs, and URLs with paths.
base_url_host_matches("https://api.moonshot.ai/v1", "moonshot.ai") == True
base_url_host_matches("https://moonshot.ai", "moonshot.ai") == True
base_url_host_matches("https://evil.com/moonshot.ai/v1", "moonshot.ai") == False
base_url_host_matches("https://moonshot.ai.evil/v1", "moonshot.ai") == False
"""
hostname = base_url_hostname(base_url)
if not hostname:
return False
domain = (domain or "").strip().lower().rstrip(".")
if not domain:
return False
return hostname == domain or hostname.endswith("." + domain)