From ee77807dbd575e9564ca59c25109cfb27d9df5d2 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Sun, 9 Feb 2020 20:26:21 +0900 Subject: [PATCH] Improve dlworker efficiency --- pytchat/tool/asyncdl.py | 20 +++--- pytchat/tool/block.py | 53 ++++++++------- pytchat/tool/dlworker.py | 135 ++++++++++++++++++++++++++++++++++--- pytchat/tool/downloader.py | 18 +++-- pytchat/tool/duplcheck.py | 18 +++-- tests/test_dl_duplcheck.py | 50 +++++++------- 6 files changed, 215 insertions(+), 79 deletions(-) diff --git a/pytchat/tool/asyncdl.py b/pytchat/tool/asyncdl.py index 409d2f5..476979d 100644 --- a/pytchat/tool/asyncdl.py +++ b/pytchat/tool/asyncdl.py @@ -49,9 +49,9 @@ def ready_blocks(video_id, duration, div, callback): async def _get_blocks( video_id, duration, div, callback): async with aiohttp.ClientSession() as session: - futures = [_create_block(session, video_id, pos, seektime, callback) + tasks = [_create_block(session, video_id, pos, seektime, callback) for pos, seektime in enumerate(_split(-1, duration, div))] - return await asyncio.gather(*futures,return_exceptions=True) + return await asyncio.gather(*tasks) async def _create_block(session, video_id, pos, seektime, callback): continuation = arcparam.getparam( @@ -67,7 +67,6 @@ def ready_blocks(video_id, duration, div, callback): if callback: callback(actions,last-first) return Block( - pos = pos, continuation = next_continuation, chat_data = actions, first = first, @@ -79,19 +78,22 @@ def ready_blocks(video_id, duration, div, callback): _get_blocks(video_id, duration, div, callback)) return result -def download_chunk(callback, blocks): +def download_chunk(callback, blocks, video_id): async def _allocate_workers(): workers = [ DownloadWorker( fetch = _fetch, - block = block + block = block, + blocks = blocks, + video_id = video_id + ) - for block in blocks + for i,block in enumerate(blocks) ] async with aiohttp.ClientSession() as session: tasks = [worker.run(session) for worker in workers] - return await asyncio.gather(*tasks,return_exceptions=True) + return await asyncio.gather(*tasks) async def _fetch(continuation,session): url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" @@ -103,8 +105,8 @@ def download_chunk(callback, blocks): first = parser.get_offset(actions[0]) if callback: callback(actions, last - first) - return actions, continuation, last - return continuation, [], None + return actions, continuation, first, last + return [], continuation, None, None loop = asyncio.get_event_loop() loop.run_until_complete(_allocate_workers()) diff --git a/pytchat/tool/block.py b/pytchat/tool/block.py index b4da7e6..7fefb14 100644 --- a/pytchat/tool/block.py +++ b/pytchat/tool/block.py @@ -17,7 +17,7 @@ class Block: this value increases as fetching chatdata progresses. - temp_last : int : + end : int : target videoOffsetTimeMs of last chat data for download, equals to first videoOffsetTimeMs of next block. when download worker reaches this offset, stop downloading. @@ -25,32 +25,35 @@ class Block: continuation : str : continuation param of last chat data. - chat_data : List + chat_data : list + + done : bool : + whether this block has been downloaded. + + remaining : int : + remaining data to download. + equals end - last. + + is_last : bool : + whether this block is the last one in blocklist. + + splitting : bool : + whether this block is in the process of splitting. + while True, this block is excluded from duplicate split procedure. """ - def __init__(self, pos=0, first=0, last=0, - continuation='', chat_data=[]): - self.pos = pos + + __slots__ = ['first','last','end','continuation','chat_data','remaining', + 'done','is_last','splitting'] + + def __init__(self, first = 0, last = 0, end = 0, + continuation = '', chat_data = [], is_last = False, + splitting = False): self.first = first self.last = last - self.temp_last = 0 + self.end = end self.continuation = continuation self.chat_data = chat_data - - def start(self): - chats, cont = self._cut(self.chat_data, self.continuation, self.last) - self.chat_data = chats - return cont - - def fill(self, chats, cont, fetched_last): - chats, cont = self._cut(chats, cont, fetched_last) - self.chat_data.extend(chats) - return cont - - def _cut(self, chats, cont, fetched_last): - if fetched_last < self.temp_last or self.temp_last == -1: - return chats, cont - for i, line in enumerate(chats): - line_offset = parser.get_offset(line) - if line_offset >= self.temp_last: - self.last = line_offset - return chats[:i], None + self.done = False + self.remaining = self.end- self.last + self.is_last = is_last + self.splitting = splitting diff --git a/pytchat/tool/dlworker.py b/pytchat/tool/dlworker.py index dea0c17..4ff3906 100644 --- a/pytchat/tool/dlworker.py +++ b/pytchat/tool/dlworker.py @@ -1,27 +1,144 @@ from . import parser - +from .. paramgen import arcparam +from . block import Block class DownloadWorker: """ DownloadWorker associates a download session with a block. Parameter ---------- - fetch : + fetch : func : download function of asyncdl - block : + block : Block : Block object that includes chat_data + + blocks : list : + List of Block(s) + + video_id : str : + + source_block : Block : + the block from which current downloading block is splitted """ - def __init__(self, fetch, block): + __slots__ = ['fetch', 'block', 'blocks', 'video_id', 'source_block'] + + def __init__(self, fetch, block, blocks, video_id ): self.block = block self.fetch = fetch - + self.blocks = blocks + self.video_id = video_id + self.source_block = None + async def run(self, session): """Remove extra chats just after ready_blocks(). """ - continuation = self.block.start() + continuation = initial_fill(self.block) """download loop """ while continuation: - chats, new_cont, fetched_last = await self.fetch(continuation, session) - continuation = self.block.fill(chats, new_cont, fetched_last ) + chats, new_cont, fetched_first, fetched_last = await self.fetch( + continuation, session) + if fetched_first is None: + break + if self.source_block: + continuation = after_dividing_process( + self.source_block, self.block, chats, new_cont, + fetched_first, fetched_last) + self.source_block = None + else: + continuation = fill(self.block, chats, new_cont, fetched_last) + + if continuation is None: + new_block = get_new_block(self) + self.block = new_block + continuation = new_block.continuation + +def get_new_block(worker) -> Block: + worker.block.done = True + index,undone_block = get_undone_block(worker.blocks) + if undone_block is None: + return Block(continuation = None) + mean = (undone_block.end + undone_block.last)/2 + continuation = arcparam.getparam(worker.video_id, seektime = mean/1000) + worker.source_block = undone_block + worker.source_block.splitting = True + new_block = Block( + end = undone_block.end, + chat_data = [], + continuation = continuation, + splitting = True, + is_last = worker.source_block.is_last) + worker.blocks.insert(index+1,new_block) + return new_block + +def get_undone_block(blocks) -> (int, Block): + max_remaining = 0 + ret_block = None + ret_index = 0 + for index, block in enumerate(blocks): + if block.done or block.splitting: + continue + remaining = block.remaining + if remaining > max_remaining and remaining > 120000: + ret_index = index + ret_block = block + max_remaining = remaining + return ret_index, ret_block + +def top_cut(chats, last) -> list: + for i,chat in enumerate(chats): + if parser.get_offset(chat) > last: + return chats[i:] + return [] + +def bottom_cut(chats, last) -> list: + for rchat in reversed(chats): + if parser.get_offset(rchat)>=last: + chats.pop() + else: + break + return chats - +def after_dividing_process(source_block, block, chats, new_cont, + fetched_first, fetched_last): + if fetched_last <= source_block.last: + return None + block.splitting = False + source_block.splitting = False + source_block.end = fetched_first + block.first = fetched_first + block.last = fetched_last + continuation = new_cont + if fetched_first < source_block.last: + chats = top_cut(chats, source_block.last) + block.first = source_block.last + if block.end= block.end: + block.last = line_offset + block.remaining=0 + block.done=True + return chats[:i], None \ No newline at end of file diff --git a/pytchat/tool/downloader.py b/pytchat/tool/downloader.py index d82f0d5..37e729d 100644 --- a/pytchat/tool/downloader.py +++ b/pytchat/tool/downloader.py @@ -1,6 +1,5 @@ from . import asyncdl from . import parser -from . block import Block from . duplcheck import duplicate_head, duplicate_tail, overwrap from . videoinfo import VideoInfo from .. import config @@ -11,6 +10,12 @@ headers=config.headers class Downloader: def __init__(self, video_id, duration, div, callback): + if not isinstance(div ,int) or div < 1: + raise ValueError('div must be positive integer.') + elif div > 10: + div = 10 + if not isinstance(duration ,int) or duration < 1: + raise ValueError('duration must be positive integer.') self.video_id = video_id self.duration = duration self.div = div @@ -29,8 +34,9 @@ class Downloader: def set_temporary_last(self): for i in range(len(self.blocks)-1): - self.blocks[i].temp_last = self.blocks[i+1].first - self.blocks[-1].temp_last = -1 + self.blocks[i].end = self.blocks[i+1].first + self.blocks[-1].end = self.duration*1000 + self.blocks[-1].is_last =True return self def remove_overwrap(self): @@ -38,7 +44,7 @@ class Downloader: return self def download_blocks(self): - asyncdl.download_chunk(self.callback, self.blocks) + asyncdl.download_chunk(self.callback, self.blocks, self.video_id) return self def remove_duplicate_tail(self): @@ -62,7 +68,7 @@ class Downloader: .combine() ) -def download(video_id, div = 20, callback=None, processor = None): +def download(video_id, div = 1, callback = None, processor = None): duration = 0 try: duration = VideoInfo(video_id).get("duration") @@ -70,5 +76,5 @@ def download(video_id, div = 20, callback=None, processor = None): raise if duration == 0: print("video is live.") - return + return [] return Downloader(video_id, duration, div, callback).download() diff --git a/pytchat/tool/duplcheck.py b/pytchat/tool/duplcheck.py index 637e617..906d4b1 100644 --- a/pytchat/tool/duplcheck.py +++ b/pytchat/tool/duplcheck.py @@ -55,8 +55,6 @@ def check_duplicate_offset(chatdata): tbl_offset[i] == tbl_offset[j] and tbl_id[i] == tbl_id[j] - # and - # tbl_type[i] == tbl_type[j] ) print("creating table...") @@ -76,6 +74,12 @@ def duplicate_head(blocks): if len(blocks) == 1 : return blocks def is_duplicate_head(index): + + if len(blocks[index].chat_data) == 0: + return True + elif len(blocks[index+1].chat_data) == 0: + return False + id_0 = parser.get_id(blocks[index].chat_data[0]) id_1 = parser.get_id(blocks[index+1].chat_data[0]) type_0 = parser.get_type(blocks[index].chat_data[0]) @@ -97,6 +101,12 @@ def duplicate_tail(blocks): if len(blocks) == 1 : return blocks def is_duplicate_tail(index): + + if len(blocks[index].chat_data) == 0: + return True + elif len(blocks[index-1].chat_data) == 0: + return False + id_0 = parser.get_id(blocks[index-1].chat_data[-1]) id_1 = parser.get_id(blocks[index].chat_data[-1]) type_0 = parser.get_type(blocks[index-1].chat_data[-1]) @@ -140,6 +150,6 @@ def overwrap(blocks): def _dump(blocks): print(__name__) - print(f"---------- first last temp_last {'':>3}---") + print(f"---------- first last end {'':>3}---") for i,block in enumerate(blocks): - print(f"block[{i:3}] {block.first:>10} {block.last:>10} {block.temp_last:>10}") \ No newline at end of file + print(f"block[{i:3}] {block.first:>10} {block.last:>10} {block.end:>10}") \ No newline at end of file diff --git a/tests/test_dl_duplcheck.py b/tests/test_dl_duplcheck.py index 55ef1ab..b546550 100644 --- a/tests/test_dl_duplcheck.py +++ b/tests/test_dl_duplcheck.py @@ -12,6 +12,11 @@ def _open_file(path): with open(path,mode ='r',encoding = 'utf-8') as f: return f.read() +def load_chatdata(filename): + return parser.parse( + json.loads(_open_file("tests/testdata/dl_duplcheck/head/"+filename)) + )[1] + def test_overwrap(mocker): """ test overwrap data @@ -22,12 +27,12 @@ def test_overwrap(mocker): """ blocks = ( - Block(0, 0, 38771, "",[]), - Block(1, 9890, 38771, "",[]), - Block(2, 20244, 45146, "",[]), - Block(3, 32476, 60520, "",[]), - Block(4, 41380, 62875, "",[]), - Block(5, 52568, 62875, "",[]) + Block(first = 0,last= 38771, chat_data = load_chatdata("dp0-0.json")), + Block(first = 9890,last= 38771, chat_data = load_chatdata("dp0-1.json")), + Block(first = 20244,last= 45146, chat_data = load_chatdata("dp0-2.json")), + Block(first = 32476,last= 60520, chat_data = load_chatdata("dp0-3.json")), + Block(first = 41380,last= 62875, chat_data = load_chatdata("dp0-4.json")), + Block(first = 52568,last= 62875, chat_data = load_chatdata("dp0-5.json")) ) result = duplcheck.overwrap(blocks) assert len(result) == 3 @@ -50,18 +55,15 @@ def test_duplicate_head(mocker): result : [0] , [3] , [5] """ - def load_chatdata(filename): - return parser.parse( - json.loads(_open_file("tests/testdata/dl_duplcheck/head/"+filename)) - )[1] + blocks = ( - Block(0, 0, 2500, "",load_chatdata("dp0-0.json")), - Block(1, 0, 38771, "",load_chatdata("dp0-1.json")), - Block(2, 0, 45146, "",load_chatdata("dp0-2.json")), - Block(3, 20244, 60520, "",load_chatdata("dp0-3.json")), - Block(4, 20244, 62875, "",load_chatdata("dp0-4.json")), - Block(5, 52568, 62875, "",load_chatdata("dp0-5.json")) + Block(first = 0, last = 2500, chat_data = load_chatdata("dp0-0.json")), + Block(first = 0, last =38771, chat_data = load_chatdata("dp0-1.json")), + Block(first = 0, last =45146, chat_data = load_chatdata("dp0-2.json")), + Block(first = 20244, last =60520, chat_data = load_chatdata("dp0-3.json")), + Block(first = 20244, last =62875, chat_data = load_chatdata("dp0-4.json")), + Block(first = 52568, last =62875, chat_data = load_chatdata("dp0-5.json")) ) _dump(blocks) result = duplcheck.duplicate_head(blocks) @@ -86,18 +88,14 @@ def test_duplicate_tail(mocker): result : [0] , [2] , [4] """ - def load_chatdata(filename): - return parser.parse( - json.loads(_open_file("tests/testdata/dl_duplcheck/head/"+filename)) - )[1] blocks = ( - Block(0, 0, 2500, "",load_chatdata("dp0-0.json")), - Block(1, 1500, 2500, "",load_chatdata("dp0-1.json")), - Block(2, 10000, 45146, "",load_chatdata("dp0-2.json")), - Block(3, 20244, 45146, "",load_chatdata("dp0-3.json")), - Block(4, 20244, 62875, "",load_chatdata("dp0-4.json")), - Block(5, 52568, 62875, "",load_chatdata("dp0-5.json")) + Block(first = 0,last = 2500, chat_data=load_chatdata("dp0-0.json")), + Block(first = 1500,last = 2500, chat_data=load_chatdata("dp0-1.json")), + Block(first = 10000,last = 45146, chat_data=load_chatdata("dp0-2.json")), + Block(first = 20244,last = 45146, chat_data=load_chatdata("dp0-3.json")), + Block(first = 20244,last = 62875, chat_data=load_chatdata("dp0-4.json")), + Block(first = 52568,last = 62875, chat_data=load_chatdata("dp0-5.json")) ) result = duplcheck.duplicate_tail(blocks)