Skip to content

Jet1090

tangram_jet1090

log module-attribute

# Module-level logger, named after this module through ``__name__``.
log = getLogger(__name__)

router module-attribute

# Router that collects this plugin's HTTP endpoints; it is created with the
# "/jet1090" path prefix and the "jet1090" tag.
router = APIRouter(
    prefix="/jet1090",
    tags=["jet1090"],
    # Extra 404 "Not found" response description attached to the router.
    responses={404: {"description": "Not found"}},
)

plugin module-attribute

# Plugin object of this package: it is given the frontend asset path, the
# HTTP router and the configuration class.  ``run_planes`` is attached to it
# through ``plugin.register_service()``.
plugin = Plugin(
    frontend_path="dist-frontend",
    routers=[router],
    config_class=PlanesConfig,
)

TrailColorOptions dataclass

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
216
217
218
219
220
@dataclass(frozen=True)
class TrailColorOptions:
    by_attribute: Literal["altitude", "groundspeed", "vertical_rate", "track"]
    min: float | None = None
    max: float | None = None

by_attribute instance-attribute

by_attribute: Literal[
    "altitude", "groundspeed", "vertical_rate", "track"
]

min class-attribute instance-attribute

min: float | None = None

max class-attribute instance-attribute

max: float | None = None

__init__

__init__(
    by_attribute: Literal[
        "altitude", "groundspeed", "vertical_rate", "track"
    ],
    min: float | None = None,
    max: float | None = None,
) -> None

PlanesConfig dataclass

Bases: HasTopbarUiConfig, HasSidebarUiConfig

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@dataclass(frozen=True)
class PlanesConfig(
    tangram_core.config.HasTopbarUiConfig, tangram_core.config.HasSidebarUiConfig
):
    """Immutable settings of the ``tangram_jet1090`` plugin.

    ``run_planes`` and ``get_sensors_data`` obtain an instance by validating
    the ``"tangram_jet1090"`` entry of the backend's plugin configuration.
    ``run_planes`` forwards most fields unchanged to ``_planes.PlanesConfig``.

    NOTE(review): fields wrapped in ``Annotated[..., ExposeField()]`` are
    presumably the ones made visible to the frontend -- confirm against
    ``ExposeField``.
    """

    # --- forwarded to ``_planes.PlanesConfig`` by ``run_planes`` ---
    # NOTE(review): the ``*_channel`` values look like pub/sub channel names
    # -- confirm against the ``_planes`` implementation.
    jet1090_channel: str = "jet1090"
    # NOTE(review): the HTTP history endpoints in this module use the fixed
    # key "tangram:history:table_uri:jet1090" and do not read this field.
    history_table_name: str = "jet1090"
    history_control_channel: str = "history:control"
    search_channel: Annotated[str, ExposeField()] = "jet1090:search"
    # NOTE(review): presumably seconds -- the unit is not visible here.
    state_vector_expire: int = 20
    stream_interval_secs: float = 1.0
    # Zip archive that ``get_aircraft_db`` downloads when no cached copy exists.
    aircraft_db_url: str = (
        "https://jetvision.de/resources/sqb_databases/basestation.zip"
    )
    # Base URL of the jet1090 service; ``get_sensors_data`` requests
    # ``<jet1090_url>/sensors``.
    jet1090_url: str = "http://localhost:8080"
    # Directory in which ``get_aircraft_db`` keeps ``basestation.zip`` and
    # ``basestation.sqb``.
    path_cache: Path = Path(platformdirs.user_cache_dir("tangram_jet1090"))
    # NOTE(review): ``run_planes`` reads "log_level" from the raw plugin
    # mapping and falls back to the core log level, so this default does not
    # appear to be used there.
    log_level: str = "INFO"
    show_route_lines: Annotated[bool, ExposeField()] = True
    # --- history settings, forwarded to ``_planes.PlanesConfig`` ---
    history_buffer_size: int = 100_000
    history_flush_interval_secs: int = 5
    history_optimize_interval_secs: int = 120
    # 134217728 == 128 * 1024 * 1024.
    # NOTE(review): presumably bytes (128 MiB) -- confirm.
    history_optimize_target_file_size: int = 134217728
    history_vacuum_interval_secs: int = 120
    history_vacuum_retention_period_secs: int | None = 120
    # NOTE(review): presumably the position of this plugin's widgets in the
    # top bar / side bar -- confirm against the base classes.
    topbar_order: Annotated[int, ExposeField()] = 50
    sidebar_order: Annotated[int, ExposeField()] = 50
    # ``run_planes`` switches "curtain" back to "line" (with a warning) when
    # the map configuration has ``enable_3d`` turned off.
    trail_type: Annotated[Literal["line", "curtain"], ExposeField()] = "line"
    # Either a plain string or a ``TrailColorOptions`` instance.
    trail_color: Annotated[str | TrailColorOptions, ExposeField()] = "#600000"
    # NOTE(review): presumably an opacity in [0, 1] -- confirm.
    trail_alpha: Annotated[float, ExposeField()] = 0.6

jet1090_channel class-attribute instance-attribute

jet1090_channel: str = 'jet1090'

history_table_name class-attribute instance-attribute

history_table_name: str = 'jet1090'

history_control_channel class-attribute instance-attribute

history_control_channel: str = 'history:control'

search_channel class-attribute instance-attribute

search_channel: Annotated[str, ExposeField()] = (
    "jet1090:search"
)

state_vector_expire class-attribute instance-attribute

state_vector_expire: int = 20

stream_interval_secs class-attribute instance-attribute

stream_interval_secs: float = 1.0

aircraft_db_url class-attribute instance-attribute

aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip"

jet1090_url class-attribute instance-attribute

jet1090_url: str = 'http://localhost:8080'

path_cache class-attribute instance-attribute

path_cache: Path = Path(user_cache_dir('tangram_jet1090'))

log_level class-attribute instance-attribute

log_level: str = 'INFO'

show_route_lines class-attribute instance-attribute

show_route_lines: Annotated[bool, ExposeField()] = True

history_buffer_size class-attribute instance-attribute

history_buffer_size: int = 100000

history_flush_interval_secs class-attribute instance-attribute

history_flush_interval_secs: int = 5

history_optimize_interval_secs class-attribute instance-attribute

history_optimize_interval_secs: int = 120

history_optimize_target_file_size class-attribute instance-attribute

history_optimize_target_file_size: int = 134217728

history_vacuum_interval_secs class-attribute instance-attribute

history_vacuum_interval_secs: int = 120

history_vacuum_retention_period_secs class-attribute instance-attribute

history_vacuum_retention_period_secs: int | None = 120

topbar_order class-attribute instance-attribute

topbar_order: Annotated[int, ExposeField()] = 50

sidebar_order class-attribute instance-attribute

sidebar_order: Annotated[int, ExposeField()] = 50

trail_type class-attribute instance-attribute

trail_type: Annotated[
    Literal["line", "curtain"], ExposeField()
] = "line"

trail_color class-attribute instance-attribute

trail_color: Annotated[
    str | TrailColorOptions, ExposeField()
] = "#600000"

trail_alpha class-attribute instance-attribute

trail_alpha: Annotated[float, ExposeField()] = 0.6

__init__

__init__(
    jet1090_channel: str = "jet1090",
    history_table_name: str = "jet1090",
    history_control_channel: str = "history:control",
    search_channel: Annotated[
        str, ExposeField()
    ] = "jet1090:search",
    state_vector_expire: int = 20,
    stream_interval_secs: float = 1.0,
    aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip",
    jet1090_url: str = "http://localhost:8080",
    path_cache: Path = Path(
        user_cache_dir("tangram_jet1090")
    ),
    log_level: str = "INFO",
    show_route_lines: Annotated[bool, ExposeField()] = True,
    history_buffer_size: int = 100000,
    history_flush_interval_secs: int = 5,
    history_optimize_interval_secs: int = 120,
    history_optimize_target_file_size: int = 134217728,
    history_vacuum_interval_secs: int = 120,
    history_vacuum_retention_period_secs: int | None = 120,
    topbar_order: Annotated[int, ExposeField()] = 50,
    sidebar_order: Annotated[int, ExposeField()] = 50,
    trail_type: Annotated[
        Literal["line", "curtain"], ExposeField()
    ] = "line",
    trail_color: Annotated[
        str | TrailColorOptions, ExposeField()
    ] = "#600000",
    trail_alpha: Annotated[float, ExposeField()] = 0.6,
) -> None

get_trajectory_data async

get_trajectory_data(
    icao24: str, backend_state: InjectBackendState
) -> Response

Get the full trajectory for a given ICAO24 address.

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@router.get("/data/{icao24}")
async def get_trajectory_data(
    icao24: str, backend_state: tangram_core.InjectBackendState
) -> Response:
    """Get the full trajectory for a given ICAO24 address.

    The Delta table URI is read from Redis, the rows of ``icao24`` are
    selected, ``timestamp`` is converted to epoch seconds (microseconds
    divided by 1,000,000) and the rows are returned as JSON sorted by
    ``timestamp``.

    Raises:
        HTTPException: 501 when the history feature is not installed, 404
            when no table URI is stored in Redis, 500 when the query fails.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    if not _HISTORY_AVAILABLE:
        raise HTTPException(
            status_code=501,
            detail="History feature is not installed.",
        )

    redis_key = "tangram:history:table_uri:jet1090"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)

    if not table_uri_bytes:
        raise HTTPException(status_code=404, detail="Table 'jet1090' not found.")
    table_uri = table_uri_bytes.decode("utf-8")

    def _query() -> str:
        # Synchronous scan + serialisation; executed in a worker thread below.
        return (
            pl.scan_delta(table_uri)
            .filter(pl.col("icao24") == icao24)
            .with_columns(pl.col("timestamp").dt.epoch(time_unit="us") / 1000000)
            .sort("timestamp")
            .collect()
            .write_json()
        )

    try:
        # ``collect()`` blocks until the whole table scan is done; running it
        # on the event loop would stall every other request in the meantime.
        payload = await asyncio.to_thread(_query)
    except Exception as e:
        log.error(f"Failed to query trajectory for {icao24}: {e}")
        # Chain the cause so the original error stays attached to the 500.
        raise HTTPException(
            status_code=500, detail="Failed to query trajectory data."
        ) from e
    return Response(payload, media_type="application/json")

search_flights async

search_flights(
    q: str, backend_state: InjectBackendState
) -> Response
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@router.get("/search")
async def search_flights(
    q: str, backend_state: tangram_core.InjectBackendState
) -> Response:
    """Search the history table for flight intervals matching ``q``.

    ``q`` is lower-cased and matched as a plain substring against the
    lower-cased ``callsign`` and against ``icao24``.  At most 20 matching
    ``icao24`` values are kept; their rows are split into intervals whenever
    the gap to the previous row is at least 30 minutes or the ``icao24`` /
    ``callsign`` changes.  Intervals with fewer than 5 rows, or that do not
    match ``q`` themselves, are dropped.  The result is sorted by start time,
    most recent first.

    An empty JSON list is returned when the history feature is unavailable,
    when no table URI is stored in Redis, when nothing matches, or when the
    query fails (the failure is logged).
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    if not _HISTORY_AVAILABLE:
        return ORJSONResponse(content=[])

    redis_key = "tangram:history:table_uri:jet1090"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)
    if not table_uri_bytes:
        return ORJSONResponse(content=[])
    table_uri = table_uri_bytes.decode("utf-8")

    def _run_search():
        """Run the blocking search; return the JSON text, or None if empty."""
        df = pl.scan_delta(table_uri)
        q_lower = q.lower()
        # ``literal=True``: the query comes straight from the user and must be
        # treated as plain text.  With the default regex mode, input such as
        # "(" fails to compile and "." matches any character.
        matches_query = pl.col("callsign").str.to_lowercase().str.contains(
            q_lower, literal=True
        ) | pl.col("icao24").str.contains(q_lower, literal=True)

        candidates = (
            df.filter(matches_query).select("icao24").unique().head(20).collect()
        )
        candidate_icaos = candidates["icao24"].to_list()

        if not candidate_icaos:
            return None

        intervals = (
            df.filter(pl.col("icao24").is_in(candidate_icaos))
            .sort(["icao24", "timestamp"])
            .with_columns(pl.col("callsign").forward_fill().over("icao24"))
            .with_columns(
                gap_minutes=(
                    pl.col("timestamp") - pl.col("timestamp").shift(1)
                ).dt.total_minutes()
            )
            .with_columns(
                # The first row (null comparisons) always opens an interval.
                new_interval=(
                    (pl.col("gap_minutes") >= 30)
                    | (pl.col("icao24") != pl.col("icao24").shift(1))
                    | (pl.col("callsign") != pl.col("callsign").shift(1))
                ).fill_null(True)
            )
            # Running count of interval starts gives each interval its id.
            .with_columns(interval_id=pl.col("new_interval").cast(pl.Int64).cum_sum())
            .group_by(["icao24", "callsign", "interval_id"])
            .agg(
                start_ts=pl.col("timestamp").min(),
                end_ts=pl.col("timestamp").max(),
                n_rows=pl.len(),
                lat=pl.col("latitude").mean(),
                lon=pl.col("longitude").mean(),
            )
            # Re-apply the match: a candidate aircraft may have flown under
            # callsigns that do not match the query.
            .filter((pl.col("n_rows") >= 5) & matches_query)
            .with_columns(
                duration=((pl.col("end_ts") - pl.col("start_ts")).dt.total_seconds()),
            )
            .sort(["start_ts"], descending=True)
            .collect()
        )
        return intervals.write_json()

    try:
        # The scans block until finished; keep them off the event loop so
        # other requests are still served meanwhile.
        payload = await asyncio.to_thread(_run_search)
    except Exception as e:
        log.error(f"Search failed: {e}")
        return ORJSONResponse(content=[])

    if payload is None:
        return ORJSONResponse(content=[])
    return Response(payload, media_type="application/json")

get_history_slice async

get_history_slice(
    icao24: str,
    start_timestamp: int,
    end_timestamp: int,
    backend_state: InjectBackendState,
) -> Response
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
@router.get("/history/{icao24}/{start_timestamp}/{end_timestamp}")
async def get_history_slice(
    icao24: str,
    start_timestamp: int,
    end_timestamp: int,
    backend_state: tangram_core.InjectBackendState,
) -> Response:
    """Return the stored rows of one aircraft within a closed time window.

    ``start_timestamp`` and ``end_timestamp`` are cast to millisecond
    precision datetimes and both bounds are inclusive.  Rows are returned as
    JSON sorted by ``timestamp``.

    An empty JSON list is returned when the history feature is unavailable,
    when no table URI is stored in Redis, or when the query fails (the
    failure is logged).
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    if not _HISTORY_AVAILABLE:
        return ORJSONResponse(content=[])

    redis_key = "tangram:history:table_uri:jet1090"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)
    if not table_uri_bytes:
        return ORJSONResponse(content=[])
    table_uri = table_uri_bytes.decode("utf-8")

    start_dt = pl.lit(start_timestamp).cast(pl.Datetime("ms"))
    end_dt = pl.lit(end_timestamp).cast(pl.Datetime("ms"))

    def _query() -> str:
        # Synchronous scan + serialisation; executed in a worker thread below.
        return (
            pl.scan_delta(table_uri)
            .filter(
                (pl.col("icao24") == icao24)
                & (pl.col("timestamp") >= start_dt)
                & (pl.col("timestamp") <= end_dt)
            )
            .sort("timestamp")
            .collect()
            .write_json()
        )

    try:
        # ``collect()`` blocks until the scan is done; running it on the event
        # loop would stall every other request in the meantime.
        payload = await asyncio.to_thread(_query)
    except Exception as e:
        log.error(f"History slice failed: {e}")
        return ORJSONResponse(content=[])
    return Response(payload, media_type="application/json")

get_route_data async

get_route_data(
    callsign: str, backend_state: InjectBackendState
) -> ORJSONResponse
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@router.get("/route/{callsign}")
async def get_route_data(
    callsign: str, backend_state: tangram_core.InjectBackendState
) -> ORJSONResponse:
    """Look up route information for ``callsign`` at the OpenSky route service.

    The upstream JSON body is passed through unchanged.  Any failure
    (transport error, non-success status, invalid JSON) is logged and answered
    with an empty list and status 500.
    """
    http = backend_state.http_client
    request_body = {"planes": [{"callsign": callsign}]}
    try:
        upstream = await http.post(
            "https://flightroutes.opensky-network.org/api/routeset",
            json=request_body,
            timeout=5.0,
        )
        upstream.raise_for_status()
        return ORJSONResponse(content=upstream.json())
    except Exception as e:
        log.error(f"Failed to fetch route data for {callsign}: {e}")
        return ORJSONResponse(content=[], status_code=500)

get_sensors_data async

get_sensors_data(
    backend_state: InjectBackendState,
) -> ORJSONResponse
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
@router.get("/sensors")
async def get_sensors_data(
    backend_state: tangram_core.InjectBackendState,
) -> ORJSONResponse:
    """Proxy the ``/sensors`` endpoint of the configured jet1090 service.

    The plugin settings are validated into a ``PlanesConfig`` on every call
    and ``jet1090_url`` is used as the base URL.

    Raises:
        HTTPException: 502 when the upstream request fails; the detail is the
            text of the underlying error.
    """
    plugin_config = backend_state.config.plugins.get("tangram_jet1090", {})
    config = TypeAdapter(PlanesConfig).validate_python(plugin_config)
    # NOTE(review): "localhost" is rewritten to the IPv4 loopback address,
    # presumably to avoid name resolution picking IPv6 -- confirm.  The
    # replacement applies to the whole URL string, not only the host part.
    url = f"{config.jet1090_url}/sensors".replace("localhost", "127.0.0.1")

    try:
        response = await backend_state.http_client.get(url, timeout=10.0)
        response.raise_for_status()
        return ORJSONResponse(content=response.json())
    except Exception as e:
        log.error(f"Failed to fetch sensors data from {url}: {e}")
        # Chain the cause so the original error stays attached to the 502.
        raise HTTPException(status_code=502, detail=str(e)) from e

get_aircraft_db async

get_aircraft_db(
    client: AsyncClient, url: str, path_cache: Path
) -> dict[str, Aircraft]
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
async def get_aircraft_db(
    client: httpx.AsyncClient, url: str, path_cache: Path
) -> dict[str, _planes.Aircraft]:
    """Load the aircraft database, downloading and extracting it when missing.

    ``basestation.zip`` is downloaded from ``url`` into ``path_cache`` unless
    it already exists, its first member is extracted to ``basestation.sqb``
    unless that already exists, and the ``Aircraft`` table is read.

    Args:
        client: HTTP client used for the download.
        url: Location of the zip archive.
        path_cache: Directory holding the archive and the extracted database;
            created when missing.

    Returns:
        Mapping from the lower-cased ``ModeS`` value to an ``Aircraft`` built
        from ``Registration`` and ``ICAOTypeCode``; rows with an empty
        ``ModeS`` are skipped.  An empty mapping is returned when the database
        cannot be read, in which case the extracted file is removed.

    Raises:
        zipfile.BadZipFile: when the archive is corrupt; the archive is
            removed first so that a later call downloads it again.

    Download errors (including a non-success HTTP status) are not caught here.
    """
    from . import _planes

    path_cache.mkdir(parents=True, exist_ok=True)
    zip_path = path_cache / "basestation.zip"
    db_path = path_cache / "basestation.sqb"

    if not zip_path.exists():
        log.info(f"downloading aircraft database from {url} to {zip_path}")
        # Download next to the final location and rename only once complete:
        # an interrupted transfer must not leave a truncated basestation.zip
        # behind, because its mere existence disables any later download.
        partial_path = path_cache / "basestation.zip.part"
        try:
            async with client.stream("GET", url, follow_redirects=True) as response:
                response.raise_for_status()
                with partial_path.open("wb") as f:
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)
            partial_path.replace(zip_path)
        finally:
            # No-op after a successful rename; removes the leftover otherwise.
            partial_path.unlink(missing_ok=True)

    if not db_path.exists():
        log.info(f"extracting {zip_path} to {db_path}")
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                db_filename = zip_ref.namelist()[0]
                zip_ref.extract(db_filename, path=path_cache)
                (path_cache / db_filename).rename(db_path)
        except zipfile.BadZipFile:
            # Drop the corrupt archive so the next call fetches a fresh copy.
            zip_path.unlink(missing_ok=True)
            raise

    try:
        con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
        try:
            rows = con.execute(
                "SELECT ModeS, Registration, ICAOTypeCode FROM Aircraft"
            ).fetchall()
        finally:
            # Close the connection on the error path as well.
            con.close()
    except sqlite3.Error as e:
        log.error(f"error reading aircraft database {db_path}: {e}")
        db_path.unlink(missing_ok=True)
        return {}

    return {
        modes.lower(): _planes.Aircraft(
            registration=registration,
            typecode=icaotypecode,
        )
        for modes, registration, icaotypecode in rows
        if modes
    }

run_planes async

run_planes(backend_state: BackendState) -> None
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
@plugin.register_service()
async def run_planes(backend_state: tangram_core.BackendState) -> None:
    """Service entry point: configure and run the ``_planes`` backend.

    The plugin settings are validated into a ``PlanesConfig``, tracing is
    initialised, the aircraft database is loaded, and the resulting
    ``_planes.PlanesConfig`` is handed to ``_planes.run_planes``.
    """
    from . import _planes

    raw_settings = backend_state.config.plugins.get("tangram_jet1090", {})
    settings = TypeAdapter(PlanesConfig).validate_python(raw_settings)

    # The "curtain" trail type needs the 3D map; otherwise fall back to "line".
    map_is_2d = not backend_state.config.map.enable_3d
    if map_is_2d and settings.trail_type == "curtain":
        log.warning(
            "expected 'enable_3d' to be true when using 'curtain' trail type"
            ", switching to 'line'"
        )
        settings = replace(settings, trail_type="line")

    # The log level comes from the raw mapping, so an unset value falls back
    # to the core log level instead of the dataclass default.
    _planes.init_tracing_stderr(
        raw_settings.get("log_level", backend_state.config.core.log_level)
    )

    known_aircraft = await get_aircraft_db(
        backend_state.http_client,
        settings.aircraft_db_url,
        settings.path_cache,
    )

    await _planes.run_planes(
        _planes.PlanesConfig(
            redis_url=backend_state.config.core.redis_url,
            jet1090_channel=settings.jet1090_channel,
            history_table_name=settings.history_table_name,
            history_control_channel=settings.history_control_channel,
            state_vector_expire=settings.state_vector_expire,
            stream_interval_secs=settings.stream_interval_secs,
            aircraft_db=known_aircraft,
            history_buffer_size=settings.history_buffer_size,
            history_flush_interval_secs=settings.history_flush_interval_secs,
            history_optimize_interval_secs=settings.history_optimize_interval_secs,
            history_optimize_target_file_size=settings.history_optimize_target_file_size,
            history_vacuum_interval_secs=settings.history_vacuum_interval_secs,
            history_vacuum_retention_period_secs=settings.history_vacuum_retention_period_secs,
            search_channel=settings.search_channel,
        )
    )

tangram_jet1090._planes

Aircraft

typecode property writable

typecode: Optional[str]

registration property writable

registration: Optional[str]

__new__

__new__(
    typecode: Optional[str], registration: Optional[str]
) -> Aircraft

PlanesConfig

redis_url property writable

redis_url: str

jet1090_channel property writable

jet1090_channel: str

history_table_name property writable

history_table_name: str

history_control_channel property writable

history_control_channel: str

state_vector_expire property writable

state_vector_expire: int

stream_interval_secs property writable

stream_interval_secs: float

aircraft_db property writable

aircraft_db: dict[str, Aircraft]

history_buffer_size property writable

history_buffer_size: int

history_flush_interval_secs property writable

history_flush_interval_secs: int

history_optimize_interval_secs property writable

history_optimize_interval_secs: int

history_optimize_target_file_size property writable

history_optimize_target_file_size: int

history_vacuum_interval_secs property writable

history_vacuum_interval_secs: int

history_vacuum_retention_period_secs property writable

history_vacuum_retention_period_secs: Optional[int]

search_channel property writable

search_channel: str

__new__

__new__(
    redis_url: str,
    jet1090_channel: str,
    history_table_name: str,
    history_control_channel: str,
    state_vector_expire: int,
    stream_interval_secs: float,
    aircraft_db: Mapping[str, Aircraft],
    history_buffer_size: int,
    history_flush_interval_secs: int,
    history_optimize_interval_secs: int,
    history_optimize_target_file_size: int,
    history_vacuum_interval_secs: int,
    history_vacuum_retention_period_secs: Optional[int],
    search_channel: str,
) -> PlanesConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run_planes

run_planes(config: PlanesConfig) -> Any