Skip to content

Commit 505688f

Browse files
Propagate dial-side OSError from _query_leader for per-node attribution
Before this fix, _query_leader swallowed any pre-handshake OSError and returned None, losing per-node attribution: a node that consistently refused connections contributed to the aggregate ClusterError as "<addr>: no leader known" rather than "<addr>: ConnectionRefusedError". Operators reading the aggregate could not distinguish "node unreachable" from "node up, no leader elected", and the two cases call for different remediation. Now OSError and its subclasses (TimeoutError, ConnectionRefusedError, etc.) propagate to the caller. _probe_one already wraps the call in a try/except that classifies transport errors per node into the aggregate. This mirrors the shape of go-dqlite's connector.go, where every failure class is attributed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 60f9e35 commit 505688f

3 files changed

Lines changed: 122 additions & 19 deletions

File tree

src/dqliteclient/cluster.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -924,19 +924,33 @@ async def _probe_one(idx: int, node: _StoreNodeInfo) -> _LeaderHit | _ProbeMiss:
924924
async def _query_leader(
925925
self, address: str, *, trust_server_heartbeat: bool = False
926926
) -> str | None:
927-
"""Query a node for the current leader."""
928-
try:
929-
reader, writer = await asyncio.wait_for(
930-
open_connection(address, dial_func=self._dial_func),
931-
timeout=self._dial_timeout,
932-
)
933-
except OSError:
934-
# OSError subsumes TimeoutError, BrokenPipeError,
935-
# ConnectionError, ConnectionRefusedError, and the rest
936-
# of the stdlib transport-error shapes. Any one of those
937-
# here means the node is unreachable; surface "unknown
938-
# leader" to the caller so it can try another node.
939-
return None
927+
"""Query a node for the current leader.
928+
929+
Raises ``OSError`` (or a subclass) if the dial itself fails;
930+
``DqliteConnectionError`` / ``ProtocolError`` /
931+
``OperationalError`` for handshake-or-later failures. The
932+
caller (``_probe_one``) attributes each failure class to the
933+
per-node aggregate ``ClusterError``. Returns ``None`` when
934+
the node is reachable but reports no leader (the
935+
"unknown leader" branch the wire-level ``LeaderResponse``
936+
handles).
937+
938+
Pre-fix this swallowed pre-handshake ``OSError`` as ``None``,
939+
losing per-node attribution: a node that consistently
940+
refused connection contributed to the aggregate as
941+
``"<addr>: no leader known"`` rather than
942+
``"<addr>: ConnectionRefusedError"``. Operators reading the
943+
aggregate cannot remediate without distinguishing
944+
"node unreachable" from "node up, no leader elected".
945+
"""
946+
# Let OSError (subsumes TimeoutError, ConnectionRefused,
947+
# BrokenPipe, etc.) propagate to the caller. ``_probe_one``
948+
# already wraps this call in a try/except that classifies
949+
# transport errors per node into the aggregate ClusterError.
950+
reader, writer = await asyncio.wait_for(
951+
open_connection(address, dial_func=self._dial_func),
952+
timeout=self._dial_timeout,
953+
)
940954

941955
try:
942956
protocol = DqliteProtocol(

tests/test_dial_attempt_timeout_split.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,15 +141,16 @@ async def slow_open(*args: object, **kwargs: object) -> tuple[object, object]:
141141
"dqliteclient._dial.open_connection_with_keepalive",
142142
side_effect=slow_open,
143143
):
144-
# ``_query_leader`` returns ``None`` on OSError-family
145-
# (which subsumes TimeoutError on cancellation). The test
146-
# passes if it returns within the dial_timeout budget, NOT
147-
# after the full ``timeout=10.0``.
144+
# ``_query_leader`` raises TimeoutError when the dial budget
145+
# is exhausted (the per-node attribution is the caller's
146+
# job — see ``_probe_one``). The test passes if the raise
147+
# arrives within the dial_timeout budget, NOT after the
148+
# full ``timeout=10.0``.
148149
start = asyncio.get_running_loop().time()
149-
result = await cluster._query_leader("localhost:9001")
150+
with pytest.raises(TimeoutError):
151+
await cluster._query_leader("localhost:9001")
150152
elapsed = asyncio.get_running_loop().time() - start
151153

152-
assert result is None
153154
assert elapsed < 1.5, (
154155
f"expected query_leader to give up within ~dial_timeout (0.5s); "
155156
f"actually took {elapsed:.3f}s"
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""``_query_leader`` propagates pre-handshake transport errors
2+
(OSError, TimeoutError, ConnectionRefusedError) to the caller so
3+
``_probe_one`` can attribute them to the per-node aggregate
4+
``ClusterError`` with a specific exception class.
5+
6+
Pre-fix the function swallowed OSError as ``None``, conflating
7+
"node unreachable" with "node up, no leader elected" — operators
8+
reading the aggregate could not distinguish the two and could not
9+
remediate appropriately.
10+
"""
11+
12+
import asyncio
13+
from unittest.mock import patch
14+
15+
import pytest
16+
17+
from dqliteclient.cluster import ClusterClient
18+
from dqliteclient.exceptions import ClusterError
19+
from dqliteclient.node_store import MemoryNodeStore
20+
21+
22+
@pytest.mark.asyncio
23+
async def test_query_leader_propagates_connection_refused() -> None:
24+
cluster = ClusterClient(MemoryNodeStore(["unreachable:9001"]), timeout=2.0)
25+
26+
async def refused_open(*_args: object, **_kwargs: object):
27+
raise ConnectionRefusedError("nothing listening")
28+
29+
with (
30+
patch(
31+
"dqliteclient._dial.open_connection_with_keepalive",
32+
side_effect=refused_open,
33+
),
34+
pytest.raises(ConnectionRefusedError),
35+
):
36+
await cluster._query_leader("unreachable:9001")
37+
38+
39+
@pytest.mark.asyncio
40+
async def test_query_leader_propagates_timeout() -> None:
41+
cluster = ClusterClient(
42+
MemoryNodeStore(["slow:9001"]),
43+
timeout=2.0,
44+
dial_timeout=0.05,
45+
)
46+
47+
async def slow_open(*_args: object, **_kwargs: object):
48+
await asyncio.sleep(1.0)
49+
50+
with (
51+
patch(
52+
"dqliteclient._dial.open_connection_with_keepalive",
53+
side_effect=slow_open,
54+
),
55+
pytest.raises(TimeoutError),
56+
):
57+
await cluster._query_leader("slow:9001")
58+
59+
60+
@pytest.mark.asyncio
61+
async def test_find_leader_aggregate_attributes_dial_error() -> None:
62+
"""End-to-end: a node that refuses connection appears in the
63+
aggregate ClusterError with its specific exception class, not
64+
a vague "no leader known"."""
65+
cluster = ClusterClient(
66+
MemoryNodeStore(["unreachable:9001"]),
67+
timeout=2.0,
68+
dial_timeout=0.5,
69+
attempt_timeout=1.0,
70+
)
71+
72+
async def refused_open(*_args: object, **_kwargs: object):
73+
raise ConnectionRefusedError("nothing listening")
74+
75+
with (
76+
patch(
77+
"dqliteclient._dial.open_connection_with_keepalive",
78+
side_effect=refused_open,
79+
),
80+
pytest.raises(ClusterError) as excinfo,
81+
):
82+
await cluster.find_leader()
83+
84+
# The aggregate error message must mention the node and the
85+
# specific transport error rather than just "no leader known".
86+
msg = str(excinfo.value)
87+
assert "unreachable:9001" in msg
88+
assert "ConnectionRefusedError" in msg or "refused" in msg.lower() or "nothing listening" in msg

0 commit comments

Comments
 (0)