5 Commits

7 changed files with 785 additions and 825 deletions
Split View
  1. +2
    -0
      src/dlhdhr/config.py
  2. +624
    -807
      src/dlhdhr/dlhd/channels.py
  3. +13
    -3
      src/dlhdhr/epg/__init__.py
  4. +114
    -0
      src/dlhdhr/epg/epgsky.py
  5. +15
    -12
      src/dlhdhr/epg/program.py
  6. +17
    -2
      src/dlhdhr/epg/zap2it.py
  7. +0
    -1
      src/dlhdhr/epg/zaptv.py

+ 2
- 0
src/dlhdhr/config.py View File

@ -22,3 +22,5 @@ COUNTRY_ALLOW: set[str] | None = _set_or_none("DLHDHR_COUNTRY_ALLOW")
ZAP2IT_REFRESH_DELAY: int = int(os.getenv("DLHDHR_ZAP2IT_REFRESH_DELAY", "3600"))
ZAPTV_REFRESH_DELAY: int = int(os.getenv("DLHDHR_ZAPTV_REFRESH_DELAY", "3600"))
EPGSKY_REFRESH_DELAY: int = int(os.getenv("DLHDHR_EPGSKY_REFRESH_DELAY", "3600"))
EPGSKY_LOCATION_ID: int = int(os.getenv("DLHDHR_EPGSKY_LOCATION_ID", "1"))

+ 624
- 807
src/dlhdhr/dlhd/channels.py
File diff suppressed because it is too large
View File


+ 13
- 3
src/dlhdhr/epg/__init__.py View File

@ -7,10 +7,12 @@ from dlhdhr.dlhd import DLHDChannel
from dlhdhr.epg.zap2it import Zap2it
from dlhdhr.epg.program import Program
from dlhdhr.epg.zaptv import ZapTV
from dlhdhr.epg.epgsky import EPGSky
@dataclass()
class EPG:
epgsky: EPGSky = field(default_factory=EPGSky)
zap2it: Zap2it = field(default_factory=Zap2it)
zaptv: ZapTV = field(default_factory=ZapTV)
@ -18,22 +20,30 @@ class EPG:
if channel.country_code == "us":
return await self.zap2it.get_channel_programs(channel)
elif channel.country_code == "uk":
if channel.epgsky_id:
return await self.epgsky.get_channel_programs(channel)
return await self.zaptv.get_channel_programs(channel)
return []
async def get_channel_icon_from_epg(self, channel: DLHDChannel) -> str | None:
if channel.country_code == "us":
return await self.zap2it.get_channel_icon(channel)
elif channel.country_code == "uk":
if channel.epgsky_id:
return self.epgsky.get_channel_icon(channel)
return None
async def generate_xmltv(self, channels: Iterable[DLHDChannel]) -> bytes:
tv = Element("tv", attrib={"generator-info-name": "dlhdhr"})
channels = [c for c in channels if c.xmltv_id]
for channel in channels:
tv.append(channel.to_xmltv())
tv.append(channel.to_xmltv(thumbnail=await self.get_channel_icon_from_epg(channel)))
programs = await self.get_channel_programs(channel)
# Note: The order of the elements in the <programme /> matters
# title, desc, date, category, icon, episode-num, rating
for program in programs:
node = program.to_xmltv(channel)
if node:


+ 114
- 0
src/dlhdhr/epg/epgsky.py View File

@ -0,0 +1,114 @@
import datetime
from dataclasses import dataclass, field
import time
import httpx
from dlhdhr import config
from dlhdhr.dlhd.channels import DLHDChannel, get_channels
from dlhdhr.epg.program import Program
@dataclass()
class EPGSky:
_BASE_URL = "https://awk.epgsky.com/hawk/linear"
_listings: dict[str, Program] = field(default_factory=dict)
_last_fetch: float = 0
def _get_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=self._BASE_URL,
timeout=5.0,
verify=True,
max_redirects=1,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0",
"Origin": "https://www.sky.com",
"Referer": "https://www.sky.com/",
"Accept": "application/json",
},
)
def _cleanup_listings(self) -> None:
now = datetime.datetime.now(datetime.UTC)
cutoff = now - datetime.timedelta(hours=3)
updated: dict[str, list[Program]] = {}
for epgsky_id, programs in self._listings.items():
updated_programs = [p for p in programs if p.end_time > cutoff]
if updated_programs:
updated[epgsky_id] = updated_programs
self._listings = updated
async def _fetch_listings(self) -> dict[str, list[Program]]:
listings: dict[str, list[Program]] = {}
now = datetime.datetime.now(datetime.UTC)
cutoff = now - datetime.timedelta(hours=3)
async with self._get_client() as client:
channels: list[str] = [c.epgsky_id for c in get_channels() if c.epgsky_id]
date = now.strftime("%Y%m%d")
for i in range(0, len(channels), 20):
services = channels[i : i + 20]
res = await client.get(f"/schedule/{date}/{','.join(services)}")
res.raise_for_status()
data = res.json()
for channel in data["schedule"]:
programs = []
for event in channel["events"]:
start_time = datetime.datetime.fromtimestamp(event["st"], datetime.UTC)
end_time = start_time + datetime.timedelta(seconds=event["d"])
if end_time < cutoff:
continue
programs.append(
Program(
start_time=start_time,
end_time=end_time,
title=event["t"],
subtitle=None,
description=event.get("sy") or "",
season=event.get("seasonnumber") or None,
episode=event.get("episodenumber") or None,
tags=[],
release_year=None,
thumbnail=None,
rating=None,
)
)
listings[channel["sid"]] = sorted(programs, key=lambda p: p.start_time)
return listings
async def _refresh_listings(self) -> dict[str, list[Program]]:
self._cleanup_listings()
now = time.time()
if self._listings and now - self._last_fetch > config.EPGSKY_REFRESH_DELAY:
return self._listings
programs = await self._fetch_listings()
for code, programs in programs.items():
if code in self._listings:
self._listings[code].extend(programs)
else:
self._listings[code] = programs
return self._listings
async def get_channel_programs(self, channel: DLHDChannel) -> list[Program]:
if not channel.epgsky_id:
return []
await self._refresh_listings()
if channel.epgsky_id not in self._listings:
return []
return self._listings[channel.epgsky_id]
def get_channel_icon(self, channel: DLHDChannel) -> str | None:
if not channel.epgsky_id:
return None
return f"https://d2n0069hmnqmmx.cloudfront.net/epgdata/1.0/newchanlogos/80/35/skychb{channel.epgsky_id}.png"

+ 15
- 12
src/dlhdhr/epg/program.py View File

@ -1,5 +1,5 @@
import datetime
from dataclasses import dataclass
from dataclasses import dataclass, field
from xml.etree.ElementTree import Element, SubElement
from dlhdhr.dlhd import DLHDChannel
@ -17,13 +17,14 @@ class Program:
end_time: datetime.datetime
title: str
description: str
tags: list[str]
subtitle: str | None
thumbnail: str | None
season: int | None
episode: int | None
rating: Rating | None
release_year: str | None
tags: list[str] = field(default_factory=list)
subtitle: str | None = None
thumbnail: str | None = None
season: int | None = None
episode: int | None = None
rating: Rating | None = None
release_year: str | None = None
dd_progid: str | None = None
@property
def duration_minutes(self) -> int:
@ -54,14 +55,16 @@ class Program:
if self.thumbnail:
SubElement(programme, "icon", attrib={"src": self.thumbnail})
if self.season or self.episode:
season_id = self.season or ""
episode_id = self.episode or ""
SubElement(programme, "episode-num", attrib={"system": "xmltv_ns"}).text = f"{season_id}.{episode_id}."
if self.season and self.episode:
if self.season and self.episode:
SubElement(
programme, "episode-num", attrib={"system": "xmltv_ns"}
).text = f"{self.season}.{self.episode}."
SubElement(
programme, "episode-num", attrib={"system": "onscreen"}
).text = f"S{self.season} E{self.episode}"
if self.dd_progid:
SubElement(programme, "episode-num", attrib={"system": "dd_progid"}).text = self.dd_progid
if self.rating:
rating = SubElement(programme, "rating", attrib={"system": self.rating.system})


+ 17
- 2
src/dlhdhr/epg/zap2it.py View File

@ -15,6 +15,7 @@ class Zap2it:
_BASE_URL = "https://tvlistings.zap2it.com/api/"
_listings: dict[str, Program] = field(default_factory=dict)
_last_fetch: float = 0
_channel_icons: dict[str, str] = field(default_factory=dict)
def _get_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(
@ -77,6 +78,12 @@ class Zap2it:
if call_sign not in events:
events[call_sign] = {}
if ch_data.get("thumbnail"):
thumbnail = ch_data["thumbnail"]
if thumbnail.startswith("//"):
thumbnail = f"https:{thumbnail}"
self._channel_icons[call_sign] = thumbnail
for evt in ch_data["events"]:
key = (evt["startTime"], evt["endTime"])
if key not in events[call_sign]:
@ -100,8 +107,9 @@ class Zap2it:
title=evt_data["program"]["title"],
subtitle=evt_data["program"].get("episodeTitle") or None,
description=evt_data["program"]["shortDesc"],
season=evt_data["program"]["season"],
episode=evt_data["program"]["episode"],
season=evt_data["program"]["season"] or None,
episode=evt_data["program"]["episode"] or None,
dd_progid=evt_data["program"].get("tmsId"),
tags=evt_data["tags"],
release_year=evt_data["program"]["releaseYear"],
thumbnail=f"https://zap2it.tmsimg.com/assets/{evt_data['thumbnail']}.jpg?w=165",
@ -149,3 +157,10 @@ class Zap2it:
return []
return self._listings[channel.call_sign]
async def get_channel_icon(self, channel: DLHDChannel) -> str | None:
if not channel.call_sign:
return None
await self._refresh_listings()
return self._channel_icons.get(channel.call_sign)

+ 0
- 1
src/dlhdhr/epg/zaptv.py View File

@ -7,7 +7,6 @@ import httpx
from dlhdhr import config
from dlhdhr.dlhd.channels import DLHDChannel
from dlhdhr.epg.program import Program
from dlhdhr.epg.program import Rating
@dataclass()


Loading…
Cancel
Save