@@ -6,6 +6,11 @@ const LatencyMonitor = require('latency-monitor').default
66const debug = require ( 'debug' ) ( 'libp2p:connection-manager' )
77const retimer = require ( 'retimer' )
88
9+ const { EventEmitter } = require ( 'events' )
10+
11+ const PeerId = require ( 'peer-id' )
12+ const { Connection } = require ( 'libp2p-interfaces/src/connection' )
13+
914const {
1015 ERR_INVALID_PARAMETERS
1116} = require ( '../errors' )
@@ -22,7 +27,12 @@ const defaultOptions = {
2227 defaultPeerValue : 1
2328}
2429
25- class ConnectionManager {
30+ /**
31+ * Responsible for managing known connections.
32+ * @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
33+ * @fires ConnectionManager#peer:disconnect Emitted when a known peer supports a different set of protocols.
34+ */
35+ class ConnectionManager extends EventEmitter {
2636 /**
2737 * @constructor
2838 * @param {Libp2p } libp2p
@@ -38,30 +48,50 @@ class ConnectionManager {
3848 * @param {Number } options.defaultPeerValue The value of the peer. Default=1
3949 */
4050 constructor ( libp2p , options ) {
51+ super ( )
52+
4153 this . _libp2p = libp2p
42- this . _registrar = libp2p . registrar
4354 this . _peerId = libp2p . peerId . toB58String ( )
55+
4456 this . _options = mergeOptions . call ( { ignoreUndefined : true } , defaultOptions , options )
4557 if ( this . _options . maxConnections < this . _options . minConnections ) {
4658 throw errcode ( new Error ( 'Connection Manager maxConnections must be greater than minConnections' ) , ERR_INVALID_PARAMETERS )
4759 }
4860
4961 debug ( 'options: %j' , this . _options )
5062
51- this . _metrics = libp2p . metrics
63+ this . _libp2p = libp2p
5264
65+ /**
66+ * Map of peer identifiers to their peer value for pruning connections.
67+ * @type {Map<string, number> }
68+ */
5369 this . _peerValues = new Map ( )
54- this . _connections = new Map ( )
70+
71+ /**
72+ * Map of connections per peer
73+ * @type {Map<string, Array<conn>> }
74+ */
75+ this . connections = new Map ( )
76+
5577 this . _timer = null
5678 this . _checkMetrics = this . _checkMetrics . bind ( this )
5779 }
5880
81+ /**
82+ * Get current number of open connections.
83+ */
84+ get size ( ) {
85+ return Array . from ( this . connections . values ( ) )
86+ . reduce ( ( accumulator , value ) => accumulator + value . length , 0 )
87+ }
88+
5989 /**
6090 * Starts the Connection Manager. If Metrics are not enabled on libp2p
6191 * only event loop and connection limits will be monitored.
6292 */
6393 start ( ) {
64- if ( this . _metrics ) {
94+ if ( this . _libp2p . metrics ) {
6595 this . _timer = this . _timer || retimer ( this . _checkMetrics , this . _options . pollInterval )
6696 }
6797
@@ -77,13 +107,33 @@ class ConnectionManager {
77107
78108 /**
79109 * Stops the Connection Manager
110+ * @async
80111 */
81- stop ( ) {
112+ async stop ( ) {
82113 this . _timer && this . _timer . clear ( )
83114 this . _latencyMonitor && this . _latencyMonitor . removeListener ( 'data' , this . _onLatencyMeasure )
115+
116+ await this . _close ( )
84117 debug ( 'stopped' )
85118 }
86119
120+ /**
121+ * Cleans up the connections
122+ * @async
123+ */
124+ async _close ( ) {
125+ // Close all connections we're tracking
126+ const tasks = [ ]
127+ for ( const connectionList of this . connections . values ( ) ) {
128+ for ( const connection of connectionList ) {
129+ tasks . push ( connection . close ( ) )
130+ }
131+ }
132+
133+ await tasks
134+ this . connections . clear ( )
135+ }
136+
87137 /**
88138 * Sets the value of the given peer. Peers with lower values
89139 * will be disconnected first.
@@ -106,7 +156,7 @@ class ConnectionManager {
106156 * @private
107157 */
108158 _checkMetrics ( ) {
109- const movingAverages = this . _metrics . global . movingAverages
159+ const movingAverages = this . _libp2p . metrics . global . movingAverages
110160 const received = movingAverages . dataReceived [ this . _options . movingAverageInterval ] . movingAverage ( )
111161 this . _checkLimit ( 'maxReceivedData' , received )
112162 const sent = movingAverages . dataSent [ this . _options . movingAverageInterval ] . movingAverage ( )
@@ -122,21 +172,65 @@ class ConnectionManager {
122172 * @param {Connection } connection
123173 */
124174 onConnect ( connection ) {
125- const peerId = connection . remotePeer . toB58String ( )
126- this . _connections . set ( connection . id , connection )
127- if ( ! this . _peerValues . has ( peerId ) ) {
128- this . _peerValues . set ( peerId , this . _options . defaultPeerValue )
175+ if ( ! Connection . isConnection ( connection ) ) {
176+ throw errcode ( new Error ( 'conn must be an instance of interface-connection' ) , ERR_INVALID_PARAMETERS )
129177 }
130- this . _checkLimit ( 'maxConnections' , this . _connections . size )
178+
179+ const peerId = connection . remotePeer
180+ const peerIdStr = peerId . toB58String ( )
181+ const storedConn = this . connections . get ( peerIdStr )
182+
183+ if ( storedConn ) {
184+ storedConn . push ( connection )
185+ } else {
186+ this . connections . set ( peerIdStr , [ connection ] )
187+ this . emit ( 'peer:connect' , peerId )
188+ }
189+
190+ if ( ! this . _peerValues . has ( peerIdStr ) ) {
191+ this . _peerValues . set ( peerIdStr , this . _options . defaultPeerValue )
192+ }
193+
194+ this . _checkLimit ( 'maxConnections' , this . size )
131195 }
132196
133197 /**
134198 * Removes the connection from tracking
135199 * @param {Connection } connection
136200 */
137201 onDisconnect ( connection ) {
138- this . _connections . delete ( connection . id )
139- this . _peerValues . delete ( connection . remotePeer . toB58String ( ) )
202+ const peerId = connection . remotePeer
203+ const peerIdStr = peerId . toB58String ( )
204+ let storedConn = this . connections . get ( peerIdStr )
205+
206+ if ( storedConn && storedConn . length > 1 ) {
207+ storedConn = storedConn . filter ( ( conn ) => conn . id !== connection . id )
208+ this . connections . set ( peerIdStr , storedConn )
209+ } else if ( storedConn ) {
210+ this . connections . delete ( peerIdStr )
211+ this . _peerValues . delete ( connection . remotePeer . toB58String ( ) )
212+ this . emit ( 'peer:disconnect' , peerId )
213+ }
214+ }
215+
216+ /**
217+ * Get a connection with a peer.
218+ * @param {PeerId } peerId
219+ * @returns {Connection }
220+ */
221+ get ( peerId ) {
222+ if ( ! PeerId . isPeerId ( peerId ) ) {
223+ throw errcode ( new Error ( 'peerId must be an instance of peer-id' ) , ERR_INVALID_PARAMETERS )
224+ }
225+
226+ const id = peerId . toB58String ( )
227+ const connections = this . connections . get ( id )
228+
229+ // Return the first, open connection
230+ if ( connections ) {
231+ return connections . find ( connection => connection . stat . status === 'open' )
232+ }
233+ return null
140234 }
141235
142236 /**
@@ -169,17 +263,17 @@ class ConnectionManager {
169263 * @private
170264 */
171265 _maybeDisconnectOne ( ) {
172- if ( this . _options . minConnections < this . _connections . size ) {
266+ if ( this . _options . minConnections < this . connections . size ) {
173267 const peerValues = Array . from ( this . _peerValues ) . sort ( byPeerValue )
174268 debug ( '%s: sorted peer values: %j' , this . _peerId , peerValues )
175269 const disconnectPeer = peerValues [ 0 ]
176270 if ( disconnectPeer ) {
177271 const peerId = disconnectPeer [ 0 ]
178272 debug ( '%s: lowest value peer is %s' , this . _peerId , peerId )
179273 debug ( '%s: closing a connection to %j' , this . _peerId , peerId )
180- for ( const connection of this . _connections . values ( ) ) {
181- if ( connection . remotePeer . toB58String ( ) === peerId ) {
182- connection . close ( )
274+ for ( const connections of this . connections . values ( ) ) {
275+ if ( connections [ 0 ] . remotePeer . toB58String ( ) === peerId ) {
276+ connections [ 0 ] . close ( )
183277 break
184278 }
185279 }
0 commit comments