Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/bolt-connection/src/bolt/response-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export default class ResponseHandler {
this._transformMetadata = transformMetadata || NO_OP_IDENTITY
this._observer = Object.assign(
{
onPendingObserversChange: NO_OP,
onError: NO_OP,
onFailure: NO_OP,
onErrorApplyTransformation: NO_OP_IDENTITY
Expand Down Expand Up @@ -156,6 +157,7 @@ export default class ResponseHandler {
*/
_updateCurrentObserver () {
this._currentObserver = this._pendingObservers.shift()
this._observer.onPendingObserversChange(this._pendingObservers.length)
}

_queueObserver (observer) {
Expand All @@ -168,6 +170,7 @@ export default class ResponseHandler {
} else {
this._pendingObservers.push(observer)
}
this._observer.onPendingObserversChange(this._pendingObservers.length)
return true
}

Expand Down
12 changes: 12 additions & 0 deletions packages/bolt-connection/src/channel/browser/browser-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ export default class WebSocketChannel {
*/
setupReceiveTimeout (receiveTimeout) {}

/**
* Stops the receive timeout for the channel.
*/
stopReceiveTimeout() {
}

/**
* Start the receive timeout for the channel.
*/
startReceiveTimeout () {
}

/**
* Set connection timeout on the given WebSocket, if configured.
* @return {number} the timeout id or null.
Expand Down
24 changes: 23 additions & 1 deletion packages/bolt-connection/src/channel/node/node-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ export default class NodeChannel {
this
)
this._connectionErrorCode = config.connectionErrorCode
this._receiveTimeout = null
this._receiveTimeoutStarted = false

this._conn = connect(
config,
Expand Down Expand Up @@ -353,7 +355,27 @@ export default class NodeChannel {
)
})

this._conn.setTimeout(receiveTimeout)
this._receiveTimeout = receiveTimeout
}

/**
* Stops the receive timeout for the channel.
*/
stopReceiveTimeout() {
if (this._receiveTimeout !== null && this._receiveTimeoutStarted) {
this._receiveTimeoutStarted = false
this._conn.setTimeout(0)
}
}

/**
* Start the receive timeout for the channel.
*/
startReceiveTimeout () {
if (this._receiveTimeout !== null && !this._receiveTimeoutStarted) {
this._receiveTimeoutStarted = true
this._conn.setTimeout(this._receiveTimeout)
}
}

/**
Expand Down
13 changes: 13 additions & 0 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export function createChannelConnection (
server: conn.server,
log: conn.logger,
observer: {
onPendingObserversChange: conn._handleOngoingRequestsNumberChange.bind(conn),
onError: conn._handleFatalError.bind(conn),
onFailure: conn._resetOnFailure.bind(conn),
onProtocolError: conn._handleProtocolError.bind(conn),
Expand Down Expand Up @@ -350,6 +351,18 @@ export default class ChannelConnection extends Connection {
return !this._isBroken && this._ch._open
}

/**
* Starts and stops the receive timeout timer.
* @param {number} requestsNumber Ongoing requests number
*/
_handleOngoingRequestsNumberChange(requestsNumber) {
if (requestsNumber === 0) {
this._ch.stopReceiveTimeout()
} else {
this._ch.startReceiveTimeout()
}
}

/**
* Call close on the channel.
* @returns {Promise<void>} - A promise that will be resolved when the underlying channel is closed.
Expand Down
120 changes: 118 additions & 2 deletions packages/bolt-connection/test/channel/node/node-channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ describe('NodeChannel', () => {
})

describe('.setupReceiveTimeout()', () => {
it('should call socket.setTimeout(receiveTimeout)', () => {
it('should call not call socket.setTimeout(receiveTimeout)', () => {
const receiveTimeout = 42
const channel = createMockedChannel(true)

channel.setupReceiveTimeout(receiveTimeout)

expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})

it('should unsubscribe to the on connect and on timeout created on the create socket', () => {
Expand Down Expand Up @@ -108,6 +108,122 @@ describe('NodeChannel', () => {
expect(channel._conn.getCalls().off).toEqual([])
})
})

describe('.startReceiveTimeout()', () => {
describe('receive timeout is setup', () => {
it('should call socket.setTimeout(receiveTimeout) when called first', () => {
const { receiveTimeout, channel } = setup()

channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
})

it ('should call not socket.setTimeout(receiveTimeout) if stream already started', () => {
const { receiveTimeout, channel } = setup()

// setup
channel.startReceiveTimeout()
expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])

// start again
channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
})

it ('should call socket.setTimeout(receiveTimeout) when after stop', () => {
const { receiveTimeout, channel } = setup()

// setup
channel.startReceiveTimeout()
expect(channel._conn.getCalls().setTimeout.length).toEqual(2)
expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
channel.stopReceiveTimeout()
expect(channel._conn.getCalls().setTimeout.length).toEqual(3)
expect(channel._conn.getCalls().setTimeout[2]).toEqual([0])

// start again
channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(4)
expect(channel._conn.getCalls().setTimeout[3]).toEqual([receiveTimeout])
})

function setup () {
const channel = createMockedChannel(true)
const receiveTimeout = 42
channel.setupReceiveTimeout(receiveTimeout)
return {channel, receiveTimeout}
}
})

describe('receive timemout is not setup', () => {
it ('should call not socket.setTimeout(receiveTimeout) when not started', () => {
const channel = createMockedChannel(true)

// start again
channel.startReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})
})
})

describe('.stopReceiveTimeout()', () => {
describe('when receive timeout is setup', () => {
it ('should not call socket.setTimeout(0) when not started', () => {
const { channel } = setup()

channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})

it ('should call socket.setTimeout(0) when already started', () => {
const { channel } = setup()

channel.startReceiveTimeout()

channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(3)
expect(channel._conn.getCalls().setTimeout[2]).toEqual([0])
})

it ('should not call socket.setTimeout(0) when already stopped', () => {
const { channel } = setup()

channel.startReceiveTimeout()
channel.stopReceiveTimeout()

channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(3)
})

function setup () {
const channel = createMockedChannel(true)
const receiveTimeout = 42
channel.setupReceiveTimeout(receiveTimeout)
return {channel, receiveTimeout}
}
})

describe('when receive timeout is not setup', () => {
it ('should not call socket.setTimeout(0)', () => {
const channel = createMockedChannel(true)

channel.startReceiveTimeout()
channel.stopReceiveTimeout()

expect(channel._conn.getCalls().setTimeout.length).toEqual(1)
})
})
})
})

function createMockedChannel (connected, config = {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,60 @@ describe('ChannelConnection', () => {
})
})

describe('.__handleOngoingRequestsNumberChange()', () => {
it('should call channel.stopReceiveTimeout when requets number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(0)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should not call channel.startReceiveTimeout when requets number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(0)

expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requets number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(1)
})

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout when requets number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => undefined })

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
})
})

function spyOnConnectionChannel ({
channel,
errorHandler,
Expand Down