Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upload stream monitoring #562

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ export const storage = fastifyPlugin(
request.backend = storageBackend
request.storage = new Storage(storageBackend, database)
})

fastify.addHook('onClose', async () => {
storageBackend.close()
})
},
{ name: 'storage-init' }
)
25 changes: 19 additions & 6 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import {
import { TenantConnection, PubSub } from '@internal/database'
import { S3Store } from '@tus/s3-store'
import { NodeHttpHandler } from '@smithy/node-http-handler'
import { createAgent } from '@storage/backend'
import { ROUTE_OPERATIONS } from '../operations'
import * as https from 'node:https'
import { createAgent } from '@internal/http'

const {
storageS3MaxSockets,
storageS3Bucket,
storageS3Endpoint,
storageS3ForcePathStyle,
Expand All @@ -57,9 +59,8 @@ type MultiPartRequest = http.IncomingMessage & {
}
}

function createTusStore() {
function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent }) {
if (storageBackendType === 's3') {
const agent = createAgent('s3_tus')
return new S3Store({
partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
expirationPeriodInMilliseconds: tusUrlExpiryMs,
Expand All @@ -84,8 +85,11 @@ function createTusStore() {
})
}

function createTusServer(lockNotifier: LockNotifier) {
const datastore = createTusStore()
function createTusServer(
lockNotifier: LockNotifier,
agent: { httpsAgent: https.Agent; httpAgent: http.Agent }
) {
const datastore = createTusStore(agent)
const serverOptions: ServerOptions & {
datastore: DataStore
} = {
Expand Down Expand Up @@ -139,7 +143,16 @@ export default async function routes(fastify: FastifyInstance) {
const lockNotifier = new LockNotifier(PubSub)
await lockNotifier.subscribe()

const tusServer = createTusServer(lockNotifier)
const agent = createAgent('s3_tus', {
maxSockets: storageS3MaxSockets,
})
agent.monitor()

fastify.addHook('onClose', () => {
agent.close()
})

const tusServer = createTusServer(lockNotifier, agent)

// authenticated routes
fastify.register(async (fastify) => {
Expand Down
23 changes: 1 addition & 22 deletions src/internal/concurrency/stream.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,11 @@
import { Transform, TransformCallback } from 'stream'

interface ByteCounterStreamOptions {
maxHistory?: number
onMaxHistory?: (history: Date[]) => void
rewriteHistoryOnMax?: boolean
}

export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
const { maxHistory = 100 } = options

export const createByteCounterStream = () => {
let bytes = 0
let history: Date[] = []

const transformStream = new Transform({
transform(chunk: Buffer, encoding: string, callback: TransformCallback) {
bytes += chunk.length
history.push(new Date())

if (history.length === maxHistory) {
if (options.rewriteHistoryOnMax) {
options.onMaxHistory?.(history)
history = []
}
}

callback(null, chunk)
},
})
Expand All @@ -33,8 +15,5 @@ export const createByteCounterStream = (options: ByteCounterStreamOptions) => {
get bytes() {
return bytes
},
get history() {
return history
},
}
}
132 changes: 132 additions & 0 deletions src/internal/http/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import Agent, { HttpsAgent } from 'agentkeepalive'
import {
HttpPoolErrorGauge,
HttpPoolFreeSocketsGauge,
HttpPoolPendingRequestsGauge,
HttpPoolSocketsGauge,
} from '@internal/monitoring/metrics'
import { getConfig } from '../../config'

const { region } = getConfig()

export interface InstrumentedAgent {
httpAgent: Agent
httpsAgent: HttpsAgent
monitor: () => NodeJS.Timeout | undefined
close: () => void
}

export interface AgentStats {
busySocketCount: number
freeSocketCount: number
pendingRequestCount: number
errorSocketCount: number
timeoutSocketCount: number
createSocketErrorCount: number
}

/**
* Creates an instrumented agent
* Adding prometheus metrics to the agent
*/
export function createAgent(name: string, options: { maxSockets: number }): InstrumentedAgent {
const agentOptions = {
maxSockets: options.maxSockets,
keepAlive: true,
keepAliveMsecs: 1000,
freeSocketTimeout: 1000 * 15,
}

const httpAgent = new Agent(agentOptions)
const httpsAgent = new HttpsAgent(agentOptions)
let watcher: NodeJS.Timeout | undefined = undefined

return {
httpAgent,
httpsAgent,
monitor: () => {
const agent = watchAgent(name, 'https', httpsAgent)
watcher = agent
return agent
},
close: () => {
if (watcher) {
clearInterval(watcher)
}
},
}
}

/**
* Metrics
*
* HttpPoolSockets
* HttpPoolFreeSockets
* HttpPoolPendingRequests
* HttpPoolError
*
* @param name
* @param protocol
* @param stats
*/
function updateHttpAgentMetrics(name: string, protocol: string, stats: AgentStats) {
// Update the metrics with calculated values
HttpPoolSocketsGauge.set({ name, region, protocol }, stats.busySocketCount)
HttpPoolFreeSocketsGauge.set({ name, region, protocol }, stats.freeSocketCount)
HttpPoolPendingRequestsGauge.set({ name, region }, stats.pendingRequestCount)
HttpPoolErrorGauge.set({ name, region, type: 'socket_error', protocol }, stats.errorSocketCount)
HttpPoolErrorGauge.set(
{ name, region, type: 'timeout_socket_error', protocol },
stats.timeoutSocketCount
)
HttpPoolErrorGauge.set(
{ name, region, type: 'create_socket_error', protocol },
stats.createSocketErrorCount
)
}

export function watchAgent(name: string, protocol: 'http' | 'https', agent: Agent | HttpsAgent) {
return setInterval(() => {
const httpStatus = agent.getCurrentStatus()

const httpStats = gatherHttpAgentStats(httpStatus)

updateHttpAgentMetrics(name, protocol, httpStats)
}, 5000)
}

// Function to update Prometheus metrics based on the current status of the agent
export function gatherHttpAgentStats(status: Agent.AgentStatus) {
// Calculate the number of busy sockets by iterating over the `sockets` object
let busySocketCount = 0
for (const host in status.sockets) {
if (status.sockets.hasOwnProperty(host)) {
busySocketCount += status.sockets[host]
}
}

// Calculate the number of free sockets by iterating over the `freeSockets` object
let freeSocketCount = 0
for (const host in status.freeSockets) {
if (status.freeSockets.hasOwnProperty(host)) {
freeSocketCount += status.freeSockets[host]
}
}

// Calculate the number of pending requests by iterating over the `requests` object
let pendingRequestCount = 0
for (const host in status.requests) {
if (status.requests.hasOwnProperty(host)) {
pendingRequestCount += status.requests[host]
}
}

return {
busySocketCount,
freeSocketCount,
pendingRequestCount,
errorSocketCount: status.errorSocketCount,
timeoutSocketCount: status.timeoutSocketCount,
createSocketErrorCount: status.createSocketErrorCount,
}
}
1 change: 1 addition & 0 deletions src/internal/http/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './agent'
8 changes: 8 additions & 0 deletions src/internal/queue/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ export class Event<T extends Omit<BasePayload, '$version'>> {
return this.queueName + '-slow'
}

static onClose() {
// no-op
}

static onStart() {
// no-op
}

static batchSend<T extends Event<any>[]>(messages: T) {
return Queue.getInstance().insert(
messages.map((message) => {
Expand Down
23 changes: 21 additions & 2 deletions src/internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export abstract class Queue {
opts.registerWorkers()
}

await Queue.callStart()
await Queue.startWorkers(opts.onMessage)

if (opts.signal) {
Expand All @@ -96,7 +97,8 @@ export abstract class Queue {
type: 'queue',
})
return Queue.stop()
.then(() => {
.then(async () => {
await Queue.callClose()
logSchema.info(logger, '[Queue] Exited', {
type: 'queue',
})
Expand Down Expand Up @@ -142,7 +144,8 @@ export abstract class Queue {
})

await new Promise((resolve) => {
boss.once('stopped', () => {
boss.once('stopped', async () => {
await this.callClose()
resolve(null)
})
})
Expand All @@ -166,6 +169,22 @@ export abstract class Queue {
return Promise.all(workers)
}

protected static callStart() {
const events = Queue.events.map((event) => {
return event.onStart()
})

return Promise.all(events)
}

protected static callClose() {
const events = Queue.events.map((event) => {
return event.onClose()
})

return Promise.all(events)
}

protected static registerTask(
queueName: string,
event: SubclassOfBaseClass,
Expand Down
9 changes: 6 additions & 3 deletions src/scripts/export-docs.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { promises as fs } from 'fs'
import app from '../app'
;(async () => {
const response = await app({
const storageApp = app({
exposeDocs: true,
}).inject({
})

const response = await storageApp.inject({
method: 'GET',
url: '/documentation/json',
})

await fs.writeFile('static/api.json', response.body)
process.exit(0)

await storageApp.close()
})()
4 changes: 4 additions & 0 deletions src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ export abstract class StorageBackendAdapter {
): Promise<{ eTag?: string; lastModified?: Date }> {
throw new Error('not implemented')
}

close(): void {
// do nothing
}
}

const { tusUseFileVersionSeparator } = getConfig()
Expand Down
4 changes: 4 additions & 0 deletions src/storage/backend/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ export class FileBackend implements StorageBackendAdapter {
])
}

close() {
// no-op
}

protected async getFileMetadata(file: string) {
const platform = process.platform == 'darwin' ? 'darwin' : 'linux'
const [cacheControl, contentType] = await Promise.all([
Expand Down
Loading
Loading