Skip to content

Commit

Permalink
add db import/export
Browse files Browse the repository at this point in the history
  • Loading branch information
0fatal committed Apr 1, 2024
1 parent f4d5a8d commit 66f835a
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 6 deletions.
134 changes: 131 additions & 3 deletions server/src/database/database.controller.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,39 @@
import { Controller, Logger, Param, Post, Req, UseGuards } from '@nestjs/common'
import { ApiBearerAuth, ApiOperation, ApiTags } from '@nestjs/swagger'
import {
Body,
Controller,
Get,
Logger,
Param,
Post,
Put,
Req,
Res,
StreamableFile,
UploadedFile,
UseGuards,
UseInterceptors,
} from '@nestjs/common'
import {
ApiBearerAuth,
ApiBody,
ApiConsumes,
ApiOperation,
ApiTags,
} from '@nestjs/swagger'
import { Policy, Proxy } from 'database-proxy/dist'
import { ApplicationAuthGuard } from 'src/authentication/application.auth.guard'
import { JwtAuthGuard } from 'src/authentication/jwt.auth.guard'
import { IRequest } from 'src/utils/interface'
import { IRequest, IResponse } from 'src/utils/interface'
import { DedicatedDatabaseService } from './dedicated-database/dedicated-database.service'
import { FileInterceptor } from '@nestjs/platform-express'
import { ImportDatabaseDto } from './dto/import-database.dto'
import { InjectUser } from 'src/utils/decorator'
import { UserWithKubeconfig } from 'src/user/entities/user'
import { ResponseUtil } from 'src/utils/response'
import path from 'path'
import * as os from 'os'
import { createReadStream, existsSync, mkdirSync } from 'fs'
import { unlink, writeFile } from 'fs/promises'

@ApiTags('Database')
@ApiBearerAuth('Authorization')
Expand Down Expand Up @@ -61,4 +90,103 @@ export class DatabaseController {
}
}
}

@ApiOperation({ summary: 'Export database of an application' })
@UseGuards(JwtAuthGuard, ApplicationAuthGuard)
@Get('export')
async exportDatabase(
@Param('appid') appid: string,
@Res({ passthrough: true }) res: IResponse,
@InjectUser() user: UserWithKubeconfig,
) {
const databaseSyncLimit =
await this.dedicatedDatabaseService.checkDatabaseSyncLimit(user._id)
if (databaseSyncLimit) {
return ResponseUtil.error('database sync limit exceeded')
}
const tempFilePath = path.join(
os.tmpdir(),
'mongodb-data',
'export',
`${appid}-db.gz`,
)

// check if dir exists
if (!existsSync(path.dirname(tempFilePath))) {
mkdirSync(path.dirname(tempFilePath), { recursive: true })
}

await this.dedicatedDatabaseService.exportDatabase(
appid,
tempFilePath,
user,
)
const filename = path.basename(tempFilePath)

res.set({
'Content-Disposition': `attachment; filename="${filename}"`,
})
const file = createReadStream(tempFilePath)
return new StreamableFile(file)
}

@ApiOperation({ summary: 'Import database of an application' })
@ApiConsumes('multipart/form-data')
@ApiBody({
type: ImportDatabaseDto,
})
@UseGuards(JwtAuthGuard, ApplicationAuthGuard)
@Put('import')
@UseInterceptors(
FileInterceptor('file', {
limits: {
fileSize: 3 * 1024 * 1024 * 1024, // 3 GB
},
}),
)
async importDatabase(
@UploadedFile() file: Express.Multer.File,
@Body('sourceAppid') sourceAppid: string,
@Param('appid') appid: string,
@InjectUser() user: UserWithKubeconfig,
) {
const databaseSyncLimit =
await this.dedicatedDatabaseService.checkDatabaseSyncLimit(user._id)
if (databaseSyncLimit) {
return ResponseUtil.error('database sync limit exceeded')
}
// check if db is valid
if (!/^[\-a-z0-9]{6,20}$/.test(sourceAppid)) {
return ResponseUtil.error('Invalid source appid')
}
// check if file is .gz
if (file.mimetype !== 'application/gzip') {
return ResponseUtil.error('Invalid db file')
}

const tempFilePath = path.join(
os.tmpdir(),
'mongodb-data',
'import',
`${appid}-${sourceAppid}.gz`,
)

// check if dir exists
if (!existsSync(path.dirname(tempFilePath))) {
mkdirSync(path.dirname(tempFilePath), { recursive: true })
}

try {
await writeFile(tempFilePath, file.buffer)
await this.dedicatedDatabaseService.importDatabase(
appid,
sourceAppid,
tempFilePath,
user,
)
return ResponseUtil.ok({})
} finally {
if (existsSync(tempFilePath)) await unlink(tempFilePath)
}
}
}
155 changes: 152 additions & 3 deletions server/src/database/dedicated-database/dedicated-database.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,29 @@ import { ClusterService } from 'src/region/cluster/cluster.service'
import * as _ from 'lodash'
import { SystemDatabase } from 'src/system-database'
import { KubernetesObject, loadAllYaml } from '@kubernetes/client-node'
import { TASK_LOCK_INIT_TIME } from 'src/constants'
import { ClientSession } from 'mongodb'
import {
CN_PUBLISHED_CONF,
CN_PUBLISHED_FUNCTIONS,
TASK_LOCK_INIT_TIME,
} from 'src/constants'
import { ClientSession, ObjectId } from 'mongodb'
import * as mongodb_uri from 'mongodb-uri'
import { MongoService } from 'src/database/mongo.service'
import { MongoAccessor } from 'database-proxy'
import { ApplicationBundle } from 'src/application/entities/application-bundle'
import * as assert from 'assert'
import { User } from 'src/user/entities/user'
import { User, UserWithKubeconfig } from 'src/user/entities/user'
import { promisify } from 'util'
import { exec } from 'child_process'
import { CloudFunction } from 'src/function/entities/cloud-function'
import { ApplicationConfiguration } from 'src/application/entities/application-configuration'
import {
DatabaseSyncRecord,
DatabaseSyncState,
} from '../entities/database-sync-record'

const getDedicatedDatabaseName = (appid: string) => `sealaf-${appid}`
const p_exec = promisify(exec)

@Injectable()
export class DedicatedDatabaseService {
Expand Down Expand Up @@ -243,4 +256,140 @@ export class DedicatedDatabaseService {

return doc.value
}

async exportDatabase(
appid: string,
filePath: string,
user: UserWithKubeconfig,
) {
const dedicatedDatabase = await this.findOne(appid)
if (!dedicatedDatabase) {
throw new Error(`database ${appid} not found`)
}
const connectionUri = this.getConnectionUri(user, dedicatedDatabase)
assert(connectionUri, `database ${appid} connection uri not found`)

let syncId: ObjectId
try {
syncId = (
await SystemDatabase.db
.collection<DatabaseSyncRecord>('DatabaseSyncRecord')
.insertOne({
appid,
uid: user._id,
createdAt: new Date(),
state: DatabaseSyncState.Processing,
type: 'Export',
})
).insertedId

await p_exec(
`mongodump --uri='${connectionUri}' --gzip --archive=${filePath}`,
)
} catch (error) {
console.error(`failed to export db ${appid}`, error)
throw error
} finally {
await SystemDatabase.db
.collection<DatabaseSyncRecord>('DatabaseSyncRecord')
.updateOne(
{ _id: syncId },
{ $set: { state: DatabaseSyncState.Complete } },
)
}
}

async importDatabase(
appid: string,
dbName: string,
filePath: string,
user: UserWithKubeconfig,
): Promise<void> {
const dedicatedDatabase = await this.findOne(appid)
if (!dedicatedDatabase) {
throw new Error(`database ${appid} not found`)
}
const connectionUri = this.getConnectionUri(user, dedicatedDatabase)
assert(connectionUri, `database ${appid} connection uri not found`)

let syncId: ObjectId
try {
syncId = (
await SystemDatabase.db
.collection<DatabaseSyncRecord>('DatabaseSyncRecord')
.insertOne({
appid,
uid: user._id,
createdAt: new Date(),
state: DatabaseSyncState.Processing,
type: 'Import',
})
).insertedId

await p_exec(
`mongorestore --uri='${connectionUri}' --gzip --archive='${filePath}' --nsFrom="${dbName}.*" --nsTo="sealaf-${appid}.*" -v --nsInclude="${dbName}.*"`,
)

await this.recoverFunctionsToSystemDatabase(appid, user._id)
} catch (error) {
// eslint-disable-next-line no-console
console.error(`failed to import db to ${appid}`, error)
throw error
} finally {
await SystemDatabase.db
.collection<DatabaseSyncRecord>('DatabaseSyncRecord')
.updateOne(
{ _id: syncId },
{ $set: { state: DatabaseSyncState.Complete } },
)
}
}

async recoverFunctionsToSystemDatabase(appid: string, uid: ObjectId) {
const { db, client } = await this.findAndConnect(appid)

try {
const appFunctionCollection = db.collection(CN_PUBLISHED_FUNCTIONS)
const appConfCollection = db.collection(CN_PUBLISHED_CONF)

const functionsExist = await SystemDatabase.db
.collection<CloudFunction>('CloudFunction')
.countDocuments({ appid })

if (functionsExist) return

const funcs: CloudFunction[] = await appFunctionCollection
.find<CloudFunction>({})
.toArray()

if (funcs.length === 0) return

funcs.forEach((func) => {
delete func._id
func.appid = appid
func.createdBy = uid
})

await SystemDatabase.db
.collection<CloudFunction>('CloudFunction')
.insertMany(funcs)

// sync conf
const conf = await SystemDatabase.db
.collection<ApplicationConfiguration>('ApplicationConfiguration')
.findOne({ appid })

await appConfCollection.deleteMany({})
await appConfCollection.insertOne(conf)
} finally {
await client.close()
}
}

async checkDatabaseSyncLimit(uid: ObjectId) {
const count = await SystemDatabase.db
.collection<DatabaseSyncRecord>('DatabaseSyncRecord')
.countDocuments({ uid, state: DatabaseSyncState.Processing })
return count >= 2
}
}
9 changes: 9 additions & 0 deletions server/src/database/dto/import-database.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { ApiProperty } from '@nestjs/swagger'

export class ImportDatabaseDto {
@ApiProperty({ type: 'binary', format: 'binary' })
file: any

@ApiProperty({ type: 'string', description: 'source appid' })
sourceAppid: string
}
15 changes: 15 additions & 0 deletions server/src/database/entities/database-sync-record.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { ObjectId } from 'mongodb'

export enum DatabaseSyncState {
Processing = 'Processing',
Complete = 'Complete',
}

export class DatabaseSyncRecord {
_id?: ObjectId
appid: string
uid: ObjectId
createdAt: Date
type: 'Export' | 'Import'
state: DatabaseSyncState
}

0 comments on commit 66f835a

Please sign in to comment.