147 lines
4.6 KiB
Python
147 lines
4.6 KiB
Python
|
|
import httpx
|
|
import asyncio
|
|
import json
|
|
from . import parser
|
|
from . block import Block
|
|
from . worker import ExtractWorker
|
|
from . patch import Patch
|
|
from ... import config
|
|
from ... paramgen import arcparam_mining as arcparam
|
|
from concurrent.futures import CancelledError
|
|
from urllib.parse import quote
|
|
|
|
# HTTP headers sent with every replay request issued from this module.
headers = config.headers

# Base address of the chat-replay endpoint; the URL-quoted continuation
# token is appended directly after the trailing "=".
REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation="

# Added to a block's seek time after its first fetch and passed to the
# callback as its second argument.
# NOTE(review): presumably seconds, since seek times are multiplied by 1000
# to build `playerOffsetMs` -- confirm.
INTERVAL = 1
|
|
|
|
|
|
def _split(start, end, count, min_interval_sec=120):
    """
    Cut the range from `start` to `end` into `count` equal pieces and
    return where each piece begins.

    If the pieces would be shorter than `min_interval_sec`, `count` is
    lowered to the number of `min_interval_sec`-long pieces that fit into
    the range (but never below 1).

    Returns:
    --------
    Ascending list of the distinct piece offsets, each truncated to int.
    When a single piece remains, the list is `[start]` with `start`
    left unconverted.

    Raises:
    -------
    ValueError
        `start`/`end` is not an int or float, `count` is not an int,
        `start` is greater than `end`, or `count` is smaller than 1.
    """
    numeric = (int, float)
    if not (isinstance(start, numeric) and isinstance(end, numeric)):
        raise ValueError("start/end must be int or float")
    if not isinstance(count, int):
        raise ValueError("count must be int")
    if end < start:
        raise ValueError("end must be equal to or greater than start.")
    if count < 1:
        raise ValueError("count must be equal to or greater than 1.")

    span = end - start
    if span / count < min_interval_sec:
        # Too many pieces requested: fall back to as many minimum-length
        # pieces as fit, keeping at least one.
        count = int(span / min_interval_sec) or 1

    if count == 1:
        return [start]

    step = span / count
    # Truncation to int can make neighbouring offsets collide, so
    # duplicates are dropped before sorting.
    offsets = {int(start + step * idx) for idx in range(count)}
    return sorted(offsets)
|
|
|
|
|
|
def ready_blocks(video_id, duration, div, callback):
    """
    Fetch the initial blocks.

    The range from 0 to `duration` is divided by `_split` into up to `div`
    pieces, and the first chunk of chat data at each piece's offset is
    requested concurrently.

    Parameters:
    -----------
    video_id :
        Passed to `arcparam.getparam` to build each continuation token.
    duration : int or float
        Upper bound of the range handed to `_split`.
    div : int
        Requested number of pieces; must be greater than 0.
    callback : callable or None
        When truthy, called as `callback(actions, INTERVAL)` after each
        block has been fetched.

    Returns:
    --------
    List with one entry per offset produced by `_split`: a `Block`, or
    None when no response text was available for that offset.

    Raises:
    -------
    ValueError
        `div` is zero or negative.
    """
    if div <= 0:
        raise ValueError

    async def _get_blocks(video_id, duration, div, callback):
        # httpx's asynchronous client is `AsyncClient`; `ClientSession`
        # is the aiohttp name and does not exist in httpx.
        async with httpx.AsyncClient() as session:
            tasks = [_create_block(session, video_id, seektime, callback)
                     for seektime in _split(0, duration, div)]
            return await asyncio.gather(*tasks)

    async def _create_block(session, video_id, seektime, callback):
        continuation = arcparam.getparam(video_id, seektime=seektime)
        url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
               f"{int(seektime*1000)}&hidden=false&pbj=1")
        # With httpx the request coroutine is awaited directly and the
        # decoded body is exposed as the `text` property.
        resp = await session.get(url, headers=headers)
        chat_json = resp.text
        if chat_json is None:
            return
        continuation, actions = parser.parse(json.loads(chat_json)[1])
        first = seektime
        seektime += INTERVAL
        if callback:
            callback(actions, INTERVAL)
        return Block(
            continuation=continuation,
            chat_data=actions,
            first=first,
            last=seektime,
            seektime=seektime
        )

    loop = asyncio.get_event_loop()
    blocks = loop.run_until_complete(
        _get_blocks(video_id, duration, div, callback))
    return blocks
|
|
|
|
|
|
def fetch_patch(callback, blocks, video_id):
    """
    Allocate workers and assign blocks.

    One `ExtractWorker` is created per entry of `blocks`; all workers are
    run concurrently on a shared HTTP client until they finish or are
    cancelled. Cancellation is swallowed, so the function returns normally
    in that case.

    Parameters:
    -----------
    callback : callable or None
        When truthy, called as `callback(actions, INTERVAL)` after every
        fetch.
    blocks : list
        Blocks to process; each worker receives its own block as well as
        the whole list.
    video_id :
        Passed to `arcparam.getparam` and to every worker.
    """

    async def _allocate_workers():
        workers = [
            ExtractWorker(
                fetch=_fetch, block=block,
                blocks=blocks, video_id=video_id
            )
            for block in blocks
        ]
        # httpx's asynchronous client is `AsyncClient`; `ClientSession`
        # is the aiohttp name and does not exist in httpx.
        async with httpx.AsyncClient() as session:
            tasks = [worker.run(session) for worker in workers]
            return await asyncio.gather(*tasks)

    async def _fetch(seektime, session) -> Patch:
        continuation = arcparam.getparam(video_id, seektime=seektime)
        url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
               f"{int(seektime*1000)}&hidden=false&pbj=1")
        # With httpx the request coroutine is awaited directly and the
        # decoded body is exposed as the `text` property.
        resp = await session.get(url, headers=config.headers)
        chat_json = resp.text
        actions = []
        try:
            if chat_json is None:
                return Patch()
            continuation, actions = parser.parse(json.loads(chat_json)[1])
        except json.JSONDecodeError:
            # Best effort: an undecodable response yields a patch with no
            # chats and the continuation that was used for the request.
            pass
        if callback:
            callback(actions, INTERVAL)
        return Patch(chats=actions, continuation=continuation,
                     seektime=seektime, last=seektime)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(_allocate_workers())
    except (CancelledError, asyncio.CancelledError):
        # Since Python 3.8 asyncio.CancelledError is no longer the same
        # class as concurrent.futures.CancelledError, so both are caught.
        pass
|
|
|
|
|
|
async def _shutdown():
    """
    Cancel every task on the running event loop except the one executing
    this coroutine, waiting for each cancelled task in turn.
    """
    print("\nshutdown...")
    own_task = asyncio.current_task()
    others = [job for job in asyncio.all_tasks() if job is not own_task]
    for job in others:
        job.cancel()
        try:
            # Wait until the task has actually finished unwinding.
            await job
        except asyncio.CancelledError:
            pass
|
|
|
|
|
|
def cancel():
    """
    Schedule `_shutdown()` as a task on the event loop returned by
    `asyncio.get_event_loop()`.
    """
    asyncio.get_event_loop().create_task(_shutdown())
|