Skip to content

Datalink

plugin module-attribute

plugin = Plugin(
    frontend_path="dist-frontend",
    routers=[router],
    config_class=DatalinkConfig,
    frontend_config_class=DatalinkFrontendConfig,
    into_frontend_config_function=into_frontend,
)

__all__ module-attribute

__all__ = ['plugin']

backend

log module-attribute

log = getLogger(__name__)

router module-attribute

router = APIRouter(prefix='/datalink', tags=['datalink'])

plugin module-attribute

plugin = Plugin(
    frontend_path="dist-frontend",
    routers=[router],
    config_class=DatalinkConfig,
    frontend_config_class=DatalinkFrontendConfig,
    into_frontend_config_function=into_frontend,
)

DatalinkConfig dataclass

Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
17
18
19
20
21
22
23
# Backend settings for the datalink plugin. The dataclass is frozen and every
# field has a default, so an empty mapping validates to a usable instance
# (run_datalink falls back to `{}` when the plugin has no config entry).
@dataclass(frozen=True)
class DatalinkConfig:
    # Forwarded unchanged to `_datalink.DatalinkConfig` by run_datalink.
    # NOTE(review): unit is presumably seconds -- confirm against `_datalink`.
    state_vector_expire: int = 3600
    # Forwarded unchanged to `_datalink.DatalinkConfig` by run_datalink.
    stream_interval_secs: float = 1.0
    # Not read by any code visible in this module's rendered source.
    log_level: str = "INFO"
    # Copied into DatalinkFrontendConfig by into_frontend.
    topbar_order: int = 90
    # Copied into DatalinkFrontendConfig by into_frontend.
    sidebar_order: int = 90
state_vector_expire class-attribute instance-attribute
state_vector_expire: int = 3600
stream_interval_secs class-attribute instance-attribute
stream_interval_secs: float = 1.0
log_level class-attribute instance-attribute
log_level: str = 'INFO'
topbar_order class-attribute instance-attribute
topbar_order: int = 90
sidebar_order class-attribute instance-attribute
sidebar_order: int = 90
__init__
__init__(
    state_vector_expire: int = 3600,
    stream_interval_secs: float = 1.0,
    log_level: str = "INFO",
    topbar_order: int = 90,
    sidebar_order: int = 90,
) -> None

DatalinkFrontendConfig dataclass

Bases: HasTopbarUiConfig, HasSidebarUiConfig

Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
26
27
28
29
30
31
32
33
34
35
36
# Frontend-facing view of the datalink plugin configuration. Instances are
# produced by into_frontend, which supplies the two required ordering fields.
@dataclass(frozen=True)
class DatalinkFrontendConfig(
    tangram_core.config.HasTopbarUiConfig, tangram_core.config.HasSidebarUiConfig
):
    # Required (no default); annotated with FrontendMutable() metadata.
    topbar_order: Annotated[int, FrontendMutable()]
    # Required (no default); annotated with FrontendMutable() metadata.
    sidebar_order: Annotated[int, FrontendMutable()]
    # Anchor for the entity filter custom widget;
    # actual state lives in the frontend store.
    # Defaults to None; into_frontend never sets it.
    filter_ui: Annotated[Optional[Any], FrontendMutable(widget="datalink-filter")] = (
        None
    )
topbar_order instance-attribute
topbar_order: Annotated[int, FrontendMutable()]
sidebar_order instance-attribute
sidebar_order: Annotated[int, FrontendMutable()]
filter_ui class-attribute instance-attribute
filter_ui: Annotated[
    Optional[Any], FrontendMutable(widget="datalink-filter")
] = None
__init__
__init__(
    topbar_order: Annotated[int, FrontendMutable()],
    sidebar_order: Annotated[int, FrontendMutable()],
    filter_ui: Annotated[
        Optional[Any],
        FrontendMutable(widget="datalink-filter"),
    ] = None,
) -> None

into_frontend

into_frontend(
    config: DatalinkConfig,
) -> DatalinkFrontendConfig
Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
39
40
41
42
43
def into_frontend(config: DatalinkConfig) -> DatalinkFrontendConfig:
    """Build the frontend-facing config from the backend plugin config.

    Only the two UI ordering values are carried over; ``filter_ui`` is not
    passed, so it keeps the default declared on ``DatalinkFrontendConfig``.
    """
    ordering = {
        "topbar_order": config.topbar_order,
        "sidebar_order": config.sidebar_order,
    }
    return DatalinkFrontendConfig(**ordering)
run_datalink async

run_datalink(backend_state: BackendState) -> None
Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
55
56
57
58
59
60
61
62
63
64
65
66
67
@plugin.register_service()
async def run_datalink(backend_state: tangram_core.BackendState) -> None:
    """Validate this plugin's settings and await the ``_datalink`` service.

    The raw settings are looked up under the ``"tangram_datalink"`` key of
    ``backend_state.config.plugins`` (an empty mapping when the key is
    absent) and validated into a ``DatalinkConfig``. The Redis URL comes from
    the core configuration, not from the plugin settings.
    """
    # Imported when the service runs rather than at module import time.
    from . import _datalink

    raw_settings = backend_state.config.plugins.get("tangram_datalink", {})
    settings = TypeAdapter(DatalinkConfig).validate_python(raw_settings)

    # Only the expiry and stream interval are forwarded from the settings.
    await _datalink.run_datalink(
        _datalink.DatalinkConfig(
            redis_url=backend_state.config.core.redis_url,
            state_vector_expire=settings.state_vector_expire,
            stream_interval_secs=settings.stream_interval_secs,
        )
    )

tangram_datalink.backend

log module-attribute

log = getLogger(__name__)

router module-attribute

router = APIRouter(prefix='/datalink', tags=['datalink'])

plugin module-attribute

plugin = Plugin(
    frontend_path="dist-frontend",
    routers=[router],
    config_class=DatalinkConfig,
    frontend_config_class=DatalinkFrontendConfig,
    into_frontend_config_function=into_frontend,
)

DatalinkConfig dataclass

Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
17
18
19
20
21
22
23
# Backend settings for the datalink plugin. The dataclass is frozen and every
# field has a default, so an empty mapping validates to a usable instance
# (run_datalink falls back to `{}` when the plugin has no config entry).
@dataclass(frozen=True)
class DatalinkConfig:
    # Forwarded unchanged to `_datalink.DatalinkConfig` by run_datalink.
    # NOTE(review): unit is presumably seconds -- confirm against `_datalink`.
    state_vector_expire: int = 3600
    # Forwarded unchanged to `_datalink.DatalinkConfig` by run_datalink.
    stream_interval_secs: float = 1.0
    # Not read by any code visible in this module's rendered source.
    log_level: str = "INFO"
    # Copied into DatalinkFrontendConfig by into_frontend.
    topbar_order: int = 90
    # Copied into DatalinkFrontendConfig by into_frontend.
    sidebar_order: int = 90

state_vector_expire class-attribute instance-attribute

state_vector_expire: int = 3600

stream_interval_secs class-attribute instance-attribute

stream_interval_secs: float = 1.0

log_level class-attribute instance-attribute

log_level: str = 'INFO'

topbar_order class-attribute instance-attribute

topbar_order: int = 90

sidebar_order class-attribute instance-attribute

sidebar_order: int = 90

__init__

__init__(
    state_vector_expire: int = 3600,
    stream_interval_secs: float = 1.0,
    log_level: str = "INFO",
    topbar_order: int = 90,
    sidebar_order: int = 90,
) -> None

DatalinkFrontendConfig dataclass

Bases: HasTopbarUiConfig, HasSidebarUiConfig

Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
26
27
28
29
30
31
32
33
34
35
36
# Frontend-facing view of the datalink plugin configuration. Instances are
# produced by into_frontend, which supplies the two required ordering fields.
@dataclass(frozen=True)
class DatalinkFrontendConfig(
    tangram_core.config.HasTopbarUiConfig, tangram_core.config.HasSidebarUiConfig
):
    # Required (no default); annotated with FrontendMutable() metadata.
    topbar_order: Annotated[int, FrontendMutable()]
    # Required (no default); annotated with FrontendMutable() metadata.
    sidebar_order: Annotated[int, FrontendMutable()]
    # Anchor for the entity filter custom widget;
    # actual state lives in the frontend store.
    # Defaults to None; into_frontend never sets it.
    filter_ui: Annotated[Optional[Any], FrontendMutable(widget="datalink-filter")] = (
        None
    )

topbar_order instance-attribute

topbar_order: Annotated[int, FrontendMutable()]

sidebar_order instance-attribute

sidebar_order: Annotated[int, FrontendMutable()]

filter_ui class-attribute instance-attribute

filter_ui: Annotated[
    Optional[Any], FrontendMutable(widget="datalink-filter")
] = None

__init__

__init__(
    topbar_order: Annotated[int, FrontendMutable()],
    sidebar_order: Annotated[int, FrontendMutable()],
    filter_ui: Annotated[
        Optional[Any],
        FrontendMutable(widget="datalink-filter"),
    ] = None,
) -> None

into_frontend

into_frontend(
    config: DatalinkConfig,
) -> DatalinkFrontendConfig
Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
39
40
41
42
43
def into_frontend(config: DatalinkConfig) -> DatalinkFrontendConfig:
    """Build the frontend-facing config from the backend plugin config.

    Only the two UI ordering values are carried over; ``filter_ui`` is not
    passed, so it keeps the default declared on ``DatalinkFrontendConfig``.
    """
    ordering = {
        "topbar_order": config.topbar_order,
        "sidebar_order": config.sidebar_order,
    }
    return DatalinkFrontendConfig(**ordering)
run_datalink async

run_datalink(backend_state: BackendState) -> None
Source code in packages/tangram_datalink/src/tangram_datalink/backend.py
55
56
57
58
59
60
61
62
63
64
65
66
67
@plugin.register_service()
async def run_datalink(backend_state: tangram_core.BackendState) -> None:
    """Validate this plugin's settings and await the ``_datalink`` service.

    The raw settings are looked up under the ``"tangram_datalink"`` key of
    ``backend_state.config.plugins`` (an empty mapping when the key is
    absent) and validated into a ``DatalinkConfig``. The Redis URL comes from
    the core configuration, not from the plugin settings.
    """
    # Imported when the service runs rather than at module import time.
    from . import _datalink

    raw_settings = backend_state.config.plugins.get("tangram_datalink", {})
    settings = TypeAdapter(DatalinkConfig).validate_python(raw_settings)

    # Only the expiry and stream interval are forwarded from the settings.
    await _datalink.run_datalink(
        _datalink.DatalinkConfig(
            redis_url=backend_state.config.core.redis_url,
            state_vector_expire=settings.state_vector_expire,
            stream_interval_secs=settings.stream_interval_secs,
        )
    )

DatalinkConfig

redis_url property writable

redis_url: str

state_vector_expire property writable

state_vector_expire: int

stream_interval_secs property writable

stream_interval_secs: float

__new__

__new__(
    redis_url: str,
    state_vector_expire: int,
    stream_interval_secs: float,
) -> DatalinkConfig
run_datalink(config: DatalinkConfig) -> Any