From 260878e3c8c60951c900ae03f2373aba87415e1b Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 25 Sep 2025 14:43:12 +0100 Subject: [PATCH 1/3] Add associatedtype NodeDescription to ValkeyNodeConnectionPoolFactory This allows us have separate node descriptions for different implementations of ValkeyRunningClientsStateMachine Signed-off-by: Adam Fowler --- .../ValkeyClusterClientStateMachine.swift | 7 +++- .../Cluster/ValkeyNodeConnectionPool.swift | 3 +- .../Valkey/Node/ValkeyRunningClients.swift | 33 +++++++------------ 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift b/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift index fff7b193..86b50d3b 100644 --- a/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift +++ b/Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift @@ -60,7 +60,12 @@ package struct ValkeyClusterClientStateMachine< Clock: _Concurrency.Clock, SuccessNotifier, TimerCancellationToken: Sendable -> where ConnectionPool == ConnectionPoolFactory.ConnectionPool, Clock.Duration == Duration { +> +where + ConnectionPool == ConnectionPoolFactory.ConnectionPool, + Clock.Duration == Duration, + ConnectionPoolFactory.NodeDescription == ValkeyNodeDescription +{ @usableFromInline struct Timer { diff --git a/Sources/Valkey/Cluster/ValkeyNodeConnectionPool.swift b/Sources/Valkey/Cluster/ValkeyNodeConnectionPool.swift index 3c7f0734..918d0aa3 100644 --- a/Sources/Valkey/Cluster/ValkeyNodeConnectionPool.swift +++ b/Sources/Valkey/Cluster/ValkeyNodeConnectionPool.swift @@ -25,9 +25,10 @@ package protocol ValkeyNodeConnectionPool: AnyObject, Sendable { @available(valkeySwift 1.0, *) package protocol ValkeyNodeConnectionPoolFactory: Sendable { associatedtype ConnectionPool: ValkeyNodeConnectionPool + associatedtype NodeDescription /// Create a shard connection pool based on the provided configuration func makeConnectionPool( - nodeDescription: ValkeyNodeDescription + nodeDescription: NodeDescription ) -> ConnectionPool } diff --git a/Sources/Valkey/Node/ValkeyRunningClients.swift b/Sources/Valkey/Node/ValkeyRunningClients.swift index 403af91a..64106d40 100644 --- a/Sources/Valkey/Node/ValkeyRunningClients.swift +++ b/Sources/Valkey/Node/ValkeyRunningClients.swift @@ -10,19 +10,20 @@ struct ValkeyRunningClientsStateMachine< ConnectionPool: Sendable, ConnectionPoolFactory: ValkeyNodeConnectionPoolFactory -> where ConnectionPoolFactory.ConnectionPool == ConnectionPool { +> where ConnectionPoolFactory.ConnectionPool == ConnectionPool, ConnectionPoolFactory.NodeDescription: Sendable & Identifiable { + @usableFromInline typealias NodeDescription = ConnectionPoolFactory.NodeDescription @usableFromInline /* private */ struct NodeBundle: Sendable { @usableFromInline - var nodeID: ValkeyNodeID { self.nodeDescription.id } + var nodeID: NodeDescription.ID { self.nodeDescription.id } @usableFromInline var pool: ConnectionPool @usableFromInline - var nodeDescription: ValkeyNodeDescription + var nodeDescription: NodeDescription } let poolFactory: ConnectionPoolFactory @usableFromInline - var clientMap: [ValkeyNodeID: NodeBundle] + var clientMap: [NodeDescription.ID: NodeBundle] @inlinable var clients: some Collection { clientMap.values } @@ -33,35 +34,25 @@ struct ValkeyRunningClientsStateMachine< struct PoolUpdateAction { var poolsToShutdown: [ConnectionPool] - var poolsToRun: [(ConnectionPool, ValkeyNodeID)] + var poolsToRun: [(ConnectionPool, NodeDescription.ID)] static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) } } mutating func updateNodes( - _ newNodes: some Collection, + _ newNodes: some Collection, removeUnmentionedPools: Bool ) -> PoolUpdateAction { var previousNodes = self.clientMap self.clientMap.removeAll(keepingCapacity: true) - var newPools = [(ConnectionPool, ValkeyNodeID)]() + var newPools = [(ConnectionPool, NodeDescription.ID)]() newPools.reserveCapacity(16) var poolsToShutdown = [ConnectionPool]() for newNodeDescription in newNodes { // if we had a pool previously, let's continue to use it! if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) { - if newNodeDescription == existingPool.nodeDescription { - // the existing pool matches the new node description. nothing todo - self.clientMap[newNodeDescription.id] = existingPool - } else { - // the existing pool does not match new node description. For example tls may now be required. - // shutdown the old pool and create a new one - poolsToShutdown.append(existingPool.pool) - let newPool = self.makePool(for: newNodeDescription) - self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription) - newPools.append((newPool, newNodeDescription.id)) - } + self.clientMap[newNodeDescription.id] = existingPool } else { let newPool = self.makePool(for: newNodeDescription) self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription) @@ -95,7 +86,7 @@ struct ValkeyRunningClientsStateMachine< } mutating func addNode( - _ node: ValkeyNodeDescription + _ node: NodeDescription ) -> AddNodeAction { if let pool = self.clientMap[node.id] { return .useExistingPool(pool.pool) @@ -106,7 +97,7 @@ struct ValkeyRunningClientsStateMachine< } @inlinable - subscript(_ index: ValkeyNodeID) -> NodeBundle? { + subscript(_ index: NodeDescription.ID) -> NodeBundle? { self.clientMap[index] } @@ -115,7 +106,7 @@ struct ValkeyRunningClientsStateMachine< self.clientMap.removeAll(keepingCapacity: false) } - func makePool(for description: ValkeyNodeDescription) -> ConnectionPool { + func makePool(for description: NodeDescription) -> ConnectionPool { self.poolFactory.makeConnectionPool(nodeDescription: description) } } From 184813a9bc895128d46b1de3e597af7e0d18d1ab Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 25 Sep 2025 14:14:22 +0000 Subject: [PATCH 2/3] Add separate ValkeyNodeConnectionPoolFactory for cluster and standalone Signed-off-by: Adam Fowler --- .../Valkey/Cluster/ValkeyClusterClient.swift | 4 +- .../ValkeyClusterNodeClientFactory.swift | 60 +++++++++++++++++++ .../Valkey/Node/ValkeyNodeClientFactory.swift | 24 +------- .../ValkeyNodeConnectionPool.swift | 0 Sources/Valkey/ValkeyClient.swift | 2 +- 5 files changed, 65 insertions(+), 25 deletions(-) create mode 100644 Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift rename Sources/Valkey/{Cluster => Node}/ValkeyNodeConnectionPool.swift (100%) diff --git a/Sources/Valkey/Cluster/ValkeyClusterClient.swift b/Sources/Valkey/Cluster/ValkeyClusterClient.swift index 81b1810b..ea490638 100644 --- a/Sources/Valkey/Cluster/ValkeyClusterClient.swift +++ b/Sources/Valkey/Cluster/ValkeyClusterClient.swift @@ -58,7 +58,7 @@ public final class ValkeyClusterClient: Sendable { @usableFromInline typealias StateMachine = ValkeyClusterClientStateMachine< ValkeyNodeClient, - ValkeyNodeClientFactory, + ValkeyClusterNodeClientFactory, ContinuousClock, CheckedContinuation, AsyncStream.Continuation @@ -106,7 +106,7 @@ public final class ValkeyClusterClient: Sendable { (self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self) - let factory = ValkeyNodeClientFactory( + let factory = ValkeyClusterNodeClientFactory( logger: logger, configuration: clientConfiguration, connectionFactory: ValkeyConnectionFactory( diff --git a/Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift b/Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift new file mode 100644 index 00000000..36451e16 --- /dev/null +++ b/Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift @@ -0,0 +1,60 @@ +// +// This source file is part of the valkey-swift project +// Copyright (c) 2025 the valkey-swift project authors +// +// See LICENSE.txt for license information +// SPDX-License-Identifier: Apache-2.0 +// +import Logging +import NIOCore + +/// A factory for creating ``ValkeyNode`` instances to connect to specific nodes. +/// +/// This factory is used by the ``ValkeyClusterClient`` to create client instances +/// for each node in the cluster as needed. +@available(valkeySwift 1.0, *) +@usableFromInline +package struct ValkeyClusterNodeClientFactory: ValkeyNodeConnectionPoolFactory { + @usableFromInline + package typealias ConnectionPool = ValkeyNodeClient + + var logger: Logger + var configuration: ValkeyClientConfiguration + var eventLoopGroup: any EventLoopGroup + let connectionIDGenerator = ConnectionIDGenerator() + let connectionFactory: ValkeyConnectionFactory + + /// Creates a new `ValkeyClusterNodeClientFactory` instance. + /// + /// - Parameters: + /// - logger: The logger used for diagnostic information. + /// - configuration: Configuration for the Valkey clients created by this factory. + /// - eventLoopGroup: The event loop group to use for client connections. + package init( + logger: Logger, + configuration: ValkeyClientConfiguration, + connectionFactory: ValkeyConnectionFactory, + eventLoopGroup: any EventLoopGroup + ) { + self.logger = logger + self.configuration = configuration + self.connectionFactory = connectionFactory + self.eventLoopGroup = eventLoopGroup + } + + /// Creates a connection pool (client) for a specific node in the cluster. + /// + /// - Parameter nodeDescription: Description of the node to connect to. + /// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node. + @usableFromInline + package func makeConnectionPool(nodeDescription: ValkeyNodeDescription) -> ValkeyNodeClient { + let address = ValkeyServerAddress.hostname(nodeDescription.endpoint, port: nodeDescription.port) + return ValkeyNodeClient( + address, + connectionIDGenerator: self.connectionIDGenerator, + connectionFactory: self.connectionFactory, + eventLoopGroup: self.eventLoopGroup, + logger: self.logger + ) + } +} diff --git a/Sources/Valkey/Node/ValkeyNodeClientFactory.swift b/Sources/Valkey/Node/ValkeyNodeClientFactory.swift index 91903a38..f584e847 100644 --- a/Sources/Valkey/Node/ValkeyNodeClientFactory.swift +++ b/Sources/Valkey/Node/ValkeyNodeClientFactory.swift @@ -47,29 +47,9 @@ package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory { /// - Parameter nodeDescription: Description of the node to connect to. /// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node. @usableFromInline - package func makeConnectionPool(nodeDescription: ValkeyNodeDescription) -> ValkeyNodeClient { - let serverAddress = ValkeyServerAddress.hostname( - nodeDescription.endpoint, - port: nodeDescription.port - ) - - return ValkeyNodeClient( - serverAddress, - connectionIDGenerator: self.connectionIDGenerator, - connectionFactory: self.connectionFactory, - eventLoopGroup: self.eventLoopGroup, - logger: self.logger - ) - } - - /// Creates a connection pool (client) for a specific node in the cluster. - /// - /// - Parameter nodeDescription: Description of the node to connect to. - /// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node. - @usableFromInline - package func makeConnectionPool(serverAddress: ValkeyServerAddress) -> ValkeyNodeClient { + package func makeConnectionPool(nodeDescription: ValkeyServerAddress) -> ValkeyNodeClient { ValkeyNodeClient( - serverAddress, + nodeDescription, connectionIDGenerator: self.connectionIDGenerator, connectionFactory: self.connectionFactory, eventLoopGroup: self.eventLoopGroup, diff --git a/Sources/Valkey/Cluster/ValkeyNodeConnectionPool.swift b/Sources/Valkey/Node/ValkeyNodeConnectionPool.swift similarity index 100% rename from Sources/Valkey/Cluster/ValkeyNodeConnectionPool.swift rename to Sources/Valkey/Node/ValkeyNodeConnectionPool.swift diff --git a/Sources/Valkey/ValkeyClient.swift b/Sources/Valkey/ValkeyClient.swift index 54eb9d0d..6bbc4abf 100644 --- a/Sources/Valkey/ValkeyClient.swift +++ b/Sources/Valkey/ValkeyClient.swift @@ -82,7 +82,7 @@ public final class ValkeyClient: Sendable { self.eventLoopGroup = eventLoopGroup self.logger = logger self.runningAtomic = .init(false) - self.node = self.nodeClientFactory.makeConnectionPool(serverAddress: address) + self.node = self.nodeClientFactory.makeConnectionPool(nodeDescription: address) (self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self) self.queueAction(.runNodeClient(self.node)) } From 3a59e06216dd1df052d94f2719198cedff176efa Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 8 Oct 2025 09:31:59 +0100 Subject: [PATCH 3/3] Remove ConnectionPool typealias Signed-off-by: Adam Fowler --- Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift | 3 --- Sources/Valkey/Node/ValkeyNodeClientFactory.swift | 3 --- 2 files changed, 6 deletions(-) diff --git a/Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift b/Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift index 36451e16..fec25963 100644 --- a/Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift +++ b/Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift @@ -15,9 +15,6 @@ import NIOCore @available(valkeySwift 1.0, *) @usableFromInline package struct ValkeyClusterNodeClientFactory: ValkeyNodeConnectionPoolFactory { - @usableFromInline - package typealias ConnectionPool = ValkeyNodeClient - var logger: Logger var configuration: ValkeyClientConfiguration var eventLoopGroup: any EventLoopGroup diff --git a/Sources/Valkey/Node/ValkeyNodeClientFactory.swift b/Sources/Valkey/Node/ValkeyNodeClientFactory.swift index f584e847..07a78eae 100644 --- a/Sources/Valkey/Node/ValkeyNodeClientFactory.swift +++ b/Sources/Valkey/Node/ValkeyNodeClientFactory.swift @@ -15,9 +15,6 @@ import NIOCore @available(valkeySwift 1.0, *) @usableFromInline package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory { - @usableFromInline - package typealias ConnectionPool = ValkeyNodeClient - var logger: Logger var configuration: ValkeyClientConfiguration var eventLoopGroup: any EventLoopGroup