Skip to content

enhancement: error handling + exp flag #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { memoryStorage } from './storage/index.js'
import { getJWT } from './utils/jwt.js'
import { parseUrl, addHttpPrefix } from './utils/url.js'
import { isBrowserContext } from './utils/runtime.js'
import { isErrorUnavoidable } from './utils/errors.js'

const MAX_NODE_WEIGHT = 100
/**
Expand All @@ -28,6 +29,7 @@ export class Saturn {
* @param {number} [opts.downloadTimeout=0]
* @param {string} [opts.orchURL]
* @param {number} [opts.fallbackLimit]
* @param {boolean} [opts.experimental]
* @param {import('./storage/index.js').Storage} [opts.storage]
*/
constructor (opts = {}) {
Expand All @@ -54,7 +56,7 @@ export class Saturn {
this._monitorPerformanceBuffer()
}
this.storage = this.opts.storage || memoryStorage()
this.loadNodesPromise = this._loadNodes(this.opts)
this.loadNodesPromise = this.opts.experimental ? this._loadNodes(this.opts) : null
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DiegoRBaquero added this to only load nodes for the % we desire per a / b testing.

}

/**
Expand All @@ -81,7 +83,11 @@ export class Saturn {
}
}

const origins = options.origins
let origins = options.origins
if (!origins || origins.length === 0) {
const replacementUrl = options.url ?? options.cdnURL
origins = [replacementUrl]
}
const controllers = []

const createFetchPromise = async (origin) => {
Expand Down Expand Up @@ -125,11 +131,12 @@ export class Saturn {

abortRemainingFetches(controller, controllers)
log = Object.assign(log, this._generateLog(res, log), { url })

if (!res.ok) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
const error = new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
error.res = res
throw error
}
} catch (err) {
if (!res) {
Expand Down Expand Up @@ -184,11 +191,12 @@ export class Saturn {
clearTimeout(connectTimeout)

log = Object.assign(log, this._generateLog(res, log))

if (!res.ok) {
throw new Error(
const error = new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
error.res = res
throw error
}
} catch (err) {
if (!res) {
Expand Down Expand Up @@ -240,6 +248,10 @@ export class Saturn {
// this is temporary until range requests are supported.
let byteCountCheckpoint = 0

const throwError = () => {
throw new Error(`All attempts to fetch content have failed. Last error: ${lastError.message}`)
}

const fetchContent = async function * () {
let byteCount = 0
const byteChunks = await this.fetchContent(cidPath, opts)
Expand Down Expand Up @@ -268,6 +280,9 @@ export class Saturn {
return
} catch (err) {
lastError = err
if (err.res?.status === 410 || isErrorUnavoidable(err)) {
throwError()
}
await this.loadNodesPromise
}
}
Expand All @@ -290,21 +305,24 @@ export class Saturn {
return
} catch (err) {
lastError = err
if (err.res?.status === 410 || isErrorUnavoidable(err)) {
break
}
}
fallbackCount += 1
}

if (lastError) {
throw new Error(`All attempts to fetch content have failed. Last error: ${lastError.message}`)
throwError()
}
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]- -
* @param {boolean} [opts.raceNodes]- -
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<AsyncIterable<Uint8Array>>}
Expand Down Expand Up @@ -391,8 +409,8 @@ export class Saturn {
* @param {object} log
*/
reportLogs (log) {
if (!this.reportingLogs) return
this.logs.push(log)
if (!this.reportingLogs) return
this.reportLogsTimeout && clearTimeout(this.reportLogsTimeout)
this.reportLogsTimeout = setTimeout(this._reportLogs.bind(this), 3_000)
}
Expand Down Expand Up @@ -560,6 +578,6 @@ export class Saturn {
nodes = await orchNodesListPromise
nodes = this._sortNodes(nodes)
this.nodes = nodes
this.storage?.set(Saturn.nodesListKey, nodes)
this.storage.set(Saturn.nodesListKey, nodes)
}
}
19 changes: 19 additions & 0 deletions src/utils/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,22 @@ export class TimeoutError extends Error {
this.name = 'TimeoutError'
}
}

export function isErrorUnavoidable (error) {
if (!error || typeof error.message !== 'string') return false

const errorPatterns = [
/file does not exist/,
/Cannot read properties of undefined \(reading '([^']+)'\)/,
/([a-zA-Z_.]+) is undefined/,
/undefined is not an object \(evaluating '([^']+)'\)/
]

for (const pattern of errorPatterns) {
if (pattern.test(error.message)) {
return true
}
}

return false
}
89 changes: 78 additions & 11 deletions test/fallback.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ import assert from 'node:assert/strict'
import { describe, mock, test } from 'node:test'

import { Saturn } from '#src/index.js'
import { concatChunks, generateNodes, getMockServer, mockJWT, mockNodesHandlers, mockOrchHandler, mockSaturnOriginHandler, MSW_SERVER_OPTS } from './test-utils.js'
import { concatChunks, generateNodes, getMockServer, HTTP_STATUS_GONE, mockJWT, mockNodesHandlers, mockOrchHandler, mockSaturnOriginHandler, MSW_SERVER_OPTS } from './test-utils.js'

const TEST_DEFAULT_ORCH = 'https://orchestrator.strn.pl/nodes'
const TEST_NODES_LIST_KEY = 'saturn-nodes'
const TEST_AUTH = 'https://fz3dyeyxmebszwhuiky7vggmsu0rlkoy.lambda-url.us-west-2.on.aws/'
const TEST_ORIGIN_DOMAIN = 'saturn.ms'
const CLIENT_KEY = 'key'

const experimental = true

describe('Client Fallback', () => {
test('Nodes are loaded from the orchestrator if no storage is passed', async (t) => {
const handlers = [
Expand All @@ -21,7 +24,7 @@ describe('Client Fallback', () => {
const expectedNodes = generateNodes(2, TEST_ORIGIN_DOMAIN)

// No Storage is injected
const saturn = new Saturn({ clientKey: CLIENT_KEY })
const saturn = new Saturn({ clientKey: CLIENT_KEY, experimental })
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }

await saturn._loadNodes(mockOpts)
Expand Down Expand Up @@ -50,7 +53,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand Down Expand Up @@ -87,7 +90,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand Down Expand Up @@ -126,7 +129,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')

Expand Down Expand Up @@ -159,7 +162,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
// const origins =

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })
Expand Down Expand Up @@ -193,7 +196,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

Expand All @@ -215,7 +218,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

const fetchContentMock = mock.fn(async function * (cidPath, opts) {
yield Buffer.from('chunk1')
Expand Down Expand Up @@ -243,7 +246,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

const fetchContentMock = mock.fn(async function * (cidPath, opts) { throw new Error('Fetch error') }) // eslint-disable-line
saturn.fetchContent = fetchContentMock
Expand All @@ -264,6 +267,70 @@ describe('Client Fallback', () => {
server.close()
})

test('Should abort fallback on 410s', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
await saturn.loadNodesPromise

let error
try {
for await (const _ of saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')) { // eslint-disable-line
// This loop body shouldn't be reached.
}
} catch (e) {
error = e
}
const logs = saturn.logs

assert(error)
assert.strictEqual(logs.length, 1)
mock.reset()
server.close()
})

test('Should abort fallback on specific errors', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
await saturn.loadNodesPromise

let callCount = 0
const fetchContentMock = mock.fn(async function * (cidPath, opts) {
callCount++
yield ''
throw new Error('file does not exist')
})

saturn.fetchContent = fetchContentMock

let error
try {
for await (const _ of saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')) { // eslint-disable-line
}
} catch (e) {
error = e
}

assert(error)
assert.strictEqual(callCount, 1)
mock.reset()
server.close()
})
test('Handles fallback with chunk overlap correctly', async () => {
const numNodes = 3
const handlers = [
Expand All @@ -274,7 +341,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

let callCount = 0
const fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down Expand Up @@ -313,7 +380,7 @@ describe('Client Fallback', () => {

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test' })
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })

let callCount = 0
let fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down
11 changes: 6 additions & 5 deletions test/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import { fileURLToPath } from 'node:url'
import fs from 'fs'
import { addHttpPrefix } from '../src/utils/url.js'

const HTTP_STATUS_OK = 200
const HTTP_STATUS_TIMEOUT = 504
export const HTTP_STATUS_OK = 200
export const HTTP_STATUS_TIMEOUT = 504
export const HTTP_STATUS_GONE = 410

const __dirname = dirname(fileURLToPath(import.meta.url))
process.env.TESTING = 'true'
Expand Down Expand Up @@ -120,20 +121,20 @@ export function mockJWT (authURL) {
* @param {number} count - amount of nodes to mock
* @param {string} originDomain - saturn origin domain.
* @param {number} failures
* @param {number} failureCode
* @returns {RestHandler<any>[]}
*/
export function mockNodesHandlers (count, originDomain, failures = 0) {
export function mockNodesHandlers (count, originDomain, failures = 0, failureCode = HTTP_STATUS_TIMEOUT) {
if (failures > count) {
throw Error('failures number cannot exceed node count')
}
const nodes = generateNodes(count, originDomain)

const handlers = nodes.map((node, idx) => {
const url = `${node.url}/ipfs/:cid`
return rest.get(url, (req, res, ctx) => {
if (idx < failures) {
return res(
ctx.status(HTTP_STATUS_TIMEOUT)
ctx.status(failureCode)
)
}
const filepath = getFixturePath('hello.car')
Expand Down