Files
pytchat-fork/pytchat/downloader/downloader.py
2020-01-24 01:14:23 +09:00

290 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import aiohttp,async_timeout
import json
from tqdm import tqdm
import traceback,time
from requests.exceptions import ConnectionError
from urllib.parse import quote
from multiprocessing import Process, Queue
from . import dictquery
from .. import config
from .. paramgen import arcparam
logger = config.logger(__name__)
# Base endpoint for paged archived-chat ("live chat replay") requests;
# callers append a quoted continuation token (and "&pbj=1") to this URL.
REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
    "get_live_chat_replay?continuation="
async def _dl_piece(_session, queue, movie_id, offset_ms, duration_ms, pbar_pos, is_lastpiece, dl_end):
    """Download one chunk of a chat replay and return its raw action list.

    Builds the continuation parameter for this chunk's starting offset and
    delegates the actual page-by-page download to _dl.
    """
    # The first piece starts from the very beginning of the stream (-1);
    # later pieces seek to their offset (arcparam.getparam takes seconds).
    if pbar_pos == 0:
        continuation = arcparam.getparam(movie_id, -1)
    else:
        continuation = arcparam.getparam(movie_id, offset_ms / 1000)
    return await _dl(
        session=_session,
        queue=queue,
        next_url=f"{REPLAY_URL}{quote(continuation)}&pbj=1",
        absolute_start=offset_ms,
        duration=duration_ms,
        pbar_pos=pbar_pos,
        is_lastpiece=is_lastpiece,
        dl_end=dl_end,
    )
def get_init_offset_ms(dics: dict) -> int:
    """Return the video offset (in ms, as int) of the first replay chat action.

    Scans the actions list from the front and skips entries that carry no
    replayChatItemAction / videoOffsetTimeMsec (e.g. non-chat actions).

    Raises:
        IndexError: if no action in the page carries an offset.
    """
    actions = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"]
    n = 0
    while True:
        # BUGFIX: the original subscripted the result of .get() directly,
        # raising TypeError on actions lacking "replayChatItemAction" —
        # exactly the rows the None check was supposed to skip.
        init_offset_ms = actions[n].get("replayChatItemAction", {}).get("videoOffsetTimeMsec")
        if init_offset_ms is None:
            n += 1
            continue
        return int(init_offset_ms)
def get_last_offset_ms(dics: dict) -> int:
    """Return the video offset (in ms, as int) of the last replay chat action.

    Scans the actions list from the back and skips entries that carry no
    replayChatItemAction / videoOffsetTimeMsec (e.g. non-chat actions).

    Raises:
        IndexError: if no action in the page carries an offset.
    """
    actions = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"]
    m = -1
    while True:
        # BUGFIX: the original subscripted the result of .get() directly,
        # raising TypeError on actions lacking "replayChatItemAction" —
        # exactly the rows the None check was supposed to skip.
        last_offset_ms = actions[m].get("replayChatItemAction", {}).get("videoOffsetTimeMsec")
        if last_offset_ms is None:
            m -= 1
            continue
        return int(last_offset_ms)
async def _dl(session, queue, next_url, absolute_start, duration, pbar_pos, is_lastpiece, dl_end):
    """Fetch successive live_chat_replay pages starting at next_url.

    Follows the continuation token page by page, pushing the time span (ms)
    covered by each page onto `queue` for the progress bar. Stops when this
    piece reaches the start offset recorded for the next piece
    (dl_end[pbar_pos]) or the continuation chain ends (KeyError).

    Args:
        session: aiohttp ClientSession shared by all pieces.
        queue: multiprocessing queue feeding _listener; receives lengths,
            then 'quit' once this piece finishes.
        next_url: first replay-page URL for this piece.
        absolute_start: piece start offset in ms (informational).
        duration: piece duration in ms (informational).
        pbar_pos: index of this piece (0-based).
        is_lastpiece: True for the final piece (no dl_end cutoff).
        dl_end: shared list of per-piece end offsets; entry pbar_pos-1 is
            trimmed to where this piece actually starts.

    Returns:
        The accumulated list of chat actions, or the string 'error' after
        repeated failures.
    """
    async with async_timeout.timeout(1000):
        chat_data = []
        dlerror = False
        first = True
        rqerr = 0  # ConnectionError retry counter
        jserr = 0  # JSONDecodeError retry counter
        while True:
            try:
                # Download and parse one JSON page of chat data.
                async with session.get(next_url, headers=config.headers) as response:
                    text = await response.text()
                    dics = json.loads(text)
                continuation = dics["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["liveChatReplayContinuationData"]["continuation"]
                # URL of the next live_chat_replay page.
                next_url = f"{REPLAY_URL}{continuation}&pbj=1"
                init_offset_ms = get_init_offset_ms(dics)
                last_offset_ms = get_last_offset_ms(dics)
                length_ms = last_offset_ms - init_offset_ms
                if length_ms < 0:
                    raise Exception('length_ms < 0')
                queue.put(length_ms)
                if first:
                    if pbar_pos > 0:
                        # Trim the previous piece's recorded end to where this
                        # piece actually starts, so the overlap is consistent.
                        dl_end[pbar_pos - 1] = init_offset_ms
                    first = False
                chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
                if (last_offset_ms >= dl_end[pbar_pos]) and not is_lastpiece:
                    queue.put('quit')
                    break
            # The continuation token is gone -> end of the replay.
            except KeyError:
                queue.put('quit')
                break
            # Bad JSON payload: retry (up to 20 times) after a short pause.
            except json.decoder.JSONDecodeError:
                # BUGFIX: time.sleep() blocked the shared event loop and so
                # stalled every concurrent piece; asyncio.sleep yields instead.
                await asyncio.sleep(1)
                jserr += 1
                if jserr < 20:
                    continue
                logger.error('JSONDecodeError at piece %d' % (pbar_pos))
                # BUGFIX: was queue.put(quit) — the *builtin* quit object,
                # which the listener neither recognizes as 'quit' nor can
                # treat as a number.
                queue.put('quit')
                dlerror = True
                break
            except ConnectionError:
                await asyncio.sleep(1)  # BUGFIX: was blocking time.sleep(1)
                rqerr += 1
                if rqerr < 20:
                    continue
                logger.error('ConnectionError at piece %d' % (pbar_pos))
                queue.put('quit')  # BUGFIX: was queue.put(quit)
                dlerror = True
                break
            except UnicodeDecodeError as e:
                logger.error(f"{type(e)}, {str(e)}")
                logger.error(f"{str(e.object)}")
                with open('unicodeerror.txt', mode="w", encoding='utf-8') as f:
                    f.writelines(str(e.object))
                break
            # Catch-all boundary: log, dump the raw payload, and give up.
            # (Narrowed from a bare except so KeyboardInterrupt/CancelledError
            # still propagate.)
            except Exception:
                logger.error('\n不明なエラーが発生しました%d at:' % (pbar_pos))
                logger.error('%s\n' % (next_url))
                traceback.print_exc()
                try:
                    with open('error.json', mode="w", encoding='utf-8') as f:
                        f.writelines(text)
                except UnboundLocalError:
                    # text may be unbound if the request itself failed.
                    pass
                queue.put('quit')
                dlerror = True
                break
        if dlerror:
            return 'error'
        else:
            return chat_data
def _debug_save(_pbar_pos,prefix,init_offset_ms,last_offset_ms,dics):
'''
例外が発生したときのチャットデータを保存する。
'''
chat_data =[]
init = '{:0>8}'.format(str(init_offset_ms))
last = '{:0>8}'.format(str(last_offset_ms))
chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
with open(f"[{_pbar_pos}]-{prefix}-from_{init}_to_{last}.data",mode ='w',encoding='utf-8') as f:
f.writelines(chat_data)
def _debug_chatblock():
    # Placeholder for chat-block debugging; intentionally does nothing.
    pass
async def _asyncdl(argslist):
    """Run one _dl_piece coroutine per argument tuple concurrently.

    A single aiohttp session is shared by every piece; results are returned
    in the same order as argslist.
    """
    async with aiohttp.ClientSession() as session:
        tasks = []
        for piece_args in argslist:
            tasks.append(_dl_piece(session, *piece_args))
        return await asyncio.gather(*tasks)
def _listener(q, duration, div):
    """Drive the shared progress bar from messages posted by download workers.

    Consumes `q` until the sentinel None arrives: numeric items (ms) advance
    the bar, 'quit' marks one of the `div` workers as finished, and once all
    workers have quit the bar is topped up to 100% and closed.
    """
    remaining = int(duration / 1000)
    active = int(div)
    pbar = tqdm(total=remaining, ncols=80, unit_scale=1,
                bar_format='{desc}{percentage:3.0f}%|{bar}|[{n_fmt:>7}/{total_fmt}]{elapsed}<{remaining}')
    # Keep reading until the sentinel None is received.
    for item in iter(q.get, None):
        if item == 'quit':
            active -= 1
            if active == 0:
                # Every piece finished: fill whatever is left and close.
                if remaining > 0:
                    pbar.update(remaining)
                pbar.close()
        else:
            step = int(item / 1000)
            # Ignore steps that would overshoot the total (or are negative).
            if remaining - step >= 0 and step >= 0:
                remaining -= step
                pbar.update(step)
def _combine(chatblocks):
    '''
    Combine the chat-data pieces that were downloaded in parallel.

    Each subsequent piece overlaps the previous one; pieces are merged into
    the first piece (chatblocks[0]) in order. For each piece, rows are
    scanned from the top until one whose chat id matches the last row of the
    data merged so far is found, and only the rows after it are appended.

    Returns the merged list, ['error'] / ['combination failed'] on failure,
    or None when the first piece is empty or an unexpected exception occurs.
    '''
    line = ''
    try:
        if len(chatblocks[0]) > 0:
            lastline = chatblocks[0][-1]
            # Ids are compared instead of whole rows: trackingParams differ
            # when the download timing differs, so raw rows never match.
            lastline_id = dictquery.getid_replay(lastline)
        else:
            return None
        for i in range(1, len(chatblocks)):
            f = chatblocks[i]
            if len(f) == 0:
                logger.error(f'zero size piece.:{str(i)}')
                continue
            # Scan this piece from the start for the row shared with the
            # tail of the previously merged data.
            for row in range(len(f)):
                # row-th row of the piece
                line = f[row]
                if dictquery.getid_replay(line) == lastline_id:
                    # Common row found: append everything after it.
                    chatblocks[0].extend(f[row + 1:])
                    break
                if line == 'error':
                    logger.error(f'Error file was saved.: piece:{str(i)}')
                    return ['error']
            else:  # reached only when the for loop finished without break:
                # failed to find the common (joining) row between pieces.
                logger.error(f'Missing common line.: piece:{str(i-1)}->{str(i)} lastline_id= {lastline_id}')
                return ['combination failed']
            # Update the last-row bookkeeping for the next piece.
            lastline = f[-1]
            dic_lastline = lastline
            lastline_id = dictquery.getid_replay(dic_lastline)
        print(f"length:{len(chatblocks[0])}")
        return chatblocks[0]
    except Exception as e:
        logger.error(f"{type(e)} {str(e)} {line}")
        traceback.print_exc()
def download(movie_id, duration, divisions):
    """Download the whole chat replay of a video in parallel pieces.

    Args:
        movie_id: YouTube video id.
        duration: video length in seconds.
        divisions: number of concurrent download pieces.

    Returns:
        The combined chat action list (see _combine), or an error marker
        list / None on failure.
    """
    # Video length in milliseconds.
    duration_ms = duration * 1000
    # Number of parallel pieces.
    div = divisions
    # Queue and child process that drive the shared progress bar.
    queue = Queue()
    proc = Process(target=_listener, args=(queue, duration_ms, div))
    proc.start()
    # Span of each piece in ms.
    term_ms = int(duration_ms / div)
    # Starting offset of the current piece.
    start_ms = 0
    is_lastpiece = False
    argslist = []
    dl_end = []
    # Build one argument tuple per piece; dl_end[i] records where piece i
    # should stop (the next piece's start).
    for i in range(div):
        if i == div - 1:
            is_lastpiece = True
        argslist.append((queue, movie_id, start_ms, term_ms, i, is_lastpiece, dl_end))
        start_ms += term_ms
        dl_end.append(start_ms)
    loop = asyncio.get_event_loop()
    try:
        chatlist = loop.run_until_complete(_asyncdl(argslist))
    finally:
        # Always send the sentinel so the progress-bar process exits, and
        # wait for it (BUGFIX: the original never joined the child process,
        # leaking it on exceptions).
        queue.put(None)
        proc.join()
    # Combine the downloaded pieces and return the result.
    return _combine(chatlist)