Skip to content

Commit

Permalink
Move memfs to a pluggable FS.
Browse files Browse the repository at this point in the history
  • Loading branch information
TekMonksGitHub committed Nov 4, 2024
1 parent e7b268d commit 85ce31b
Showing 1 changed file with 32 additions and 16 deletions.
48 changes: 32 additions & 16 deletions backend/server/lib/memfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
* write - both - must be routed through this module for cache
* coherency.
*
* Also active caching is on READ only, not WRITES. So writes must
* go on to the disk if the file is not cached already as the replay
* log can be corrupted otherwise (eg told to create a directory, cached it
* as an op, then told to write a file which is not cached, so it will write
* to the disk, but the directly op is still pending in the replay log).
*
* The data to write can be a UTF8 string or a Buffer object
* (Buffer preferred).
*
Expand All @@ -19,7 +25,7 @@ const pathmod = require("path");
const fspromises = require("fs").promises;
const conf = require(`${CONSTANTS.CONFDIR}/memfs.json`);

const FSCACHE = {}, PENDING_PROMISES=[]; let memused = 0, flush_resolver, flush_promise;
const NATIVE_FS = fspromises, FSCACHE = {}, PENDING_PROMISES=[]; let memused = 0, flush_resolver, flush_promise;

exports.readFile = async (path, options) => {
path = pathmod.resolve(path);
Expand All @@ -30,7 +36,8 @@ exports.readFile = async (path, options) => {
}

LOG.info(`memfs cache miss ${path}`);
const data = await fspromises.readFile(path, options), stats = await fspromises.stat(path)
const data = await _runNativeFSFunction("readFile", [path, options]),
stats = await _runNativeFSFunction("stat", [path]);
if ((!options?.memfs_dontcache) && _allocateMemory(stats.size)) { // cache if possible, unless explicitly disabled
FSCACHE[path] = {data, accesstime: Date.now(), stats};
LOG.info(`Memfs cached file ${path} using ${data.length} bytes of memory, total cache size is ${memused} bytes.`);
Expand All @@ -42,9 +49,9 @@ exports.writeFile = async (path, data, options) => {
path = pathmod.resolve(path);
if (FSCACHE[path]) {
delete FSCACHE[path].deleted; // file is no longer deleted
FSCACHE[path] = {data, accesstime: Date.now(), stats: await fspromises.stat(path)};
_addPendingPromises(_=>fspromises.writeFile(path, data, options)); // no need for await as file is cached and read will be via the cache
} else await fspromises.writeFile(path, data, options); // we don't cache on writes, unless already cached
FSCACHE[path] = {data, accesstime: Date.now(), stats: await _runNativeFSFunction("stat", [path])};
_addPendingPromises(_=>_runNativeFSFunction("writeFile", [path, data, options])); // no need for await as file is cached and read will be via the cache
} else await _runNativeFSFunction("writeFile",[path, data, options]); // we don't cache on writes, unless already cached
}

exports.appendFile = async (path, data, options) => {
Expand All @@ -53,47 +60,49 @@ exports.appendFile = async (path, data, options) => {
delete FSCACHE[path].deleted; // file is no longer deleted
FSCACHE[path] = {data: typeof data === "string" ? FSCACHE[path].data + data :
Buffer.concat([Buffer.from(FSCACHE[path].data), Buffer.from(data)]), accesstime: Date.now(),
stats: await fspromises.stat(path)};
_addPendingPromises(_=>fspromises.appendFile(path, data, options)); // no need for await as file is cached and read will be via the cache
} else await fspromises.appendFile(path, data, options); // we don't cache on writes, unless already cached
stats: await _runNativeFSFunction("stat",[path])};
_addPendingPromises(_=>_runNativeFSFunction("appendFile", [path, data, options])); // no need for await as file is cached and read will be via the cache
} else try{
await _runNativeFSFunction("appendFile",[path, data, options]); // we don't cache on writes, unless already cached
} catch (err) {_handleError(err, "appendFile")}
}

exports.stat = async path => FSCACHE[path]?.stats || await fspromises.stat(path); // if cached, we have the stats
exports.stat = async path => FSCACHE[path]?.stats || await _runNativeFSFunction("stat",[path]); // if cached, we have the stats

exports.unlink = async path => {
path = pathmod.resolve(path);
_addPendingPromises(async _ => {
await fspromises.unlink(path); if (FSCACHE[path]?.deleted) delete FSCACHE[path];});
await _runNativeFSFunction("unlink",[path]); if (FSCACHE[path]?.deleted) delete FSCACHE[path];});
_setPathDeleted(path); // will ensure read doesn't read it, even if the disk has this
}

exports.unlinkIfExists = async path => {
path = pathmod.resolve(path);
const safe_unlink = async path => {
try{await fspromises.unlink(path); if (FSCACHE[path]?.deleted) delete FSCACHE[path]; }
try{await _runNativeFSFunction("unlink",[path]); if (FSCACHE[path]?.deleted) delete FSCACHE[path]; }
catch(err) {if (err.code != "ENOENT") throw err;}
}
_addPendingPromises(_=>safe_unlink(path));
_setPathDeleted(path); // will ensure read doesn't read it, even if the disk has this
}

exports.readdir = async (path, options) => (await fspromises.readdir(path, options)).filter( // filter out deleted files (they may still be on the disk)
exports.readdir = async (path, options) => (await _runNativeFSFunction("readdir",[path, options])).filter( // filter out deleted files (they may still be on the disk)
entry => FSCACHE[pathmod.resolve(path+"/"+(entry.name||entry.toString()))]?.deleted != true); // entry.name||entry.toString() takes care of string names, dirent object and buffer objects

exports.mkdir = (path, options) => fspromises.mkdir(path, options); // can't cache this easily, anyways it doesn't take long
exports.mkdir = (path, options) => _runNativeFSFunction("mkdir",[path, options]); // can't cache this easily, anyways it doesn't take long

exports.rmdir = (path, options) => fspromises.rmdir(path, options); // can't cache this easily, anyways it doesn't take long
exports.rmdir = (path, options) => _runNativeFSFunction("rmdir",[path, options]); // can't cache this easily, anyways it doesn't take long

exports.access = async (path, mode) => {
if (FSCACHE[pathmod.resolve(path)]?.deleted) return false; // deleted in memory, no need to check the disk
if (FSCACHE[pathmod.resolve(path)]) return true; // have it locally and it is not deleted
else return await fspromises.access(path, mode); // go to the disk
else return await _runNativeFSFunction("access",[path, mode]); // go to the disk
}

exports.rm = async (path, options) => {
path = pathmod.resolve(path);
_addPendingPromises(async _ => {
await fspromises.rm(path, options);
await _runNativeFSFunction("rm",[path, options]);
if (options?.recursive) for (const pathToTest of Object.keys(FSCACHE))
if (pathToTest.startsWith(path)) delete FSCACHE[pathToTest]; // remove nested entries as parent dir went away
if (FSCACHE[path]?.deleted) delete FSCACHE[path];
Expand All @@ -108,6 +117,13 @@ exports.flush = _ => {
return flush_promise;
}

async function _runNativeFSFunction(functionName, params) {
try {return await NATIVE_FS[functionName](...params)} catch (err) {
LOG.error(`memfs ${functionName} error in the native FS: ${err}`);
throw err;
}
}

function _addPendingPromises(async_function) {
const wrapper = async _ => {
await async_function();
Expand Down

0 comments on commit 85ce31b

Please sign in to comment.