TypeDrop

2026-05-15 Challenge

2026-05-15 Hard

Typed Streaming ETL Pipeline with Middleware

You're building the data-ingestion layer for a real-time analytics platform. Raw records stream in as `unknown` from heterogeneous sources; your typed ETL pipeline must validate them, pass them through a composable middleware chain (transform, enrich, filter), execute stages with a concurrency limit, and emit a strongly-typed pipeline report — with zero `any`.

Goals

  • Implement `validateRecord` to safely narrow `unknown` → `ValidatedRecord | FailedRecord`, extracting branded types and performing best-effort field recovery on failure.
  • Implement the three middleware factories (`createTransformMiddleware`, `createFilterMiddleware`, `createEnrichMiddleware`) so that they correctly upgrade the `ProcessableRecord` stage and track payload mutations.
  • Implement `composeMiddleware` using the classic Koa-style onion pattern so that middleware executes in declaration order with a proper async `next` chain.
  • Implement `runPipeline` including an internal generic `runConcurrent<T>` helper that enforces the concurrency limit, preserves input order, and returns a fully-typed `PipelineReport`.
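
As an illustration of the first goal, here is a minimal sketch of narrowing `unknown` with type predicates instead of casts. The record shapes (`stage`, `id`, `source`, `payload`, `reason`) and the guard names are assumptions made for this example; the real `ValidatedRecord` and `FailedRecord` in the challenge file may look different.

```typescript
type Brand<T, B extends string> = T & { readonly __brand: B };
type RecordId = Brand<string, "RecordId">;
type SourceName = Brand<string, "SourceName">;

// Hypothetical shapes, for illustration only.
interface ValidatedRecord {
  readonly stage: "validated";
  readonly id: RecordId;
  readonly source: SourceName;
  readonly payload: Readonly<Record<string, unknown>>;
}

interface FailedRecord {
  readonly stage: "failed";
  readonly id: RecordId | null; // recovered on a best-effort basis
  readonly reason: string;
}

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Type predicates narrow straight to the branded type, so no `as` is needed.
function isRecordId(value: unknown): value is RecordId {
  return typeof value === "string" && value.length > 0;
}

function isSourceName(value: unknown): value is SourceName {
  return typeof value === "string" && value.length > 0;
}

function validateRecord(raw: unknown): ValidatedRecord | FailedRecord {
  if (!isPlainObject(raw)) {
    return { stage: "failed", id: null, reason: "record is not an object" };
  }

  // Recover the id first so that later failures can still report it.
  const rawId = raw["id"];
  const id = isRecordId(rawId) ? rawId : null;
  if (id === null) {
    return { stage: "failed", id, reason: "missing or invalid id" };
  }

  const source = raw["source"];
  if (!isSourceName(source)) {
    return { stage: "failed", id, reason: "missing or invalid source" };
  }

  const payload = raw["payload"];
  if (!isPlainObject(payload)) {
    return { stage: "failed", id, reason: "payload is not an object" };
  }

  return { stage: "validated", id, source, payload };
}
```

Because both result types carry a literal `stage`, callers can switch on `stage` to tell them apart. A failed record still carries the recovered `id` whenever that field was valid.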

challenge.ts
// Key types and main entry point

type Brand<T, B extends string> = T & { readonly __brand: B };
export type RecordId   = Brand<string, "RecordId">;
export type SourceName = Brand<string, "SourceName">;

export type ProcessableRecord = ValidatedRecord | TransformedRecord;

export type MiddlewareNext = (
  record: ProcessableRecord
) => Promise<ProcessableRecord | DroppedRecord>;

export type Middleware = (
  record: ProcessableRecord,
  next:   MiddlewareNext
) => Promise<ProcessableRecord | DroppedRecord>;

export interface PipelineConfig {
  readonly concurrency: number;
  readonly middleware:  Middleware;
}

// Main entry point — implement this (+ internal runConcurrent<T>)
export async function runPipeline(
  rawRecords: ReadonlyArray<unknown>,
  config:     PipelineConfig
): Promise<PipelineReport> { /* TODO */ }
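
The internal `runConcurrent<T>` helper named in the comment above could be shaped roughly as follows. This is a sketch under assumptions, not the reference solution: the `Task<T>` alias, the parameter order, and the fail-fast error behaviour are all choices made for this example.

```typescript
// Hypothetical task shape: a thunk that starts the work only when called.
type Task<T> = () => Promise<T>;

async function runConcurrent<T>(
  tasks: ReadonlyArray<Task<T>>,
  concurrency: number
): Promise<T[]> {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError("concurrency must be a positive integer");
  }

  // Every slot is filled before the function returns.
  const results: T[] = new Array<T>(tasks.length);
  let nextIndex = 0;

  const worker = async (): Promise<void> => {
    while (true) {
      // No await sits between the read and the increment, so two workers
      // can never claim the same index.
      const index = nextIndex++;
      const task = tasks[index];
      if (task === undefined) return; // no tasks left
      // Writing by index keeps the output in input order, regardless of
      // which task finishes first.
      results[index] = await task();
    }
  };

  // Never start more workers than there are tasks.
  const poolSize = Math.min(concurrency, tasks.length);
  await Promise.all(Array.from({ length: poolSize }, () => worker()));
  return results;
}
```

In this sketch a rejected task rejects the whole call through `Promise.all`, while tasks already in flight keep running. A pipeline that has to report per-record failures would more likely catch errors inside each task and return them as values.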

Hints

  • For `composeMiddleware`, build the chain from right to left: each middleware's `next` is the invocation of the next middleware in the array with its own `next` — the terminal `next` is the identity (returns the record unchanged).
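  • To upgrade a `ValidatedRecord` to a `TransformedRecord` without an unsafe `as TransformedRecord` cast, construct a new object literal that spreads the existing fields first and then sets `stage: 'transformed' as const`, so the spread cannot overwrite the new stage. The const assertion only stops the literal from widening to `string`; TypeScript still checks the resulting object structurally.
  • In `runConcurrent<T>`, use a shared index counter and a pool of `concurrency` worker Promises that each claim the next task index synchronously (JavaScript runs the read-and-increment without interruption, so no lock is needed) and write the result into the slot at that index. This avoids complex queue management while keeping results in input order.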
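
The first two hints can be sketched together as below. The record shapes are deliberately reduced and hypothetical (a plain `string` id, no branded types), and `identityNext` is a name introduced here for the terminal `next`. Treat this as one possible reading of the hints rather than the expected solution.

```typescript
// Hypothetical, reduced record shapes for illustration.
type Payload = Readonly<Record<string, unknown>>;
interface ValidatedRecord { readonly stage: "validated"; readonly id: string; readonly payload: Payload; }
interface TransformedRecord { readonly stage: "transformed"; readonly id: string; readonly payload: Payload; }
interface DroppedRecord { readonly stage: "dropped"; readonly id: string; readonly reason: string; }

type ProcessableRecord = ValidatedRecord | TransformedRecord;
type MiddlewareNext = (
  record: ProcessableRecord
) => Promise<ProcessableRecord | DroppedRecord>;
type Middleware = (
  record: ProcessableRecord,
  next: MiddlewareNext
) => Promise<ProcessableRecord | DroppedRecord>;

// Terminal `next`: returns the record unchanged.
const identityNext: MiddlewareNext = async (record) => record;

function composeMiddleware(middlewares: ReadonlyArray<Middleware>): Middleware {
  return (record, next) => {
    // Fold from right to left: the last middleware wraps the caller's
    // `next`, and each earlier middleware wraps the one after it.
    const chain = middlewares.reduceRight<MiddlewareNext>(
      (downstream, middleware) => (rec) => middleware(rec, downstream),
      next
    );
    return chain(record);
  };
}

function createTransformMiddleware(
  transform: (payload: Payload) => Payload
): Middleware {
  return async (record, next) =>
    next({
      ...record, // spread first so the fields below take precedence
      stage: "transformed" as const,
      payload: transform(record.payload),
    });
}
```

A caller such as `runPipeline` would then invoke the composed middleware as `composed(record, identityNext)`. Because each middleware awaits `next` before returning, code placed after that `await` runs in reverse declaration order on the way back out, which is the onion behaviour.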

Or clone locally

git clone -b challenge/2026-05-15 https://github.com/niltonheck/typedrop.git