Source code for tierproxy.client

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

import httpx

from tierproxy._internal.cache import ResponseCache
from tierproxy._internal.cookies import CookieJar
from tierproxy._internal.http import Transport
from tierproxy._internal.streaming import is_stream, sync_stream
from tierproxy.errors import AuthenticationError
from tierproxy.resources.health import HealthResource
from tierproxy.resources.me import MeResource
from tierproxy.resources.ratelimits import RateLimitsResource
from tierproxy.resources.usage import UsageResource
from tierproxy.resources.usage_recent import UsageRecentResource
from tierproxy.retry import RetryPolicy

if TYPE_CHECKING:
    from tierproxy._internal.cost import CostAttributor
    from tierproxy.proxy.selector import SmartSelector


class TierProxy:
    """Sync client for the tierproxy public REST API.

    Besides the REST resources (``me``, ``usage``, ``health``, ...), the client
    can send arbitrary HTTP requests *through* the gateway via
    :meth:`request` / :meth:`get` / :meth:`post`.

    Args:
        api_key: API key. Falls back to the ``TIERPROXY_API_KEY`` environment
            variable when omitted or empty.
        base_url: Gateway base URL handed to the REST transport; its host is
            also used to build the proxy endpoint for proxied requests.
        timeout: Timeout handed to the REST transport.
        http_timeout: Timeout used for requests proxied through the gateway.
        max_retries: Used to build a default ``RetryPolicy``; ignored when
            ``retry_policy`` is given.
        retry_policy: Explicit retry policy for the REST transport.
        http_client: Optional pre-built ``httpx.Client`` for the transport.
        user_agent_suffix: Forwarded to the transport as ``ua_suffix``.
        routing: Selector strategy. When set (or when ``auto_failover`` is
            enabled) a ``SmartSelector`` is created; the strategy defaults to
            ``"balanced"`` if only ``auto_failover`` was requested.
        monthly_budget_usd: When set, every proxied request is preceded by a
            budget check (see :meth:`_guard_budget`).
        cache_ttl: Response cache TTL; the cache is only created when ``> 0``.
        cache_max_size: ``max_size`` passed to the response cache.
        cache_max_response_size: ``max_response_size`` passed to the cache.
        auto_failover: Retry proxied requests on other upstreams after a
            429/5xx response or a connection/timeout error.
        auto_failover_max_attempts: Attempt cap for failover, clamped to >= 1.

    Raises:
        AuthenticationError: If no API key is passed and the environment
            variable is not set.
    """

    def __init__(
        self,
        api_key: str | None = None,
        *,
        base_url: str = "https://gw.tierproxy.com:8444",
        timeout: float = 30.0,
        http_timeout: float = 30.0,
        max_retries: int = 3,
        retry_policy: RetryPolicy | None = None,
        http_client: httpx.Client | None = None,
        user_agent_suffix: str | None = None,
        routing: str | None = None,
        monthly_budget_usd: float | None = None,
        cache_ttl: float = 0.0,
        cache_max_size: int = 256,
        cache_max_response_size: int = 262144,
        auto_failover: bool = False,
        auto_failover_max_attempts: int = 3,
    ) -> None:
        key = api_key or os.environ.get("TIERPROXY_API_KEY")
        if not key:
            raise AuthenticationError("No api_key passed and TIERPROXY_API_KEY env var not set")
        retry = retry_policy or RetryPolicy(max_retries=max_retries)
        self._transport = Transport(
            api_key=key,
            base_url=base_url,
            timeout=timeout,
            retry=retry,
            ua_suffix=user_agent_suffix,
            http_client=http_client,
            is_async=False,
        )

        # Resource objects and the cost attributor are created lazily on
        # first access.
        self._me: MeResource | None = None
        self._usage: UsageResource | None = None
        self._health: HealthResource | None = None
        self._usage_recent: UsageRecentResource | None = None
        self._cost_attributor: CostAttributor | None = None
        self._rate_limits: RateLimitsResource | None = None

        # Target hosts that answered 429; sent to the gateway with the next
        # proxied request via the X-TierProxy-Report-429 header.
        self._pending_429_reports: set[str] = set()

        self._http_timeout = http_timeout
        self._routing = routing
        self._monthly_budget_usd = monthly_budget_usd
        self._cache_ttl = cache_ttl
        self._response_cache: ResponseCache | None = (
            ResponseCache(
                max_size=cache_max_size,
                default_ttl=cache_ttl,
                max_response_size=cache_max_response_size,
            )
            if cache_ttl > 0
            else None
        )
        self._selector: SmartSelector | None = None

        # Budget guard state: last known spend and when it was fetched.
        self._spent_usd_mtd: float = 0.0
        self._budget_cache_ts: float = 0.0

        self._auto_failover = auto_failover
        self._auto_failover_max_attempts = max(1, auto_failover_max_attempts)
        self._cookie_jar = CookieJar()

        if routing or auto_failover:
            # Imported here rather than at module level (the module only
            # imports SmartSelector under TYPE_CHECKING).
            from tierproxy.proxy.selector import SmartSelector

            self._selector = SmartSelector(self, strategy=routing or "balanced")  # type: ignore[arg-type]

    # -------- REST resources (lazily constructed) --------

    @property
    def me(self) -> MeResource:
        """Lazily created ``MeResource`` bound to this client."""
        if self._me is None:
            self._me = MeResource(self)
        return self._me

    @property
    def usage(self) -> UsageResource:
        """Lazily created ``UsageResource`` bound to this client."""
        if self._usage is None:
            self._usage = UsageResource(self)
        return self._usage

    @property
    def health(self) -> HealthResource:
        """Lazily created ``HealthResource`` bound to this client."""
        if self._health is None:
            self._health = HealthResource(self)
        return self._health

    @property
    def usage_recent(self) -> UsageRecentResource:
        """Lazily created ``UsageRecentResource`` bound to this client."""
        if self._usage_recent is None:
            self._usage_recent = UsageRecentResource(self)
        return self._usage_recent

    @property
    def rate_limits(self) -> RateLimitsResource:
        """Lazily created ``RateLimitsResource`` bound to this client."""
        if self._rate_limits is None:
            self._rate_limits = RateLimitsResource(self)
        return self._rate_limits

    @property
    def cookies(self) -> CookieJar:
        """The per-session cookie jar used by proxied requests."""
        return self._cookie_jar

    def _get_cost_attributor(self) -> CostAttributor:
        """Return the shared ``CostAttributor``, creating it on first use."""
        if self._cost_attributor is None:
            from tierproxy._internal.cost import CostAttributor

            self._cost_attributor = CostAttributor(self)
        return self._cost_attributor

    def cost_for(self, resp: httpx.Response) -> float | None:
        """Return USD cost for a response. Lazy 30s-cached fetch under the hood."""
        return self._get_cost_attributor().cost_for(resp)

    def upstream_for(self, resp: httpx.Response) -> str | None:
        """Return upstream_id that served a response. Lazy 30s-cached fetch."""
        return self._get_cost_attributor().upstream_for(resp)

    def close(self) -> None:
        """Close the transport's underlying client if it is an ``httpx.Client``."""
        client = self._transport._client
        if isinstance(client, httpx.Client):
            client.close()

    def __enter__(self) -> TierProxy:
        return self

    def __exit__(self, *_: Any) -> None:
        self.close()

    # -------- Level 1-2: proxy-through helpers --------

    @staticmethod
    def _cacheable_params(kwargs: dict[str, Any]) -> tuple[bool, dict[str, Any] | None]:
        """Decide whether a request's ``params`` can take part in a cache key.

        Returns:
            ``(cacheable, params)``. ``params`` is the dict to key on (or
            ``None`` when the request has no params). ``cacheable`` is
            ``False`` when ``params`` is present but not a dict: such values
            cannot be represented in the key, and keying them as "no params"
            would let requests with different query strings share one entry.
        """
        params = kwargs.get("params")
        if params is None:
            return True, None
        if isinstance(params, dict):
            return True, params
        return False, None

    def request(self, method: str, url: str, **kwargs: Any) -> Any:
        """Make an HTTP request *through* the tierproxy gateway.

        Targeting kwargs (country, session_id, ...) are split out; everything
        else forwards to httpx (timeout, json, data, headers, params, ...).

        When ``stream=True`` is passed, returns a context manager yielding an
        ``httpx.Response`` whose body must be iterated
        (``iter_bytes``/``iter_text``). Cache, failover, and cookie persistence
        are skipped in stream mode.

        When the response cache is enabled, only ``GET`` requests are looked
        up and only 2xx responses are stored. Requests whose ``params`` is
        given in a non-dict form bypass the cache entirely.
        """
        if is_stream(kwargs):
            return self._stream_request(method, url, **kwargs)

        cacheable, params = self._cacheable_params(kwargs)
        cache = self._response_cache
        if not cacheable or method.upper() != "GET":
            cache = None

        # NOTE(review): the cache key is built from method, url and params
        # only; targeting kwargs and headers are not passed to the cache.
        # Confirm that is intended for callers that vary targeting per call.
        if cache is not None:
            cached = cache.get(method, url, params)
            if cached is not None:
                return cached

        resp = self._dispatch(method, url, **kwargs)
        if cache is not None and 200 <= resp.status_code < 300:
            cache.set(method, url, params, resp, ttl=self._cache_ttl)
        return resp

    def _dispatch(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
        """Send one request, retrying on other upstreams when failover is on.

        Without failover (or without a selector) this is a single
        :meth:`_do_request` call. With failover, up to
        ``auto_failover_max_attempts`` upstreams are tried; an attempt is
        retried after a 429 or 5xx response, or after a connect/network/
        timeout error. If every attempt fails, the last response received is
        returned when there is one; otherwise the last exception is re-raised.
        """
        if not self._auto_failover or self._selector is None:
            return self._do_request(method, url, **kwargs)

        tried: set[str] = set()
        attempts = 0
        last_resp: httpx.Response | None = None
        last_exc: Exception | None = None
        max_attempts = self._auto_failover_max_attempts

        while attempts < max_attempts:
            override: str | None = None
            if attempts == 0:
                chosen = self._selector.pick()
            else:
                nxt = self._selector.pick_next(tried)
                if nxt is None:
                    # No untried upstream left.
                    break
                chosen = nxt
            override = chosen.upstream_id
            tried.add(override)
            attempts += 1
            try:
                resp = self._do_request(method, url, _upstream_override=override, **kwargs)
            except (httpx.ConnectError, httpx.NetworkError, httpx.TimeoutException) as e:
                last_exc = e
                continue
            last_resp = resp
            if resp.status_code == 429 or 500 <= resp.status_code < 600:
                continue
            return resp

        if last_resp is not None:
            return last_resp
        # At least one attempt always runs, so with no response an exception
        # must have been recorded.
        assert last_exc is not None
        raise last_exc

    def _do_request(
        self,
        method: str,
        url: str,
        _upstream_override: str | None = None,
        **kwargs: Any,
    ) -> httpx.Response:
        """Perform a single proxied, non-streaming request.

        Builds the proxy from the targeting kwargs, pins the upstream
        (explicit override first, otherwise the selector's pick when no hint
        is set), merges proxy headers over caller headers, runs the budget
        guard, attaches pending 429 reports, and sends the request through a
        short-lived ``httpx.Client``. Cookies are loaded from and written back
        to the cookie jar when a ``session_id`` targeting kwarg is present.
        """
        from tierproxy._targeting import build_proxy, split_kwargs

        targeting, passthrough = split_kwargs(kwargs)
        host = httpx.URL(self._transport.base_url).host
        proxy = build_proxy(self._transport.api_key, host, 443, targeting)
        if _upstream_override is not None:
            proxy.upstream_hint = _upstream_override
        elif self._selector is not None and not proxy.upstream_hint:
            chosen = self._selector.pick()
            proxy.upstream_hint = chosen.upstream_id

        headers = passthrough.pop("headers", None) or {}
        # Proxy headers win over caller-supplied headers on key collisions.
        headers = {**headers, **proxy.headers()}
        if (tls_fp := passthrough.pop("tls_fingerprint", None)) is not None:
            headers["X-TierProxy-TLS-Profile"] = str(tls_fp)

        # Run the budget guard BEFORE consuming the pending 429 reports: if
        # the guard raises, the reports must stay queued for the next request
        # instead of being cleared without ever having been sent.
        if self._monthly_budget_usd is not None:
            self._guard_budget()

        if self._pending_429_reports:
            headers["X-TierProxy-Report-429"] = ",".join(sorted(self._pending_429_reports))
            self._pending_429_reports.clear()

        session_id = targeting.get("session_id")
        cookies = self._cookie_jar.get(session_id) if session_id else None
        with httpx.Client(proxy=proxy.http_url(), timeout=self._http_timeout, cookies=cookies) as h:
            resp = h.request(method, url, headers=headers, **passthrough)
            if session_id:
                self._cookie_jar.update_from(session_id, resp)

        if resp.status_code == 429:
            # Remember the throttling host so it is reported on the next call.
            target_host = httpx.URL(url).host
            if target_host:
                self._pending_429_reports.add(target_host)
        return resp

    def _stream_request(self, method: str, url: str, **kwargs: Any) -> Any:
        """Build a streaming proxied request and return ``sync_stream(...)``.

        Same proxy/header preparation as :meth:`_do_request`, but without
        upstream override, 429 reporting, or cookie handling.
        """
        from tierproxy._targeting import build_proxy, split_kwargs

        targeting, passthrough = split_kwargs(kwargs)
        host = httpx.URL(self._transport.base_url).host
        proxy = build_proxy(self._transport.api_key, host, 443, targeting)
        if self._selector is not None and not proxy.upstream_hint:
            chosen = self._selector.pick()
            proxy.upstream_hint = chosen.upstream_id

        headers = passthrough.pop("headers", None) or {}
        headers = {**headers, **proxy.headers()}
        if (tls_fp := passthrough.pop("tls_fingerprint", None)) is not None:
            headers["X-TierProxy-TLS-Profile"] = str(tls_fp)
        if self._monthly_budget_usd is not None:
            self._guard_budget()

        return sync_stream(
            method,
            url,
            proxy=proxy.http_url(),
            timeout=self._http_timeout,
            headers=headers,
            **passthrough,
        )

    def get(self, url: str, **kwargs: Any) -> Any:
        """Shorthand for ``request("GET", url, **kwargs)``."""
        return self.request("GET", url, **kwargs)

    def post(self, url: str, **kwargs: Any) -> Any:
        """Shorthand for ``request("POST", url, **kwargs)``."""
        return self.request("POST", url, **kwargs)

    def session(self, **targeting_kwargs: Any) -> httpx.Client:
        """Return an httpx.Client preconfigured with targeting baked in.

        Useful when the caller wants to make many requests with the same
        targeting (e.g. one scraper job) without rebuilding the proxy every
        call. The returned client is a plain ``httpx.Client``, so requests
        made with it do not pass through :meth:`request`; the caller is
        responsible for closing it.
        """
        from tierproxy._targeting import build_proxy

        host = httpx.URL(self._transport.base_url).host
        proxy = build_proxy(self._transport.api_key, host, 443, targeting_kwargs)
        return httpx.Client(
            proxy=proxy.http_url(),
            headers=proxy.headers(),
            timeout=self._http_timeout,
        )

    def target(self, **kwargs: Any) -> TargetedRequest:
        """Builder pattern: g.target(country='US').get('https://x.com')."""
        return TargetedRequest(self, kwargs)

    # -------- Level 3: cost guard --------

    def _guard_budget(self) -> None:
        """Check the monthly budget before a proxied request is sent.

        Refreshes the month-to-date spend from ``self.usage.get()`` when the
        cached value is older than 60 seconds, then delegates the decision to
        ``check_budget``. Caller can force a refresh via
        ``client._budget_cache_ts = 0``.
        """
        import time

        from tierproxy._guard import check_budget

        refresh_after_s = 60
        if time.time() - self._budget_cache_ts > refresh_after_s:
            usage = self.usage.get()
            self._spent_usd_mtd = usage.total_cost_usd
            self._budget_cache_ts = time.time()

        # NOTE(review): looks like a flat per-request estimate of 1 MiB of
        # traffic priced at 4.0 USD per GiB - confirm units with the owner.
        avg_cost = 4.0
        estimated = avg_cost * (1024 * 1024) / (1024**3)
        assert self._monthly_budget_usd is not None
        check_budget(estimated, self._spent_usd_mtd, self._monthly_budget_usd)
class TargetedRequest:
    """Binds a fixed set of targeting kwargs to a client.

    Each of ``get``/``post``/``request`` forwards to the client method of the
    same name, passing the stored targeting kwargs with the per-call kwargs
    layered on top (per-call values win on key collisions).
    """

    def __init__(self, client: TierProxy, targeting: dict[str, Any]) -> None:
        self._client = client
        self._targeting = targeting

    def _merged(self, overrides: dict[str, Any]) -> dict[str, Any]:
        """Return a new dict: stored targeting updated with ``overrides``."""
        combined = dict(self._targeting)
        combined.update(overrides)
        return combined

    def get(self, url: str, **kw: Any) -> Any:
        """Forward to ``client.get`` with the merged kwargs."""
        call_kwargs = self._merged(kw)
        return self._client.get(url, **call_kwargs)

    def post(self, url: str, **kw: Any) -> Any:
        """Forward to ``client.post`` with the merged kwargs."""
        call_kwargs = self._merged(kw)
        return self._client.post(url, **call_kwargs)

    def request(self, method: str, url: str, **kw: Any) -> Any:
        """Forward to ``client.request`` with the merged kwargs."""
        call_kwargs = self._merged(kw)
        return self._client.request(method, url, **call_kwargs)