Skip to content

Commit 3756774

Browse files
stac: retry transient child failures once; exit on root failure (#73)
Three related changes that sharpen the startup-resilience story: 1. Retry pass for transient child failures. After the main pool drains, children that timed out get one more attempt in a fresh pool using a longer STAC_CHILD_RETRY_TIMEOUT (default 8s, env-overridable). Rescues the tail-latency case (~2 of 81 collections observed on 2026-04-18 dev rollout) without needing to kill the pod. A retry that also fails leaves the original error in STAC_LOAD_ERRORS. 2. Exit on root-catalog failure. When the root fetch itself fails at startup, sys.exit(1) rather than starting uvicorn with an empty catalog. Lets Kubernetes restart the pod so the next attempt can catch better S3 conditions, instead of presenting a useless server. Partial catalogs (some children failed) still serve — that's the point of the resilience design; only total failure triggers exit. 3. Default STAC_FETCH_CONCURRENCY bumped from 8 to 16 so the main pool plus retry pass both fit within the readiness-probe budget. Tests: 4 new (retry rescue, retry failure is final, retry uses configured timeout, new concurrency default). Full suite: 95 passed. Co-authored-by: Carl Boettiger <cboettig@berkeley.edu>
1 parent b8398c1 commit 3756774

3 files changed

Lines changed: 209 additions & 14 deletions

File tree

server.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from mcp.shared.session import BaseSession
1212
from starlette.middleware.base import BaseHTTPMiddleware
1313
from starlette.responses import JSONResponse
14-
from stac import STAC_DATASETS, STAC_CATALOG_URL, list_datasets as _stac_list, get_dataset as _stac_get, get_collection as _stac_get_collection
14+
from stac import STAC_DATASETS, STAC_LOAD_ERRORS, STAC_CATALOG_URL, list_datasets as _stac_list, get_dataset as _stac_get, get_collection as _stac_get_collection
1515

1616
# Workaround for https://github.com/boettiger-lab/mcp-data-server/issues/5
1717
# send_notification crashes with ClosedResourceError when the client disconnects
@@ -276,6 +276,19 @@ async def dispatch(self, request, call_next):
276276
# 10. SERVER START
277277
# -------------------------------------------------------------------------
278278
if __name__ == "__main__":
279+
# If the STAC root catalog was unreachable at startup, serving would give
280+
# clients a useless empty catalog. Exit non-zero so Kubernetes restarts the
281+
# pod and gets a fresh attempt against whatever S3 looks like now. Child
282+
# failures (partial catalog) are fine — the resilience design serves what
283+
# loaded and records the rest in STAC_LOAD_ERRORS for list_datasets's footer.
284+
if "__root__" in STAC_LOAD_ERRORS:
285+
print(
286+
"💀 STAC root catalog unreachable at startup — exiting so k8s can "
287+
f"restart and retry. Reason: {STAC_LOAD_ERRORS['__root__']}",
288+
file=sys.stderr,
289+
)
290+
sys.exit(1)
291+
279292
app = mcp.streamable_http_app()
280293
app.router.redirect_slashes = False
281294
mount_tiles(app)

stac.py

Lines changed: 87 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@
3030
_STAC_CHILD_TIMEOUT = int(
3131
os.environ.get("STAC_CHILD_TIMEOUT", os.environ.get("STAC_TIMEOUT", "5"))
3232
)
33-
_STAC_FETCH_CONCURRENCY = int(os.environ.get("STAC_FETCH_CONCURRENCY", "8"))
33+
_STAC_FETCH_CONCURRENCY = int(os.environ.get("STAC_FETCH_CONCURRENCY", "16"))
34+
35+
# Child fetches that fail on the first pass are retried once, each with a
36+
# longer per-child timeout. Rescues tail-latency failures without leaving
37+
# otherwise-healthy collections out of the catalog. A persistent failure
38+
# (both attempts fail) stays in STAC_LOAD_ERRORS.
39+
_STAC_CHILD_RETRY_TIMEOUT = int(os.environ.get("STAC_CHILD_RETRY_TIMEOUT", "8"))
3440

3541
# Legacy alias retained so external callers (if any) that read the old name still work.
3642
_STAC_TIMEOUT = _STAC_CHILD_TIMEOUT
@@ -307,7 +313,7 @@ def _collection_to_dict(col, sub_children=None) -> dict:
307313
return {k: v for k, v in result.items() if v is not None}
308314

309315

310-
def _fetch_parent(href: str, title: str | None, token: str | None):
316+
def _fetch_parent(href: str, title: str | None, token: str | None, timeout: int = None):
311317
"""Thread-worker: fetch one top-level child Collection.
312318
313319
Returns a 3-tuple (col, subchild_hrefs, error):
@@ -322,8 +328,13 @@ def _fetch_parent(href: str, title: str | None, token: str | None):
322328
323329
This function must NEVER raise — all exceptions are caught and translated
324330
to the error dict.
331+
332+
`timeout` overrides the default child timeout; used by the retry pass.
325333
"""
326-
child_io = _TimeoutStacIO(token=token, timeout=_STAC_CHILD_TIMEOUT)
334+
child_io = _TimeoutStacIO(
335+
token=token,
336+
timeout=timeout if timeout is not None else _STAC_CHILD_TIMEOUT,
337+
)
327338
try:
328339
col = pystac.Collection.from_file(href, stac_io=child_io)
329340
subchild_hrefs = [l.href for l in (col.links or []) if l.rel == "child"]
@@ -334,16 +345,21 @@ def _fetch_parent(href: str, title: str | None, token: str | None):
334345
return None, [], {ident: reason}
335346

336347

337-
def _fetch_subchild(href: str, parent_id: str, token: str | None):
348+
def _fetch_subchild(href: str, parent_id: str, token: str | None, timeout: int = None):
338349
"""Thread-worker: fetch one sub-child Collection (a leaf of a parent).
339350
340351
Returns a 2-tuple (col, error):
341352
- col: the parsed pystac.Collection on success, else None
342353
- error: None on success, or {identifier: reason} on failure
343354
344355
Never raises — all exceptions caught.
356+
357+
`timeout` overrides the default child timeout; used by the retry pass.
345358
"""
346-
child_io = _TimeoutStacIO(token=token, timeout=_STAC_CHILD_TIMEOUT)
359+
child_io = _TimeoutStacIO(
360+
token=token,
361+
timeout=timeout if timeout is not None else _STAC_CHILD_TIMEOUT,
362+
)
347363
try:
348364
col = pystac.Collection.from_file(href, stac_io=child_io)
349365
return col, None
@@ -402,19 +418,27 @@ def fetch_stac_catalog(catalog_url: str = None, catalog_token: str = None) -> di
402418
subchild_cols_by_parent: dict = {}
403419
errors: dict = {}
404420

421+
# Failed items recorded during the initial pool drain so we can retry them once
422+
# below with a longer per-child timeout. Rescues tail-latency failures without
423+
# leaving healthy-but-slow collections out of the catalog.
424+
failed_parents: list[tuple[str, str | None]] = [] # (href, title)
425+
failed_subchildren: list[tuple[str, str]] = [] # (href, parent_col_id)
426+
405427
with ThreadPoolExecutor(max_workers=_STAC_FETCH_CONCURRENCY) as pool:
406-
# future -> ("parent", href) OR ("subchild", parent_col_id)
428+
# future -> ("parent", href, title) OR ("subchild", href, parent_col_id)
407429
pending: dict = {}
408430

409431
for href, title in parent_links:
410432
fut = pool.submit(_fetch_parent, href, title, catalog_token)
411-
pending[fut] = ("parent", href)
433+
pending[fut] = ("parent", href, title)
412434

413435
while pending:
414436
done, _ = wait(pending.keys(), return_when=FIRST_COMPLETED)
415437
for fut in done:
416-
kind, ctx = pending.pop(fut)
438+
entry = pending.pop(fut)
439+
kind = entry[0]
417440
if kind == "parent":
441+
_, href, title = entry
418442
col, subchild_hrefs, error = fut.result()
419443
if col is not None:
420444
parent_cols[col.id] = col
@@ -424,14 +448,67 @@ def fetch_stac_catalog(catalog_url: str = None, catalog_token: str = None) -> di
424448
sub_fut = pool.submit(
425449
_fetch_subchild, sub_href, col.id, catalog_token,
426450
)
427-
pending[sub_fut] = ("subchild", col.id)
451+
pending[sub_fut] = ("subchild", sub_href, col.id)
452+
if error:
453+
errors.update(error)
454+
failed_parents.append((href, title))
455+
else: # subchild
456+
_, sub_href, parent_col_id = entry
457+
col, error = fut.result()
458+
if col is not None:
459+
subchild_cols_by_parent.setdefault(parent_col_id, {})[col.id] = col
460+
if error:
461+
errors.update(error)
462+
failed_subchildren.append((sub_href, parent_col_id))
463+
464+
# --- Phase 2.5: retry failed children once with a longer timeout ---
465+
# Rescues tail-latency failures (borderline-slow S3 responses). A retry that
466+
# also fails keeps its STAC_LOAD_ERRORS entry (now carrying the retry's reason). If a previously-failed
467+
# parent succeeds on retry, its sub-children are NOT re-fetched here — they
468+
# weren't attempted the first time because we didn't have the parent's JSON;
469+
# a cache-miss on any specific sub-child ID will trigger a fresh walk later.
470+
if failed_parents or failed_subchildren:
471+
with ThreadPoolExecutor(max_workers=_STAC_FETCH_CONCURRENCY) as retry_pool:
472+
retry_futures: dict = {}
473+
for href, title in failed_parents:
474+
fut = retry_pool.submit(
475+
_fetch_parent, href, title, catalog_token, _STAC_CHILD_RETRY_TIMEOUT,
476+
)
477+
retry_futures[fut] = ("parent", href, title)
478+
for sub_href, parent_col_id in failed_subchildren:
479+
fut = retry_pool.submit(
480+
_fetch_subchild, sub_href, parent_col_id, catalog_token, _STAC_CHILD_RETRY_TIMEOUT,
481+
)
482+
retry_futures[fut] = ("subchild", sub_href, parent_col_id)
483+
for fut in retry_futures:
484+
entry = retry_futures[fut]
485+
kind = entry[0]
486+
if kind == "parent":
487+
_, href, title = entry
488+
col, _subchild_hrefs, error = fut.result()
489+
if col is not None:
490+
parent_cols[col.id] = col
491+
# Clear the first-pass error for this parent since retry succeeded
492+
errors.pop(_child_identifier(href, title_hint=title), None)
493+
print(
494+
f"🔁 Retry succeeded for parent: {col.id}",
495+
file=sys.stderr,
496+
)
497+
# If retry also failed, `error` carries the same identifier key as the
498+
# first-pass error; updating with it leaves the STAC_LOAD_ERRORS entry
499+
# pointing at the most-recent (retry) reason.
428500
if error:
429501
errors.update(error)
430502
else: # subchild
431-
parent_col_id = ctx
503+
_, sub_href, parent_col_id = entry
432504
col, error = fut.result()
433505
if col is not None:
434506
subchild_cols_by_parent.setdefault(parent_col_id, {})[col.id] = col
507+
errors.pop(_child_identifier(sub_href, title_hint=None), None)
508+
print(
509+
f"🔁 Retry succeeded for sub-child: {col.id}",
510+
file=sys.stderr,
511+
)
435512
if error:
436513
errors.update(error)
437514

tests/test_stac.py

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,12 +541,13 @@ def test_child_timeout_default_is_5(self, monkeypatch):
541541
importlib.reload(stac)
542542
assert stac._STAC_CHILD_TIMEOUT == 5
543543

544-
def test_fetch_concurrency_default_is_8(self, monkeypatch):
545-
"""With no env var set, STAC_FETCH_CONCURRENCY defaults to 8."""
544+
def test_fetch_concurrency_default_is_16(self, monkeypatch):
545+
"""Default concurrency is 16 — sized so the main pool plus the retry pass
546+
both fit in the readiness-probe budget."""
546547
monkeypatch.delenv("STAC_FETCH_CONCURRENCY", raising=False)
547548
import stac
548549
importlib.reload(stac)
549-
assert stac._STAC_FETCH_CONCURRENCY == 8
550+
assert stac._STAC_FETCH_CONCURRENCY == 16
550551

551552
def test_stac_timeout_back_compat_applies_to_both(self, monkeypatch):
552553
"""If only STAC_TIMEOUT is set, both root and child timeouts adopt its value."""
@@ -992,3 +993,107 @@ def test_cache_miss_refetch_with_all_children_failing_preserves_previous_cache(s
992993
assert len(stac.STAC_LOAD_ERRORS) == 1
993994
assert any("public-new" in k for k in stac.STAC_LOAD_ERRORS.keys())
994995

996+
def test_child_retry_timeout_default_is_8(self, monkeypatch):
997+
"""Retry pass uses a longer per-child timeout (default 8s) to rescue borderline slow children."""
998+
monkeypatch.delenv("STAC_CHILD_RETRY_TIMEOUT", raising=False)
999+
import stac
1000+
importlib.reload(stac)
1001+
assert stac._STAC_CHILD_RETRY_TIMEOUT == 8
1002+
1003+
def test_retry_rescues_transient_parent_failure(self):
1004+
"""A parent that fails the first pass but succeeds on retry ends up in the catalog
1005+
and out of STAC_LOAD_ERRORS."""
1006+
from unittest.mock import patch
1007+
import requests
1008+
import stac
1009+
1010+
self._reset_module_state(stac)
1011+
1012+
cat = self._make_root_catalog([
1013+
"https://example.com/public-flaky/stac-collection.json",
1014+
])
1015+
col = self._make_leaf_collection("flaky")
1016+
1017+
# First call raises Timeout; subsequent calls return the Collection.
1018+
call_count = {"n": 0}
1019+
def flaky_from_file(href, *args, **kwargs):
1020+
call_count["n"] += 1
1021+
if call_count["n"] == 1:
1022+
raise requests.exceptions.Timeout("first pass slow")
1023+
return col
1024+
1025+
with patch("stac.pystac.Catalog.from_file", return_value=cat), \
1026+
patch("stac.pystac.Collection.from_file", side_effect=flaky_from_file):
1027+
result = stac.fetch_stac_catalog()
1028+
1029+
# Retry saved it
1030+
assert "flaky" in result
1031+
assert call_count["n"] == 2 # one failure + one retry
1032+
# Nothing in the error dict — retry cleared the first-pass failure
1033+
assert stac.STAC_LOAD_ERRORS == {}
1034+
1035+
def test_retry_failure_is_final(self):
1036+
"""If both the initial attempt and the retry fail, the error stays in STAC_LOAD_ERRORS
1037+
and the collection is missing from the catalog."""
1038+
from unittest.mock import patch
1039+
import requests
1040+
import stac
1041+
1042+
self._reset_module_state(stac)
1043+
1044+
cat = self._make_root_catalog([
1045+
"https://example.com/public-dead/stac-collection.json",
1046+
])
1047+
1048+
with patch("stac.pystac.Catalog.from_file", return_value=cat), \
1049+
patch(
1050+
"stac.pystac.Collection.from_file",
1051+
side_effect=requests.exceptions.Timeout("always slow"),
1052+
):
1053+
result = stac.fetch_stac_catalog()
1054+
1055+
assert result == {}
1056+
assert len(stac.STAC_LOAD_ERRORS) == 1
1057+
assert any("public-dead" in k for k in stac.STAC_LOAD_ERRORS.keys())
1058+
1059+
def test_retry_uses_child_retry_timeout(self, monkeypatch):
1060+
"""The retry pass constructs _TimeoutStacIO with _STAC_CHILD_RETRY_TIMEOUT, not _STAC_CHILD_TIMEOUT."""
1061+
from unittest.mock import patch, MagicMock
1062+
import requests
1063+
import stac
1064+
1065+
monkeypatch.setenv("STAC_CHILD_TIMEOUT", "5")
1066+
monkeypatch.setenv("STAC_CHILD_RETRY_TIMEOUT", "8")
1067+
importlib.reload(stac)
1068+
self._reset_module_state(stac)
1069+
1070+
cat = self._make_root_catalog([
1071+
"https://example.com/public-flaky/stac-collection.json",
1072+
])
1073+
col = self._make_leaf_collection("flaky")
1074+
1075+
seen_timeouts = []
1076+
orig_TimeoutStacIO = stac._TimeoutStacIO
1077+
1078+
def capturing_stac_io(*args, **kwargs):
1079+
seen_timeouts.append(kwargs.get("timeout"))
1080+
return orig_TimeoutStacIO(*args, **kwargs)
1081+
1082+
call_count = {"n": 0}
1083+
def flaky_from_file(href, *args, **kwargs):
1084+
call_count["n"] += 1
1085+
if call_count["n"] == 1:
1086+
raise requests.exceptions.Timeout("slow")
1087+
return col
1088+
1089+
with patch("stac._TimeoutStacIO", side_effect=capturing_stac_io), \
1090+
patch("stac.pystac.Catalog.from_file", return_value=cat), \
1091+
patch("stac.pystac.Collection.from_file", side_effect=flaky_from_file):
1092+
stac.fetch_stac_catalog()
1093+
1094+
# Filter out the root_io construction (timeout=_STAC_ROOT_TIMEOUT=15)
1095+
# and look only at the child-fetch constructions.
1096+
child_timeouts = [t for t in seen_timeouts if t != stac._STAC_ROOT_TIMEOUT]
1097+
assert 5 in child_timeouts, f"expected first-pass child timeout (5s) in {child_timeouts}"
1098+
assert 8 in child_timeouts, f"expected retry child timeout (8s) in {child_timeouts}"
1099+

0 commit comments

Comments
 (0)