Skip to content

Commit 4f242f4

Browse files
authored
fix: keepalive issues (#1855)
* fix: keepalive issues * fix: typo * fix: add missing shift * fix: avoid race conditions * fix: improve tests
1 parent b207984 commit 4f242f4

File tree

4 files changed

+133
-65
lines changed

4 files changed

+133
-65
lines changed

example.ts

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
1-
import mqtt from '.'
1+
import mqtt from './src/index'
22

3-
const client = mqtt.connect('mqtt://test.mosquitto.org', {
3+
const client = mqtt.connect('mqtts://test.mosquitto.org', {
44
keepalive: 10,
5+
port: 8883,
56
reconnectPeriod: 15000,
7+
rejectUnauthorized: false,
68
})
79

8-
const testTopic = 'presence'
10+
const randomNumber = Math.floor(Math.random() * 1000)
11+
12+
const testTopic = `presence_${randomNumber.toString()}`
913

1014
function publish() {
11-
client.publish(
12-
testTopic,
13-
`Hello mqtt ${new Date().toISOString()}`,
14-
(err2) => {
15-
if (!err2) {
16-
console.log('message published')
17-
} else {
18-
console.error(err2)
19-
}
20-
},
21-
)
15+
const msg = `Hello mqtt ${new Date().toISOString()}`
16+
client.publish(testTopic, msg, { qos: 1 }, (err2) => {
17+
if (!err2) {
18+
console.log('message published')
19+
} else {
20+
console.error(err2)
21+
}
22+
})
2223
}
2324

2425
client.subscribe(testTopic, (err) => {
@@ -31,11 +32,12 @@ client.subscribe(testTopic, (err) => {
3132

3233
client.on('message', (topic, message) => {
3334
console.log('received message "%s" from topic "%s"', message, topic)
34-
setTimeout(() => {
35-
publish()
36-
}, 2000)
3735
})
3836

37+
setInterval(() => {
38+
publish()
39+
}, 2000)
40+
3941
client.on('error', (err) => {
4042
console.error(err)
4143
})

src/lib/client.ts

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,6 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
424424

425425
public messageIdProvider: IMessageIdProvider
426426

427-
public pingResp: boolean
428-
429427
public outgoing: Record<
430428
number,
431429
{ volatile: boolean; cb: (err: Error, packet?: Packet) => void }
@@ -435,6 +433,9 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
435433

436434
public noop: (error?: any) => void
437435

436+
/** Timestamp of last received control packet */
437+
public pingResp: number
438+
438439
public pingTimer: PingTimer
439440

440441
/**
@@ -659,11 +660,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
659660
this.log('close :: clearing connackTimer')
660661
clearTimeout(this.connackTimer)
661662

662-
this.log('close :: destroy ping timer')
663-
if (this.pingTimer) {
664-
this.pingTimer.destroy()
665-
this.pingTimer = null
666-
}
663+
this._destroyPingTimer()
667664

668665
if (this.topicAliasRecv) {
669666
this.topicAliasRecv.clear()
@@ -722,6 +719,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
722719
public connect() {
723720
const writable = new Writable()
724721
const parser = mqttPacket.parser(this.options)
722+
725723
let completeParse = null
726724
const packets = []
727725

@@ -1782,11 +1780,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
17821780
this._setupReconnect()
17831781
}
17841782

1785-
if (this.pingTimer) {
1786-
this.log('_cleanUp :: destroy pingTimer')
1787-
this.pingTimer.destroy()
1788-
this.pingTimer = null
1789-
}
1783+
this._destroyPingTimer()
17901784

17911785
if (done && !this.connected) {
17921786
this.log(
@@ -1924,9 +1918,6 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
19241918

19251919
this.emit('packetsend', packet)
19261920

1927-
// When writing a packet, reschedule the ping timer
1928-
this._shiftPingInterval()
1929-
19301921
this.log('_writePacket :: writing to stream')
19311922
const result = mqttPacket.writeToStream(
19321923
packet,
@@ -2084,18 +2075,27 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
20842075
)
20852076

20862077
if (!this.pingTimer && this.options.keepalive) {
2087-
this.pingResp = true
20882078
this.pingTimer = new PingTimer(
20892079
this.options.keepalive,
20902080
() => {
20912081
this._checkPing()
20922082
},
20932083
this.options.timerVariant,
20942084
)
2085+
this.pingResp = Date.now()
2086+
}
2087+
}
2088+
2089+
private _destroyPingTimer() {
2090+
if (this.pingTimer) {
2091+
this.log('_destroyPingTimer :: destroying ping timer')
2092+
this.pingTimer.destroy()
2093+
this.pingTimer = null
20952094
}
20962095
}
20972096

20982097
/**
2098+
20992099
* _shiftPingInterval - reschedule the ping interval
21002100
*
21012101
* @api private
@@ -2106,23 +2106,30 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
21062106
this.options.keepalive &&
21072107
this.options.reschedulePings
21082108
) {
2109-
this.pingTimer.reschedule()
2109+
this._reschedulePing()
21102110
}
21112111
}
21122112

2113+
/**
2114+
* Mostly needed for test purposes
2115+
*/
2116+
private _reschedulePing() {
2117+
this.log('_reschedulePing :: rescheduling ping')
2118+
this.pingTimer.reschedule()
2119+
}
2120+
21132121
/**
21142122
* _checkPing - check if a pingresp has come back, and ping the server again
21152123
*
21162124
* @api private
21172125
*/
21182126
private _checkPing() {
21192127
this.log('_checkPing :: checking ping...')
2120-
if (this.pingResp) {
2121-
this.log(
2122-
'_checkPing :: ping response received. Clearing flag and sending `pingreq`',
2123-
)
2124-
this.pingResp = false
2125-
this._sendPacket({ cmd: 'pingreq' })
2128+
// give 100ms offset to avoid ping timeout when receiving fast responses
2129+
const timeSincePing = Date.now() - this.pingResp - 100
2130+
if (timeSincePing <= this.options.keepalive * 1000) {
2131+
this.log('_checkPing :: ping response received in time')
2132+
this._sendPing()
21262133
} else {
21272134
// do a forced cleanup since socket will be in bad shape
21282135
this.emit('error', new Error('Keepalive timeout'))
@@ -2131,6 +2138,11 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
21312138
}
21322139
}
21332140

2141+
private _sendPing() {
2142+
this.log('_sendPing :: sending pingreq')
2143+
this._sendPacket({ cmd: 'pingreq' })
2144+
}
2145+
21342146
/**
21352147
* _resubscribe
21362148
* @api private

src/lib/handlers/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ const handle: PacketHandler = (client, packet, done) => {
2121
})
2222
return client
2323
}
24+
25+
// keep track of last time we received a packet (for keepalive mechanism)
26+
client.pingResp = Date.now()
27+
28+
// do not shift on pingresp otherwise we would skip the pingreq sending
29+
if (packet.cmd !== 'pingresp') {
30+
client['_shiftPingInterval']()
31+
}
32+
2433
client.log('_handlePacket :: emitting packetreceive')
2534
client.emit('packetreceive', packet)
2635

@@ -49,7 +58,6 @@ const handle: PacketHandler = (client, packet, done) => {
4958
break
5059
case 'pingresp':
5160
// this will be checked in _checkPing client method every keepalive interval
52-
client.pingResp = true
5361
done()
5462
break
5563
case 'disconnect':

test/abstract_client.ts

Lines changed: 70 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Testing dependencies
33
*/
44
import { assert } from 'chai'
5-
import sinon from 'sinon'
5+
import sinon, { SinonSpy } from 'sinon'
66
import fs from 'fs'
77
import levelStore from 'mqtt-level-store'
88
import Store from '../src/lib/store'
@@ -93,11 +93,13 @@ export default function abstractTest(server, config, ports) {
9393

9494
client.once('close', () => {
9595
assert.notExists(client.pingTimer)
96+
9697
client.end(true, (err) => done(err))
9798
})
9899

99100
client.once('connect', () => {
100101
assert.exists(client.pingTimer)
102+
101103
client.stream.end()
102104
})
103105
})
@@ -1980,6 +1982,12 @@ export default function abstractTest(server, config, ports) {
19801982
const spy = sinon.spy()
19811983
client['_checkPing'] = spy
19821984

1985+
client.on('error', (err) => {
1986+
client.end(true, () => {
1987+
done(err)
1988+
})
1989+
})
1990+
19831991
client.once('connect', () => {
19841992
clock.tick(interval * 1000)
19851993
assert.strictEqual(spy.callCount, 1)
@@ -1994,7 +2002,7 @@ export default function abstractTest(server, config, ports) {
19942002
})
19952003
})
19962004

1997-
it('should not checkPing if publishing at a higher rate than keepalive', function _test(t, done) {
2005+
it('should not shift ping on publish', function _test(t, done) {
19982006
const intervalMs = 3000
19992007
const client = connect({ keepalive: intervalMs / 1000 })
20002008

@@ -2003,35 +2011,70 @@ export default function abstractTest(server, config, ports) {
20032011

20042012
client.once('connect', () => {
20052013
client.publish('foo', 'bar')
2006-
clock.tick(intervalMs - 1)
2014+
clock.tick(intervalMs)
20072015
client.publish('foo', 'bar')
2008-
clock.tick(2)
2016+
clock.tick(intervalMs)
20092017

2010-
assert.strictEqual(spy.callCount, 0)
2018+
assert.strictEqual(spy.callCount, 2)
20112019
client.end(true, done)
20122020
})
20132021
})
20142022

2015-
it('should checkPing if publishing at a higher rate than keepalive and reschedulePings===false', function _test(t, done) {
2016-
const intervalMs = 3000
2017-
const client = connect({
2018-
keepalive: intervalMs / 1000,
2019-
reschedulePings: false,
2020-
})
2023+
const checkPing = (reschedulePings: boolean) => {
2024+
it(`should checkPing if publishing at a higher rate than keepalive and reschedulePings===${reschedulePings}`, function _test(t, done) {
2025+
const intervalMs = 3000
2026+
const client = connect({
2027+
keepalive: intervalMs / 1000,
2028+
reschedulePings,
2029+
})
20212030

2022-
const spy = sinon.spy()
2023-
client['_checkPing'] = spy
2031+
const spyReschedule = sinon.spy(
2032+
client,
2033+
'_reschedulePing' as any,
2034+
)
20242035

2025-
client.once('connect', () => {
2026-
client.publish('foo', 'bar')
2027-
clock.tick(intervalMs - 1)
2028-
client.publish('foo', 'bar')
2029-
clock.tick(2)
2036+
let received = 0
20302037

2031-
assert.strictEqual(spy.callCount, 1)
2032-
client.end(true, done)
2038+
client.on('packetreceive', (packet) => {
2039+
if (packet.cmd === 'puback') {
2040+
clock.tick(intervalMs)
2041+
2042+
received++
2043+
2044+
if (reschedulePings) {
2045+
assert.strictEqual(
2046+
spyReschedule.callCount,
2047+
received,
2048+
)
2049+
} else {
2050+
assert.strictEqual(spyReschedule.callCount, 0)
2051+
}
2052+
2053+
if (received === 2) {
2054+
client.end(true, done)
2055+
}
2056+
}
2057+
})
2058+
2059+
server.once('client', (serverClient) => {
2060+
serverClient.on('publish', () => {
2061+
// needed to trigger the setImmediate inside server publish listener and send suback
2062+
clock.tick(1)
2063+
})
2064+
})
2065+
2066+
client.once('connect', () => {
2067+
// reset call count (it's called also on connack)
2068+
spyReschedule.resetHistory()
2069+
// use qos1 so the puback is received (to reschedule ping)
2070+
client.publish('foo', 'bar', { qos: 1 })
2071+
client.publish('foo', 'bar', { qos: 1 })
2072+
})
20332073
})
2034-
})
2074+
}
2075+
2076+
checkPing(true)
2077+
checkPing(false)
20352078
})
20362079

20372080
describe('pinging', () => {
@@ -2067,13 +2110,16 @@ export default function abstractTest(server, config, ports) {
20672110
}
20682111
})
20692112

2070-
let client = connect({
2113+
const options: IClientOptions = {
20712114
keepalive: 60,
20722115
reconnectPeriod: 5000,
2073-
})
2116+
}
2117+
2118+
let client = connect()
20742119

20752120
client.once('connect', () => {
2076-
client.pingResp = false
2121+
// when using fake timers Date.now() counts from 0: https://sinonjs.org/releases/latest/fake-timers/
2122+
client.pingResp = -options.keepalive * 1000
20772123

20782124
client.once('error', (err) => {
20792125
assert.equal(err.message, 'Keepalive timeout')

0 commit comments

Comments
 (0)