diff --git a/lib/functions.js b/lib/functions.js index 3e0d86b..d9d8f91 100644 --- a/lib/functions.js +++ b/lib/functions.js @@ -44,26 +44,50 @@ async function compress(params) { } = params; const handlers = [ - filter(item => item.dirent.isFile()), - filter(item => extWhiteList.includes(path.extname(item.direntPath))), - fileSize && map(async item => { - const stat = await fsp.stat(item.direntPath); - return { - ...item, - size: stat.size - } - }), - fileSize && filter(item => item.size > fileSize), - switchMap(function* (item) { - for (const format of formats) { - yield { + { + condition: true, + operator: 'filter', + handler: item => item.dirent.isFile() + }, + { + condition: true, + operator: 'filter', + handler: item => extWhiteList.includes(path.extname(item.direntPath)) + }, + { + condition: fileSize, + operator: 'map', + handler: async item => { + const stat = await fsp.stat(item.direntPath); + return { ...item, - format + size: stat.size + } + } + }, + { + condition: fileSize, + operator: 'filter', + handler: item => item.size > fileSize + }, + { + condition: true, + operator: 'flatMap', + handler: function* (item) { + for (const format of formats) { + yield { + ...item, + format + } } } - }), - parallel({ - concurrency, + }, + { + condition: true, + operator: 'forEach', + options: { + concurrency + }, handler: async (item) => { const fromPath = item.direntPath; const relativePath = path.relative(from, fromPath); @@ -74,11 +98,15 @@ async function compress(params) { return createCompressStream({ fromPath, toPath, format: item.format }); } - }), - ].filter(Boolean); + } + ].filter(handlerData => handlerData.condition); - const iterable = pipe(...handlers)(walk(from)); - return subscribe(iterable, x => x); + return handlers.reduce( + (currentStream, handlerData) => { + return currentStream[handlerData.operator](handlerData.handler, handlerData.options) + }, + stream.Readable.from(walk(from)) + ) }