Skip to content

Jet1090

tangram_jet1090

log module-attribute

# Module-level logger, named after this module via __name__.
log = getLogger(__name__)

router module-attribute

# Router holding this plugin's HTTP endpoints: every route registered on it
# is served under the "/jet1090" prefix and carries the "jet1090" tag.
router = APIRouter(
    prefix="/jet1090",
    tags=["jet1090"],
    # Declares a 404 "Not found" response for the routes of this router.
    responses={404: {"description": "Not found"}},
)

plugin module-attribute

# Plugin descriptor: bundles the jet1090 router with the backend config class,
# the frontend config class, and the function that converts the former into
# the latter (into_frontend).
plugin = Plugin(
    frontend_path="dist-frontend",
    routers=[router],
    config_class=PlanesConfig,
    frontend_config_class=PlanesFrontendConfig,
    into_frontend_config_function=into_frontend,
)

TrailColorOptions dataclass

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
217
218
219
220
221
222
223
224
# Structured alternative to a plain colour string for `trail_color`
# (PlanesConfig.trail_color accepts `str | TrailColorOptions`): an attribute
# name plus a numeric min/max pair.
# NOTE(review): min/max presumably bound the colour scale applied to
# `by_attribute` (the 0-45000 defaults look like an altitude range) — confirm
# against the frontend.
# NOTE(review): FrontendMutable is defined elsewhere in the project; presumably
# it marks a field as editable from the frontend — confirm.
@dataclass(frozen=True)
class TrailColorOptions:
    # Attribute name, restricted to the four literals below; defaults to "altitude".
    by_attribute: Annotated[
        Literal["altitude", "groundspeed", "vertical_rate", "track"],
        FrontendMutable(),
    ] = "altitude"
    # Field names `min`/`max` shadow the builtins inside the class body only;
    # they are part of the public constructor signature and must stay as-is.
    min: Annotated[float, FrontendMutable()] = 0.0
    max: Annotated[float, FrontendMutable()] = 45000.0

by_attribute class-attribute instance-attribute

by_attribute: Annotated[
    Literal[
        "altitude", "groundspeed", "vertical_rate", "track"
    ],
    FrontendMutable(),
] = "altitude"

min class-attribute instance-attribute

min: Annotated[float, FrontendMutable()] = 0.0

max class-attribute instance-attribute

max: Annotated[float, FrontendMutable()] = 45000.0

__init__

__init__(
    by_attribute: Annotated[
        Literal[
            "altitude",
            "groundspeed",
            "vertical_rate",
            "track",
        ],
        FrontendMutable(),
    ] = "altitude",
    min: Annotated[float, FrontendMutable()] = 0.0,
    max: Annotated[float, FrontendMutable()] = 45000.0,
) -> None

PlanesConfig dataclass

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# Backend configuration of the jet1090 plugin. run_planes and get_sensors_data
# validate the "tangram_jet1090" plugin mapping into this class; every field
# has a default, so an empty mapping is valid.
@dataclass(frozen=True)
class PlanesConfig:
    # The channel/table names and the two timing values below are forwarded
    # unchanged to the native `_planes.PlanesConfig` by run_planes.
    # `search_channel` is additionally copied into the frontend config.
    jet1090_channel: str = "jet1090"
    history_table_name: str = "jet1090"
    history_control_channel: str = "history:control"
    search_channel: str = "jet1090:search"
    state_vector_expire: int = 20
    stream_interval_secs: float = 1.0
    # URL of the zipped aircraft database; run_planes passes it to get_aircraft_db.
    aircraft_db_url: str = (
        "https://jetvision.de/resources/sqb_databases/basestation.zip"
    )
    # Base URL of the jet1090 service; get_sensors_data requests
    # "<jet1090_url>/sensors" after rewriting "localhost" to "127.0.0.1".
    jet1090_url: str = "http://localhost:8080"
    # Directory where get_aircraft_db keeps the downloaded archive and the
    # extracted database. The default is computed once, when the class is defined.
    path_cache: Path = Path(platformdirs.user_cache_dir("tangram_jet1090"))
    # NOTE: run_planes reads "log_level" from the raw plugin mapping and falls
    # back to the core log level when the key is absent, so this "INFO"
    # default is not consulted there.
    log_level: str = "INFO"
    # Copied into PlanesFrontendConfig by into_frontend.
    show_route_lines: bool = True
    # The history_* settings below are forwarded unchanged to the native
    # `_planes.PlanesConfig` by run_planes.
    history_buffer_size: int = 100_000
    history_flush_interval_secs: int = 5
    history_optimize_interval_secs: int = 120
    # 134217728 == 128 * 1024 * 1024 (presumably a size in bytes — confirm
    # against the native implementation).
    history_optimize_target_file_size: int = 134217728
    history_vacuum_interval_secs: int = 120
    # None is accepted; its meaning is decided by the native side.
    history_vacuum_retention_period_secs: int | None = 120
    history_checkpoint_interval: int = 10
    # Everything below is copied into PlanesFrontendConfig by into_frontend.
    topbar_order: int = 50
    sidebar_order: int = 50
    # run_planes logs a warning when "curtain" is selected while enable_3d is False.
    trail_type: Literal["line", "curtain"] = "line"
    # Either a plain string (default "#600000") or a TrailColorOptions instance.
    trail_color: str | TrailColorOptions = "#600000"
    trail_alpha: float = 0.6
    enable_3d: bool = False

jet1090_channel class-attribute instance-attribute

jet1090_channel: str = 'jet1090'

history_table_name class-attribute instance-attribute

history_table_name: str = 'jet1090'

history_control_channel class-attribute instance-attribute

history_control_channel: str = 'history:control'

search_channel class-attribute instance-attribute

search_channel: str = 'jet1090:search'

state_vector_expire class-attribute instance-attribute

state_vector_expire: int = 20

stream_interval_secs class-attribute instance-attribute

stream_interval_secs: float = 1.0

aircraft_db_url class-attribute instance-attribute

aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip"

jet1090_url class-attribute instance-attribute

jet1090_url: str = 'http://localhost:8080'

path_cache class-attribute instance-attribute

path_cache: Path = Path(user_cache_dir('tangram_jet1090'))

log_level class-attribute instance-attribute

log_level: str = 'INFO'

show_route_lines class-attribute instance-attribute

show_route_lines: bool = True

history_buffer_size class-attribute instance-attribute

history_buffer_size: int = 100000

history_flush_interval_secs class-attribute instance-attribute

history_flush_interval_secs: int = 5

history_optimize_interval_secs class-attribute instance-attribute

history_optimize_interval_secs: int = 120

history_optimize_target_file_size class-attribute instance-attribute

history_optimize_target_file_size: int = 134217728

history_vacuum_interval_secs class-attribute instance-attribute

history_vacuum_interval_secs: int = 120

history_vacuum_retention_period_secs class-attribute instance-attribute

history_vacuum_retention_period_secs: int | None = 120

history_checkpoint_interval class-attribute instance-attribute

history_checkpoint_interval: int = 10

topbar_order class-attribute instance-attribute

topbar_order: int = 50

sidebar_order class-attribute instance-attribute

sidebar_order: int = 50

trail_type class-attribute instance-attribute

trail_type: Literal['line', 'curtain'] = 'line'

trail_color class-attribute instance-attribute

trail_color: str | TrailColorOptions = '#600000'

trail_alpha class-attribute instance-attribute

trail_alpha: float = 0.6

enable_3d class-attribute instance-attribute

enable_3d: bool = False

__init__

__init__(
    jet1090_channel: str = "jet1090",
    history_table_name: str = "jet1090",
    history_control_channel: str = "history:control",
    search_channel: str = "jet1090:search",
    state_vector_expire: int = 20,
    stream_interval_secs: float = 1.0,
    aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip",
    jet1090_url: str = "http://localhost:8080",
    path_cache: Path = Path(
        user_cache_dir("tangram_jet1090")
    ),
    log_level: str = "INFO",
    show_route_lines: bool = True,
    history_buffer_size: int = 100000,
    history_flush_interval_secs: int = 5,
    history_optimize_interval_secs: int = 120,
    history_optimize_target_file_size: int = 134217728,
    history_vacuum_interval_secs: int = 120,
    history_vacuum_retention_period_secs: int | None = 120,
    history_checkpoint_interval: int = 10,
    topbar_order: int = 50,
    sidebar_order: int = 50,
    trail_type: Literal["line", "curtain"] = "line",
    trail_color: str | TrailColorOptions = "#600000",
    trail_alpha: float = 0.6,
    enable_3d: bool = False,
) -> None

PlanesFrontendConfig dataclass

Bases: HasTopbarUiConfig, HasSidebarUiConfig

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
257
258
259
260
261
262
263
264
265
266
267
268
# Frontend-facing view of the plugin configuration; into_frontend builds it by
# copying the same-named fields from a PlanesConfig. The fields declared here
# have no defaults.
# NOTE(review): FrontendMutable is defined elsewhere in the project; presumably
# it marks a field as editable from the frontend — confirm.
@dataclass(frozen=True)
class PlanesFrontendConfig(
    tangram_core.config.HasTopbarUiConfig, tangram_core.config.HasSidebarUiConfig
):
    # The only field here without a FrontendMutable annotation.
    search_channel: str
    show_route_lines: Annotated[bool, FrontendMutable()]
    topbar_order: Annotated[int, FrontendMutable()]
    sidebar_order: Annotated[int, FrontendMutable()]
    trail_type: Annotated[Literal["line", "curtain"], FrontendMutable()]
    # FrontendMutable(kind="color") is attached to the `str` arm of the union
    # only; the TrailColorOptions arm carries its own per-field annotations.
    trail_color: Annotated[str, FrontendMutable(kind="color")] | TrailColorOptions
    # Ge(0) / Le(1) bound the alpha value to the closed interval [0, 1].
    trail_alpha: Annotated[float, Ge(0), Le(1), FrontendMutable()]
    enable_3d: Annotated[bool, FrontendMutable()]

search_channel instance-attribute

search_channel: str

show_route_lines instance-attribute

show_route_lines: Annotated[bool, FrontendMutable()]

topbar_order instance-attribute

topbar_order: Annotated[int, FrontendMutable()]

sidebar_order instance-attribute

sidebar_order: Annotated[int, FrontendMutable()]

trail_type instance-attribute

trail_type: Annotated[
    Literal["line", "curtain"], FrontendMutable()
]

trail_color instance-attribute

trail_color: (
    Annotated[str, FrontendMutable(kind="color")]
    | TrailColorOptions
)

trail_alpha instance-attribute

trail_alpha: Annotated[
    float, Ge(0), Le(1), FrontendMutable()
]

enable_3d instance-attribute

enable_3d: Annotated[bool, FrontendMutable()]

__init__

__init__(
    search_channel: str,
    show_route_lines: Annotated[bool, FrontendMutable()],
    topbar_order: Annotated[int, FrontendMutable()],
    sidebar_order: Annotated[int, FrontendMutable()],
    trail_type: Annotated[
        Literal["line", "curtain"], FrontendMutable()
    ],
    trail_color: Annotated[
        str, FrontendMutable(kind="color")
    ]
    | TrailColorOptions,
    trail_alpha: Annotated[
        float, Ge(0), Le(1), FrontendMutable()
    ],
    enable_3d: Annotated[bool, FrontendMutable()],
) -> None

get_trajectory_data async

get_trajectory_data(
    icao24: str, backend_state: InjectBackendState
) -> Response

Get the full trajectory for a given ICAO24 address.

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@router.get("/data/{icao24}")
async def get_trajectory_data(
    icao24: str, backend_state: tangram_core.InjectBackendState
) -> Response:
    """Get the full trajectory for a given ICAO24 address.

    Looks up the history table URI in Redis, scans that Delta table for rows
    whose ``icao24`` equals the path parameter, and returns them as JSON
    sorted by ``timestamp``. The timestamp expression is the epoch in
    microseconds divided by 1000000, i.e. seconds.

    Raises:
        HTTPException: 501 when the history feature is unavailable, 404 when
            no table URI is stored in Redis, 500 when the query fails.
    """
    if not _HISTORY_AVAILABLE:
        raise HTTPException(
            status_code=501, detail="History feature is not installed."
        )

    stored_uri = await backend_state.redis_client.get(
        "tangram:history:table_uri:jet1090"
    )
    if not stored_uri:
        raise HTTPException(status_code=404, detail="Table 'jet1090' not found.")
    delta_uri = stored_uri.decode("utf-8")

    try:
        matches_aircraft = pl.col("icao24") == icao24
        epoch_seconds = pl.col("timestamp").dt.epoch(time_unit="us") / 1000000
        trajectory = (
            pl.scan_delta(delta_uri)
            .filter(matches_aircraft)
            .with_columns(epoch_seconds)
            .sort("timestamp")
            .collect()
        )
        return Response(trajectory.write_json(), media_type="application/json")
    except Exception as e:
        # Any failure while querying is logged and reported as a generic 500.
        log.error(f"Failed to query trajectory for {icao24}: {e}")
        raise HTTPException(status_code=500, detail="Failed to query trajectory data.")

search_flights async

search_flights(
    q: str, backend_state: InjectBackendState
) -> Response
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@router.get("/search")
async def search_flights(
    q: str, backend_state: tangram_core.InjectBackendState
) -> Response:
    """Search the history table for flight intervals matching ``q``.

    ``q`` is lower-cased and matched as a plain substring against the
    lower-cased ``callsign`` and against ``icao24``. Up to 20 distinct
    matching ``icao24`` values are selected, their rows are split into
    intervals (a new interval starts on a gap of at least 30 minutes, or when
    ``icao24`` or the forward-filled ``callsign`` changes), and intervals with
    at least 5 rows that still match the query are returned, newest first.

    Returns:
        A JSON response with one record per interval (``icao24``,
        ``callsign``, ``interval_id``, ``start_ts``, ``end_ts``, ``n_rows``,
        ``lat``, ``lon``, ``duration``). An empty JSON list is returned when
        history is unavailable, no table URI is stored, nothing matches, or
        the query fails (the failure is logged).
    """
    if not _HISTORY_AVAILABLE:
        return ORJSONResponse(content=[])

    redis_key = "tangram:history:table_uri:jet1090"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)
    if not table_uri_bytes:
        return ORJSONResponse(content=[])
    table_uri = table_uri_bytes.decode("utf-8")

    try:
        df = pl.scan_delta(table_uri)
        q_lower = q.lower()

        # literal=True: the query is user-supplied text, not a pattern. Without
        # it, characters such as "(", "+" or "." are interpreted as regex
        # syntax, which either changes the match or raises and is then
        # swallowed below as an empty result.
        matches_query = pl.col("callsign").str.to_lowercase().str.contains(
            q_lower, literal=True
        ) | pl.col("icao24").str.contains(q_lower, literal=True)

        candidates = (
            df.filter(matches_query).select("icao24").unique().head(20).collect()
        )
        candidate_icaos = candidates["icao24"].to_list()

        if not candidate_icaos:
            return ORJSONResponse(content=[])

        intervals = (
            df.filter(pl.col("icao24").is_in(candidate_icaos))
            .sort(["icao24", "timestamp"])
            .with_columns(pl.col("callsign").forward_fill().over("icao24"))
            .with_columns(
                gap_minutes=(
                    pl.col("timestamp") - pl.col("timestamp").shift(1)
                ).dt.total_minutes()
            )
            .with_columns(
                # The first row has null comparisons; fill_null(True) makes it
                # open the first interval.
                new_interval=(
                    (pl.col("gap_minutes") >= 30)
                    | (pl.col("icao24") != pl.col("icao24").shift(1))
                    | (pl.col("callsign") != pl.col("callsign").shift(1))
                ).fill_null(True)
            )
            # Running count of interval starts gives each interval a distinct id.
            .with_columns(interval_id=pl.col("new_interval").cast(pl.Int64).cum_sum())
            .group_by(["icao24", "callsign", "interval_id"])
            .agg(
                start_ts=pl.col("timestamp").min(),
                end_ts=pl.col("timestamp").max(),
                n_rows=pl.len(),
                lat=pl.col("latitude").mean(),
                lon=pl.col("longitude").mean(),
            )
            # Re-apply the query: a candidate aircraft may have flown under
            # callsigns that do not match.
            .filter((pl.col("n_rows") >= 5) & matches_query)
            .with_columns(
                duration=((pl.col("end_ts") - pl.col("start_ts")).dt.total_seconds()),
            )
            .sort(["start_ts"], descending=True)
            .collect()
        )
        return Response(intervals.write_json(), media_type="application/json")
    except Exception as e:
        # Best-effort endpoint: failures are logged and reported as "no results".
        log.error(f"Search failed: {e}")
        return ORJSONResponse(content=[])

get_history_slice async

get_history_slice(
    icao24: str,
    start_timestamp: int,
    end_timestamp: int,
    backend_state: InjectBackendState,
) -> Response
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@router.get("/history/{icao24}/{start_timestamp}/{end_timestamp}")
async def get_history_slice(
    icao24: str,
    start_timestamp: int,
    end_timestamp: int,
    backend_state: tangram_core.InjectBackendState,
) -> Response:
    """Return the history rows of one aircraft within a closed time window.

    ``start_timestamp`` and ``end_timestamp`` are integers cast to
    ``Datetime("ms")``, i.e. interpreted as milliseconds since the epoch; both
    bounds are inclusive. Rows are returned as JSON sorted by ``timestamp``.
    An empty JSON list is returned when history is unavailable, no table URI
    is stored in Redis, or the query fails (the failure is logged).
    """
    if not _HISTORY_AVAILABLE:
        return ORJSONResponse(content=[])

    stored_uri = await backend_state.redis_client.get(
        "tangram:history:table_uri:jet1090"
    )
    if not stored_uri:
        return ORJSONResponse(content=[])
    delta_uri = stored_uri.decode("utf-8")

    millis = pl.Datetime("ms")
    window_start = pl.lit(start_timestamp).cast(millis)
    window_end = pl.lit(end_timestamp).cast(millis)

    try:
        in_window = (
            (pl.col("icao24") == icao24)
            & (pl.col("timestamp") >= window_start)
            & (pl.col("timestamp") <= window_end)
        )
        rows = pl.scan_delta(delta_uri).filter(in_window).sort("timestamp").collect()
        return Response(rows.write_json(), media_type="application/json")
    except Exception as e:
        # Best-effort endpoint: failures are logged and reported as "no rows".
        log.error(f"History slice failed: {e}")
        return ORJSONResponse(content=[])

get_route_data async

get_route_data(
    callsign: str, backend_state: InjectBackendState
) -> ORJSONResponse
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@router.get("/route/{callsign}")
async def get_route_data(
    callsign: str, backend_state: tangram_core.InjectBackendState
) -> ORJSONResponse:
    """Proxy a route lookup for ``callsign`` to the OpenSky routeset API.

    Posts ``{"planes": [{"callsign": callsign}]}`` with a 5 second timeout and
    relays the decoded JSON body. On any failure the error is logged and an
    empty JSON list is returned with status 500.
    """
    http = backend_state.http_client
    try:
        upstream = await http.post(
            "https://flightroutes.opensky-network.org/api/routeset",
            json={"planes": [{"callsign": callsign}]},
            timeout=5.0,
        )
        upstream.raise_for_status()
        return ORJSONResponse(content=upstream.json())
    except Exception as e:
        log.error(f"Failed to fetch route data for {callsign}: {e}")
        return ORJSONResponse(content=[], status_code=500)

get_sensors_data async

get_sensors_data(
    backend_state: InjectBackendState,
) -> ORJSONResponse
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
@router.get("/sensors")
async def get_sensors_data(
    backend_state: tangram_core.InjectBackendState,
) -> ORJSONResponse:
    """Relay the ``/sensors`` endpoint of the configured jet1090 service.

    The plugin mapping "tangram_jet1090" is validated into a ``PlanesConfig``
    to obtain ``jet1090_url``; every occurrence of "localhost" in the
    resulting URL is replaced by "127.0.0.1" before the request (10 second
    timeout).

    Raises:
        HTTPException: 502 carrying the error text when the upstream request
            or its JSON decoding fails.
    """
    raw_settings = backend_state.config.plugins.get("tangram_jet1090", {})
    settings = TypeAdapter(PlanesConfig).validate_python(raw_settings)
    url = (settings.jet1090_url + "/sensors").replace("localhost", "127.0.0.1")

    try:
        upstream = await backend_state.http_client.get(url, timeout=10.0)
        upstream.raise_for_status()
        return ORJSONResponse(content=upstream.json())
    except Exception as e:
        log.error(f"Failed to fetch sensors data from {url}: {e}")
        raise HTTPException(status_code=502, detail=str(e))

into_frontend

into_frontend(config: PlanesConfig) -> PlanesFrontendConfig
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
271
272
273
274
275
276
277
278
279
280
281
def into_frontend(config: PlanesConfig) -> PlanesFrontendConfig:
    """Build the frontend configuration from the backend configuration.

    Each listed field is copied verbatim from ``config`` into the same-named
    ``PlanesFrontendConfig`` field; nothing is transformed.
    """
    copied_fields = (
        "search_channel",
        "show_route_lines",
        "topbar_order",
        "sidebar_order",
        "trail_type",
        "trail_color",
        "trail_alpha",
        "enable_3d",
    )
    return PlanesFrontendConfig(
        **{name: getattr(config, name) for name in copied_fields}
    )

get_aircraft_db async

get_aircraft_db(
    client: AsyncClient, url: str, path_cache: Path
) -> dict[str, Aircraft]
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
async def get_aircraft_db(
    client: httpx.AsyncClient, url: str, path_cache: Path
) -> dict[str, _planes.Aircraft]:
    """Load the aircraft database, downloading and extracting it if needed.

    The archive is cached as ``basestation.zip`` and its first member as
    ``basestation.sqb`` inside ``path_cache`` (created if missing). Each step
    is skipped when its output file already exists.

    Args:
        client: HTTP client used to stream the archive.
        url: Download location of the zipped database.
        path_cache: Directory holding the cached archive and database.

    Returns:
        Mapping from lower-cased ``ModeS`` value to an ``Aircraft`` holding
        the row's registration and ICAO type code. Rows with an empty
        ``ModeS`` are skipped. The mapping is empty when the database cannot
        be read; in that case the database file is deleted.

    Raises:
        Whatever the HTTP client raises on a failed download, and
        ``zipfile.BadZipFile`` when the cached archive is corrupt (the
        archive is deleted first so the next call downloads it again).
    """
    from contextlib import closing

    from . import _planes

    path_cache.mkdir(parents=True, exist_ok=True)
    zip_path = path_cache / "basestation.zip"
    db_path = path_cache / "basestation.sqb"

    if not zip_path.exists():
        log.info(f"downloading aircraft database from {url} to {zip_path}")
        # Stream into a sibling ".part" file and rename only once the download
        # completed: an interrupted transfer must not leave a truncated
        # "basestation.zip" that later runs would take for a complete archive.
        partial_path = zip_path.with_name(zip_path.name + ".part")
        try:
            async with client.stream("GET", url, follow_redirects=True) as response:
                response.raise_for_status()
                with partial_path.open("wb") as f:
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)
            partial_path.replace(zip_path)
        finally:
            # No-op after a successful rename; removes the leftover otherwise.
            partial_path.unlink(missing_ok=True)

    if not db_path.exists():
        log.info(f"extracting {zip_path} to {db_path}")
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                db_filename = zip_ref.namelist()[0]
                # extract() returns the path it actually wrote, which can
                # differ from path_cache / db_filename when the member name
                # gets normalised.
                extracted_path = Path(zip_ref.extract(db_filename, path=path_cache))
            extracted_path.rename(db_path)
        except zipfile.BadZipFile:
            # Drop the unusable archive so it is fetched again next time.
            zip_path.unlink(missing_ok=True)
            raise

    db = {}
    try:
        # closing() guarantees the connection is released even when the query
        # fails; the URI form opens the database read-only.
        with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as con:
            rows = con.execute(
                "SELECT ModeS, Registration, ICAOTypeCode FROM Aircraft"
            ).fetchall()
        for modes, registration, icaotypecode in rows:
            if modes:
                db[modes.lower()] = _planes.Aircraft(
                    registration=registration,
                    typecode=icaotypecode,
                )
    except sqlite3.Error as e:
        log.error(f"error reading aircraft database {db_path}: {e}")
        # Remove the unreadable database so the next call extracts it again.
        db_path.unlink(missing_ok=True)

    return db

run_planes async

run_planes(backend_state: BackendState) -> None
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
@plugin.register_service()
async def run_planes(backend_state: tangram_core.BackendState) -> None:
    """Start the native planes service with the plugin's configuration.

    Validates the "tangram_jet1090" plugin mapping into a ``PlanesConfig``,
    initialises native tracing, loads the aircraft database and awaits
    ``_planes.run_planes`` with the assembled native configuration.
    """
    from . import _planes

    raw_settings = backend_state.config.plugins.get("tangram_jet1090", {})
    settings = TypeAdapter(PlanesConfig).validate_python(raw_settings)
    if settings.trail_type == "curtain" and not settings.enable_3d:
        log.warning(
            "expected 'enable_3d' to be true when using 'curtain' trail type"
            ", switching to 'line'"
        )
        # NOTE(review): trail_type is not read again in this function, so this
        # replacement has no visible effect here — confirm where the switch is
        # meant to apply.
        settings = replace(settings, trail_type="line")

    # Read from the raw mapping so that an absent key falls back to the core
    # log level rather than to the PlanesConfig default.
    _planes.init_tracing_stderr(
        raw_settings.get("log_level", backend_state.config.core.log_level)
    )

    aircraft_db = await get_aircraft_db(
        backend_state.http_client,
        settings.aircraft_db_url,
        settings.path_cache,
    )

    # Settings handed to the native configuration under their own names.
    forwarded_fields = (
        "jet1090_channel",
        "history_table_name",
        "history_control_channel",
        "state_vector_expire",
        "stream_interval_secs",
        "history_buffer_size",
        "history_flush_interval_secs",
        "history_optimize_interval_secs",
        "history_optimize_target_file_size",
        "history_vacuum_interval_secs",
        "history_vacuum_retention_period_secs",
        "history_checkpoint_interval",
        "search_channel",
    )
    rust_config = _planes.PlanesConfig(
        redis_url=backend_state.config.core.redis_url,
        aircraft_db=aircraft_db,
        **{name: getattr(settings, name) for name in forwarded_fields},
    )
    await _planes.run_planes(rust_config)

tangram_jet1090._planes

Aircraft

typecode property writable

typecode: Optional[str]

registration property writable

registration: Optional[str]

__new__

__new__(
    typecode: Optional[str], registration: Optional[str]
) -> Aircraft

PlanesConfig

redis_url property writable

redis_url: str

jet1090_channel property writable

jet1090_channel: str

history_table_name property writable

history_table_name: str

history_control_channel property writable

history_control_channel: str

state_vector_expire property writable

state_vector_expire: int

stream_interval_secs property writable

stream_interval_secs: float

aircraft_db property writable

aircraft_db: dict[str, Aircraft]

history_buffer_size property writable

history_buffer_size: int

history_flush_interval_secs property writable

history_flush_interval_secs: int

history_optimize_interval_secs property writable

history_optimize_interval_secs: int

history_optimize_target_file_size property writable

history_optimize_target_file_size: int

history_vacuum_interval_secs property writable

history_vacuum_interval_secs: int

history_vacuum_retention_period_secs property writable

history_vacuum_retention_period_secs: Optional[int]

history_checkpoint_interval property writable

history_checkpoint_interval: int

search_channel property writable

search_channel: str

__new__

__new__(
    redis_url: str,
    jet1090_channel: str,
    history_table_name: str,
    history_control_channel: str,
    state_vector_expire: int,
    stream_interval_secs: float,
    aircraft_db: Mapping[str, Aircraft],
    history_buffer_size: int,
    history_flush_interval_secs: int,
    history_optimize_interval_secs: int,
    history_optimize_target_file_size: int,
    history_vacuum_interval_secs: int,
    history_vacuum_retention_period_secs: Optional[int],
    history_checkpoint_interval: int,
    search_channel: str,
) -> PlanesConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run_planes

run_planes(config: PlanesConfig) -> Any