Skip to content

Ship162

tangram_ship162

log module-attribute

# Module-level logger named after this module (standard logging convention).
log = getLogger(__name__)

router module-attribute

# FastAPI router for the plugin's HTTP endpoints: every route below is
# mounted under the /ship162 prefix and grouped under the "ship162" tag
# in the generated OpenAPI docs.
router = APIRouter(
    prefix="/ship162",
    tags=["ship162"],
    responses={404: {"description": "Not found"}},
)

plugin module-attribute

# Plugin registration: serves the built frontend bundle from
# "dist-frontend", exposes the router's endpoints, and validates the
# plugin's settings against ShipsConfig.
plugin = Plugin(
    frontend_path="dist-frontend",
    routers=[router],
    config_class=ShipsConfig,
)

ShipsConfig dataclass

Bases: HasTopbarUiConfig, HasSidebarUiConfig

Source code in packages/tangram_ship162/src/tangram_ship162/__init__.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@dataclass(frozen=True)
class ShipsConfig(
    tangram_core.config.HasTopbarUiConfig, tangram_core.config.HasSidebarUiConfig
):
    """Configuration for the ship162 plugin.

    Values are validated from the user's plugin settings and forwarded to
    the native `_ships` service; fields annotated with ``ExposeField()``
    are also exposed to the frontend.
    """

    # Redis channel for live ship162 messages.
    ship162_channel: str = "ship162"
    # Name of the history (Delta) table this plugin writes to / reads from.
    history_table_name: str = "ship162"
    # Redis channel used to control the history subsystem.
    history_control_channel: str = "history:control"
    # Redis channel for search requests (exposed to the frontend).
    search_channel: Annotated[str, ExposeField()] = "ship162:search"
    # Seconds before a ship's state vector is considered stale.
    state_vector_expire: int = 600  # 10 minutes
    # Interval between streamed state updates, in seconds.
    stream_interval_secs: float = 1.0
    # Log level for the native service; defaults to core log level if unset.
    log_level: str = "INFO"
    # History writer buffering / maintenance knobs (sizes in rows/bytes,
    # intervals in seconds). 134217728 == 128 MiB target file size.
    history_buffer_size: int = 100_000
    history_flush_interval_secs: int = 5
    history_optimize_interval_secs: int = 120
    history_optimize_target_file_size: int = 134217728
    history_vacuum_interval_secs: int = 120
    # None disables the vacuum retention period.
    history_vacuum_retention_period_secs: int | None = 120
    # Ordering of this plugin's entries in the topbar/sidebar UI (exposed).
    topbar_order: Annotated[int, ExposeField()] = 100
    sidebar_order: Annotated[int, ExposeField()] = 100

ship162_channel class-attribute instance-attribute

ship162_channel: str = 'ship162'

history_table_name class-attribute instance-attribute

history_table_name: str = 'ship162'

history_control_channel class-attribute instance-attribute

history_control_channel: str = 'history:control'

search_channel class-attribute instance-attribute

search_channel: Annotated[str, ExposeField()] = (
    "ship162:search"
)

state_vector_expire class-attribute instance-attribute

state_vector_expire: int = 600

stream_interval_secs class-attribute instance-attribute

stream_interval_secs: float = 1.0

log_level class-attribute instance-attribute

log_level: str = 'INFO'

history_buffer_size class-attribute instance-attribute

history_buffer_size: int = 100000

history_flush_interval_secs class-attribute instance-attribute

history_flush_interval_secs: int = 5

history_optimize_interval_secs class-attribute instance-attribute

history_optimize_interval_secs: int = 120

history_optimize_target_file_size class-attribute instance-attribute

history_optimize_target_file_size: int = 134217728

history_vacuum_interval_secs class-attribute instance-attribute

history_vacuum_interval_secs: int = 120

history_vacuum_retention_period_secs class-attribute instance-attribute

history_vacuum_retention_period_secs: int | None = 120

topbar_order class-attribute instance-attribute

topbar_order: Annotated[int, ExposeField()] = 100

sidebar_order class-attribute instance-attribute

sidebar_order: Annotated[int, ExposeField()] = 100

__init__

__init__(
    ship162_channel: str = "ship162",
    history_table_name: str = "ship162",
    history_control_channel: str = "history:control",
    search_channel: Annotated[
        str, ExposeField()
    ] = "ship162:search",
    state_vector_expire: int = 600,
    stream_interval_secs: float = 1.0,
    log_level: str = "INFO",
    history_buffer_size: int = 100000,
    history_flush_interval_secs: int = 5,
    history_optimize_interval_secs: int = 120,
    history_optimize_target_file_size: int = 134217728,
    history_vacuum_interval_secs: int = 120,
    history_vacuum_retention_period_secs: int | None = 120,
    topbar_order: Annotated[int, ExposeField()] = 100,
    sidebar_order: Annotated[int, ExposeField()] = 100,
) -> None

get_trajectory_data async

get_trajectory_data(
    mmsi: int, backend_state: InjectBackendState
) -> Response

Get the full trajectory for a given ship MMSI.

Source code in packages/tangram_ship162/src/tangram_ship162/__init__.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@router.get("/data/{mmsi}")
async def get_trajectory_data(
    mmsi: int, backend_state: tangram_core.InjectBackendState
) -> Response:
    """Get the full trajectory for a given ship MMSI.

    Looks up the history table URI in Redis, scans the Delta table for all
    rows matching *mmsi*, converts timestamps to epoch seconds, and returns
    the time-sorted trajectory as JSON.

    Raises:
        HTTPException: 501 if the history feature is not installed,
            404 if the table URI is missing from Redis,
            500 if the Delta query fails.
    """
    if not _HISTORY_AVAILABLE:
        raise HTTPException(
            status_code=501,
            detail="History feature is not installed.",
        )

    redis_key = "tangram:history:table_uri:ship162"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)

    if not table_uri_bytes:
        raise HTTPException(status_code=404, detail="Table 'ship162' not found.")
    table_uri = table_uri_bytes.decode("utf-8")

    try:
        df = (
            pl.scan_delta(table_uri)
            .filter(pl.col("mmsi") == mmsi)
            .with_columns(pl.col("timestamp").dt.epoch(time_unit="s"))
            .sort("timestamp")
            .collect()
        )
        return Response(df.write_json(), media_type="application/json")
    except Exception as e:
        # Chain the cause so the original traceback survives into logs and
        # error handlers (PEP 3134 / ruff B904).
        raise HTTPException(
            status_code=500, detail=f"Failed to query trajectory data: {e}"
        ) from e

search_ships async

search_ships(
    q: str, backend_state: InjectBackendState
) -> Response
Source code in packages/tangram_ship162/src/tangram_ship162/__init__.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@router.get("/search")
async def search_ships(
    q: str, backend_state: tangram_core.InjectBackendState
) -> Response:
    """Search ships by name, MMSI, or callsign (case-insensitive substring).

    Finds up to 20 candidate MMSIs matching *q*, then computes contiguous
    activity intervals per ship (splitting on gaps of 60 minutes or more),
    keeping intervals with at least 5 rows. Returns the intervals as JSON,
    newest first; returns an empty list when history is unavailable or on
    query failure.
    """
    if not _HISTORY_AVAILABLE:
        return ORJSONResponse(content=[])

    redis_key = "tangram:history:table_uri:ship162"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)
    if not table_uri_bytes:
        return ORJSONResponse(content=[])
    table_uri = table_uri_bytes.decode("utf-8")

    try:
        df = pl.scan_delta(table_uri)
        q_lower = q.lower()

        candidates = (
            df.filter(
                # literal=True: treat the query as a plain substring.
                # str.contains defaults to regex matching, so a query with
                # metacharacters like "(" would raise and silently return [].
                pl.col("ship_name").str.to_lowercase().str.contains(
                    q_lower, literal=True
                )
                | pl.col("mmsi").cast(pl.String).str.contains(q_lower, literal=True)
                | pl.col("callsign").str.to_lowercase().str.contains(
                    q_lower, literal=True
                )
            )
            .select("mmsi")
            .unique()
            .head(20)
            .collect()
        )
        candidate_mmsis = candidates["mmsi"].to_list()

        if not candidate_mmsis:
            return ORJSONResponse(content=[])

        intervals = (
            df.filter(pl.col("mmsi").is_in(candidate_mmsis))
            .sort(["mmsi", "timestamp"])
            # Carry the last known ship name forward within each ship.
            .with_columns(pl.col("ship_name").forward_fill().over("mmsi"))
            .with_columns(
                gap_minutes=(
                    pl.col("timestamp") - pl.col("timestamp").shift(1)
                ).dt.total_minutes()
            )
            # A new interval starts after a >= 60 min gap or when the ship
            # changes; fill_null(True) starts an interval on the first row.
            .with_columns(
                new_interval=(
                    (pl.col("gap_minutes") >= 60)
                    | (pl.col("mmsi") != pl.col("mmsi").shift(1))
                ).fill_null(True)
            )
            .with_columns(interval_id=pl.col("new_interval").cast(pl.Int64).cum_sum())
            .group_by(["mmsi", "ship_name", "interval_id"])
            .agg(
                start_ts=pl.col("timestamp").min(),
                end_ts=pl.col("timestamp").max(),
                n_rows=pl.len(),
                lat=pl.col("latitude").mean(),
                lon=pl.col("longitude").mean(),
            )
            .filter((pl.col("n_rows") >= 5))
            .with_columns(
                duration=((pl.col("end_ts") - pl.col("start_ts")).dt.total_seconds()),
            )
            .sort(["start_ts"], descending=True)
            .collect()
        )
        return Response(intervals.write_json(), media_type="application/json")
    except Exception:
        # log.exception records the full traceback, not just the message.
        log.exception("Search failed")
        return ORJSONResponse(content=[])

get_history_slice async

get_history_slice(
    mmsi: int,
    start_timestamp: int,
    end_timestamp: int,
    backend_state: InjectBackendState,
) -> Response
Source code in packages/tangram_ship162/src/tangram_ship162/__init__.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@router.get("/history/{mmsi}/{start_timestamp}/{end_timestamp}")
async def get_history_slice(
    mmsi: int,
    start_timestamp: int,
    end_timestamp: int,
    backend_state: tangram_core.InjectBackendState,
) -> Response:
    """Return the trajectory rows for *mmsi* between the two timestamps.

    Returns the time-sorted slice as JSON; an empty list when history is
    unavailable, the table URI is missing, or the query fails.
    """
    if not _HISTORY_AVAILABLE:
        return ORJSONResponse(content=[])

    redis_key = "tangram:history:table_uri:ship162"
    table_uri_bytes = await backend_state.redis_client.get(redis_key)
    if not table_uri_bytes:
        return ORJSONResponse(content=[])
    table_uri = table_uri_bytes.decode("utf-8")

    # NOTE(review): the ints are cast as epoch *milliseconds* here, while
    # /data/{mmsi} emits epoch seconds — confirm callers pass ms values.
    start_dt = pl.lit(start_timestamp).cast(pl.Datetime("ms"))
    end_dt = pl.lit(end_timestamp).cast(pl.Datetime("ms"))

    try:
        df = (
            pl.scan_delta(table_uri)
            .filter(
                (pl.col("mmsi") == mmsi)
                & (pl.col("timestamp") >= start_dt)
                & (pl.col("timestamp") <= end_dt)
            )
            .sort("timestamp")
            .collect()
        )
        return Response(df.write_json(), media_type="application/json")
    except Exception:
        # log.exception records the full traceback, not just the message.
        log.exception("History slice failed")
        return ORJSONResponse(content=[])

run_ships async

run_ships(backend_state: BackendState) -> None
Source code in packages/tangram_ship162/src/tangram_ship162/__init__.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@plugin.register_service()
async def run_ships(backend_state: tangram_core.BackendState) -> None:
    """Service entry point: validate the plugin settings, set up tracing,
    and hand control to the native ship162 event loop."""
    from . import _ships

    raw_settings = backend_state.config.plugins.get("tangram_ship162", {})
    cfg = TypeAdapter(ShipsConfig).validate_python(raw_settings)

    # Prefer an explicit plugin-level log level, else fall back to core's.
    effective_level = raw_settings.get(
        "log_level", backend_state.config.core.log_level
    )
    _ships.init_tracing_stderr(effective_level)

    # Mirror the validated Python config into the native extension's config.
    native_config = _ships.ShipsConfig(
        redis_url=backend_state.config.core.redis_url,
        ship162_channel=cfg.ship162_channel,
        history_control_channel=cfg.history_control_channel,
        state_vector_expire=cfg.state_vector_expire,
        stream_interval_secs=cfg.stream_interval_secs,
        history_table_name=cfg.history_table_name,
        history_buffer_size=cfg.history_buffer_size,
        history_flush_interval_secs=cfg.history_flush_interval_secs,
        history_optimize_interval_secs=cfg.history_optimize_interval_secs,
        history_optimize_target_file_size=cfg.history_optimize_target_file_size,
        history_vacuum_interval_secs=cfg.history_vacuum_interval_secs,
        history_vacuum_retention_period_secs=cfg.history_vacuum_retention_period_secs,
        search_channel=cfg.search_channel,
    )
    await _ships.run_ships(native_config)

tangram_ship162._ships

ShipsConfig

redis_url property writable

redis_url: str

ship162_channel property writable

ship162_channel: str

history_control_channel property writable

history_control_channel: str

state_vector_expire property writable

state_vector_expire: int

stream_interval_secs property writable

stream_interval_secs: float

history_table_name property writable

history_table_name: str

history_buffer_size property writable

history_buffer_size: int

history_flush_interval_secs property writable

history_flush_interval_secs: int

history_optimize_interval_secs property writable

history_optimize_interval_secs: int

history_optimize_target_file_size property writable

history_optimize_target_file_size: int

history_vacuum_interval_secs property writable

history_vacuum_interval_secs: int

history_vacuum_retention_period_secs property writable

history_vacuum_retention_period_secs: Optional[int]

search_channel property writable

search_channel: str

__new__

__new__(
    redis_url: str,
    ship162_channel: str,
    history_control_channel: str,
    state_vector_expire: int,
    stream_interval_secs: float,
    history_table_name: str,
    history_buffer_size: int,
    history_flush_interval_secs: int,
    history_optimize_interval_secs: int,
    history_optimize_target_file_size: int,
    history_vacuum_interval_secs: int,
    history_vacuum_retention_period_secs: Optional[int],
    search_channel: str,
) -> ShipsConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run_ships

run_ships(config: ShipsConfig) -> Any