Skip to content

Commit f294b62

Browse files
authored
Add support for decreasing streams (#640)
1 parent 9b40f0d commit f294b62

9 files changed

Lines changed: 635 additions & 22 deletions

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,16 @@ extension PoolStateMachine {
301301
return nil
302302
}
303303

304-
self.stats.availableStreams += maxStreams - info.oldMaxStreams
304+
// Use signed-safe arithmetic to avoid UInt16 underflow when the
305+
// server reduces maxStreams. The min() clamp is needed because
306+
// availableStreams may be less than the decrease when streams are
307+
// already leased (those are tracked in leasedStreams, not here).
308+
if maxStreams >= info.oldMaxStreams {
309+
self.stats.availableStreams += maxStreams - info.oldMaxStreams
310+
} else {
311+
let decrease = info.oldMaxStreams - maxStreams
312+
self.stats.availableStreams -= min(decrease, self.stats.availableStreams)
313+
}
305314

306315
return NewMaxStreamInfo(index: index, info: info)
307316
}

Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift

Lines changed: 204 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -268,20 +268,16 @@ import Testing
268268
// validate that a keep alive timer and an idle timeout timer is scheduled
269269
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
270270
let deadline1 = await clock.nextTimerScheduled()
271-
print(deadline1)
272271
#expect(expectedInstants.remove(deadline1) != nil)
273272
let deadline2 = await clock.nextTimerScheduled()
274-
print(deadline2)
275273
#expect(expectedInstants.remove(deadline2) != nil)
276274
#expect(expectedInstants.isEmpty == true)
277275

278276
// move clock forward to keep alive
279277
let newTime = clock.now.advanced(by: keepAliveDuration)
280278
clock.advance(to: newTime)
281-
print("clock advanced to: \(newTime)")
282279

283280
await keepAlive.nextKeepAlive { keepAliveConnection in
284-
defer { print("keep alive 1 has run") }
285281
#expect(keepAliveConnection === connectionLease.connection)
286282
return true
287283
}
@@ -290,7 +286,6 @@ import Testing
290286

291287
let deadline3 = await clock.nextTimerScheduled()
292288
#expect(deadline3 == clock.now.advanced(by: keepAliveDuration))
293-
print(deadline3)
294289

295290
// race keep alive vs timeout
296291
clock.advance(to: clock.now.advanced(by: keepAliveDuration))
@@ -348,10 +343,8 @@ import Testing
348343
// validate that a keep alive timer and an idle timeout timer is scheduled
349344
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
350345
let deadline1 = await clock.nextTimerScheduled()
351-
print(deadline1)
352346
#expect(expectedInstants.remove(deadline1) != nil)
353347
let deadline2 = await clock.nextTimerScheduled()
354-
print(deadline2)
355348
#expect(expectedInstants.remove(deadline2) != nil)
356349
#expect(expectedInstants.isEmpty)
357350

@@ -435,23 +428,19 @@ import Testing
435428
// validate that a keep alive timer and an idle timeout timer is scheduled
436429
var expectedInstants: Set<MockClock.Instant> = [.init(keepAliveDuration), .init(config.idleTimeout)]
437430
let deadline1 = await clock.nextTimerScheduled()
438-
print(deadline1)
439431
#expect(expectedInstants.remove(deadline1) != nil)
440432
let deadline2 = await clock.nextTimerScheduled()
441-
print(deadline2)
442433
#expect(expectedInstants.remove(deadline2) != nil)
443434
#expect(expectedInstants.isEmpty)
444435

445436
clock.advance(to: clock.now.advanced(by: keepAliveDuration))
446437

447438
await keepAlive.nextKeepAlive { keepAliveConnection in
448-
defer { print("keep alive 1 has run") }
449439
#expect(keepAliveConnection === connectionLease.connection)
450440
return true
451441
}
452442

453443
taskGroup.cancelAll()
454-
print("cancelled")
455444

456445
for connection in factory.runningConnections {
457446
connection.closeIfClosing()
@@ -629,11 +618,9 @@ import Testing
629618
_ = try await pool.leaseConnection()
630619
Issue.record("Expected a failure")
631620
} catch {
632-
print("failed")
633621
#expect(error as? ConnectionPoolError == .poolShutdown)
634622
}
635623

636-
print("will close connections: \(factory.runningConnections)")
637624
for connection in factory.runningConnections {
638625
try await connection.signalToClose
639626
connection.closeIfClosing()
@@ -1255,6 +1242,210 @@ import Testing
12551242
}
12561243
}
12571244
}
1245+
1246+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
1247+
@Test func decreaseMaxStreams() async throws {
1248+
let clock = MockClock()
1249+
let factory = MockConnectionFactory<MockClock>()
1250+
let keepAliveDuration = Duration.seconds(30)
1251+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
1252+
1253+
var mutableConfig = ConnectionPoolConfiguration()
1254+
mutableConfig.minimumConnectionCount = 0
1255+
mutableConfig.maximumConnectionSoftLimit = 1
1256+
mutableConfig.maximumConnectionHardLimit = 1
1257+
mutableConfig.idleTimeout = .seconds(10)
1258+
let config = mutableConfig
1259+
1260+
let pool = ConnectionPool(
1261+
configuration: config,
1262+
idGenerator: ConnectionIDGenerator(),
1263+
requestType: ConnectionFuture.self,
1264+
keepAliveBehavior: keepAlive,
1265+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
1266+
clock: clock
1267+
) {
1268+
try await factory.makeConnection(id: $0, for: $1)
1269+
}
1270+
1271+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
1272+
taskGroup.addTask {
1273+
await pool.run()
1274+
}
1275+
1276+
// Create a connection with 4 streams
1277+
let requests = (0..<3).map { ConnectionFuture(id: $0) }
1278+
pool.leaseConnections(requests)
1279+
1280+
let createdConnection = await factory.nextConnectAttempt { _ in
1281+
return 4
1282+
}
1283+
1284+
let lease1 = try await requests[0].future.success
1285+
let lease2 = try await requests[1].future.success
1286+
let lease3 = try await requests[2].future.success
1287+
#expect(lease1.connection === createdConnection)
1288+
1289+
// Decrease maxStreams from 4 to 2 while 3 streams are leased
1290+
pool.connectionReceivedNewMaxStreamSetting(createdConnection, newMaxStreamSetting: 2)
1291+
1292+
// Release stream — 2 out of 2 are still used
1293+
lease1.release()
1294+
1295+
// Lease again, but 2 out of 2 are still used
1296+
let request4 = ConnectionFuture(id: 3)
1297+
pool.leaseConnection(request4)
1298+
#expect(request4.future.isUnfulfilled)
1299+
1300+
// Release stream — this means more space is available
1301+
lease2.release()
1302+
let lease4 = try await request4.future.success
1303+
#expect(lease4.connection === createdConnection)
1304+
1305+
// Release streams — all streams available again
1306+
lease3.release()
1307+
lease4.release()
1308+
1309+
// Verify pool still works after decrease
1310+
let newRequest = ConnectionFuture(id: 10)
1311+
pool.leaseConnections([newRequest])
1312+
let newLease = try await newRequest.future.success
1313+
#expect(newLease.connection === createdConnection)
1314+
newLease.release()
1315+
1316+
taskGroup.cancelAll()
1317+
for connection in factory.runningConnections {
1318+
connection.closeIfClosing()
1319+
}
1320+
}
1321+
}
1322+
1323+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
1324+
@Test func increaseMaxStreamsWithNoWaitingRequests() async throws {
1325+
let clock = MockClock()
1326+
let factory = MockConnectionFactory<MockClock>()
1327+
let keepAliveDuration = Duration.seconds(30)
1328+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
1329+
1330+
var mutableConfig = ConnectionPoolConfiguration()
1331+
mutableConfig.minimumConnectionCount = 0
1332+
mutableConfig.maximumConnectionSoftLimit = 1
1333+
mutableConfig.maximumConnectionHardLimit = 1
1334+
mutableConfig.idleTimeout = .seconds(10)
1335+
let config = mutableConfig
1336+
1337+
let pool = ConnectionPool(
1338+
configuration: config,
1339+
idGenerator: ConnectionIDGenerator(),
1340+
requestType: ConnectionFuture.self,
1341+
keepAliveBehavior: keepAlive,
1342+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
1343+
clock: clock
1344+
) {
1345+
try await factory.makeConnection(id: $0, for: $1)
1346+
}
1347+
1348+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
1349+
taskGroup.addTask {
1350+
await pool.run()
1351+
}
1352+
1353+
// Create connection with 1 stream, lease it
1354+
let request = ConnectionFuture(id: 0)
1355+
pool.leaseConnection(request)
1356+
let createdConnection = await factory.nextConnectAttempt { _ in
1357+
return 1
1358+
}
1359+
let lease = try await request.future.success
1360+
#expect(lease.connection === createdConnection)
1361+
1362+
// Increase maxStreams with no waiting requests
1363+
pool.connectionReceivedNewMaxStreamSetting(createdConnection, newMaxStreamSetting: 10)
1364+
1365+
// Release and verify we can now lease multiple streams
1366+
lease.release()
1367+
1368+
let multiRequests = (1..<6).map { ConnectionFuture(id: $0) }
1369+
pool.leaseConnections(multiRequests)
1370+
for req in multiRequests {
1371+
let l = try await req.future.success
1372+
#expect(l.connection === createdConnection)
1373+
l.release()
1374+
}
1375+
1376+
taskGroup.cancelAll()
1377+
for connection in factory.runningConnections {
1378+
connection.closeIfClosing()
1379+
}
1380+
}
1381+
}
1382+
1383+
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
1384+
@Test func multipleMaxStreamsChangesInSequence() async throws {
1385+
let clock = MockClock()
1386+
let factory = MockConnectionFactory<MockClock>()
1387+
let keepAliveDuration = Duration.seconds(30)
1388+
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)
1389+
1390+
var mutableConfig = ConnectionPoolConfiguration()
1391+
mutableConfig.minimumConnectionCount = 0
1392+
mutableConfig.maximumConnectionSoftLimit = 1
1393+
mutableConfig.maximumConnectionHardLimit = 1
1394+
mutableConfig.idleTimeout = .seconds(10)
1395+
let config = mutableConfig
1396+
1397+
let pool = ConnectionPool(
1398+
configuration: config,
1399+
idGenerator: ConnectionIDGenerator(),
1400+
requestType: ConnectionFuture.self,
1401+
keepAliveBehavior: keepAlive,
1402+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
1403+
clock: clock
1404+
) {
1405+
try await factory.makeConnection(id: $0, for: $1)
1406+
}
1407+
1408+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
1409+
taskGroup.addTask {
1410+
await pool.run()
1411+
}
1412+
1413+
// Create connection with 1 stream
1414+
let request = ConnectionFuture(id: 0)
1415+
pool.leaseConnections([request])
1416+
let createdConnection = await factory.nextConnectAttempt { _ in
1417+
return 1
1418+
}
1419+
let lease = try await request.future.success
1420+
lease.release()
1421+
1422+
// Increase to 10
1423+
pool.connectionReceivedNewMaxStreamSetting(createdConnection, newMaxStreamSetting: 10)
1424+
1425+
// Decrease to 3
1426+
pool.connectionReceivedNewMaxStreamSetting(createdConnection, newMaxStreamSetting: 3)
1427+
1428+
// Verify we can lease up to 3 streams
1429+
let requests = (1..<4).map { ConnectionFuture(id: $0) }
1430+
pool.leaseConnections(requests)
1431+
var leases = [ConnectionLease<MockConnection>]()
1432+
for req in requests {
1433+
let l = try await req.future.success
1434+
#expect(l.connection === createdConnection)
1435+
leases.append(l)
1436+
}
1437+
1438+
// Release all
1439+
for lease in leases {
1440+
lease.release()
1441+
}
1442+
1443+
taskGroup.cancelAll()
1444+
for connection in factory.runningConnections {
1445+
connection.closeIfClosing()
1446+
}
1447+
}
1448+
}
12581449
}
12591450

12601451
struct ConnectionFuture: ConnectionRequestProtocol {

Tests/ConnectionPoolModuleTests/ConnectionRequestTests.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import Testing
66

77
@Test func testHappyPath() async throws {
88
let mockConnection = MockConnection(id: 1)
9-
let lease = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<ConnectionLease<MockConnection>, any Error>) in
9+
let lease: ConnectionLease<MockConnection> = try await withCheckedThrowingContinuation { (continuation) in
1010
let request = ConnectionRequest(id: 42, continuation: continuation)
1111
#expect(request.id == 42)
1212
let lease = ConnectionLease(connection: mockConnection) { _ in }
@@ -18,8 +18,10 @@ import Testing
1818

1919
@Test func testSadPath() async throws {
2020
do {
21-
_ = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<MockConnection, any Error>) in
22-
continuation.resume(with: .failure(ConnectionPoolError.requestCancelled))
21+
let _: ConnectionLease<MockConnection> = try await withCheckedThrowingContinuation { (continuation) in
22+
let request = ConnectionRequest(id: 42, continuation: continuation)
23+
#expect(request.id == 42)
24+
request.complete(with: .failure(ConnectionPoolError.requestCancelled))
2325
}
2426
Issue.record("This point should not be reached")
2527
} catch {

0 commit comments

Comments
 (0)