From 540f16c1a02a66197224ae4856fffee3a59670c3 Mon Sep 17 00:00:00 2001
From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com>
Date: Sun, 26 Jan 2020 12:08:10 +0900
Subject: [PATCH] Make it possible to use method chaining

---
 pytchat/downloader/dictquery.py      |  48 ---
 pytchat/downloader/downloader.py     | 479 ++++++++++++---------
 pytchat/downloader/downloader_old.py | 291 ----------------
 3 files changed, 217 insertions(+), 601 deletions(-)
 delete mode 100644 pytchat/downloader/dictquery.py
 delete mode 100644 pytchat/downloader/downloader_old.py

diff --git a/pytchat/downloader/dictquery.py b/pytchat/downloader/dictquery.py
deleted file mode 100644
index ae44d6b..0000000
--- a/pytchat/downloader/dictquery.py
+++ /dev/null
@@ -1,48 +0,0 @@
-class DictQuery(dict):
-    def get(self, path, default = None):
-        keys = path.split("/")
-        val = None
-        for key in keys:
-            if val:
-                if key.isdecimal():
-                    if isinstance(val,list) and len(val) > int(key):
-                        #val=val[int(key)]
-                        val=list(val)[int(key)]
-                    else:return default
-                elif isinstance(val, dict):
-                    val = val.get(key, default)
-                else:
-                    return default
-            else:
-                val = dict.get(self, key, default)
-
-        return val
-
-def find(target,**kwargs):
-    for key in kwargs.keys():
-        if key == target:
-            return kwargs[key]
-        if isinstance(kwargs[key], dict):
-            res = find(target,**kwargs[key])
-        elif isinstance(kwargs[key], list):
-            for item in kwargs[key]:
-                res = find(target,**item)
-    try:
-        return res
-    except UnboundLocalError:
-        return None
-
-def getid_replay(item):
-    return list((list(item['replayChatItemAction']["actions"][0].values())[0])['item'].values())[0]['id']
-
-def getid_realtime(item):
-    return list((list(item.values())[0])['item'].values())[0]['id']
-
-
-def get_timestamp_realtime(item):
-    return list((list(item.values())[0])['item'].values())[0]['timestampUsec']
-
-
-def get_offsettime(item):
-    #return item['replayChatItemAction']["actions"][0]["videoOffsetTimeMsec"]
-    return item['replayChatItemAction']["videoOffsetTimeMsec"]
\ No newline at end of file
diff --git a/pytchat/downloader/downloader.py b/pytchat/downloader/downloader.py
index 7257759..2d32bb1 100644
--- a/pytchat/downloader/downloader.py
+++ b/pytchat/downloader/downloader.py
@@ -1,7 +1,7 @@
 import asyncio
-import aiohttp,async_timeout
+import aiohttp
 import json
-import traceback,time
+import traceback
 from urllib.parse import quote
 from . import parser
@@ -14,276 +14,231 @@ headers=config.headers
 REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
              "get_live_chat_replay?continuation="
-
-
 class Block:
-    def __init__(self, pos=0, init_offset=0, last_offset=0,
-                 continuation='', chat_data=[]):
+    def __init__(self, pos=0, first=0, last=0,
+                 continuation='', chat_data=None):
         self.pos = pos
-        self.init_offset = init_offset
-        self.last_offset = last_offset
-        self.stop_offset = 0
+        self.first = first
+        self.last = last
+        self.temp_last = 0
         self.continuation = continuation
-        self.chat_data = chat_data
+        # default to None instead of [] so instances never share one mutable list
+        self.chat_data = chat_data if chat_data is not None else []
 
-def _debug_save(_pbar_pos,prefix,init_offset_ms,last_offset_ms,dics):
-    chat_data =[]
-    init = '{:0>8}'.format(str(init_offset_ms))
-    last = '{:0>8}'.format(str(last_offset_ms))
-    chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
-
-    with open(f"[{_pbar_pos}]-{prefix}-from_{init}_to_{last}.data",mode ='w',encoding='utf-8') as f:
-        f.writelines(chat_data)
-
-
-def dump(o):
-    for key, value in o.__dict__.items():
-        if key != "chat_data":
-            print(key, ':', value)
-
-def dumpt(blocks,mes = None):
-    print(f"{'-'*40}\n{mes}")
-    [print(f"pos:{b.pos:>2} |init:{b.init_offset: >12,} |last:{b.last_offset: >12,} |stop:{b.stop_offset :>12,} ")
-        for b in blocks]
-
-def _divide_(start, end, count):
-    min_interval = 120
-    if (not isinstance(start,int) or
-        not isinstance(end,int) or
-        not isinstance(count,int)):
-        raise ValueError("start/end/count must be int")
-    if start>end:
-        raise ValueError("end must be equal to or greater than start.")
-    if count<1:
-        raise ValueError("count must be equal to or greater than 1.")
-    if (end-start)/count < min_interval:
-        count = int((end-start)/min_interval)
-        if count == 0 : count = 1
-    interval= (end-start)/count
-
-    if count == 1:
-        return [start]
-    return sorted(list(set([int(start+interval*j)
-                for j in range(count) ])))
-
-
-def ready_blocks(video_id:str, div:int, duration:int):
-    if div <= 0: raise ValueError
-
-    def _divide(start, end, count):
-        if (not isinstance(start,int) or
-            not isinstance(end,int) or
-            not isinstance(count,int)):
-            raise ValueError("start/end/count must be int")
-        if start>end:
-            raise ValueError("end must be equal to or greater than start.")
-        if count<1:
-            raise ValueError("count must be equal to or greater than 1.")
-
-        interval= (end-start)/(count)
-        if interval < 120 :
-            interval=120
-            count = int((end-start)/interval)+1
-        if count == 1:
-            return [start]
-        return sorted(list(set([int(start+interval*j)
-            if j < count else end
-            for j in range(count) ])))
-
-    async def _get_blocks(duration,div):
-        async with aiohttp.ClientSession() as session:
-            futures = [_create_block(session, pos, seektime)
-                for pos, seektime in enumerate(_divide(-1, duration , div))]
-            return await asyncio.gather(*futures,return_exceptions=True)
-
-    async def _create_block(session, pos, seektime):
-        continuation = arcparam.getparam(video_id,seektime=seektime)
-        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
-
-        async with session.get(url,headers = headers) as resp:
-            text = await resp.text()
-            #util.save(text,f"v:/~~/pre_{pos}_",".json")
-            next_continuation, actions = parser.parse(json.loads(text))
-            block = Block(
-                pos = pos,
-                continuation = next_continuation,
-                chat_data = actions,
-                init_offset = parser.get_offset(actions[0]),
-                last_offset = parser.get_offset(actions[-1])
-            )
-        return block
-
-    blocks=[]
-    loop = asyncio.get_event_loop()
-    blocks = loop.run_until_complete(_get_blocks(duration,div))
-    return blocks
-
-def remove_duplicate_head(blocks):
-    def is_same_offset(index):
-        return (blocks[index].init_offset == blocks[index+1].init_offset)
-
-    def is_same_id(index):
-        id_0 = parser.get_id(blocks[index].chat_data[0])
-        id_1 = parser.get_id(blocks[index+1].chat_data[0])
-        return (id_0 == id_1)
-
-    def is_same_type(index):
-        type_0 = parser.get_type(blocks[index].chat_data[0])
-        type_1 = parser.get_type(blocks[index+1].chat_data[0])
-        return (type_0 == type_1)
-
-    ret = []
-    [ret.append(blocks[i]) for i in range(len(blocks)-1)
-        if (len(blocks[i].chat_data)>0 and
-        not ( is_same_offset(i) and is_same_id(i) and is_same_type(i)))]
-    ret.append(blocks[-1])
-    blocks = None
-    return ret
-
-def remove_overwrap(blocks):
-    def is_overwrap(a, b):
-        print(f"comparing({a}, {b})....overwrap ({(blocks[a].last_offset > blocks[b].init_offset)})")
-        return (blocks[a].last_offset > blocks[b].init_offset)
-
-    ret = []
-    a = 0
-    b=1
-    jmp = False
-    ret.append(blocks[0])
-    while a < len(blocks)-2:
-
-        while is_overwrap(a,b):
-            b+=1
-            print("forward")
-            if b == len(blocks)-2:
-                jmp=True
-                break
-        if jmp: break
-        if b-a == 1:
-            print(f"next ret.append(blocks[{b}]")
-            ret.append(blocks[b])
-            a = b
-            b+=1
-
-            continue
-        else:
-            print(f"apart ret.append(blocks[{b-1}]")
-            ret.append(blocks[b-1])
-            a=b-1
-            b=a+1
-    ret.append(blocks[-1])
-    return ret
-
-def remove_duplicate_tail(blocks):
-    def is_same_offset(index):
-        return blocks[index-1].init_offset == blocks[index].init_offset
-
-    def is_same_id(index):
-        id_0 = parser.get_id(blocks[index-1].chat_data[-1])
-        id_1 = parser.get_id(blocks[index].chat_data[-1])
-        return id_0 == id_1
-
-    def is_same_type(index):
-        type_0 = parser.get_type(blocks[index-1].chat_data[-1])
-        type_1 = parser.get_type(blocks[index].chat_data[-1])
-        return type_0 == type_1
-
-    ret = []
-    [ret.append(blocks[i]) for i in range(len(blocks)-1)
-        if not ( is_same_offset(i) and is_same_id(i) and is_same_type(i) )]
-    ret.append(blocks[-1])
-    blocks = None
-    return ret
-
-
-def set_stop_offset(blocks):
-    for i in range(len(blocks)-1):
-        blocks[i].stop_offset = blocks[i+1].init_offset
-    blocks[-1].stop_offset = -1
-    return blocks
-
-
-def download_each_block(blocks):
-    loop = asyncio.get_event_loop()
-    return loop.run_until_complete(_dl_block(blocks))
-
-async def _dl_block(blocks):
-    futures = []
-    async with aiohttp.ClientSession() as session:
-        futures = [_dl_chunk(session, block) for block in blocks]
-        return await asyncio.gather(*futures,return_exceptions=True)
-
-async def _dl_chunk(session, block:Block):
-    if (block.stop_offset != -1 and
-        block.last_offset > block.stop_offset):
-        return
-
-    def get_last_offset(actions):
-        return parser.get_offset(actions[-1])
-
-    continuation = block.continuation
-    while continuation:
-        print(block.pos)
-        url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
-        async with session.get(url,headers = config.headers) as resp:
-            text = await resp.text()
-            continuation, actions = parser.parse(json.loads(text))
-            if actions:
-                block.chat_data.extend(actions)
-                last_offset = get_last_offset(actions)
-                if block.stop_offset != -1:
-                    if last_offset > block.stop_offset:
-                        block.last_offset = last_offset
-                        break
-                else:
-                    block.last_offset = last_offset
-
-
-def combine(blocks):
-    line = ''
-    try:
-        if len(blocks[0].chat_data)>0:
-            lastline=blocks[0].chat_data[-1]
-            lastline_offset = parser.get_offset(lastline)
-        else: return None
-        for i in range(1,len(blocks)):
-            f=blocks[i].chat_data
-            if len(f)==0:
-                logger.error(f'zero size piece.:{str(i)}')
-                continue
-            for row in range(len(f)):
-                line = f[row]
-                if parser.get_offset(line) > lastline_offset:
-                    blocks[0].chat_data.extend(f[row:])
-                    break
-            if line =='error':
-                logger.error(f'Error file was saved.: piece:{str(i)}')
-                return['error']
-            else:
-                logger.error(f'Missing common line.: piece:{str(i-1)}->{str(i)} lastline_id= {lastline_offset}')
-                return ['combination failed']
-            lastline_offset = parser.get_offset( f[-1])
-        return blocks[0].chat_data
-    except Exception as e:
-        logger.error(f"{type(e)} {str(e)} {line}")
-        traceback.print_exc()
-
-
-def download(video_id, duration, div):
-    blocks = ready_blocks(video_id=video_id, duration=duration, div=div)
-    dumpt(blocks,"ready_blocks")
-
-    selected = remove_duplicate_head(blocks)
-    dumpt(selected,"removed duplicate_head")
-
-
-    set_stop_offset(selected)
-    dumpt(selected,"set stop_offset")
-    #set_stop_offset(selected)
-    removed = remove_overwrap(selected)
-    dumpt(removed,"removed overwrap")
-
-    download_each_block(removed)
-    dumpt(removed,"downloaded each_block")
-
-    return combine(removed)
+class Downloader:
+    def __init__(self, video_id,duration,div):
+        self.video_id = video_id
+        self.duration = duration
+        self.div = div
+        self.blocks = []
+
+
+    def ready_blocks(self):
+        if self.div <= 0: raise ValueError
+
+        def _divide(start, end, count):
+            min_interval = 120
+            if (not isinstance(start,int) or
+                not isinstance(end,int) or
+                not isinstance(count,int)):
+                raise ValueError("start/end/count must be int")
+            if start>end:
+                raise ValueError("end must be equal to or greater than start.")
+            if count<1:
+                raise ValueError("count must be equal to or greater than 1.")
+            if (end-start)/count < min_interval:
+                count = int((end-start)/min_interval)
+                if count == 0 : count = 1
+            interval= (end-start)/count
+
+            if count == 1:
+                return [start]
+            return sorted(list(set([int(start+interval*j)
+                        for j in range(count) ])))
+
+        async def _get_blocks(duration,div):
+            async with aiohttp.ClientSession() as session:
+                futures = [_create_block(session, pos, seektime)
+                    for pos, seektime in enumerate(_divide(-1, duration , div))]
+                return await asyncio.gather(*futures,return_exceptions=True)
+
+        async def _create_block(session, pos, seektime):
+            continuation = arcparam.getparam(self.video_id, seektime = seektime)
+            url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
+            async with session.get(url, headers = headers) as resp:
+                text = await resp.text()
+                next_continuation, actions = parser.parse(json.loads(text))
+                return Block(
+                    pos = pos,
+                    continuation = next_continuation,
+                    chat_data = actions,
+                    first = parser.get_offset(actions[0]),
+                    last = parser.get_offset(actions[-1])
+                )
+
+
+        loop = asyncio.get_event_loop()
+        self.blocks = loop.run_until_complete(_get_blocks(self.duration, self.div))
+        return self
+
+    def remove_duplicate_head(self):
+        blocks = self.blocks
+
+        def is_same_offset(index):
+            return (blocks[index].first == blocks[index+1].first)
+
+        def is_same_id(index):
+            id_0 = parser.get_id(blocks[index].chat_data[0])
+            id_1 = parser.get_id(blocks[index+1].chat_data[0])
+            return (id_0 == id_1)
+
+        def is_same_type(index):
+            type_0 = parser.get_type(blocks[index].chat_data[0])
+            type_1 = parser.get_type(blocks[index+1].chat_data[0])
+            return (type_0 == type_1)
+
+        ret = []
+        [ret.append(blocks[i]) for i in range(len(blocks)-1)
+            if (len(blocks[i].chat_data)>0 and
+            not ( is_same_offset(i) and is_same_id(i) and is_same_type(i)))]
+        ret.append(blocks[-1])
+        self.blocks = ret
+        return self
+
+
+    def set_temporary_last(self):
+        for i in range(len(self.blocks)-1):
+            self.blocks[i].temp_last = self.blocks[i+1].first
+        self.blocks[-1].temp_last = -1
+        return self
+
+    def remove_overwrap(self):
+        blocks = self.blocks
+        if len(blocks) == 1 : return self
+
+        def is_overwrap(a, b):
+            return (blocks[a].last > blocks[b].first)
+
+        ret = []
+        a = 0
+        b = 1
+        jmp = False
+        ret.append(blocks[0])
+        while a < len(blocks)-2:
+
+            while is_overwrap(a,b):
+                b+=1
+                if b == len(blocks)-1:
+                    jmp=True
+                    break
+            if jmp: break
+            if b-a == 1:
+                ret.append(blocks[b])
+                a = b
+                b+=1
+                continue
+            else:
+                ret.append(blocks[b-1])
+                a=b-1
+                b=a+1
+        ret.append(blocks[-1])
+        self.blocks = ret
+        return self
+
+    def download_each_block(self):
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(self._dl_block())
+        return self
+
+    async def _dl_block(self):
+        futures = []
+        async with aiohttp.ClientSession() as session:
+            futures = [self._dl_chunk(session, block) for block in self.blocks]
+            return await asyncio.gather(*futures,return_exceptions=True)
+
+    async def _dl_chunk(self, session, block:Block):
+        if (block.temp_last != -1 and
+            block.last > block.temp_last):
+            return
+
+        def get_last_offset(actions):
+            return parser.get_offset(actions[-1])
+
+        continuation = block.continuation
+        while continuation:
+            url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
+            async with session.get(url,headers = config.headers) as resp:
+                text = await resp.text()
+                continuation, actions = parser.parse(json.loads(text))
+                if actions:
+                    block.chat_data.extend(actions)
+                    last = get_last_offset(actions)
+                    if block.temp_last != -1:
+                        if last > block.temp_last:
+                            block.last = last
+                            break
+                    else:
+                        block.last = last
+
+    def remove_duplicate_tail(self):
+        blocks = self.blocks
+        if len(blocks) == 1 : return self
+
+        def is_same_offset(index):
+            return (blocks[index-1].last == blocks[index].last)
+
+        def is_same_id(index):
+            id_0 = parser.get_id(blocks[index-1].chat_data[-1])
+            id_1 = parser.get_id(blocks[index].chat_data[-1])
+            return (id_0 == id_1)
+
+        def is_same_type(index):
+            type_0 = parser.get_type(blocks[index-1].chat_data[-1])
+            type_1 = parser.get_type(blocks[index].chat_data[-1])
+            return (type_0 == type_1)
+
+        ret = []
+        ret.append(blocks[0])
+        [ret.append(blocks[i]) for i in range(1,len(blocks)-1)
+            if not ( is_same_offset(i) and is_same_id(i) and is_same_type(i) )]
+        ret.append(self.blocks[-1])
+        self.blocks = ret
+        return self
+
+    def combine(self):
+        line = ''
+        try:
+            if len(self.blocks[0].chat_data)>0:
+                lastline=self.blocks[0].chat_data[-1]
+                lastline_offset = parser.get_offset(lastline)
+            else: return None
+            for i in range(1,len(self.blocks)):
+                f=self.blocks[i].chat_data
+                if len(f)==0:
+                    logger.error(f'zero size piece.:{str(i)}')
+                    continue
+                for row in range(len(f)):
+                    line = f[row]
+                    if parser.get_offset(line) > lastline_offset:
+                        self.blocks[0].chat_data.extend(f[row:])
+                        break
+                else:
+                    logger.error(
+                        f'Missing connection.: pos:{str(i-1)}->{str(i)}'
+                        f' lastline_offset= {lastline_offset}')
+                lastline_offset = parser.get_offset( f[-1])
+            return self.blocks[0].chat_data
+        except Exception as e:
+            logger.error(f"{type(e)} {str(e)} {line}")
+            traceback.print_exc()
+
+
+    # run the whole pipeline as a single method chain; every step returns self
+    def download(self):
+        return (
+            self.ready_blocks()
+                .remove_duplicate_head()
+                .set_temporary_last()
+                .remove_overwrap()
+                .download_each_block()
+                .remove_duplicate_tail()
+                .combine()
+        )
\ No newline at end of file
diff --git a/pytchat/downloader/downloader_old.py b/pytchat/downloader/downloader_old.py
deleted file mode 100644
index 1c5c75b..0000000
--- a/pytchat/downloader/downloader_old.py
+++ /dev/null
@@ -1,291 +0,0 @@
-import asyncio
-import aiohttp,async_timeout
-import json
-from tqdm import tqdm
-import traceback,time
-from requests.exceptions import ConnectionError
-from urllib.parse import quote
-from multiprocessing import Process, Queue
-
-from . import dictquery
-from .. import config
-from .. paramgen import arcparam
-from .. import util
-
-
-
-logger = config.logger(__name__)
-
-REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
-             "get_live_chat_replay?continuation="
-
-async def _dl_piece(_session,queue,movie_id,offset_ms,duration_ms,pbar_pos,is_lastpiece,dl_end):
-    #print(f'_dl_piece:{movie_id},{offset_ms},{duration_ms},{pbar_pos},{is_lastpiece}')
-
-    chat_data=[]
-    if pbar_pos == 0:
-        #continue_url = construct.encoder.encode(construct.construct_seekinit(movie_id,filter='all'))
-        continuation = arcparam.getparam(movie_id,-1)
-    else:
-        continuation = arcparam.getparam(movie_id,offset_ms/1000)
-    next_url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
-
-    #print(pbar_pos,next_url)
-    chat_data = await _dl(session=_session,
-        queue = queue,
-        next_url =next_url,
-        absolute_start = offset_ms,
-        duration = duration_ms,
-        pbar_pos = pbar_pos,
-        is_lastpiece = is_lastpiece,
-        dl_end = dl_end)
-    return chat_data
-
-def get_init_offset_ms(dics: dict):
-    n = 0
-    while(True):
-        init_offset_ms = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][n].get("replayChatItemAction")['videoOffsetTimeMsec']
-        if init_offset_ms is None:
-            n += 1
-            continue
-        else:
-            return int(init_offset_ms)
-
-def get_last_offset_ms(dics: dict):
-    m = -1
-    while(True):
-        last_offset_ms = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][m].get("replayChatItemAction")['videoOffsetTimeMsec']
-        if last_offset_ms is None:
-            m -= 1
-            continue
-        else:
-            return int(last_offset_ms)
-
-
-async def _dl(session,queue, next_url, absolute_start,duration,pbar_pos,is_lastpiece,dl_end):
-    async with async_timeout.timeout(1000):
-        chat_data = []
-        #print('absolute',absolute_start,'duration',duration,'pos',pbar_pos)
-        dlerror=False
-        first = True
-        rqerr=0
-        jserr=0
-        while(True):
-
-            try:
-                #download and load the chat data in JSON format
-                async with session.get(next_url,headers=config.headers) as response:
-                    text = await response.text()
-                    util.save(text,"v:/~~/test_",".json")
-                    dics = json.loads(text)
-
-                continuation = dics["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["liveChatReplayContinuationData"]["continuation"]
-                #url of the next live_chat_replay
-                next_url =f"{REPLAY_URL}{continuation}&pbj=1"
-
-                init_offset_ms = get_init_offset_ms(dics)
-                last_offset_ms = get_last_offset_ms(dics)
-                length_ms = last_offset_ms - init_offset_ms
-                #print(f'[{pbar_pos}] length_ms = {length_ms}, total={last_offset_ms}')
-                if length_ms < 0:
-                    raise Exception('length_ms < 0')
-                queue.put(length_ms)
-                if first:
-                    #print(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][0])
-                    #with open(str(pbar_pos)+'FIRST') as f:
-                    #    f.writelines(dics)##############################
-                    if pbar_pos > 0:
-                        #print(f'Reset dl_end[{pbar_pos - 1}]:{dl_end[pbar_pos - 1]} -> {init_offset_ms} ({init_offset_ms-dl_end[pbar_pos - 1]})')
-                        dl_end[pbar_pos - 1] = init_offset_ms
-                    first = False
-                #print(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][0])
-                chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
-                #print(chat_data)
-                if (last_offset_ms >= dl_end[pbar_pos]) and not(is_lastpiece):
-                    #save(pbar_pos,'LAST ',init_offset_ms,last_offset_ms,dics)###############################
-                    #print(f'break:pbar_pos ={pbar_pos}')
-                    queue.put('quit')
-                    break
-
-            # finished once next_url can no longer be obtained
-            except KeyError:
-                queue.put('quit')
-                break
-            # if a JSONDecodeError occurs, fetch the data again.
-            except json.decoder.JSONDecodeError:
-                time.sleep(1)
-                jserr+=1
-                if jserr<20:
-                    continue
-                else:
-                    logger.error('JSONDecodeError at piece %d' % (pbar_pos))
-                    queue.put(quit)
-                    dlerror = True
-                    break
-            except ConnectionError:
-                time.sleep(1)
-                rqerr+=1
-                if rqerr<20:
-                    continue
-                else:
-                    logger.error('ConnectionError at piece %d' % (pbar_pos))
-                    queue.put(quit)
-                    dlerror = True
-                    break
-            #except KeyboardInterrupt:
-            #    pass
-            except UnicodeDecodeError as e:
-                logger.error(f"{type(e)}, {str(e)}")
-                logger.error(f"{str(e.object)}")
-                with open('unicodeerror.txt', mode ="w", encoding='utf-8') as f:
-                    f.writelines(str(e.object))
-                break
-            except:
-                logger.error('\nAn unknown error occurred %d at:' % (pbar_pos))
-                logger.error('%s\n' % (next_url))
-                traceback.print_exc()
-                try:
-                    with open('error.json', mode ="w", encoding='utf-8') as f:
-                        f.writelines(text)
-                except UnboundLocalError as ule:
-                    pass
-                queue.put('quit')
-                dlerror = True
-                break
-        #session.close()
-        if dlerror:
-            return 'error'
-        else:
-            return chat_data
-
-
-def _debug_save(_pbar_pos,prefix,init_offset_ms,last_offset_ms,dics):
-    '''
-    Save the chat data at the time an exception occurred.
-    '''
-    chat_data =[]
-    init = '{:0>8}'.format(str(init_offset_ms))
-    last = '{:0>8}'.format(str(last_offset_ms))
-    chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
-
-    with open(f"[{_pbar_pos}]-{prefix}-from_{init}_to_{last}.data",mode ='w',encoding='utf-8') as f:
-        f.writelines(chat_data)
-
-def _debug_chatblock():
-    pass
-
-
-async def _asyncdl(argslist):
-    promises=[]
-    async with aiohttp.ClientSession() as session:
-        promises = [_dl_piece(session,*args) for args in argslist]
-        return await asyncio.gather(*promises)
-
-def _listener(q,duration,div):
-    duration_ms =int(duration/1000)
-    ret = int(div)
-    pbar = tqdm(total = duration_ms, ncols=80,unit_scale = 1,
-        bar_format='{desc}{percentage:3.0f}%|{bar}|[{n_fmt:>7}/{total_fmt}]{elapsed}<{remaining}')
-    #keep calling get until None appears.
-
-    for item in iter(q.get, None):
-        if(item=='quit'):
-            ret=ret-1
-            if(ret==0):
-                if duration_ms>0:
-                    pbar.update(duration_ms)
-                pbar.close()
-        else:
-            item =int(item/1000)
-            if duration_ms - item >= 0 and item >= 0:
-                duration_ms -= item
-                pbar.update(item)
-
-
-def _combine(chatblocks):
-    '''
-    Combine the chat data that was downloaded in pieces.
-    Each piece is appended in order to the first chunk (chatblocks[0]).
-    '''
-    line=''
-    try:
-        if len(chatblocks[0])>0:
-            lastline=chatblocks[0][-1]
-            #lastline_id = dictquery.getid_replay(json.loads(lastline))
-            lastline_id = dictquery.getid_replay(lastline)
-        else: return None
-        for i in range(1,len(chatblocks)):
-            f=chatblocks[i]
-            if len(f)==0:
-                logger.error(f'zero size piece.:{str(i)}')
-                continue
-            #scan the chat data lines from the top, looking for the line shared with the tail of the previous piece
-            for row in range(len(f)):
-                #the data at row
                line = f[row]
-                #equal to the last line of the previous piece (trackingParams differs when the download timing differs, so compare by id)
-                #if dictquery.getid_replay(json.loads(line)) == lastline_id:
-                if dictquery.getid_replay(line) == lastline_id:
-                    #a common line was found, so append everything after it
-                    #print(f'[{i}][{row}]Find common line {lastline_id}')
-                    chatblocks[0].extend(f[row+1:])
-                    break
-            if line =='error':
-                logger.error(f'Error file was saved.: piece:{str(i)}')
-                return['error']
-            else:
-                #reached when the for loop finished without a break:
-                #failed to find the junction (common line) of the pieces
-                logger.error(f'Missing common line.: piece:{str(i-1)}->{str(i)} lastline_id= {lastline_id}')
-                return ['combination failed']#---------------------------------test
-            #update the last-line data
-            lastline = f[-1]
-            #dic_lastline=json.loads(lastline)
-            dic_lastline=lastline
-            lastline_id = dictquery.getid_replay(dic_lastline)
-            #print(f'[{i}]lastline_id:{lastline_id}')
-        print(f"length:{len(chatblocks[0])}")
-        return chatblocks[0]
-    except Exception as e:
-        logger.error(f"{type(e)} {str(e)} {line}")
-        traceback.print_exc()
-        # p= json.loads(line)
-        # with open('v:/~~/error_dic.json',mode ='w',encoding='utf-8') as f:
-        #     f.write(line)
-
-
-def download(movie_id, duration, divisions):
-    #length of the video (ms)
-    duration_ms=duration*1000
-    #number of download divisions
-    div = divisions
-    #Queue for the progress bar
-    queue= Queue()
-    #process for the progress bar
-    proc = Process(target=_listener,args=(queue,duration_ms,div))
-    proc.start()
-    #division interval of the chat data (ms)
-    term_ms = int(duration_ms / div)
-    #start time of chat data fetching
-    start_ms = 0
-    #whether this divided piece is the last one
-    is_lastpiece = False
-    argslist=[]
-    dl_end=[]
-    #prepare the arguments for the divided download
-    for i in range(0,div):
-        if i==div-1:
-            is_lastpiece =True
-        args = (queue, movie_id, start_ms, term_ms, i, is_lastpiece,dl_end)
-        argslist.append(args)
-        start_ms+=term_ms
-        dl_end.append(start_ms)
-
-    loop = asyncio.get_event_loop()
-    chatlist =loop.run_until_complete(_asyncdl(argslist))
-    #send None to the Queue to terminate the progress bar process
-    queue.put(None)
-    #combine the chat data downloaded in pieces and return it
-
-    return _combine(chatlist)
-    
\ No newline at end of file
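
Usage note: after this patch, a full archive download is one method chain on
Downloader, as download() above shows. A minimal sketch follows; the video id
and numeric values are made-up placeholders, and duration appears to be in
seconds (it is fed to _divide() and used as a seektime), so treat that as an
assumption rather than a documented contract:

    from pytchat.downloader.downloader import Downloader

    # div controls how many blocks are fetched concurrently
    chat_data = Downloader(video_id="XXXXXXXXXXX", duration=3600, div=20).download()

Because every stage returns self, the chain can also be stopped partway to
inspect intermediate state, e.g.:

    d = Downloader("XXXXXXXXXXX", 3600, 20).ready_blocks().remove_duplicate_head()
    print(len(d.blocks))  # number of blocks remaining after head deduplication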