From 9d494446e1ceedc7c316793068494a37e5c6fb5a Mon Sep 17 00:00:00 2001
From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com>
Date: Thu, 23 Jan 2020 02:00:50 +0900
Subject: [PATCH] Implement base downloader

---
 error.json                       |   0
 pytchat/downloader/__init__.py   |   0
 pytchat/downloader/dictquery.py  |  43 +++++
 pytchat/downloader/downloader.py | 287 +++++++++++++++++++++++++++++++
 4 files changed, 330 insertions(+)
 create mode 100644 error.json
 create mode 100644 pytchat/downloader/__init__.py
 create mode 100644 pytchat/downloader/dictquery.py
 create mode 100644 pytchat/downloader/downloader.py

diff --git a/error.json b/error.json
new file mode 100644
index 0000000..e69de29
diff --git a/pytchat/downloader/__init__.py b/pytchat/downloader/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pytchat/downloader/dictquery.py b/pytchat/downloader/dictquery.py
new file mode 100644
index 0000000..9e7068c
--- /dev/null
+++ b/pytchat/downloader/dictquery.py
@@ -0,0 +1,43 @@
+class DictQuery(dict):
+    def get(self, path, default=None):
+        keys = path.split("/")
+        val = None
+        for key in keys:
+            if val:
+                if key.isdecimal():
+                    if isinstance(val, list) and len(val) > int(key):
+                        #val=val[int(key)]
+                        val = list(val)[int(key)]
+                    else: return default
+                elif isinstance(val, dict):
+                    val = val.get(key, default)
+                else:
+                    return default
+            else:
+                val = dict.get(self, key, default)
+
+        return val
+
+def find(target, **kwargs):
+    for key in kwargs.keys():
+        if key == target:
+            return kwargs[key]
+        if isinstance(kwargs[key], dict):
+            res = find(target, **kwargs[key])
+        elif isinstance(kwargs[key], list):
+            for item in kwargs[key]:
+                res = find(target, **item)
+    try:
+        return res
+    except UnboundLocalError:
+        return None
+
+def getid_replay(item):
+    return list((list(item['replayChatItemAction']["actions"][0].values())[0])['item'].values())[0]['id']
+
+def getid_realtime(item):
+    return list((list(item.values())[0])['item'].values())[0]['id']
+
+
+def get_timestamp_realtime(item):
+    return list((list(item.values())[0])['item'].values())[0]['timestampUsec']
diff --git a/pytchat/downloader/downloader.py b/pytchat/downloader/downloader.py
new file mode 100644
index 0000000..c95300f
--- /dev/null
+++ b/pytchat/downloader/downloader.py
@@ -0,0 +1,287 @@
+import asyncio
+import aiohttp, async_timeout
+import json
+from tqdm import tqdm
+import traceback
+from requests.exceptions import ConnectionError
+from urllib.parse import quote
+from multiprocessing import Process, Queue
+
+from . import dictquery
+from .. import config
+from ..paramgen import arcparam
+
+
+
+
+logger = config.logger(__name__)
+
+REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
+             "get_live_chat_replay?continuation="
+
+async def _dl_piece(_session, queue, movie_id, offset_ms, duration_ms, pbar_pos, is_lastpiece, dl_end):
+    #print(f'_dl_piece:{movie_id},{offset_ms},{duration_ms},{pbar_pos},{is_lastpiece}')
+
+    chat_data = []
+    if pbar_pos == 0:
+        #continue_url = construct.encoder.encode(construct.construct_seekinit(movie_id,filter='all'))
+        continuation = arcparam.getparam(movie_id, -1)
+    else:
+        continuation = arcparam.getparam(movie_id, offset_ms/1000)
+    next_url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
+
+    #print(pbar_pos,next_url)
+    chat_data = await _dl(session=_session,
+                          queue=queue,
+                          next_url=next_url,
+                          absolute_start=offset_ms,
+                          duration=duration_ms,
+                          pbar_pos=pbar_pos,
+                          is_lastpiece=is_lastpiece,
+                          dl_end=dl_end)
+    return chat_data
+
+def get_init_offset_ms(dics: dict):
+    n = 0
+    while True:
+        init_offset_ms = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][n].get("replayChatItemAction", {}).get('videoOffsetTimeMsec')
+        if init_offset_ms is None:
+            n += 1
+            continue
+        else:
+            return int(init_offset_ms)
+
+def get_last_offset_ms(dics: dict):
+    m = -1
+    while True:
+        last_offset_ms = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][m].get("replayChatItemAction", {}).get('videoOffsetTimeMsec')
+        if last_offset_ms is None:
+            m -= 1
+            continue
+        else:
+            return int(last_offset_ms)
+
+
+async def _dl(session, queue, next_url, absolute_start, duration, pbar_pos, is_lastpiece, dl_end):
+    async with async_timeout.timeout(1000):
+        chat_data = []
+        #print('absolute',absolute_start,'duration',duration,'pos',pbar_pos)
+        dlerror = False
+        first = True
+        rqerr = 0
+        jserr = 0
+        while True:
+
+            try:
+                # download and parse the chat data in JSON format
+                async with session.get(next_url, headers=config.headers) as response:
+                    text = await response.text()
+                dics = json.loads(text)
+
+                continuation = dics["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["liveChatReplayContinuationData"]["continuation"]
+                # URL of the next live_chat_replay page
+                next_url = f"{REPLAY_URL}{continuation}&pbj=1"
+
+                init_offset_ms = get_init_offset_ms(dics)
+                last_offset_ms = get_last_offset_ms(dics)
+                length_ms = last_offset_ms - init_offset_ms
+                #print(f'[{pbar_pos}] length_ms = {length_ms}, total={last_offset_ms}')
+                if length_ms < 0:
+                    raise Exception('length_ms < 0')
+                queue.put(length_ms)
+                if first:
+                    #save(pbar_pos,'FIRST',init_offset_ms,last_offset_ms,dics)
+                    if pbar_pos > 0:
+                        #print(f'Reset dl_end[{pbar_pos - 1}]:{dl_end[pbar_pos - 1]} -> {init_offset_ms} ({init_offset_ms-dl_end[pbar_pos - 1]})')
+                        dl_end[pbar_pos - 1] = init_offset_ms
+                    first = False
+
+                chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
+                #print(chat_data)
+                if (last_offset_ms >= dl_end[pbar_pos]) and not is_lastpiece:
+                    #save(pbar_pos,'LAST ',init_offset_ms,last_offset_ms,dics)
+                    #print(f'break:pbar_pos ={pbar_pos}')
+                    queue.put('quit')
+                    break
+
+            # the piece is finished once the next continuation can no longer be obtained
+            except KeyError:
+                queue.put('quit')
+                break
+            # if a JSONDecodeError occurs, fetch the data again
+            except json.decoder.JSONDecodeError:
+                await asyncio.sleep(1)
+                jserr += 1
+                if jserr < 20:
+                    continue
+                else:
+                    logger.error('JSONDecodeError at piece %d' % (pbar_pos))
+                    queue.put('quit')
+                    dlerror = True
+                    break
+            except ConnectionError:
+                await asyncio.sleep(1)
+                rqerr += 1
+                if rqerr < 20:
+                    continue
+                else:
+                    logger.error('ConnectionError at piece %d' % (pbar_pos))
+                    queue.put('quit')
+                    dlerror = True
+                    break
+            #except KeyboardInterrupt:
+            #    pass
+            except UnicodeDecodeError as e:
+                logger.error(f"{type(e)}, {str(e)}")
+                logger.error(f"{str(e.object)}")
+                with open('unicodeerror.txt', mode="w", encoding='utf-8') as f:
+                    f.writelines(str(e.object))
+                break
+            except Exception:
+                logger.error('\nAn unknown error occurred at piece %d:' % (pbar_pos))
+                logger.error('%s\n' % (next_url))
+                traceback.print_exc()
+                try:
+                    with open('error.json', mode="w", encoding='utf-8') as f:
+                        f.writelines(text)
+                except UnboundLocalError:
+                    pass
+                queue.put('quit')
+                dlerror = True
+                break
+        #session.close()
+        if dlerror:
+            return 'error'
+        else:
+            return chat_data
+
+
+def _debug_save(_pbar_pos, prefix, init_offset_ms, last_offset_ms, dics):
+    '''
+    Save the chat data of the moment an exception occurred.
+    '''
+    chat_data = []
+    init = '{:0>8}'.format(str(init_offset_ms))
+    last = '{:0>8}'.format(str(last_offset_ms))
+    chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
+
+    with open(f"[{_pbar_pos}]-{prefix}-from_{init}_to_{last}.data", mode='w', encoding='utf-8') as f:
+        json.dump(chat_data, f, ensure_ascii=False)
+
+def _debug_chatblock():
+    pass
+
+
+async def _asyncdl(argslist):
+    promises = []
+    async with aiohttp.ClientSession() as session:
+        promises = [_dl_piece(session, *args) for args in argslist]
+        return await asyncio.gather(*promises)
+
+def _listener(q, duration, div):
+    duration_sec = int(duration/1000)
+    ret = int(div)
+    pbar = tqdm(total=duration_sec, ncols=80, unit_scale=1,
+                bar_format='{desc}{percentage:3.0f}%|{bar}|[{n_fmt:>7}/{total_fmt}]{elapsed}<{remaining}')
+    # keep reading from the queue until None is found
+
+    for item in iter(q.get, None):
+        if item == 'quit':
+            ret = ret - 1
+            if ret == 0:
+                if duration_sec > 0:
+                    pbar.update(duration_sec)
+                pbar.close()
+        else:
+            item = int(item/1000)
+            if duration_sec - item >= 0 and item >= 0:
+                duration_sec -= item
+                pbar.update(item)
+
+
+def _combine(chatblocks):
+    '''
+    Combine the chat data that was downloaded in pieces,
+    appending each piece to the first block (chatblocks[0]) in order.
+    '''
+    line = ''
+    try:
+        if len(chatblocks[0]) > 0:
+            lastline = chatblocks[0][-1]
+            #lastline_id = dictquery.getid_replay(json.loads(lastline))
+            lastline_id = dictquery.getid_replay(lastline)
+        else: return None
+        for i in range(1, len(chatblocks)):
+            f = chatblocks[i]
+            if len(f) == 0:
+                logger.error(f'zero size piece.:{str(i)}')
+                continue
+            # scan this piece from the start, looking for the row shared with the end of the previous piece
+            for row in range(len(f)):
+                # data of the row-th line
+                line = f[row]
+                # equal to the last row of the previous piece (trackingParams differ when
+                # the download timing differs, so compare by id instead)
+                #if dictquery.getid_replay(json.loads(line)) == lastline_id:
+                if dictquery.getid_replay(line) == lastline_id:
+                    # common row found, so append everything after it
+                    #print(f'[{i}][{row}]Find common line {lastline_id}')
+                    chatblocks[0].extend(f[row+1:])
+                    break
+                if line == 'error':
+                    logger.error(f'Error file was saved.: piece:{str(i)}')
+                    return ['error']
+            else:  # reached when the for loop finished without a break
+                # failed to find the junction (common row) between pieces
+                logger.error(f'Missing common line.: piece:{str(i-1)}->{str(i)} lastline_id= {lastline_id}')
+                return ['combination failed']
+            # update the last row for the next iteration
+            lastline = f[-1]
+            #dic_lastline = json.loads(lastline)
+            dic_lastline = lastline
+            lastline_id = dictquery.getid_replay(dic_lastline)
+            #print(f'[{i}]lastline_id:{lastline_id}')
+
+        return chatblocks[0]
+    except Exception as e:
+        logger.error(f"{type(e)} {str(e)} {line}")
+        traceback.print_exc()
+        # p = json.loads(line)
+        # with open('v:/~~/error_dic.json', mode='w', encoding='utf-8') as f:
+        #     f.write(line)
+
+
+def download(movie_id, duration, divisions):
+    # video length (milliseconds)
+    duration_ms = duration*1000
+    # number of download divisions
+    div = divisions
+    # queue for the progress bar
+    queue = Queue()
+    # process for the progress bar
+    proc = Process(target=_listener, args=(queue, duration_ms, div))
+    proc.start()
+    # interval of each chat data piece (milliseconds)
+    term_ms = int(duration_ms / div)
+    # start time for fetching the chat data
+    start_ms = 0
+    # whether the piece is the last one
+    is_lastpiece = False
+    argslist = []
+    dl_end = []
+    # prepare the arguments for the divided download
+    for i in range(0, div):
+        if i == div-1:
+            is_lastpiece = True
+        args = (queue, movie_id, start_ms, term_ms, i, is_lastpiece, dl_end)
+        argslist.append(args)
+        start_ms += term_ms
+        dl_end.append(start_ms)
+
+    loop = asyncio.get_event_loop()
+    chatlist = loop.run_until_complete(_asyncdl(argslist))
+    # send None to the queue to terminate the progress bar process
+    queue.put(None)
+    # combine the separately downloaded chat data and return it
+    return _combine(chatlist)
+
\ No newline at end of file
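
Usage note (not part of the patch): a minimal sketch of how the new module might be driven, assuming the package layout introduced above. The video ID, duration and division count are hypothetical placeholder values.

    # Sketch only: the movie_id, duration (in seconds) and divisions below are example values.
    from pytchat.downloader import downloader

    if __name__ == '__main__':
        # Downloads the replay chat of an archived stream in `divisions` concurrent
        # pieces and returns the combined list of chat actions; when joining the
        # pieces fails, a sentinel such as ['error'] or ['combination failed'] is
        # returned instead.
        chat_actions = downloader.download('XXXXXXXXXXX', 3600, 4)

The __main__ guard is used because download() starts a multiprocessing.Process for the progress bar, and the calling module is re-imported by the child process on spawn-based platforms such as Windows.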