Weather

tangram_weather

logger module-attribute

logger = getLogger(__name__)

router module-attribute

router = APIRouter(
    prefix="/weather",
    tags=["weather"],
    responses={404: {"description": "Not found"}},
)

plugin module-attribute

plugin = Plugin(
    frontend_path="dist-frontend", routers=[router]
)

get_weather async

get_weather() -> dict[str, str]
Source code in packages/tangram_weather/src/tangram_weather/__init__.py
@router.get("/")
async def get_weather() -> dict[str, str]:
    return {"message": "This is the weather plugin response"}
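
A minimal sketch of calling this endpoint from a client, assuming the router is mounted under its /weather prefix on a locally running app (the host and port are assumptions):

import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        r = await client.get("/weather/")
        r.raise_for_status()
        print(r.json())  # {"message": "This is the weather plugin response"}


asyncio.run(main())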

wind async

wind(
    isobaric: int, backend_state: InjectBackendState
) -> ORJSONResponse
Source code in packages/tangram_weather/src/tangram_weather/__init__.py
@router.get("/wind")
async def wind(
    isobaric: int, backend_state: tangram_core.InjectBackendState
) -> ORJSONResponse:
    logger.info("fetching wind data for %s", isobaric)

    now = pd.Timestamp.now(tz="UTC").floor("1h")
    ds = await latest_arpege_data(backend_state.http_client, now)
    res = ds.sel(isobaricInhPa=isobaric, time=now.tz_convert(None))[["u", "v"]]

    u_attrs = res.data_vars["u"].attrs

    bounds = [
        u_attrs["GRIB_longitudeOfFirstGridPointInDegrees"],
        u_attrs["GRIB_latitudeOfLastGridPointInDegrees"],
        u_attrs["GRIB_longitudeOfLastGridPointInDegrees"],
        u_attrs["GRIB_latitudeOfFirstGridPointInDegrees"],
    ]

    u_data = res["u"].values
    v_data = res["v"].values

    valid_data_mask = ~np.isnan(u_data)

    min_val, max_val = -70.0, 70.0
    image_unscale = [min_val, max_val]
    value_range = max_val - min_val

    u_scaled = (np.nan_to_num(u_data, nan=0.0) - min_val) / value_range * 255
    v_scaled = (np.nan_to_num(v_data, nan=0.0) - min_val) / value_range * 255

    rgba_data = np.zeros((*u_data.shape, 4), dtype=np.uint8)
    rgba_data[..., 0] = u_scaled.astype(np.uint8)
    rgba_data[..., 1] = v_scaled.astype(np.uint8)
    rgba_data[..., 3] = np.where(valid_data_mask, 255, 0)

    image = Image.fromarray(rgba_data, "RGBA")
    buffer = io.BytesIO()
    image.save(buffer, format="PNG")
    img_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
    image_data_uri = f"data:image/png;base64,{img_str}"

    response_content = {
        "imageDataUri": image_data_uri,
        "bounds": bounds,
        "imageUnscale": image_unscale,
    }

    return ORJSONResponse(content=response_content)
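
The endpoint packs the u wind component into the red channel and the v component into the green channel of an 8-bit PNG, linearly scaled from the [-70, 70] m/s range reported in imageUnscale; the alpha channel flags grid points with valid data. A client-side sketch that reverses this encoding (the server URL and isobaric level are illustrative, and a running instance is assumed):

import base64
import io

import httpx
import numpy as np
from PIL import Image

resp = httpx.get("http://localhost:8000/weather/wind", params={"isobaric": 500})
payload = resp.json()

# strip the "data:image/png;base64," prefix and decode the PNG
header, b64 = payload["imageDataUri"].split(",", 1)
img = Image.open(io.BytesIO(base64.b64decode(b64)))
rgba = np.asarray(img)

min_val, max_val = payload["imageUnscale"]
u = rgba[..., 0] / 255 * (max_val - min_val) + min_val  # red channel -> u (m/s)
v = rgba[..., 1] / 255 * (max_val - min_val) + min_val  # green channel -> v (m/s)
valid = rgba[..., 3] == 255  # alpha channel marks valid grid points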

arpege

bare_url module-attribute

bare_url = (
    "https://object.data.gouv.fr/meteofrance-pnt/pnt/"
)

DEFAULT_LEVELS_37 module-attribute

DEFAULT_LEVELS_37 = [
    100,
    125,
    150,
    175,
    200,
    225,
    250,
    300,
    350,
    400,
    450,
    500,
    550,
    600,
    650,
    700,
    750,
    800,
    850,
    900,
    950,
    1000,
]

DEFAULT_IP1_FEATURES module-attribute

DEFAULT_IP1_FEATURES = ['u', 'v', 't', 'r']

tempdir module-attribute

tempdir = Path(gettempdir())

log module-attribute

log = getLogger(__name__)

download_with_progress async

download_with_progress(
    client: AsyncClient, url: str, file: Path
) -> None
Source code in packages/tangram_weather/src/tangram_weather/arpege.py
async def download_with_progress(
    client: httpx.AsyncClient, url: str, file: Path
) -> None:
    try:
        async with client.stream("GET", url) as r:
            if r.status_code != 200:
                raise httpx.HTTPStatusError(
                    f"Error downloading data from {url}", request=r.request, response=r
                )

            total_size = int(r.headers.get("Content-Length", 0))
            with file.open("wb") as buffer:
                with tqdm(
                    total=total_size,
                    unit="B",
                    unit_scale=True,
                    desc=url.split("/")[-1],
                ) as progress_bar:
                    first_chunk = True
                    async for chunk in r.aiter_bytes():
                        if first_chunk and chunk.startswith(b"<?xml"):
                            raise RuntimeError(
                                f"Error downloading data from {url}. "
                                "Check if the requested data is available."
                            )
                        first_chunk = False
                        await asyncio.to_thread(buffer.write, chunk)
                        progress_bar.update(len(chunk))
    except (httpx.RequestError, RuntimeError) as e:
        if file.exists():
            file.unlink()
        raise e
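
A minimal usage sketch; the URL and destination file are purely illustrative:

import asyncio
from pathlib import Path

import httpx

from tangram_weather.arpege import download_with_progress


async def main() -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        # any streamable HTTP resource works here
        await download_with_progress(
            client, "https://example.com/data.grib2", Path("/tmp/data.grib2")
        )


asyncio.run(main())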

latest_data async

latest_data(
    client: AsyncClient,
    hour: Timestamp,
    model: str = "ARPEGE",
    resolution: Literal["025", "01"] = "025",
    package: Literal[
        "SP1", "SP2", "IP1", "IP2", "IP3", "IP4", "HP1"
    ] = "IP1",
    time_range: Literal[
        "000H024H",
        "025H048H",
        "049H072H",
        "073H102H",
        "000H012H",
        "013H024H",
        "025H036H",
        "037H048H",
        "049H060H",
        "061H072H",
        "073H084H",
        "085H096H",
        "097H102H",
    ] = "000H024H",
    recursion: int = 0,
) -> Dataset

Fetch the latest ARPEGE data for a given hour.

Source code in packages/tangram_weather/src/tangram_weather/arpege.py
async def latest_data(
    client: httpx.AsyncClient,
    hour: pd.Timestamp,
    model: str = "ARPEGE",
    resolution: Literal["025", "01"] = "025",
    package: Literal["SP1", "SP2", "IP1", "IP2", "IP3", "IP4", "HP1"] = "IP1",
    time_range: Literal[
        "000H024H",  # on the 0.25 degree grid
        "025H048H",  # on the 0.25 degree grid
        "049H072H",  # on the 0.25 degree grid
        "073H102H",  # on the 0.25 degree grid
        "000H012H",  # on the 0.1 degree grid
        "013H024H",  # on the 0.1 degree grid
        "025H036H",  # on the 0.1 degree grid
        "037H048H",  # on the 0.1 degree grid
        "049H060H",  # on the 0.1 degree grid
        "061H072H",  # on the 0.1 degree grid
        "073H084H",  # on the 0.1 degree grid
        "085H096H",  # on the 0.1 degree grid
        "097H102H",  # on the 0.1 degree grid
    ] = "000H024H",
    recursion: int = 0,
) -> xr.Dataset:
    """
    Fetch the latest ARPEGE data for a given hour.
    """
    # let's give them time to upload data to the repo
    runtime = (hour - pd.Timedelta("2h")).floor("6h")

    url = f"{bare_url}{runtime.isoformat()}/"
    url += f"{model.lower()}/{resolution}/{package}/"
    filename = f"{model.lower()}__{resolution}__{package}__"
    filename += f"{time_range}__{runtime.isoformat()}.grib2"
    filename = filename.replace("+00:00", "Z")
    url += filename
    url = url.replace("+00:00", "Z")

    if not (tempdir / filename).exists():
        # If the file does not exist, we try to download it.
        try:
            await download_with_progress(client, url, tempdir / filename)
        except Exception:
            (tempdir / filename).unlink(missing_ok=True)  # remove the file if it exists
            # If the download fails, we try to fetch the latest data
            # (or survive with older data we may have in the /tmp directory)
            if recursion >= 3:
                raise  # do not go too far back in history
            return await latest_data(
                client,
                hour - pd.Timedelta("6h"),
                model,
                resolution,
                package,
                time_range,
                recursion + 1,
            )

    def _load_and_process_dataset() -> xr.Dataset:
        log.info(f"Loading dataset from {tempdir / filename}")
        ds = xr.open_dataset(
            tempdir / filename,
            engine="cfgrib",
            backend_kwargs={
                "filter_by_keys": {
                    "typeOfLevel": "isobaricInhPa",
                    "level": DEFAULT_LEVELS_37,
                }
            },
        )
        ds = ds.assign(step=ds.time + ds.step).drop_vars("time")
        ds = ds.rename(step="time")
        return ds  # type: ignore

    return await asyncio.to_thread(_load_and_process_dataset)
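
A minimal sketch of fetching data for the current hour with the defaults (ARPEGE model, 0.25 degree grid, IP1 package). Note that the first call downloads a large GRIB2 file into the temporary directory; subsequent calls reuse it:

import asyncio

import httpx
import pandas as pd

from tangram_weather.arpege import latest_data


async def main() -> None:
    now = pd.Timestamp.now(tz="UTC").floor("1h")
    async with httpx.AsyncClient(timeout=None) as client:
        ds = await latest_data(client, now)
    # dataset with u, v, t, r on the DEFAULT_LEVELS_37 isobaric levels
    print(ds)


asyncio.run(main())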