Skip to content

Core

tangram_core

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@dataclass
class BackendState:
    redis_client: redis.Redis
    http_client: httpx.AsyncClient
    config: Config
    loaded_plugins: dict[str, Plugin] = field(default_factory=dict)

    @property
    def base_url(self) -> str:
        host = self.config.server.host
        port = self.config.server.port
        if host == "0.0.0.0":
            host = "127.0.0.1"
        return f"http://{host}:{port}"

redis_client instance-attribute

redis_client: Redis

http_client instance-attribute

http_client: AsyncClient

config instance-attribute

config: Config

loaded_plugins class-attribute instance-attribute

loaded_plugins: dict[str, Plugin] = field(
    default_factory=dict
)

base_url property

base_url: str

Runtime

Manages the lifecycle of the Tangram backend, including the Uvicorn server, background services, and connection pools (Redis, HTTPX).

Source code in packages/tangram_core/src/tangram_core/backend.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
class Runtime:
    """Manages the lifecycle of the Tangram backend, including the
    Uvicorn server, background services, and connection pools (Redis, HTTPX).
    """

    def __init__(self, config: IntoConfig | None = None) -> None:
        if isinstance(config, (str, Path, os.PathLike)):
            self.config = Config.from_file(config)
        else:
            self.config = config or Config()
        self._stack = AsyncExitStack()
        self._state: BackendState | None = None
        self._server: uvicorn.Server | None = None
        self._server_task: asyncio.Task[None] | None = None
        self._service_tasks: list[asyncio.Task[None]] = []

    @property
    def state(self) -> BackendState:
        if self._state is None:
            raise RuntimeError("runtime is not started, call start() first")
        return self._state

    async def start(self) -> Runtime:
        """Starts the backend runtime."""
        if self._state is not None:
            raise RuntimeError("runtime is already started")

        redis_client = await self._stack.enter_async_context(
            redis.from_url(self.config.core.redis_url)  # type: ignore
        )
        http_client = await self._stack.enter_async_context(
            httpx.AsyncClient(http2=True)
        )
        self._state = BackendState(
            redis_client=redis_client,
            http_client=http_client,
            config=self.config,
        )

        loaded_plugins = load_enabled_plugins(self.config)
        app = create_app(self._state, loaded_plugins)

        server_config = uvicorn.Config(
            app,
            host=self.config.server.host,
            port=self.config.server.port,
            log_config=get_log_config_dict(self.config),
        )
        self._server = uvicorn.Server(server_config)

        self._service_tasks.append(
            asyncio.create_task(run_channel_service(self.config))
        )
        for plugin in loaded_plugins:
            for _, service_func in sorted(
                plugin.services, key=lambda s: (s[0], s[1].__name__)
            ):
                self._service_tasks.append(
                    asyncio.create_task(service_func(self._state))
                )
                logger.info(f"started service from plugin: {plugin.dist_name}")

        self._server_task = asyncio.create_task(self._server.serve())

        while not self._server.started:
            if self._server_task.done():
                await self._server_task
            await asyncio.sleep(0.1)

        return self

    async def wait(self) -> None:
        """Waits for the server task to complete (e.g. via signal or internal error)."""
        if self._server_task:
            try:
                await self._server_task
            except asyncio.CancelledError:
                pass

    async def stop(self) -> None:
        """Stops the backend runtime."""
        if self._server and self._server.started:
            self._server.should_exit = True
            if self._server_task:
                try:
                    await self._server_task
                except asyncio.CancelledError:
                    pass

        for task in self._service_tasks:
            task.cancel()
        if self._service_tasks:
            await asyncio.gather(*self._service_tasks, return_exceptions=True)
        self._service_tasks.clear()

        await self._stack.aclose()
        self._state = None
        self._server = None
        self._server_task = None

    async def __aenter__(self) -> Runtime:
        return await self.start()

    async def __aexit__(self, *args: Any) -> None:
        await self.stop()

config instance-attribute

config = from_file(config)

state property

state: BackendState

start async

start() -> Runtime

Starts the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
async def start(self) -> Runtime:
    """Starts the backend runtime."""
    if self._state is not None:
        raise RuntimeError("runtime is already started")

    redis_client = await self._stack.enter_async_context(
        redis.from_url(self.config.core.redis_url)  # type: ignore
    )
    http_client = await self._stack.enter_async_context(
        httpx.AsyncClient(http2=True)
    )
    self._state = BackendState(
        redis_client=redis_client,
        http_client=http_client,
        config=self.config,
    )

    loaded_plugins = load_enabled_plugins(self.config)
    app = create_app(self._state, loaded_plugins)

    server_config = uvicorn.Config(
        app,
        host=self.config.server.host,
        port=self.config.server.port,
        log_config=get_log_config_dict(self.config),
    )
    self._server = uvicorn.Server(server_config)

    self._service_tasks.append(
        asyncio.create_task(run_channel_service(self.config))
    )
    for plugin in loaded_plugins:
        for _, service_func in sorted(
            plugin.services, key=lambda s: (s[0], s[1].__name__)
        ):
            self._service_tasks.append(
                asyncio.create_task(service_func(self._state))
            )
            logger.info(f"started service from plugin: {plugin.dist_name}")

    self._server_task = asyncio.create_task(self._server.serve())

    while not self._server.started:
        if self._server_task.done():
            await self._server_task
        await asyncio.sleep(0.1)

    return self

wait async

wait() -> None

Waits for the server task to complete (e.g. via signal or internal error).

Source code in packages/tangram_core/src/tangram_core/backend.py
490
491
492
493
494
495
496
async def wait(self) -> None:
    """Waits for the server task to complete (e.g. via signal or internal error)."""
    if self._server_task:
        try:
            await self._server_task
        except asyncio.CancelledError:
            pass

stop async

stop() -> None

Stops the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
async def stop(self) -> None:
    """Stops the backend runtime."""
    if self._server and self._server.started:
        self._server.should_exit = True
        if self._server_task:
            try:
                await self._server_task
            except asyncio.CancelledError:
                pass

    for task in self._service_tasks:
        task.cancel()
    if self._service_tasks:
        await asyncio.gather(*self._service_tasks, return_exceptions=True)
    self._service_tasks.clear()

    await self._stack.aclose()
    self._state = None
    self._server = None
    self._server_task = None

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@dataclass
class Config:
    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    """Mapping of plugin name to plugin-specific config."""
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: StrOrPathLike) -> Config:
        if sys.version_info < (3, 11):
            import tomli as tomllib
        else:
            import tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as f:
            cfg_data = tomllib.load(f)

        base_dir = Path(config_path).parent
        map = cfg_data.setdefault("map", {})
        if (s := map.get("style", None)) is not None:
            map["style"] = try_resolve_local_style(base_dir, s, allow_style_name=True)

        map["styles"] = [
            try_resolve_local_style(base_dir, style, allow_style_name=False)
            for style in map.get("styles", []) or default_styles()
        ]

        config_adapter = TypeAdapter(cls)
        config = config_adapter.validate_python(cfg_data)
        return config

core class-attribute instance-attribute

core: CoreConfig = field(default_factory=CoreConfig)

server class-attribute instance-attribute

server: ServerConfig = field(default_factory=ServerConfig)

channel class-attribute instance-attribute

channel: ChannelConfig = field(
    default_factory=ChannelConfig
)

map class-attribute instance-attribute

map: MapConfig = field(default_factory=MapConfig)

plugins class-attribute instance-attribute

plugins: dict[str, Any] = field(default_factory=dict)

Mapping of plugin name to plugin-specific config.

cache class-attribute instance-attribute

cache: CacheConfig = field(default_factory=CacheConfig)

from_file classmethod

from_file(config_path: StrOrPathLike) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@classmethod
def from_file(cls, config_path: StrOrPathLike) -> Config:
    if sys.version_info < (3, 11):
        import tomli as tomllib
    else:
        import tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as f:
        cfg_data = tomllib.load(f)

    base_dir = Path(config_path).parent
    map = cfg_data.setdefault("map", {})
    if (s := map.get("style", None)) is not None:
        map["style"] = try_resolve_local_style(base_dir, s, allow_style_name=True)

    map["styles"] = [
        try_resolve_local_style(base_dir, style, allow_style_name=False)
        for style in map.get("styles", []) or default_styles()
    ]

    config_adapter = TypeAdapter(cls)
    config = config_adapter.validate_python(cfg_data)
    return config

Plugin dataclass

Stores the metadata and registered API routes, background services and frontend assets for a tangram plugin.

Packages should declare an entry point in the tangram_core.plugins group in their pyproject.toml pointing to an instance of this class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@dataclass
class Plugin:
    """Stores the metadata and registered API routes, background services and
    frontend assets for a tangram plugin.

    Packages should declare an entry point in the `tangram_core.plugins` group
    in their `pyproject.toml` pointing to an instance of this class.
    """

    frontend_path: str | None = None
    """Path to the compiled frontend assets, *relative* to the distribution root
    (editable) or package root (wheel).
    """
    routers: list[APIRouter] = field(default_factory=list)
    config_class: type | None = None
    """The **backend** configuration class (dataclass, TypedDict or Pydantic model)
    for the plugin."""
    frontend_config_class: type | None = None
    """The **frontend** configuration class for the plugin. If set, it will be used to
    generate the frontend schema and validate settings updates.
    Fields should be annotated with [tangram_core.config.FrontendMutable][] if they
    are allowed to be modified from the frontend settings UI.
    """
    into_frontend_config_function: IntoFrontendConfigFunction | None = None
    """Function to transform the backend configuration into the frontend
    configuration. It receives the validated backend configuration object and
    should return an instance of `frontend_config_class`.
    Useful if the user wants to hide sensitive fields (e.g. API keys) from the frontend
    or dynamically compute certain fields.
    Required if `frontend_config_class` is set.
    """
    lifespan: Lifespan | None = None
    """Async context manager for plugin initialization and teardown."""
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )
    get_typer: Callable[[], Typer] | None = None
    """A function that returns the subcommands which will later be registered."""
    dist_name: str = field(init=False)
    """Name of the distribution (package) that provided this plugin, populated
    automatically during loading.
    """  # we do this so plugins can know their own package name if needed

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        """Decorator to register a background service function.

        Services are long-running async functions that receive the BackendState
        and are started when the application launches.
        """

        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if asyncio.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator

    # HACK: so lru_cache on adapter works.
    # since we have mutable fields (routers, register_service, dist_name on init)
    # its difficult to make this class frozen,
    # so maybe we should implement __eq__ and __hash__ ourselves?
    __hash__ = object.__hash__

    @functools.lru_cache
    def adapter(self) -> TypeAdapter | None:
        """Returns a cached Pydantic TypeAdapter for the plugin's configuration class.

        Avoids expensive rebuilds on every validation request, such as those from the
        settings UI.
        """
        from pydantic import TypeAdapter

        if self.config_class:
            return TypeAdapter(self.config_class)
        return None

    @functools.lru_cache
    def frontend_adapter(self) -> TypeAdapter | None:
        """Returns a cached Pydantic TypeAdapter for the plugin's frontend
        configuration class."""
        from pydantic import TypeAdapter

        if self.frontend_config_class:
            return TypeAdapter(self.frontend_config_class)
        return None

frontend_path class-attribute instance-attribute

frontend_path: str | None = None

Path to the compiled frontend assets, relative to the distribution root (editable) or package root (wheel).

routers class-attribute instance-attribute

routers: list[APIRouter] = field(default_factory=list)

config_class class-attribute instance-attribute

config_class: type | None = None

The backend configuration class (dataclass, TypedDict or Pydantic model) for the plugin.

frontend_config_class class-attribute instance-attribute

frontend_config_class: type | None = None

The frontend configuration class for the plugin. If set, it will be used to generate the frontend schema and validate settings updates. Fields should be annotated with tangram_core.config.FrontendMutable if they are allowed to be modified from the frontend settings UI.

into_frontend_config_function class-attribute instance-attribute

into_frontend_config_function: (
    IntoFrontendConfigFunction | None
) = None

Function to transform the backend configuration into the frontend configuration. It receives the validated backend configuration object and should return an instance of frontend_config_class. Useful if the user wants to hide sensitive fields (e.g. API keys) from the frontend or dynamically compute certain fields. Required if frontend_config_class is set.

lifespan class-attribute instance-attribute

lifespan: Lifespan | None = None

Async context manager for plugin initialization and teardown.

services class-attribute instance-attribute

services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)

get_typer class-attribute instance-attribute

get_typer: Callable[[], Typer] | None = None

A function that returns the subcommands which will later be registered.

dist_name class-attribute instance-attribute

dist_name: str = field(init=False)

Name of the distribution (package) that provided this plugin, populated automatically during loading.

register_service

register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]

Decorator to register a background service function.

Services are long-running async functions that receive the BackendState and are started when the application launches.

Source code in packages/tangram_core/src/tangram_core/plugin.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    """Decorator to register a background service function.

    Services are long-running async functions that receive the BackendState
    and are started when the application launches.
    """

    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if asyncio.iscoroutinefunction(func):
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

adapter cached

adapter() -> TypeAdapter | None

Returns a cached Pydantic TypeAdapter for the plugin's configuration class.

Avoids expensive rebuilds on every validation request, such as those from the settings UI.

Source code in packages/tangram_core/src/tangram_core/plugin.py
103
104
105
106
107
108
109
110
111
112
113
114
@functools.lru_cache
def adapter(self) -> TypeAdapter | None:
    """Returns a cached Pydantic TypeAdapter for the plugin's configuration class.

    Avoids expensive rebuilds on every validation request, such as those from the
    settings UI.
    """
    from pydantic import TypeAdapter

    if self.config_class:
        return TypeAdapter(self.config_class)
    return None

frontend_adapter cached

frontend_adapter() -> TypeAdapter | None

Returns a cached Pydantic TypeAdapter for the plugin's frontend configuration class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
116
117
118
119
120
121
122
123
124
@functools.lru_cache
def frontend_adapter(self) -> TypeAdapter | None:
    """Returns a cached Pydantic TypeAdapter for the plugin's frontend
    configuration class."""
    from pydantic import TypeAdapter

    if self.frontend_config_class:
        return TypeAdapter(self.frontend_config_class)
    return None

backend

logger module-attribute

logger = getLogger(__name__)

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

CACHE_PARAM_PATTERN module-attribute

CACHE_PARAM_PATTERN = compile('\\{(\\w+)\\}')

DEFAULT_THEMES module-attribute

DEFAULT_THEMES = (
    ThemeDefinition(
        name="light",
        background="#ffffff",
        foreground="#000000",
        surface="#f8f9fa",
        border="#e7e7e7",
        hover="#e9ecef",
        accent1="oklch(0.5616 0.0895 251.64)",
        accent1_foreground="#ffffff",
        accent2="oklch(0.8021 0.11 92.43)",
        accent2_foreground="#000000",
        muted="#666666",
        error="#8e1b27",
    ),
    ThemeDefinition(
        name="dark",
        background="#1a1a1a",
        foreground="#e0e0e0",
        surface="#2d2d2d",
        border="#404040",
        hover="#343434",
        accent1="oklch(0.5059 0.0895 251.64)",
        accent1_foreground="#ffffff",
        accent2="oklch(0.5059 0.0895 93.53)",
        accent2_foreground="#ffffff",
        muted="#999999",
        error="#844d53",
    ),
)

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@dataclass
class BackendState:
    redis_client: redis.Redis
    http_client: httpx.AsyncClient
    config: Config
    loaded_plugins: dict[str, Plugin] = field(default_factory=dict)

    @property
    def base_url(self) -> str:
        host = self.config.server.host
        port = self.config.server.port
        if host == "0.0.0.0":
            host = "127.0.0.1"
        return f"http://{host}:{port}"
redis_client instance-attribute
redis_client: Redis
http_client instance-attribute
http_client: AsyncClient
config instance-attribute
config: Config
loaded_plugins class-attribute instance-attribute
loaded_plugins: dict[str, Plugin] = field(
    default_factory=dict
)
base_url property
base_url: str

Runtime

Manages the lifecycle of the Tangram backend, including the Uvicorn server, background services, and connection pools (Redis, HTTPX).

Source code in packages/tangram_core/src/tangram_core/backend.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
class Runtime:
    """Manages the lifecycle of the Tangram backend, including the
    Uvicorn server, background services, and connection pools (Redis, HTTPX).
    """

    def __init__(self, config: IntoConfig | None = None) -> None:
        if isinstance(config, (str, Path, os.PathLike)):
            self.config = Config.from_file(config)
        else:
            self.config = config or Config()
        self._stack = AsyncExitStack()
        self._state: BackendState | None = None
        self._server: uvicorn.Server | None = None
        self._server_task: asyncio.Task[None] | None = None
        self._service_tasks: list[asyncio.Task[None]] = []

    @property
    def state(self) -> BackendState:
        if self._state is None:
            raise RuntimeError("runtime is not started, call start() first")
        return self._state

    async def start(self) -> Runtime:
        """Starts the backend runtime."""
        if self._state is not None:
            raise RuntimeError("runtime is already started")

        redis_client = await self._stack.enter_async_context(
            redis.from_url(self.config.core.redis_url)  # type: ignore
        )
        http_client = await self._stack.enter_async_context(
            httpx.AsyncClient(http2=True)
        )
        self._state = BackendState(
            redis_client=redis_client,
            http_client=http_client,
            config=self.config,
        )

        loaded_plugins = load_enabled_plugins(self.config)
        app = create_app(self._state, loaded_plugins)

        server_config = uvicorn.Config(
            app,
            host=self.config.server.host,
            port=self.config.server.port,
            log_config=get_log_config_dict(self.config),
        )
        self._server = uvicorn.Server(server_config)

        self._service_tasks.append(
            asyncio.create_task(run_channel_service(self.config))
        )
        for plugin in loaded_plugins:
            for _, service_func in sorted(
                plugin.services, key=lambda s: (s[0], s[1].__name__)
            ):
                self._service_tasks.append(
                    asyncio.create_task(service_func(self._state))
                )
                logger.info(f"started service from plugin: {plugin.dist_name}")

        self._server_task = asyncio.create_task(self._server.serve())

        while not self._server.started:
            if self._server_task.done():
                await self._server_task
            await asyncio.sleep(0.1)

        return self

    async def wait(self) -> None:
        """Waits for the server task to complete (e.g. via signal or internal error)."""
        if self._server_task:
            try:
                await self._server_task
            except asyncio.CancelledError:
                pass

    async def stop(self) -> None:
        """Stops the backend runtime."""
        if self._server and self._server.started:
            self._server.should_exit = True
            if self._server_task:
                try:
                    await self._server_task
                except asyncio.CancelledError:
                    pass

        for task in self._service_tasks:
            task.cancel()
        if self._service_tasks:
            await asyncio.gather(*self._service_tasks, return_exceptions=True)
        self._service_tasks.clear()

        await self._stack.aclose()
        self._state = None
        self._server = None
        self._server_task = None

    async def __aenter__(self) -> Runtime:
        return await self.start()

    async def __aexit__(self, *args: Any) -> None:
        await self.stop()
config instance-attribute
config = from_file(config)
state property
state: BackendState
start async
start() -> Runtime

Starts the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
async def start(self) -> Runtime:
    """Starts the backend runtime."""
    if self._state is not None:
        raise RuntimeError("runtime is already started")

    redis_client = await self._stack.enter_async_context(
        redis.from_url(self.config.core.redis_url)  # type: ignore
    )
    http_client = await self._stack.enter_async_context(
        httpx.AsyncClient(http2=True)
    )
    self._state = BackendState(
        redis_client=redis_client,
        http_client=http_client,
        config=self.config,
    )

    loaded_plugins = load_enabled_plugins(self.config)
    app = create_app(self._state, loaded_plugins)

    server_config = uvicorn.Config(
        app,
        host=self.config.server.host,
        port=self.config.server.port,
        log_config=get_log_config_dict(self.config),
    )
    self._server = uvicorn.Server(server_config)

    self._service_tasks.append(
        asyncio.create_task(run_channel_service(self.config))
    )
    for plugin in loaded_plugins:
        for _, service_func in sorted(
            plugin.services, key=lambda s: (s[0], s[1].__name__)
        ):
            self._service_tasks.append(
                asyncio.create_task(service_func(self._state))
            )
            logger.info(f"started service from plugin: {plugin.dist_name}")

    self._server_task = asyncio.create_task(self._server.serve())

    while not self._server.started:
        if self._server_task.done():
            await self._server_task
        await asyncio.sleep(0.1)

    return self
wait async
wait() -> None

Waits for the server task to complete (e.g. via signal or internal error).

Source code in packages/tangram_core/src/tangram_core/backend.py
490
491
492
493
494
495
496
async def wait(self) -> None:
    """Waits for the server task to complete (e.g. via signal or internal error)."""
    if self._server_task:
        try:
            await self._server_task
        except asyncio.CancelledError:
            pass
stop async
stop() -> None

Stops the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
async def stop(self) -> None:
    """Stops the backend runtime."""
    if self._server and self._server.started:
        self._server.should_exit = True
        if self._server_task:
            try:
                await self._server_task
            except asyncio.CancelledError:
                pass

    for task in self._service_tasks:
        task.cancel()
    if self._service_tasks:
        await asyncio.gather(*self._service_tasks, return_exceptions=True)
    self._service_tasks.clear()

    await self._stack.aclose()
    self._state = None
    self._server = None
    self._server_task = None

get_state async

get_state(request: Request) -> BackendState
Source code in packages/tangram_core/src/tangram_core/backend.py
73
74
async def get_state(request: Request) -> BackendState:
    return request.app.state.backend_state  # type: ignore

get_distribution_path

get_distribution_path(dist_name: str) -> Path

Get the local path of a distribution, handling both editable installs (direct_url.json) and standard wheel installs.

See: https://packaging.python.org/en/latest/specifications/direct-url-data-structure/

Source code in packages/tangram_core/src/tangram_core/backend.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def get_distribution_path(dist_name: str) -> Path:
    """Get the local path of a distribution, handling both editable installs
    (`direct_url.json`) and standard wheel installs.

    See: https://packaging.python.org/en/latest/specifications/direct-url-data-structure/
    """
    # always try direct_url.json first (e.g. for the case of `uv sync --all-packages`)
    try:
        dist = Distribution.from_name(dist_name)
        if direct_url_content := dist.read_text("direct_url.json"):
            direct_url_data = json.loads(direct_url_content)
            if (
                (url := direct_url_data.get("url"))
                # url may point to a git or zip archive, but since we only care
                # about local paths, we only handle the file:// scheme here
                and url.startswith("file://")
            ):
                parsed = urllib.parse.urlparse(url)
                if os.name == "nt":
                    path_str = urllib.request.url2pathname(parsed.path)
                    if parsed.netloc and parsed.netloc not in ("", "localhost"):
                        path_str = f"//{parsed.netloc}{path_str}"
                    path1 = Path(path_str)
                else:
                    path1 = Path(urllib.parse.unquote(parsed.path))
                if path1.is_dir():
                    return path1
    except (PackageNotFoundError, json.JSONDecodeError, FileNotFoundError):
        pass

    # fallback in case it was installed via a wheel
    if (trav := importlib.resources.files(dist_name)).is_dir():
        with importlib.resources.as_file(trav) as path2:
            return path2
    raise FileNotFoundError(f"could not find distribution path for {dist_name}")

resolve_frontend

resolve_frontend(plugin: Plugin) -> Path | None
Source code in packages/tangram_core/src/tangram_core/backend.py
117
118
119
120
def resolve_frontend(plugin: Plugin) -> Path | None:
    if not plugin.frontend_path:
        return None
    return get_distribution_path(plugin.dist_name) / plugin.frontend_path

load_enabled_plugins

load_enabled_plugins(config: Config) -> list[Plugin]
Source code in packages/tangram_core/src/tangram_core/backend.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def load_enabled_plugins(
    config: Config,
) -> list[Plugin]:
    loaded_plugins = []
    enabled_plugin_names = set(config.core.plugins)

    for entry_point in scan_plugins():
        # TODO: should we check entry_point.dist.name instead?
        if entry_point.name not in enabled_plugin_names:
            continue
        if (plugin := load_plugin(entry_point)) is not None:
            loaded_plugins.append(plugin)

    return loaded_plugins

lifespan async

lifespan(
    app: FastAPI,
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> AsyncGenerator[None, None]
Source code in packages/tangram_core/src/tangram_core/backend.py
139
140
141
142
143
144
145
146
147
148
149
150
@asynccontextmanager
async def lifespan(
    app: FastAPI, backend_state: BackendState, loaded_plugins: Iterable[Plugin]
) -> AsyncGenerator[None, None]:
    async with AsyncExitStack() as stack:
        for plugin in loaded_plugins:
            if plugin.lifespan:
                logger.info(f"initializing lifespan for {plugin.dist_name}")
                await stack.enter_async_context(plugin.lifespan(backend_state))

        app.state.backend_state = backend_state
        yield

default_cache_dir

default_cache_dir() -> Path
Source code in packages/tangram_core/src/tangram_core/backend.py
153
154
155
156
157
158
159
160
161
def default_cache_dir() -> Path:
    if (xdg_cache := os.environ.get("XDG_CACHE_HOME")) is not None:
        cache_dir = Path(xdg_cache) / "tangram"
    else:
        cache_dir = Path(platformdirs.user_cache_dir(appname="tangram"))
    if not cache_dir.exists():
        cache_dir.mkdir(parents=True, exist_ok=True)

    return cache_dir

make_cache_route_handler

make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]

Factory function that creates a route handler for caching and serving files. Dynamically handles URL parameters found in both serve_route and origin.

Parameters:

Name Type Description Default
entry CacheEntry

Cache entry configuration

required
state BackendState

Backend state with http_client for fetching remote resources

required

Returns:

Type Description
Callable[..., Awaitable[FileResponse]]

Async function that handles the route with dynamic parameters

Source code in packages/tangram_core/src/tangram_core/backend.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]:
    """
    Factory function that creates a route handler for caching and serving files.
    Dynamically handles URL parameters found in both serve_route and origin.

    :param entry: Cache entry configuration
    :param state: Backend state with http_client for fetching remote resources
    :returns: Async function that handles the route with dynamic parameters
    """
    from inspect import Parameter, Signature

    # Extract parameter names from the serve_route (e.g., {fontstack}, {range})
    params = CACHE_PARAM_PATTERN.findall(entry.serve_route)

    async def cache_route_handler(**kwargs: str) -> FileResponse:
        if (local_path := entry.local_path) is None:
            local_path = default_cache_dir()
        else:
            local_path = local_path.expanduser()

        # Build the local file path by replacing parameters
        local_file = local_path
        for param in params:
            if param in kwargs:
                local_file = local_file / kwargs[param]

        logger.info(f"Serving cached file from {local_file}")

        if not local_file.exists():
            assert entry.origin is not None
            # Build the remote URL by replacing parameters
            remote_url = entry.origin
            for param, value in kwargs.items():
                remote_url = remote_url.replace(f"{{{param}}}", value)

            logger.info(f"Downloading from {remote_url} to {local_file}")
            c = await state.http_client.get(remote_url)
            c.raise_for_status()
            local_file.parent.mkdir(parents=True, exist_ok=True)
            local_file.write_bytes(c.content)

        return FileResponse(path=local_file, media_type=entry.media_type)

    # Create explicit parameters for the function signature
    sig_params = [
        Parameter(
            name=param,
            kind=Parameter.POSITIONAL_OR_KEYWORD,
            annotation=str,
        )
        for param in params
    ]
    cache_route_handler.__signature__ = Signature(  # type: ignore
        parameters=sig_params,
        return_annotation=FileResponse,
    )

    return cache_route_handler

core_into_frontend_config

core_into_frontend_config(config: Config) -> FrontendConfig
Source code in packages/tangram_core/src/tangram_core/backend.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def core_into_frontend_config(config: Config) -> FrontendConfig:
    if config.channel.public_url:
        channel_url = config.channel.public_url
    else:
        host = "localhost" if config.channel.host == "0.0.0.0" else config.channel.host
        channel_url = f"http://{host}:{config.channel.port}"
    frontend_channel = FrontendChannelConfig(url=channel_url)

    user_theme_names = {t.name for t in config.core.themes}
    merged_themes = [
        t for t in DEFAULT_THEMES if t.name not in user_theme_names
    ] + config.core.themes

    frontend_core = FrontendCoreConfig(theme=config.core.theme, themes=merged_themes)

    return FrontendConfig(core=frontend_core, map=config.map, channel=frontend_channel)

create_app

create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> FastAPI
Source code in packages/tangram_core/src/tangram_core/backend.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
def create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> FastAPI:
    app = FastAPI(
        lifespan=partial(
            lifespan, backend_state=backend_state, loaded_plugins=loaded_plugins
        ),
        default_response_class=ORJSONResponse,
    )

    frontend_config_instance = core_into_frontend_config(backend_state.config)
    frontend_adapter = TypeAdapter(FrontendConfig)
    manifest_core = to_frontend_manifest(frontend_adapter, frontend_config_instance)

    manifest_plugins = {}
    for plugin in loaded_plugins:
        backend_state.loaded_plugins[plugin.dist_name] = plugin
        for router in plugin.routers:
            app.include_router(router)

        if (frontend_path_resolved := resolve_frontend(plugin)) is None:
            continue
        app.mount(
            f"/plugins/{plugin.dist_name}",
            StaticFiles(directory=str(frontend_path_resolved)),
            name=plugin.dist_name,
        )
        if not (plugin_json_path := frontend_path_resolved / "plugin.json").exists():
            continue
        with plugin_json_path.open("rb") as f:
            plugin_meta = json.load(f)

        manifest_plugins[plugin.dist_name] = plugin_meta

        # NOTE: on the js side the `config` and `config_json_schema` keys
        # may be missing if the plugin does not configure them properly
        conf_dict = backend_state.config.plugins.get(plugin.dist_name, {})
        if (backend_adapter := plugin.adapter()) is None:
            continue
        conf_instance = backend_adapter.validate_python(conf_dict)
        if not plugin.frontend_config_class:
            continue
        if plugin.into_frontend_config_function is None:
            logger.warning(
                f"expected `into_frontend_config_function` for"
                f" plugin {plugin.dist_name} but it is None. "
                "Frontend config will be empty!"
            )
            continue
        frontend_instance = plugin.into_frontend_config_function(conf_instance)
        frontend_adapter_ = plugin.frontend_adapter()
        assert frontend_adapter_ is not None
        frontend_manifest = to_frontend_manifest(frontend_adapter_, frontend_instance)
        manifest_plugins[plugin.dist_name] = {
            **plugin_meta,
            **frontend_manifest,
        }

    @app.post("/settings/validate/{plugin_name}")
    async def validate_settings(
        plugin_name: str,
        data: dict,
        state: Annotated[BackendState, Depends(get_state)],
    ) -> dict:
        SUCCESS = {"success": True, "errors": {}}
        try:
            if plugin_name == "tangram_core":
                _ = parse_frontend_config(frontend_adapter, data)
                return SUCCESS
            plugin = state.loaded_plugins.get(plugin_name)
            assert plugin is not None, f"plugin {plugin_name} not found"

            if plugin.frontend_config_class:
                if (frontend_adapter_ := plugin.frontend_adapter()) is not None:
                    frontend_adapter_.validate_python(data)
                return SUCCESS

            # if no frontend config defined, validation always succeeds because
            # nothing to validate against
            return SUCCESS
        except ValidationError as e:
            errs = {
                ".".join(str(loc) for loc in err["loc"]): err["msg"]
                for err in e.errors()
            }
            return {"success": False, "errors": errs}

    @app.get("/manifest.json")
    async def get_manifest() -> ORJSONResponse:
        return ORJSONResponse(
            content={"core": manifest_core, "plugins": manifest_plugins}
        )

    # Cache mechanism - MUST be registered BEFORE the catch-all frontend mount
    for cache_entry in backend_state.config.cache.entries:
        logger.info(
            f"caching {cache_entry.origin} to {cache_entry.local_path} "
            f"and serving at {cache_entry.serve_route}"
        )
        route_handler = make_cache_route_handler(cache_entry, backend_state)

        logger.info(
            f"Registering route: GET {cache_entry.serve_route} with dynamic params"
        )
        app.add_api_route(
            cache_entry.serve_route,
            route_handler,
            methods=["GET"],
            name=f"cache-{cache_entry.serve_route.replace('/', '_')}",
        )

    if not (
        frontend_path := get_distribution_path("tangram_core") / "dist-frontend"
    ).is_dir():
        raise ValueError(
            f"error: frontend {frontend_path} was not found, "
            "did you run `pnpm i && pnpm run build`?"
        )
    app.mount("/", StaticFiles(directory=str(frontend_path), html=True), name="core")

    return app

run_channel_service async

run_channel_service(config: Config) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
async def run_channel_service(config: Config) -> None:
    from . import _core

    _core.init_tracing_stderr(config.core.log_level)

    rust_config = _core.ChannelConfig(
        host=config.channel.host,
        port=config.channel.port,
        redis_url=config.core.redis_url,
        jwt_secret=config.channel.jwt_secret,
        jwt_expiration_secs=config.channel.jwt_expiration_secs,
        id_length=config.channel.id_length,
    )
    await _core.run(rust_config)

get_log_config_dict

get_log_config_dict(config: Config) -> dict[str, Any]
Source code in packages/tangram_core/src/tangram_core/backend.py
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
def get_log_config_dict(config: Config) -> dict[str, Any]:
    def format_time(dt: datetime) -> str:
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ ")

    return {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            "default": {
                "class": "rich.logging.RichHandler",
                "log_time_format": format_time,
                "omit_repeated_times": False,
            },
        },
        "root": {"handlers": ["default"], "level": config.core.log_level.upper()},
    }

config

StyleName module-attribute

StyleName: TypeAlias = str

Url module-attribute

Url: TypeAlias = str

StrOrPathLike module-attribute

StrOrPathLike = str | PathLike[str]

IntoConfig module-attribute

IntoConfig: TypeAlias = 'Config | StrOrPathLike'

HasTopbarUiConfig

Bases: Protocol

Source code in packages/tangram_core/src/tangram_core/config.py
45
46
47
@runtime_checkable
class HasTopbarUiConfig(Protocol):
    topbar_order: int
topbar_order instance-attribute
topbar_order: int

HasSidebarUiConfig

Bases: Protocol

Source code in packages/tangram_core/src/tangram_core/config.py
50
51
52
@runtime_checkable
class HasSidebarUiConfig(Protocol):
    sidebar_order: int
sidebar_order instance-attribute
sidebar_order: int

ServerConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
55
56
57
58
@dataclass
class ServerConfig:
    host: str = "127.0.0.1"
    port: int = 2346
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2346

ChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
61
62
63
64
65
66
67
68
69
70
@dataclass
class ChannelConfig:
    # TODO: we should make it clear that host:port is for the *backend* to
    # listen on, and not to be confused with the frontend.
    host: str = "127.0.0.1"
    port: int = 2347
    public_url: str | None = None
    jwt_secret: str = "secret"
    jwt_expiration_secs: int = 315360000  # 10 years
    id_length: int = 8
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2347
public_url class-attribute instance-attribute
public_url: str | None = None
jwt_secret class-attribute instance-attribute
jwt_secret: str = 'secret'
jwt_expiration_secs class-attribute instance-attribute
jwt_expiration_secs: int = 315360000
id_length class-attribute instance-attribute
id_length: int = 8

UrlConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
73
74
75
76
@dataclass
class UrlConfig:
    url: str
    type: str = "vector"
url instance-attribute
url: str
type class-attribute instance-attribute
type: str = 'vector'

StyleSpecification dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
82
83
84
85
86
87
88
@dataclass
class StyleSpecification:
    name: StyleName | None = None
    sources: dict[str, UrlConfig] | None = None
    glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
    layers: list[Any] | None = None
    version: int = 8
name class-attribute instance-attribute
name: StyleName | None = None
sources class-attribute instance-attribute
sources: dict[str, UrlConfig] | None = None
glyphs class-attribute instance-attribute
glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
layers class-attribute instance-attribute
layers: list[Any] | None = None
version class-attribute instance-attribute
version: int = 8

MapConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@dataclass
class MapConfig:
    # users can specify local path in config file but it will be resolved in from_file
    # and so the stored type cannot be Path
    style: Annotated[
        Url | StyleName | StyleSpecification, FrontendMutable(widget="map-settings")
    ] = "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
    styles: Annotated[
        list[Url | StyleSpecification], FrontendMutable(widget="map-settings")
    ] = field(default_factory=default_styles)
    center_lat: Annotated[float, Ge(-90), Le(90), FrontendMutable()] = 48.0
    center_lon: Annotated[float, Ge(-180), Le(180), FrontendMutable()] = 7.0
    zoom: Annotated[float, Ge(0), Le(24), FrontendMutable()] = 4
    pitch: Annotated[float, Ge(0), Le(85), FrontendMutable()] = 0
    bearing: Annotated[float, Ge(-180), Le(180), FrontendMutable()] = 0
    lang: Annotated[str, FrontendMutable()] = "en"
    min_zoom: Annotated[float, Ge(0), Le(24), FrontendMutable()] = 0
    max_zoom: Annotated[float, Ge(0), Le(24), FrontendMutable()] = 24
    max_pitch: Annotated[float, Ge(0), Le(85), FrontendMutable()] = 70
    allow_pitch: Annotated[bool, FrontendMutable()] = True
    allow_bearing: Annotated[bool, FrontendMutable()] = True
    layers_visibility: Annotated[
        dict[str, bool] | None, FrontendMutable(widget="map-settings")
    ] = None
style class-attribute instance-attribute
style: Annotated[
    Url | StyleName | StyleSpecification,
    FrontendMutable(widget="map-settings"),
] = "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
styles class-attribute instance-attribute
styles: Annotated[
    list[Url | StyleSpecification],
    FrontendMutable(widget="map-settings"),
] = field(default_factory=default_styles)
center_lat class-attribute instance-attribute
center_lat: Annotated[
    float, Ge(-90), Le(90), FrontendMutable()
] = 48.0
center_lon class-attribute instance-attribute
center_lon: Annotated[
    float, Ge(-180), Le(180), FrontendMutable()
] = 7.0
zoom class-attribute instance-attribute
zoom: Annotated[float, Ge(0), Le(24), FrontendMutable()] = 4
pitch class-attribute instance-attribute
pitch: Annotated[
    float, Ge(0), Le(85), FrontendMutable()
] = 0
bearing class-attribute instance-attribute
bearing: Annotated[
    float, Ge(-180), Le(180), FrontendMutable()
] = 0
lang class-attribute instance-attribute
lang: Annotated[str, FrontendMutable()] = 'en'
min_zoom class-attribute instance-attribute
min_zoom: Annotated[
    float, Ge(0), Le(24), FrontendMutable()
] = 0
max_zoom class-attribute instance-attribute
max_zoom: Annotated[
    float, Ge(0), Le(24), FrontendMutable()
] = 24
max_pitch class-attribute instance-attribute
max_pitch: Annotated[
    float, Ge(0), Le(85), FrontendMutable()
] = 70
allow_pitch class-attribute instance-attribute
allow_pitch: Annotated[bool, FrontendMutable()] = True
allow_bearing class-attribute instance-attribute
allow_bearing: Annotated[bool, FrontendMutable()] = True
layers_visibility class-attribute instance-attribute
layers_visibility: Annotated[
    dict[str, bool] | None,
    FrontendMutable(widget="map-settings"),
] = None

ThemeDefinition dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@dataclass
class ThemeDefinition:
    name: str
    background: Annotated[str, FrontendMutable(kind="color")]
    foreground: Annotated[str, FrontendMutable(kind="color")]
    surface: Annotated[str, FrontendMutable(kind="color")]
    border: Annotated[str, FrontendMutable(kind="color")]
    hover: Annotated[str, FrontendMutable(kind="color")]
    accent1: Annotated[str, FrontendMutable(kind="color")]
    accent1_foreground: Annotated[str, FrontendMutable(kind="color")]
    accent2: Annotated[str, FrontendMutable(kind="color")]
    accent2_foreground: Annotated[str, FrontendMutable(kind="color")]
    muted: Annotated[str, FrontendMutable(kind="color")]
    error: Annotated[str, FrontendMutable(kind="color")]
name instance-attribute
name: str
background instance-attribute
background: Annotated[str, FrontendMutable(kind='color')]
foreground instance-attribute
foreground: Annotated[str, FrontendMutable(kind='color')]
surface instance-attribute
surface: Annotated[str, FrontendMutable(kind='color')]
border instance-attribute
border: Annotated[str, FrontendMutable(kind='color')]
hover instance-attribute
hover: Annotated[str, FrontendMutable(kind='color')]
accent1 instance-attribute
accent1: Annotated[str, FrontendMutable(kind='color')]
accent1_foreground instance-attribute
accent1_foreground: Annotated[
    str, FrontendMutable(kind="color")
]
accent2 instance-attribute
accent2: Annotated[str, FrontendMutable(kind='color')]
accent2_foreground instance-attribute
accent2_foreground: Annotated[
    str, FrontendMutable(kind="color")
]
muted instance-attribute
muted: Annotated[str, FrontendMutable(kind='color')]
error instance-attribute
error: Annotated[str, FrontendMutable(kind='color')]

AdaptiveTheme dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
144
145
146
147
@dataclass
class AdaptiveTheme:
    light: Annotated[str, FrontendMutable()] = "light"
    dark: Annotated[str, FrontendMutable()] = "dark"
light class-attribute instance-attribute
light: Annotated[str, FrontendMutable()] = 'light'
dark class-attribute instance-attribute
dark: Annotated[str, FrontendMutable()] = 'dark'

CoreConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
150
151
152
153
154
155
156
157
158
159
160
@dataclass
class CoreConfig:
    redis_url: str = "redis://127.0.0.1:6379"
    plugins: list[str] = field(default_factory=list)
    log_level: str = "INFO"
    theme: Annotated[str | AdaptiveTheme, FrontendMutable(widget="theme-settings")] = (
        field(default_factory=AdaptiveTheme)
    )
    themes: Annotated[
        list[ThemeDefinition], FrontendMutable(widget="theme-settings")
    ] = field(default_factory=list)
redis_url class-attribute instance-attribute
redis_url: str = 'redis://127.0.0.1:6379'
plugins class-attribute instance-attribute
plugins: list[str] = field(default_factory=list)
log_level class-attribute instance-attribute
log_level: str = 'INFO'
theme class-attribute instance-attribute
theme: Annotated[
    str | AdaptiveTheme,
    FrontendMutable(widget="theme-settings"),
] = field(default_factory=AdaptiveTheme)
themes class-attribute instance-attribute
themes: Annotated[
    list[ThemeDefinition],
    FrontendMutable(widget="theme-settings"),
] = field(default_factory=list)

CacheEntry dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
163
164
165
166
167
168
169
170
171
172
@dataclass
class CacheEntry:
    origin: str | None = None
    """Origin URL. If None, the local file is served directly."""
    local_path: Path | None = None
    """Local path to cache the file."""
    serve_route: str = ""
    """Where to serve the file in FastAPI."""
    media_type: str = "application/octet-stream"
    """Media type for the response."""
origin class-attribute instance-attribute
origin: str | None = None

Origin URL. If None, the local file is served directly.

local_path class-attribute instance-attribute
local_path: Path | None = None

Local path to cache the file.

serve_route class-attribute instance-attribute
serve_route: str = ''

Where to serve the file in FastAPI.

media_type class-attribute instance-attribute
media_type: str = 'application/octet-stream'

Media type for the response.

CacheConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
175
176
177
@dataclass
class CacheConfig:
    entries: list[CacheEntry] = field(default_factory=list)
entries class-attribute instance-attribute
entries: list[CacheEntry] = field(default_factory=list)

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@dataclass
class Config:
    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    """Mapping of plugin name to plugin-specific config."""
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: StrOrPathLike) -> Config:
        if sys.version_info < (3, 11):
            import tomli as tomllib
        else:
            import tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as f:
            cfg_data = tomllib.load(f)

        base_dir = Path(config_path).parent
        map = cfg_data.setdefault("map", {})
        if (s := map.get("style", None)) is not None:
            map["style"] = try_resolve_local_style(base_dir, s, allow_style_name=True)

        map["styles"] = [
            try_resolve_local_style(base_dir, style, allow_style_name=False)
            for style in map.get("styles", []) or default_styles()
        ]

        config_adapter = TypeAdapter(cls)
        config = config_adapter.validate_python(cfg_data)
        return config
core class-attribute instance-attribute
core: CoreConfig = field(default_factory=CoreConfig)
server class-attribute instance-attribute
server: ServerConfig = field(default_factory=ServerConfig)
channel class-attribute instance-attribute
channel: ChannelConfig = field(
    default_factory=ChannelConfig
)
map class-attribute instance-attribute
map: MapConfig = field(default_factory=MapConfig)
plugins class-attribute instance-attribute
plugins: dict[str, Any] = field(default_factory=dict)

Mapping of plugin name to plugin-specific config.

cache class-attribute instance-attribute
cache: CacheConfig = field(default_factory=CacheConfig)
from_file classmethod
from_file(config_path: StrOrPathLike) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@classmethod
def from_file(cls, config_path: StrOrPathLike) -> Config:
    if sys.version_info < (3, 11):
        import tomli as tomllib
    else:
        import tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as f:
        cfg_data = tomllib.load(f)

    base_dir = Path(config_path).parent
    map = cfg_data.setdefault("map", {})
    if (s := map.get("style", None)) is not None:
        map["style"] = try_resolve_local_style(base_dir, s, allow_style_name=True)

    map["styles"] = [
        try_resolve_local_style(base_dir, style, allow_style_name=False)
        for style in map.get("styles", []) or default_styles()
    ]

    config_adapter = TypeAdapter(cls)
    config = config_adapter.validate_python(cfg_data)
    return config

FrontendCoreConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
247
248
249
250
251
252
253
254
@dataclass
class FrontendCoreConfig:
    theme: Annotated[str | AdaptiveTheme, FrontendMutable(widget="theme-settings")] = (
        field(default_factory=AdaptiveTheme)
    )
    themes: Annotated[
        list[ThemeDefinition], FrontendMutable(widget="theme-settings")
    ] = field(default_factory=list)
theme class-attribute instance-attribute
theme: Annotated[
    str | AdaptiveTheme,
    FrontendMutable(widget="theme-settings"),
] = field(default_factory=AdaptiveTheme)
themes class-attribute instance-attribute
themes: Annotated[
    list[ThemeDefinition],
    FrontendMutable(widget="theme-settings"),
] = field(default_factory=list)

FrontendChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
257
258
259
@dataclass
class FrontendChannelConfig:
    url: str
url instance-attribute
url: str

FrontendConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
262
263
264
265
266
@dataclass
class FrontendConfig:
    core: FrontendCoreConfig
    map: MapConfig
    channel: FrontendChannelConfig
core instance-attribute
map instance-attribute
map: MapConfig
channel instance-attribute

FrontendMutable dataclass

Marker to allow a particular field in a frontend configuration to be mutated in the settings page.

Source code in packages/tangram_core/src/tangram_core/config.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
@dataclass(frozen=True)
class FrontendMutable:
    """Marker to allow a particular field in a frontend configuration to be mutated
    in the settings page."""

    # TODO maybe in the future we can merge the two
    kind: Literal["color"] | None = None
    widget: str | None = None
    """Identifier for a custom widget to use for this field in the frontend UI.
    If multiple fields share the same widget id, they will be grouped together.
    """

    def __get_pydantic_json_schema__(
        self, core_schema: CoreSchema, handler: GetJsonSchemaHandler
    ) -> JsonSchemaValue:
        json_schema = handler(core_schema)
        json_schema["tangram_mutable"] = True
        if self.kind:
            json_schema["tangram_kind"] = self.kind
        if self.widget:
            json_schema["tangram_widget"] = self.widget
        return json_schema
kind class-attribute instance-attribute
kind: Literal['color'] | None = None
widget class-attribute instance-attribute
widget: str | None = None

Identifier for a custom widget to use for this field in the frontend UI. If multiple fields share the same widget id, they will be grouped together.

FrontendData

Bases: TypedDict

Source code in packages/tangram_core/src/tangram_core/config.py
298
299
300
class FrontendData(TypedDict):
    config: dict[str, Any]
    config_json_schema: dict[str, Any]
config instance-attribute
config: dict[str, Any]
config_json_schema instance-attribute
config_json_schema: dict[str, Any]

default_config_file

default_config_file() -> Path
Source code in packages/tangram_core/src/tangram_core/config.py
32
33
34
35
36
37
38
39
40
41
42
def default_config_file() -> Path:
    import platformdirs

    if (xdg_config := os.environ.get("XDG_CONFIG_HOME")) is not None:
        config_dir = Path(xdg_config) / "tangram"
    else:
        config_dir = Path(platformdirs.user_config_dir(appname="tangram"))
    if not config_dir.exists():
        config_dir.mkdir(parents=True, exist_ok=True)

    return Path(config_dir) / "tangram.toml"

default_styles

default_styles() -> list[Url | StyleSpecification]
Source code in packages/tangram_core/src/tangram_core/config.py
94
95
96
97
98
99
def default_styles() -> list[Url | StyleSpecification]:
    return [
        "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json",
        "https://basemaps.cartocdn.com/gl/positron-gl-style/style.json",
        "https://basemaps.cartocdn.com/gl/dark-matter-gl-style/style.json",
    ]

try_resolve_local_style

try_resolve_local_style(
    base_dir: Path,
    style: Url | StyleName | StyleSpecification,
    *,
    allow_style_name: bool,
) -> Url | StyleSpecification
Source code in packages/tangram_core/src/tangram_core/config.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def try_resolve_local_style(
    base_dir: Path,
    style: Url | StyleName | StyleSpecification,
    *,
    allow_style_name: bool,
) -> Url | StyleSpecification:
    if isinstance(style, str):
        scheme = urllib.parse.urlparse(style).scheme
        if scheme in ("http", "https"):
            return style
        if (p := (base_dir / style).resolve()).is_file():
            with open(p, "rb") as f:
                return json.load(f)
        if not allow_style_name:
            pass
    return style

to_frontend_manifest

to_frontend_manifest(
    adapter: TypeAdapter, frontend_cfg: Any
) -> FrontendData

Serialises a frontend configuration.

Source code in packages/tangram_core/src/tangram_core/config.py
303
304
305
306
307
def to_frontend_manifest(adapter: TypeAdapter, frontend_cfg: Any) -> FrontendData:
    """Serialises a frontend configuration."""
    config_dump = adapter.dump_python(frontend_cfg)
    config_json_schema = adapter.json_schema()
    return {"config": config_dump, "config_json_schema": config_json_schema}

parse_frontend_config

parse_frontend_config(
    adapter: TypeAdapter, frontend_cfg: Any
) -> Any

Deserialises and validates frontend-submitted config data.

Source code in packages/tangram_core/src/tangram_core/config.py
310
311
312
def parse_frontend_config(adapter: TypeAdapter, frontend_cfg: Any) -> Any:
    """Deserialises and validates frontend-submitted config data."""
    return adapter.validate_python(frontend_cfg)

plugin

ServiceAsyncFunc module-attribute

ServiceAsyncFunc: TypeAlias = Callable[
    [BackendState], Coroutine[Any, Any, None]
]

ServiceFunc module-attribute

ServiceFunc: TypeAlias = (
    ServiceAsyncFunc | Callable[[BackendState], None]
)

Priority module-attribute

Priority: TypeAlias = int

IntoFrontendConfigFunction module-attribute

IntoFrontendConfigFunction: TypeAlias = Callable[[Any], Any]

Lifespan module-attribute

logger module-attribute

logger = getLogger(__name__)

Plugin dataclass

Stores the metadata and registered API routes, background services and frontend assets for a tangram plugin.

Packages should declare an entry point in the tangram_core.plugins group in their pyproject.toml pointing to an instance of this class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@dataclass
class Plugin:
    """Stores the metadata and registered API routes, background services and
    frontend assets for a tangram plugin.

    Packages should declare an entry point in the `tangram_core.plugins` group
    in their `pyproject.toml` pointing to an instance of this class.
    """

    frontend_path: str | None = None
    """Path to the compiled frontend assets, *relative* to the distribution root
    (editable) or package root (wheel).
    """
    routers: list[APIRouter] = field(default_factory=list)
    config_class: type | None = None
    """The **backend** configuration class (dataclass, TypedDict or Pydantic model)
    for the plugin."""
    frontend_config_class: type | None = None
    """The **frontend** configuration class for the plugin. If set, it will be used to
    generate the frontend schema and validate settings updates.
    Fields should be annotated with [tangram_core.config.FrontendMutable][] if they
    are allowed to be modified from the frontend settings UI.
    """
    into_frontend_config_function: IntoFrontendConfigFunction | None = None
    """Function to transform the backend configuration into the frontend
    configuration. It receives the validated backend configuration object and
    should return an instance of `frontend_config_class`.
    Useful if the user wants to hide sensitive fields (e.g. API keys) from the frontend
    or dynamically compute certain fields.
    Required if `frontend_config_class` is set.
    """
    lifespan: Lifespan | None = None
    """Async context manager for plugin initialization and teardown."""
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )
    get_typer: Callable[[], Typer] | None = None
    """A function that returns the subcommands which will later be registered."""
    dist_name: str = field(init=False)
    """Name of the distribution (package) that provided this plugin, populated
    automatically during loading.
    """  # we do this so plugins can know their own package name if needed

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        """Decorator to register a background service function.

        Services are long-running async functions that receive the BackendState
        and are started when the application launches.
        """

        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if asyncio.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator

    # HACK: so lru_cache on adapter works.
    # since we have mutable fields (routers, register_service, dist_name on init)
    # its difficult to make this class frozen,
    # so maybe we should implement __eq__ and __hash__ ourselves?
    __hash__ = object.__hash__

    @functools.lru_cache
    def adapter(self) -> TypeAdapter | None:
        """Returns a cached Pydantic TypeAdapter for the plugin's configuration class.

        Avoids expensive rebuilds on every validation request, such as those from the
        settings UI.
        """
        from pydantic import TypeAdapter

        if self.config_class:
            return TypeAdapter(self.config_class)
        return None

    @functools.lru_cache
    def frontend_adapter(self) -> TypeAdapter | None:
        """Returns a cached Pydantic TypeAdapter for the plugin's frontend
        configuration class."""
        from pydantic import TypeAdapter

        if self.frontend_config_class:
            return TypeAdapter(self.frontend_config_class)
        return None
frontend_path class-attribute instance-attribute
frontend_path: str | None = None

Path to the compiled frontend assets, relative to the distribution root (editable) or package root (wheel).

routers class-attribute instance-attribute
routers: list[APIRouter] = field(default_factory=list)
config_class class-attribute instance-attribute
config_class: type | None = None

The backend configuration class (dataclass, TypedDict or Pydantic model) for the plugin.

frontend_config_class class-attribute instance-attribute
frontend_config_class: type | None = None

The frontend configuration class for the plugin. If set, it will be used to generate the frontend schema and validate settings updates. Fields should be annotated with tangram_core.config.FrontendMutable if they are allowed to be modified from the frontend settings UI.

into_frontend_config_function class-attribute instance-attribute
into_frontend_config_function: (
    IntoFrontendConfigFunction | None
) = None

Function to transform the backend configuration into the frontend configuration. It receives the validated backend configuration object and should return an instance of frontend_config_class. Useful if the user wants to hide sensitive fields (e.g. API keys) from the frontend or dynamically compute certain fields. Required if frontend_config_class is set.

lifespan class-attribute instance-attribute
lifespan: Lifespan | None = None

Async context manager for plugin initialization and teardown.

services class-attribute instance-attribute
services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)
get_typer class-attribute instance-attribute
get_typer: Callable[[], Typer] | None = None

A function that returns the subcommands which will later be registered.

dist_name class-attribute instance-attribute
dist_name: str = field(init=False)

Name of the distribution (package) that provided this plugin, populated automatically during loading.

register_service
register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]

Decorator to register a background service function.

Services are long-running async functions that receive the BackendState and are started when the application launches.

Source code in packages/tangram_core/src/tangram_core/plugin.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    """Decorator to register a background service function.

    Services are long-running async functions that receive the BackendState
    and are started when the application launches.
    """

    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if asyncio.iscoroutinefunction(func):
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator
adapter cached
adapter() -> TypeAdapter | None

Returns a cached Pydantic TypeAdapter for the plugin's configuration class.

Avoids expensive rebuilds on every validation request, such as those from the settings UI.

Source code in packages/tangram_core/src/tangram_core/plugin.py
103
104
105
106
107
108
109
110
111
112
113
114
@functools.lru_cache
def adapter(self) -> TypeAdapter | None:
    """Returns a cached Pydantic TypeAdapter for the plugin's configuration class.

    Avoids expensive rebuilds on every validation request, such as those from the
    settings UI.
    """
    from pydantic import TypeAdapter

    if self.config_class:
        return TypeAdapter(self.config_class)
    return None
frontend_adapter cached
frontend_adapter() -> TypeAdapter | None

Returns a cached Pydantic TypeAdapter for the plugin's frontend configuration class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
116
117
118
119
120
121
122
123
124
@functools.lru_cache
def frontend_adapter(self) -> TypeAdapter | None:
    """Returns a cached Pydantic TypeAdapter for the plugin's frontend
    configuration class."""
    from pydantic import TypeAdapter

    if self.frontend_config_class:
        return TypeAdapter(self.frontend_config_class)
    return None

scan_plugins

scan_plugins() -> EntryPoints
Source code in packages/tangram_core/src/tangram_core/plugin.py
127
128
def scan_plugins() -> importlib.metadata.EntryPoints:
    return importlib.metadata.entry_points(group="tangram_core.plugins")

load_plugin

load_plugin(entry_point: EntryPoint) -> Plugin | None

Instantiates the plugin object defined in the entry point and injects the name of the distribution into it.

Source code in packages/tangram_core/src/tangram_core/plugin.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def load_plugin(
    entry_point: importlib.metadata.EntryPoint,
) -> Plugin | None:
    """Instantiates the plugin object defined in the entry point
    and injects the name of the distribution into it."""
    try:
        plugin_instance = entry_point.load()
    except Exception as e:
        tb = traceback.format_exc()
        logger.error(
            f"failed to load plugin {entry_point.name}: {e}. {tb}"
            f"\n= help: does {entry_point.value} exist?"
        )
        return None
    if not isinstance(plugin_instance, Plugin):
        logger.error(f"entry point {entry_point.name} is not an instance of `Plugin`")
        return None
    if entry_point.dist is None:
        logger.error(f"could not determine distribution for plugin {entry_point.name}")
        return None
    # NOTE: we ignore `entry_point.name` for now and simply use the distribution's name
    # should we raise an error if they differ? not for now

    plugin_instance.dist_name = entry_point.dist.name
    return plugin_instance

redis

log module-attribute

log = getLogger(__name__)

StateT module-attribute

StateT = TypeVar('StateT')

Subscriber

Bases: ABC, Generic[StateT]

Source code in packages/tangram_core/src/tangram_core/redis.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Subscriber(abc.ABC, Generic[StateT]):
    redis: Redis
    task: asyncio.Task[None]
    pubsub: PubSub

    def __init__(
        self, name: str, redis_url: str, channels: List[str], initial_state: StateT
    ):
        self.name = name
        self.redis_url: str = redis_url
        self.channels: List[str] = channels
        self.state: StateT = initial_state
        self._running = False

    async def subscribe(self) -> None:
        if self._running:
            log.warning("%s already running", self.name)
            return

        try:
            self.redis = await Redis.from_url(self.redis_url)
            self.pubsub = self.redis.pubsub()
            await self.pubsub.psubscribe(*self.channels)
        except RedisError as e:
            log.error("%s failed to connect to Redis: %s", self.name, e)
            raise

        async def listen() -> None:
            try:
                log.info("%s listening ...", self.name)
                async for message in self.pubsub.listen():
                    log.debug("message: %s", message)
                    if message["type"] == "pmessage":
                        await self.message_handler(
                            message["channel"].decode("utf-8"),
                            message["data"].decode("utf-8"),
                            message["pattern"].decode("utf-8"),
                            self.state,
                        )
            except asyncio.CancelledError:
                log.warning("%s cancelled", self.name)

        self._running = True

        self.task = asyncio.create_task(listen())
        log.info("%s task created, running ...", self.name)

    async def cleanup(self) -> None:
        if not self._running:
            return

        if self.task:
            log.debug("%s canceling task ...", self.name)
            self.task.cancel()
            try:
                log.debug("%s await task to finish ...", self.name)
                await self.task
                log.debug("%s task canceled", self.name)
            except asyncio.CancelledError as exc:
                log.error("%s task canceling error: %s", self.name, exc)
        if self.pubsub:
            await self.pubsub.unsubscribe()
        if self.redis:
            await self.redis.close()
        self._running = False

    def is_active(self) -> bool:
        """Return True if the subscriber is actively listening."""
        return self._running and self.task is not None and not self.task.done()

    @abc.abstractmethod
    async def message_handler(
        self, event: str, payload: str, pattern: str, state: StateT
    ) -> None:
        pass
redis instance-attribute
redis: Redis
task instance-attribute
task: Task[None]
pubsub instance-attribute
pubsub: PubSub
name instance-attribute
name = name
redis_url instance-attribute
redis_url: str = redis_url
channels instance-attribute
channels: List[str] = channels
state instance-attribute
state: StateT = initial_state
subscribe async
subscribe() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def subscribe(self) -> None:
    if self._running:
        log.warning("%s already running", self.name)
        return

    try:
        self.redis = await Redis.from_url(self.redis_url)
        self.pubsub = self.redis.pubsub()
        await self.pubsub.psubscribe(*self.channels)
    except RedisError as e:
        log.error("%s failed to connect to Redis: %s", self.name, e)
        raise

    async def listen() -> None:
        try:
            log.info("%s listening ...", self.name)
            async for message in self.pubsub.listen():
                log.debug("message: %s", message)
                if message["type"] == "pmessage":
                    await self.message_handler(
                        message["channel"].decode("utf-8"),
                        message["data"].decode("utf-8"),
                        message["pattern"].decode("utf-8"),
                        self.state,
                    )
        except asyncio.CancelledError:
            log.warning("%s cancelled", self.name)

    self._running = True

    self.task = asyncio.create_task(listen())
    log.info("%s task created, running ...", self.name)
cleanup async
cleanup() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def cleanup(self) -> None:
    if not self._running:
        return

    if self.task:
        log.debug("%s canceling task ...", self.name)
        self.task.cancel()
        try:
            log.debug("%s await task to finish ...", self.name)
            await self.task
            log.debug("%s task canceled", self.name)
        except asyncio.CancelledError as exc:
            log.error("%s task canceling error: %s", self.name, exc)
    if self.pubsub:
        await self.pubsub.unsubscribe()
    if self.redis:
        await self.redis.close()
    self._running = False
is_active
is_active() -> bool

Return True if the subscriber is actively listening.

Source code in packages/tangram_core/src/tangram_core/redis.py
81
82
83
def is_active(self) -> bool:
    """Return True if the subscriber is actively listening."""
    return self._running and self.task is not None and not self.task.done()
message_handler abstractmethod async
message_handler(
    event: str, payload: str, pattern: str, state: StateT
) -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
85
86
87
88
89
@abc.abstractmethod
async def message_handler(
    self, event: str, payload: str, pattern: str, state: StateT
) -> None:
    pass

tangram_core._core

ChannelConfig

host property writable

host: str

port property writable

port: int

redis_url property writable

redis_url: str

jwt_secret property writable

jwt_secret: str

jwt_expiration_secs property writable

jwt_expiration_secs: int

id_length property writable

id_length: int

__new__

__new__(
    host: str,
    port: int,
    redis_url: str,
    jwt_secret: str,
    jwt_expiration_secs: int,
    id_length: int,
) -> ChannelConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run

run(config: ChannelConfig) -> Any