From 04aedc82e8f9f9754663b8d0fe9bab926d8fff78 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Sat, 1 Feb 2020 21:08:27 +0900 Subject: [PATCH] Divide modules --- pytchat/tool/dlworker.py | 28 ++++++ pytchat/tool/downloader.py | 177 +++++++------------------------------ pytchat/tool/duplcheck.py | 105 ++++++++++++++++++++++ pytchat/tool/parser.py | 11 +-- 4 files changed, 171 insertions(+), 150 deletions(-) create mode 100644 pytchat/tool/dlworker.py create mode 100644 pytchat/tool/duplcheck.py diff --git a/pytchat/tool/dlworker.py b/pytchat/tool/dlworker.py new file mode 100644 index 0000000..9866323 --- /dev/null +++ b/pytchat/tool/dlworker.py @@ -0,0 +1,28 @@ +from . import parser + +class DownloadWorker: + def __init__(self, dl, block, blocklist): + self.block = block + self.blocklist = blocklist + self.dl = dl + + async def run(self,session): + temp_last = self.block.temp_last + self.block.chat_data, continuation = self.cut( + self.block.chat_data, + self.block.continuation, + self.block.last, + temp_last ) + while continuation: + data, cont, fetched_last = await self.dl(continuation, session) + data, continuation = self.cut(data, cont, fetched_last, temp_last) + self.block.chat_data.extend(data) + + def cut(self, data, cont, fetched_last, temp_last): + if fetched_last < temp_last or temp_last == -1: + return data, cont + for i, line in enumerate(data): + line_offset = parser.get_offset(line) + if line_offset >= temp_last: + self.block.last = line_offset + return data[:i], None \ No newline at end of file diff --git a/pytchat/tool/downloader.py b/pytchat/tool/downloader.py index a6cfbf9..10df333 100644 --- a/pytchat/tool/downloader.py +++ b/pytchat/tool/downloader.py @@ -3,13 +3,15 @@ import aiohttp import json import traceback from urllib.parse import quote - from . import parser from . import videoinfo +from . dlworker import DownloadWorker +from . duplcheck import duplicate_head, duplicate_tail, overwrap from .. import config from .. import util from .. paramgen import arcparam from ..exceptions import InvalidVideoIdException + logger = config.logger(__name__) headers=config.headers @@ -91,27 +93,7 @@ class Downloader: return self def remove_duplicate_head(self): - blocks = self.blocks - - def is_same_offset(index): - return (blocks[index].first == blocks[index+1].first) - - def is_same_id(index): - id_0 = parser.get_id(blocks[index].chat_data[0]) - id_1 = parser.get_id(blocks[index+1].chat_data[0]) - return (id_0 == id_1) - - def is_same_type(index): - type_0 = parser.get_type(blocks[index].chat_data[0]) - type_1 = parser.get_type(blocks[index+1].chat_data[0]) - return (type_0 == type_1) - - ret = [] - [ret.append(blocks[i]) for i in range(len(blocks)-1) - if (len(blocks[i].chat_data)>0 and - not ( is_same_offset(i) and is_same_id(i) and is_same_type(i)))] - ret.append(blocks[-1]) - self.blocks = ret + self.blocks = duplicate_head(self.blocks) return self def set_temporary_last(self): @@ -121,118 +103,49 @@ class Downloader: return self def remove_overwrap(self): - blocks = self.blocks - if len(blocks) == 1 : return self - - ret = [] - a = 0 - b = 1 - jmp = False - ret.append(blocks[0]) - while a < len(blocks)-2: - while blocks[a].last > blocks[b].first: - b+=1 - if b == len(blocks)-1: - jmp = True - break - if jmp: break - if b-a == 1: - a = b - else: - a = b-1 - ret.append(blocks[a]) - b = a+1 - - ret.append(blocks[-1]) - self.blocks = ret + self.blocks = overwrap(self.blocks) return self def download_blocks(self): loop = asyncio.get_event_loop() - loop.run_until_complete(self._dl_allocate()) + loop.run_until_complete(self._dl_distribute()) return self - async def _dl_allocate(self): - tasks = [] + async def _dl_distribute(self): + workers = [ + DownloadWorker( + dl=self.dl_func, + block = block, + blocklist= self.blocks + ) + for pos,block in enumerate(self.blocks) + ] async with aiohttp.ClientSession() as session: - tasks = [self._dl_task(session, block) for block in self.blocks] + tasks = [worker.run(session) for worker in workers] return await asyncio.gather(*tasks,return_exceptions=True) - async def _dl_task(self, session, block:Block): - if (block.temp_last != -1 and - block.last > block.temp_last): - return - continuation = block.continuation - while continuation: - url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" - async with session.get(url,headers = config.headers) as resp: - text = await resp.text() - continuation, actions = parser.parse(json.loads(text)) - if actions: - block.chat_data.extend(actions) - last = parser.get_offset(actions[-1]) - first = parser.get_offset(actions[0]) - if self.callback: - self.callback(actions,last-first) - if block.temp_last != -1: - if last > block.temp_last: - block.last = last - break - else: - block.last = last - + async def dl_func(self,continuation,session): + url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" + async with session.get(url,headers = config.headers) as resp: + text = await resp.text() + continuation, actions = parser.parse(json.loads(text)) + if actions: + last = parser.get_offset(actions[-1]) + first = parser.get_offset(actions[0]) + if self.callback: + self.callback(actions,last-first) + return actions,continuation,last + return continuation, [], None + def remove_duplicate_tail(self): - blocks = self.blocks - if len(blocks) == 1 : return self - - def is_same_offset(index): - return (blocks[index-1].last == blocks[index].last) - - def is_same_id(index): - id_0 = parser.get_id(blocks[index-1].chat_data[-1]) - id_1 = parser.get_id(blocks[index].chat_data[-1]) - return (id_0 == id_1) - - def is_same_type(index): - type_0 = parser.get_type(blocks[index-1].chat_data[-1]) - type_1 = parser.get_type(blocks[index].chat_data[-1]) - return (type_0 == type_1) - - ret = [] - ret.append(blocks[0]) - [ret.append(blocks[i]) for i in range(1,len(blocks)-1) - if not ( is_same_offset(i) and is_same_id(i) and is_same_type(i) )] - ret.append(self.blocks[-1]) - self.blocks = ret + self.blocks = duplicate_tail(self.blocks) return self def combine(self): - line = '' - try: - if len(self.blocks[0].chat_data)>0: - lastline=self.blocks[0].chat_data[-1] - lastline_offset = parser.get_offset(lastline) - else: return None - for i in range(1,len(self.blocks)): - f=self.blocks[i].chat_data - if len(f)==0: - logger.error(f'zero size piece.:{str(i)}') - continue - for row in range(len(f)): - line = f[row] - if parser.get_offset(line) > lastline_offset: - self.blocks[0].chat_data.extend(f[row:]) - break - else: - logger.error( - f'Missing connection.: pos:{str(i-1)}->{str(i)}' - f' lastline_offset= {lastline_offset}') - lastline_offset = parser.get_offset( f[-1]) - return self.blocks[0].chat_data - except Exception as e: - logger.error(f"{type(e)} {str(e)} {line}") - traceback.print_exc() - + ret = [] + for block in self.blocks: + ret.extend(block.chat_data) + return ret def download(self): return ( @@ -245,28 +158,6 @@ class Downloader: .combine() ) -def check_duplicate(blocks): - - def is_same_offset(index): - offset_0 = parser.get_offset(blocks[index]) - offset_1 = parser.get_offset(blocks[index+1]) - return (offset_0 == offset_1) - - def is_same_id(index): - id_0 = parser.get_id(blocks[index]) - id_1 = parser.get_id(blocks[index+1]) - return (id_0 == id_1) - - def is_same_type(index): - type_0 = parser.get_type(blocks[index]) - type_1 = parser.get_type(blocks[index+1]) - return (type_0 == type_1) - - ret =[] - for i in range(len(blocks)-1): - if ( is_same_offset(i) and is_same_id(i) and is_same_type(i) ): - ret.append(blocks[i]) - return ret def download(video_id, div = 20, callback=None, processor = None): duration = 0 diff --git a/pytchat/tool/duplcheck.py b/pytchat/tool/duplcheck.py new file mode 100644 index 0000000..ff5dca9 --- /dev/null +++ b/pytchat/tool/duplcheck.py @@ -0,0 +1,105 @@ +from . import parser + +def check_duplicate(chatdata): + max_range = len(chatdata)-1 + tbl_offset = [None] * max_range + tbl_id =[None] * max_range + tbl_type=[None] * max_range + + def create_table(chatdata,max_range): + for i in range(max_range): + tbl_offset[i] = parser.get_offset(chatdata[i]) + tbl_id[i] = parser.get_id(chatdata[i]) + tbl_type[i] = parser.get_type(chatdata[i]) + + def is_duplicate(i,j): + return ( + tbl_offset[i] == tbl_offset[j] + and + tbl_id[i] == tbl_id[j] + and + tbl_type[i] == tbl_type[j] + ) + + print("creating table...") + create_table(chatdata,max_range) + print("searching duplicate data...") + + return [{ "i":{ + "index" : i, "id" : parser.get_id(chatdata[i]), + "offsetTime" : parser.get_offset(chatdata[i]) + }, + "j":{ + "index" : j, "id" : parser.get_id(chatdata[j]), + "offsetTime" : parser.get_offset(chatdata[j]) + } + } + for i in range(max_range) for j in range(i+1,max_range) + if is_duplicate(i,j)] + +def duplicate_head(blocks): + if len(blocks) == 1 : return blocks + + def is_duplicate_head(index): + id_0 = parser.get_id(blocks[index].chat_data[0]) + id_1 = parser.get_id(blocks[index+1].chat_data[0]) + type_0 = parser.get_type(blocks[index].chat_data[0]) + type_1 = parser.get_type(blocks[index+1].chat_data[0]) + return ( + blocks[index].first == blocks[index+1].first + and + id_0 == id_1 + and + type_0 == type_1 + ) + + ret = [blocks[i] for i in range(len(blocks)-1) + if (len(blocks[i].chat_data)>0 and + not is_duplicate_head(i) )] + ret.append(blocks[-1]) + return ret + +def duplicate_tail(blocks): + if len(blocks) == 1 : return blocks + + def is_duplicate_tail(index): + id_0 = parser.get_id(blocks[index-1].chat_data[-1]) + id_1 = parser.get_id(blocks[index].chat_data[-1]) + type_0 = parser.get_type(blocks[index-1].chat_data[-1]) + type_1 = parser.get_type(blocks[index].chat_data[-1]) + return ( + blocks[index-1].last == blocks[index].last + and + id_0 == id_1 + and + type_0 == type_1 + ) + + ret = [blocks[i] for i in range(0,len(blocks)-1) + if i == 0 or not is_duplicate_tail(i) ] + ret.append(blocks[-1]) + return ret + +def overwrap(blocks): + if len(blocks) == 1 : return blocks + + ret = [] + a = 0 + b = 1 + jmp = False + ret.append(blocks[0]) + while a < len(blocks)-2: + while blocks[a].last > blocks[b].first: + b+=1 + if b == len(blocks)-1: + jmp = True + break + if jmp: break + if b-a == 1: + a = b + else: + a = b-1 + ret.append(blocks[a]) + b = a+1 + ret.append(blocks[-1]) + return ret diff --git a/pytchat/tool/parser.py b/pytchat/tool/parser.py index 8b046bd..c9dbfcb 100644 --- a/pytchat/tool/parser.py +++ b/pytchat/tool/parser.py @@ -5,7 +5,6 @@ from .. exceptions import ( NoContentsException, NoContinuationsException ) - logger = config.logger(__name__) def parse(jsn): @@ -27,7 +26,6 @@ def parse(jsn): raise ResponseContextError('動画に接続できません。' '動画IDが間違っているか、動画が削除/非公開の可能性があります。') contents=jsn['response'].get('continuationContents') - #配信が終了した場合、もしくはチャットデータが取得できない場合 if contents is None: raise NoContentsException('チャットデータを取得できませんでした。') @@ -41,17 +39,16 @@ def parse(jsn): return continuation, actions return None, [] - # if actions is None: - # return {"continuation":None,"chatdata":[]} - def get_offset(item): return int(item['replayChatItemAction']["videoOffsetTimeMsec"]) def get_id(item): - return list((list(item['replayChatItemAction']["actions"][0].values())[0])['item'].values())[0].get('id') + return list((list(item['replayChatItemAction']["actions"][0].values() + )[0])['item'].values())[0].get('id') def get_type(item): - return list((list(item['replayChatItemAction']["actions"][0].values())[0])['item'].keys())[0] + return list((list(item['replayChatItemAction']["actions"][0].values() + )[0])['item'].keys())[0]