Skip to content

Jet1090

tangram_jet1090

log module-attribute

# Module-level logger, named after this module via __name__.
log = getLogger(__name__)

router module-attribute

# Router holding this plugin's HTTP endpoints; every route registered on it
# lives under the "/jet1090" prefix and is tagged "jet1090".
router = APIRouter(
    prefix="/jet1090",
    tags=["jet1090"],
    responses={404: {"description": "Not found"}},
)

plugin module-attribute

# Plugin declaration: registers `router` and names `transform_config` as the
# function that converts the plugin configuration into its frontend form.
# NOTE(review): "dist-frontend" is presumably the built frontend asset
# directory — confirm against the Plugin documentation.
plugin = Plugin(
    frontend_path="dist-frontend",
    routers=[router],
    into_frontend_config_function=transform_config,
)

FrontendPlanesConfig dataclass

Bases: HasTopbarUiConfig, HasSidebarUiConfig

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
112
113
114
115
116
117
118
@dataclass(frozen=True)
class FrontendPlanesConfig(
    tangram_core.config.HasTopbarUiConfig, tangram_core.config.HasSidebarUiConfig
):
    """Immutable frontend view of the plugin configuration.

    Built by `transform_config`, which copies these three values from a
    validated `PlanesConfig`. All fields are required (no defaults).
    """

    # Copied from `PlanesConfig.show_route_lines`.
    show_route_lines: bool
    # Copied from `PlanesConfig.topbar_order`.
    topbar_order: int
    # Copied from `PlanesConfig.sidebar_order`.
    sidebar_order: int

show_route_lines instance-attribute

show_route_lines: bool

topbar_order instance-attribute

topbar_order: int

sidebar_order instance-attribute

sidebar_order: int

__init__

__init__(
    show_route_lines: bool,
    topbar_order: int,
    sidebar_order: int,
) -> None

PlanesConfig dataclass

Bases: HasTopbarUiConfig, HasSidebarUiConfig

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@dataclass(frozen=True)
class PlanesConfig(
    tangram_core.config.HasTopbarUiConfig, tangram_core.config.HasSidebarUiConfig
):
    """Immutable settings of the `tangram_jet1090` plugin, all with defaults.

    Within this module, instances are produced by validating the plugin's raw
    configuration mapping with `TypeAdapter(PlanesConfig).validate_python`.
    Unless noted otherwise, `run_planes` forwards each value unchanged to
    `_planes.PlanesConfig`.
    """

    jet1090_channel: str = "jet1090"
    history_table_name: str = "jet1090"
    history_control_channel: str = "history:control"
    # NOTE(review): the unit is not visible in this file — presumably seconds;
    # confirm against the `_planes` implementation.
    state_vector_expire: int = 20
    stream_interval_secs: float = 1.0
    # Archive that `get_aircraft_db` downloads when no cached copy exists.
    # Not forwarded to `_planes.PlanesConfig`.
    aircraft_db_url: str = (
        "https://jetvision.de/resources/sqb_databases/basestation.zip"
    )
    # Base URL to which `get_sensors_data` appends "/sensors".
    # Not forwarded to `_planes.PlanesConfig`.
    jet1090_url: str = "http://localhost:8080"
    # Directory in which `get_aircraft_db` stores the downloaded archive and
    # the extracted database. Not forwarded to `_planes.PlanesConfig`.
    path_cache: Path = Path(platformdirs.user_cache_dir("tangram_jet1090"))
    # NOTE(review): `run_planes` reads "log_level" from the raw plugin mapping
    # and falls back to the core log level, so this default does not appear to
    # be consulted there — confirm whether that is intended.
    log_level: str = "INFO"
    # Forwarded to the frontend by `transform_config`.
    show_route_lines: bool = True
    # flush is primarily time-based. this buffer is a backpressure mechanism.
    history_buffer_size: int = 100_000
    history_flush_interval_secs: int = 5
    history_optimize_interval_secs: int = 120
    # 134217728 == 128 * 1024 * 1024; presumably a size in bytes — confirm.
    history_optimize_target_file_size: int = 134217728
    history_vacuum_interval_secs: int = 120
    # None is an accepted value; its meaning is defined by the `_planes` side.
    history_vacuum_retention_period_secs: int | None = 120
    # The two ordering values below are forwarded to the frontend by
    # `transform_config`.
    topbar_order: int = 50
    sidebar_order: int = 50

jet1090_channel class-attribute instance-attribute

jet1090_channel: str = 'jet1090'

history_table_name class-attribute instance-attribute

history_table_name: str = 'jet1090'

history_control_channel class-attribute instance-attribute

history_control_channel: str = 'history:control'

state_vector_expire class-attribute instance-attribute

state_vector_expire: int = 20

stream_interval_secs class-attribute instance-attribute

stream_interval_secs: float = 1.0

aircraft_db_url class-attribute instance-attribute

aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip"

jet1090_url class-attribute instance-attribute

jet1090_url: str = 'http://localhost:8080'

path_cache class-attribute instance-attribute

path_cache: Path = Path(user_cache_dir('tangram_jet1090'))

log_level class-attribute instance-attribute

log_level: str = 'INFO'

show_route_lines class-attribute instance-attribute

show_route_lines: bool = True

history_buffer_size class-attribute instance-attribute

history_buffer_size: int = 100000

history_flush_interval_secs class-attribute instance-attribute

history_flush_interval_secs: int = 5

history_optimize_interval_secs class-attribute instance-attribute

history_optimize_interval_secs: int = 120

history_optimize_target_file_size class-attribute instance-attribute

history_optimize_target_file_size: int = 134217728

history_vacuum_interval_secs class-attribute instance-attribute

history_vacuum_interval_secs: int = 120

history_vacuum_retention_period_secs class-attribute instance-attribute

history_vacuum_retention_period_secs: int | None = 120

topbar_order class-attribute instance-attribute

topbar_order: int = 50

sidebar_order class-attribute instance-attribute

sidebar_order: int = 50

__init__

__init__(
    jet1090_channel: str = "jet1090",
    history_table_name: str = "jet1090",
    history_control_channel: str = "history:control",
    state_vector_expire: int = 20,
    stream_interval_secs: float = 1.0,
    aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip",
    jet1090_url: str = "http://localhost:8080",
    path_cache: Path = Path(
        user_cache_dir("tangram_jet1090")
    ),
    log_level: str = "INFO",
    show_route_lines: bool = True,
    history_buffer_size: int = 100000,
    history_flush_interval_secs: int = 5,
    history_optimize_interval_secs: int = 120,
    history_optimize_target_file_size: int = 134217728,
    history_vacuum_interval_secs: int = 120,
    history_vacuum_retention_period_secs: int | None = 120,
    topbar_order: int = 50,
    sidebar_order: int = 50,
) -> None

get_trajectory_data async

get_trajectory_data(
    icao24: str, backend_state: InjectBackendState
) -> Response

Get the full trajectory for a given ICAO24 address.

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@router.get("/data/{icao24}")
async def get_trajectory_data(
    icao24: str, backend_state: tangram_core.InjectBackendState
) -> Response:
    """Get the full trajectory for a given ICAO24 address.

    The location of the history table is read from Redis, then the table is
    filtered on the `icao24` column, its `timestamp` column is converted to
    epoch seconds, rows are sorted by timestamp and returned as JSON.

    Args:
        icao24: Value matched exactly against the table's `icao24` column.
        backend_state: Injected backend state; only `redis_client` is used.

    Returns:
        A response with media type `application/json` containing the rows.

    Raises:
        HTTPException: 501 when the optional history feature is not installed,
            404 when no table URI is registered in Redis, 500 when the query
            itself fails.
    """
    import asyncio

    if not _HISTORY_AVAILABLE:
        raise HTTPException(
            status_code=501,
            detail="History feature is not installed. "
            "Install with `pip install 'tangram_jet1090[history]'`",
        )

    # NOTE(review): the table name is hard-coded here although
    # `PlanesConfig.history_table_name` is configurable — confirm the key still
    # matches when a non-default table name is used.
    redis_key = "tangram:history:table_uri:jet1090"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)

    if not table_uri_bytes:
        raise HTTPException(
            status_code=404,
            detail=(
                "Table 'jet1090' not found.\nhelp: is the history service running?"
            ),
        )
    table_uri = table_uri_bytes.decode("utf-8")

    def _query_as_json() -> str:
        """Run the (synchronous) scan and serialise the result to JSON."""
        return (
            pl.scan_delta(table_uri)
            .filter(pl.col("icao24") == icao24)
            .with_columns(pl.col("timestamp").dt.epoch(time_unit="s"))
            .sort("timestamp")
            .collect()
            .write_json()
        )

    try:
        # The scan/collect is blocking work; run it in a worker thread so the
        # event loop keeps serving other requests while the query executes.
        payload = await asyncio.to_thread(_query_as_json)
    except Exception as e:
        log.error(f"Failed to query trajectory for {icao24}: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to query trajectory data."
        ) from e
    return Response(payload, media_type="application/json")

get_route_data async

get_route_data(
    callsign: str, backend_state: InjectBackendState
) -> ORJSONResponse
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@router.get("/route/{callsign}")
async def get_route_data(
    callsign: str, backend_state: tangram_core.InjectBackendState
) -> ORJSONResponse:
    """Fetch route information for a callsign from the OpenSky routes API.

    The callsign is posted to the remote `routeset` endpoint with a 5 second
    timeout and the decoded JSON body is relayed to the caller. Any failure is
    logged and answered with an empty list and status code 500.
    """
    http = backend_state.http_client
    request_body = {"planes": [{"callsign": callsign}]}
    try:
        upstream = await http.post(
            "https://flightroutes.opensky-network.org/api/routeset",
            json=request_body,
            timeout=5.0,
        )
        upstream.raise_for_status()
        return ORJSONResponse(content=upstream.json())
    except Exception as e:
        log.error(f"Failed to fetch route data for {callsign}: {e}")
        return ORJSONResponse(content=[], status_code=500)

get_sensors_data async

get_sensors_data(
    backend_state: InjectBackendState,
) -> ORJSONResponse
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@router.get("/sensors")
async def get_sensors_data(
    backend_state: tangram_core.InjectBackendState,
) -> ORJSONResponse:
    """Relay the `/sensors` document of the configured jet1090 instance.

    The plugin configuration is validated into a `PlanesConfig` to obtain
    `jet1090_url`; the upstream JSON body is returned as-is.

    Raises:
        HTTPException: 502 carrying the error text when the upstream request
            fails or answers with an error status.
    """
    raw_settings = backend_state.config.plugins.get("tangram_jet1090", {})
    settings = TypeAdapter(PlanesConfig).validate_python(raw_settings)

    # "localhost" may resolve to ::1 (IPv6) before 127.0.0.1 on some systems,
    # which can cause issues, so the IPv4 loopback address is substituted
    # explicitly.
    sensors_url = f"{settings.jet1090_url}/sensors"
    sensors_url = sensors_url.replace("localhost", "127.0.0.1")

    try:
        upstream = await backend_state.http_client.get(sensors_url, timeout=10.0)
        upstream.raise_for_status()
        return ORJSONResponse(content=upstream.json())
    except Exception as e:
        log.error(f"Failed to fetch sensors data from {sensors_url}: {e}")
        raise HTTPException(status_code=502, detail=str(e))

transform_config

transform_config(
    config_dict: dict[str, Any],
) -> FrontendPlanesConfig
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
121
122
123
124
125
126
127
def transform_config(config_dict: dict[str, Any]) -> FrontendPlanesConfig:
    """Validate the raw plugin configuration and keep only the frontend fields.

    Args:
        config_dict: Raw plugin configuration mapping.

    Returns:
        A `FrontendPlanesConfig` holding `show_route_lines`, `topbar_order`
        and `sidebar_order` taken from the validated `PlanesConfig`.
    """
    validated = TypeAdapter(PlanesConfig).validate_python(config_dict)
    exposed = ("show_route_lines", "topbar_order", "sidebar_order")
    return FrontendPlanesConfig(**{name: getattr(validated, name) for name in exposed})

get_aircraft_db async

get_aircraft_db(
    client: AsyncClient, url: str, path_cache: Path
) -> dict[str, Aircraft]
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
async def get_aircraft_db(
    client: httpx.AsyncClient, url: str, path_cache: Path
) -> dict[str, _planes.Aircraft]:
    """Load the aircraft database, downloading and extracting it if needed.

    The archive is cached as `basestation.zip` and the extracted database as
    `basestation.sqb`, both inside `path_cache`; each step is skipped when its
    output file already exists.

    Args:
        client: HTTP client used to stream the archive.
        url: Location of the zipped database.
        path_cache: Cache directory; created if missing.

    Returns:
        Mapping from lower-cased `ModeS` value to an `Aircraft` carrying the
        row's registration and ICAO type code. Empty when the database cannot
        be read (the extracted file is then deleted so that the next call
        extracts it again).

    Raises:
        httpx.HTTPStatusError: If the download answers with an error status.
        zipfile.BadZipFile: If the cached archive is corrupt or contains no
            file; the archive is deleted first so the next call re-downloads.
    """
    from . import _planes

    path_cache.mkdir(parents=True, exist_ok=True)
    zip_path = path_cache / "basestation.zip"
    db_path = path_cache / "basestation.sqb"

    if not zip_path.exists():
        log.info(f"downloading aircraft database from {url} to {zip_path}")
        # Stream into a sibling ".part" file and rename only once complete, so
        # an interrupted download is never mistaken for a valid cached archive.
        partial_path = path_cache / "basestation.zip.part"
        try:
            async with client.stream("GET", url, follow_redirects=True) as response:
                response.raise_for_status()
                with partial_path.open("wb") as f:
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)
            partial_path.replace(zip_path)
        finally:
            # No-op after a successful rename; cleans up after a failure.
            partial_path.unlink(missing_ok=True)

    if not db_path.exists():
        log.info(f"extracting {zip_path} to {db_path}")
        try:
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                # Skip directory entries: only a regular file can be the database.
                members = [m for m in zip_ref.namelist() if not m.endswith("/")]
                if not members:
                    raise zipfile.BadZipFile(f"{zip_path} contains no files")
                # `extract` returns the path it actually wrote to.
                extracted = zip_ref.extract(members[0], path=path_cache)
                Path(extracted).rename(db_path)
        except zipfile.BadZipFile:
            # Drop the unusable archive so that a later call downloads it anew.
            zip_path.unlink(missing_ok=True)
            raise

    rows = []
    try:
        con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
        try:
            rows = con.execute(
                "SELECT ModeS, Registration, ICAOTypeCode FROM Aircraft"
            ).fetchall()
        finally:
            # Always release the handle, and do so before the file is unlinked.
            con.close()
    except sqlite3.Error as e:
        log.error(f"error reading aircraft database {db_path}: {e}")
        db_path.unlink(missing_ok=True)

    # Rows without a ModeS value are skipped; later duplicates win.
    return {
        modes.lower(): _planes.Aircraft(
            registration=registration,
            typecode=icaotypecode,
        )
        for modes, registration, icaotypecode in rows
        if modes
    }

run_planes async

run_planes(backend_state: BackendState) -> None
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
@plugin.register_service()
async def run_planes(backend_state: tangram_core.BackendState) -> None:
    """Service entry point: configure and run the `_planes` backend.

    Validates the plugin configuration, initialises tracing, loads the
    aircraft database, builds a `_planes.PlanesConfig` and awaits
    `_planes.run_planes` with it.
    """
    from . import _planes

    core = backend_state.config.core
    raw_settings = backend_state.config.plugins.get("tangram_jet1090", {})
    settings = TypeAdapter(PlanesConfig).validate_python(raw_settings)

    # An explicit "log_level" in the plugin mapping wins; otherwise the core
    # log level is used.
    tracing_filter = raw_settings.get("log_level", core.log_level)
    _planes.init_tracing_stderr(tracing_filter)

    aircraft_db = await get_aircraft_db(
        backend_state.http_client,
        settings.aircraft_db_url,
        settings.path_cache,
    )

    # Settings that are passed through under the same name.
    forwarded = (
        "jet1090_channel",
        "history_table_name",
        "state_vector_expire",
        "stream_interval_secs",
        "history_buffer_size",
        "history_flush_interval_secs",
        "history_control_channel",
        "history_optimize_interval_secs",
        "history_optimize_target_file_size",
        "history_vacuum_interval_secs",
        "history_vacuum_retention_period_secs",
    )
    planes_config = _planes.PlanesConfig(
        redis_url=core.redis_url,
        aircraft_db=aircraft_db,
        **{name: getattr(settings, name) for name in forwarded},
    )
    await _planes.run_planes(planes_config)

tangram_jet1090._planes

Aircraft

typecode property writable

typecode: Optional[str]

registration property writable

registration: Optional[str]

__new__

__new__(
    typecode: Optional[str], registration: Optional[str]
) -> Aircraft

PlanesConfig

redis_url property writable

redis_url: str

jet1090_channel property writable

jet1090_channel: str

history_table_name property writable

history_table_name: str

history_control_channel property writable

history_control_channel: str

state_vector_expire property writable

state_vector_expire: int

stream_interval_secs property writable

stream_interval_secs: float

aircraft_db property writable

aircraft_db: dict[str, Aircraft]

history_buffer_size property writable

history_buffer_size: int

history_flush_interval_secs property writable

history_flush_interval_secs: int

history_optimize_interval_secs property writable

history_optimize_interval_secs: int

history_optimize_target_file_size property writable

history_optimize_target_file_size: int

history_vacuum_interval_secs property writable

history_vacuum_interval_secs: int

history_vacuum_retention_period_secs property writable

history_vacuum_retention_period_secs: Optional[int]

__new__

__new__(
    redis_url: str,
    jet1090_channel: str,
    history_table_name: str,
    history_control_channel: str,
    state_vector_expire: int,
    stream_interval_secs: float,
    aircraft_db: Mapping[str, Aircraft],
    history_buffer_size: int,
    history_flush_interval_secs: int,
    history_optimize_interval_secs: int,
    history_optimize_target_file_size: int,
    history_vacuum_interval_secs: int,
    history_vacuum_retention_period_secs: Optional[int],
) -> PlanesConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run_planes

run_planes(config: PlanesConfig) -> Any