Format code
This commit is contained in:
@@ -5,7 +5,7 @@ from . import parser
|
||||
from . block import Block
|
||||
from . worker import ExtractWorker
|
||||
from . patch import Patch
|
||||
from ... import config
|
||||
from ... import config
|
||||
from ... paramgen import arcparam
|
||||
from ... exceptions import UnknownConnectionError
|
||||
from concurrent.futures import CancelledError
|
||||
@@ -17,10 +17,11 @@ REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
|
||||
"get_live_chat_replay?continuation="
|
||||
MAX_RETRY_COUNT = 3
|
||||
|
||||
def _split(start, end, count, min_interval_sec = 120):
|
||||
|
||||
def _split(start, end, count, min_interval_sec=120):
|
||||
"""
|
||||
Split section from `start` to `end` into `count` pieces,
|
||||
and returns the beginning of each piece.
|
||||
and returns the beginning of each piece.
|
||||
The `count` is adjusted so that the length of each piece
|
||||
is no smaller than `min_interval`.
|
||||
|
||||
@@ -28,41 +29,43 @@ def _split(start, end, count, min_interval_sec = 120):
|
||||
--------
|
||||
List of the offset of each block's first chat data.
|
||||
"""
|
||||
|
||||
if not (isinstance(start,int) or isinstance(start,float)) or \
|
||||
not (isinstance(end,int) or isinstance(end,float)):
|
||||
if not (isinstance(start, int) or isinstance(start, float)) or \
|
||||
not (isinstance(end, int) or isinstance(end, float)):
|
||||
raise ValueError("start/end must be int or float")
|
||||
if not isinstance(count,int):
|
||||
if not isinstance(count, int):
|
||||
raise ValueError("count must be int")
|
||||
if start>end:
|
||||
if start > end:
|
||||
raise ValueError("end must be equal to or greater than start.")
|
||||
if count<1:
|
||||
if count < 1:
|
||||
raise ValueError("count must be equal to or greater than 1.")
|
||||
if (end-start)/count < min_interval_sec:
|
||||
count = int((end-start)/min_interval_sec)
|
||||
if count == 0 : count = 1
|
||||
interval= (end-start)/count
|
||||
|
||||
if (end - start) / count < min_interval_sec:
|
||||
count = int((end - start) / min_interval_sec)
|
||||
if count == 0:
|
||||
count = 1
|
||||
interval = (end - start) / count
|
||||
|
||||
if count == 1:
|
||||
return [start]
|
||||
return sorted( list(set( [int(start + interval*j)
|
||||
for j in range(count) ])))
|
||||
return sorted(list(set([int(start + interval * j)
|
||||
for j in range(count)])))
|
||||
|
||||
|
||||
def ready_blocks(video_id, duration, div, callback):
|
||||
if div <= 0: raise ValueError
|
||||
if div <= 0:
|
||||
raise ValueError
|
||||
|
||||
async def _get_blocks( video_id, duration, div, callback):
|
||||
async def _get_blocks(video_id, duration, div, callback):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tasks = [_create_block(session, video_id, seektime, callback)
|
||||
for seektime in _split(-1, duration, div)]
|
||||
tasks = [_create_block(session, video_id, seektime, callback)
|
||||
for seektime in _split(-1, duration, div)]
|
||||
return await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
async def _create_block(session, video_id, seektime, callback):
|
||||
continuation = arcparam.getparam(video_id, seektime = seektime)
|
||||
continuation = arcparam.getparam(video_id, seektime=seektime)
|
||||
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
|
||||
for _ in range(MAX_RETRY_COUNT):
|
||||
try :
|
||||
async with session.get(url, headers = headers) as resp:
|
||||
try:
|
||||
async with session.get(url, headers=headers) as resp:
|
||||
text = await resp.text()
|
||||
next_continuation, actions = parser.parse(json.loads(text))
|
||||
break
|
||||
@@ -76,41 +79,42 @@ def ready_blocks(video_id, duration, div, callback):
|
||||
first = parser.get_offset(actions[0])
|
||||
last = parser.get_offset(actions[-1])
|
||||
if callback:
|
||||
callback(actions,last-first)
|
||||
callback(actions, last - first)
|
||||
return Block(
|
||||
continuation = next_continuation,
|
||||
chat_data = actions,
|
||||
first = first,
|
||||
last = last
|
||||
continuation=next_continuation,
|
||||
chat_data=actions,
|
||||
first=first,
|
||||
last=last
|
||||
)
|
||||
|
||||
|
||||
"""
|
||||
fetch initial blocks.
|
||||
"""
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
blocks = loop.run_until_complete(
|
||||
_get_blocks(video_id, duration, div, callback))
|
||||
return blocks
|
||||
|
||||
|
||||
def fetch_patch(callback, blocks, video_id):
|
||||
|
||||
async def _allocate_workers():
|
||||
workers = [
|
||||
ExtractWorker(
|
||||
fetch = _fetch, block = block,
|
||||
blocks = blocks, video_id = video_id
|
||||
fetch=_fetch, block=block,
|
||||
blocks=blocks, video_id=video_id
|
||||
)
|
||||
for block in blocks
|
||||
]
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tasks = [worker.run(session) for worker in workers]
|
||||
return await asyncio.gather(*tasks)
|
||||
return await asyncio.gather(*tasks)
|
||||
|
||||
async def _fetch(continuation,session) -> Patch:
|
||||
async def _fetch(continuation, session) -> Patch:
|
||||
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
|
||||
for _ in range(MAX_RETRY_COUNT):
|
||||
try:
|
||||
async with session.get(url,headers = config.headers) as resp:
|
||||
async with session.get(url, headers=config.headers) as resp:
|
||||
chat_json = await resp.text()
|
||||
continuation, actions = parser.parse(json.loads(chat_json))
|
||||
break
|
||||
@@ -126,21 +130,22 @@ def fetch_patch(callback, blocks, video_id):
|
||||
if callback:
|
||||
callback(actions, last - first)
|
||||
return Patch(actions, continuation, first, last)
|
||||
return Patch(continuation = continuation)
|
||||
return Patch(continuation=continuation)
|
||||
|
||||
"""
|
||||
allocate workers and assign blocks.
|
||||
"""
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
loop.run_until_complete(_allocate_workers())
|
||||
except CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
async def _shutdown():
|
||||
print("\nshutdown...")
|
||||
tasks = [t for t in asyncio.all_tasks()
|
||||
if t is not asyncio.current_task()]
|
||||
if t is not asyncio.current_task()]
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
try:
|
||||
@@ -148,7 +153,7 @@ async def _shutdown():
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
def cancel():
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(_shutdown())
|
||||
|
||||
Reference in New Issue
Block a user