Skip to content

History

tangram_history

log module-attribute

log = getLogger(__name__)

plugin module-attribute

plugin = Plugin(get_typer=get_typer)

get_typer

get_typer() -> Typer
Source code in packages/tangram_history/src/tangram_history/__init__.py
15
16
17
18
def get_typer() -> Typer:
    """Return the history plugin's Typer application, importing it lazily."""
    from .cli import app as cli_app

    return cli_app

run_history async

run_history(backend_state: BackendState) -> None
Source code in packages/tangram_history/src/tangram_history/__init__.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@plugin.register_service()
async def run_history(backend_state: tangram_core.BackendState) -> None:
    """Validate the plugin configuration and run the native history service.

    Reads the ``tangram_history`` section of the tangram config, initialises
    stderr tracing, ensures the storage directory exists, then hands off to
    the Rust-backed ``_history.run_history`` loop.
    """
    from . import _history

    raw_plugin_cfg = backend_state.config.plugins.get("tangram_history", {})
    history_cfg = TypeAdapter(HistoryConfig).validate_python(raw_plugin_cfg)

    # Plugin-level log_level wins; otherwise fall back to the core log level.
    log_level = raw_plugin_cfg.get("log_level", backend_state.config.core.log_level)
    _history.init_tracing_stderr(log_level)

    # The Delta Lake base directory must exist before the service starts.
    history_cfg.base_path.mkdir(parents=True, exist_ok=True)

    native_cfg = _history.HistoryConfig(
        redis_url=backend_state.config.core.redis_url,
        control_channel=history_cfg.control_channel,
        base_path=str(history_cfg.base_path),
        redis_read_count=history_cfg.redis_read_count,
        redis_read_block_ms=history_cfg.redis_read_block_ms,
    )
    await _history.run_history(native_cfg)

cli

A history management CLI that operates safely regardless of whether the tangram serve backend is running. Operations respect the single-writer principle of Delta Lake when ingestion is active, and still function when the service is offline.

When online, the CLI acts as a remote control, sending serialised commands via Redis, with the service orchestrating locks (pausing ingestion/maintenance). When offline, the CLI assumes exclusive access and operates directly on the file system via PyO3 bindings.

app module-attribute

app = Typer(
    name="history", help="Manage historical data storage."
)

console module-attribute

console = Console()

PathTangramConfig module-attribute

PathTangramConfig: TypeAlias = Annotated[
    Path,
    Option(
        help="Path to the tangram.toml config file.",
        envvar="TANGRAM_CONFIG",
        default_factory=default_config_file,
        exists=True,
        dir_okay=False,
    ),
]

ForceOffline module-attribute

ForceOffline: TypeAlias = Annotated[
    bool,
    Option(
        "--force-offline",
        help="Force offline operation, bypassing the tangram service even if online.",
    ),
]

T module-attribute

T = TypeVar('T', bound=SupportsSerde)

yesterday module-attribute

yesterday = isoformat(timespec='seconds')

HistoryStatus

Bases: Enum

Source code in packages/tangram_history/src/tangram_history/cli.py
83
84
85
86
87
88
89
90
91
92
93
94
95
class HistoryStatus(Enum):
    HAS_SUBSCRIBERS = auto()
    NO_SUBSCRIBERS = auto()
    REDIS_OFFLINE = auto()

    def to_message(self) -> str:
        if self == HistoryStatus.HAS_SUBSCRIBERS:
            return "service is online and has subscribers"
        if self == HistoryStatus.NO_SUBSCRIBERS:
            return "no subscribers on history control channel"
        if self == HistoryStatus.REDIS_OFFLINE:
            return "unable to connect to redis"
        assert_never(self)
HAS_SUBSCRIBERS class-attribute instance-attribute
HAS_SUBSCRIBERS = auto()
NO_SUBSCRIBERS class-attribute instance-attribute
NO_SUBSCRIBERS = auto()
REDIS_OFFLINE class-attribute instance-attribute
REDIS_OFFLINE = auto()
to_message
to_message() -> str
Source code in packages/tangram_history/src/tangram_history/cli.py
88
89
90
91
92
93
94
95
def to_message(self) -> str:
    """Return a short human-readable description of this status."""
    if self == HistoryStatus.HAS_SUBSCRIBERS:
        return "service is online and has subscribers"
    if self == HistoryStatus.NO_SUBSCRIBERS:
        return "no subscribers on history control channel"
    if self == HistoryStatus.REDIS_OFFLINE:
        return "unable to connect to redis"
    assert_never(self)

SupportsSerde

Bases: Protocol

Source code in packages/tangram_history/src/tangram_history/cli.py
113
114
115
116
class SupportsSerde(Protocol):
    """Structural interface for messages serialisable to/from JSON bytes."""

    @staticmethod
    def from_json_bytes(data: bytes) -> Any: ...
    def to_json_bytes(self) -> bytes: ...
from_json_bytes staticmethod
from_json_bytes(data: bytes) -> Any
Source code in packages/tangram_history/src/tangram_history/cli.py
114
115
# Protocol stub: deserialise a message from JSON bytes.
@staticmethod
def from_json_bytes(data: bytes) -> Any: ...
to_json_bytes
to_json_bytes() -> bytes
Source code in packages/tangram_history/src/tangram_history/cli.py
116
def to_json_bytes(self) -> bytes: ...

print_error

print_error(v: Any) -> None
Source code in packages/tangram_history/src/tangram_history/cli.py
50
51
def print_error(v: Any) -> None:
    """Print *v* to the console as a bold red error line."""
    message = f"[bold red]error[/bold red]: {v}"
    console.print(message)

print_note

print_note(v: Any) -> None
Source code in packages/tangram_history/src/tangram_history/cli.py
54
55
def print_note(v: Any) -> None:
    """Print *v* to the console as a bold cyan note line."""
    message = f"[bold cyan]note[/bold cyan]: {v}"
    console.print(message)

print_warning

print_warning(v: Any) -> None
Source code in packages/tangram_history/src/tangram_history/cli.py
58
59
def print_warning(v: Any) -> None:
    """Print *v* to the console as a bold yellow warning line."""
    message = f"[bold yellow]warning[/bold yellow]: {v}"
    console.print(message)

parse_config

parse_config(config_path: Path) -> _CfgResult
Source code in packages/tangram_history/src/tangram_history/cli.py
68
69
70
71
72
73
74
75
76
77
78
79
80
def parse_config(config_path: Path) -> _CfgResult:
    """Load the tangram config file and the validated history plugin section.

    Exits with code 1 (after printing an error) when the file is missing.
    """
    from pydantic import TypeAdapter

    if not config_path.exists():
        print_error(f"config file not found: {config_path}")
        raise typer.Exit(1)

    full_config = Config.from_file(config_path)
    raw_history = full_config.plugins.get("tangram_history", {})
    history = TypeAdapter(HistoryConfig).validate_python(raw_history)
    return _CfgResult(config=full_config, history_config=history)

get_service_status

get_service_status(
    redis_url: str, control_channel: str
) -> HistoryStatus
Source code in packages/tangram_history/src/tangram_history/cli.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def get_service_status(redis_url: str, control_channel: str) -> HistoryStatus:
    """Probe Redis to determine whether the history service is listening.

    Uses ``PUBSUB NUMSUB`` on *control_channel*: any subscriber means the
    service is online.  A connection failure maps to ``REDIS_OFFLINE``.

    Fix: the Redis client was previously never closed, leaking its
    connection pool on every probe (``send_command`` already closes its
    client; this makes the two helpers consistent).
    """
    try:
        r = redis.from_url(redis_url)  # type: ignore
        try:
            response: list[tuple[str, int]] = r.pubsub_numsub(control_channel)  # type: ignore
            if not response:
                return HistoryStatus.NO_SUBSCRIBERS
            return (
                HistoryStatus.HAS_SUBSCRIBERS
                if response[0][1] > 0  # type: ignore
                else HistoryStatus.NO_SUBSCRIBERS
            )
        finally:
            # Release the connection pool even on the early-return paths.
            r.close()
    except redis.ConnectionError:
        return HistoryStatus.REDIS_OFFLINE

send_command

send_command(
    config_result: _CfgResult,
    sender_id: str,
    message_bytes: bytes,
    response_type: type[T],
    *,
    timeout: int = 10,
) -> T | None
Source code in packages/tangram_history/src/tangram_history/cli.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def send_command(
    config_result: _CfgResult,
    sender_id: str,
    message_bytes: bytes,
    response_type: type[T],
    *,
    timeout: int = 10,
) -> T | None:
    """Send a serialised control message to the service and await its reply.

    Subscribes to a per-sender response channel, publishes *message_bytes*
    on the control channel, and waits up to *timeout* seconds for a reply,
    which is decoded via ``response_type.from_json_bytes``.

    Returns ``None`` on timeout or when Redis is unreachable.

    Fix: ``unsubscribe``/``close`` previously ran only on the success path,
    leaking the pubsub subscription and connection if any call after
    ``subscribe`` raised; cleanup now happens in ``finally`` blocks.
    """
    redis_url = config_result.config.core.redis_url
    control_channel = config_result.history_config.control_channel
    response_channel = f"{control_channel}:response:{sender_id}"

    try:
        r = redis.from_url(redis_url)  # type: ignore
        try:
            p = r.pubsub()
            try:
                p.subscribe(response_channel)
                p.get_message(timeout=2.0)  # drain the subscription confirmation

                r.publish(control_channel, message_bytes)

                response_message = p.get_message(timeout=timeout)
            finally:
                # Always drop the subscription, even if publish/receive failed.
                p.unsubscribe()
        finally:
            r.close()

        if response_message and response_message.get("type") == "message":
            return response_type.from_json_bytes(response_message["data"])
        return None
    except redis.ConnectionError:
        return None

format_schema

format_schema(schema_str: str, prefix: str = '') -> str
Source code in packages/tangram_history/src/tangram_history/cli.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def format_schema(schema_str: str, prefix: str = "") -> str:
    """Render a JSON-serialised table schema as a rich-markup field tree.

    Each field becomes one line (``├─``/``└─`` connectors) showing its name,
    type, nullability, and any metadata.  If the parsed schema has no
    ``fields`` key, the raw schema repr is returned instead.
    """
    parsed = orjson.loads(schema_str)

    fields = parsed.get("fields")
    if not fields:
        return str(parsed)

    # Compound types are collapsed to a placeholder rather than expanded.
    compound_labels = {"struct": "struct<...>", "array": "array<...>", "map": "map<...>"}
    rendered: list[str] = []
    last_index = len(fields) - 1

    for idx, field in enumerate(fields):
        name = field.get("name", "unknown")
        type_info = field.get("type")
        nullable = field.get("nullable", True)

        connector = "└─" if idx == last_index else "├─"

        if isinstance(type_info, str):
            type_str = type_info
        elif isinstance(type_info, dict):
            # TODO enhance this
            raw_type = type_info.get("type", "complex")
            type_str = compound_labels.get(raw_type, raw_type)
        else:
            # Unknown/missing type representation: leave the type blank.
            type_str = ""

        metadata = field.get("metadata", {})
        meta_str = f" (metadata: {metadata})" if metadata else ""
        nullable_str = "[dim] (nullable)[/dim]" if nullable else ""
        rendered.append(
            f"{prefix}{connector} [bold cyan]{name}[/bold cyan]: {type_str}"
            f"{nullable_str}{meta_str}"
        )

        # recursively format struct fields is tricky with JSON string passed down
        # simpler to just show it as struct<...> for now or serialize subsection

    return "\n".join(rendered)

ls

ls(
    config: PathTangramConfig,
    show_schema: bool = True,
    force_offline: ForceOffline = False,
) -> None

List all history tables found in the configured storage directory.

Source code in packages/tangram_history/src/tangram_history/cli.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
@app.command()
def ls(
    config: PathTangramConfig,
    show_schema: bool = True,
    force_offline: ForceOffline = False,
) -> None:
    """List all history tables found in the configured storage directory.

    When the service is online, tables are requested over the control
    channel; otherwise (or with ``--force-offline``) they are read directly
    from disk.
    """
    from . import _history

    cfg = parse_config(config)
    status = get_service_status(
        cfg.config.core.redis_url, cfg.history_config.control_channel
    )

    tables: list[_history.TableInfo] = []
    # Fix: honour --force-offline here (as `rm` does) instead of always
    # querying the service when it has subscribers; previously the remote
    # round-trip was performed and then discarded by the offline listing.
    if not force_offline and status == HistoryStatus.HAS_SUBSCRIBERS:
        sender_id = str(uuid.uuid4())
        msg = _history.ControlMessage.ListTables(sender_id).to_json_bytes()
        if response := send_command(
            cfg, sender_id, msg, _history.ControlResponse, timeout=2
        ):
            if isinstance(response, _history.ControlResponse.TableList):
                tables = response.tables
            elif isinstance(response, _history.ControlResponse.CommandFailed):
                print_error(f"list tables failed: {response.error}")
            else:
                print_error(f"unexpected response: {response}")
        else:
            # Subscriber count said online, but nobody answered: fall back.
            print_warning("service appeared online but did not respond, trying offline")
            status = HistoryStatus.REDIS_OFFLINE

    if force_offline or status != HistoryStatus.HAS_SUBSCRIBERS:
        base_path = str(cfg.history_config.base_path)
        if not force_offline:
            print_note(f"{status.to_message()}, listing tables in {base_path}")

        tables = _history.list_tables_offline(base_path)

    for t in tables:
        console.print(f"{t.name} @ {t.uri} (version {t.version})")
        if show_schema:
            console.print(format_schema(t.schema))

rm

rm(table_name: Annotated[str, Argument(help='Name of the history table.')], predicate: Annotated[str, Argument(help=f'DataFusion SQL condition for rows to delete. Examples: "timestamp < \'{yesterday}\'", "df == 17", "altitude > 54000 OR altitude < 0". See: <https://datafusion.apache.org/user-guide/sql/index.html>')], config: PathTangramConfig, dry_run: Annotated[bool, Option('--dry-run', help='Perform a dry run only (skip execution).')] = False, force_offline: ForceOffline = False) -> None

Delete rows from a history table based on a condition.

Queries the count and preview using datafusion SessionContext. If the tangram service is online, the service acquires a write lock and acquires semaphores for optimisation/vacuuming before execution.

Source code in packages/tangram_history/src/tangram_history/cli.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
@app.command()
def rm(
    table_name: Annotated[str, typer.Argument(help="Name of the history table.")],
    predicate: Annotated[
        str,
        # Fix: removed a stray `"` that was accidentally embedded in the
        # user-facing help text after "delete.".
        typer.Argument(
            help=f"""DataFusion SQL condition for rows to delete.
Examples: "timestamp < '{yesterday}'", "df == 17", "altitude > 54000 OR altitude < 0".
See: <https://datafusion.apache.org/user-guide/sql/index.html>"""
        ),
    ],
    config: PathTangramConfig,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run", help="Perform a dry run only (skip execution)."),
    ] = False,
    force_offline: ForceOffline = False,
) -> None:
    """Delete rows from a history table based on a condition.

    Queries the count and preview using datafusion `SessionContext`.
    If the tangram service is online, the service acquires a write lock and
    acquires semaphores for optimisation/vacuuming before execution.
    """
    from . import _history

    cfg = parse_config(config)
    status = get_service_status(
        cfg.config.core.redis_url, cfg.history_config.control_channel
    )

    # Phase 1: dry-run analysis — count matching rows (and fetch a preview)
    # before asking the operator to confirm.
    with console.status("analyzing delete impact..."):
        if not force_offline and status == HistoryStatus.HAS_SUBSCRIBERS:
            sender_id = str(uuid.uuid4())
            msg = _history.ControlMessage.DeleteRows(
                sender_id, table_name, predicate, dry_run=True
            ).to_json_bytes()
            response = send_command(cfg, sender_id, msg, _history.ControlResponse)
            if not response:
                print_error("failed to get response from history service")
                raise typer.Exit(1)
        else:
            if not force_offline:
                print_note(f"{status.to_message()}, assuming exclusive access of table")
            response = _history.delete_rows_offline(
                base_path=str(cfg.history_config.base_path),
                table_name=table_name,
                predicate=predicate,
                dry_run=True,
            )

    if isinstance(response, _history.ControlResponse.CommandFailed):
        print_error(f"analysis failed: {response.error}")
        raise typer.Exit(1)

    if not isinstance(response, _history.ControlResponse.DeleteOutput):
        print_error(f"unexpected response: {response}")
        raise typer.Exit(1)

    count = response.affected_rows

    if count == 0:
        console.print("no rows matched the predicate.")
        return
    print_note(f"predicate matched [bold]{count}[/bold] rows")
    if preview_json := response.preview:
        rows = orjson.loads(preview_json)
        if rows:
            console.print("preview of rows to be deleted:")
            table_preview = Table(show_header=True)
            # Column order follows the first preview row's keys.
            for key in rows[0].keys():
                table_preview.add_column(key)
            for row in rows:
                table_preview.add_row(*[str(row.get(k)) for k in rows[0].keys()])
            console.print(table_preview)

    if dry_run:
        return

    if not typer.confirm("are you sure?"):
        raise typer.Abort()

    # Phase 2: the actual delete; a longer timeout is used because the
    # service may need to rewrite table files.
    time_start = time.perf_counter()
    with console.status("deleting..."):
        if not force_offline and status == HistoryStatus.HAS_SUBSCRIBERS:
            sender_id = str(uuid.uuid4())
            msg = _history.ControlMessage.DeleteRows(
                sender_id, table_name, predicate, dry_run=False
            ).to_json_bytes()
            exec_response = send_command(
                cfg, sender_id, msg, _history.ControlResponse, timeout=300
            )
            if not exec_response:
                print_error("timed out waiting for delete confirmation.")
                raise typer.Exit(1)
        else:
            exec_response = _history.delete_rows_offline(
                base_path=str(cfg.history_config.base_path),
                table_name=table_name,
                predicate=predicate,
                dry_run=False,
            )

    if isinstance(exec_response, _history.ControlResponse.CommandFailed):
        print_error(f"delete failed: {exec_response.error}")
        raise typer.Exit(1)
    if not isinstance(exec_response, _history.ControlResponse.DeleteOutput):
        print_error(f"unexpected response: {exec_response}")
        raise typer.Exit(1)
    deleted = exec_response.affected_rows
    console.print(
        f"[bold green]success:[/bold green] deleted {deleted} rows from '{table_name}'"
        f" in [bold]{time.perf_counter() - time_start:.2f} seconds[/bold]."
    )

config

HistoryConfig dataclass

Source code in packages/tangram_history/src/tangram_history/config.py
 7
 8
 9
10
11
12
13
@dataclass(frozen=True)
class HistoryConfig:
    """Validated ``tangram_history`` plugin configuration (tangram.toml section)."""

    # Redis pub/sub channel used for CLI <-> service control messages.
    control_channel: str = "history:control"
    # Root directory where history tables are stored on disk.
    base_path: Path = Path(platformdirs.user_cache_dir("tangram_history"))
    # Log filter string; forwarded to tracing init (see run_history).
    log_level: str = "INFO"
    # Forwarded to the native HistoryConfig; presumably Redis stream read
    # batch size and block timeout (ms) — confirm against the Rust side.
    redis_read_count: int = 1000
    redis_read_block_ms: int = 5000
control_channel class-attribute instance-attribute
control_channel: str = 'history:control'
base_path class-attribute instance-attribute
base_path: Path = Path(user_cache_dir('tangram_history'))
log_level class-attribute instance-attribute
log_level: str = 'INFO'
redis_read_count class-attribute instance-attribute
redis_read_count: int = 1000
redis_read_block_ms class-attribute instance-attribute
redis_read_block_ms: int = 5000
__init__
__init__(
    control_channel: str = "history:control",
    base_path: Path = Path(
        user_cache_dir("tangram_history")
    ),
    log_level: str = "INFO",
    redis_read_count: int = 1000,
    redis_read_block_ms: int = 5000,
) -> None

tangram_history._history

ControlMessage

Ping

Bases: ControlMessage

__match_args__ class-attribute instance-attribute
__match_args__ = ('sender',)
sender property
sender: str
__new__
__new__(sender: str) -> Ping

RegisterTable

Bases: ControlMessage

__match_args__ class-attribute instance-attribute
__match_args__ = ('_0',)
__new__
__new__(_0: RegisterTable) -> RegisterTable
__len__
__len__() -> int
__getitem__
__getitem__(key: int) -> Any

ListTables

Bases: ControlMessage

__match_args__ class-attribute instance-attribute
__match_args__ = ('sender_id',)
sender_id property
sender_id: str
__new__
__new__(sender_id: str) -> ListTables

DeleteRows

Bases: ControlMessage

Deletes rows in a table matching a specified predicate.

WARNING:

The current implementation uses raw string formatting to query row counts and previews, with certain SQL operations disallowed.

It may be prone to SQL injection.

__match_args__ class-attribute instance-attribute
__match_args__ = (
    "sender_id",
    "table_name",
    "predicate",
    "dry_run",
)
sender_id property
sender_id: str
table_name property
table_name: str
predicate property
predicate: str

The predicate expression, which must have Boolean type

See: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html

dry_run property
dry_run: bool
__new__
__new__(
    sender_id: str,
    table_name: str,
    predicate: str,
    dry_run: bool,
) -> DeleteRows

from_json_bytes staticmethod

from_json_bytes(data: bytes) -> ControlMessage

to_json_bytes

to_json_bytes() -> bytes

ControlResponse

TableRegistered

Bases: ControlResponse

__match_args__ class-attribute instance-attribute
__match_args__ = ('request_id', 'table_name', 'table_uri')
request_id property
request_id: str
table_name property
table_name: str
table_uri property
table_uri: str
__new__
__new__(
    request_id: str, table_name: str, table_uri: str
) -> TableRegistered

RegistrationFailed

Bases: ControlResponse

__match_args__ class-attribute instance-attribute
__match_args__ = ('request_id', 'table_name', 'error')
request_id property
request_id: str
table_name property
table_name: str
error property
error: str
__new__
__new__(
    request_id: str, table_name: str, error: str
) -> RegistrationFailed

Pong

Bases: ControlResponse

__match_args__ class-attribute instance-attribute
__match_args__ = ('sender',)
sender property
sender: str
__new__
__new__(sender: str) -> Pong

TableList

Bases: ControlResponse

__match_args__ class-attribute instance-attribute
__match_args__ = ('request_id', 'tables')
request_id property
request_id: str
tables property
tables: list[TableInfo]
__new__
__new__(
    request_id: str, tables: Sequence[TableInfo]
) -> TableList

DeleteOutput

Bases: ControlResponse

Successful delete response with affected row count and optional preview.

__match_args__ class-attribute instance-attribute
__match_args__ = (
    "request_id",
    "dry_run",
    "affected_rows",
    "preview",
)
request_id property
request_id: str
dry_run property
dry_run: bool
affected_rows property
affected_rows: int
preview property
preview: Optional[str]

JSON string of RecordBatch

__new__
__new__(
    request_id: str,
    dry_run: bool,
    affected_rows: int,
    preview: Optional[str],
) -> DeleteOutput

CommandFailed

Bases: ControlResponse

Returned when a control command fails; contains the error message.

__match_args__ class-attribute instance-attribute
__match_args__ = ('request_id', 'error')
request_id property
request_id: str
error property
error: str
__new__
__new__(request_id: str, error: str) -> CommandFailed

from_json_bytes staticmethod

from_json_bytes(data: bytes) -> ControlResponse

to_json_bytes

to_json_bytes() -> bytes

HistoryConfig

redis_url property writable

redis_url: str

control_channel property writable

control_channel: str

base_path property writable

base_path: str

redis_read_count property writable

redis_read_count: int

redis_read_block_ms property writable

redis_read_block_ms: int

__new__

__new__(
    redis_url: str,
    control_channel: str,
    base_path: str,
    redis_read_count: int,
    redis_read_block_ms: int,
) -> HistoryConfig

RegisterTable

sender_id property writable

sender_id: str

table_name property writable

table_name: str

schema property writable

schema: str

Base64 encoded arrow ipc schema format

partition_columns property writable

partition_columns: list[str]

optimize_interval_secs property writable

optimize_interval_secs: int

optimize_target_file_size property writable

optimize_target_file_size: int

vacuum_interval_secs property writable

vacuum_interval_secs: int

vacuum_retention_period_secs property writable

vacuum_retention_period_secs: Optional[int]

checkpoint_interval property writable

checkpoint_interval: int

from_json_bytes staticmethod

from_json_bytes(data: bytes) -> RegisterTable

to_json_bytes

to_json_bytes() -> bytes

TableInfo

name property writable

name: str

uri property writable

uri: str

version property writable

version: int

schema property writable

schema: str

Serialised JSON schema

from_json_bytes staticmethod

from_json_bytes(data: bytes) -> TableInfo

to_json_bytes

to_json_bytes() -> bytes

delete_rows_offline

delete_rows_offline(
    base_path: str,
    table_name: str,
    predicate: str,
    dry_run: bool,
) -> ControlResponse

Delete rows from a history table stored on disk.

Returns:

Type Description
ControlResponse

ControlResponse.DeleteOutput on success, ControlResponse.CommandFailed on failure.

Raises:

Type Description
OSError

if the table does not exist or filesystem access fails.

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

list_tables_offline

list_tables_offline(base_path: str) -> list[TableInfo]

List history tables by inspecting the on-disk Delta Lake directory.

Raises:

Type Description
OSError

if the table does not exist or filesystem access fails.

run_history

run_history(config: HistoryConfig) -> Any

Start the history ingest service.

Raises:

Type Description
RuntimeError

if the service fails to start or crashes.