diff --git a/backend/server/lib/memfs.js b/backend/server/lib/memfs.js index a8e6af4..19b8959 100644 --- a/backend/server/lib/memfs.js +++ b/backend/server/lib/memfs.js @@ -8,6 +8,12 @@ * write - both - must be routed through this module for cache * coherency. * + * Also active caching is on READ only, not WRITES. So writes must + * go on to the disk if the file is not cached already as the replay + * log can be corrupted otherwise (eg told to create a directory, cached it + * as an op, then told to write a file which is not cached, so it will write + * to the disk, but the directly op is still pending in the replay log). + * * The data to write can be a UTF8 string or a Buffer object * (Buffer preferred). * @@ -19,7 +25,7 @@ const pathmod = require("path"); const fspromises = require("fs").promises; const conf = require(`${CONSTANTS.CONFDIR}/memfs.json`); -const FSCACHE = {}, PENDING_PROMISES=[]; let memused = 0, flush_resolver, flush_promise; +const NATIVE_FS = fspromises, FSCACHE = {}, PENDING_PROMISES=[]; let memused = 0, flush_resolver, flush_promise; exports.readFile = async (path, options) => { path = pathmod.resolve(path); @@ -30,7 +36,8 @@ exports.readFile = async (path, options) => { } LOG.info(`memfs cache miss ${path}`); - const data = await fspromises.readFile(path, options), stats = await fspromises.stat(path) + const data = await _runNativeFSFunction("readFile", [path, options]), + stats = await _runNativeFSFunction("stat", [path]); if ((!options?.memfs_dontcache) && _allocateMemory(stats.size)) { // cache if possible, unless explicitly disabled FSCACHE[path] = {data, accesstime: Date.now(), stats}; LOG.info(`Memfs cached file ${path} using ${data.length} bytes of memory, total cache size is ${memused} bytes.`); @@ -42,9 +49,9 @@ exports.writeFile = async (path, data, options) => { path = pathmod.resolve(path); if (FSCACHE[path]) { delete FSCACHE[path].deleted; // file is no longer deleted - FSCACHE[path] = {data, accesstime: Date.now(), stats: await fspromises.stat(path)}; - _addPendingPromises(_=>fspromises.writeFile(path, data, options)); // no need for await as file is cached and read will be via the cache - } else await fspromises.writeFile(path, data, options); // we don't cache on writes, unless already cached + FSCACHE[path] = {data, accesstime: Date.now(), stats: await _runNativeFSFunction("stat", [path])}; + _addPendingPromises(_=>_runNativeFSFunction("writeFile", [path, data, options])); // no need for await as file is cached and read will be via the cache + } else await _runNativeFSFunction("writeFile",[path, data, options]); // we don't cache on writes, unless already cached } exports.appendFile = async (path, data, options) => { @@ -53,47 +60,49 @@ exports.appendFile = async (path, data, options) => { delete FSCACHE[path].deleted; // file is no longer deleted FSCACHE[path] = {data: typeof data === "string" ? FSCACHE[path].data + data : Buffer.concat([Buffer.from(FSCACHE[path].data), Buffer.from(data)]), accesstime: Date.now(), - stats: await fspromises.stat(path)}; - _addPendingPromises(_=>fspromises.appendFile(path, data, options)); // no need for await as file is cached and read will be via the cache - } else await fspromises.appendFile(path, data, options); // we don't cache on writes, unless already cached + stats: await _runNativeFSFunction("stat",[path])}; + _addPendingPromises(_=>_runNativeFSFunction("appendFile", [path, data, options])); // no need for await as file is cached and read will be via the cache + } else try{ + await _runNativeFSFunction("appendFile",[path, data, options]); // we don't cache on writes, unless already cached + } catch (err) {_handleError(err, "appendFile")} } -exports.stat = async path => FSCACHE[path]?.stats || await fspromises.stat(path); // if cached, we have the stats +exports.stat = async path => FSCACHE[path]?.stats || await _runNativeFSFunction("stat",[path]); // if cached, we have the stats exports.unlink = async path => { path = pathmod.resolve(path); _addPendingPromises(async _ => { - await fspromises.unlink(path); if (FSCACHE[path]?.deleted) delete FSCACHE[path];}); + await _runNativeFSFunction("unlink",[path]); if (FSCACHE[path]?.deleted) delete FSCACHE[path];}); _setPathDeleted(path); // will ensure read doesn't read it, even if the disk has this } exports.unlinkIfExists = async path => { path = pathmod.resolve(path); const safe_unlink = async path => { - try{await fspromises.unlink(path); if (FSCACHE[path]?.deleted) delete FSCACHE[path]; } + try{await _runNativeFSFunction("unlink",[path]); if (FSCACHE[path]?.deleted) delete FSCACHE[path]; } catch(err) {if (err.code != "ENOENT") throw err;} } _addPendingPromises(_=>safe_unlink(path)); _setPathDeleted(path); // will ensure read doesn't read it, even if the disk has this } -exports.readdir = async (path, options) => (await fspromises.readdir(path, options)).filter( // filter out deleted files (they may still be on the disk) +exports.readdir = async (path, options) => (await _runNativeFSFunction("readdir",[path, options])).filter( // filter out deleted files (they may still be on the disk) entry => FSCACHE[pathmod.resolve(path+"/"+(entry.name||entry.toString()))]?.deleted != true); // entry.name||entry.toString() takes care of string names, dirent object and buffer objects -exports.mkdir = (path, options) => fspromises.mkdir(path, options); // can't cache this easily, anyways it doesn't take long +exports.mkdir = (path, options) => _runNativeFSFunction("mkdir",[path, options]); // can't cache this easily, anyways it doesn't take long -exports.rmdir = (path, options) => fspromises.rmdir(path, options); // can't cache this easily, anyways it doesn't take long +exports.rmdir = (path, options) => _runNativeFSFunction("rmdir",[path, options]); // can't cache this easily, anyways it doesn't take long exports.access = async (path, mode) => { if (FSCACHE[pathmod.resolve(path)]?.deleted) return false; // deleted in memory, no need to check the disk if (FSCACHE[pathmod.resolve(path)]) return true; // have it locally and it is not deleted - else return await fspromises.access(path, mode); // go to the disk + else return await _runNativeFSFunction("access",[path, mode]); // go to the disk } exports.rm = async (path, options) => { path = pathmod.resolve(path); _addPendingPromises(async _ => { - await fspromises.rm(path, options); + await _runNativeFSFunction("rm",[path, options]); if (options?.recursive) for (const pathToTest of Object.keys(FSCACHE)) if (pathToTest.startsWith(path)) delete FSCACHE[pathToTest]; // remove nested entries as parent dir went away if (FSCACHE[path]?.deleted) delete FSCACHE[path]; @@ -108,6 +117,13 @@ exports.flush = _ => { return flush_promise; } +async function _runNativeFSFunction(functionName, params) { + try {return await NATIVE_FS[functionName](...params)} catch (err) { + LOG.error(`memfs ${functionName} error in the native FS: ${err}`); + throw err; + } +} + function _addPendingPromises(async_function) { const wrapper = async _ => { await async_function();