131 lines
4.4 KiB
Python
131 lines
4.4 KiB
Python
|
|
import aiohttp
|
|
import asyncio
|
|
import json
|
|
from . import parser
|
|
from . block import Block
|
|
from . dlworker import DownloadWorker
|
|
from .. paramgen import arcparam
|
|
from .. import config
|
|
from urllib.parse import quote
|
|
|
|
# HTTP request headers taken from the package configuration.
headers = config.headers

# Replay endpoint prefix; a URL-quoted continuation token is appended to it.
REPLAY_URL = (
    "https://www.youtube.com/live_chat_replay/"
    "get_live_chat_replay?continuation="
)
|
|
|
|
def _split(start, end, count, min_interval = 120):
|
|
"""
|
|
Split section from `start` to `end` into `count` pieces,
|
|
and returns the beginning of each piece.
|
|
The `count` is adjusted so that the length of each piece
|
|
is no smaller than `min_interval`.
|
|
|
|
Returns:
|
|
--------
|
|
List of the beginning position of each piece.
|
|
"""
|
|
|
|
if not (isinstance(start,int) or isinstance(start,float)) or \
|
|
not (isinstance(end,int) or isinstance(end,float)):
|
|
raise ValueError("start/end must be int or float")
|
|
if not isinstance(count,int):
|
|
raise ValueError("count must be int")
|
|
if start>end:
|
|
raise ValueError("end must be equal to or greater than start.")
|
|
if count<1:
|
|
raise ValueError("count must be equal to or greater than 1.")
|
|
if (end-start)/count < min_interval:
|
|
count = int((end-start)/min_interval)
|
|
if count == 0 : count = 1
|
|
interval= (end-start)/count
|
|
|
|
if count == 1:
|
|
return [start]
|
|
return sorted(list(set([int(start+interval*j)
|
|
for j in range(count) ])))
|
|
|
|
def ready_blocks(video_id, duration, div, callback):
    """
    Fetch the first chunk of replay chat data at several seek positions
    and wrap each chunk in a `Block`.

    The seek positions come from `_split(-1, duration, div)`; every
    position is requested concurrently over a single aiohttp session.

    Parameters:
    -----------
        video_id : passed to `arcparam.getparam` to build the continuation.
        duration : upper bound of the range handed to `_split`.
        div : requested number of pieces; must be greater than 0.
        callback : optional callable. When truthy it is invoked as
            `callback(actions, last - first)` for each chunk that
            contains chat actions.

    Returns:
    --------
        List with one entry per seek position, in the order produced by
        `_split`: a `Block`, or None where the response had no actions.

    Raises:
    -------
        ValueError: if `div` is not greater than 0.
        json.JSONDecodeError: if a response still cannot be decoded
        after 3 attempts.
    """
    if div <= 0:
        raise ValueError("div must be greater than 0.")

    async def _get_blocks(video_id, duration, div, callback):
        # One session is shared by all concurrent requests.
        async with aiohttp.ClientSession() as session:
            tasks = [_create_block(session, video_id, seektime, callback)
                     for seektime in _split(-1, duration, div)]
            return await asyncio.gather(*tasks)

    async def _create_block(session, video_id, seektime, callback):
        continuation = arcparam.getparam(video_id, seektime=seektime)
        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"

        last_error = None
        for _ in range(3):
            try:
                async with session.get(url, headers=headers) as resp:
                    text = await resp.text()
                next_continuation, actions = parser.parse(json.loads(text))
            except json.JSONDecodeError as err:
                # Remember the failure so it can be re-raised if every
                # attempt fails; JSONDecodeError cannot be constructed
                # without (msg, doc, pos) arguments.
                last_error = err
                print("JSONDecodeError occured")
                await asyncio.sleep(1)
                continue
            break
        else:
            raise last_error

        if not actions:
            # No chat data at this position: nothing to build a Block from.
            return None

        first = parser.get_offset(actions[0])
        last = parser.get_offset(actions[-1])
        if callback:
            callback(actions, last - first)
        return Block(
            continuation=next_continuation,
            chat_data=actions,
            first=first,
            last=last,
        )

    # NOTE(review): asyncio.get_event_loop() outside a running loop is
    # deprecated in recent Python versions; it is kept here because
    # switching to asyncio.run() would close the loop other code may
    # still rely on -- confirm before changing.
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(
        _get_blocks(video_id, duration, div, callback))
    return result
|
|
|
|
def download_chunk(callback, blocks, video_id):
    """
    Create one `DownloadWorker` per block and run all of them
    concurrently over a single aiohttp session.

    Parameters:
    -----------
        callback : optional callable. When truthy it is invoked as
            `callback(actions, last - first)` after each fetch that
            returns chat actions.
        blocks : iterable of blocks; each worker receives its own block
            plus the whole collection.
        video_id : forwarded to every `DownloadWorker`.

    Returns:
    --------
        None. The results gathered from the workers are not returned.

    Raises:
    -------
        json.JSONDecodeError: if a response still cannot be decoded
        after 3 attempts.
    """

    async def _allocate_workers():
        workers = [
            DownloadWorker(
                fetch=_fetch,
                block=block,
                blocks=blocks,
                video_id=video_id,
            )
            for block in blocks
        ]
        async with aiohttp.ClientSession() as session:
            tasks = [worker.run(session) for worker in workers]
            return await asyncio.gather(*tasks)

    async def _fetch(continuation, session):
        """
        Request the chat data for `continuation`.

        Returns `(actions, next_continuation, first, last)`, or
        `([], next_continuation, None, None)` when there are no actions.
        """
        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"

        last_error = None
        for _ in range(3):
            try:
                async with session.get(url, headers=config.headers) as resp:
                    chat_json = await resp.text()
                # Decoding happens inside the try so that a malformed
                # response is actually retried.
                next_continuation, actions = parser.parse(
                    json.loads(chat_json))
            except json.JSONDecodeError as err:
                # Kept for re-raising: JSONDecodeError cannot be
                # constructed without (msg, doc, pos) arguments.
                last_error = err
                print("JSONDecodeError occured")
                await asyncio.sleep(1)
                continue
            break
        else:
            raise last_error

        if not actions:
            return [], next_continuation, None, None

        last = parser.get_offset(actions[-1])
        first = parser.get_offset(actions[0])
        if callback:
            callback(actions, last - first)
        return actions, next_continuation, first, last

    # NOTE(review): asyncio.get_event_loop() outside a running loop is
    # deprecated in recent Python versions; kept so the loop remains
    # usable by other callers -- confirm before switching to asyncio.run().
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_allocate_workers())
|