Skip to content

Core

tangram_core

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class BackendState:
    """Bundle of the Redis client, the HTTPX client and the configuration."""

    redis_client: redis.Redis
    http_client: httpx.AsyncClient
    config: Config

    @property
    def base_url(self) -> str:
        """URL built from `config.server.host` and `config.server.port`.

        Wildcard bind addresses (`0.0.0.0`, `::`) are not valid destinations,
        so they are replaced by the matching loopback address. IPv6 literals
        are wrapped in brackets, as URL syntax requires.
        """
        host = self.config.server.host
        port = self.config.server.port
        if host == "0.0.0.0":
            host = "127.0.0.1"
        elif host == "::":
            host = "::1"
        if ":" in host and not host.startswith("["):
            # a bare IPv6 literal would be ambiguous with the port separator
            host = f"[{host}]"
        return f"http://{host}:{port}"

redis_client instance-attribute

redis_client: Redis

http_client instance-attribute

http_client: AsyncClient

config instance-attribute

config: Config

base_url property

base_url: str

Runtime

Manages the lifecycle of the Tangram backend, including the Uvicorn server, background services, and connection pools (Redis, HTTPX).

Source code in packages/tangram_core/src/tangram_core/backend.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class Runtime:
    """Manages the lifecycle of the Tangram backend, including the
    Uvicorn server, background services, and connection pools (Redis, HTTPX).

    Can be driven explicitly with `start()` / `stop()` or used as an async
    context manager.
    """

    def __init__(self, config: IntoConfig | None = None) -> None:
        """Prepare a runtime without starting anything.

        :param config: a `str` / path-like value is loaded with
            `Config.from_file`; any other truthy value is used as the
            configuration as-is; a falsy value selects a default `Config()`.
        """
        if isinstance(config, (str, Path, os.PathLike)):
            self.config = Config.from_file(config)
        else:
            self.config = config or Config()
        self._stack = AsyncExitStack()
        self._state: BackendState | None = None
        self._server: uvicorn.Server | None = None
        self._server_task: asyncio.Task[None] | None = None
        self._service_tasks: list[asyncio.Task[None]] = []

    @property
    def state(self) -> BackendState:
        """The backend state created by `start()`.

        :raises RuntimeError: if the runtime is not started
        """
        if self._state is None:
            raise RuntimeError("runtime is not started, call start() first")
        return self._state

    async def start(self) -> Runtime:
        """Starts the backend runtime.

        Opens the Redis and HTTPX clients, loads the enabled plugins, builds
        the application, launches the channel service and the plugin services
        as background tasks, then runs the Uvicorn server in a task and waits
        until it reports that it has started.

        If any step fails, everything acquired so far is released before the
        error propagates, so the runtime can be started again.

        :returns: this runtime
        :raises RuntimeError: if the runtime is already started, or if the
            server task finishes before the server has started
        """
        if self._state is not None:
            raise RuntimeError("runtime is already started")

        try:
            redis_client = await self._stack.enter_async_context(
                redis.from_url(self.config.core.redis_url)  # type: ignore
            )
            http_client = await self._stack.enter_async_context(
                httpx.AsyncClient(http2=True)
            )
            self._state = BackendState(
                redis_client=redis_client,
                http_client=http_client,
                config=self.config,
            )

            loaded_plugins = load_enabled_plugins(self.config)
            app = create_app(self._state, loaded_plugins)

            server_config = uvicorn.Config(
                app,
                host=self.config.server.host,
                port=self.config.server.port,
                log_config=get_log_config_dict(self.config),
            )
            server = uvicorn.Server(server_config)
            self._server = server

            self._service_tasks.append(
                asyncio.create_task(run_channel_service(self.config))
            )
            for plugin in loaded_plugins:
                # within a plugin: ascending priority, then function name
                for _, service_func in sorted(
                    plugin.services, key=lambda s: (s[0], s[1].__name__)
                ):
                    self._service_tasks.append(
                        asyncio.create_task(service_func(self._state))
                    )
                    logger.info(f"started service from plugin: {plugin.dist_name}")

            server_task = asyncio.create_task(server.serve())
            self._server_task = server_task

            while not server.started:
                if server_task.done():
                    # re-raises the failure recorded by the task, if any
                    await server_task
                    # the task ended cleanly but the server never started:
                    # polling any longer would never terminate
                    raise RuntimeError("server exited before completing startup")
                await asyncio.sleep(0.1)
        except BaseException:
            # `__aexit__` is not called when `__aenter__` fails, so nothing
            # else would release what has been acquired so far
            pending = self._server_task
            if pending is not None and not pending.done():
                pending.cancel()
                await asyncio.gather(pending, return_exceptions=True)
            await self.stop()
            raise

        return self

    async def wait(self) -> None:
        """Waits for the server task to complete (e.g. via signal or internal error).

        Returns immediately when there is no server task. Cancellation of the
        awaited task is swallowed; other exceptions propagate.
        """
        if self._server_task:
            try:
                await self._server_task
            except asyncio.CancelledError:
                pass

    async def stop(self) -> None:
        """Stops the backend runtime.

        Asks a started server to exit and waits for its task, then cancels
        the service tasks, closes the clients and resets the runtime. The
        cleanup runs even when the server task raised; that exception is
        re-raised once the cleanup is done.
        """
        try:
            if self._server and self._server.started:
                self._server.should_exit = True
                if self._server_task:
                    try:
                        await self._server_task
                    except asyncio.CancelledError:
                        pass
        finally:
            for task in self._service_tasks:
                task.cancel()
            if self._service_tasks:
                # collect the cancellations instead of letting them propagate
                await asyncio.gather(*self._service_tasks, return_exceptions=True)
            self._service_tasks.clear()

            await self._stack.aclose()
            self._state = None
            self._server = None
            self._server_task = None

    async def __aenter__(self) -> Runtime:
        """Start the runtime on entering `async with`."""
        return await self.start()

    async def __aexit__(self, *args: Any) -> None:
        """Stop the runtime on leaving `async with`."""
        await self.stop()

config instance-attribute

config = from_file(config)

state property

state: BackendState

start async

start() -> Runtime

Starts the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
async def start(self) -> Runtime:
    """Starts the backend runtime.

    Opens the Redis and HTTPX clients, loads the enabled plugins, builds the
    application, launches the channel service and the plugin services as
    background tasks, then runs the Uvicorn server in a task and waits until
    it reports that it has started.

    If any step fails, `stop()` is called to release what was acquired
    before the error propagates.

    :returns: this runtime
    :raises RuntimeError: if the runtime is already started, or if the server
        task finishes before the server has started
    """
    if self._state is not None:
        raise RuntimeError("runtime is already started")

    try:
        redis_client = await self._stack.enter_async_context(
            redis.from_url(self.config.core.redis_url)  # type: ignore
        )
        http_client = await self._stack.enter_async_context(
            httpx.AsyncClient(http2=True)
        )
        self._state = BackendState(
            redis_client=redis_client,
            http_client=http_client,
            config=self.config,
        )

        loaded_plugins = load_enabled_plugins(self.config)
        app = create_app(self._state, loaded_plugins)

        server_config = uvicorn.Config(
            app,
            host=self.config.server.host,
            port=self.config.server.port,
            log_config=get_log_config_dict(self.config),
        )
        server = uvicorn.Server(server_config)
        self._server = server

        self._service_tasks.append(
            asyncio.create_task(run_channel_service(self.config))
        )
        for plugin in loaded_plugins:
            # within a plugin: ascending priority, then function name
            for _, service_func in sorted(
                plugin.services, key=lambda s: (s[0], s[1].__name__)
            ):
                self._service_tasks.append(
                    asyncio.create_task(service_func(self._state))
                )
                logger.info(f"started service from plugin: {plugin.dist_name}")

        server_task = asyncio.create_task(server.serve())
        self._server_task = server_task

        while not server.started:
            if server_task.done():
                # re-raises the failure recorded by the task, if any
                await server_task
                # the task ended cleanly but the server never started:
                # polling any longer would never terminate
                raise RuntimeError("server exited before completing startup")
            await asyncio.sleep(0.1)
    except BaseException:
        # `__aexit__` is not called when `__aenter__` fails, so nothing else
        # would release what has been acquired so far
        pending = self._server_task
        if pending is not None and not pending.done():
            pending.cancel()
            await asyncio.gather(pending, return_exceptions=True)
        await self.stop()
        raise

    return self

wait async

wait() -> None

Waits for the server task to complete (e.g. via signal or internal error).

Source code in packages/tangram_core/src/tangram_core/backend.py
474
475
476
477
478
479
480
async def wait(self) -> None:
    """Block until the server task finishes (e.g. via signal or internal error).

    Returns immediately when there is no server task. Cancellation of the
    awaited task is swallowed; any other exception propagates.
    """
    server_task = self._server_task
    if not server_task:
        return
    try:
        await server_task
    except asyncio.CancelledError:
        return

stop async

stop() -> None

Stops the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
async def stop(self) -> None:
    """Stops the backend runtime.

    Asks a started server to exit and waits for its task, then cancels the
    service tasks, closes the clients and resets the runtime. The cleanup
    runs even when the server task raised; that exception is re-raised once
    the cleanup is done.
    """
    try:
        if self._server and self._server.started:
            self._server.should_exit = True
            if self._server_task:
                try:
                    await self._server_task
                except asyncio.CancelledError:
                    pass
    finally:
        for task in self._service_tasks:
            task.cancel()
        if self._service_tasks:
            # collect the cancellations instead of letting them propagate
            await asyncio.gather(*self._service_tasks, return_exceptions=True)
        self._service_tasks.clear()

        await self._stack.aclose()
        self._state = None
        self._server = None
        self._server_task = None

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# Top-level configuration object; every section falls back to the default
# instance of its own class.
@dataclass
class Config:
    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    """Mapping of plugin name to plugin-specific config."""
    themes: list[ThemeDefinition] = field(default_factory=list)
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: StrOrPathLike) -> Config:
        """Load and validate a configuration from a TOML file.

        Entries of the `map` table (`style` and `styles`) are passed through
        `try_resolve_local_style` together with the directory containing the
        file, before the data is validated with pydantic.

        :param config_path: location of the TOML file
        :returns: the validated configuration
        """
        if sys.version_info >= (3, 11):
            import tomllib
        else:
            import tomli as tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as stream:
            raw = tomllib.load(stream)

        config_dir = Path(config_path).parent
        # the `map` table is created when absent so `styles` is always set
        map_section = raw.setdefault("map", {})
        style = map_section.get("style", None)
        if style is not None:
            map_section["style"] = try_resolve_local_style(
                config_dir, style, allow_style_name=True
            )
        map_section["styles"] = [
            try_resolve_local_style(config_dir, entry, allow_style_name=False)
            for entry in map_section.get("styles", [])
        ]
        return TypeAdapter(cls).validate_python(raw)

core class-attribute instance-attribute

core: CoreConfig = field(default_factory=CoreConfig)

server class-attribute instance-attribute

server: ServerConfig = field(default_factory=ServerConfig)

channel class-attribute instance-attribute

channel: ChannelConfig = field(
    default_factory=ChannelConfig
)

map class-attribute instance-attribute

map: MapConfig = field(default_factory=MapConfig)

plugins class-attribute instance-attribute

plugins: dict[str, Any] = field(default_factory=dict)

Mapping of plugin name to plugin-specific config.

themes class-attribute instance-attribute

themes: list[ThemeDefinition] = field(default_factory=list)

cache class-attribute instance-attribute

cache: CacheConfig = field(default_factory=CacheConfig)

from_file classmethod

from_file(config_path: StrOrPathLike) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@classmethod
def from_file(cls, config_path: StrOrPathLike) -> Config:
    """Load and validate a configuration from a TOML file.

    Entries of the `map` table (`style` and `styles`) are passed through
    `try_resolve_local_style` together with the directory containing the
    file, before the data is validated with pydantic.

    :param config_path: location of the TOML file
    :returns: the validated configuration
    """
    if sys.version_info >= (3, 11):
        import tomllib
    else:
        import tomli as tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as stream:
        raw = tomllib.load(stream)

    config_dir = Path(config_path).parent
    # the `map` table is created when absent so `styles` is always set
    map_section = raw.setdefault("map", {})
    style = map_section.get("style", None)
    if style is not None:
        map_section["style"] = try_resolve_local_style(
            config_dir, style, allow_style_name=True
        )
    map_section["styles"] = [
        try_resolve_local_style(config_dir, entry, allow_style_name=False)
        for entry in map_section.get("styles", [])
    ]
    return TypeAdapter(cls).validate_python(raw)

Plugin dataclass

Stores the metadata and registered API routes, background services and frontend assets for a tangram plugin.

Packages should declare an entry point in the tangram_core.plugins group in their pyproject.toml pointing to an instance of this class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@dataclass
class Plugin:
    """Stores the metadata and registered API routes, background services and
    frontend assets for a tangram plugin.

    Packages should declare an entry point in the `tangram_core.plugins` group
    in their `pyproject.toml` pointing to an instance of this class.
    """

    frontend_path: str | None = None
    """Path to the compiled frontend assets, *relative* to the distribution root
    (editable) or package root (wheel).
    """
    routers: list[APIRouter] = field(default_factory=list)
    config_class: type | None = None
    """The configuration class (dataclass or Pydantic model) for this plugin.
    Fields annotated with `tangram_core.config.Expose()` will be exposed to the
    frontend."""
    with_computed_fields: WithComputedFieldsFunction | None = None
    """Function to add additional dynamically computed fields to the frontend
    configuration. It receives the backend state and the validated configuration object
    (if `config_class` is set) or the raw dictionary, and should return a dictionary
    of extra fields to merge."""
    lifespan: Lifespan | None = None
    """Async context manager for plugin initialization and teardown."""
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )
    dist_name: str = field(init=False)
    """Name of the distribution (package) that provided this plugin, populated
    automatically during loading.
    """  # we do this so plugins can know their own package name if needed

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        """Decorator to register a background service function.

        Services are long-running functions that receive the BackendState
        and are started when the application launches. Coroutine functions
        are awaited directly; plain functions are run in a worker thread.

        :param priority: stored next to the service in `services`
        :returns: a decorator that registers the function and returns it
            unchanged
        """
        # `asyncio.iscoroutinefunction` is deprecated since Python 3.14
        import inspect

        def decorator(func: ServiceFunc) -> ServiceFunc:
            # decided once at registration rather than on every start
            is_async = inspect.iscoroutinefunction(func)

            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if is_async:
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator

frontend_path class-attribute instance-attribute

frontend_path: str | None = None

Path to the compiled frontend assets, relative to the distribution root (editable) or package root (wheel).

routers class-attribute instance-attribute

routers: list[APIRouter] = field(default_factory=list)

config_class class-attribute instance-attribute

config_class: type | None = None

The configuration class (dataclass or Pydantic model) for this plugin. Fields annotated with tangram_core.config.Expose() will be exposed to the frontend.

with_computed_fields class-attribute instance-attribute

with_computed_fields: WithComputedFieldsFunction | None = (
    None
)

Function to add additional dynamically computed fields to the frontend configuration. It receives the backend state and the validated configuration object (if config_class is set) or the raw dictionary, and should return a dictionary of extra fields to merge.

lifespan class-attribute instance-attribute

lifespan: Lifespan | None = None

Async context manager for plugin initialization and teardown.

services class-attribute instance-attribute

services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)

dist_name class-attribute instance-attribute

dist_name: str = field(init=False)

Name of the distribution (package) that provided this plugin, populated automatically during loading.

register_service

register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]

Decorator to register a background service function.

Services are long-running functions that receive the BackendState and are started when the application launches. Async functions are awaited directly; plain (synchronous) functions are run in a worker thread.

Source code in packages/tangram_core/src/tangram_core/plugin.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    """Decorator to register a background service function.

    Services are long-running functions that receive the BackendState
    and are started when the application launches. Coroutine functions are
    awaited directly; plain functions are run in a worker thread.

    :param priority: stored next to the service in `services`
    :returns: a decorator that registers the function and returns it unchanged
    """
    # `asyncio.iscoroutinefunction` is deprecated since Python 3.14
    import inspect

    def decorator(func: ServiceFunc) -> ServiceFunc:
        # decided once at registration rather than on every start
        is_async = inspect.iscoroutinefunction(func)

        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if is_async:
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

backend

logger module-attribute

logger = getLogger(__name__)

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

CACHE_PARAM_PATTERN module-attribute

CACHE_PARAM_PATTERN = compile('\\{(\\w+)\\}')

DEFAULT_THEMES module-attribute

DEFAULT_THEMES = {
    "light": ThemeDefinition(
        name="light",
        background_color="#ffffff",
        foreground_color="#000000",
        surface_color="#f8f9fa",
        border_color="#e7e7e7",
        hover_color="#e9ecef",
        accent1="oklch(0.5616 0.0895 251.64)",
        accent1_foreground="#ffffff",
        accent2="oklch(0.8021 0.11 92.43)",
        accent2_foreground="#000000",
        muted_color="#666666",
    ),
    "dark": ThemeDefinition(
        name="dark",
        background_color="#1a1a1a",
        foreground_color="#e0e0e0",
        surface_color="#2d2d2d",
        border_color="#404040",
        hover_color="#343434",
        accent1="oklch(0.5059 0.0895 251.64)",
        accent1_foreground="#ffffff",
        accent2="oklch(0.5059 0.0895 93.53)",
        accent2_foreground="#ffffff",
        muted_color="#999999",
    ),
}

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass
class BackendState:
    """Bundle of the Redis client, the HTTPX client and the configuration."""

    redis_client: redis.Redis
    http_client: httpx.AsyncClient
    config: Config

    @property
    def base_url(self) -> str:
        """URL built from `config.server.host` and `config.server.port`.

        Wildcard bind addresses (`0.0.0.0`, `::`) are not valid destinations,
        so they are replaced by the matching loopback address. IPv6 literals
        are wrapped in brackets, as URL syntax requires.
        """
        host = self.config.server.host
        port = self.config.server.port
        if host == "0.0.0.0":
            host = "127.0.0.1"
        elif host == "::":
            host = "::1"
        if ":" in host and not host.startswith("["):
            # a bare IPv6 literal would be ambiguous with the port separator
            host = f"[{host}]"
        return f"http://{host}:{port}"
redis_client instance-attribute
redis_client: Redis
http_client instance-attribute
http_client: AsyncClient
config instance-attribute
config: Config
base_url property
base_url: str

Runtime

Manages the lifecycle of the Tangram backend, including the Uvicorn server, background services, and connection pools (Redis, HTTPX).

Source code in packages/tangram_core/src/tangram_core/backend.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class Runtime:
    """Manages the lifecycle of the Tangram backend, including the
    Uvicorn server, background services, and connection pools (Redis, HTTPX).

    Can be driven explicitly with `start()` / `stop()` or used as an async
    context manager.
    """

    def __init__(self, config: IntoConfig | None = None) -> None:
        """Prepare a runtime without starting anything.

        :param config: a `str` / path-like value is loaded with
            `Config.from_file`; any other truthy value is used as the
            configuration as-is; a falsy value selects a default `Config()`.
        """
        if isinstance(config, (str, Path, os.PathLike)):
            self.config = Config.from_file(config)
        else:
            self.config = config or Config()
        self._stack = AsyncExitStack()
        self._state: BackendState | None = None
        self._server: uvicorn.Server | None = None
        self._server_task: asyncio.Task[None] | None = None
        self._service_tasks: list[asyncio.Task[None]] = []

    @property
    def state(self) -> BackendState:
        """The backend state created by `start()`.

        :raises RuntimeError: if the runtime is not started
        """
        if self._state is None:
            raise RuntimeError("runtime is not started, call start() first")
        return self._state

    async def start(self) -> Runtime:
        """Starts the backend runtime.

        Opens the Redis and HTTPX clients, loads the enabled plugins, builds
        the application, launches the channel service and the plugin services
        as background tasks, then runs the Uvicorn server in a task and waits
        until it reports that it has started.

        If any step fails, everything acquired so far is released before the
        error propagates, so the runtime can be started again.

        :returns: this runtime
        :raises RuntimeError: if the runtime is already started, or if the
            server task finishes before the server has started
        """
        if self._state is not None:
            raise RuntimeError("runtime is already started")

        try:
            redis_client = await self._stack.enter_async_context(
                redis.from_url(self.config.core.redis_url)  # type: ignore
            )
            http_client = await self._stack.enter_async_context(
                httpx.AsyncClient(http2=True)
            )
            self._state = BackendState(
                redis_client=redis_client,
                http_client=http_client,
                config=self.config,
            )

            loaded_plugins = load_enabled_plugins(self.config)
            app = create_app(self._state, loaded_plugins)

            server_config = uvicorn.Config(
                app,
                host=self.config.server.host,
                port=self.config.server.port,
                log_config=get_log_config_dict(self.config),
            )
            server = uvicorn.Server(server_config)
            self._server = server

            self._service_tasks.append(
                asyncio.create_task(run_channel_service(self.config))
            )
            for plugin in loaded_plugins:
                # within a plugin: ascending priority, then function name
                for _, service_func in sorted(
                    plugin.services, key=lambda s: (s[0], s[1].__name__)
                ):
                    self._service_tasks.append(
                        asyncio.create_task(service_func(self._state))
                    )
                    logger.info(f"started service from plugin: {plugin.dist_name}")

            server_task = asyncio.create_task(server.serve())
            self._server_task = server_task

            while not server.started:
                if server_task.done():
                    # re-raises the failure recorded by the task, if any
                    await server_task
                    # the task ended cleanly but the server never started:
                    # polling any longer would never terminate
                    raise RuntimeError("server exited before completing startup")
                await asyncio.sleep(0.1)
        except BaseException:
            # `__aexit__` is not called when `__aenter__` fails, so nothing
            # else would release what has been acquired so far
            pending = self._server_task
            if pending is not None and not pending.done():
                pending.cancel()
                await asyncio.gather(pending, return_exceptions=True)
            await self.stop()
            raise

        return self

    async def wait(self) -> None:
        """Waits for the server task to complete (e.g. via signal or internal error).

        Returns immediately when there is no server task. Cancellation of the
        awaited task is swallowed; other exceptions propagate.
        """
        if self._server_task:
            try:
                await self._server_task
            except asyncio.CancelledError:
                pass

    async def stop(self) -> None:
        """Stops the backend runtime.

        Asks a started server to exit and waits for its task, then cancels
        the service tasks, closes the clients and resets the runtime. The
        cleanup runs even when the server task raised; that exception is
        re-raised once the cleanup is done.
        """
        try:
            if self._server and self._server.started:
                self._server.should_exit = True
                if self._server_task:
                    try:
                        await self._server_task
                    except asyncio.CancelledError:
                        pass
        finally:
            for task in self._service_tasks:
                task.cancel()
            if self._service_tasks:
                # collect the cancellations instead of letting them propagate
                await asyncio.gather(*self._service_tasks, return_exceptions=True)
            self._service_tasks.clear()

            await self._stack.aclose()
            self._state = None
            self._server = None
            self._server_task = None

    async def __aenter__(self) -> Runtime:
        """Start the runtime on entering `async with`."""
        return await self.start()

    async def __aexit__(self, *args: Any) -> None:
        """Stop the runtime on leaving `async with`."""
        await self.stop()
config instance-attribute
config = from_file(config)
state property
state: BackendState
start async
start() -> Runtime

Starts the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
async def start(self) -> Runtime:
    """Starts the backend runtime.

    Opens the Redis and HTTPX clients, loads the enabled plugins, builds the
    application, launches the channel service and the plugin services as
    background tasks, then runs the Uvicorn server in a task and waits until
    it reports that it has started.

    If any step fails, `stop()` is called to release what was acquired
    before the error propagates.

    :returns: this runtime
    :raises RuntimeError: if the runtime is already started, or if the server
        task finishes before the server has started
    """
    if self._state is not None:
        raise RuntimeError("runtime is already started")

    try:
        redis_client = await self._stack.enter_async_context(
            redis.from_url(self.config.core.redis_url)  # type: ignore
        )
        http_client = await self._stack.enter_async_context(
            httpx.AsyncClient(http2=True)
        )
        self._state = BackendState(
            redis_client=redis_client,
            http_client=http_client,
            config=self.config,
        )

        loaded_plugins = load_enabled_plugins(self.config)
        app = create_app(self._state, loaded_plugins)

        server_config = uvicorn.Config(
            app,
            host=self.config.server.host,
            port=self.config.server.port,
            log_config=get_log_config_dict(self.config),
        )
        server = uvicorn.Server(server_config)
        self._server = server

        self._service_tasks.append(
            asyncio.create_task(run_channel_service(self.config))
        )
        for plugin in loaded_plugins:
            # within a plugin: ascending priority, then function name
            for _, service_func in sorted(
                plugin.services, key=lambda s: (s[0], s[1].__name__)
            ):
                self._service_tasks.append(
                    asyncio.create_task(service_func(self._state))
                )
                logger.info(f"started service from plugin: {plugin.dist_name}")

        server_task = asyncio.create_task(server.serve())
        self._server_task = server_task

        while not server.started:
            if server_task.done():
                # re-raises the failure recorded by the task, if any
                await server_task
                # the task ended cleanly but the server never started:
                # polling any longer would never terminate
                raise RuntimeError("server exited before completing startup")
            await asyncio.sleep(0.1)
    except BaseException:
        # `__aexit__` is not called when `__aenter__` fails, so nothing else
        # would release what has been acquired so far
        pending = self._server_task
        if pending is not None and not pending.done():
            pending.cancel()
            await asyncio.gather(pending, return_exceptions=True)
        await self.stop()
        raise

    return self
wait async
wait() -> None

Waits for the server task to complete (e.g. via signal or internal error).

Source code in packages/tangram_core/src/tangram_core/backend.py
474
475
476
477
478
479
480
async def wait(self) -> None:
    """Block until the server task finishes (e.g. via signal or internal error).

    Returns immediately when there is no server task. Cancellation of the
    awaited task is swallowed; any other exception propagates.
    """
    server_task = self._server_task
    if not server_task:
        return
    try:
        await server_task
    except asyncio.CancelledError:
        return
stop async
stop() -> None

Stops the backend runtime.

Source code in packages/tangram_core/src/tangram_core/backend.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
async def stop(self) -> None:
    """Stops the backend runtime.

    Asks a started server to exit and waits for its task, then cancels the
    service tasks, closes the clients and resets the runtime. The cleanup
    runs even when the server task raised; that exception is re-raised once
    the cleanup is done.
    """
    try:
        if self._server and self._server.started:
            self._server.should_exit = True
            if self._server_task:
                try:
                    await self._server_task
                except asyncio.CancelledError:
                    pass
    finally:
        for task in self._service_tasks:
            task.cancel()
        if self._service_tasks:
            # collect the cancellations instead of letting them propagate
            await asyncio.gather(*self._service_tasks, return_exceptions=True)
        self._service_tasks.clear()

        await self._stack.aclose()
        self._state = None
        self._server = None
        self._server_task = None

get_state async

get_state(request: Request) -> BackendState
Source code in packages/tangram_core/src/tangram_core/backend.py
72
73
async def get_state(request: Request) -> BackendState:
    """Return the `backend_state` stored on the state of the request's app."""
    app_state = request.app.state
    return app_state.backend_state  # type: ignore

get_distribution_path

get_distribution_path(dist_name: str) -> Path

Get the local path of a distribution, handling both editable installs (direct_url.json) and standard wheel installs.

See: https://packaging.python.org/en/latest/specifications/direct-url-data-structure/

Source code in packages/tangram_core/src/tangram_core/backend.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def get_distribution_path(dist_name: str) -> Path:
    """Get the local path of a distribution, handling both editable installs
    (`direct_url.json`) and standard wheel installs.

    See: https://packaging.python.org/en/latest/specifications/direct-url-data-structure/

    :param dist_name: name used to look up the installed distribution and,
        for the fallback, as the importable package name
    :returns: a path that was a directory when checked
    :raises FileNotFoundError: if neither lookup yields a directory
    """
    # always try direct_url.json first (e.g. for the case of `uv sync --all-packages`)
    try:
        dist = Distribution.from_name(dist_name)
        if direct_url_content := dist.read_text("direct_url.json"):
            direct_url_data = json.loads(direct_url_content)
            if (
                (url := direct_url_data.get("url"))
                # url may point to a git or zip archive, but since we only care
                # about local paths, we only handle the file:// scheme here
                and url.startswith("file://")
            ):
                parsed = urllib.parse.urlparse(url)
                if os.name == "nt":
                    path_str = urllib.request.url2pathname(parsed.path)
                    # a non-local host is kept as the prefix of the path
                    if parsed.netloc and parsed.netloc not in ("", "localhost"):
                        path_str = f"//{parsed.netloc}{path_str}"
                    path1 = Path(path_str)
                else:
                    path1 = Path(urllib.parse.unquote(parsed.path))
                if path1.is_dir():
                    return path1
    except (PackageNotFoundError, json.JSONDecodeError, FileNotFoundError):
        pass

    # fallback in case it was installed via a wheel
    try:
        trav = importlib.resources.files(dist_name)
    except ImportError as err:
        # not importable under this name: report it like any other miss
        raise FileNotFoundError(
            f"could not find distribution path for {dist_name}"
        ) from err
    if trav.is_dir():
        with importlib.resources.as_file(trav) as path2:
            return path2
    raise FileNotFoundError(f"could not find distribution path for {dist_name}")

resolve_frontend

resolve_frontend(plugin: Plugin) -> Path | None
Source code in packages/tangram_core/src/tangram_core/backend.py
116
117
118
119
def resolve_frontend(plugin: Plugin) -> Path | None:
    """Return the plugin's frontend path joined onto its distribution path.

    :returns: `None` when the plugin declares no `frontend_path`
    """
    relative = plugin.frontend_path
    if relative:
        return get_distribution_path(plugin.dist_name) / relative
    return None

load_enabled_plugins

load_enabled_plugins(config: Config) -> list[Plugin]
Source code in packages/tangram_core/src/tangram_core/backend.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def load_enabled_plugins(
    config: Config,
) -> list[Plugin]:
    """Load the scanned plugins whose entry point name is in `config.core.plugins`.

    Entry points for which `load_plugin` returns `None` are left out.
    """
    enabled = set(config.core.plugins)
    # TODO: should we check entry_point.dist.name instead?
    # lazy, so each entry point is still filtered and loaded in scan order
    candidates = (
        load_plugin(entry_point)
        for entry_point in scan_plugins()
        if entry_point.name in enabled
    )
    return [plugin for plugin in candidates if plugin is not None]

lifespan async

lifespan(
    app: FastAPI,
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> AsyncGenerator[None, None]
Source code in packages/tangram_core/src/tangram_core/backend.py
138
139
140
141
142
143
144
145
146
147
148
149
@asynccontextmanager
async def lifespan(
    app: FastAPI, backend_state: BackendState, loaded_plugins: Iterable[Plugin]
) -> AsyncGenerator[None, None]:
    """Application lifespan: enter every plugin lifespan, then publish the state.

    Plugin lifespans are entered in iteration order on a shared exit stack,
    so they are exited when the application lifespan ends. The backend state
    is stored on `app.state.backend_state` once all of them have been entered.
    """
    async with AsyncExitStack() as plugin_stack:
        for plugin in loaded_plugins:
            plugin_lifespan = plugin.lifespan
            if not plugin_lifespan:
                continue
            logger.info(f"initializing lifespan for {plugin.dist_name}")
            await plugin_stack.enter_async_context(plugin_lifespan(backend_state))

        app.state.backend_state = backend_state
        yield

default_cache_dir

default_cache_dir() -> Path
Source code in packages/tangram_core/src/tangram_core/backend.py
152
153
154
155
156
157
158
159
160
def default_cache_dir() -> Path:
    """Return the tangram cache directory, creating it if needed.

    `$XDG_CACHE_HOME/tangram` is used when `XDG_CACHE_HOME` is set to a
    non-empty value; otherwise the location comes from
    `platformdirs.user_cache_dir`.
    """
    # an empty value is treated as unset: it would otherwise resolve to a
    # relative `tangram` directory in the current working directory
    if xdg_cache := os.environ.get("XDG_CACHE_HOME"):
        cache_dir = Path(xdg_cache) / "tangram"
    else:
        cache_dir = Path(platformdirs.user_cache_dir(appname="tangram"))
    cache_dir.mkdir(parents=True, exist_ok=True)

    return cache_dir

make_cache_route_handler

make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]

Factory function that creates a route handler for caching and serving files. Dynamically handles URL parameters found in both serve_route and origin.

Parameters:

Name Type Description Default
entry CacheEntry

Cache entry configuration

required
state BackendState

Backend state with http_client for fetching remote resources

required

Returns:

Type Description
Callable[..., Awaitable[FileResponse]]

Async function that handles the route with dynamic parameters

Source code in packages/tangram_core/src/tangram_core/backend.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]:
    """
    Factory function that creates a route handler for caching and serving files.
    Dynamically handles URL parameters found in both serve_route and origin.

    The returned handler maps the route parameters onto a file below the
    entry's ``local_path`` (or the default cache directory when it is unset),
    downloads the file from ``origin`` on a cache miss and serves it.
    It answers 404 when the computed path escapes the cache location or when
    the file is missing and the entry has no ``origin`` to fetch it from.

    :param entry: Cache entry configuration
    :param state: Backend state with http_client for fetching remote resources
    :returns: Async function that handles the route with dynamic parameters
    """
    from inspect import Parameter, Signature

    from fastapi import HTTPException

    # Extract parameter names from the serve_route (e.g., {fontstack}, {range})
    params = CACHE_PARAM_PATTERN.findall(entry.serve_route)

    async def cache_route_handler(**kwargs: str) -> FileResponse:
        if (local_path := entry.local_path) is None:
            local_path = default_cache_dir()
        else:
            local_path = local_path.expanduser()

        # Build the local file path by replacing parameters
        local_file = local_path
        for param in params:
            if param in kwargs:
                local_file = local_file / kwargs[param]

        # The parameter values come from the request URL: refuse anything
        # (e.g. `..` segments or absolute paths) that would place the file
        # outside of the configured cache location. The check is lexical so
        # symlinks inside the cache keep working.
        cache_root = Path(os.path.abspath(local_path))
        if not Path(os.path.abspath(local_file)).is_relative_to(cache_root):
            raise HTTPException(status_code=404, detail="Not Found")

        logger.info(f"Serving cached file from {local_file}")

        if not local_file.exists():
            if entry.origin is None:
                # local-only entry: nothing to download, so the file is
                # simply missing (an `assert` would vanish under `-O`)
                raise HTTPException(status_code=404, detail="Not Found")
            # Build the remote URL by replacing parameters
            remote_url = entry.origin
            for param, value in kwargs.items():
                remote_url = remote_url.replace(f"{{{param}}}", value)

            logger.info(f"Downloading from {remote_url} to {local_file}")
            c = await state.http_client.get(remote_url)
            c.raise_for_status()
            local_file.parent.mkdir(parents=True, exist_ok=True)
            local_file.write_bytes(c.content)

        return FileResponse(path=local_file, media_type=entry.media_type)

    # Create explicit parameters for the function signature: the handler
    # itself only takes **kwargs, so the route parameters are advertised
    # through an explicit `__signature__`.
    sig_params = [
        Parameter(
            name=param,
            kind=Parameter.POSITIONAL_OR_KEYWORD,
            annotation=str,
        )
        for param in params
    ]
    cache_route_handler.__signature__ = Signature(  # type: ignore
        parameters=sig_params,
        return_annotation=FileResponse,
    )

    return cache_route_handler

extract_frontend_config

extract_frontend_config(
    config_instance: Any, config_cls: type
) -> dict[str, Any]
Source code in packages/tangram_core/src/tangram_core/backend.py
228
229
230
231
232
233
234
235
def extract_frontend_config(config_instance: Any, config_cls: type) -> dict[str, Any]:
    """Collect the values of the fields marked for exposure to the frontend.

    A field is selected when its type hint on ``config_cls`` carries
    ``Annotated`` metadata containing an ``ExposeField`` instance; its value
    is then read from ``config_instance``.

    :param config_instance: object holding the configuration values
    :param config_cls: class whose type hints are inspected
    :returns: mapping of exposed field name to its current value
    """
    annotated_hints = get_type_hints(config_cls, include_extras=True)
    return {
        name: getattr(config_instance, name)
        for name, hint in annotated_hints.items()
        # hints without `Annotated` metadata yield an empty tuple here
        if any(
            isinstance(marker, ExposeField)
            for marker in getattr(hint, "__metadata__", ())
        )
    }

create_app

create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> FastAPI
Source code in packages/tangram_core/src/tangram_core/backend.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> FastAPI:
    """Build the FastAPI application for the backend.

    In order, this includes every plugin's routers, mounts each plugin's
    frontend assets under ``/plugins/<dist_name>`` and collects its
    ``plugin.json`` metadata, registers ``/config`` and ``/manifest.json``,
    registers one GET route per configured cache entry and finally mounts the
    core frontend at ``/``.

    :param backend_state: shared state made available to routes and plugins
    :param loaded_plugins: plugins to wire into the application
    :returns: the configured application
    :raises ValueError: if the compiled core frontend directory is missing
    """
    # `loaded_plugins` is consumed twice: by the loop below and again by
    # `lifespan` at startup. Materialise it so that a one-shot iterator
    # (e.g. a generator) is not already exhausted when the lifespan runs.
    loaded_plugins = list(loaded_plugins)
    app = FastAPI(
        lifespan=partial(
            lifespan, backend_state=backend_state, loaded_plugins=loaded_plugins
        ),
        default_response_class=ORJSONResponse,
    )
    # plugin dist name -> contents of its plugin.json plus a "config" entry
    frontend_plugins = {}

    for plugin in loaded_plugins:
        for router in plugin.routers:
            app.include_router(router)

        if (frontend_path_resolved := resolve_frontend(plugin)) is not None:
            app.mount(
                f"/plugins/{plugin.dist_name}",
                StaticFiles(directory=str(frontend_path_resolved)),
                name=plugin.dist_name,
            )
            plugin_json_path = frontend_path_resolved / "plugin.json"
            if plugin_json_path.exists():
                # best effort: a plugin with broken metadata or config is
                # logged and left out of the manifest, it does not abort
                # application start-up
                try:
                    with plugin_json_path.open("rb") as f:
                        plugin_meta = json.load(f)

                    conf_dict = backend_state.config.plugins.get(plugin.dist_name, {})
                    if plugin.config_class:
                        conf_instance = TypeAdapter(
                            plugin.config_class
                        ).validate_python(conf_dict)
                        conf_frontend = extract_frontend_config(
                            conf_instance, plugin.config_class
                        )
                        if plugin.with_computed_fields is not None:
                            conf_frontend.update(
                                plugin.with_computed_fields(
                                    backend_state, conf_instance
                                )
                            )
                    elif plugin.with_computed_fields is not None:
                        # no config class: computed fields get the raw dict
                        conf_frontend = plugin.with_computed_fields(
                            backend_state, conf_dict
                        )
                    else:
                        conf_frontend = {}

                    plugin_meta["config"] = conf_frontend
                    frontend_plugins[plugin.dist_name] = plugin_meta
                except Exception as e:
                    logger.error(
                        f"failed to read plugin.json for {plugin.dist_name}: {e}"
                    )

    # unlike v0.1 which uses `process.env`, v0.2 *compiles* the js so we
    # no longer have access to it, so we selectively forward the config.
    @app.get("/config")
    async def get_frontend_config(
        state: Annotated[BackendState, Depends(get_state)],
    ) -> FrontendConfig:
        channel_cfg = state.config.channel
        if channel_cfg.public_url:
            channel_url = channel_cfg.public_url
        else:
            # for local/non-proxied setups, user must set a reachable host.
            # '0.0.0.0' is for listening, not connecting.
            host = "localhost" if channel_cfg.host == "0.0.0.0" else channel_cfg.host
            channel_url = f"http://{host}:{channel_cfg.port}"

        # user-defined themes replace default themes of the same name and
        # are listed after the remaining defaults
        themes = list(DEFAULT_THEMES.values())
        user_theme_names = {t.name for t in state.config.themes}
        themes = [
            t for t in themes if t.name not in user_theme_names
        ] + state.config.themes

        return FrontendConfig(
            channel=FrontendChannelConfig(url=channel_url),
            map=state.config.map,
            theme=FrontendThemeConfig(
                active=state.config.core.theme, definitions=themes
            ),
        )

    @app.get("/manifest.json")
    async def get_manifest() -> ORJSONResponse:
        return ORJSONResponse(content={"plugins": frontend_plugins})

    # Cache mechanism - MUST be registered BEFORE the catch-all frontend mount
    for cache_entry in backend_state.config.cache.entries:
        logger.info(
            f"caching {cache_entry.origin} to {cache_entry.local_path} "
            f"and serving at {cache_entry.serve_route}"
        )
        route_handler = make_cache_route_handler(cache_entry, backend_state)

        logger.info(
            f"Registering route: GET {cache_entry.serve_route} with dynamic params"
        )
        app.add_api_route(
            cache_entry.serve_route,
            route_handler,
            methods=["GET"],
            name=f"cache-{cache_entry.serve_route.replace('/', '_')}",
        )

    if not (
        frontend_path := get_distribution_path("tangram_core") / "dist-frontend"
    ).is_dir():
        raise ValueError(
            f"error: frontend {frontend_path} was not found, "
            "did you run `pnpm i && pnpm run build`?"
        )
    app.mount("/", StaticFiles(directory=str(frontend_path), html=True), name="core")

    return app

run_channel_service async

run_channel_service(config: Config) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
async def run_channel_service(config: Config) -> None:
    """Run the channel service from the ``_core`` extension module.

    Tracing to stderr is initialised with the configured log level, the
    channel settings are copied into a ``_core.ChannelConfig`` and
    ``_core.run`` is awaited with it.

    :param config: application configuration; only its ``core`` and
        ``channel`` sections are read
    """
    from . import _core

    core_cfg = config.core
    channel_cfg = config.channel

    _core.init_tracing_stderr(core_cfg.log_level)

    channel_options = {
        "host": channel_cfg.host,
        "port": channel_cfg.port,
        "redis_url": core_cfg.redis_url,
        "jwt_secret": channel_cfg.jwt_secret,
        "jwt_expiration_secs": channel_cfg.jwt_expiration_secs,
        "id_length": channel_cfg.id_length,
    }
    await _core.run(_core.ChannelConfig(**channel_options))

get_log_config_dict

get_log_config_dict(config: Config) -> dict[str, Any]
Source code in packages/tangram_core/src/tangram_core/backend.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
def get_log_config_dict(config: Config) -> dict[str, Any]:
    """Build a logging configuration dictionary using a single Rich handler.

    The root logger level is the upper-cased ``config.core.log_level`` and
    timestamps are rendered in UTC with microsecond precision.

    :param config: application configuration; only ``core.log_level`` is read
    :returns: dictionary in the ``logging.config.dictConfig`` layout
    """

    def format_time(dt: datetime) -> str:
        # the trailing space is part of the format
        utc_dt = dt.astimezone(timezone.utc)
        return utc_dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ ")

    rich_handler = {
        "class": "rich.logging.RichHandler",
        "log_time_format": format_time,
        "omit_repeated_times": False,
    }
    root_logger = {
        "handlers": ["default"],
        "level": config.core.log_level.upper(),
    }
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {"default": rich_handler},
        "root": root_logger,
    }

config

StyleName module-attribute

StyleName: TypeAlias = str

Url module-attribute

Url: TypeAlias = str

StrOrPathLike module-attribute

StrOrPathLike = str | PathLike[str]

IntoConfig module-attribute

IntoConfig: TypeAlias = 'Config | StrOrPathLike'

HasTopbarUiConfig

Bases: Protocol

Source code in packages/tangram_core/src/tangram_core/config.py
26
27
28
@runtime_checkable
class HasTopbarUiConfig(Protocol):
    """Structural type for objects exposing an integer ``topbar_order``.

    Being ``runtime_checkable``, it can be used with ``isinstance`` to test
    whether an object has the attribute.
    """

    # presumably the position of a plugin's widget in the topbar -- confirm
    topbar_order: int
topbar_order instance-attribute
topbar_order: int

HasSidebarUiConfig

Bases: Protocol

Source code in packages/tangram_core/src/tangram_core/config.py
31
32
33
@runtime_checkable
class HasSidebarUiConfig(Protocol):
    """Structural type for objects exposing an integer ``sidebar_order``.

    Being ``runtime_checkable``, it can be used with ``isinstance`` to test
    whether an object has the attribute.
    """

    # presumably the position of a plugin's widget in the sidebar -- confirm
    sidebar_order: int
sidebar_order instance-attribute
sidebar_order: int

ServerConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
36
37
38
39
@dataclass
class ServerConfig:
    """Host and port of the backend HTTP server.

    ``BackendState.base_url`` is built from these as ``http://host:port``,
    substituting ``127.0.0.1`` when the host is ``0.0.0.0``.
    """

    host: str = "127.0.0.1"
    port: int = 2346
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2346

ChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
42
43
44
45
46
47
48
49
50
51
@dataclass
class ChannelConfig:
    """Settings of the channel service.

    All fields except ``public_url`` are forwarded to ``_core.ChannelConfig``
    by ``run_channel_service``.
    """

    # TODO: we should make it clear that host:port is for the *backend* to
    # listen on, and not to be confused with the frontend.
    host: str = "127.0.0.1"
    port: int = 2347
    # when set, this URL is what `/config` reports to the frontend; otherwise
    # `http://host:port` is reported (with "0.0.0.0" replaced by "localhost")
    public_url: str | None = None
    # NOTE(review): well-known default secret -- deployments should presumably
    # override it; confirm whether this should be enforced.
    jwt_secret: str = "secret"
    jwt_expiration_secs: int = 315360000  # 10 years
    id_length: int = 8
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2347
public_url class-attribute instance-attribute
public_url: str | None = None
jwt_secret class-attribute instance-attribute
jwt_secret: str = 'secret'
jwt_expiration_secs class-attribute instance-attribute
jwt_expiration_secs: int = 315360000
id_length class-attribute instance-attribute
id_length: int = 8

UrlConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
54
55
56
57
@dataclass
class UrlConfig:
    """A URL together with a type tag; used as the value type of
    ``StyleSpecification.sources``.
    """

    url: str
    # presumably the map source type (e.g. vector tiles) -- confirm
    type: str = "vector"
url instance-attribute
url: str
type class-attribute instance-attribute
type: str = 'vector'

StyleSpecification dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
63
64
65
66
67
68
69
@dataclass
class StyleSpecification:
    """Inline map style definition, accepted wherever ``MapConfig`` takes a
    style instead of a URL or style name.
    """

    # NOTE(review): the field set looks like a MapLibre/Mapbox style document
    # (version 8) -- confirm against the frontend.
    name: StyleName | None = None
    sources: dict[str, UrlConfig] | None = None
    # URL template with `{fontstack}` and `{range}` placeholders
    glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
    layers: list[Any] | None = None
    version: Literal[8] = 8
name class-attribute instance-attribute
name: StyleName | None = None
sources class-attribute instance-attribute
sources: dict[str, UrlConfig] | None = None
glyphs class-attribute instance-attribute
glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
layers class-attribute instance-attribute
layers: list[Any] | None = None
version class-attribute instance-attribute
version: Literal[8] = 8

MapConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@dataclass
class MapConfig:
    """Map settings; forwarded as-is to the frontend in ``FrontendConfig.map``
    by the ``/config`` route.
    """

    # users can specify local path in config file but it will be resolved in from_file
    # and so the stored type cannot be Path
    style: Url | StyleName | StyleSpecification = (
        "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
    )
    # additional styles; `Config.from_file` rejects plain style names here
    # (each entry must be a URL, an existing file path or an inline style)
    styles: list[Url | StyleSpecification] = field(default_factory=list)
    # HTML snippet
    attribution: str = (
        '&copy; <a href="https://www.openstreetmap.org/copyright">'
        "OpenStreetMap</a> contributors &copy; "
        '<a href="https://carto.com/attributions">CARTO</a>'
    )
    # presumably the initial view of the map (degrees / zoom level) -- confirm
    center_lat: float = 48.0
    center_lon: float = 7.0
    zoom: float = 4
    pitch: float = 0
    bearing: float = 0
    lang: str = "en"
    min_zoom: float = 0
    max_zoom: float = 24
    max_pitch: float = 70
    allow_pitch: bool = True
    allow_bearing: bool = True
    enable_3d: bool = False
style class-attribute instance-attribute
style: Url | StyleName | StyleSpecification = (
    "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
)
styles class-attribute instance-attribute
styles: list[Url | StyleSpecification] = field(
    default_factory=list
)
attribution class-attribute instance-attribute
attribution: str = '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors &copy; <a href="https://carto.com/attributions">CARTO</a>'
center_lat class-attribute instance-attribute
center_lat: float = 48.0
center_lon class-attribute instance-attribute
center_lon: float = 7.0
zoom class-attribute instance-attribute
zoom: float = 4
pitch class-attribute instance-attribute
pitch: float = 0
bearing class-attribute instance-attribute
bearing: float = 0
lang class-attribute instance-attribute
lang: str = 'en'
min_zoom class-attribute instance-attribute
min_zoom: float = 0
max_zoom class-attribute instance-attribute
max_zoom: float = 24
max_pitch class-attribute instance-attribute
max_pitch: float = 70
allow_pitch class-attribute instance-attribute
allow_pitch: bool = True
allow_bearing class-attribute instance-attribute
allow_bearing: bool = True
enable_3d class-attribute instance-attribute
enable_3d: bool = False

ThemeDefinition dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
102
103
104
105
106
107
108
109
110
111
112
113
114
@dataclass
class ThemeDefinition:
    """A named UI theme made of colour values.

    Listed in ``Config.themes`` and sent to the frontend in
    ``FrontendThemeConfig.definitions``; the ``/config`` route lets a
    user-defined theme replace a default theme with the same ``name``.
    """

    name: str
    # all remaining fields are required colour strings
    # (presumably CSS colour values -- confirm against the frontend)
    background_color: str
    foreground_color: str
    surface_color: str
    border_color: str
    hover_color: str
    accent1: str
    accent1_foreground: str
    accent2: str
    accent2_foreground: str
    muted_color: str
name instance-attribute
name: str
background_color instance-attribute
background_color: str
foreground_color instance-attribute
foreground_color: str
surface_color instance-attribute
surface_color: str
border_color instance-attribute
border_color: str
hover_color instance-attribute
hover_color: str
accent1 instance-attribute
accent1: str
accent1_foreground instance-attribute
accent1_foreground: str
accent2 instance-attribute
accent2: str
accent2_foreground instance-attribute
accent2_foreground: str
muted_color instance-attribute
muted_color: str

AdaptiveTheme dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
117
118
119
120
@dataclass
class AdaptiveTheme:
    """Pair of theme names, one for light and one for dark mode; the default
    value of ``CoreConfig.theme``.
    """

    # presumably each value is the `name` of a ThemeDefinition -- confirm
    light: str = "light"
    dark: str = "dark"
light class-attribute instance-attribute
light: str = 'light'
dark class-attribute instance-attribute
dark: str = 'dark'

CoreConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
123
124
125
126
127
128
@dataclass
class CoreConfig:
    """Core application settings (Redis, logging, active theme)."""

    # also forwarded to the channel service by `run_channel_service`
    redis_url: str = "redis://127.0.0.1:6379"
    # presumably the names of the plugins to enable -- confirm against loader
    plugins: list[str] = field(default_factory=list)
    # upper-cased to become the root logger level in `get_log_config_dict`
    log_level: str = "INFO"
    # reported to the frontend as the active theme by the `/config` route
    theme: str | AdaptiveTheme = field(default_factory=AdaptiveTheme)
redis_url class-attribute instance-attribute
redis_url: str = 'redis://127.0.0.1:6379'
plugins class-attribute instance-attribute
plugins: list[str] = field(default_factory=list)
log_level class-attribute instance-attribute
log_level: str = 'INFO'
theme class-attribute instance-attribute
theme: str | AdaptiveTheme = field(
    default_factory=AdaptiveTheme
)

CacheEntry dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
131
132
133
134
135
136
137
138
139
140
@dataclass
class CacheEntry:
    """One cached resource: where it comes from, where it is stored locally
    and on which route it is served.

    ``create_app`` registers a GET route at ``serve_route`` for every entry.
    When ``local_path`` is ``None`` the route handler falls back to
    ``default_cache_dir()``.
    """

    origin: str | None = None
    """Origin URL. If None, the local file is served directly."""
    local_path: Path | None = None
    """Local path to cache the file."""
    serve_route: str = ""
    """Where to serve the file in FastAPI."""
    media_type: str = "application/octet-stream"
    """Media type for the response."""
origin class-attribute instance-attribute
origin: str | None = None

Origin URL. If None, the local file is served directly.

local_path class-attribute instance-attribute
local_path: Path | None = None

Local path to cache the file.

serve_route class-attribute instance-attribute
serve_route: str = ''

Where to serve the file in FastAPI.

media_type class-attribute instance-attribute
media_type: str = 'application/octet-stream'

Media type for the response.

CacheConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
143
144
145
@dataclass
class CacheConfig:
    """Cache settings: the list of entries for which ``create_app`` registers
    a caching route.
    """

    entries: list[CacheEntry] = field(default_factory=list)
entries class-attribute instance-attribute
entries: list[CacheEntry] = field(default_factory=list)

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@dataclass
class Config:
    """Top-level application configuration, one field per section."""

    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    """Mapping of plugin name to plugin-specific config."""
    themes: list[ThemeDefinition] = field(default_factory=list)
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: StrOrPathLike) -> Config:
        """Load and validate a configuration from a TOML file.

        Style entries of the ``map`` section are resolved relative to the
        directory of the configuration file before validation.

        :param config_path: path of the TOML file
        :returns: the validated configuration
        """
        if sys.version_info >= (3, 11):
            import tomllib
        else:
            import tomli as tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as toml_file:
            cfg_data = tomllib.load(toml_file)

        base_dir = Path(config_path).parent
        # a `map` table is created when the file has none, so `styles` below
        # is always populated
        map_section = cfg_data.setdefault("map", {})
        main_style = map_section.get("style", None)
        if main_style is not None:
            map_section["style"] = try_resolve_local_style(
                base_dir, main_style, allow_style_name=True
            )
        resolved_styles = []
        for extra_style in map_section.get("styles", []):
            resolved_styles.append(
                try_resolve_local_style(base_dir, extra_style, allow_style_name=False)
            )
        map_section["styles"] = resolved_styles

        return TypeAdapter(cls).validate_python(cfg_data)
core class-attribute instance-attribute
core: CoreConfig = field(default_factory=CoreConfig)
server class-attribute instance-attribute
server: ServerConfig = field(default_factory=ServerConfig)
channel class-attribute instance-attribute
channel: ChannelConfig = field(
    default_factory=ChannelConfig
)
map class-attribute instance-attribute
map: MapConfig = field(default_factory=MapConfig)
plugins class-attribute instance-attribute
plugins: dict[str, Any] = field(default_factory=dict)

Mapping of plugin name to plugin-specific config.

themes class-attribute instance-attribute
themes: list[ThemeDefinition] = field(default_factory=list)
cache class-attribute instance-attribute
cache: CacheConfig = field(default_factory=CacheConfig)
from_file classmethod
from_file(config_path: StrOrPathLike) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@classmethod
def from_file(cls, config_path: StrOrPathLike) -> Config:
    """Load and validate a configuration from a TOML file.

    Style entries of the ``map`` section are resolved relative to the
    directory of the configuration file before validation.

    :param config_path: path of the TOML file
    :returns: the validated configuration
    """
    if sys.version_info >= (3, 11):
        import tomllib
    else:
        import tomli as tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as toml_file:
        cfg_data = tomllib.load(toml_file)

    base_dir = Path(config_path).parent
    # a `map` table is created when the file has none, so `styles` below
    # is always populated
    map_section = cfg_data.setdefault("map", {})
    main_style = map_section.get("style", None)
    if main_style is not None:
        map_section["style"] = try_resolve_local_style(
            base_dir, main_style, allow_style_name=True
        )
    resolved_styles = []
    for extra_style in map_section.get("styles", []):
        resolved_styles.append(
            try_resolve_local_style(base_dir, extra_style, allow_style_name=False)
        )
    map_section["styles"] = resolved_styles

    return TypeAdapter(cls).validate_python(cfg_data)

ExposeField dataclass

Marker class for typing.Annotated to expose fields to the frontend.

Source code in packages/tangram_core/src/tangram_core/config.py
207
208
209
@dataclass(frozen=True)
class ExposeField:
    """Marker class for typing.Annotated to expose fields to the frontend.

    ``extract_frontend_config`` selects the fields whose ``Annotated``
    metadata contains an instance of this class.
    """

FrontendChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
219
220
221
@dataclass
class FrontendChannelConfig:
    """Channel section of the configuration sent to the frontend."""

    # either `ChannelConfig.public_url` or an `http://host:port` URL derived
    # from the channel settings (see the `/config` route)
    url: str
url instance-attribute
url: str

FrontendThemeConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
224
225
226
227
@dataclass
class FrontendThemeConfig:
    """Theme section of the configuration sent to the frontend."""

    # taken from `CoreConfig.theme`
    active: str | AdaptiveTheme
    # default themes not overridden by name, followed by the user's themes
    definitions: list[ThemeDefinition]
active instance-attribute
active: str | AdaptiveTheme
definitions instance-attribute
definitions: list[ThemeDefinition]

FrontendConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
230
231
232
233
234
@dataclass
class FrontendConfig:
    """Response model of the ``/config`` route: the subset of the backend
    configuration forwarded to the frontend.
    """

    channel: FrontendChannelConfig
    map: MapConfig
    theme: FrontendThemeConfig
channel instance-attribute
channel: FrontendChannelConfig
map instance-attribute
map: MapConfig
theme instance-attribute
theme: FrontendThemeConfig

default_config_file

default_config_file() -> Path
Source code in packages/tangram_core/src/tangram_core/config.py
13
14
15
16
17
18
19
20
21
22
23
def default_config_file() -> Path:
    """Return the path of the default ``tangram.toml`` configuration file.

    The file lives in ``$XDG_CONFIG_HOME/tangram`` when ``XDG_CONFIG_HOME``
    is set to a non-empty value, otherwise in the per-user configuration
    directory reported by ``platformdirs``. The directory is created if it
    does not exist; the file itself is not.

    :returns: path of the configuration file (which may not exist yet)
    """
    # An empty XDG_CONFIG_HOME must be treated like an unset one (XDG Base
    # Directory spec); `Path("") / "tangram"` would otherwise silently become
    # a directory relative to the current working directory.
    if xdg_config := os.environ.get("XDG_CONFIG_HOME"):
        config_dir = Path(xdg_config) / "tangram"
    else:
        # imported lazily: only needed when XDG_CONFIG_HOME is not usable
        import platformdirs

        config_dir = Path(platformdirs.user_config_dir(appname="tangram"))
    if not config_dir.exists():
        config_dir.mkdir(parents=True, exist_ok=True)

    return config_dir / "tangram.toml"

try_resolve_local_style

try_resolve_local_style(
    base_dir: Path,
    style: Url | StyleName | StyleSpecification,
    *,
    allow_style_name: bool,
) -> Url | StyleSpecification
Source code in packages/tangram_core/src/tangram_core/config.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def try_resolve_local_style(
    base_dir: Path,
    style: Url | StyleName | StyleSpecification,
    *,
    allow_style_name: bool,
) -> Url | StyleSpecification:
    """Resolve a style given as a string into a URL or a loaded style.

    Non-string styles and ``http``/``https`` URLs are returned unchanged. A
    string naming an existing file (relative to ``base_dir``) is replaced by
    the parsed JSON content of that file. Any other string is returned
    unchanged when ``allow_style_name`` is true.

    :param base_dir: directory against which file paths are resolved
    :param style: URL, file path, style name or inline style
    :param allow_style_name: whether a string that is neither a URL nor an
        existing file is accepted
    :raises ValueError: if such a string is found and ``allow_style_name``
        is false
    """
    if not isinstance(style, str):
        return style

    if urllib.parse.urlparse(style).scheme in ("http", "https"):
        return style

    candidate = (base_dir / style).resolve()
    if candidate.is_file():
        with open(candidate, "rb") as style_file:
            return json.load(style_file)

    if allow_style_name:
        return style
    raise ValueError(f"expected '{style}' to be a valid URL or file path")

plugin

ServiceAsyncFunc module-attribute

ServiceAsyncFunc: TypeAlias = Callable[
    [BackendState], Coroutine[Any, Any, None]
]

ServiceFunc module-attribute

ServiceFunc: TypeAlias = (
    ServiceAsyncFunc | Callable[[BackendState], None]
)

Priority module-attribute

Priority: TypeAlias = int

WithComputedFieldsFunction module-attribute

WithComputedFieldsFunction: TypeAlias = Callable[
    [BackendState, Any], dict[str, Any]
]

Lifespan module-attribute

logger module-attribute

logger = getLogger(__name__)

Plugin dataclass

Stores the metadata and registered API routes, background services and frontend assets for a tangram plugin.

Packages should declare an entry point in the tangram_core.plugins group in their pyproject.toml pointing to an instance of this class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@dataclass
class Plugin:
    """Stores the metadata and registered API routes, background services and
    frontend assets for a tangram plugin.

    Packages should declare an entry point in the `tangram_core.plugins` group
    in their `pyproject.toml` pointing to an instance of this class.
    """

    frontend_path: str | None = None
    """Path to the compiled frontend assets, *relative* to the distribution root
    (editable) or package root (wheel).
    """
    routers: list[APIRouter] = field(default_factory=list)
    config_class: type | None = None
    """The configuration class (dataclass or Pydantic model) for this plugin.
    Fields annotated with `tangram_core.config.Expose()` will be exposed to the
    frontend."""
    with_computed_fields: WithComputedFieldsFunction | None = None
    """Function to add additional dynamically computed fields to the frontend
    configuration. It receives the backend state and the validated configuration object
    (if `config_class` is set) or the raw dictionary, and should return a dictionary
    of extra fields to merge."""
    lifespan: Lifespan | None = None
    """Async context manager for plugin initialization and teardown."""
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )
    dist_name: str = field(init=False)
    """Name of the distribution (package) that provided this plugin, populated
    automatically during loading.
    """  # we do this so plugins can know their own package name if needed

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        """Decorator to register a background service function.

        Services are long-running async functions that receive the BackendState
        and are started when the application launches.

        Coroutine functions are awaited directly; plain functions are run in
        a worker thread through ``asyncio.to_thread``. The ``(priority,
        wrapper)`` pair is appended to ``self.services`` and the decorated
        function itself is returned unchanged.

        :param priority: value stored alongside the service in ``services``
        :returns: the decorator
        """
        # `asyncio.iscoroutinefunction` is deprecated since Python 3.14;
        # `inspect.iscoroutinefunction` is the supported replacement.
        import inspect

        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if inspect.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    # keep blocking services off the event loop
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator
frontend_path class-attribute instance-attribute
frontend_path: str | None = None

Path to the compiled frontend assets, relative to the distribution root (editable) or package root (wheel).

routers class-attribute instance-attribute
routers: list[APIRouter] = field(default_factory=list)
config_class class-attribute instance-attribute
config_class: type | None = None

The configuration class (dataclass or Pydantic model) for this plugin. Fields annotated with tangram_core.config.Expose() will be exposed to the frontend.

with_computed_fields class-attribute instance-attribute
with_computed_fields: WithComputedFieldsFunction | None = (
    None
)

Function to add additional dynamically computed fields to the frontend configuration. It receives the backend state and the validated configuration object (if config_class is set) or the raw dictionary, and should return a dictionary of extra fields to merge.

lifespan class-attribute instance-attribute
lifespan: Lifespan | None = None

Async context manager for plugin initialization and teardown.

services class-attribute instance-attribute
services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)
dist_name class-attribute instance-attribute
dist_name: str = field(init=False)

Name of the distribution (package) that provided this plugin, populated automatically during loading.

register_service
register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]

Decorator to register a background service function.

Services are long-running async functions that receive the BackendState and are started when the application launches.

Source code in packages/tangram_core/src/tangram_core/plugin.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    """Decorator to register a background service function.

    Services are long-running async functions that receive the BackendState
    and are started when the application launches.

    Coroutine functions are awaited directly; plain functions are run in a
    worker thread through ``asyncio.to_thread``. The ``(priority, wrapper)``
    pair is appended to ``self.services`` and the decorated function itself
    is returned unchanged.

    :param priority: value stored alongside the service in ``services``
    :returns: the decorator
    """
    # `asyncio.iscoroutinefunction` is deprecated since Python 3.14;
    # `inspect.iscoroutinefunction` is the supported replacement.
    import inspect

    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if inspect.iscoroutinefunction(func):
                await func(backend_state)
            else:
                # keep blocking services off the event loop
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

scan_plugins

scan_plugins() -> EntryPoints
Source code in packages/tangram_core/src/tangram_core/plugin.py
87
88
def scan_plugins() -> importlib.metadata.EntryPoints:
    return importlib.metadata.entry_points(group="tangram_core.plugins")

load_plugin

load_plugin(entry_point: EntryPoint) -> Plugin | None

Instantiates the plugin object defined in the entry point and injects the name of the distribution into it.

Source code in packages/tangram_core/src/tangram_core/plugin.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def load_plugin(
    entry_point: importlib.metadata.EntryPoint,
) -> Plugin | None:
    """Load the plugin object an entry point refers to.

    On success the name of the providing distribution is stored in the
    plugin's ``dist_name``. Every failure is logged and reported as ``None``:
    an exception while loading, a loaded object that is not a ``Plugin``, or
    an entry point without distribution information.

    :param entry_point: entry point to load
    :returns: the plugin, or ``None`` if it could not be loaded
    """
    try:
        candidate = entry_point.load()
    except Exception as e:
        tb = traceback.format_exc()
        logger.error(
            f"failed to load plugin {entry_point.name}: {e}. {tb}"
            f"\n= help: does {entry_point.value} exist?"
        )
        return None

    if not isinstance(candidate, Plugin):
        problem = f"entry point {entry_point.name} is not an instance of `Plugin`"
    elif entry_point.dist is None:
        problem = f"could not determine distribution for plugin {entry_point.name}"
    else:
        # NOTE: we ignore `entry_point.name` for now and simply use the
        # distribution's name; should we raise an error if they differ?
        # not for now
        candidate.dist_name = entry_point.dist.name
        return candidate

    logger.error(problem)
    return None

redis

log module-attribute

log = getLogger(__name__)

StateT module-attribute

StateT = TypeVar('StateT')

Subscriber

Bases: ABC, Generic[StateT]

Source code in packages/tangram_core/src/tangram_core/redis.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Subscriber(abc.ABC, Generic[StateT]):
    """Base class for a background listener on Redis pub/sub pattern channels.

    `subscribe` opens a Redis client, pattern-subscribes to `channels` and
    starts an asyncio task that forwards every `pmessage` to
    `message_handler`. `cleanup` cancels that task and releases the Redis
    resources. Subclasses implement `message_handler`.
    """

    # These three attributes are only assigned by a successful `subscribe`;
    # they do not exist on a freshly constructed instance.
    redis: Redis
    task: asyncio.Task[None]
    pubsub: PubSub

    def __init__(
        self, name: str, redis_url: str, channels: List[str], initial_state: StateT
    ):
        """Store the configuration; no connection is made until `subscribe`.

        Args:
            name: Label used in this subscriber's log messages.
            redis_url: URL passed to `Redis.from_url`.
            channels: Channel patterns passed to `psubscribe`.
            initial_state: State object handed to every `message_handler` call.
        """
        self.name = name
        self.redis_url: str = redis_url
        self.channels: List[str] = channels
        self.state: StateT = initial_state
        self._running = False

    async def subscribe(self) -> None:
        """Connect to Redis, subscribe to the patterns and start listening.

        Does nothing (besides a warning) when already running.

        Raises:
            RedisError: if connecting or subscribing fails. The client created
                for the attempt is closed before the error is re-raised.
        """
        if self._running:
            log.warning("%s already running", self.name)
            return

        client = None
        try:
            client = await Redis.from_url(self.redis_url)
            pubsub = client.pubsub()
            await pubsub.psubscribe(*self.channels)
        except RedisError as e:
            log.error("%s failed to connect to Redis: %s", self.name, e)
            if client is not None:
                # `cleanup` is a no-op while `_running` is False, so nobody
                # else would ever close this client: release it here.
                try:
                    await client.close()
                except RedisError as close_err:
                    log.debug(
                        "%s failed to close Redis client: %s", self.name, close_err
                    )
            raise

        # Only publish the handles once the subscription fully succeeded.
        self.redis = client
        self.pubsub = pubsub

        async def listen() -> None:
            try:
                log.info("%s listening ...", self.name)
                async for message in self.pubsub.listen():
                    log.debug("message: %s", message)
                    # Subscription confirmations and other message types are
                    # skipped; only pattern-matched messages are dispatched.
                    if message["type"] == "pmessage":
                        # NOTE(review): assumes the client returns bytes
                        # (i.e. `decode_responses` is not enabled) - confirm.
                        await self.message_handler(
                            message["channel"].decode("utf-8"),
                            message["data"].decode("utf-8"),
                            message["pattern"].decode("utf-8"),
                            self.state,
                        )
            except asyncio.CancelledError:
                log.warning("%s cancelled", self.name)

        self._running = True

        self.task = asyncio.create_task(listen())
        log.info("%s task created, running ...", self.name)

    async def cleanup(self) -> None:
        """Cancel the listener task and release the Redis resources.

        Does nothing when the subscriber is not running. The Redis client is
        closed and the running flag is reset even if the listener task ended
        with an error or unsubscribing fails; such an error still propagates.
        """
        if not self._running:
            return

        try:
            if self.task:
                log.debug("%s canceling task ...", self.name)
                self.task.cancel()
                try:
                    log.debug("%s await task to finish ...", self.name)
                    await self.task
                    log.debug("%s task canceled", self.name)
                except asyncio.CancelledError as exc:
                    log.error("%s task canceling error: %s", self.name, exc)
            if self.pubsub:
                # The subscription was made with `psubscribe`, so it has to be
                # released with `punsubscribe`; `unsubscribe` only covers
                # plain (non-pattern) channels.
                await self.pubsub.punsubscribe()
        finally:
            try:
                if self.redis:
                    # NOTE(review): newer redis-py releases prefer `aclose()`
                    # over `close()`; kept as-is for the pinned version - confirm.
                    await self.redis.close()
            finally:
                self._running = False

    def is_active(self) -> bool:
        """Return True if the subscriber is actively listening."""
        return self._running and self.task is not None and not self.task.done()

    @abc.abstractmethod
    async def message_handler(
        self, event: str, payload: str, pattern: str, state: StateT
    ) -> None:
        """Handle one pattern-matched message.

        Args:
            event: Channel the message was published on.
            payload: Message data decoded as UTF-8.
            pattern: Subscription pattern that matched the channel.
            state: The subscriber's `state` object.
        """
redis instance-attribute
redis: Redis
task instance-attribute
task: Task[None]
pubsub instance-attribute
pubsub: PubSub
name instance-attribute
name = name
redis_url instance-attribute
redis_url: str = redis_url
channels instance-attribute
channels: List[str] = channels
state instance-attribute
state: StateT = initial_state
subscribe async
subscribe() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def subscribe(self) -> None:
    """Connect to Redis, subscribe to the patterns and start listening.

    Does nothing (besides a warning) when already running.

    Raises:
        RedisError: if connecting or subscribing fails. The client created
            for the attempt is closed before the error is re-raised.
    """
    if self._running:
        log.warning("%s already running", self.name)
        return

    client = None
    try:
        client = await Redis.from_url(self.redis_url)
        pubsub = client.pubsub()
        await pubsub.psubscribe(*self.channels)
    except RedisError as e:
        log.error("%s failed to connect to Redis: %s", self.name, e)
        if client is not None:
            # `cleanup` is a no-op while `_running` is False, so nobody else
            # would ever close this client: release it here.
            try:
                await client.close()
            except RedisError as close_err:
                log.debug("%s failed to close Redis client: %s", self.name, close_err)
        raise

    # Only publish the handles once the subscription fully succeeded.
    self.redis = client
    self.pubsub = pubsub

    async def listen() -> None:
        try:
            log.info("%s listening ...", self.name)
            async for message in self.pubsub.listen():
                log.debug("message: %s", message)
                # Subscription confirmations and other message types are
                # skipped; only pattern-matched messages are dispatched.
                if message["type"] == "pmessage":
                    # NOTE(review): assumes the client returns bytes
                    # (i.e. `decode_responses` is not enabled) - confirm.
                    await self.message_handler(
                        message["channel"].decode("utf-8"),
                        message["data"].decode("utf-8"),
                        message["pattern"].decode("utf-8"),
                        self.state,
                    )
        except asyncio.CancelledError:
            log.warning("%s cancelled", self.name)

    self._running = True

    self.task = asyncio.create_task(listen())
    log.info("%s task created, running ...", self.name)
cleanup async
cleanup() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def cleanup(self) -> None:
    """Cancel the listener task and release the Redis resources.

    Does nothing when the subscriber is not running. The Redis client is
    closed and the running flag is reset even if the listener task ended with
    an error or unsubscribing fails; such an error still propagates.
    """
    if not self._running:
        return

    try:
        if self.task:
            log.debug("%s canceling task ...", self.name)
            self.task.cancel()
            try:
                log.debug("%s await task to finish ...", self.name)
                await self.task
                log.debug("%s task canceled", self.name)
            except asyncio.CancelledError as exc:
                log.error("%s task canceling error: %s", self.name, exc)
        if self.pubsub:
            # The subscription was made with `psubscribe`, so it has to be
            # released with `punsubscribe`; `unsubscribe` only covers plain
            # (non-pattern) channels.
            await self.pubsub.punsubscribe()
    finally:
        try:
            if self.redis:
                # NOTE(review): newer redis-py releases prefer `aclose()` over
                # `close()`; kept as-is for the pinned version - confirm.
                await self.redis.close()
        finally:
            self._running = False
is_active
is_active() -> bool

Return True if the subscriber is actively listening.

Source code in packages/tangram_core/src/tangram_core/redis.py
81
82
83
def is_active(self) -> bool:
    """Tell whether the listener task exists and has not finished yet."""
    if not self._running or self.task is None:
        return False
    return not self.task.done()
message_handler abstractmethod async
message_handler(
    event: str, payload: str, pattern: str, state: StateT
) -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
85
86
87
88
89
@abc.abstractmethod
async def message_handler(
    self, event: str, payload: str, pattern: str, state: StateT
) -> None:
    """Handle one pattern-matched message; subclasses must implement this.

    The listener calls it with the channel the message arrived on (`event`),
    the message data (`payload`), the subscription pattern that matched
    (`pattern`) - all decoded as UTF-8 - and the subscriber's `state`.
    """

tangram_core._core

ChannelConfig

host property writable

host: str

port property writable

port: int

redis_url property writable

redis_url: str

jwt_secret property writable

jwt_secret: str

jwt_expiration_secs property writable

jwt_expiration_secs: int

id_length property writable

id_length: int

__new__

__new__(
    host: str,
    port: int,
    redis_url: str,
    jwt_secret: str,
    jwt_expiration_secs: int,
    id_length: int,
) -> ChannelConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run

run(config: ChannelConfig) -> Any