Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/Valkey/Cluster/ValkeyClusterClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public final class ValkeyClusterClient: Sendable {
@usableFromInline
typealias StateMachine = ValkeyClusterClientStateMachine<
ValkeyNodeClient,
ValkeyNodeClientFactory,
ValkeyClusterNodeClientFactory,
ContinuousClock,
CheckedContinuation<Void, any Error>,
AsyncStream<Void>.Continuation
Expand Down Expand Up @@ -106,7 +106,7 @@ public final class ValkeyClusterClient: Sendable {

(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)

let factory = ValkeyNodeClientFactory(
let factory = ValkeyClusterNodeClientFactory(
logger: logger,
configuration: clientConfiguration,
connectionFactory: ValkeyConnectionFactory(
Expand Down
7 changes: 6 additions & 1 deletion Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ package struct ValkeyClusterClientStateMachine<
Clock: _Concurrency.Clock,
SuccessNotifier,
TimerCancellationToken: Sendable
> where ConnectionPool == ConnectionPoolFactory.ConnectionPool, Clock.Duration == Duration {
>
where
ConnectionPool == ConnectionPoolFactory.ConnectionPool,
Clock.Duration == Duration,
ConnectionPoolFactory.NodeDescription == ValkeyNodeDescription
{

@usableFromInline
struct Timer {
Expand Down
57 changes: 57 additions & 0 deletions Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
// This source file is part of the valkey-swift project
// Copyright (c) 2025 the valkey-swift project authors
//
// See LICENSE.txt for license information
// SPDX-License-Identifier: Apache-2.0
//
import Logging
import NIOCore

/// A factory for creating ``ValkeyNode`` instances to connect to specific nodes.
///
/// This factory is used by the ``ValkeyClusterClient`` to create client instances
/// for each node in the cluster as needed.
@available(valkeySwift 1.0, *)
@usableFromInline
package struct ValkeyClusterNodeClientFactory: ValkeyNodeConnectionPoolFactory {
var logger: Logger
var configuration: ValkeyClientConfiguration
var eventLoopGroup: any EventLoopGroup
let connectionIDGenerator = ConnectionIDGenerator()
let connectionFactory: ValkeyConnectionFactory

/// Creates a new `ValkeyClusterNodeClientFactory` instance.
///
/// - Parameters:
/// - logger: The logger used for diagnostic information.
/// - configuration: Configuration for the Valkey clients created by this factory.
/// - eventLoopGroup: The event loop group to use for client connections.
package init(
logger: Logger,
configuration: ValkeyClientConfiguration,
connectionFactory: ValkeyConnectionFactory,
eventLoopGroup: any EventLoopGroup
) {
self.logger = logger
self.configuration = configuration
self.connectionFactory = connectionFactory
self.eventLoopGroup = eventLoopGroup
}

/// Creates a connection pool (client) for a specific node in the cluster.
///
/// - Parameter nodeDescription: Description of the node to connect to.
/// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node.
@usableFromInline
package func makeConnectionPool(nodeDescription: ValkeyNodeDescription) -> ValkeyNodeClient {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you intend to return ConnectionPool here ?
package typealias ConnectionPool = ValkeyNodeClient is defined, but ConnectionPool is not being used in the file.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type conforms to the protocol ValkeyNodeConnectionPoolFactory. This protocol has an associatedtype ConnectionPool. When you create a concrete type that conforms to a protocol with an asssociatedtype you need to indicate what the associated type is. In this case ConnectionPool is ValkeyNodeClient.

You can do this by defining the protocol requirements using that type or add a typealias. We have ended up doing both here. There is no harm in this but yeah I guess it does look a little weird.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the typealias because it isn't necessary

let address = ValkeyServerAddress.hostname(nodeDescription.endpoint, port: nodeDescription.port)
return ValkeyNodeClient(
address,
connectionIDGenerator: self.connectionIDGenerator,
connectionFactory: self.connectionFactory,
eventLoopGroup: self.eventLoopGroup,
logger: self.logger
)
}
}
27 changes: 2 additions & 25 deletions Sources/Valkey/Node/ValkeyNodeClientFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import NIOCore
@available(valkeySwift 1.0, *)
@usableFromInline
package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory {
@usableFromInline
package typealias ConnectionPool = ValkeyNodeClient

var logger: Logger
var configuration: ValkeyClientConfiguration
var eventLoopGroup: any EventLoopGroup
Expand Down Expand Up @@ -47,29 +44,9 @@ package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory {
/// - Parameter nodeDescription: Description of the node to connect to.
/// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node.
@usableFromInline
package func makeConnectionPool(nodeDescription: ValkeyNodeDescription) -> ValkeyNodeClient {
let serverAddress = ValkeyServerAddress.hostname(
nodeDescription.endpoint,
port: nodeDescription.port
)

return ValkeyNodeClient(
serverAddress,
connectionIDGenerator: self.connectionIDGenerator,
connectionFactory: self.connectionFactory,
eventLoopGroup: self.eventLoopGroup,
logger: self.logger
)
}

/// Creates a connection pool (client) for a specific node in the cluster.
///
/// - Parameter nodeDescription: Description of the node to connect to.
/// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node.
@usableFromInline
package func makeConnectionPool(serverAddress: ValkeyServerAddress) -> ValkeyNodeClient {
package func makeConnectionPool(nodeDescription: ValkeyServerAddress) -> ValkeyNodeClient {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you intend to return ConnectionPool here ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above

ValkeyNodeClient(
serverAddress,
nodeDescription,
connectionIDGenerator: self.connectionIDGenerator,
connectionFactory: self.connectionFactory,
eventLoopGroup: self.eventLoopGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ package protocol ValkeyNodeConnectionPool: AnyObject, Sendable {
@available(valkeySwift 1.0, *)
package protocol ValkeyNodeConnectionPoolFactory: Sendable {
associatedtype ConnectionPool: ValkeyNodeConnectionPool
associatedtype NodeDescription

/// Create a shard connection pool based on the provided configuration
func makeConnectionPool(
nodeDescription: ValkeyNodeDescription
nodeDescription: NodeDescription
) -> ConnectionPool
}
33 changes: 12 additions & 21 deletions Sources/Valkey/Node/ValkeyRunningClients.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@
struct ValkeyRunningClientsStateMachine<
ConnectionPool: Sendable,
ConnectionPoolFactory: ValkeyNodeConnectionPoolFactory
> where ConnectionPoolFactory.ConnectionPool == ConnectionPool {
> where ConnectionPoolFactory.ConnectionPool == ConnectionPool, ConnectionPoolFactory.NodeDescription: Sendable & Identifiable {
@usableFromInline typealias NodeDescription = ConnectionPoolFactory.NodeDescription
@usableFromInline
/* private */ struct NodeBundle: Sendable {
@usableFromInline
var nodeID: ValkeyNodeID { self.nodeDescription.id }
var nodeID: NodeDescription.ID { self.nodeDescription.id }
@usableFromInline
var pool: ConnectionPool
@usableFromInline
var nodeDescription: ValkeyNodeDescription
var nodeDescription: NodeDescription
}
let poolFactory: ConnectionPoolFactory
@usableFromInline
var clientMap: [ValkeyNodeID: NodeBundle]
var clientMap: [NodeDescription.ID: NodeBundle]
@inlinable
var clients: some Collection<NodeBundle> { clientMap.values }

Expand All @@ -33,35 +34,25 @@ struct ValkeyRunningClientsStateMachine<

struct PoolUpdateAction {
var poolsToShutdown: [ConnectionPool]
var poolsToRun: [(ConnectionPool, ValkeyNodeID)]
var poolsToRun: [(ConnectionPool, NodeDescription.ID)]

static func empty() -> PoolUpdateAction { PoolUpdateAction(poolsToShutdown: [], poolsToRun: []) }
}

mutating func updateNodes(
_ newNodes: some Collection<ValkeyNodeDescription>,
_ newNodes: some Collection<NodeDescription>,
removeUnmentionedPools: Bool
) -> PoolUpdateAction {
var previousNodes = self.clientMap
self.clientMap.removeAll(keepingCapacity: true)
var newPools = [(ConnectionPool, ValkeyNodeID)]()
var newPools = [(ConnectionPool, NodeDescription.ID)]()
newPools.reserveCapacity(16)
var poolsToShutdown = [ConnectionPool]()

for newNodeDescription in newNodes {
// if we had a pool previously, let's continue to use it!
if let existingPool = previousNodes.removeValue(forKey: newNodeDescription.id) {
if newNodeDescription == existingPool.nodeDescription {
// the existing pool matches the new node description. nothing todo
self.clientMap[newNodeDescription.id] = existingPool
} else {
// the existing pool does not match new node description. For example tls may now be required.
// shutdown the old pool and create a new one
poolsToShutdown.append(existingPool.pool)
let newPool = self.makePool(for: newNodeDescription)
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
newPools.append((newPool, newNodeDescription.id))
}
self.clientMap[newNodeDescription.id] = existingPool
} else {
let newPool = self.makePool(for: newNodeDescription)
self.clientMap[newNodeDescription.id] = NodeBundle(pool: newPool, nodeDescription: newNodeDescription)
Expand Down Expand Up @@ -95,7 +86,7 @@ struct ValkeyRunningClientsStateMachine<
}

mutating func addNode(
_ node: ValkeyNodeDescription
_ node: NodeDescription
) -> AddNodeAction {
if let pool = self.clientMap[node.id] {
return .useExistingPool(pool.pool)
Expand All @@ -106,7 +97,7 @@ struct ValkeyRunningClientsStateMachine<
}

@inlinable
subscript(_ index: ValkeyNodeID) -> NodeBundle? {
subscript(_ index: NodeDescription.ID) -> NodeBundle? {
self.clientMap[index]
}

Expand All @@ -115,7 +106,7 @@ struct ValkeyRunningClientsStateMachine<
self.clientMap.removeAll(keepingCapacity: false)
}

func makePool(for description: ValkeyNodeDescription) -> ConnectionPool {
func makePool(for description: NodeDescription) -> ConnectionPool {
self.poolFactory.makeConnectionPool(nodeDescription: description)
}
}
2 changes: 1 addition & 1 deletion Sources/Valkey/ValkeyClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public final class ValkeyClient: Sendable {
self.eventLoopGroup = eventLoopGroup
self.logger = logger
self.runningAtomic = .init(false)
self.node = self.nodeClientFactory.makeConnectionPool(serverAddress: address)
self.node = self.nodeClientFactory.makeConnectionPool(nodeDescription: address)
(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)
self.queueAction(.runNodeClient(self.node))
}
Expand Down
Loading