diff --git a/src/dlhdhr/app.py b/src/dlhdhr/app.py index 71b2d16..9fd1b21 100644 --- a/src/dlhdhr/app.py +++ b/src/dlhdhr/app.py @@ -10,6 +10,8 @@ from starlette.responses import JSONResponse, Response, StreamingResponse from dlhdhr import config from dlhdhr.dlhd import DLHDClient from dlhdhr.tuner import TunerManager, TunerNotFoundError +from dlhdhr.xmltv import generate_xmltv +from dlhdhr.zap2it import Zap2it def get_public_url(request: Request, path: str) -> str: @@ -159,20 +161,10 @@ async def xmltv_xml(request: Request) -> Response: ) dlhd = cast(DLHDClient, request.app.state.dlhd) + zap2it = cast(Zap2it, request.app.state.zap2it) dlhd_channels = await dlhd.get_channels() - - response = '' - - for channel in dlhd_channels: - name = saxutils.escape(channel.name) - response += f'' - response += f'{name}' - response += f"{channel.number}" - response += "" - response += "" - - return Response(response, media_type="application/xml; charset=utf-8") + return Response(await generate_xmltv(dlhd_channels, zap2it), media_type="application/xml; charset=utf-8") async def iptv_m3u(request: Request) -> Response: @@ -193,10 +185,12 @@ async def iptv_m3u(request: Request) -> Response: def create_app() -> Starlette: dlhd_client = DLHDClient() tuner_manager = TunerManager() + zap2it = Zap2it() app = Starlette() app.state.dlhd = dlhd_client app.state.tuners = tuner_manager + app.state.zap2it = zap2it app.add_route("/discover.json", discover_json) app.add_route("/lineup_status.json", lineup_status_json) app.add_route("/listings.json", listings_json) diff --git a/src/dlhdhr/config.py b/src/dlhdhr/config.py index a03fe31..ef39016 100644 --- a/src/dlhdhr/config.py +++ b/src/dlhdhr/config.py @@ -22,3 +22,8 @@ CHANNEL_ALLOW: set[str] | None = _set_or_none("DLHDHR_CHANNEL_ALLOW") EPG_PROVIDER: str | None = os.getenv("DLHDHR_EPG_PROVIDER") EPG_BEST_XMLTV_URL: str | None = os.getenv("DLHDHR_EPG_BEST_XMLTV_URL") + +ZAP2IT_POSTAL_CODE: str = os.getenv("DLHDHR_ZAP2IT_POSTAL_CODE", "10001") +ZAP2IT_REFRESH_DELAY: int = int(os.getenv("DLHDHR_ZAP2IT_REFRESH_DELAY", "3600")) +ZAP2IT_LINEUP_ID: str = os.getenv("DLHDHR_ZAP2IT_LINEUP_ID", "USA-NY31519-DEFAULT") +ZAP2IT_HEADEND_ID: str = os.getenv("DLHDHR_ZAP2IT_HEADEND_ID", "NY31519") diff --git a/src/dlhdhr/dlhd.py b/src/dlhdhr/dlhd.py index 577cf54..6b9c794 100644 --- a/src/dlhdhr/dlhd.py +++ b/src/dlhdhr/dlhd.py @@ -9,6 +9,7 @@ import m3u8 from dlhdhr import config from dlhdhr.tvg_id import get_tvg_id +from dlhdhr.zap2it import get_channel_call_sign @dataclass(frozen=True) @@ -20,6 +21,10 @@ class DLHDChannel: def tvg_id(self) -> str | None: return get_tvg_id(self.number) + @property + def call_sign(self) -> str | None: + return get_channel_call_sign(self.number) + @property def playlist_m3u8(self) -> str: return f"/channel/{self.number}/playlist.m3u8" diff --git a/src/dlhdhr/xmltv.py b/src/dlhdhr/xmltv.py new file mode 100644 index 0000000..4c05cbb --- /dev/null +++ b/src/dlhdhr/xmltv.py @@ -0,0 +1,56 @@ +from xml.etree.ElementTree import Element, SubElement, tostring + +from dlhdhr.dlhd import DLHDChannel +from dlhdhr.zap2it import Zap2it + + +async def generate_xmltv(channels: list[DLHDChannel], zap2it: Zap2it) -> bytes: + tv = Element("tv", attrib={"generator-info-name": "dlhdhr"}) + + for channel in channels: + ch_node = SubElement(tv, "channel", attrib={"id": channel.number}) + SubElement(ch_node, "display-name", attrib={"lang": "en"}).text = channel.name + SubElement(ch_node, "lcn").text = channel.number + + if not channel.call_sign: + continue + + z_channel = await zap2it.get_channel(channel.call_sign) + + if not z_channel: + continue + + if not channel.tvg_id: + continue + + for event in z_channel.events: + start_time = event.start_time.strftime("%Y%m%d%H%M%S %z") + end_time = event.start_time.strftime("%Y%m%d%H%M%S %z") + + programme = SubElement( + tv, "programme", attrib={"start": start_time, "stop": end_time, "channel": channel.tvg_id} + ) + if event.program.title: + SubElement(programme, "title", attrib={"lang": "en"}).text = event.program.title + if event.program.short_desc: + SubElement( + programme, "desc", attrib={"lang": "en", "desc": event.program.short_desc} + ).text = event.program.short_desc + + if event.program.release_year: + SubElement(programme, "date").text = event.program.release_year + + if event.rating: + rating = SubElement(programme, "rating", attrib={"system": "MPAA"}) + SubElement(rating, "value").text = event.rating + + if event.program.season or event.program.episode: + e_id = ".".join([event.program.season or "", event.program.episode or "", ""]) + SubElement(programme, "episode-num", attrib={"system": "xmltv_ns"}).text = e_id + + if event.thumbnail: + SubElement(programme, "icon", attrib={"src": event.thumbnail}) + + for tag in event.tags: + SubElement(programme, "category", attrib={"lang": "en"}).text = tag + return tostring(tv) diff --git a/src/dlhdhr/zap2it.py b/src/dlhdhr/zap2it.py new file mode 100644 index 0000000..1482470 --- /dev/null +++ b/src/dlhdhr/zap2it.py @@ -0,0 +1,206 @@ +import datetime +from dataclasses import dataclass, field +import time + +import httpx + +from dlhdhr import config + + +# Mapping of DLHD channel number to zap2it call sign +_MAPPING = dict( + [ + ("44", "ESPN"), + ("45", "ESPN2"), + ("66", "TUDN"), + ("295", "TOON"), + ("298", "FXX"), + ("301", "FREEFRM"), + ("303", "AMC"), + ("305", "BBCA"), + ("306", "BET"), + ("307", "BRAVO"), + ("309", "CNBC"), + ("310", "COMEDY"), + ("312", "DISN"), + ("316", "ESPNU"), + ("317", "FX"), + ("319", "GSN"), + ("321", "HBO"), + ("325", "ION"), + ("326", "LIFE"), + ("327", "MSNBC"), + ("329", "NICJR"), + ("330", "NIK"), + ("331", "OWN"), + ("333", "SHOW"), + ("334", "PAR"), + ("335", "STARZ"), + ("336", "TBS"), + ("337", "TLC"), + ("338", "TNT"), + ("339", "TOON"), + ("340", "TRAV"), + ("343", "USA"), + ("344", "VH1"), + ("345", "CNN"), + ("371", "MTV"), + ("373", "SYFY"), + ("381", "FXM"), + ("382", "HGTV"), + ("647", "CMTV"), + ("651", "DEST"), + ("658", "SUNDANC"), + ("665", "FYISD"), + ("689", "HBO2"), + ("691", "HBOF"), + ("693", "HBOSIG"), + ("697", "COOK"), + ("745", "NGC"), + ("766", "ABC"), + ] +) + + +def get_channel_call_sign(channel_number: str) -> str | None: + return _MAPPING.get(channel_number) + + +class Zap2it: + _BASE_URL = "https://tvlistings.zap2it.com/api/" + _listings: dict[str, "Zap2it.Channel"] + _last_fetch: float = 0 + + @dataclass + class Program: + title: str + id: str + short_desc: str + season: str | None + release_year: str | None + episode: str | None + episode_title: str | None + series_id: str | None + + @dataclass + class Event: + duration: int + start_time: datetime.datetime + end_time: datetime.datetime + thumbnail: str + series_id: str + rating: str + tags: list[str] + program: "Zap2it.Program" + + @dataclass + class Channel: + call_sign: str + name: str + number: str + id: str + thumbnail: str + events: list["Zap2it.Event"] = field(default_factory=list) + + def __init__(self): + self._listings = {} + + def _get_client(self) -> httpx.AsyncClient: + return httpx.AsyncClient( + base_url=self._BASE_URL, + timeout=2.0, + verify=True, + max_redirects=1, + headers={ + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0", + "Referer": "https://tvlistings.zap2it.com/?aid=gapzap", + "Accept": "application/json", + }, + ) + + def _cleanup_listings(self) -> None: + now = datetime.datetime.now(datetime.UTC) + + updated: dict[str, "Zap2it.Channel"] = {} + for channel in self._listings.values(): + channel.events = [evt for evt in channel.events if evt.end_time > now] + if channel.events: + updated[channel.call_sign] = channel + self._listings = updated + + async def _refresh_listings(self) -> list["Zap2it.Channel"]: + self._cleanup_listings() + + now = time.time() + if self._listings and now - self._last_fetch > config.ZAP2IT_REFRESH_DELAY: + return list(self._listings.values()) + + params = { + "lineupId": config.ZAP2IT_LINEUP_ID, + "timespan": "6", + "headendId": config.ZAP2IT_HEADEND_ID, + "country": "USA", + "timezone": "", + "device": "X", + "postalCode": config.ZAP2IT_POSTAL_CODE, + "isOverride": "true", + "time": str(int(time.time())), + "pref": "16,256", + "userId": "-", + "aid": "gapzap", + "languagecode": "en-us", + } + + now = datetime.datetime.now(datetime.UTC) + async with self._get_client() as client: + res = await client.get("/grid", params=params) + res.raise_for_status() + + data = res.json() + + for ch_data in data["channels"]: + call_sign = ch_data["callSign"] + if call_sign in self._listings: + channel = self._listings[call_sign] + else: + channel = self.Channel( + call_sign=call_sign, + name=ch_data["affiliateName"], + number=ch_data["channelNo"], + id=ch_data["id"], + thumbnail=ch_data["thumbnail"], + ) + self._listings[call_sign] = channel + + for evt_data in ch_data["events"]: + end_time = datetime.datetime.fromisoformat(evt_data["endTime"]) + if end_time < now: + continue + + event = self.Event( + duration=evt_data["duration"], + rating=evt_data["rating"], + tags=evt_data["tags"], + thumbnail=f"//zap2it.tmsimg.com/assets/{evt_data['thumbnail']}.jpg?w=165", + series_id=evt_data["seriesId"], + start_time=datetime.datetime.fromisoformat(evt_data["startTime"]), + end_time=end_time, + program=self.Program( + title=evt_data["program"]["title"], + id=evt_data["program"]["id"], + short_desc=evt_data["program"]["shortDesc"], + season=evt_data["program"]["season"], + release_year=evt_data["program"]["releaseYear"], + episode=evt_data["program"]["episode"], + episode_title=evt_data["program"]["episodeTitle"], + series_id=evt_data["program"]["seriesId"], + ), + ) + channel.events.append(event) + + return list(self._listings.values()) + + async def get_channel(self, call_sign: str) -> Channel | None: + await self._refresh_listings() + + return self._listings.get(call_sign)