ha-2gis-weather/custom_components/two_gis_weather/api.py
2025-11-04 11:21:01 +06:00

67 lines
2.1 KiB
Python

from __future__ import annotations
import uuid
from typing import Any
import async_timeout
from aiohttp import ClientResponse
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import API_BASE, DEFAULT_USER_AGENT, REQUEST_TIMEOUT
class TwoGisWeatherClient:
    """Async client for the ``{API_BASE}/forecast`` weather endpoint.

    The client remembers the ``Last-Modified`` value of the most recent
    successfully decoded response and replays it as ``If-Modified-Since``
    on later requests, so an unchanged resource can be answered with
    ``304 Not Modified`` instead of a full body.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        api_key: str,
        latitude: float,
        longitude: float,
        language: str = "en-US",
        user_agent: str = DEFAULT_USER_AGENT,
    ) -> None:
        """Store the connection settings; no network I/O happens here.

        Args:
            hass: Home Assistant instance used to obtain the shared
                aiohttp client session.
            api_key: Value sent as the ``key`` query parameter.
            latitude: Value sent (stringified) as the ``lat`` query parameter.
            longitude: Value sent (stringified) as the ``lon`` query parameter.
            language: Value sent in the ``Accept-Language`` header.
            user_agent: Value sent in the ``User-Agent`` header.
        """
        self.hass = hass
        self.api_key = api_key
        self.latitude = latitude
        self.longitude = longitude
        self.language = language
        self.user_agent = user_agent
        # Validator from the last successfully decoded response, if any.
        self._last_modified: str | None = None

    def _headers(self) -> dict[str, str]:
        """Build the request headers for a single call.

        A fresh random ``x-request-id`` is generated on every invocation.
        ``If-Modified-Since`` is included only once a ``Last-Modified``
        value has been recorded.
        """
        headers: dict[str, str] = {
            "User-Agent": self.user_agent,
            "x-request-id": uuid.uuid4().hex,
            "Accept-Language": self.language,
            "Accept": "application/json",
        }
        if self._last_modified:
            headers["If-Modified-Since"] = self._last_modified
        return headers

    async def async_get_current(self) -> dict[str, Any] | None:
        """Fetch the forecast for the configured coordinates.

        Returns:
            The decoded JSON body, or ``None`` when the server answers
            ``304 Not Modified``.

        Raises:
            aiohttp.ClientResponseError: For error statuses, via
                ``raise_for_status()``.
            asyncio.TimeoutError: When the request, including reading the
                body, exceeds ``REQUEST_TIMEOUT``.
            ValueError: When the body is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        session = async_get_clientsession(self.hass)
        url = f"{API_BASE}/forecast"
        params = {
            "lon": str(self.longitude),
            "lat": str(self.latitude),
            "key": self.api_key,
        }
        async with async_timeout.timeout(REQUEST_TIMEOUT):
            resp: ClientResponse
            async with session.get(
                url, params=params, headers=self._headers()
            ) as resp:
                if resp.status == 304:
                    return None
                resp.raise_for_status()

                # Decode the body BEFORE remembering Last-Modified. If the
                # read or parse fails (or the timeout fires mid-body) the
                # validator must stay unchanged; otherwise the next poll
                # would send If-Modified-Since, receive 304, and the payload
                # we failed to read would never be delivered to the caller.
                # content_type=None disables aiohttp's Content-Type check.
                payload = await resp.json(content_type=None)

                # Track Last-Modified for conditional requests.
                last_mod = resp.headers.get("Last-Modified")
                if last_mod:
                    self._last_modified = last_mod
                return payload

    @property
    def last_modified(self) -> str | None:
        """Return the ``Last-Modified`` value recorded so far, or ``None``."""
        return self._last_modified