Browse Source

refactor epg generator and update channel mappings

main
Brett Langdon 2 years ago
parent
commit
5b98c63d51
No known key found for this signature in database GPG Key ID: 9BAD4322A65AD78B
8 changed files with 618 additions and 468 deletions
  1. +7
    -32
      src/dlhdhr/app.py
  2. +0
    -6
      src/dlhdhr/config.py
  3. +381
    -219
      src/dlhdhr/dlhd/channels.py
  4. +40
    -0
      src/dlhdhr/epg/__init__.py
  5. +58
    -0
      src/dlhdhr/epg/program.py
  6. +132
    -0
      src/dlhdhr/epg/zap2it.py
  7. +0
    -61
      src/dlhdhr/xmltv.py
  8. +0
    -150
      src/dlhdhr/zap2it.py

+ 7
- 32
src/dlhdhr/app.py View File

@ -11,8 +11,7 @@ from starlette.responses import JSONResponse, Response, StreamingResponse
from dlhdhr import config
from dlhdhr.dlhd import DLHDClient
from dlhdhr.tuner import TunerManager, TunerNotFoundError
from dlhdhr.xmltv import generate_xmltv
from dlhdhr.zap2it import Zap2it
from dlhdhr.epg import EPG
def get_public_url(request: Request, path: str) -> str:
@ -91,6 +90,7 @@ async def channel_proxy(request: Request) -> Response:
async def listings_json(request: Request) -> JSONResponse:
dlhd = cast(DLHDClient, request.app.state.dlhd)
channels = sorted(await dlhd.get_channels(), key=lambda c: int(c.number))
return JSONResponse(
[
{
@ -99,6 +99,7 @@ async def listings_json(request: Request) -> JSONResponse:
"URL": get_public_url(request, channel.channel_proxy),
}
for channel in channels
if not channel.country_code
]
)
@ -135,37 +136,11 @@ async def lineup_status_json(_: Request) -> JSONResponse:
async def xmltv_xml(request: Request) -> Response:
if config.EPG_PROVIDER == "epg.best":
if not config.EPG_BEST_XMLTV_URL:
return Response("", status_code=404)
async def _generator():
if not config.EPG_BEST_XMLTV_URL:
return
async with httpx.AsyncClient() as client:
async with client.stream("GET", config.EPG_BEST_XMLTV_URL) as res:
async for chunk in res.aiter_bytes():
yield chunk
headers = {}
if config.EPG_BEST_XMLTV_URL.endswith(".gz"):
headers["Content-Encoding"] = "gzip"
return StreamingResponse(
_generator(),
status_code=200,
media_type="application/xml; charset=utf-8",
headers={
"Content-Encoding": "gzip",
},
)
dlhd = cast(DLHDClient, request.app.state.dlhd)
zap2it = cast(Zap2it, request.app.state.zap2it)
epg = cast(EPG, request.app.state.epg)
dlhd_channels = await dlhd.get_channels()
return Response(await generate_xmltv(dlhd_channels, zap2it), media_type="application/xml; charset=utf-8")
return Response(await epg.generate_xmltv(dlhd_channels), media_type="application/xml; charset=utf-8")
async def iptv_m3u(request: Request) -> Response:
@ -200,12 +175,12 @@ async def channel_key_proxy(request: Request) -> Response:
def create_app() -> Starlette:
dlhd_client = DLHDClient()
tuner_manager = TunerManager()
zap2it = Zap2it()
epg = EPG()
app = Starlette()
app.state.dlhd = dlhd_client
app.state.tuners = tuner_manager
app.state.zap2it = zap2it
app.state.epg = EPG()
app.add_route("/discover.json", discover_json)
app.add_route("/lineup_status.json", lineup_status_json)
app.add_route("/listings.json", listings_json)


+ 0
- 6
src/dlhdhr/config.py View File

@ -23,10 +23,4 @@ CHANNEL_ALLOW: set[str] | None = _set_or_none("DLHDHR_CHANNEL_ALLOW")
COUNTRY_EXCLUDE: set[str] | None = _set_or_none("DLHDHR_COUNTRY_EXCLUDE")
COUNTRY_ALLOW: set[str] | None = _set_or_none("DLHDHR_COUNTRY_ALLOW")
EPG_PROVIDER: str | None = os.getenv("DLHDHR_EPG_PROVIDER")
EPG_BEST_XMLTV_URL: str | None = os.getenv("DLHDHR_EPG_BEST_XMLTV_URL")
ZAP2IT_POSTAL_CODE: str = os.getenv("DLHDHR_ZAP2IT_POSTAL_CODE", "10001")
ZAP2IT_REFRESH_DELAY: int = int(os.getenv("DLHDHR_ZAP2IT_REFRESH_DELAY", "3600"))
ZAP2IT_LINEUP_ID: str = os.getenv("DLHDHR_ZAP2IT_LINEUP_ID", "USA-NY31519-DEFAULT")
ZAP2IT_HEADEND_ID: str = os.getenv("DLHDHR_ZAP2IT_HEADEND_ID", "NY31519")

+ 381
- 219
src/dlhdhr/dlhd/channels.py
File diff suppressed because it is too large
View File


+ 40
- 0
src/dlhdhr/epg/__init__.py View File

@ -0,0 +1,40 @@
from dataclasses import dataclass, field
from xml.etree.ElementTree import Element, tostring
from dlhdhr.dlhd import DLHDChannel
from dlhdhr.epg.zap2it import Zap2it
from dlhdhr.epg.program import Program
@dataclass()
class EPG:
zap2it: Zap2it = field(default_factory=Zap2it)
async def get_channel_programs(self, channel: DLHDChannel) -> list[Program]:
if channel.country_code == "us":
return await self.zap2it.get_channel_programs(channel)
elif channel.country_code == "uk":
# TODO: TV24? TV Guide?
return []
return []
async def generate_xmltv(self, channels: list[DLHDChannel]) -> bytes:
tv = Element("tv", attrib={"generator-info-name": "dlhdhr"})
channels = [c for c in channels if c.xmltv_id]
for channel in channels:
tv.append(channel.to_xmltv())
for channel in channels:
programs = await self.get_channel_programs(channel)
# Note: The order of the elements in the <programme /> matters
# title, desc, date, category, icon, episode-num, rating
for program in programs:
node = program.to_xmltv(channel)
if node:
tv.append(node)
return tostring(tv)

+ 58
- 0
src/dlhdhr/epg/program.py View File

@ -0,0 +1,58 @@
import datetime
from dataclasses import dataclass
from xml.etree.ElementTree import Element, SubElement
from dlhdhr.dlhd import DLHDChannel
@dataclass(frozen=True)
class Rating:
system: str
value: str
@dataclass(frozen=True)
class Program:
start_time: datetime.datetime
end_time: datetime.datetime
title: str
description: str
tags: list[str]
thumbnail: str | None
season: int | None
episode: int | None
rating: Rating | None
release_year: str | None
def to_xmltv(self, channel: DLHDChannel) -> Element | None:
if not channel.xmltv_id:
return None
start_time = self.start_time.strftime("%Y%m%d%H%M%S %z")
end_time = self.start_time.strftime("%Y%m%d%H%M%S %z")
programme = Element("programme", attrib={"start": start_time, "stop": end_time, "channel": channel.xmltv_id})
if self.title:
SubElement(programme, "title", attrib={"lang": "en"}).text = self.title
if self.description:
SubElement(programme, "desc", attrib={"lang": "en"}).text = self.description
if self.release_year:
SubElement(programme, "date").text = self.release_year
for tag in self.tags:
SubElement(programme, "category", attrib={"lang": "en"}).text = tag
if self.thumbnail:
SubElement(programme, "icon", attrib={"src": self.thumbnail})
if self.season or self.episode:
season_id = self.season or ""
episode_id = self.episode or ""
SubElement(programme, "episode-num", attrib={"system": "xmltv_ns"}).text = f"{season_id}.{episode_id}."
if self.rating:
rating = SubElement(programme, "rating", attrib={"system": self.rating.system})
SubElement(rating, "value").text = self.rating.value
return programme

+ 132
- 0
src/dlhdhr/epg/zap2it.py View File

@ -0,0 +1,132 @@
import datetime
from dataclasses import dataclass, field
import time
import httpx
from dlhdhr import config
from dlhdhr.dlhd.channels import DLHDChannel
from dlhdhr.epg.program import Program
from dlhdhr.epg.program import Rating
@dataclass()
class Zap2it:
_BASE_URL = "https://tvlistings.zap2it.com/api/"
_listings: dict[str, Program] = field(default_factory=dict)
_last_fetch: float = 0
def _get_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=self._BASE_URL,
timeout=2.0,
verify=True,
max_redirects=1,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0",
"Referer": "https://tvlistings.zap2it.com/?aid=gapzap",
"Accept": "application/json",
},
)
def _cleanup_listings(self) -> None:
now = datetime.datetime.now(datetime.UTC)
updated: dict[str, list[Program]] = {}
for call_sign, programs in self._listings.items():
updated_programs = [p for p in programs if p.end_time > now]
if updated_programs:
updated[call_sign] = updated_programs
self._listings = updated
async def _fetch_listings(self, lineup_id: str, headend_id: str, postal_code: str) -> dict[str, list[Program]]:
params = {
"lineupId": lineup_id,
"timespan": "6",
"headendId": headend_id,
"country": "USA",
"timezone": "",
"device": "X",
"postalCode": postal_code,
"isOverride": "true",
"time": str(int(time.time())),
"pref": "16,256",
"userId": "-",
"aid": "gapzap",
"languagecode": "en-us",
}
listings: dict[str, list[Program]] = {}
now = datetime.datetime.now(datetime.UTC)
async with self._get_client() as client:
res = await client.get("/grid", params=params)
res.raise_for_status()
data = res.json()
for ch_data in data["channels"]:
call_sign = ch_data["callSign"]
programs = []
listings[call_sign] = programs
for evt_data in ch_data["events"]:
end_time = datetime.datetime.fromisoformat(evt_data["endTime"])
if end_time < now:
continue
rating = None
if evt_data["rating"]:
rating = Rating(system="MPAA", value=evt_data["rating"])
programs.append(
Program(
start_time=datetime.datetime.fromisoformat(evt_data["startTime"]),
end_time=end_time,
title=evt_data["program"]["title"],
description=evt_data["program"]["shortDesc"],
season=evt_data["program"]["season"],
episode=evt_data["program"]["episode"],
tags=evt_data["tags"],
release_year=evt_data["program"]["releaseYear"],
thumbnail=f"https://zap2it.tmsimg.com/assets/{evt_data['thumbnail']}.jpg?w=165",
rating=rating,
)
)
return listings
async def _refresh_listings(self) -> dict[str, list[Program]]:
self._cleanup_listings()
now = time.time()
if self._listings and now - self._last_fetch > config.ZAP2IT_REFRESH_DELAY:
return self._listings
east_coast_programs = await self._fetch_listings(
lineup_id="USA-NY31519-DEFAULT", headend_id="NY31519", postal_code="10001"
)
for call_sign, programs in east_coast_programs.items():
if call_sign in self._listings:
self._listings[call_sign].extend(programs)
else:
self._listings[call_sign] = programs
west_coast_programs = await self._fetch_listings(
lineup_id="USA-CA66511-DEFAULT", headend_id="CA66511", postal_code="90001"
)
for call_sign, programs in west_coast_programs.items():
if call_sign in self._listings:
self._listings[call_sign].extend(programs)
else:
self._listings[call_sign] = programs
return self._listings
async def get_channel_programs(self, channel: DLHDChannel) -> list[Program]:
if not channel.call_sign:
return []
await self._refresh_listings()
if channel.call_sign not in self._listings:
return []
return self._listings[channel.call_sign]

+ 0
- 61
src/dlhdhr/xmltv.py View File

@ -1,61 +0,0 @@
from xml.etree.ElementTree import Element, SubElement, tostring
from dlhdhr.dlhd import DLHDChannel
from dlhdhr.zap2it import Zap2it
async def generate_xmltv(channels: list[DLHDChannel], zap2it: Zap2it) -> bytes:
tv = Element("tv", attrib={"generator-info-name": "dlhdhr"})
for channel in channels:
if not channel.xmltv_id:
continue
ch_node = SubElement(tv, "channel", attrib={"id": channel.xmltv_id})
SubElement(ch_node, "display-name", attrib={"lang": "en"}).text = channel.name
SubElement(ch_node, "lcn").text = channel.number
for channel in channels:
if not channel.call_sign:
continue
if not channel.xmltv_id:
continue
z_channel = await zap2it.get_channel(channel.call_sign)
if not z_channel:
continue
# Note: The order of the elements in the <programme /> matters
# title, desc, date, category, icon, episode-num, rating
for event in z_channel.events:
start_time = event.start_time.strftime("%Y%m%d%H%M%S %z")
end_time = event.start_time.strftime("%Y%m%d%H%M%S %z")
programme = SubElement(
tv, "programme", attrib={"start": start_time, "stop": end_time, "channel": channel.xmltv_id}
)
if event.program.title:
SubElement(programme, "title", attrib={"lang": "en"}).text = event.program.title
if event.program.short_desc:
SubElement(programme, "desc", attrib={"lang": "en"}).text = event.program.short_desc
if event.program.release_year:
SubElement(programme, "date").text = event.program.release_year
for tag in event.tags:
SubElement(programme, "category", attrib={"lang": "en"}).text = tag
if event.thumbnail:
SubElement(programme, "icon", attrib={"src": event.thumbnail})
if event.program.season or event.program.episode:
e_id = ".".join([event.program.season or "", event.program.episode or "", ""])
SubElement(programme, "episode-num", attrib={"system": "xmltv_ns"}).text = e_id
if event.rating:
rating = SubElement(programme, "rating", attrib={"system": "MPAA"})
SubElement(rating, "value").text = event.rating
return tostring(tv)

+ 0
- 150
src/dlhdhr/zap2it.py View File

@ -1,150 +0,0 @@
import datetime
from dataclasses import dataclass, field
import time
import httpx
from dlhdhr import config
class Zap2it:
_BASE_URL = "https://tvlistings.zap2it.com/api/"
_listings: dict[str, "Zap2it.Channel"]
_last_fetch: float = 0
@dataclass
class Program:
title: str
id: str
short_desc: str
season: str | None
release_year: str | None
episode: str | None
episode_title: str | None
series_id: str | None
@dataclass
class Event:
duration: int
start_time: datetime.datetime
end_time: datetime.datetime
thumbnail: str
series_id: str
rating: str
tags: list[str]
program: "Zap2it.Program"
@dataclass
class Channel:
call_sign: str
name: str
number: str
id: str
thumbnail: str
events: list["Zap2it.Event"] = field(default_factory=list)
def __init__(self):
self._listings = {}
def _get_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=self._BASE_URL,
timeout=2.0,
verify=True,
max_redirects=1,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0",
"Referer": "https://tvlistings.zap2it.com/?aid=gapzap",
"Accept": "application/json",
},
)
def _cleanup_listings(self) -> None:
now = datetime.datetime.now(datetime.UTC)
updated: dict[str, "Zap2it.Channel"] = {}
for channel in self._listings.values():
channel.events = [evt for evt in channel.events if evt.end_time > now]
if channel.events:
updated[channel.call_sign] = channel
self._listings = updated
async def _refresh_listings(self) -> list["Zap2it.Channel"]:
self._cleanup_listings()
now = time.time()
if self._listings and now - self._last_fetch > config.ZAP2IT_REFRESH_DELAY:
return list(self._listings.values())
params = {
"lineupId": config.ZAP2IT_LINEUP_ID,
"timespan": "6",
"headendId": config.ZAP2IT_HEADEND_ID,
"country": "USA",
"timezone": "",
"device": "X",
"postalCode": config.ZAP2IT_POSTAL_CODE,
"isOverride": "true",
"time": str(int(time.time())),
"pref": "16,256",
"userId": "-",
"aid": "gapzap",
"languagecode": "en-us",
}
now = datetime.datetime.now(datetime.UTC)
async with self._get_client() as client:
res = await client.get("/grid", params=params)
res.raise_for_status()
data = res.json()
for ch_data in data["channels"]:
call_sign = ch_data["callSign"]
if call_sign in self._listings:
channel = self._listings[call_sign]
else:
thumbnail = ch_data["thumbnail"]
if thumbnail.startswith("//"):
thumbnail = f"https:{thumbnail}"
channel = self.Channel(
call_sign=call_sign,
name=ch_data["affiliateName"],
number=ch_data["channelNo"],
id=ch_data["id"],
thumbnail=thumbnail,
)
self._listings[call_sign] = channel
for evt_data in ch_data["events"]:
end_time = datetime.datetime.fromisoformat(evt_data["endTime"])
if end_time < now:
continue
event = self.Event(
duration=evt_data["duration"],
rating=evt_data["rating"],
tags=evt_data["tags"],
thumbnail=f"https://zap2it.tmsimg.com/assets/{evt_data['thumbnail']}.jpg?w=165",
series_id=evt_data["seriesId"],
start_time=datetime.datetime.fromisoformat(evt_data["startTime"]),
end_time=end_time,
program=self.Program(
title=evt_data["program"]["title"],
id=evt_data["program"]["id"],
short_desc=evt_data["program"]["shortDesc"],
season=evt_data["program"]["season"],
release_year=evt_data["program"]["releaseYear"],
episode=evt_data["program"]["episode"],
episode_title=evt_data["program"]["episodeTitle"],
series_id=evt_data["program"]["seriesId"],
),
)
channel.events.append(event)
return list(self._listings.values())
async def get_channel(self, call_sign: str) -> Channel | None:
await self._refresh_listings()
return self._listings.get(call_sign)

Loading…
Cancel
Save