Skip to content

Commit 73f9951

Browse files
Re-check _closed after fresh-slot _create_connection in pool acquire
The fresh-slot arm reserves a new ``_size`` slot under ``_lock`` then drops the lock and awaits ``_create_connection`` (TCP handshake + leader discovery). ``close()`` racing this await flips ``self._closed = True`` while the create is in-flight; without a post-create ``_closed`` re-check, the fresh connection was yielded on a pool whose flag is True — contract violation and a sneaky leak (user runs real queries against an invisibly-closed pool until they exit the ``async with`` block). The dead-conn-replacement arm at lines 1196-1210 already had this discipline; the fresh-slot arm did not. The fix adds the same shielded-close + shielded-release-reservation + ``DqliteConnectionError`` pattern to the fresh-slot arm. Pin races ``pool.close()`` past a suspended fresh-slot ``_create_connection`` and asserts the acquire raises ``DqliteConnectionError("Pool is closed")``, the fresh conn's ``close()`` was awaited (no transport leak), AND the reservation slot was released back to the pool (no slot wedge — the close-awaited assertion alone would NOT catch this). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5b5f922 commit 73f9951

2 files changed

Lines changed: 104 additions & 0 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,24 @@ async def acquire(self) -> AsyncIterator[DqliteConnection]:
986986
with contextlib.suppress(asyncio.CancelledError):
987987
await asyncio.shield(self._release_reservation())
988988
raise
989+
# close() may have run while ``_create_connection``
990+
# was suspended on leader discovery / TCP handshake.
991+
# Without this re-check, the fresh connection would
992+
# be yielded on a pool whose ``_closed`` flag is True
993+
# — contract violation and a sneaky leak (user runs
994+
# real queries against an invisibly-closed pool until
995+
# they exit the ``async with`` block). The dead-conn-
996+
# replacement arm at lines 1196-1210 has the
997+
# symmetric discipline; this restores parity. Shield
998+
# both the close and the reservation release so an
999+
# outer cancel cannot leak the freshly-built
1000+
# connection's transport or the reservation slot.
1001+
if self._closed:
1002+
with contextlib.suppress(asyncio.CancelledError):
1003+
await asyncio.shield(conn.close())
1004+
with contextlib.suppress(asyncio.CancelledError):
1005+
await asyncio.shield(self._release_reservation())
1006+
raise DqliteConnectionError(f"Pool is closed (id={id(self)})")
9891007
break
9901008

9911009
# At capacity — wait briefly on the queue, then loop back to
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""``pool.acquire()`` must not yield a fresh connection from the
2+
fresh-slot reservation arm on a closed pool.
3+
4+
The fresh-slot arm reserves a new ``_size`` slot under ``_lock`` then
5+
drops the lock and awaits ``_create_connection`` (TCP handshake +
6+
leader discovery). ``close()`` racing this await flips
7+
``self._closed = True`` while the create is in-flight; without a
8+
post-create ``_closed`` re-check, the fresh connection is yielded on
9+
a pool whose flag is True — contract violation, sneaky leak (user
10+
runs real queries against an invisibly-closed pool until they exit
11+
the ``async with`` block; ``_release`` then closes the conn but the
12+
queries already ran).
13+
14+
The dead-conn-replacement arm at ``pool.py:1202-1210`` already has
15+
this re-check; the fresh-slot arm did not.
16+
17+
Pin: a fresh-slot acquire racing close() must raise
18+
``DqliteConnectionError("Pool is closed")`` AND close the freshly-built
19+
connection (no transport leak).
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import asyncio
25+
from unittest.mock import AsyncMock, MagicMock
26+
27+
import pytest
28+
29+
from dqliteclient.exceptions import DqliteConnectionError
30+
from dqliteclient.pool import ConnectionPool
31+
32+
33+
@pytest.mark.asyncio
34+
async def test_acquire_fresh_slot_raises_on_close_race() -> None:
35+
"""close() landing between the fresh-slot _create_connection
36+
suspension and the yield must raise DqliteConnectionError and
37+
close the fresh connection — symmetric with the dead-conn-revive
38+
arm's discipline.
39+
"""
40+
pool = ConnectionPool(["localhost:19001"], min_size=0, max_size=1, timeout=2.0)
41+
42+
# Empty queue, fresh-slot arm taken (size=0, max_size=1).
43+
fresh_conn = MagicMock()
44+
fresh_conn.is_connected = True
45+
fresh_conn.close = AsyncMock()
46+
fresh_conn._address = "localhost:19001"
47+
fresh_conn._pool_released = False
48+
49+
create_gate = asyncio.Event()
50+
51+
async def _create() -> object:
52+
# Block until the test fires close(); release the fresh conn
53+
# only after close() has set self._closed = True and returned.
54+
await create_gate.wait()
55+
return fresh_conn
56+
57+
pool._create_connection = _create # type: ignore[assignment]
58+
59+
async def _consume() -> None:
60+
async with pool.acquire() as conn:
61+
pytest.fail(f"acquire yielded on a closed pool: {conn!r}")
62+
63+
consume_task = asyncio.create_task(_consume())
64+
65+
# Wait for the fresh-slot arm to reach _create_connection.
66+
for _ in range(50):
67+
await asyncio.sleep(0)
68+
if create_gate._waiters: # consumer is parked on create_gate.wait()
69+
break
70+
71+
# Race close() past the suspended _create_connection.
72+
await pool.close()
73+
# Now let the fresh-slot arm resume and produce the fresh conn.
74+
create_gate.set()
75+
76+
with pytest.raises(DqliteConnectionError, match="Pool is closed"):
77+
await asyncio.wait_for(consume_task, timeout=1.0)
78+
79+
# The fresh conn must have been closed (no transport leak).
80+
fresh_conn.close.assert_awaited()
81+
# The reservation slot must have been released back to the pool —
82+
# otherwise the pool wedges at ``_size = max_size`` even though
83+
# nothing is checked out. Catches a regression where the
84+
# ``_release_reservation()`` shield is dropped from the new arm
85+
# (the close-awaited assertion alone does NOT catch this).
86+
assert pool._size == 0, f"reservation slot leaked on close-race: pool._size={pool._size}"

0 commit comments

Comments
 (0)