Skip to content

Core

tangram_core

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
51
52
53
54
55
@dataclass
class BackendState:
    """Shared runtime resources of the backend.

    A single instance is stored on the FastAPI application state and handed to
    route handlers (through `get_state`), plugin lifespans and background
    services.
    """

    # Redis client, opened from `config.core.redis_url` at startup
    redis_client: redis.Redis
    # shared asynchronous HTTP client, e.g. used to download cache entries
    http_client: httpx.AsyncClient
    # the fully parsed application configuration
    config: Config

redis_client instance-attribute

redis_client: Redis

http_client instance-attribute

http_client: AsyncClient

config instance-attribute

config: Config

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@dataclass
class Config:
    """Root of the application configuration.

    Every section falls back to its own defaults, so an empty document is a
    valid configuration.
    """

    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: Path) -> Config:
        """Read a TOML file and validate its contents into a `Config`.

        :param config_path: location of the TOML configuration file
        :returns: the validated configuration
        """
        # tomllib joined the standard library in Python 3.11; older
        # interpreters rely on the API-compatible `tomli` backport.
        if sys.version_info >= (3, 11):
            import tomllib
        else:
            import tomli as tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as toml_file:
            raw_config = tomllib.load(toml_file)

        # pydantic turns the nested dicts into the nested dataclasses
        return TypeAdapter(cls).validate_python(raw_config)

core class-attribute instance-attribute

core: CoreConfig = field(default_factory=CoreConfig)

server class-attribute instance-attribute

server: ServerConfig = field(default_factory=ServerConfig)

channel class-attribute instance-attribute

channel: ChannelConfig = field(
    default_factory=ChannelConfig
)

map class-attribute instance-attribute

map: MapConfig = field(default_factory=MapConfig)

plugins class-attribute instance-attribute

plugins: dict[str, Any] = field(default_factory=dict)

cache class-attribute instance-attribute

cache: CacheConfig = field(default_factory=CacheConfig)

from_file classmethod

from_file(config_path: Path) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@classmethod
def from_file(cls, config_path: Path) -> Config:
    """Read a TOML file and validate its contents into a `Config`.

    :param config_path: location of the TOML configuration file
    :returns: the validated configuration
    """
    # tomllib joined the standard library in Python 3.11; older
    # interpreters rely on the API-compatible `tomli` backport.
    if sys.version_info >= (3, 11):
        import tomllib
    else:
        import tomli as tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as toml_file:
        raw_config = tomllib.load(toml_file)

    # pydantic turns the nested dicts into the nested dataclasses
    return TypeAdapter(cls).validate_python(raw_config)

Plugin dataclass

Stores the metadata and registered API routes, background services and frontend assets for a tangram plugin.

Packages should declare an entry point in the tangram_core.plugins group in their pyproject.toml pointing to an instance of this class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass
class Plugin:
    """Stores the metadata and registered API routes, background services and
    frontend assets for a tangram plugin.

    Packages should declare an entry point in the `tangram_core.plugins` group
    in their `pyproject.toml` pointing to an instance of this class.
    """

    frontend_path: str | None = None
    """Path to the compiled frontend assets, *relative* to the distribution root
    (editable) or package root (wheel).
    """
    routers: list[APIRouter] = field(default_factory=list)
    """API routers that are included in the application when it is created."""
    into_frontend_config_function: IntoFrontendConfigFunction | None = None
    """Function to parse plugin-scoped backend configuration (within the
    `tangram.toml`) into a frontend-safe configuration object.

    If not specified, the backend configuration dict is passed as-is."""
    lifespan: Lifespan | None = None
    """Async context manager for plugin initialization and teardown."""
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )
    """`(priority, async wrapper)` pairs appended by `register_service`."""
    dist_name: str = field(init=False)
    """Name of the distribution (package) that provided this plugin, populated
    automatically during loading.
    """  # we do this so plugins can know their own package name if needed

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        """Decorator to register a background service function.

        Services are long-running functions that receive the BackendState and
        are started when the application launches. Coroutine functions are
        awaited directly; plain functions are run in a worker thread so they
        do not block the event loop.

        :param priority: value stored next to the service; services of a
            plugin are started in ascending priority order
        :returns: a decorator that records the service and returns the
            decorated function unchanged
        """
        # `asyncio.iscoroutinefunction` is deprecated since Python 3.14 in
        # favour of the `inspect` equivalent.
        import inspect

        def decorator(func: ServiceFunc) -> ServiceFunc:
            # the kind of callable cannot change, so decide once at
            # registration time instead of on every start
            is_async = inspect.iscoroutinefunction(func)

            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if is_async:
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator

frontend_path class-attribute instance-attribute

frontend_path: str | None = None

Path to the compiled frontend assets, relative to the distribution root (editable) or package root (wheel).

routers class-attribute instance-attribute

routers: list[APIRouter] = field(default_factory=list)

into_frontend_config_function class-attribute instance-attribute

into_frontend_config_function: (
    IntoFrontendConfigFunction | None
) = None

Function to parse plugin-scoped backend configuration (within the tangram.toml) into a frontend-safe configuration object.

If not specified, the backend configuration dict is passed as-is.

lifespan class-attribute instance-attribute

lifespan: Lifespan | None = None

Async context manager for plugin initialization and teardown.

services class-attribute instance-attribute

services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)

dist_name class-attribute instance-attribute

dist_name: str = field(init=False)

Name of the distribution (package) that provided this plugin, populated automatically during loading.

register_service

register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]

Decorator to register a background service function.

Services are long-running functions that receive the BackendState and are started when the application launches. Async functions are awaited directly; synchronous functions are run in a worker thread.

Source code in packages/tangram_core/src/tangram_core/plugin.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    """Decorator to register a background service function.

    Services are long-running functions that receive the BackendState and
    are started when the application launches. Coroutine functions are
    awaited directly; plain functions are run in a worker thread so they
    do not block the event loop.

    :param priority: value stored next to the service; services of a plugin
        are started in ascending priority order
    :returns: a decorator that records the service and returns the decorated
        function unchanged
    """
    # `asyncio.iscoroutinefunction` is deprecated since Python 3.14 in favour
    # of the `inspect` equivalent.
    import inspect

    def decorator(func: ServiceFunc) -> ServiceFunc:
        # the kind of callable cannot change, so decide once at registration
        # time instead of on every start
        is_async = inspect.iscoroutinefunction(func)

        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if is_async:
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

backend

logger module-attribute

logger = getLogger(__name__)

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

CACHE_PARAM_PATTERN module-attribute

CACHE_PARAM_PATTERN = compile('\\{(\\w+)\\}')

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
51
52
53
54
55
@dataclass
class BackendState:
    """Shared runtime resources of the backend.

    A single instance is stored on the FastAPI application state and handed to
    route handlers (through `get_state`), plugin lifespans and background
    services.
    """

    # Redis client, opened from `config.core.redis_url` at startup
    redis_client: redis.Redis
    # shared asynchronous HTTP client, e.g. used to download cache entries
    http_client: httpx.AsyncClient
    # the fully parsed application configuration
    config: Config
redis_client instance-attribute
redis_client: Redis
http_client instance-attribute
http_client: AsyncClient
config instance-attribute
config: Config

get_state async

get_state(request: Request) -> BackendState
Source code in packages/tangram_core/src/tangram_core/backend.py
58
59
async def get_state(request: Request) -> BackendState:
    """FastAPI dependency returning the backend state stored on the app.

    :param request: the incoming request, used only to reach its application
    :returns: the object found at `request.app.state.backend_state`
    """
    app_state = request.app.state
    return app_state.backend_state  # type: ignore

get_distribution_path

get_distribution_path(dist_name: str) -> Path

Get the local path of a distribution, handling both editable installs (direct_url.json) and standard wheel installs.

See: https://packaging.python.org/en/latest/specifications/direct-url-data-structure/

Source code in packages/tangram_core/src/tangram_core/backend.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def get_distribution_path(dist_name: str) -> Path:
    """Get the local path of a distribution, handling both editable installs
    (`direct_url.json`) and standard wheel installs.

    See: https://packaging.python.org/en/latest/specifications/direct-url-data-structure/

    :param dist_name: name of the installed distribution; the wheel fallback
        also uses it as the importable package name
    :returns: the directory referenced by `direct_url.json` when it is a local
        directory, otherwise the directory of the importable package
    :raises FileNotFoundError: if neither lookup yields a directory
    """
    from urllib.request import url2pathname

    # always try direct_url.json first (e.g. for the case of `uv sync --all-packages`)
    try:
        dist = Distribution.from_name(dist_name)
        if direct_url_content := dist.read_text("direct_url.json"):
            direct_url_data = json.loads(direct_url_content)
            if (
                (url := direct_url_data.get("url"))
                # url may point to a git or zip archive, but since we only care
                # about local paths, we only handle the file:// scheme here
                and url.startswith("file://")
                and (
                    # url2pathname percent-decodes the path and, on Windows,
                    # also converts `/C:/dir` into `C:\dir`; a bare unquote
                    # would leave the leading slash in place there
                    path1 := Path(url2pathname(urllib.parse.urlparse(url).path))
                ).is_dir()
            ):
                return path1
    except (PackageNotFoundError, json.JSONDecodeError, FileNotFoundError):
        pass

    # fallback in case it was installed via a wheel
    try:
        trav = importlib.resources.files(dist_name)
    except ModuleNotFoundError as exc:
        # report an unknown name the same way as any other failed lookup
        raise FileNotFoundError(
            f"could not find distribution path for {dist_name}"
        ) from exc
    if trav.is_dir():
        with importlib.resources.as_file(trav) as path2:
            return path2
    raise FileNotFoundError(f"could not find distribution path for {dist_name}")

resolve_frontend

resolve_frontend(plugin: Plugin) -> Path | None
Source code in packages/tangram_core/src/tangram_core/backend.py
96
97
98
99
def resolve_frontend(plugin: Plugin) -> Path | None:
    """Locate the frontend assets directory of a plugin.

    :param plugin: plugin whose `frontend_path` and `dist_name` are read
    :returns: `frontend_path` joined onto the plugin's distribution path, or
        `None` when the plugin declares no frontend
    """
    if plugin.frontend_path:
        distribution_root = get_distribution_path(plugin.dist_name)
        return distribution_root / plugin.frontend_path
    return None

load_enabled_plugins

load_enabled_plugins(config: Config) -> list[Plugin]
Source code in packages/tangram_core/src/tangram_core/backend.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def load_enabled_plugins(
    config: Config,
) -> list[Plugin]:
    """Load every discovered plugin that is enabled in the configuration.

    A plugin is enabled when its entry point name is listed in
    `config.core.plugins`. Entry points for which `load_plugin` returns
    `None` are left out of the result.

    :param config: application configuration
    :returns: the loaded plugins, in the order `scan_plugins` yields them
    """
    loaded_plugins = []
    enabled_plugin_names = set(config.core.plugins)
    matched_names = set()

    for entry_point in scan_plugins():
        # TODO: should we check entry_point.dist.name instead?
        if entry_point.name not in enabled_plugin_names:
            continue
        matched_names.add(entry_point.name)
        if (plugin := load_plugin(entry_point)) is not None:
            loaded_plugins.append(plugin)

    # a misspelled or uninstalled plugin would otherwise be ignored silently
    if missing := sorted(enabled_plugin_names - matched_names):
        logger.warning(f"enabled plugins without a matching entry point: {missing}")

    return loaded_plugins

lifespan async

lifespan(
    app: FastAPI,
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> AsyncGenerator[None, None]
Source code in packages/tangram_core/src/tangram_core/backend.py
118
119
120
121
122
123
124
125
126
127
128
129
@asynccontextmanager
async def lifespan(
    app: FastAPI, backend_state: BackendState, loaded_plugins: Iterable[Plugin]
) -> AsyncGenerator[None, None]:
    """Application lifespan: enter plugin lifespans, then expose the state.

    Each plugin lifespan is entered in iteration order on a shared exit stack,
    which unwinds them when the application shuts down. Once all are entered,
    `backend_state` is stored on `app.state`.

    :param app: the FastAPI application being started
    :param backend_state: state passed to every plugin lifespan
    :param loaded_plugins: plugins whose optional `lifespan` is entered
    """
    async with AsyncExitStack() as plugin_stack:
        for plugin in loaded_plugins:
            if not plugin.lifespan:
                continue
            logger.info(f"initializing lifespan for {plugin.dist_name}")
            plugin_context = plugin.lifespan(backend_state)
            await plugin_stack.enter_async_context(plugin_context)

        app.state.backend_state = backend_state
        yield

default_cache_dir

default_cache_dir() -> Path
Source code in packages/tangram_core/src/tangram_core/backend.py
132
133
134
135
136
137
138
139
140
def default_cache_dir() -> Path:
    """Return the tangram cache directory, creating it when necessary.

    `$XDG_CACHE_HOME/tangram` is used when `XDG_CACHE_HOME` is set to a
    non-empty value; otherwise the location reported by `platformdirs` is used.

    :returns: path of the (now existing) cache directory
    """
    # an empty XDG_CACHE_HOME counts as unset; taking it literally would
    # create a relative `tangram` directory in the current working directory
    if xdg_cache := os.environ.get("XDG_CACHE_HOME"):
        cache_dir = Path(xdg_cache) / "tangram"
    else:
        cache_dir = Path(platformdirs.user_cache_dir(appname="tangram"))
    # exist_ok makes a separate existence check unnecessary and race-free
    cache_dir.mkdir(parents=True, exist_ok=True)

    return cache_dir

make_cache_route_handler

make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]

Factory function that creates a route handler for caching and serving files. Dynamically handles URL parameters found in both serve_route and origin.

Parameters:

Name Type Description Default
entry CacheEntry

Cache entry configuration

required
state BackendState

Backend state with http_client for fetching remote resources

required

Returns:

Type Description
Callable[..., Awaitable[FileResponse]]

Async function that handles the route with dynamic parameters

Source code in packages/tangram_core/src/tangram_core/backend.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]:
    """
    Factory function that creates a route handler for caching and serving files.
    Dynamically handles URL parameters found in both serve_route and origin.

    The handler answers with HTTP 404 when a path parameter is not a plain
    file name, or when the file is not cached and the entry has no origin to
    download it from.

    :param entry: Cache entry configuration
    :param state: Backend state with http_client for fetching remote resources
    :returns: Async function that handles the route with dynamic parameters
    """
    from inspect import Parameter, Signature

    from fastapi import HTTPException

    # Extract parameter names from the serve_route (e.g., {fontstack}, {range})
    params = CACHE_PARAM_PATTERN.findall(entry.serve_route)

    async def cache_route_handler(**kwargs: str) -> FileResponse:
        if (local_path := entry.local_path) is None:
            local_path = default_cache_dir()
        else:
            local_path = local_path.expanduser()

        # Build the local file path by replacing parameters
        local_file = local_path
        for param in params:
            if param in kwargs:
                segment = kwargs[param]
                # values come straight from the request URL: only accept a
                # single plain path component so that `..`, separators or
                # drive prefixes cannot point outside of the cache directory
                if segment in ("", ".", "..") or Path(segment).name != segment:
                    raise HTTPException(status_code=404, detail="not found")
                local_file = local_file / segment

        logger.info(f"Serving cached file from {local_file}")

        if not local_file.exists():
            if entry.origin is None:
                # local-only entry: nothing to download the file from
                raise HTTPException(status_code=404, detail="not found")
            # Build the remote URL by replacing parameters
            remote_url = entry.origin
            for param, value in kwargs.items():
                remote_url = remote_url.replace(f"{{{param}}}", value)

            logger.info(f"Downloading from {remote_url} to {local_file}")
            c = await state.http_client.get(remote_url)
            c.raise_for_status()
            local_file.parent.mkdir(parents=True, exist_ok=True)
            local_file.write_bytes(c.content)

        return FileResponse(path=local_file, media_type=entry.media_type)

    # Create explicit parameters for the function signature, so that FastAPI
    # sees one `str` parameter per placeholder instead of `**kwargs`
    sig_params = [
        Parameter(
            name=param,
            kind=Parameter.POSITIONAL_OR_KEYWORD,
            annotation=str,
        )
        for param in params
    ]
    cache_route_handler.__signature__ = Signature(  # type: ignore
        parameters=sig_params,
        return_annotation=FileResponse,
    )

    return cache_route_handler

create_app

create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> FastAPI
Source code in packages/tangram_core/src/tangram_core/backend.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> FastAPI:
    """Assemble the FastAPI application.

    Includes the routers of every plugin, mounts plugin frontends under
    `/plugins/<dist_name>`, registers the `/config` and `/manifest.json`
    endpoints and the configured cache routes, and finally mounts the core
    frontend at `/`.

    :param backend_state: shared state; its config drives the endpoints
    :param loaded_plugins: plugins to integrate into the application
    :returns: the configured application
    :raises ValueError: if the compiled core frontend cannot be found
    """
    # the plugins are walked here and again by the lifespan at startup, so a
    # one-shot iterable must be materialized before its first use
    loaded_plugins = list(loaded_plugins)

    app = FastAPI(
        lifespan=partial(
            lifespan, backend_state=backend_state, loaded_plugins=loaded_plugins
        ),
        default_response_class=ORJSONResponse,
    )
    frontend_plugins = {}

    for plugin in loaded_plugins:
        for router in plugin.routers:
            app.include_router(router)

        if (frontend_path_resolved := resolve_frontend(plugin)) is not None:
            app.mount(
                f"/plugins/{plugin.dist_name}",
                StaticFiles(directory=str(frontend_path_resolved)),
                name=plugin.dist_name,
            )
            plugin_json_path = frontend_path_resolved / "plugin.json"
            if plugin_json_path.exists():
                try:
                    with plugin_json_path.open("rb") as f:
                        plugin_meta = json.load(f)

                    conf_backend = backend_state.config.plugins.get(
                        plugin.dist_name, {}
                    )
                    if to_frontend_conf := plugin.into_frontend_config_function:
                        conf_frontend = to_frontend_conf(conf_backend)
                    else:
                        conf_frontend = conf_backend

                    plugin_meta["config"] = conf_frontend
                    frontend_plugins[plugin.dist_name] = plugin_meta
                except Exception as e:
                    # a broken plugin manifest must not prevent startup
                    logger.error(
                        f"failed to read plugin.json for {plugin.dist_name}: {e}"
                    )

    # unlike v0.1 which uses `process.env`, v0.2 *compiles* the js so we
    # no longer have access to it, so we selectively forward the config.
    @app.get("/config")
    async def get_frontend_config(
        state: Annotated[BackendState, Depends(get_state)],
    ) -> FrontendConfig:
        channel_cfg = state.config.channel
        if channel_cfg.public_url:
            channel_url = channel_cfg.public_url
        else:
            # for local/non-proxied setups, user must set a reachable host.
            # '0.0.0.0' is for listening, not connecting.
            host = "localhost" if channel_cfg.host == "0.0.0.0" else channel_cfg.host
            channel_url = f"http://{host}:{channel_cfg.port}"

        return FrontendConfig(
            channel=FrontendChannelConfig(url=channel_url),
            map=state.config.map,
        )

    @app.get("/manifest.json")
    async def get_manifest() -> ORJSONResponse:
        return ORJSONResponse(content={"plugins": frontend_plugins})

    # Cache mechanism - MUST be registered BEFORE the catch-all frontend mount
    for cache_entry in backend_state.config.cache.entries:
        logger.info(
            f"caching {cache_entry.origin} to {cache_entry.local_path} "
            f"and serving at {cache_entry.serve_route}"
        )
        route_handler = make_cache_route_handler(cache_entry, backend_state)

        logger.info(
            f"Registering route: GET {cache_entry.serve_route} with dynamic params"
        )
        app.add_api_route(
            cache_entry.serve_route,
            route_handler,
            methods=["GET"],
            name=f"cache-{cache_entry.serve_route.replace('/', '_')}",
        )

    if not (
        frontend_path := get_distribution_path("tangram_core") / "dist-frontend"
    ).is_dir():
        raise ValueError(
            f"error: frontend {frontend_path} was not found, "
            "did you run `pnpm i && pnpm run build`?"
        )
    app.mount("/", StaticFiles(directory=str(frontend_path), html=True), name="core")

    return app

run_channel_service async

run_channel_service(config: Config) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
async def run_channel_service(config: Config) -> None:
    """Run the channel service implemented in the `_core` extension module.

    Initializes the extension's stderr tracing with the configured log level,
    then awaits `_core.run` with settings taken from the `channel` section and
    the Redis URL from the `core` section.

    :param config: application configuration
    """
    from . import _core

    core_cfg, channel_cfg = config.core, config.channel
    _core.init_tracing_stderr(core_cfg.log_level)

    await _core.run(
        _core.ChannelConfig(
            host=channel_cfg.host,
            port=channel_cfg.port,
            redis_url=core_cfg.redis_url,
            jwt_secret=channel_cfg.jwt_secret,
            jwt_expiration_secs=channel_cfg.jwt_expiration_secs,
            id_length=channel_cfg.id_length,
        )
    )

run_services async

run_services(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> AsyncGenerator[Task[None], None]
Source code in packages/tangram_core/src/tangram_core/backend.py
321
322
323
324
325
326
327
328
329
330
331
332
async def run_services(
    backend_state: BackendState,
    loaded_plugins: Iterable[Plugin],
) -> AsyncGenerator[asyncio.Task[None], None]:
    """Start all background services, yielding each task as it is created.

    The channel service comes first. The services of each plugin follow,
    ordered by priority and then by function name within that plugin.

    :param backend_state: state passed to every service
    :param loaded_plugins: plugins whose registered services are started
    """

    def start_order(registered):
        priority, service = registered
        return priority, service.__name__

    channel_task = asyncio.create_task(run_channel_service(backend_state.config))
    yield channel_task

    for plugin in loaded_plugins:
        for _priority, service in sorted(plugin.services, key=start_order):
            service_task = asyncio.create_task(service(backend_state))
            yield service_task
            logger.info(f"started service from plugin: {plugin.dist_name}")

run_server async

run_server(
    backend_state: BackendState,
    loaded_plugins: list[Plugin],
) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
335
336
337
338
339
340
341
342
343
344
async def run_server(backend_state: BackendState, loaded_plugins: list[Plugin]) -> None:
    """Build the application and serve it with uvicorn until it stops.

    :param backend_state: shared state; its `server` section gives host and port
    :param loaded_plugins: plugins integrated into the application
    """
    config = backend_state.config
    server = uvicorn.Server(
        uvicorn.Config(
            create_app(backend_state, loaded_plugins),
            host=config.server.host,
            port=config.server.port,
            log_config=get_log_config_dict(config),
        )
    )
    await server.serve()

start_tasks async

start_tasks(config: Config) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
async def start_tasks(config: Config) -> None:
    """Open the shared clients and run the server and all services together.

    The Redis and HTTP clients stay open for as long as any task is running.
    When one task fails or this coroutine is cancelled, the remaining tasks
    are cancelled and awaited before the clients are closed.

    :param config: application configuration
    """
    loaded_plugins = load_enabled_plugins(config)

    async with AsyncExitStack() as stack:
        redis_client = await stack.enter_async_context(
            redis.from_url(config.core.redis_url)  # type: ignore
        )
        http_client = await stack.enter_async_context(httpx.AsyncClient(http2=True))
        state = BackendState(
            redis_client=redis_client, http_client=http_client, config=config
        )

        tasks: list[asyncio.Task[None]] = []
        try:
            tasks.append(asyncio.create_task(run_server(state, loaded_plugins)))
            async for service_task in run_services(state, loaded_plugins):
                tasks.append(service_task)

            await asyncio.gather(*tasks)
        finally:
            # gather() propagates the first failure without stopping the
            # other tasks; stop them here so that none keeps using the
            # clients after the exit stack has closed them
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

get_log_config_dict

get_log_config_dict(config: Config) -> dict[str, Any]
Source code in packages/tangram_core/src/tangram_core/backend.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
def get_log_config_dict(config: Config) -> dict[str, Any]:
    """Build a `logging` dictConfig that routes the root logger through rich.

    :param config: application configuration; `core.log_level` is upper-cased
        and used as the root logger level
    :returns: a version-1 logging configuration dictionary
    """

    def format_time(dt: datetime) -> str:
        # UTC timestamp with microseconds; the trailing space is part of the
        # format
        as_utc = dt.astimezone(timezone.utc)
        return as_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ ")

    rich_handler = {
        "class": "rich.logging.RichHandler",
        "log_time_format": format_time,
        "omit_repeated_times": False,
    }
    root_logger = {
        "handlers": ["default"],
        "level": config.core.log_level.upper(),
    }
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {"default": rich_handler},
        "root": root_logger,
    }

config

HasTopbarUiConfig

Bases: Protocol

Source code in packages/tangram_core/src/tangram_core/config.py
 9
10
11
@runtime_checkable
class HasTopbarUiConfig(Protocol):
    """Structural type for objects that expose a `topbar_order` attribute.

    Being `runtime_checkable`, it can be used with `isinstance`, which only
    checks that the attribute is present, not its type.
    """

    # NOTE(review): presumably the position of a widget in the frontend top
    # bar — confirm against the frontend
    topbar_order: int
topbar_order instance-attribute
topbar_order: int

HasSidebarUiConfig

Bases: Protocol

Source code in packages/tangram_core/src/tangram_core/config.py
14
15
16
@runtime_checkable
class HasSidebarUiConfig(Protocol):
    """Structural type for objects that expose a `sidebar_order` attribute.

    Being `runtime_checkable`, it can be used with `isinstance`, which only
    checks that the attribute is present, not its type.
    """

    # NOTE(review): presumably the position of a widget in the frontend
    # sidebar — confirm against the frontend
    sidebar_order: int
sidebar_order instance-attribute
sidebar_order: int

ServerConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
19
20
21
22
@dataclass
class ServerConfig:
    """Address the HTTP server (uvicorn) is started on."""

    # loopback by default
    host: str = "127.0.0.1"
    port: int = 2346
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2346

ChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
25
26
27
28
29
30
31
32
33
34
@dataclass
class ChannelConfig:
    """Settings of the channel service that the backend runs."""

    # TODO: we should make it clear that host:port is for the *backend* to
    # listen on, and not to be confused with the frontend.
    host: str = "127.0.0.1"
    port: int = 2347
    # URL advertised to the frontend; when unset, one is derived from
    # host and port instead
    public_url: str | None = None
    # NOTE(review): placeholder default; presumably used to sign channel
    # tokens — should be overridden in any non-local deployment
    jwt_secret: str = "secret"
    jwt_expiration_secs: int = 315360000  # 10 years
    # NOTE(review): presumably the length of generated identifiers — confirm
    # against the channel service
    id_length: int = 8
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2347
public_url class-attribute instance-attribute
public_url: str | None = None
jwt_secret class-attribute instance-attribute
jwt_secret: str = 'secret'
jwt_expiration_secs class-attribute instance-attribute
jwt_expiration_secs: int = 315360000
id_length class-attribute instance-attribute
id_length: int = 8

UrlConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
37
38
39
40
@dataclass
class UrlConfig:
    """A map source described by its URL and source type."""

    url: str
    # NOTE(review): presumably a map style source type — confirm the set of
    # accepted values
    type: str = "vector"
url instance-attribute
url: str
type class-attribute instance-attribute
type: str = 'vector'

SourceSpecification dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
43
44
45
46
@dataclass
class SourceSpecification:
    """Optional named sources of an inline map style; unset ones are `None`."""

    carto: UrlConfig | None = None
    protomaps: UrlConfig | None = None
carto class-attribute instance-attribute
carto: UrlConfig | None = None
protomaps class-attribute instance-attribute
protomaps: UrlConfig | None = None

StyleSpecification dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
49
50
51
52
53
54
@dataclass
class StyleSpecification:
    """Inline map style, accepted by `MapConfig.style` in place of a style URL.

    NOTE(review): the fields presumably mirror the map style JSON format
    consumed by the frontend — confirm.
    """

    sources: SourceSpecification | None = None
    # URL template with `{fontstack}` and `{range}` placeholders
    glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
    layers: list[Any] | None = None
    # only the literal value 8 is accepted
    version: Literal[8] = 8
sources class-attribute instance-attribute
sources: SourceSpecification | None = None
glyphs class-attribute instance-attribute
glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
layers class-attribute instance-attribute
layers: list[Any] | None = None
version class-attribute instance-attribute
version: Literal[8] = 8

MapConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@dataclass
class MapConfig:
    """Map settings; forwarded as-is to the frontend inside `FrontendConfig`."""

    # either the URL of a style document or an inline style definition
    style: str | StyleSpecification = (
        "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
    )
    # HTML snippet (contains links and entities)
    attribution: str = (
        '&copy; <a href="https://www.openstreetmap.org/copyright">'
        "OpenStreetMap</a> contributors &copy; "
        '<a href="https://carto.com/attributions">CARTO</a>'
    )
    # NOTE(review): presumably the initial view (degrees for lat/lon, pitch
    # and bearing) — confirm against the frontend
    center_lat: float = 48.0
    center_lon: float = 7.0
    zoom: float = 4
    pitch: float = 0
    bearing: float = 0
    lang: str = "en"
    # NOTE(review): presumably interaction limits and toggles applied by the
    # frontend map — confirm
    min_zoom: float = 0
    max_zoom: float = 24
    max_pitch: float = 70
    allow_pitch: bool = True
    allow_bearing: bool = True
style class-attribute instance-attribute
style: str | StyleSpecification = (
    "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
)
attribution class-attribute instance-attribute
attribution: str = '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors &copy; <a href="https://carto.com/attributions">CARTO</a>'
center_lat class-attribute instance-attribute
center_lat: float = 48.0
center_lon class-attribute instance-attribute
center_lon: float = 7.0
zoom class-attribute instance-attribute
zoom: float = 4
pitch class-attribute instance-attribute
pitch: float = 0
bearing class-attribute instance-attribute
bearing: float = 0
lang class-attribute instance-attribute
lang: str = 'en'
min_zoom class-attribute instance-attribute
min_zoom: float = 0
max_zoom class-attribute instance-attribute
max_zoom: float = 24
max_pitch class-attribute instance-attribute
max_pitch: float = 70
allow_pitch class-attribute instance-attribute
allow_pitch: bool = True
allow_bearing class-attribute instance-attribute
allow_bearing: bool = True

CoreConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
80
81
82
83
84
@dataclass
class CoreConfig:
    """Settings shared by the backend and the channel service."""

    # used for the backend Redis client and also passed to the channel service
    redis_url: str = "redis://127.0.0.1:6379"
    # entry point names of the plugins to enable
    plugins: list[str] = field(default_factory=list)
    # upper-cased for the root Python logger; also passed to the channel
    # service tracing setup
    log_level: str = "INFO"
redis_url class-attribute instance-attribute
redis_url: str = 'redis://127.0.0.1:6379'
plugins class-attribute instance-attribute
plugins: list[str] = field(default_factory=list)
log_level class-attribute instance-attribute
log_level: str = 'INFO'

CacheEntry dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
87
88
89
90
91
92
93
94
95
96
@dataclass
class CacheEntry:
    """One cached resource: where it comes from, where it is stored and the
    route it is served at.

    `{name}` placeholders in `serve_route` become route parameters; their
    values are substituted into `origin` and appended to `local_path`.
    """

    origin: str | None = None
    """Origin URL. If None, the local file is served directly."""
    # when None, the default cache directory is used instead
    local_path: Path | None = None
    """Local path to cache the file."""
    serve_route: str = ""
    """Where to serve the file in FastAPI."""
    media_type: str = "application/octet-stream"
    """Media type for the response."""
origin class-attribute instance-attribute
origin: str | None = None

Origin URL. If None, the local file is served directly.

local_path class-attribute instance-attribute
local_path: Path | None = None

Local path to cache the file.

serve_route class-attribute instance-attribute
serve_route: str = ''

Where to serve the file in FastAPI.

media_type class-attribute instance-attribute
media_type: str = 'application/octet-stream'

Media type for the response.

CacheConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
 99
100
101
@dataclass
class CacheConfig:
    """Cache section of the configuration."""

    # one cache route is registered per entry when the application is created
    entries: list[CacheEntry] = field(default_factory=list)
entries class-attribute instance-attribute
entries: list[CacheEntry] = field(default_factory=list)

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@dataclass
class Config:
    """Root of the application configuration.

    Every section falls back to its own defaults, so an empty document is a
    valid configuration.
    """

    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: Path) -> Config:
        """Read a TOML file and validate its contents into a `Config`.

        :param config_path: location of the TOML configuration file
        :returns: the validated configuration
        """
        # tomllib joined the standard library in Python 3.11; older
        # interpreters rely on the API-compatible `tomli` backport.
        if sys.version_info >= (3, 11):
            import tomllib
        else:
            import tomli as tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as toml_file:
            raw_config = tomllib.load(toml_file)

        # pydantic turns the nested dicts into the nested dataclasses
        return TypeAdapter(cls).validate_python(raw_config)
core class-attribute instance-attribute
core: CoreConfig = field(default_factory=CoreConfig)
server class-attribute instance-attribute
server: ServerConfig = field(default_factory=ServerConfig)
channel class-attribute instance-attribute
channel: ChannelConfig = field(
    default_factory=ChannelConfig
)
map class-attribute instance-attribute
map: MapConfig = field(default_factory=MapConfig)
plugins class-attribute instance-attribute
plugins: dict[str, Any] = field(default_factory=dict)
cache class-attribute instance-attribute
cache: CacheConfig = field(default_factory=CacheConfig)
from_file classmethod
from_file(config_path: Path) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@classmethod
def from_file(cls, config_path: Path) -> Config:
    """Read a TOML file and validate its contents into a `Config`.

    :param config_path: location of the TOML configuration file
    :returns: the validated configuration
    """
    # tomllib joined the standard library in Python 3.11; older
    # interpreters rely on the API-compatible `tomli` backport.
    if sys.version_info >= (3, 11):
        import tomllib
    else:
        import tomli as tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as toml_file:
        raw_config = tomllib.load(toml_file)

    # pydantic turns the nested dicts into the nested dataclasses
    return TypeAdapter(cls).validate_python(raw_config)

FrontendChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
136
137
138
@dataclass
class FrontendChannelConfig:
    """Holds the `url` string of a channel.

    NOTE(review): presumably the frontend-facing view of the channel
    settings - confirm against the code that builds it.
    """

    # Required: there is no default value.
    url: str
url instance-attribute
url: str

FrontendConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
141
142
143
144
@dataclass
class FrontendConfig:
    """Pairs a `FrontendChannelConfig` with a `MapConfig`.

    Both fields are required; neither has a default.
    """

    channel: FrontendChannelConfig
    map: MapConfig
channel instance-attribute
channel: FrontendChannelConfig
map instance-attribute
map: MapConfig

plugin

ServiceAsyncFunc module-attribute

ServiceAsyncFunc: TypeAlias = Callable[
    [BackendState], Awaitable[None]
]

ServiceFunc module-attribute

ServiceFunc: TypeAlias = (
    ServiceAsyncFunc | Callable[[BackendState], None]
)

Priority module-attribute

Priority: TypeAlias = int

IntoFrontendConfigFunction module-attribute

IntoFrontendConfigFunction: TypeAlias = Callable[
    [dict[str, Any]], Any
]

Lifespan module-attribute

Lifespan: TypeAlias = Callable[
    [BackendState], AsyncGenerator[None, None]
]

logger module-attribute

logger = getLogger(__name__)

Plugin dataclass

Stores the metadata and registered API routes, background services and frontend assets for a tangram plugin.

Packages should declare an entry point in the tangram_core.plugins group in their pyproject.toml pointing to an instance of this class.

Source code in packages/tangram_core/src/tangram_core/plugin.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@dataclass
class Plugin:
    """Stores the metadata and registered API routes, background services and
    frontend assets for a tangram plugin.

    Packages should declare an entry point in the `tangram_core.plugins` group
    in their `pyproject.toml` pointing to an instance of this class.
    """

    frontend_path: str | None = None
    """Path to the compiled frontend assets, *relative* to the distribution root
    (editable) or package root (wheel).
    """
    routers: list[APIRouter] = field(default_factory=list)
    into_frontend_config_function: IntoFrontendConfigFunction | None = None
    """Function to parse plugin-scoped backend configuration (within the
    `tangram.toml`) into a frontend-safe configuration object.

    If not specified, the backend configuration dict is passed as-is."""
    lifespan: Lifespan | None = None
    """Async context manager for plugin initialization and teardown."""
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )
    dist_name: str = field(init=False)
    """Name of the distribution (package) that provided this plugin, populated
    automatically during loading.
    """  # we do this so plugins can know their own package name if needed

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        """Decorator to register a background service function.

        Services are long-running functions that receive the BackendState
        and are started when the application launches. Both coroutine
        functions and plain synchronous functions are accepted: the former
        are awaited directly, the latter are run through `asyncio.to_thread`.

        The decorated function itself is returned unchanged; what gets
        appended to `self.services` is `(priority, async_wrapper)`, where
        `async_wrapper` is an async adapter around the function.
        """
        # Imported here so the block does not depend on a module-level import.
        import inspect

        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                # `asyncio.iscoroutinefunction` is deprecated as of Python
                # 3.14; `inspect.iscoroutinefunction` is the replacement.
                if inspect.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    # Keep blocking service code off the event loop.
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator
frontend_path class-attribute instance-attribute
frontend_path: str | None = None

Path to the compiled frontend assets, relative to the distribution root (editable) or package root (wheel).

routers class-attribute instance-attribute
routers: list[APIRouter] = field(default_factory=list)
into_frontend_config_function class-attribute instance-attribute
into_frontend_config_function: (
    IntoFrontendConfigFunction | None
) = None

Function to parse plugin-scoped backend configuration (within the tangram.toml) into a frontend-safe configuration object.

If not specified, the backend configuration dict is passed as-is.

lifespan class-attribute instance-attribute
lifespan: Lifespan | None = None

Async context manager for plugin initialization and teardown.

services class-attribute instance-attribute
services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)
dist_name class-attribute instance-attribute
dist_name: str = field(init=False)

Name of the distribution (package) that provided this plugin, populated automatically during loading.

register_service
register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]

Decorator to register a background service function.

Services are long-running functions that receive the BackendState and are started when the application launches. Both async and plain synchronous functions are accepted; synchronous ones are run in a worker thread.

Source code in packages/tangram_core/src/tangram_core/plugin.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    """Decorator to register a background service function.

    Services are long-running functions that receive the BackendState
    and are started when the application launches. Both coroutine functions
    and plain synchronous functions are accepted: the former are awaited
    directly, the latter are run through `asyncio.to_thread`.

    The decorated function itself is returned unchanged; what gets appended
    to `self.services` is `(priority, async_wrapper)`, where `async_wrapper`
    is an async adapter around the function.
    """
    # Imported here so the block does not depend on a module-level import.
    import inspect

    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            # `asyncio.iscoroutinefunction` is deprecated as of Python 3.14;
            # `inspect.iscoroutinefunction` is the replacement.
            if inspect.iscoroutinefunction(func):
                await func(backend_state)
            else:
                # Keep blocking service code off the event loop.
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

scan_plugins

scan_plugins() -> EntryPoints
Source code in packages/tangram_core/src/tangram_core/plugin.py
78
79
def scan_plugins() -> importlib.metadata.EntryPoints:
    return importlib.metadata.entry_points(group="tangram_core.plugins")

load_plugin

load_plugin(entry_point: EntryPoint) -> Plugin | None

Instantiates the plugin object defined in the entry point and injects the name of the distribution into it.

Source code in packages/tangram_core/src/tangram_core/plugin.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def load_plugin(
    entry_point: importlib.metadata.EntryPoint,
) -> Plugin | None:
    """Load the object behind `entry_point` and stamp it with its distribution
    name.

    Returns the loaded `Plugin` with `dist_name` set. Returns `None`, after
    logging an error, when loading raises, when the loaded object is not a
    `Plugin`, or when the entry point carries no distribution.
    """
    try:
        candidate = entry_point.load()
    except Exception as e:
        # Loading runs arbitrary third-party import code, so any failure is
        # reported (with its traceback) instead of propagating.
        tb = traceback.format_exc()
        failure_report = (
            f"failed to load plugin {entry_point.name}: {e}. {tb}"
            f"\n= help: does {entry_point.value} exist?"
        )
        logger.error(failure_report)
        return None

    if not isinstance(candidate, Plugin):
        logger.error(f"entry point {entry_point.name} is not an instance of `Plugin`")
        return None

    distribution = entry_point.dist
    if distribution is None:
        logger.error(f"could not determine distribution for plugin {entry_point.name}")
        return None

    # NOTE: `entry_point.name` is ignored for now; the distribution's name is
    # used instead, and a mismatch between the two is not treated as an error.
    candidate.dist_name = distribution.name
    return candidate

redis

log module-attribute

log = getLogger(__name__)

StateT module-attribute

StateT = TypeVar('StateT')

Subscriber

Bases: ABC, Generic[StateT]

Source code in packages/tangram_core/src/tangram_core/redis.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Subscriber(abc.ABC, Generic[StateT]):
    """Base class for a Redis pattern subscriber driven by a background task.

    `subscribe` connects to `redis_url`, pattern-subscribes to `channels` and
    starts a task that forwards every `pmessage` to `message_handler` together
    with the shared `state`. `cleanup` stops that task and releases the
    connection. Subclasses implement `message_handler`.
    """

    redis: Redis
    task: asyncio.Task[None]
    pubsub: PubSub

    def __init__(
        self, name: str, redis_url: str, channels: List[str], initial_state: StateT
    ):
        """Store the settings; no connection is opened until `subscribe`.

        Args:
            name: Label used as a prefix in this subscriber's log messages.
            redis_url: URL passed to `Redis.from_url`.
            channels: Patterns passed to `psubscribe`.
            initial_state: Object stored as `state` and handed to every
                `message_handler` call.
        """
        self.name = name
        self.redis_url: str = redis_url
        self.channels: List[str] = channels
        self.state: StateT = initial_state
        self._running = False

    async def subscribe(self) -> None:
        """Connect, pattern-subscribe and start the listening task.

        Does nothing except log a warning when already running.

        Raises:
            RedisError: Logged and re-raised when connecting or subscribing
                fails.
        """
        if self._running:
            log.warning("%s already running", self.name)
            return

        try:
            self.redis = await Redis.from_url(self.redis_url)
            self.pubsub = self.redis.pubsub()
            await self.pubsub.psubscribe(*self.channels)
        except RedisError as e:
            log.error("%s failed to connect to Redis: %s", self.name, e)
            raise

        async def listen() -> None:
            try:
                log.info("%s listening ...", self.name)
                async for message in self.pubsub.listen():
                    log.debug("message: %s", message)
                    # Only pattern messages are dispatched; other message
                    # types are ignored.
                    if message["type"] == "pmessage":
                        await self.message_handler(
                            message["channel"].decode("utf-8"),
                            message["data"].decode("utf-8"),
                            message["pattern"].decode("utf-8"),
                            self.state,
                        )
            except asyncio.CancelledError:
                # Cancellation is the normal way `cleanup` stops the loop.
                log.warning("%s cancelled", self.name)

        self._running = True

        self.task = asyncio.create_task(listen())
        log.info("%s task created, running ...", self.name)

    async def cleanup(self) -> None:
        """Stop the listening task, drop subscriptions and close the client.

        Does nothing when the subscriber is not running. The Redis client is
        closed and the running flag reset even when stopping the task or
        unsubscribing raises; such an exception still propagates to the
        caller afterwards.
        """
        if not self._running:
            return

        try:
            if self.task:
                log.debug("%s canceling task ...", self.name)
                self.task.cancel()
                try:
                    log.debug("%s await task to finish ...", self.name)
                    await self.task
                    log.debug("%s task canceled", self.name)
                except asyncio.CancelledError as exc:
                    log.error("%s task canceling error: %s", self.name, exc)
            if self.pubsub:
                # The subscription was made with `psubscribe`, so it must be
                # dropped with `punsubscribe`. `unsubscribe` is kept as well
                # for plain channel subscriptions a subclass may have added.
                await self.pubsub.punsubscribe()
                await self.pubsub.unsubscribe()
        finally:
            # Always release the connection and leave the "running" state,
            # otherwise a failed task would leak the client and make every
            # later `subscribe` call a no-op.
            try:
                if self.redis:
                    await self.redis.close()
            finally:
                self._running = False

    def is_active(self) -> bool:
        """Return True if the subscriber is actively listening."""
        return self._running and self.task is not None and not self.task.done()

    @abc.abstractmethod
    async def message_handler(
        self, event: str, payload: str, pattern: str, state: StateT
    ) -> None:
        """Handle one pattern message.

        Args:
            event: UTF-8 decoded channel the message arrived on.
            payload: UTF-8 decoded message data.
            pattern: UTF-8 decoded pattern that matched the channel.
            state: The subscriber's `state` object.
        """
        pass
redis instance-attribute
redis: Redis
task instance-attribute
task: Task[None]
pubsub instance-attribute
pubsub: PubSub
name instance-attribute
name = name
redis_url instance-attribute
redis_url: str = redis_url
channels instance-attribute
channels: List[str] = channels
state instance-attribute
state: StateT = initial_state
subscribe async
subscribe() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def subscribe(self) -> None:
    """Connect to Redis, pattern-subscribe and start the listening task.

    Logs a warning and returns when the subscriber is already running.
    A `RedisError` raised while connecting or subscribing is logged and
    re-raised.
    """
    if self._running:
        log.warning("%s already running", self.name)
        return

    try:
        self.redis = await Redis.from_url(self.redis_url)
        self.pubsub = self.redis.pubsub()
        await self.pubsub.psubscribe(*self.channels)
    except RedisError as e:
        log.error("%s failed to connect to Redis: %s", self.name, e)
        raise

    async def listen() -> None:
        try:
            log.info("%s listening ...", self.name)
            async for message in self.pubsub.listen():
                log.debug("message: %s", message)
                # Anything that is not a pattern message is skipped.
                if message["type"] != "pmessage":
                    continue
                channel, data, pattern = (
                    message[key].decode("utf-8")
                    for key in ("channel", "data", "pattern")
                )
                await self.message_handler(channel, data, pattern, self.state)
        except asyncio.CancelledError:
            # Cancelling the task is how the loop gets stopped.
            log.warning("%s cancelled", self.name)

    self._running = True

    self.task = asyncio.create_task(listen())
    log.info("%s task created, running ...", self.name)
cleanup async
cleanup() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def cleanup(self) -> None:
    """Stop the listening task, drop subscriptions and close the client.

    Does nothing when the subscriber is not running. The Redis client is
    closed and the running flag reset even when stopping the task or
    unsubscribing raises; such an exception still propagates to the caller
    afterwards.
    """
    if not self._running:
        return

    try:
        if self.task:
            log.debug("%s canceling task ...", self.name)
            self.task.cancel()
            try:
                log.debug("%s await task to finish ...", self.name)
                await self.task
                log.debug("%s task canceled", self.name)
            except asyncio.CancelledError as exc:
                log.error("%s task canceling error: %s", self.name, exc)
        if self.pubsub:
            # The subscription was made with `psubscribe`, so it must be
            # dropped with `punsubscribe`. `unsubscribe` is kept as well for
            # plain channel subscriptions that may have been added elsewhere.
            await self.pubsub.punsubscribe()
            await self.pubsub.unsubscribe()
    finally:
        # Always release the connection and leave the "running" state,
        # otherwise a failed task would leak the client and make every later
        # `subscribe` call a no-op.
        try:
            if self.redis:
                await self.redis.close()
        finally:
            self._running = False
is_active
is_active() -> bool

Return True if the subscriber is actively listening.

Source code in packages/tangram_core/src/tangram_core/redis.py
81
82
83
def is_active(self) -> bool:
    """Report whether the subscriber is running with an unfinished task."""
    if not self._running:
        return False
    listener = self.task
    return listener is not None and not listener.done()
message_handler abstractmethod async
message_handler(
    event: str, payload: str, pattern: str, state: StateT
) -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
85
86
87
88
89
@abc.abstractmethod
async def message_handler(
    self, event: str, payload: str, pattern: str, state: StateT
) -> None:
    """Handle one message; the base implementation does nothing.

    Args:
        event: Channel the message arrived on.
        payload: Message data.
        pattern: Subscription pattern that matched the channel.
        state: The subscriber's state object.
    """

tangram_core._core

ChannelConfig

host property writable

host: str

port property writable

port: int

redis_url property writable

redis_url: str

jwt_secret property writable

jwt_secret: str

jwt_expiration_secs property writable

jwt_expiration_secs: int

id_length property writable

id_length: int

__new__

__new__(
    host: str,
    port: int,
    redis_url: str,
    jwt_secret: str,
    jwt_expiration_secs: int,
    id_length: int,
) -> ChannelConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run

run(config: ChannelConfig) -> Any