Temporal Batch Processing: From Sequential to Parallel

You built something correct. It worked. Then you realised it was the most expensive possible way to solve a simple parallel problem.

A workflow processing 1,000 accounts for monthly interest accrual. Each account takes ~900ms. Total: 15 minutes of sequential execution for work that could run in parallel. The batch window blocks other workflows. The Temporal Cloud bill has opinions about it too.

Sequential Child Workflow Execution #

The original batch workflow processed accounts sequentially, spawning a child workflow for each account:

func InterestAccrualBatch(ctx workflow.Context, state BatchWorkflowState) error {
    processedItems := 0
    for i := state.CurrentIndex; i < len(state.Items); i++ {
        item := state.Items[i]

        if processedItems >= state.ItemsPerExecutionLimit {
            state.CurrentIndex = i
            return workflow.NewContinueAsNewError(ctx, InterestAccrualBatch, state)
        }

        future := executeAccountWorkflow(ctx, item)

        // Sequential: waits for each account before starting the next
        if err := future.Get(ctx, nil); err != nil {
            workflow.GetLogger(ctx).Error("Account interest accrual failed", "error", err)
        }

        processedItems++
    }
    return nil
}

Each account workflow executed 6-13 activities depending on account type (standard: 6, accounts with linked products requiring additional eligibility checks: 10-13).

Why This Is Slow (And Expensive) #

Sequential execution wastes time. The future.Get(ctx, nil) call blocks until each child workflow completes. The work is embarrassingly parallel; each account’s calculation is independent, yet they’re processed one at a time.

History events accumulate fast. Every Temporal workflow and activity generates history events. These events are persisted and contribute to storage costs. For a batch like this:

| Component | Events per Account | Total (1,000 accounts) |
|---|---|---|
| Child workflow started | 1 | 1,000 |
| Child workflow completed | 1 | 1,000 |
| Activities (6-13 each, ~3 events per activity) | ~18-39 | ~18,000-39,000 |
| **Total** | ~20-41 | ~20,000-41,000 events |

Temporal Cloud pricing includes storage costs based on history size. Self-hosted Temporal also incurs database storage and query performance costs proportional to history volume.

Temporal server load adds up. Each child workflow occupies a slot, polls the task queue, and maintains state in the persistence layer. Thousands of short-lived workflows create unnecessary churn on Temporal infrastructure.

What We’d Lose By Optimizing #

The original design wasn’t stupid. Child workflows per account provided real benefits:

  1. Isolation - One account’s failure doesn’t cascade
  2. Visibility - Each account shows up separately in Temporal UI
  3. Debugging - Easy to find a specific account’s execution
  4. Retry granularity - Failed accounts can be individually retried

Visibility in particular feels safe. When something breaks at 2am, being able to open Temporal UI, find the specific account that failed, and retry it individually is operationally comforting. That comfort is real. Until the cost outweighs it.

These benefits matter if you’re orchestrating long-running processes that need human intervention or independent signaling. This workflow doesn’t need any of that. Each account processes in under a second with straightforward error handling. We’d be paying workflow overhead for features we aren’t using.

Solution Options #

Temporal supports two primary patterns for parallel batch processing. The choice depends on your requirements for visibility, retry granularity, and storage efficiency.

| Aspect | Parallel Activities | Single Activity with Goroutines |
|---|---|---|
| History events (1,000 items) | ~3,000 (3 per activity) | ~3 (1 activity) |
| Temporal UI visibility | Each item visible | Only batch visible |
| Retry granularity | Per-item automatic retry | Whole batch or manual |
| Worker concurrency limit | MaxConcurrentActivityExecutionSize (default: 1000) | Unlimited (you control) |
| External service pressure | Temporal manages scheduling | You manage with semaphore |
| Complexity | Lower | Higher |

For our use case (storage cost + speed), the single activity approach provides the best balance. However, parallel activities is a valid Temporal pattern when per-item visibility matters.

Solution 1: Single Activity with Goroutines #

This approach consolidates batch processing into a single activity that:

  1. Processes accounts in parallel using goroutines with bounded concurrency
  2. Reports progress via heartbeats
  3. Collects results for reporting

Architecture Comparison #

Before: Workflow-per-Account

BatchWorkflow
    ├── AccountWorkflow[1] → Activity → Activity → Activity → ...
    ├── AccountWorkflow[2] → Activity → Activity → Activity → ...
    ├── AccountWorkflow[3] → Activity → Activity → Activity → ...
    └── ... (sequential, 1000x)

After: Single Activity with Parallel Processing

BatchWorkflow
    └── ProcessInterestAccrualBatch (single activity)
            ├── goroutine[1] → processUser()
            ├── goroutine[2] → processUser()
            ├── ... (parallel, bounded concurrency)
            └── goroutine[100] → processUser()

Before: 1,000 workflows × 32 events each = 32,000 history events
After: 1 workflow × ~20 events = ~20 events

1,600× fewer history events. Only one workflow and one activity are recorded, not 1,000 child workflows each carrying their own event trail.

The optimized workflow is simpler: one activity call instead of 1,000 child workflows. The complexity moves into the activity itself, where goroutines handle parallelism:

// Simplified workflow - single activity call
func InterestAccrualBatch(ctx workflow.Context, state BatchWorkflowState) error {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Minute,
        HeartbeatTimeout:    60 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    var result BatchProcessingResult
    err := workflow.ExecuteActivity(ctx, "ProcessInterestAccrualBatch", state.Items).Get(ctx, &result)
    if err != nil {
        return err
    }

    workflow.GetLogger(ctx).Info("Batch completed",
        "successCount", result.SuccessCount,
        "failureCount", len(result.Failures),
    )

    return nil
}
// Activity with parallel processing and heartbeating
func (a *ActivityProxy) ProcessInterestAccrualBatch(
    ctx context.Context,
    items []BatchWorkflowInputItem,
) (BatchProcessingResult, error) {
    const maxConcurrency = 100

    result := BatchProcessingResult{}
    var mu sync.Mutex
    sem := make(chan struct{}, maxConcurrency)
    var wg sync.WaitGroup

    processedCount := 0

    for _, item := range items {
        if ctx.Err() != nil {
            wg.Wait() // drain in-flight goroutines before returning partial results
            return result, ctx.Err()
        }

        sem <- struct{}{}
        wg.Add(1)

        go func(item BatchWorkflowInputItem) {
            defer wg.Done()
            defer func() { <-sem }()

            err := a.processAccountInterest(ctx, item)

            mu.Lock()
            processedCount++
            if err != nil {
                result.Failures = append(result.Failures, BatchFailure{
                    UserID:         item.UserID,
                    AccountGroupID: item.AccountGroupID,
                    Error:          err.Error(),
                })
            } else {
                result.SuccessCount++
            }

            // Heartbeat progress for visibility and resumability
            if processedCount%10 == 0 {
                activity.RecordHeartbeat(ctx, HeartbeatProgress{
                    ProcessedCount: processedCount,
                    SuccessCount:   result.SuccessCount,
                    FailureCount:   len(result.Failures),
                })
            }
            mu.Unlock()
        }(item)
    }

    wg.Wait()
    return result, nil
}

Key Implementation Details #

Bounded Concurrency with Semaphore #

sem := make(chan struct{}, maxConcurrency)
// ...
sem <- struct{}{}  // Acquire before starting work
defer func() { <-sem }()  // Release when done

Tuning guidance:

  • Start conservative (50-100)
  • Monitor external service error rates and latencies
  • Increase gradually based on observed capacity
  • Consider rate limits on downstream services (databases, APIs)

Heartbeating for Progress and Resumability #

activity.RecordHeartbeat(ctx, HeartbeatProgress{
    ProcessedCount: processedCount,
    SuccessCount:   result.SuccessCount,
    FailureCount:   len(result.Failures),
})

Heartbeats serve two purposes:

  1. Progress visibility: The heartbeat data appears in Temporal UI, letting operators monitor batch progress

  2. Failure recovery: If the activity crashes (worker dies, OOM, etc.), Temporal can restart it. The activity can retrieve the last heartbeat:

if activity.HasHeartbeatDetails(ctx) {
    var lastProgress HeartbeatProgress
    activity.GetHeartbeatDetails(ctx, &lastProgress)
    // Resume from lastProgress.ProcessedCount
}

One implication worth being explicit about: if the activity crashes, the entire activity retries. Items processed before the crash may be reprocessed unless your downstream operations are idempotent or you implement resume-from-heartbeat logic using lastProgress.ProcessedCount to skip already-completed items.

Thread-Safe Result Aggregation #

var mu sync.Mutex
// ...
mu.Lock()
processedCount++
result.Failures = append(result.Failures, ...)
mu.Unlock()

For higher throughput, consider per-goroutine result channels merged at completion rather than a shared mutex.

Solution 2: Parallel Activities #

If per-item visibility and automatic retry granularity are important, use Temporal’s native activity parallelism. This leverages the worker’s MaxConcurrentActivityExecutionSize setting (default: 1000) to process items concurrently.

func InterestAccrualBatchParallelActivities(ctx workflow.Context, state BatchWorkflowState) error {
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 5 * time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // Launch all activities without waiting
    futures := make([]workflow.Future, len(state.Items))
    for i, item := range state.Items {
        futures[i] = workflow.ExecuteActivity(ctx, "ProcessAccountInterest", item)
    }

    // Collect results
    var failures []BatchFailure
    for i, future := range futures {
        if err := future.Get(ctx, nil); err != nil {
            workflow.GetLogger(ctx).Error("Account interest accrual failed",
                "userID", state.Items[i].UserID,
                "error", err,
            )
            failures = append(failures, BatchFailure{
                UserID:         state.Items[i].UserID,
                AccountGroupID: state.Items[i].AccountGroupID,
                Error:          err.Error(),
            })
        }
    }

    workflow.GetLogger(ctx).Info("Batch completed",
        "total", len(state.Items),
        "failures", len(failures),
    )

    return nil
}

Key Differences from Solution 1 #

  1. Temporal manages concurrency - Activities are scheduled by the server and executed by workers up to MaxConcurrentActivityExecutionSize
  2. Per-item retry - Failed activities automatically retry per Temporal’s retry policy
  3. Activity-level visibility - Each item appears as a separate activity in Temporal UI
  4. More history events - ~3,000 events for 1,000 items (vs ~3 for Solution 1)

When to Choose This Pattern #

  • You need to inspect individual item progress in Temporal UI
  • Per-item automatic retry with backoff is valuable
  • Storage cost is less critical than operational visibility
  • Items may have variable processing times (Temporal load-balances automatically)

Worker Configuration #

The worker’s MaxConcurrentActivityExecutionSize controls how many activities execute concurrently on a single worker. The default is 1000.

w := worker.New(c, "task-queue", worker.Options{
    MaxConcurrentActivityExecutionSize: 1000,  // Default
    MaxConcurrentLocalActivityExecutionSize: 1000,
    WorkerActivitiesPerSecond: 100000,  // Rate limit
})

For Solution 1 (Single Activity with Goroutines), these settings don’t affect internal parallelism. You control that with your semaphore.

Results Comparison #

| Metric | Original (Sequential) | Solution 1 (Single Activity) | Solution 2 (Parallel Activities) |
|---|---|---|---|
| Execution time | ~15 min | ~2-3 min | ~2-3 min |
| History events | ~20,000-41,000 | ~20 | ~3,000 |
| Storage cost | Very High | Minimal | Moderate |
| Temporal UI visibility | Per-account workflow | Batch only | Per-account activity |
| Retry granularity | Per-account workflow | Whole batch | Per-account activity |
| Temporal server load | 1,000 workflows | 1 workflow, 1 activity | 1 workflow, 1,000 activities |

The execution time improvement is obvious for both solutions. The history event reduction is the real differentiator: Solution 1 cuts history events by roughly three orders of magnitude compared to the original, while Solution 2 still provides a 7-14× reduction. At scale, this translates to significantly lower storage costs and better query performance, with Solution 1 being the most cost-effective.

Chunked Activities for Very Large Batches #

For batches where activity timeout is a real constraint:

func InterestAccrualBatchChunked(ctx workflow.Context, state BatchWorkflowState) error {
    const chunkSize = 500

    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Minute,
        HeartbeatTimeout:    30 * time.Second,
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    endIndex := min(state.CurrentIndex+chunkSize, len(state.Items))
    chunk := state.Items[state.CurrentIndex:endIndex]

    var result BatchProcessingResult
    err := workflow.ExecuteActivity(ctx, "ProcessInterestAccrualBatch", chunk).Get(ctx, &result)
    if err != nil {
        return err
    }

    state.CurrentIndex = endIndex

    if state.CurrentIndex < len(state.Items) {
        return workflow.NewContinueAsNewError(ctx, InterestAccrualBatchChunked, state)
    }

    return nil
}

When to use:

  • Batches of 10,000+ items
  • Need to checkpoint progress in workflow history
  • Activity timeout constraints

Decision Framework #

Use this flowchart to choose the right pattern:

Does each item need independent workflow features?
(signaling, querying, long waits, human intervention)
├─ YES → Child Workflow per Item
│         Consider parallel launching if items are independent
└─ NO → Is storage cost the primary concern?
        ├─ YES → Solution 1: Single Long-Running Activity
        │         Use heartbeating and bounded concurrency
        │         (~3 history events total)
        └─ NO → Is per-item visibility/retry important?
                ├─ YES → Solution 2: Parallel Activities
                │         Temporal manages concurrency
                │         (~3,000 history events for 1,000 items)
                └─ NO → Either solution works
                         Choose based on team familiarity

| Requirement | Recommended Solution |
|---|---|
| Minimize storage cost | Solution 1 (Single Activity) |
| Per-item visibility in Temporal UI | Solution 2 (Parallel Activities) |
| Per-item automatic retry | Solution 2 (Parallel Activities) |
| Fine-grained concurrency control | Solution 1 (Single Activity) |
| Simplest implementation | Solution 2 (Parallel Activities) |
| Items need signals/queries | Child Workflows |

When This Actually Matters #

If you’re running 50 accounts through a batch workflow once a day, none of this matters. The sequential approach works fine.

This matters when:

  • You’re processing thousands of items regularly
  • Your Temporal Cloud bill is climbing
  • Sequential execution blocks other workflows from running
  • You need faster feedback loops for testing

When items are independent and you’re doing straightforward batch processing, activities with bounded parallelism win. When they’re not, child workflows still make sense. Optimization for its own sake is just procrastination with better charts.
