TypeDrop
2026-05-15 Challenge
Hard
Typed Streaming ETL Pipeline with Middleware
You're building the data-ingestion layer for a real-time analytics platform. Raw records stream in as `unknown` from heterogeneous sources; your typed ETL pipeline must validate them, pass them through a composable middleware chain (transform, enrich, filter), execute stages with a concurrency limit, and emit a strongly-typed pipeline report — with zero `any`.
Goals
- Implement `validateRecord` to safely narrow `unknown` → `ValidatedRecord | FailedRecord`, extracting branded types and performing best-effort field recovery on failure.
- Implement the three middleware factories (`createTransformMiddleware`, `createFilterMiddleware`, `createEnrichMiddleware`) so that each correctly upgrades the `stage` of a `ProcessableRecord` and tracks payload mutations.
- Implement `composeMiddleware` using the classic Koa-style onion pattern so middleware executes in declaration order with a proper async `next` chain.
- Implement `runPipeline` including an internal generic `runConcurrent<T>` helper that enforces the concurrency limit, preserves input order, and returns a fully-typed `PipelineReport`.
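As a starting point for the first goal, here is a minimal sketch of narrowing `unknown` with user-defined type guards rather than assertions. The record shapes, the field names (`id`, `source`, `payload`, `reason`, `recoveredId`), the "non-empty string" rule, and the helper names `isObject`, `isRecordId`, `isSourceName`, and `failWith` are all assumptions made for illustration; the challenge's real definitions may differ.

```typescript
type Brand<T, B extends string> = T & { readonly __brand: B };
type RecordId = Brand<string, "RecordId">;
type SourceName = Brand<string, "SourceName">;

// Hypothetical shapes: the challenge's real record types may differ.
interface ValidatedRecord {
  readonly stage: "validated";
  readonly id: RecordId;
  readonly source: SourceName;
  readonly payload: Readonly<Record<string, unknown>>;
}
interface FailedRecord {
  readonly stage: "failed";
  readonly reason: string;
  readonly recoveredId?: RecordId; // best-effort recovery of the id
}

// Type guards narrow `unknown` without `as` and without `any`.
const isObject = (v: unknown): v is Record<string, unknown> =>
  typeof v === "object" && v !== null && !Array.isArray(v);

// Assumed rule: any non-empty string qualifies for the brand.
const isRecordId = (v: unknown): v is RecordId =>
  typeof v === "string" && v.length > 0;
const isSourceName = (v: unknown): v is SourceName =>
  typeof v === "string" && v.length > 0;

// Builds a FailedRecord, keeping the id only when it could be recovered.
function failWith(reason: string, id: unknown): FailedRecord {
  return isRecordId(id)
    ? { stage: "failed", reason, recoveredId: id }
    : { stage: "failed", reason };
}

function validateRecord(raw: unknown): ValidatedRecord | FailedRecord {
  if (!isObject(raw)) return failWith("record is not an object", undefined);
  const id = raw["id"];
  const source = raw["source"];
  const payload = raw["payload"];
  if (!isRecordId(id)) return failWith("missing or empty id", id);
  if (!isSourceName(source)) return failWith("missing or empty source", id);
  if (!isObject(payload)) return failWith("payload is not an object", id);
  // Each field has been narrowed by a guard, so no assertion is needed here.
  return { stage: "validated", id, source, payload };
}
```

Because the result is a discriminated union on `stage`, callers can branch on `result.stage` and let the compiler narrow to `ValidatedRecord` or `FailedRecord`.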
challenge.ts
// Key types and main entry point
type Brand<T, B extends string> = T & { readonly __brand: B };
export type RecordId = Brand<string, "RecordId">;
export type SourceName = Brand<string, "SourceName">;
export type ProcessableRecord = ValidatedRecord | TransformedRecord;
export type MiddlewareNext = (
  record: ProcessableRecord
) => Promise<ProcessableRecord | DroppedRecord>;
export type Middleware = (
  record: ProcessableRecord,
  next: MiddlewareNext
) => Promise<ProcessableRecord | DroppedRecord>;
export interface PipelineConfig {
  readonly concurrency: number;
  readonly middleware: Middleware;
}
// Main entry point — implement this (+ internal runConcurrent<T>)
export async function runPipeline(
  rawRecords: ReadonlyArray<unknown>,
  config: PipelineConfig
): Promise<PipelineReport> { /* TODO */ }
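Given the `Middleware` and `MiddlewareNext` types above, the onion-style composition named in the goals could be sketched as follows. This is one possible approach, not the reference solution: the record interfaces are simplified stand-ins, the `composeMiddleware` signature (array in, single `Middleware` out) is assumed, and `identity`, `traced`, and `markTransformed` are hypothetical helpers added only to show the execution order and a stage upgrade.

```typescript
// Hypothetical, simplified record shapes; the full definitions are not shown.
interface ValidatedRecord {
  readonly stage: "validated";
  readonly id: string;
  readonly payload: Readonly<Record<string, unknown>>;
}
interface TransformedRecord {
  readonly stage: "transformed";
  readonly id: string;
  readonly payload: Readonly<Record<string, unknown>>;
}
interface DroppedRecord {
  readonly stage: "dropped";
  readonly id: string;
  readonly reason: string;
}

type ProcessableRecord = ValidatedRecord | TransformedRecord;
type MiddlewareNext = (
  record: ProcessableRecord
) => Promise<ProcessableRecord | DroppedRecord>;
type Middleware = (
  record: ProcessableRecord,
  next: MiddlewareNext
) => Promise<ProcessableRecord | DroppedRecord>;

// Assumed signature: an array of middleware in, one composed Middleware out.
function composeMiddleware(middlewares: ReadonlyArray<Middleware>): Middleware {
  return (record, terminalNext) => {
    // Fold right to left so the first middleware ends up outermost.
    const chain = middlewares.reduceRight<MiddlewareNext>(
      (downstream, middleware) => (current) => middleware(current, downstream),
      terminalNext
    );
    return chain(record);
  };
}

// Identity terminal: returns the record unchanged.
const identity: MiddlewareNext = async (record) => record;

// Illustrative middleware that logs when it is entered and exited.
const trace: string[] = [];
const traced = (name: string): Middleware => async (record, next) => {
  trace.push(`${name}:in`);
  const result = await next(record);
  trace.push(`${name}:out`);
  return result;
};

// Illustrative transform: spread first, then override `stage`.
const markTransformed: Middleware = async (record, next) => {
  const upgraded: TransformedRecord = { ...record, stage: "transformed" };
  return next(upgraded);
};
```

Calling `composeMiddleware([traced("a"), markTransformed, traced("b")])(record, identity)` enters `a`, runs the transform, enters `b`, and then unwinds in reverse order, which is the onion behaviour. A filter middleware would simply return a `DroppedRecord` without calling `next`, short-circuiting everything downstream.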
Hints
- For `composeMiddleware`, build the chain from right to left: each middleware's `next` invokes the middleware that follows it in the array, which in turn receives its own `next`. The terminal `next` is the identity function (it returns the record unchanged).
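- To upgrade a `ValidatedRecord` to a `TransformedRecord` without type assertions, construct a new object literal that spreads the existing fields first and then sets `stage: 'transformed' as const` (a const assertion only preserves the literal type; it does not bypass type checking). The `stage` property must come after the spread, otherwise the spread overwrites it. TypeScript will then infer the narrowed type from the literal.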
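- In `runConcurrent<T>`, use a shared index counter and a pool of `concurrency` worker Promises that each claim the next task index synchronously. Because JavaScript runs on a single thread, reading and incrementing the counter with no `await` in between is effectively atomic. Writing each result at its own task index keeps the output in input order and avoids complex queue management.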
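The worker-pool idea from the last hint can be sketched roughly like this. The signature (an array of task thunks plus a numeric limit) and the fallback to a limit of 1 for invalid values are assumptions, since the challenge does not show how `runConcurrent<T>` is declared. This sketch also lets the first rejected task reject the whole call; a real pipeline would more likely capture failures per record.

```typescript
// Assumed signature: tasks are thunks so that work starts only when claimed.
async function runConcurrent<T>(
  tasks: ReadonlyArray<() => Promise<T>>,
  concurrency: number
): Promise<T[]> {
  const results = new Array<T>(tasks.length);
  let nextIndex = 0; // shared cursor across all workers

  const worker = async (): Promise<void> => {
    while (true) {
      // Read and increment happen synchronously, so two workers
      // can never claim the same index.
      const index = nextIndex++;
      const task = tasks[index];
      if (task === undefined) return; // no tasks left
      // Store by index so output order matches input order.
      results[index] = await task();
    }
  };

  // Assumption: fall back to a limit of 1 for non-positive or non-integer values.
  const limit = Number.isInteger(concurrency) && concurrency > 0 ? concurrency : 1;
  const poolSize = Math.min(limit, tasks.length);

  await Promise.all(Array.from({ length: poolSize }, () => worker()));
  return results;
}
```

Inside `runPipeline`, each validated record could be wrapped in a thunk such as `() => config.middleware(record, identityNext)`, where `identityNext` is a hypothetical terminal `next`, and the resulting array passed to `runConcurrent` together with `config.concurrency`.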
Clone locally
git clone -b challenge/2026-05-15 https://github.com/niltonheck/typedrop.git