Browse Source

start working on epg

main
Brett Langdon 2 years ago
parent
commit
dc99929ec6
No known key found for this signature in database GPG Key ID: 9BAD4322A65AD78B
5 changed files with 278 additions and 12 deletions
  1. +6
    -12
      src/dlhdhr/app.py
  2. +5
    -0
      src/dlhdhr/config.py
  3. +5
    -0
      src/dlhdhr/dlhd.py
  4. +56
    -0
      src/dlhdhr/xmltv.py
  5. +206
    -0
      src/dlhdhr/zap2it.py

+ 6
- 12
src/dlhdhr/app.py View File

@ -10,6 +10,8 @@ from starlette.responses import JSONResponse, Response, StreamingResponse
from dlhdhr import config
from dlhdhr.dlhd import DLHDClient
from dlhdhr.tuner import TunerManager, TunerNotFoundError
from dlhdhr.xmltv import generate_xmltv
from dlhdhr.zap2it import Zap2it
def get_public_url(request: Request, path: str) -> str:
@ -159,20 +161,10 @@ async def xmltv_xml(request: Request) -> Response:
)
dlhd = cast(DLHDClient, request.app.state.dlhd)
zap2it = cast(Zap2it, request.app.state.zap2it)
dlhd_channels = await dlhd.get_channels()
response = '<tv generator-info-name="dlhdhr">'
for channel in dlhd_channels:
name = saxutils.escape(channel.name)
response += f'<channel id="{channel.number}">'
response += f'<display-name lang="en">{name}</display-name>'
response += f"<lcn>{channel.number}</lcn>"
response += "</channel>"
response += "</tv>"
return Response(response, media_type="application/xml; charset=utf-8")
return Response(await generate_xmltv(dlhd_channels, zap2it), media_type="application/xml; charset=utf-8")
async def iptv_m3u(request: Request) -> Response:
@ -193,10 +185,12 @@ async def iptv_m3u(request: Request) -> Response:
def create_app() -> Starlette:
dlhd_client = DLHDClient()
tuner_manager = TunerManager()
zap2it = Zap2it()
app = Starlette()
app.state.dlhd = dlhd_client
app.state.tuners = tuner_manager
app.state.zap2it = zap2it
app.add_route("/discover.json", discover_json)
app.add_route("/lineup_status.json", lineup_status_json)
app.add_route("/listings.json", listings_json)


+ 5
- 0
src/dlhdhr/config.py View File

@ -22,3 +22,8 @@ CHANNEL_ALLOW: set[str] | None = _set_or_none("DLHDHR_CHANNEL_ALLOW")
EPG_PROVIDER: str | None = os.getenv("DLHDHR_EPG_PROVIDER")
EPG_BEST_XMLTV_URL: str | None = os.getenv("DLHDHR_EPG_BEST_XMLTV_URL")
ZAP2IT_POSTAL_CODE: str = os.getenv("DLHDHR_ZAP2IT_POSTAL_CODE", "10001")
ZAP2IT_REFRESH_DELAY: int = int(os.getenv("DLHDHR_ZAP2IT_REFRESH_DELAY", "3600"))
ZAP2IT_LINEUP_ID: str = os.getenv("DLHDHR_ZAP2IT_LINEUP_ID", "USA-NY31519-DEFAULT")
ZAP2IT_HEADEND_ID: str = os.getenv("DLHDHR_ZAP2IT_HEADEND_ID", "NY31519")

+ 5
- 0
src/dlhdhr/dlhd.py View File

@ -9,6 +9,7 @@ import m3u8
from dlhdhr import config
from dlhdhr.tvg_id import get_tvg_id
from dlhdhr.zap2it import get_channel_call_sign
@dataclass(frozen=True)
@ -20,6 +21,10 @@ class DLHDChannel:
def tvg_id(self) -> str | None:
return get_tvg_id(self.number)
@property
def call_sign(self) -> str | None:
return get_channel_call_sign(self.number)
@property
def playlist_m3u8(self) -> str:
return f"/channel/{self.number}/playlist.m3u8"


+ 56
- 0
src/dlhdhr/xmltv.py View File

@ -0,0 +1,56 @@
from xml.etree.ElementTree import Element, SubElement, tostring
from dlhdhr.dlhd import DLHDChannel
from dlhdhr.zap2it import Zap2it
async def generate_xmltv(channels: list[DLHDChannel], zap2it: Zap2it) -> bytes:
tv = Element("tv", attrib={"generator-info-name": "dlhdhr"})
for channel in channels:
ch_node = SubElement(tv, "channel", attrib={"id": channel.number})
SubElement(ch_node, "display-name", attrib={"lang": "en"}).text = channel.name
SubElement(ch_node, "lcn").text = channel.number
if not channel.call_sign:
continue
z_channel = await zap2it.get_channel(channel.call_sign)
if not z_channel:
continue
if not channel.tvg_id:
continue
for event in z_channel.events:
start_time = event.start_time.strftime("%Y%m%d%H%M%S %z")
end_time = event.start_time.strftime("%Y%m%d%H%M%S %z")
programme = SubElement(
tv, "programme", attrib={"start": start_time, "stop": end_time, "channel": channel.tvg_id}
)
if event.program.title:
SubElement(programme, "title", attrib={"lang": "en"}).text = event.program.title
if event.program.short_desc:
SubElement(
programme, "desc", attrib={"lang": "en", "desc": event.program.short_desc}
).text = event.program.short_desc
if event.program.release_year:
SubElement(programme, "date").text = event.program.release_year
if event.rating:
rating = SubElement(programme, "rating", attrib={"system": "MPAA"})
SubElement(rating, "value").text = event.rating
if event.program.season or event.program.episode:
e_id = ".".join([event.program.season or "", event.program.episode or "", ""])
SubElement(programme, "episode-num", attrib={"system": "xmltv_ns"}).text = e_id
if event.thumbnail:
SubElement(programme, "icon", attrib={"src": event.thumbnail})
for tag in event.tags:
SubElement(programme, "category", attrib={"lang": "en"}).text = tag
return tostring(tv)

+ 206
- 0
src/dlhdhr/zap2it.py View File

@ -0,0 +1,206 @@
import datetime
from dataclasses import dataclass, field
import time
import httpx
from dlhdhr import config
# Mapping of DLHD channel number to zap2it call sign
_MAPPING = dict(
[
("44", "ESPN"),
("45", "ESPN2"),
("66", "TUDN"),
("295", "TOON"),
("298", "FXX"),
("301", "FREEFRM"),
("303", "AMC"),
("305", "BBCA"),
("306", "BET"),
("307", "BRAVO"),
("309", "CNBC"),
("310", "COMEDY"),
("312", "DISN"),
("316", "ESPNU"),
("317", "FX"),
("319", "GSN"),
("321", "HBO"),
("325", "ION"),
("326", "LIFE"),
("327", "MSNBC"),
("329", "NICJR"),
("330", "NIK"),
("331", "OWN"),
("333", "SHOW"),
("334", "PAR"),
("335", "STARZ"),
("336", "TBS"),
("337", "TLC"),
("338", "TNT"),
("339", "TOON"),
("340", "TRAV"),
("343", "USA"),
("344", "VH1"),
("345", "CNN"),
("371", "MTV"),
("373", "SYFY"),
("381", "FXM"),
("382", "HGTV"),
("647", "CMTV"),
("651", "DEST"),
("658", "SUNDANC"),
("665", "FYISD"),
("689", "HBO2"),
("691", "HBOF"),
("693", "HBOSIG"),
("697", "COOK"),
("745", "NGC"),
("766", "ABC"),
]
)
def get_channel_call_sign(channel_number: str) -> str | None:
return _MAPPING.get(channel_number)
class Zap2it:
_BASE_URL = "https://tvlistings.zap2it.com/api/"
_listings: dict[str, "Zap2it.Channel"]
_last_fetch: float = 0
@dataclass
class Program:
title: str
id: str
short_desc: str
season: str | None
release_year: str | None
episode: str | None
episode_title: str | None
series_id: str | None
@dataclass
class Event:
duration: int
start_time: datetime.datetime
end_time: datetime.datetime
thumbnail: str
series_id: str
rating: str
tags: list[str]
program: "Zap2it.Program"
@dataclass
class Channel:
call_sign: str
name: str
number: str
id: str
thumbnail: str
events: list["Zap2it.Event"] = field(default_factory=list)
def __init__(self):
self._listings = {}
def _get_client(self) -> httpx.AsyncClient:
return httpx.AsyncClient(
base_url=self._BASE_URL,
timeout=2.0,
verify=True,
max_redirects=1,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0",
"Referer": "https://tvlistings.zap2it.com/?aid=gapzap",
"Accept": "application/json",
},
)
def _cleanup_listings(self) -> None:
now = datetime.datetime.now(datetime.UTC)
updated: dict[str, "Zap2it.Channel"] = {}
for channel in self._listings.values():
channel.events = [evt for evt in channel.events if evt.end_time > now]
if channel.events:
updated[channel.call_sign] = channel
self._listings = updated
async def _refresh_listings(self) -> list["Zap2it.Channel"]:
self._cleanup_listings()
now = time.time()
if self._listings and now - self._last_fetch > config.ZAP2IT_REFRESH_DELAY:
return list(self._listings.values())
params = {
"lineupId": config.ZAP2IT_LINEUP_ID,
"timespan": "6",
"headendId": config.ZAP2IT_HEADEND_ID,
"country": "USA",
"timezone": "",
"device": "X",
"postalCode": config.ZAP2IT_POSTAL_CODE,
"isOverride": "true",
"time": str(int(time.time())),
"pref": "16,256",
"userId": "-",
"aid": "gapzap",
"languagecode": "en-us",
}
now = datetime.datetime.now(datetime.UTC)
async with self._get_client() as client:
res = await client.get("/grid", params=params)
res.raise_for_status()
data = res.json()
for ch_data in data["channels"]:
call_sign = ch_data["callSign"]
if call_sign in self._listings:
channel = self._listings[call_sign]
else:
channel = self.Channel(
call_sign=call_sign,
name=ch_data["affiliateName"],
number=ch_data["channelNo"],
id=ch_data["id"],
thumbnail=ch_data["thumbnail"],
)
self._listings[call_sign] = channel
for evt_data in ch_data["events"]:
end_time = datetime.datetime.fromisoformat(evt_data["endTime"])
if end_time < now:
continue
event = self.Event(
duration=evt_data["duration"],
rating=evt_data["rating"],
tags=evt_data["tags"],
thumbnail=f"//zap2it.tmsimg.com/assets/{evt_data['thumbnail']}.jpg?w=165",
series_id=evt_data["seriesId"],
start_time=datetime.datetime.fromisoformat(evt_data["startTime"]),
end_time=end_time,
program=self.Program(
title=evt_data["program"]["title"],
id=evt_data["program"]["id"],
short_desc=evt_data["program"]["shortDesc"],
season=evt_data["program"]["season"],
release_year=evt_data["program"]["releaseYear"],
episode=evt_data["program"]["episode"],
episode_title=evt_data["program"]["episodeTitle"],
series_id=evt_data["program"]["seriesId"],
),
)
channel.events.append(event)
return list(self._listings.values())
async def get_channel(self, call_sign: str) -> Channel | None:
await self._refresh_listings()
return self._listings.get(call_sign)

Loading…
Cancel
Save