ADS-B Pipeline Implementation Guide
Overview
Implementation guide for building a clean ADS-B data pipeline. Each phase is self-contained and testable.
Phase Overview
Phase 1: Core Parsing (Foundation)
│
▼
Phase 2: Validation Layer
│
▼
Phase 3: Multi-Receiver Deduplication
│
▼
Phase 4: Clean Output
│
▼
Phase 5: Identity Resolution (Future)
│
▼
Phase 6: Enrichment (Future)
Phase 1: Core Parsing
Goal: Parse SBS messages from dump1090 reliably.
Tasks
| Task | Input | Output | Handle |
|---|---|---|---|
| SBS Parser | Raw SBS line | RawSBSMessage | Malformed lines, encoding |
| Multi-Receiver Ingestor | (host, port) list | Tagged messages | Connection drops |
| File Reader | CSV path | Message iterator | Extract receiver from filename |
Sequence Diagram
```
┌────────┐     ┌────────┐     ┌────────┐
│dump1090│     │Ingestor│     │Consumer│
└───┬────┘     └───┬────┘     └───┬────┘
    │              │              │
    │ "MSG,3,..."  │              │
    │─────────────>│              │
    │              │              │
    │              │ parse()      │
    │              │─────┐        │
    │              │<────┘        │
    │              │              │
    │              │ RawSBSMessage│
    │              │─────────────>│
    │              │ (receiver_id │
    │              │  = serial)   │
```
Validation Criteria
- [ ] Parse MSG types 1-8 correctly
- [ ] Handle empty fields (callsign may be blank)
- [ ] Tag each message with device serial
- [ ] No exceptions on malformed input (return None)
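A minimal sketch of a parser that meets the criteria above. It assumes the usual 22-field SBS-1 (BaseStation) line layout that dump1090 emits on port 30003; the `RawSBSMessage` fields and the helper names `_opt` and `parse_sbs_line` are illustrative assumptions, not the guide's actual definitions.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Union


@dataclass(frozen=True)
class RawSBSMessage:
    """Hypothetical shape; the real RawSBSMessage may carry more fields."""
    receiver_id: str              # device serial of the receiver that heard it
    msg_type: int                 # SBS transmission type, 1-8
    hex_ident: str                # Mode S address as upper-case hex
    callsign: Optional[str]
    altitude_ft: Optional[int]
    latitude: Optional[float]
    longitude: Optional[float]


def _opt(text: str, cast: Callable):
    """Convert a field, treating empty or unparseable values as None."""
    text = text.strip()
    if not text:
        return None
    try:
        return cast(text)
    except ValueError:
        return None


def parse_sbs_line(line: Union[str, bytes], receiver_id: str) -> Optional[RawSBSMessage]:
    """Parse one SBS line; return None (never raise) on malformed input."""
    if isinstance(line, bytes):
        # Replace undecodable bytes instead of raising on bad encoding.
        line = line.decode("ascii", errors="replace")
    fields = line.strip().split(",")
    if len(fields) < 22 or fields[0] != "MSG":
        return None
    msg_type = _opt(fields[1], int)
    if msg_type is None or not 1 <= msg_type <= 8:
        return None
    hex_ident = fields[4].strip().upper()
    if not hex_ident:
        return None
    return RawSBSMessage(
        receiver_id=receiver_id,
        msg_type=msg_type,
        hex_ident=hex_ident,
        callsign=_opt(fields[10], str),   # blank callsign becomes None
        altitude_ft=_opt(fields[11], int),
        latitude=_opt(fields[14], float),
        longitude=_opt(fields[15], float),
    )
```

Returning `None` rather than raising keeps the ingestor loop simple: it can count rejected lines and move on without a try/except around every call.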
Phase 2: Validation Layer
Goal: Detect bad data without discarding it.
Tasks
| Task | Input | Output | Handle |
|---|---|---|---|
| Callsign Validator | callsign string | (valid, classification, confidence) | "NULL", empty, formats |
| Position Validator | lat, lon, alt, speed | PositionValidation | Out of range, teleportation |
| Spoof Detector | position history | (is_spoof, indicators, confidence) | Position jumps, impossible speeds |
| Mode S Validator | hex_ident | (valid, country, confidence) | Format, country ranges |
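As an illustration of the Callsign Validator row, the sketch below returns a `(valid, classification, confidence)` tuple and flags odd values without discarding them. Apart from `"airline"` at 0.9, which appears in this guide's sequence diagram, and the `weird_but_valid` label, the classification names, regular expressions, and confidence numbers are placeholder assumptions.

```python
import re
from typing import Optional, Tuple

# Assumed pattern: 3-letter operator designator followed by a flight number.
_AIRLINE = re.compile(r"^[A-Z]{3}\d[A-Z0-9]{0,3}$")
# Assumed fallback: any 2-8 character alphanumeric string (e.g. a registration).
_GENERIC = re.compile(r"^[A-Z0-9]{2,8}$")


def validate_callsign(callsign: Optional[str]) -> Tuple[bool, str, float]:
    """Classify a callsign; bad values are flagged, never dropped."""
    text = (callsign or "").strip().upper()
    if not text:
        # A blank callsign is legitimate: many messages carry none.
        return (True, "empty", 0.5)
    if text == "NULL":
        # Checked before the generic pattern, which would also match it.
        return (True, "weird_but_valid", 0.3)
    if _AIRLINE.match(text):
        return (True, "airline", 0.9)
    if _GENERIC.match(text):
        return (True, "other", 0.6)
    return (False, "malformed", 0.1)
```

For example, `validate_callsign("BAW123")` yields `(True, "airline", 0.9)`, while a string containing punctuation comes back with `valid=False` so that a later stage can quarantine it.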
Sequence Diagram
```
┌─────────────┐ ┌────────────┐  ┌────────────┐
│RawSBSMessage│ │CallsignVal │  │PositionVal │
└─────┬───────┘ └─────┬──────┘  └─────┬──────┘
      │               │               │
      │ callsign      │               │
      │──────────────>│               │
      │               │               │
      │ (valid,       │               │
      │  "airline",   │               │
      │  0.9)         │               │
      │<──────────────│               │
      │               │               │
      │ lat,lon,alt,speed             │
      │──────────────────────────────>│
      │               │               │
      │ Validation(valid, 0.8, [])    │
      │<──────────────────────────────│
```
Validation Criteria
- [ ] "NULL" callsign marked as weird_but_valid
- [ ] Implied speed > 600 knots between consecutive positions flagged
- [ ] Altitude > 60,000 ft flagged (not rejected)
- [ ] Teleportation (> 50 nm in < 60 s) detected
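The position checks above could be sketched as follows, using the haversine formula to get the great-circle distance between consecutive fixes. The thresholds come from the criteria; the `Fix` type, the flag names, and the function names are assumptions made for illustration.

```python
import math
from dataclasses import dataclass
from typing import List, Optional

EARTH_RADIUS_NM = 3440.065    # approximate mean Earth radius in nautical miles
MAX_SPEED_KT = 600.0          # implied ground speed threshold
MAX_ALT_FT = 60000.0          # altitude threshold (flag, do not reject)
TELEPORT_NM = 50.0            # jump distance threshold
TELEPORT_WINDOW_S = 60.0      # jump time window


@dataclass(frozen=True)
class Fix:
    """Hypothetical position sample for one aircraft."""
    t: float        # seconds since epoch
    lat: float      # degrees
    lon: float      # degrees
    alt_ft: float


def distance_nm(a: Fix, b: Fix) -> float:
    """Great-circle distance between two fixes (haversine)."""
    p1, p2 = math.radians(a.lat), math.radians(b.lat)
    dphi = p2 - p1
    dlmb = math.radians(b.lon - a.lon)
    h = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_NM * math.asin(math.sqrt(h))


def check_fix(prev: Optional[Fix], cur: Fix) -> List[str]:
    """Return a list of flags for `cur`; an empty list means nothing suspicious."""
    if not (-90 <= cur.lat <= 90 and -180 <= cur.lon <= 180):
        return ["out_of_range"]
    flags = []
    if cur.alt_ft > MAX_ALT_FT:
        flags.append("altitude_high")
    if prev is not None and cur.t > prev.t:
        dt = cur.t - prev.t
        dist = distance_nm(prev, cur)
        if dist / dt * 3600 > MAX_SPEED_KT:       # nm per second -> knots
            flags.append("implied_speed_high")
        if dist > TELEPORT_NM and dt < TELEPORT_WINDOW_S:
            flags.append("teleportation")
    return flags
```

The function only reports flags; whether a flagged sighting is kept, down-weighted, or quarantined is left to the caller, in line with the goal of detecting bad data without discarding it.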
Phase 3: Multi-Receiver Deduplication
Goal: Merge sightings from multiple receivers for same aircraft.
Tasks
| Task | Input | Output | Handle |
|---|---|---|---|
| Time Window Grouper | message stream | grouped messages | Out of order, clock drift |
| Aircraft Grouper | message groups | groups by (hex, time) | Same aircraft from N receivers |
| Field Merger | message list | single MergedSighting | Conflicting values |
| Conflict Recorder | merged with conflicts | conflict record | Different callsigns |
Merge Priority Rules
| Field | Priority |
|---|---|
| hex_ident | Must match (grouping key) |
| timestamp | Use most recent |
| callsign | Prefer non-empty, most recent |
| position | Prefer highest confidence source |
| altitude | Prefer geometric over barometric |
| receiver_ids | Append all (for coverage tracking) |
| source_count | Count of merged messages |
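One way to apply the priority rules above to a group of messages for a single aircraft is sketched below. The `Sighting` and `MergedSighting` shapes are assumptions; the altitude rule is omitted because the sketch does not model separate geometric and barometric values, and `receiver_ids` is de-duplicated while preserving first-seen order.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Sighting:
    """Hypothetical per-receiver observation."""
    hex_ident: str
    timestamp: float
    receiver_id: str
    callsign: Optional[str] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    confidence: float = 0.0


@dataclass(frozen=True)
class MergedSighting:
    """Hypothetical merged record; field names follow the rules table."""
    hex_ident: str
    timestamp: float
    callsign: Optional[str]
    latitude: Optional[float]
    longitude: Optional[float]
    receiver_ids: List[str]
    source_count: int
    callsign_conflicts: List[str]


def merge_sightings(group: List[Sighting]) -> MergedSighting:
    if not group:
        raise ValueError("cannot merge an empty group")
    if len({s.hex_ident for s in group}) != 1:
        raise ValueError("hex_ident must match across the group")
    ordered = sorted(group, key=lambda s: s.timestamp)
    newest = ordered[-1]
    # Callsign: prefer non-empty, most recent.
    callsigns = [s.callsign for s in ordered if s.callsign]
    # Position: prefer the highest-confidence source that has one.
    located = [s for s in ordered if s.latitude is not None and s.longitude is not None]
    best = max(located, key=lambda s: s.confidence) if located else None
    distinct = sorted(set(callsigns))
    return MergedSighting(
        hex_ident=newest.hex_ident,
        timestamp=newest.timestamp,
        callsign=callsigns[-1] if callsigns else None,
        latitude=best.latitude if best else None,
        longitude=best.longitude if best else None,
        receiver_ids=list(dict.fromkeys(s.receiver_id for s in ordered)),
        source_count=len(group),
        # Conflicting callsigns are recorded, not lost.
        callsign_conflicts=distinct if len(distinct) > 1 else [],
    )
```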
Validation Criteria
- [ ] Messages within 5 seconds merged
- [ ] Messages > 10 seconds apart kept separate
- [ ] receiver_ids contains all receivers that saw it
- [ ] Conflicting callsigns recorded, not lost
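A rough sketch of the grouping step, assuming messages are buffered in batches so that out-of-order arrivals can be sorted before windowing. The tuple layout and the function name are illustrative. The criteria leave the 5 to 10 second range unspecified, so this sketch simply starts a new group once a message is more than `window_s` seconds after the first message of the current group.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Hypothetical minimal message: (hex_ident, timestamp_seconds, receiver_id)
Message = Tuple[str, float, str]


def group_by_window(messages: Iterable[Message], window_s: float = 5.0) -> List[List[Message]]:
    """Group messages per aircraft into windows of at most `window_s` seconds."""
    by_aircraft: Dict[str, List[Message]] = defaultdict(list)
    for msg in messages:
        by_aircraft[msg[0]].append(msg)

    groups: List[List[Message]] = []
    for hex_ident in sorted(by_aircraft):
        # Sorting the buffered batch absorbs out-of-order arrival.
        msgs = sorted(by_aircraft[hex_ident], key=lambda m: m[1])
        current = [msgs[0]]
        for msg in msgs[1:]:
            if msg[1] - current[0][1] <= window_s:
                current.append(msg)
            else:
                groups.append(current)
                current = [msg]
        groups.append(current)
    return groups
```

Anchoring the window at the first message of each group, rather than at the previous message, stops a steady stream of closely spaced messages from chaining into one unbounded group.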
Phase 4: Clean Output
Goal: Produce reliable, queryable output.
Tasks
| Task | Input | Output | Handle |
|---|---|---|---|
| CSV Writer | MergedSighting stream | CSV file | Escaping, atomic writes |
| SQLite Writer | MergedSighting stream | SQLite DB | WAL mode, concurrent reads |
| Quarantine Manager | failed validations | quarantine table | Preserve original, reason |
| Stats Collector | pipeline events | metrics | Real-time, periodic summary |
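For the CSV Writer's "atomic writes" requirement, one common approach is to write to a temporary file in the same directory and then rename it over the target. The sketch below shows that pattern with the standard library; the function name and signature are assumptions, and the `csv` module takes care of escaping.

```python
import csv
import os
import tempfile
from typing import Iterable, Sequence


def write_csv_atomic(path: str, header: Sequence[str], rows: Iterable[Sequence]) -> None:
    """Write a CSV so readers see either the old file or the complete new one."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live on the same filesystem for the rename to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)   # quotes fields containing commas or quotes
            writer.writerow(header)
            writer.writerows(rows)
            handle.flush()
            os.fsync(handle.fileno())     # push data to disk before the rename
        os.replace(tmp_path, path)        # replaces the target in one step
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```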
Output Schema
```sql
CREATE TABLE sightings (
    id INTEGER PRIMARY KEY,
    timestamp TEXT NOT NULL,
    mode_s_hex TEXT NOT NULL,
    callsign TEXT,
    latitude REAL,
    longitude REAL,
    altitude_feet INTEGER,
    confidence REAL,
    is_potential_spoof INTEGER,
    receiver_ids TEXT,
    source_count INTEGER
);
```
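A possible SQLite writer for this schema, using Python's built-in `sqlite3` module with WAL journaling so that readers are not blocked while the pipeline writes. Storing `receiver_ids` as a JSON array in the TEXT column is an assumption, as are the function names `open_db` and `insert_sighting`.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS sightings (
    id INTEGER PRIMARY KEY,
    timestamp TEXT NOT NULL,
    mode_s_hex TEXT NOT NULL,
    callsign TEXT,
    latitude REAL,
    longitude REAL,
    altitude_feet INTEGER,
    confidence REAL,
    is_potential_spoof INTEGER,
    receiver_ids TEXT,
    source_count INTEGER
)
"""

INSERT_SQL = """
INSERT INTO sightings (
    timestamp, mode_s_hex, callsign, latitude, longitude, altitude_feet,
    confidence, is_potential_spoof, receiver_ids, source_count
) VALUES (
    :timestamp, :mode_s_hex, :callsign, :latitude, :longitude, :altitude_feet,
    :confidence, :is_potential_spoof, :receiver_ids, :source_count
)
"""


def open_db(path: str) -> sqlite3.Connection:
    """Open (or create) the database in WAL mode and ensure the table exists."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")   # WAL applies to file-backed databases
    conn.execute(SCHEMA)
    return conn


def insert_sighting(conn: sqlite3.Connection, sighting: dict) -> None:
    """Insert one sighting; `sighting` must contain every column except id."""
    row = dict(sighting)
    # Assumed encoding: list of receiver serials stored as a JSON array.
    row["receiver_ids"] = json.dumps(sighting["receiver_ids"])
    row["is_potential_spoof"] = int(bool(sighting["is_potential_spoof"]))
    with conn:                                # commits on success, rolls back on error
        conn.execute(INSERT_SQL, row)
```

Named placeholders keep the insert tied to column names rather than positions, so adding a column later only requires touching the SQL and the input dictionary.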
Phase 5: Identity Resolution (Future)
Goal: Link observed data to known aircraft and flights.
| Task | Input | Output | Sources |
|---|---|---|---|
| Aircraft Resolver | mode_s_hex | Aircraft record | OpenSky, local cache |
| Flight Resolver | callsign, timestamp | Flight record | Schedules, patterns |
| Operator Resolver | callsign prefix | Operator record | ICAO database |
Phase 6: Enrichment (Future)
Goal: Add metadata from external sources.
| Task | Input | Output | Handle |
|---|---|---|---|
| OpenSky Enricher | mode_s_hex | Registration, type | Rate limiting, caching |
| Route Enricher | origin, destination | Common routes | Unknown routes |
| Historical Context | sighting | Previous sightings | First-time sightings |
Success Metrics
| Phase | Metric | Target |
|---|---|---|
| 1 | Parse rate | 100% of valid SBS lines |
| 2 | Validation accuracy | <0.1% false positives |
| 3 | Dedup effectiveness | >90% of multi-receiver duplicates merged |
| 4 | Output quality | 0 corrupted records |
| 5 | Resolution rate | >80% aircraft identified |
| 6 | Enrichment coverage | >70% with metadata |
Test Data Generation
```bash
# Capture 60 seconds from each receiver
for receiver in receiver_a receiver_b; do
    timeout 60 nc "${receiver}" 30003 > "test_${receiver}.csv"
done

# Verify SBS format
head -5 test_receiver_a.csv
```
Key Insight: Device Serial as ID
Use USB device serial numbers, not hostnames:
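```python
from dataclasses import dataclass


# Assumed shape of ReceiverConfig, inferred from the fields used below.
@dataclass(frozen=True)
class ReceiverConfig:
    device_serial: str
    host: str
    port: int
    name: str
    antenna: str


# This allows multiple receivers per host
RECEIVERS = {
    "serial_001": ReceiverConfig(
        device_serial="serial_001",
        host="host_a",
        port=30003,
        name="receiver-1090",
        antenna="1090MHz dipole",
    ),
    "serial_002": ReceiverConfig(
        device_serial="serial_002",
        host="host_a",  # Same host, different device
        port=30004,
        name="receiver-rabbit",
        antenna="rabbit ears",
    ),
}
```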
Get device serial with:
```bash
# Linux
rtl_test -t 2>&1 | grep "SN:"

# FreeBSD
usbconfig -d ugenX.Y dump_device_desc | grep iSerialNumber
```
