Skip to content

Commit

Permalink
stream-management: fix enabling when the next element is not the resp…
Browse files Browse the repository at this point in the history
…onse (#823)
  • Loading branch information
sonnyp authored Feb 15, 2020
1 parent c0548a5 commit 6449ae2
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 39 deletions.
8 changes: 4 additions & 4 deletions packages/stream-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ Included and enabled in `@xmpp/client`.

Supports Node.js and browsers.

Does not support requesting acks yet.
When the session is resumed the `online` event is not emitted as session resumption is transparent.
However `entity.status` is set to `online`.
If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted.

Responds to ack requests and resumes connection uppon disconnect whenever possible.

`online` event is not emitted when the session is resumed as it should be transparent.
Automatically responds to acks but does not support requesting acks yet.
53 changes: 37 additions & 16 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
'use strict'

const xml = require('@xmpp/xml')
const StanzaError = require('@xmpp/middleware/lib/StanzaError')

// https://xmpp.org/extensions/xep-0198.html

const NS = 'urn:xmpp:sm:3'

async function enable(entity, resume, max) {
const response = await entity.sendReceive(
entity.send(
xml('enable', {xmlns: NS, max, resume: resume ? 'true' : undefined})
)

if (!response.is('enabled')) {
throw StanzaError.fromElement(response)
}
return new Promise((resolve, reject) => {
function listener(nonza) {
if (nonza.is('enabled', NS)) {
resolve(nonza)
} else if (nonza.is('failed', NS)) {
reject(nonza)
} else {
return
}

return response
entity.removeListener('nonza', listener)
}

entity.on('nonza', listener)
})
}

async function resume(entity, h, previd) {
const response = await entity.sendReceive(
xml('resume', {xmlns: NS, h, previd})
)

if (!response.is('resumed')) {
throw StanzaError.fromElement(response)
if (!response.is('resumed', NS)) {
throw response
}

return response
Expand All @@ -48,14 +57,13 @@ module.exports = function({streamFeatures, entity, middleware}) {
address = jid
sm.outbound = 0
sm.inbound = 0
sm.enabled = false
})

entity.on('offline', () => {
address = null
sm.outbound = 0
sm.inbound = 0
sm.enabled = false
sm.id = ''
})

middleware.use((context, next) => {
Expand All @@ -73,19 +81,24 @@ module.exports = function({streamFeatures, entity, middleware}) {
return next()
})

// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session

streamFeatures.use('sm', NS, async (context, next) => {
// Resuming
if (sm.id && address) {
if (sm.id) {
try {
await resume(entity, sm.inbound, sm.id)
sm.enabled = true
entity.jid = address
entity.status = 'online'
return true
// If resumption fails, continue with session establishment
// eslint-disable-next-line no-unused-vars
} catch (err) {
sm.id = ''
address = null
sm.enabled = false
sm.outbound = 0
}
}

Expand All @@ -99,10 +112,18 @@ module.exports = function({streamFeatures, entity, middleware}) {
// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
sm.outbound = 0

const response = await promiseEnable
try {
const response = await promiseEnable
sm.enabled = true
sm.id = response.attrs.id
sm.max = response.attrs.max
// eslint-disable-next-line no-unused-vars
} catch (err) {
sm.enabled = false
}

sm.inbound = 0
sm.enabled = true
sm.id = response.attrs.id
sm.max = response.attrs.max
})

return sm
}
1 change: 0 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"management"
],
"dependencies": {
"@xmpp/middleware": "^0.11.0",
"@xmpp/xml": "^0.11.0"
},
"engines": {
Expand Down
162 changes: 144 additions & 18 deletions packages/stream-management/test.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,169 @@
'use strict'

const test = require('ava')
const {mockClient, promise, timeout} = require('@xmpp/test')
const {mockClient} = require('@xmpp/test')

test('mandatory', async t => {
function tick() {
return new Promise(resolve => process.nextTick(resolve))
}

test('enable - enabled', async t => {
const {entity} = mockClient()

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
<session xmlns="urn:ietf:params:xml:ns:xmpp-session" />
<sm xmlns="urn:xmpp:sm:3" />
</features>
)

entity.scheduleIncomingResult()
entity.streamManagement.outbound = 45

t.deepEqual(
await entity.catchOutgoing(),
<enable xmlns="urn:xmpp:sm:3" resume="true" />
)

t.is(entity.streamManagement.outbound, 0)
t.is(entity.streamManagement.enabled, false)
t.is(entity.streamManagement.id, '')

await entity.catchOutgoingSet().then(child => {
t.deepEqual(child, <session xmlns="urn:ietf:params:xml:ns:xmpp-session" />)
return child
})
entity.mockInput(
<enabled
xmlns="urn:xmpp:sm:3"
id="some-long-sm-id"
location="[2001:41D0:1:A49b::1]:9222"
resume="true"
/>
)

await promise(entity, 'online')
await tick()

t.is(entity.streamManagement.id, 'some-long-sm-id')
t.is(entity.streamManagement.enabled, true)
})

test('optional', async t => {
test('enable - message - enabled', async t => {
const {entity} = mockClient()

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
<session xmlns="urn:ietf:params:xml:ns:xmpp-session">
<optional />
</session>
<sm xmlns="urn:xmpp:sm:3" />
</features>
)

const promiseSend = promise(entity, 'send')
entity.streamManagement.outbound = 45

t.deepEqual(
await entity.catchOutgoing(),
<enable xmlns="urn:xmpp:sm:3" resume="true" />
)

t.is(entity.streamManagement.outbound, 0)
t.is(entity.streamManagement.enabled, false)
t.is(entity.streamManagement.id, '')

entity.mockInput(<message />)

t.is(entity.streamManagement.enabled, false)
t.is(entity.streamManagement.inbound, 1)

entity.mockInput(
<enabled
xmlns="urn:xmpp:sm:3"
id="some-long-sm-id"
location="[2001:41D0:1:A49b::1]:9222"
resume="true"
/>
)

await tick()

t.is(entity.streamManagement.id, 'some-long-sm-id')
t.is(entity.streamManagement.enabled, true)
})

test('enable - failed', async t => {
const {entity} = mockClient()

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
<sm xmlns="urn:xmpp:sm:3" />
</features>
)

entity.streamManagement.outbound = 45

t.deepEqual(
await entity.catchOutgoing(),
<enable xmlns="urn:xmpp:sm:3" resume="true" />
)

t.is(entity.streamManagement.outbound, 0)
entity.streamManagement.enabled = true

entity.mockInput(<failed xmlns="urn:xmpp:sm:3" />)

await tick()

t.is(entity.streamManagement.enabled, false)
})

test('resume - resumed', async t => {
const {entity} = mockClient()

entity.status = 'offline'
entity.streamManagement.id = 'bar'

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
<sm xmlns="urn:xmpp:sm:3" />
</features>
)

entity.streamManagement.outbound = 45

t.deepEqual(
await entity.catchOutgoing(),
<resume xmlns="urn:xmpp:sm:3" previd="bar" h="0" />
)

t.is(entity.streamManagement.enabled, false)

t.is(entity.status, 'offline')

entity.mockInput(<resumed xmlns="urn:xmpp:sm:3" />)

await tick()

t.is(entity.streamManagement.outbound, 45)
t.is(entity.status, 'online')
})

test('resume - failed', async t => {
const {entity} = mockClient()

entity.status = 'bar'
entity.streamManagement.id = 'bar'
entity.streamManagement.enabled = true
entity.streamManagement.outbound = 45

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
<sm xmlns="urn:xmpp:sm:3" />
</features>
)

t.deepEqual(
await entity.catchOutgoing(),
<resume xmlns="urn:xmpp:sm:3" previd="bar" h="0" />
)

entity.mockInput(<failed xmlns="urn:xmpp:sm:3" />)

await promise(entity, 'online')
await tick()

await timeout(promiseSend, 0).catch(err => {
t.is(err.name, 'TimeoutError')
})
t.is(entity.status, 'bar')
t.is(entity.streamManagement.id, '')
t.is(entity.streamManagement.enabled, false)
t.is(entity.streamManagement.outbound, 0)
})

0 comments on commit 6449ae2

Please sign in to comment.