Source code for tierproxy.resources.usage

from __future__ import annotations

from collections.abc import AsyncIterator, Iterator
from typing import TYPE_CHECKING, cast

import httpx
from pydantic import BaseModel, ConfigDict, Field

if TYPE_CHECKING:
    from tierproxy.async_client import AsyncTierProxy
    from tierproxy.client import TierProxy


class UsageDay(BaseModel):
    """One element of ``Usage.days``: the usage figures for a single date."""

    # Kept as a plain string; no date parsing or format check is applied here.
    date: str
    # Byte counters for the day.
    # NOTE(review): presumably bytes sent upstream vs. received downstream --
    # confirm the direction semantics against the gateway API.
    bytes_up: int
    bytes_down: int
    # Cost for the day; the field name indicates US dollars.
    cost_usd: float
class Usage(BaseModel):
    """Usage report for one client: per-day entries plus overall totals.

    This is the model the usage resources validate the ``GET /v1/usage/me``
    response into.
    """

    # ``populate_by_name`` lets ``from_`` be supplied under its Python field
    # name as well as under its alias ``"from"``.
    model_config = ConfigDict(populate_by_name=True)

    client_id: str
    # ``from`` is a reserved word in Python, hence the trailing underscore and
    # the alias. Unlike ``to``, this field is optional and defaults to "".
    from_: str = Field(alias="from", default="")
    to: str
    days: list[UsageDay]
    total_bytes: int
    total_cost_usd: float
class UsageDelta(BaseModel):
    """Payload of a ``usage_delta`` server-sent event.

    The streaming methods of the usage resources parse each matching event's
    JSON data into this model.
    """

    # Timestamp of the event, kept as a string (not parsed here).
    ts: str
    # NOTE(review): presumably the running byte total and the increment since
    # the previous event -- confirm against the gateway API.
    total_bytes: int
    delta_bytes: int
class UsageResource:
    """Blocking access to the ``/v1/usage/me`` endpoints of a ``TierProxy`` client."""

    def __init__(self, client: TierProxy) -> None:
        self._client = client

    def get(self, *, from_: str | None = None, to: str | None = None) -> Usage:
        """Fetch the usage report with ``GET /v1/usage/me``.

        Args:
            from_: Sent as the ``from`` query parameter when truthy.
            to: Sent as the ``to`` query parameter when truthy.

        Returns:
            The response validated into a ``Usage`` model.

        Falsy bounds (``None`` or an empty string) are left out of the query.
        """
        query = {}
        for name, value in (("from", from_), ("to", to)):
            if value:
                query[name] = value
        payload = self._client._transport.request("GET", "/v1/usage/me", params=query)
        return Usage.model_validate(payload)

    def stream(self) -> Iterator[UsageDelta]:
        """Yield a ``UsageDelta`` for every ``usage_delta`` event on the SSE stream.

        Opens ``GET /v1/usage/me/stream`` on the transport's HTTP client and
        calls ``raise_for_status()`` on the response before reading events.
        Events with any other name are skipped. Iteration ends when
        ``iter_sse`` is exhausted (presumably when the gateway closes the
        stream).
        """
        # Imported at call time, as in the original layout of this module.
        from tierproxy._internal.sse import iter_sse

        transport = self._client._transport
        http = cast(httpx.Client, transport._client)
        url = transport.base_url + "/v1/usage/me/stream"
        with http.stream("GET", url, headers=transport._headers()) as response:
            response.raise_for_status()
            for message in iter_sse(response):
                if message.event != "usage_delta":
                    continue
                yield UsageDelta.model_validate_json(message.data)


class AsyncUsageResource:
    """Asyncio access to the ``/v1/usage/me`` endpoints of an ``AsyncTierProxy`` client."""

    def __init__(self, client: AsyncTierProxy) -> None:
        self._client = client

    async def get(self, *, from_: str | None = None, to: str | None = None) -> Usage:
        """Fetch the usage report with ``GET /v1/usage/me``.

        Args:
            from_: Sent as the ``from`` query parameter when truthy.
            to: Sent as the ``to`` query parameter when truthy.

        Returns:
            The response validated into a ``Usage`` model.

        Falsy bounds (``None`` or an empty string) are left out of the query.
        """
        query = {}
        for name, value in (("from", from_), ("to", to)):
            if value:
                query[name] = value
        payload = await self._client._transport.arequest(
            "GET", "/v1/usage/me", params=query
        )
        return Usage.model_validate(payload)

    async def stream(self) -> AsyncIterator[UsageDelta]:
        """Yield a ``UsageDelta`` for every ``usage_delta`` event on the SSE stream.

        Opens ``GET /v1/usage/me/stream`` on the transport's async HTTP client
        and calls ``raise_for_status()`` on the response before reading events.
        Events with any other name are skipped. Iteration ends when
        ``aiter_sse`` is exhausted (presumably when the gateway closes the
        stream).
        """
        # Imported at call time, as in the original layout of this module.
        from tierproxy._internal.sse import aiter_sse

        transport = self._client._transport
        http = cast(httpx.AsyncClient, transport._client)
        url = transport.base_url + "/v1/usage/me/stream"
        async with http.stream("GET", url, headers=transport._headers()) as response:
            response.raise_for_status()
            async for message in aiter_sse(response):
                if message.event != "usage_delta":
                    continue
                yield UsageDelta.model_validate_json(message.data)