This repository contains a proof-of-concept, brain-teaser exercise for designing and implementing a document processing workflow. It's an exercise in designing composable workflows that a user can configure through e.g. a UI and that can be used immediately without the need of a code release.
I implemented this as an exercise in Software Architecture. You may use this solution in your own product, but please let me know which challenges you encounter so that I can improve the architecture. Thanks!
https://www.loom.com/share/0ac77d6e259b48ef8adf684a3d099e5f
First, you need to fetch the dependencies with:
mix deps.get
Then, you can run the application and watch the example workflow handle one document every 10s by running:
iex -S mix
This will start the application and all its GenServers.
You can run the tests with:
mix test
The (self-imposed) requirements of the document parsing workflow architecture were:
- Workflows need to be composable and user-configurable.
- New workflows can be configured and used without a code release.
- Workflows can be suspended (stopped) and re-queued (restarted).
- Workflows can be aborted early without executing all remaining commands.
- Workflow steps can be easily added or modified.
- Workflows can be executed in parallel.
Based on these requirements, I designed a solution that revolves around a few key elements:
A workflow describes the document processing flow from start to end. Every new document receives one `Workflow`. The workflow is executed by the `Engine` and its state is persisted by the `Workflows` context.
A workflow has multiple `steps`, which are a list of `Commands`. The commands can be configured by the user. When a workflow is executed by the `Engine`, every command is executed in order and receives the result of the previous command.
The status of a workflow can be `:pending`, `:running`, `:waiting`, `:aborted`, or `:completed`. After the workflow is created, it waits to be executed by the `Engine`. While it waits, it has the status `:pending`. Once the engine starts executing the workflow, the workflow status becomes `:running`. A workflow can be `:aborted` early if a command returns `:abort`. The workflow can also be suspended by a command that returns `:wait`. If a workflow is waiting, it can be unsuspended by changing its status back to `:pending`. The `Engine` will then pick up the workflow again. While a workflow is `waiting`, it will not be executed by the `Engine`. Once every step in the workflow has been executed successfully, the workflow is `:completed`.
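The status transitions described above can be sketched as a small lookup table. This is a minimal sketch that only reads the transitions off the prose; the module name `StatusSketch` and the function `transition/2` are hypothetical and not part of this repository.

```elixir
defmodule StatusSketch do
  @moduledoc "Hypothetical sketch of the workflow status transitions; not the repository's code."

  # Allowed transitions, read off the description above.
  @transitions %{
    pending: [:running],
    running: [:waiting, :aborted, :completed],
    waiting: [:pending],
    aborted: [],
    completed: []
  }

  @doc "Returns {:ok, to} if the transition is allowed, otherwise an error tuple."
  def transition(from, to) do
    if to in Map.get(@transitions, from, []) do
      {:ok, to}
    else
      {:error, {:invalid_transition, from, to}}
    end
  end
end
```

With such a table, re-queuing a suspended workflow is `StatusSketch.transition(:waiting, :pending)`, while any transition out of `:completed` or `:aborted` is rejected.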
A command represents a single step in a `Workflow`. Every command has a command module that implements the logic of the command. Commands can be configured through a `params` map. See the `Mock.ExampleWorkflow` for examples of how to create a workflow with multiple commands.
When a command is executed by the `Engine`, the command module receives the command, the workflow, and the result of the previous command. The command module executes the command logic based on the input and returns either `{:ok, result}` or `{:error, any()}`. The `result` can be anything, but special return values are `:wait` and `:abort`. A command can return `{:ok, workflow, :wait}` if it wants the workflow to wait for input from an external service, like a customer's server or a user input in a UI. If a command wants to abort the workflow, it can return `{:ok, workflow, :abort}`.
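The return values above can be written down as a behaviour. The following is a sketch under assumptions: the names `CommandSketch` and `AbortUnlessProcess` are hypothetical, the argument order of `execute/3` follows the description (command, workflow, previous result), and plain maps stand in for the repository's structs.

```elixir
defmodule CommandSketch do
  @moduledoc "Hypothetical behaviour mirroring the return values described above."

  @type result ::
          {:ok, any()}
          | {:ok, map(), :wait | :abort}
          | {:error, any()}

  @callback execute(command :: map(), workflow :: map(), previous :: any()) :: result()
end

defmodule AbortUnlessProcess do
  @moduledoc "Hypothetical command: aborts the workflow unless the previous result says to process."
  @behaviour CommandSketch

  @impl true
  # The decision is "process": pass the previous result on unchanged.
  def execute(_command, _workflow, %{"decision" => "process"} = previous), do: {:ok, previous}
  # Any other decision aborts the workflow.
  def execute(_command, workflow, %{"decision" => _other}), do: {:ok, workflow, :abort}
  # Input without a decision is treated as an error.
  def execute(_command, _workflow, previous), do: {:error, {:unexpected_input, previous}}
end
```

A command that suspends the workflow would return `{:ok, workflow, :wait}` in the same way.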
Every command stores its result so that we can see later which command/step in a workflow returned which result.
The engine executes a workflow. It iterates through all commands of a workflow until one of the following exit conditions occurs:
- All commands of the workflow were completed successfully.
  - In this case, the engine completes the workflow (sets the workflow status to `:completed`).
- The workflow was aborted.
  - In this case, the engine aborts the workflow (sets the workflow status to `:aborted`).
- The workflow was instructed to wait.
  - In this case, the engine suspends the workflow (sets the workflow status to `:waiting`).
The engine executes every command in the workflow and stores the result of the execution on the command. If a command instructs the workflow to wait, that command is set to `:waiting` and it will become the first command to be executed once the workflow is re-queued because a response has come in.
If the engine re-runs a workflow, it will skip any `:completed` commands and will continue the execution of the workflow with the first command that is `:waiting`.
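The loop and its exit conditions can be sketched as follows. This is a simplified illustration, not the repository's `Engine`: the module names are hypothetical, plain maps stand in for the `Workflow` and `Command` structs, commands are assumed to start with the status `:pending`, and treating `{:error, reason}` like an abort is an assumption made only to keep the sketch total.

```elixir
defmodule EngineSketch do
  @moduledoc "Hypothetical, simplified engine loop; plain maps stand in for the structs."

  @doc "Runs all commands that are not yet completed and returns the updated workflow map."
  def run(%{steps: steps} = workflow) do
    # Completed commands are skipped; execution resumes at the first other command.
    {done, todo} = Enum.split_while(steps, &(&1.status == :completed))

    previous =
      case List.last(done) do
        nil -> nil
        last -> last.result
      end

    {status, executed} = execute_all(todo, workflow, previous, [])
    %{workflow | status: status, steps: done ++ executed}
  end

  # No commands left: the workflow is completed.
  defp execute_all([], _workflow, _previous, acc), do: {:completed, Enum.reverse(acc)}

  defp execute_all([command | rest], workflow, previous, acc) do
    case command.module.execute(command, workflow, previous) do
      {:ok, _workflow, :wait} ->
        {:waiting, Enum.reverse(acc, [%{command | status: :waiting} | rest])}

      {:ok, _workflow, :abort} ->
        {:aborted, Enum.reverse(acc, [command | rest])}

      {:ok, result} ->
        completed = %{command | status: :completed, result: result}
        execute_all(rest, workflow, result, [completed | acc])

      {:error, reason} ->
        # Assumption: an error stops the workflow like an abort and is stored as the result.
        {:aborted, Enum.reverse(acc, [%{command | result: {:error, reason}} | rest])}
    end
  end
end

defmodule EngineSketch.Echo do
  # Hypothetical command: returns its configured value as the result.
  def execute(command, _workflow, _previous), do: {:ok, command.params["value"]}
end

defmodule EngineSketch.WaitForInput do
  # Hypothetical command: waits until the workflow carries a response, then passes it on.
  def execute(_command, %{response: nil} = workflow, _previous), do: {:ok, workflow, :wait}
  def execute(_command, %{response: response}, _previous), do: {:ok, response}
end
```

Running a workflow with an `Echo` step followed by a `WaitForInput` step suspends it on the second step. Once a response is recorded and the workflow is run again, the first step is skipped and the waiting step is executed again.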
Here's a rough dependency diagram between all the modules in this proof-of-concept.
Solid lines represent function calls between modules and dotted lines represent indirect calls through e.g. message passing.
For example, the `DocumentProducer` sends an event to the `Controller`, which is why the line is dotted.
The `Workflows` context calls the `ExampleWorkflow` module directly, which is why the line is solid.
classDiagram
ExampleWorkflow <-- Workflows
CustomerServer <.. SendWebhookEvent
Controller <.. DocumentProducer
Workflows <-- Engine
Command <-- Engine
ParseDocumentContent <-- ParseDocument
Command --> EvaluateIfThenElse
Workflow <-- Engine
Command --> ParseDocument
Command --> ParseHTTPRequest
Command --> WaitForResponse
Command --> SendWebhookEvent
Command <-- Workflow
Document <-- Workflow
Workflows <-- Controller
Controller <.. CustomerServer
class Engine
class ExampleWorkflow
class DocumentProducer
class Controller
class Workflows
class Workflow
class Command
class Document
class EvaluateIfThenElse
class ParseDocument
class ParseHTTPRequest
class SendWebhookEvent
class WaitForResponse
class ParseDocumentContent
class CustomerServer
The example workflow implemented in the Proof of Concept involves a simulated invoice that the system should process and instruct a customer server to pay out. There's a flow diagram below, but these are the general steps in the workflow:
Once a new document has been received and a workflow has been created, these are the steps executed by the `Engine`:
- Send a review request via HTTP to a customer server.
- Suspend the workflow until the customer server responds with a decision about how to proceed with the given document.
- Once the customer server responds with a decision, re-queue the workflow.
- If the decision is to "process" the document, parse the document. If the decision is not to "process" the document, abort the workflow.
- After the parsing of the document, send an HTTP request to the customer server with information about the `amount` of the invoice and the recipient's `IBAN`.
- End the workflow.
flowchart TD
A(Engine) -->|execute| B[SendWebhookEvent]
B --> C[WaitForResponse]
B -->|POST /review workflow_id+filename| CS{{CustomerServer}}
W -->|2. record response| C
C -->|1. set workflow as waiting| W(Workflow)
subgraph request
CS -->|After some delay\nPOST /review workflow_id+decision| CON(Controller)
CON -->|record response\nput workflow to pending| W
end
C -->|response| D[ParseHTTPRequest]
D --> E[EvaluateIfThenElse]
E --> ITE{response.decision\n==\nprocess}
ITE -->|True| F[ParseDocument]
ITE -->|False| AB((Abort\nWorkflow))
F --> G[SendWebhookEvent]
G -->|POST /pay-invoice amount+iban| CS2{{Customer Server}}
G --> END((Complete\nWorkflow))
style AB fill:#f74d40,color:#fff
style END fill:#339127,color:#fff
I ran the server and recorded the logs for one workflow. You can see how the workflow is created, how a request is sent to the customer server, how the workflow is suspended until the server responds, how the server responds with a decision, how the workflow is re-queued together with the response of the customer server, and how the customer server is eventually instructed to pay out an amount to a given IBAN.
11:10:36.862 [debug] Created new workflow for document: file-984.pdf
11:10:36.895 [debug] Marking Workflow 1be190cb-e401-48b6-b568-0f0351994eec as running
11:10:36.895 [debug] Marking Command 0 - Elixir.Demo.Commands.SendWebhookEvent as completed
11:10:36.895 [debug] Marking Command 1 - Elixir.Demo.Commands.WaitForResponse as waiting
11:10:36.895 [debug] Marking Workflow 1be190cb-e401-48b6-b568-0f0351994eec as waiting
11:10:36.895 [debug] Customer Server: Received a Review Request for Workflow 1be190cb-e401-48b6-b568-0f0351994eec
11:10:38.896 [debug] Customer Server: Sending Response for Review Decision for Workflow: 1be190cb-e401-48b6-b568-0f0351994eec - process
11:10:38.896 [debug] Marking Workflow 1be190cb-e401-48b6-b568-0f0351994eec as pending
11:10:38.897 [debug] Marking Workflow 1be190cb-e401-48b6-b568-0f0351994eec as running
11:10:38.897 [debug] Marking Command 1 - Elixir.Demo.Commands.WaitForResponse as completed
11:10:38.897 [debug] Marking Command 2 - Elixir.Demo.Commands.ParseHTTPRequest as completed
11:10:38.897 [debug] Marking Command 3 - Elixir.Demo.Commands.EvaluateIfThenElse as completed
11:10:38.897 [debug] Marking Command 4 - Elixir.Demo.Commands.ParseDocument as completed
11:10:38.897 [debug] Marking Command 5 - Elixir.Demo.Commands.SendWebhookEvent as completed
11:10:38.897 [debug] Marking Workflow 1be190cb-e401-48b6-b568-0f0351994eec as completed
11:10:38.897 [debug] Customer Server: Received an Invoice Pay Request over 6500 to NL02ABNA674855481
The application only supports sync commands for now, but implements async commands using three sync commands: `SendWebhookEvent`, `WaitForResponse`, and `ParseHTTPRequest`. Having three separate commands adds flexibility for which async action we wait for, but it would probably be better UX to have async commands in the future that wrap multiple sync commands.
To access fields in the workflow, document, or previous result, a user can configure a Command through an access path like e.g. `["workflow", "id"]`, `["document", "parsed_content", "amount"]`, or `["previous", "decision"]`. Although some fields that can be accessed are atom-based keys in a struct, like `:workflow` and `:document`, I decided to only allow string-based keys in the access path.
The reason for this is that we would probably receive the access path from some kind of UI through a WebSocket connection or HTTP request. When the access path is transmitted this way, any atom-based keys are converted into string-based keys. If we wanted mixed keys, we'd have to manually convert these keys back into atom-based keys.
Secondly, if we stored the access path in a database, any atom-based key would also get converted into a string-based key. So, by keeping the access path keys string-based, we prevent a lot of back-and-forth converting between atom-based and string-based access keys.
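One way to resolve such a string-only access path against data that mixes atom and string keys is sketched below. The module `AccessPathSketch` and its `fetch/2` function are hypothetical and not taken from this repository. The sketch tries the string key first and falls back to an already existing atom, so user input can never create new atoms.

```elixir
defmodule AccessPathSketch do
  @moduledoc "Hypothetical resolver for string-only access paths."

  @doc "Follows `path` through nested maps or structs; returns nil when a key is missing."
  def fetch(data, []), do: data

  def fetch(data, [key | rest]) when is_map(data) and is_binary(key) do
    case Map.fetch(data, key) do
      {:ok, value} -> fetch(value, rest)
      # No string key found: retry with the matching atom, if that atom exists.
      :error -> data |> Map.get(to_atom(key)) |> fetch(rest)
    end
  end

  # Anything that is not a map cannot be traversed further.
  def fetch(_data, _path), do: nil

  # Only atoms that already exist are used; unknown names resolve to nil.
  defp to_atom(key) do
    String.to_existing_atom(key)
  rescue
    ArgumentError -> nil
  end
end
```

For example, with `context = %{workflow: %{id: "abc"}, previous: %{"decision" => "process"}}`, both `AccessPathSketch.fetch(context, ["workflow", "id"])` and `AccessPathSketch.fetch(context, ["previous", "decision"])` resolve, even though the first path crosses atom keys and the second a string key.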
For this Proof of Concept, no persistence layer has been implemented. However, all used structs could be easily converted into Ecto Schemas except for the `Workflow` struct, whose `steps` field would need to be serialized in a way that allows the reconstruction of the Command module and its parameters. One option for this is to call `String.to_existing_atom/1` on the stored module name. This returns the module name as an atom, which can be used for calling the `execute/3` function on it.
iex(1)> module_name = to_string(Demo.Commands.SendWebhookEvent)
"Elixir.Demo.Commands.SendWebhookEvent"
iex(2)> module = String.to_existing_atom(module_name)
Demo.Commands.SendWebhookEvent
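Building on the session above, a step could be dumped to a map of plain strings and loaded back as sketched here. This is only one possible approach: `StepSerializerSketch`, its `dump/1` and `load/1` functions, and the `Noop` command are hypothetical, and a step is assumed to be a map with a `:module` and a `:params` key.

```elixir
defmodule StepSerializerSketch do
  @moduledoc "Hypothetical sketch for storing and restoring a workflow step."

  @doc "Turns a step into a string-keyed map that could be stored in e.g. a JSON column."
  def dump(%{module: module, params: params}) do
    %{"module" => Atom.to_string(module), "params" => params}
  end

  @doc "Rebuilds the step; fails if the name is not an existing module exporting execute/3."
  def load(%{"module" => name, "params" => params}) do
    # Raises ArgumentError if no such atom exists, so stored data cannot create atoms.
    module = String.to_existing_atom(name)

    if Code.ensure_loaded?(module) and function_exported?(module, :execute, 3) do
      {:ok, %{module: module, params: params}}
    else
      {:error, {:unknown_command, name}}
    end
  rescue
    ArgumentError -> {:error, {:unknown_command, name}}
  end
end

defmodule StepSerializerSketch.Noop do
  # Hypothetical command module used only to demonstrate the round trip.
  def execute(_command, _workflow, previous), do: {:ok, previous}
end
```

Checking for `execute/3` after the atom lookup guards against stored names that are valid atoms but not command modules.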
I left out the following features because they were irrelevant for this proof of concept, but they would need to be implemented for production use:
- Authentication in the Controller.
- The decision which workflow to use for which user and/or document type. Right now, only one workflow is used for all documents, but a decision strategy would be necessary to decide which workflow should be chosen for which document.
- How to scale the `Engine`. The engine is a single GenServer for now, but could be scaled to a pool of GenServers that all pull pending workflows from the `Workflows` context. A proper scaling strategy would be needed that also handles errors better.
- How to store the workflows. For this proof of concept, the workflows are stored in a GenServer instead of in a persistence layer like e.g. a Postgres database. One would need to figure out how to store workflows better.
It would be interesting to investigate how to support workflows that have multiple paths. As an example, think about a workflow where we automatically instruct an external service to pay an invoice, but only if the amount is less than $1000. If the amount is higher, a user needs to approve the amount first. This secondary workflow could be wrapped in its own nested workflow.