Files
pytchat-fork/pytchat/tool/extract/asyncdl.py
2020-09-14 00:28:41 +09:00

179 lines
5.6 KiB
Python

import httpx
import asyncio
from . import parser
from . block import Block
from . worker import ExtractWorker
from . patch import Patch
from ... import config
from ... paramgen import arcparam
from ... exceptions import UnknownConnectionError
from concurrent.futures import CancelledError
from httpx import NetworkError, ReadTimeout
from json import JSONDecodeError
from urllib.parse import quote
# Request headers, taken from the package configuration.
headers = config.headers

# Base URL of the chat-replay endpoint; the URL-quoted continuation token
# is appended directly after "continuation=".
REPLAY_URL = (
    "https://www.youtube.com/live_chat_replay/"
    "get_live_chat_replay?continuation="
)

# How many times a request is attempted before giving up.
MAX_RETRY_COUNT = 3

# Continuation tokens already requested, so the same one is not fetched
# twice.
param_set = set()
def _split(start, end, count, min_interval_sec=120):
    """
    Split the section from `start` to `end` into `count` pieces and
    return the beginning of each piece.

    `count` is reduced when necessary so that every piece is at least
    `min_interval_sec` long; at least one piece is always returned.

    Parameters
    ----------
    start, end : int or float
        Bounds of the section; `start` must not exceed `end`.
    count : int
        Requested number of pieces; must be at least 1.
    min_interval_sec : int or float, default 120
        Minimum length of a single piece.

    Returns
    -------
    list
        Sorted, de-duplicated offsets of each piece's beginning. Offsets
        are truncated with ``int()``, except that a single piece yields
        ``[start]`` unchanged.

    Raises
    ------
    ValueError
        If an argument has the wrong type or is out of range.
    """
    if not isinstance(start, (int, float)) or not isinstance(end, (int, float)):
        raise ValueError("start/end must be int or float")
    if not isinstance(count, int):
        raise ValueError("count must be int")
    if start > end:
        raise ValueError("end must be equal to or greater than start.")
    if count < 1:
        raise ValueError("count must be equal to or greater than 1.")
    span = end - start
    if span / count < min_interval_sec:
        # Too many pieces for this span: use as many as fit, but never 0.
        count = max(1, int(span / min_interval_sec))
    if count == 1:
        return [start]
    interval = span / count
    # The set removes offsets that collapse together after int() truncation.
    return sorted({int(start + interval * j) for j in range(count)})
def ready_blocks(video_id, duration, div, callback):
    """
    Fetch the initial chat blocks of a chat replay.

    The range from -1 to `duration` is split with `_split` into at most
    `div` pieces, and the first chunk of chat at each split point is
    requested concurrently over one HTTP/2 client. `param_set` is reset
    first, so continuations remembered from a previous run are forgotten.

    Parameters
    ----------
    video_id
        Identifier of the video, passed to `arcparam.getparam`.
    duration : int or float
        End of the range handed to `_split`.
    div : int
        Requested number of blocks; must be greater than 0.
    callback : callable or None
        If truthy, called as ``callback(actions, last - first)`` for every
        block that received chat data.

    Returns
    -------
    list of Block
        One block per split point, in ascending seek-time order.

    Raises
    ------
    ValueError
        If `div` is not greater than 0.
    UnknownConnectionError
        If a request still fails after MAX_RETRY_COUNT attempts.
    """
    param_set.clear()
    if div <= 0:
        raise ValueError("div must be greater than 0.")

    async def _request(session, url):
        """GET `url` and parse the reply, retrying transient failures.

        Returns ``(next_continuation, actions)`` from `parser.parse`.
        After MAX_RETRY_COUNT failed attempts, schedules `cancel()` and
        raises UnknownConnectionError.
        """
        err = None
        for _ in range(MAX_RETRY_COUNT):
            try:
                resp = await session.get(url, headers=headers)
                return parser.parse(resp.json())
            except (JSONDecodeError, NetworkError, ReadTimeout) as e:
                # Keep the last error so the abort message says what failed.
                err = e
                await asyncio.sleep(3)
        cancel()
        raise UnknownConnectionError("Abort:" + str(err))

    async def _create_block(session, video_id, seektime, callback):
        """Fetch the chat chunk starting at `seektime` as a Block."""
        continuation = arcparam.getparam(video_id, seektime=seektime)
        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
        # The duplicate check runs once, outside the retry loop: checking
        # inside it made every retry see its own token and give up empty.
        if continuation in param_set:
            return Block(continuation=None, chat_data=[])
        # Mark before the first await so concurrently running tasks see it.
        param_set.add(continuation)
        next_continuation, actions = await _request(session, url)
        if not actions:
            # NOTE(review): relies on Block's defaults for `first`/`last`
            # (as `_fetch` does for Patch) -- confirm in block.py.
            return Block(continuation=next_continuation, chat_data=actions)
        first = parser.get_offset(actions[0])
        last = parser.get_offset(actions[-1])
        if callback:
            callback(actions, last - first)
        return Block(
            continuation=next_continuation,
            chat_data=actions,
            first=first,
            last=last,
        )

    async def _get_blocks(video_id, duration, div, callback):
        """Create one block per split point, all concurrently."""
        async with httpx.AsyncClient(http2=True) as session:
            tasks = [_create_block(session, video_id, seektime, callback)
                     for seektime in _split(-1, duration, div)]
            return await asyncio.gather(*tasks)

    loop = asyncio.get_event_loop()
    return loop.run_until_complete(
        _get_blocks(video_id, duration, div, callback))
def fetch_patch(callback, blocks, video_id):
    """
    Fetch the chat that follows each initial block.

    One ExtractWorker is created per entry of `blocks`. The workers share
    one HTTP client, run concurrently, and download further chunks through
    `_fetch`. Nothing is returned; presumably the workers record what they
    fetch on the Block objects (TODO confirm against worker.py).

    Parameters
    ----------
    callback : callable or None
        If truthy, called as ``callback(actions, last - first)`` for every
        fetched chunk that contains chat data.
    blocks
        Blocks to continue from. Each gets its own worker, and the whole
        sequence is also handed to every worker.
    video_id
        Identifier of the video, passed through to each ExtractWorker.

    Raises
    ------
    UnknownConnectionError
        May propagate if a request still fails after MAX_RETRY_COUNT
        attempts (depends on how ExtractWorker handles it).
    """
    async def _allocate_workers():
        """Run one ExtractWorker per block over a shared HTTP client."""
        workers = [
            ExtractWorker(
                fetch=_fetch, block=block,
                blocks=blocks, video_id=video_id
            )
            for block in blocks
        ]
        async with httpx.AsyncClient() as session:
            tasks = [worker.run(session) for worker in workers]
            return await asyncio.gather(*tasks)

    async def _request(session, url):
        """GET `url` and parse the reply, retrying transient failures.

        Returns ``(next_continuation, actions)`` from `parser.parse`.
        After MAX_RETRY_COUNT failed attempts, schedules `cancel()` and
        raises UnknownConnectionError.
        """
        err = None
        for _ in range(MAX_RETRY_COUNT):
            try:
                resp = await session.get(url, headers=config.headers)
                return parser.parse(resp.json())
            except (JSONDecodeError, NetworkError, ReadTimeout) as e:
                # Keep the last error so the abort message says what failed.
                err = e
                await asyncio.sleep(3)
        cancel()
        raise UnknownConnectionError("Abort:" + str(err))

    async def _fetch(continuation, session) -> Patch:
        """Fetch the chat chunk for `continuation` and wrap it in a Patch."""
        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
        # The duplicate check runs once, outside the retry loop: checking
        # inside it made every retry see its own token and give up empty.
        if continuation in param_set:
            return Patch(continuation=None)
        param_set.add(continuation)
        next_continuation, actions = await _request(session, url)
        if not actions:
            return Patch(continuation=next_continuation)
        first = parser.get_offset(actions[0])
        last = parser.get_offset(actions[-1])
        if callback:
            callback(actions, last - first)
        return Patch(actions, next_continuation, first, last)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(_allocate_workers())
    except (CancelledError, asyncio.CancelledError):
        # `cancel()` cancels every running task; treat that as a stop.
        # Since Python 3.8 asyncio.CancelledError no longer derives from
        # concurrent.futures.CancelledError, so both must be listed.
        pass
async def _shutdown():
    """
    Cancel every other task on the running event loop and wait for them.

    Prints a notice first. All tasks are cancelled before any is awaited,
    and their outcomes (cancellation or any other exception) are collected
    with ``return_exceptions=True``. A task that fails while being
    cancelled therefore cannot stop the remaining tasks from being
    cancelled and awaited.
    """
    print("\nshutdown...")
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current]
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
# Strong references to in-flight shutdown tasks. The event loop keeps only
# weak references to tasks, so an unreferenced task could be garbage
# collected before it finishes.
_background_tasks = set()


def cancel():
    """
    Schedule `_shutdown` on the current event loop without waiting for it.

    The task is kept referenced until it completes.

    Returns
    -------
    asyncio.Task
        The scheduled shutdown task.
    """
    task = asyncio.get_event_loop().create_task(_shutdown())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task