A lightweight workflow engine built on NATS JetStream for durable, distributed task orchestration.
You can focus on your business logic while the engine handles persistence, retries, and distributed execution.
Warning
Software is at pre-alpha stage and is just a concept demo, expect bugs.
- Server - Orchestrates workflow execution and state management
- Workflow Worker - Executes workflow logic and decision-making
- Activity Worker - Performs individual tasks (API calls, database operations, etc.)
- Client - Triggers workflow execution and retrieves results
Traditional workflow engines often require complex deployments with proprietary databases and clustering solutions. DurableFuture simplifies this by:
- Delegating persistence to NATS JetStream - No custom database layer
- Minimal operational overhead - Just NATS + your application
- Production-ready foundation - NATS is proven in high-scale environments
- Workflow: See full version at /examples/order.go
- Usage: See full version at /usage/main.go
// OrderWorkflow implements a simple order processing workflow
func OrderWorkflow(ctx workflow.Context, customerId string, productId string, amount float64, quantity int) (any, error) {
var chargeResult ChargeResult
if err := workflow.
ExecuteActivity(ctx, ChargeCreditCardActivity, customerId, amount).
Get(ctx, &chargeResult); err != nil {
return nil, fmt.Errorf("credit card charge failed: %w", err)
}
var shipResult ShipResult
if err := workflow.
ExecuteActivity(ctx, ShipPackageActivity, chargeResult).
Get(ctx, &shipResult); err != nil {
// In a real scenario, you would refund the charge here
return nil, fmt.Errorf("package shipping failed: %w", err)
}
result := map[string]any{
"tracking_id": shipResult.TrackingID,
"carrier": shipResult.Carrier,
"estimated_delivery": shipResult.EstimatedDelivery,
"charge_id": chargeResult.ChargeID,
}
return result, nil
}- First create Workflow Worker:
ctx := context.Background()
// create a Workflow Worker
workerClient, err := worker.NewWorker()
if err != nil {
log.Printf("err: %v", err)
return
}
// register the Workflow to the Worker
err = workerClient.RegisterWorkflow(examples.OrderWorkflow)
if err != nil {
log.Printf("err: %v", err)
return
}
// start the Workflow Worker
if err := workerClient.Run(ctx); err != nil {
log.Printf("err: %v", err)
return
}- Then create Activity Worker(s):
ctx := context.Background()
workerClient, err := worker.NewWorker()
if err != nil {
return
}
err = workerClient.RegisterActivity(examples.AddActivity)
if err != nil {
return
}
err = workerClient.RegisterActivity(examples.DelayedActivity)
if err != nil {
return
}
err = workerClient.RegisterActivity(examples.ChargeCreditCardActivity)
if err != nil {
return
}
err = workerClient.RegisterActivity(examples.ShipPackageActivity)
if err != nil {
return
}
if err := workerClient.Run(ctx); err != nil {
return
}- Finally, the client code:
future, err := workflowClient.ExecuteWorkflow(ctx, examples.OrderWorkflow,
"Bob",
"widget-1000",
1000.0,
2,
)
if err != nil {
log.Fatalf("Starting workflow failed: %v", err)
}
var result any
err = future.Get(ctx, &result)
if err != nil {
log.Fatalf("error: %v", err)
}
log.Printf("result: %v", result)DurableFuture works by leveraging the event sourcing pattern, recording the outcome of Activities within the Workflow. For example, results of operations such as making an API call or a database transaction will be persisted as Events in a NATS Jetstream. So in the event the current Workflow crashed or interrupted, it will be re-run on one of the available Workflow Workers. However, instead of executing the Activities that have already been done, it will return the result from the first successful execution that is being stored on the Event Stream.
Consider the previous example:
If the workflow runs normally without being interrupted, then the event log at the end might look something like this:
| seq | event type | result |
|---|---|---|
| 0 | workflow started | Order |
| 1 | activity scheduled | ChargeCreditCard |
| 2 | activity started | ChargeCreditCard |
| 3 | activity completed | {charge_id: "ch_123"} |
| 4 | activity scheduled | Shipping |
| 5 | activity started | Shipping |
| 6 | activity completed | {tracking_id: "tr_456"} |
| 7 | workflow completed | {charge_id: "ch_123", tracking_id: "tr_456"} |
Suppose, now, that instead of running until the end, some failure occurs after the ChargeCreditCard activity has completed, but before the Shipping activity has completed. The event log might look like this:
| seq | event type | result |
|---|---|---|
| 0 | workflow started | Order |
| 1 | activity scheduled | ChargeCreditCard |
| 2 | activity started | ChargeCreditCard |
| 3 | activity completed | {charge_id: "ch_123"} |
| 4 | activity scheduled | Shipping |
| 5 | activity started | Shipping |
| 6 | activity failed | (crashed before completion) |
| 7 | activity scheduled | Shipping (retries) |
| 8 | activity started | Shipping (retries) |
| 9 | activity completed | {tracking_id: "tr_456"} |
| 10 | workflow completed | {charge_id: "ch_123", tracking_id: "tr_456"} |
So when a Worker picks up the Workflow, it is restarted and it will replay the events in the log, only executing the Activities that have not yet been completed.