Make it possible to use method chaining

taizan-hokuto
2020-01-26 12:08:10 +09:00
parent cc8bba8f63
commit 540f16c1a0
3 changed files with 217 additions and 601 deletions
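
Each pipeline step of the new Downloader returns the instance itself (combine() returns the merged chat data), so the whole replay download can be written as one chain. A minimal usage sketch based on the Downloader class in the diff below; the video id, duration and division count are placeholder values:

# minimal sketch of the new chainable API (placeholder arguments)
downloader = Downloader(video_id="VIDEO_ID", duration=3600, div=10)
chat_data = (downloader
    .ready_blocks()
    .remove_duplicate_head()
    .set_temporary_last()
    .remove_overwrap()
    .download_each_block()
    .remove_duplicate_tail()
    .combine())

# the new download() convenience method wraps exactly this chain:
chat_data = Downloader("VIDEO_ID", 3600, 10).download()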

View File

@@ -1,48 +0,0 @@
class DictQuery(dict):
def get(self, path, default = None):
keys = path.split("/")
val = None
for key in keys:
if val:
if key.isdecimal():
if isinstance(val,list) and len(val) > int(key):
#val=val[int(key)]
val=list(val)[int(key)]
else:return default
elif isinstance(val, dict):
val = val.get(key, default)
else:
return default
else:
val = dict.get(self, key, default)
return val
def find(target,**kwargs):
for key in kwargs.keys():
if key == target:
return kwargs[key]
if isinstance(kwargs[key], dict):
res = find(target,**kwargs[key])
elif isinstance(kwargs[key], list):
for item in kwargs[key]:
res = find(target,**item)
try:
return res
except UnboundLocalError:
return None
def getid_replay(item):
return list((list(item['replayChatItemAction']["actions"][0].values())[0])['item'].values())[0]['id']
def getid_realtime(item):
return list((list(item.values())[0])['item'].values())[0]['id']
def get_timestamp_realtime(item):
return list((list(item.values())[0])['item'].values())[0]['timestampUsec']
def get_offsettime(item):
#return item['replayChatItemAction']["actions"][0]["videoOffsetTimeMsec"]
return item['replayChatItemAction']["videoOffsetTimeMsec"]
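
For context, the removed DictQuery helper resolved slash-separated paths against nested dicts and lists (decimal path segments index into lists). A small sketch of its behaviour, using a made-up dictionary:

# illustration only; the sample data is hypothetical
d = DictQuery({"response": {"actions": [{"id": "abc"}]}})
d.get("response/actions/0/id")   # -> "abc"
d.get("response/missing")        # -> None (the default)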

View File

@@ -1,7 +1,7 @@
import asyncio
import aiohttp,async_timeout
import aiohttp
import json
import traceback,time
import traceback
from urllib.parse import quote
from . import parser
@@ -14,276 +14,231 @@ headers=config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
"get_live_chat_replay?continuation="
class Block:
def __init__(self, pos=0, init_offset=0, last_offset=0,
continuation='', chat_data=[]):
def __init__(self, pos=0, first=0, last=0,
continuation='', chat_data=[]):
self.pos = pos
self.init_offset = init_offset
self.last_offset = last_offset
self.stop_offset = 0
self.first = first
self.last = last
self.temp_last = 0
self.continuation = continuation
self.chat_data = chat_data
def _debug_save(_pbar_pos,prefix,init_offset_ms,last_offset_ms,dics):
    chat_data =[]
    init = '{:0>8}'.format(str(init_offset_ms))
    last = '{:0>8}'.format(str(last_offset_ms))
    chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
    with open(f"[{_pbar_pos}]-{prefix}-from_{init}_to_{last}.data",mode ='w',encoding='utf-8') as f:
        f.writelines(chat_data)
class Downloader:
def __init__(self, video_id,duration,div):
self.video_id = video_id
self.duration = duration
self.div = div
self.blocks = []
def ready_blocks(self):
if self.div <= 0: raise ValueError
def _divide(start, end, count):
min_interval = 120
if (not isinstance(start,int) or
not isinstance(end,int) or
not isinstance(count,int)):
raise ValueError("start/end/count must be int")
if start>end:
raise ValueError("end must be equal to or greater than start.")
if count<1:
raise ValueError("count must be equal to or greater than 1.")
if (end-start)/count < min_interval:
count = int((end-start)/min_interval)
if count == 0 : count = 1
interval= (end-start)/count
if count == 1:
return [start]
return sorted(list(set([int(start+interval*j)
for j in range(count) ])))
async def _get_blocks(duration,div):
async with aiohttp.ClientSession() as session:
futures = [_create_block(session, pos, seektime)
for pos, seektime in enumerate(_divide(-1, duration , div))]
return await asyncio.gather(*futures,return_exceptions=True)
async def _create_block(session, pos, seektime):
continuation = arcparam.getparam(self.video_id, seektime = seektime)
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url, headers = headers) as resp:
text = await resp.text()
next_continuation, actions = parser.parse(json.loads(text))
return Block(
pos = pos,
continuation = next_continuation,
chat_data = actions,
first = parser.get_offset(actions[0]),
last = parser.get_offset(actions[-1])
)
loop = asyncio.get_event_loop()
self.blocks = loop.run_until_complete(_get_blocks(self.duration, self.div))
return self
def remove_duplicate_head(self):
blocks = self.blocks
def is_same_offset(index):
return (blocks[index].first == blocks[index+1].first)
def is_same_id(index):
id_0 = parser.get_id(blocks[index].chat_data[0])
id_1 = parser.get_id(blocks[index+1].chat_data[0])
return (id_0 == id_1)
def is_same_type(index):
type_0 = parser.get_type(blocks[index].chat_data[0])
type_1 = parser.get_type(blocks[index+1].chat_data[0])
return (type_0 == type_1)
ret = []
[ret.append(blocks[i]) for i in range(len(blocks)-1)
if (len(blocks[i].chat_data)>0 and
not ( is_same_offset(i) and is_same_id(i) and is_same_type(i)))]
ret.append(blocks[-1])
self.blocks = ret
return self
def set_temporary_last(self):
for i in range(len(self.blocks)-1):
self.blocks[i].temp_last = self.blocks[i+1].first
self.blocks[-1].temp_last = -1
return self
def remove_overwrap(self):
blocks = self.blocks
if len(blocks) == 1 : return self
def is_overwrap(a, b):
return (blocks[a].last > blocks[b].first)
with open(f"[{_pbar_pos}]-{prefix}-from_{init}_to_{last}.data",mode ='w',encoding='utf-8') as f:
f.writelines(chat_data)
ret = []
a = 0
b = 1
jmp = False
ret.append(blocks[0])
while a < len(blocks)-2:
def dump(o):
for key, value in o.__dict__.items():
if key != "chat_data":
print(key, ':', value)
def dumpt(blocks,mes = None):
print(f"{'-'*40}\n{mes}")
[print(f"pos:{b.pos:>2} |init:{b.init_offset: >12,} |last:{b.last_offset: >12,} |stop:{b.stop_offset :>12,} ")
for b in blocks]
def _divide_(start, end, count):
min_interval = 120
if (not isinstance(start,int) or
not isinstance(end,int) or
not isinstance(count,int)):
raise ValueError("start/end/count must be int")
if start>end:
raise ValueError("end must be equal to or greater than start.")
if count<1:
raise ValueError("count must be equal to or greater than 1.")
if (end-start)/count < min_interval:
count = int((end-start)/min_interval)
if count == 0 : count = 1
interval= (end-start)/count
if count == 1:
return [start]
return sorted(list(set([int(start+interval*j)
for j in range(count) ])))
def ready_blocks(video_id:str, div:int, duration:int):
if div <= 0: raise ValueError
def _divide(start, end, count):
if (not isinstance(start,int) or
not isinstance(end,int) or
not isinstance(count,int)):
raise ValueError("start/end/count must be int")
if start>end:
raise ValueError("end must be equal to or greater than start.")
if count<1:
raise ValueError("count must be equal to or greater than 1.")
interval= (end-start)/(count)
if interval < 120 :
interval=120
count = int((end-start)/interval)+1
if count == 1:
return [start]
return sorted(list(set([int(start+interval*j)
if j < count else end
for j in range(count) ])))
async def _get_blocks(duration,div):
async with aiohttp.ClientSession() as session:
futures = [_create_block(session, pos, seektime)
for pos, seektime in enumerate(_divide(-1, duration , div))]
return await asyncio.gather(*futures,return_exceptions=True)
async def _create_block(session, pos, seektime):
continuation = arcparam.getparam(video_id,seektime=seektime)
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url,headers = headers) as resp:
text = await resp.text()
#util.save(text,f"v:/~~/pre_{pos}_",".json")
next_continuation, actions = parser.parse(json.loads(text))
block = Block(
pos = pos,
continuation = next_continuation,
chat_data = actions,
init_offset = parser.get_offset(actions[0]),
last_offset = parser.get_offset(actions[-1])
)
return block
blocks=[]
loop = asyncio.get_event_loop()
blocks = loop.run_until_complete(_get_blocks(duration,div))
return blocks
def remove_duplicate_head(blocks):
def is_same_offset(index):
return (blocks[index].init_offset == blocks[index+1].init_offset)
def is_same_id(index):
id_0 = parser.get_id(blocks[index].chat_data[0])
id_1 = parser.get_id(blocks[index+1].chat_data[0])
return (id_0 == id_1)
def is_same_type(index):
type_0 = parser.get_type(blocks[index].chat_data[0])
type_1 = parser.get_type(blocks[index+1].chat_data[0])
return (type_0 == type_1)
ret = []
[ret.append(blocks[i]) for i in range(len(blocks)-1)
if (len(blocks[i].chat_data)>0 and
not ( is_same_offset(i) and is_same_id(i) and is_same_type(i)))]
ret.append(blocks[-1])
blocks = None
return ret
def remove_overwrap(blocks):
def is_overwrap(a, b):
print(f"comparing({a}, {b})....overwrap ({(blocks[a].last_offset > blocks[b].init_offset)})")
return (blocks[a].last_offset > blocks[b].init_offset)
ret = []
a = 0
b=1
jmp = False
ret.append(blocks[0])
while a < len(blocks)-2:
while is_overwrap(a,b):
b+=1
print("forward")
if b == len(blocks)-2:
jmp=True
break
if jmp: break
if b-a == 1:
print(f"next ret.append(blocks[{b}]")
ret.append(blocks[b])
a = b
b+=1
continue
else:
print(f"apart ret.append(blocks[{b-1}]")
ret.append(blocks[b-1])
a=b-1
b=a+1
ret.append(blocks[-1])
return ret
def remove_duplicate_tail(blocks):
def is_same_offset(index):
return blocks[index-1].init_offset == blocks[index].init_offset
def is_same_id(index):
id_0 = parser.get_id(blocks[index-1].chat_data[-1])
id_1 = parser.get_id(blocks[index].chat_data[-1])
return id_0 == id_1
def is_same_type(index):
type_0 = parser.get_type(blocks[index-1].chat_data[-1])
type_1 = parser.get_type(blocks[index].chat_data[-1])
return type_0 == type_1
ret = []
[ret.append(blocks[i]) for i in range(len(blocks)-1)
if not ( is_same_offset(i) and is_same_id(i) and is_same_type(i) )]
ret.append(blocks[-1])
blocks = None
return ret
def set_stop_offset(blocks):
for i in range(len(blocks)-1):
blocks[i].stop_offset = blocks[i+1].init_offset
blocks[-1].stop_offset = -1
return blocks
def download_each_block(blocks):
loop = asyncio.get_event_loop()
return loop.run_until_complete(_dl_block(blocks))
async def _dl_block(blocks):
futures = []
async with aiohttp.ClientSession() as session:
futures = [_dl_chunk(session, block) for block in blocks]
return await asyncio.gather(*futures,return_exceptions=True)
async def _dl_chunk(session, block:Block):
if (block.stop_offset != -1 and
block.last_offset > block.stop_offset):
return
def get_last_offset(actions):
return parser.get_offset(actions[-1])
continuation = block.continuation
while continuation:
print(block.pos)
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url,headers = config.headers) as resp:
text = await resp.text()
continuation, actions = parser.parse(json.loads(text))
if actions:
block.chat_data.extend(actions)
last_offset = get_last_offset(actions)
if block.stop_offset != -1:
if last_offset > block.stop_offset:
block.last_offset = last_offset
                else:
                    block.last_offset = last_offset
def combine(blocks):
    line = ''
    try:
        if len(blocks[0].chat_data)>0:
            lastline=blocks[0].chat_data[-1]
            lastline_offset = parser.get_offset(lastline)
        else: return None
        for i in range(1,len(blocks)):
            f=blocks[i].chat_data
            if len(f)==0:
                logger.error(f'zero size piece.:{str(i)}')
            for row in range(len(f)):
                line = f[row]
                if parser.get_offset(line) > lastline_offset:
                    blocks[0].chat_data.extend(f[row:])
                    break
                if line =='error':
                    logger.error(f'Error file was saved.: piece:{str(i)}')
                    return['error']
            else:
                logger.error(f'Missing common line.: piece:{str(i-1)}->{str(i)} lastline_id= {lastline_offset}')
                return ['combination failed']
            lastline_offset = parser.get_offset( f[-1])
        return blocks[0].chat_data
    except Exception as e:
        logger.error(f"{type(e)} {str(e)} {line}")
        traceback.print_exc()
def download(video_id, duration, div):
    blocks = ready_blocks(video_id=video_id, duration=duration, div=div)
    dumpt(blocks,"ready_blocks")
    selected = remove_duplicate_head(blocks)
    dumpt(selected,"removed duplicate_head")
    set_stop_offset(selected)
    dumpt(selected,"set stop_offset")
    #set_stop_offset(selected)
    removed = remove_overwrap(selected)
    dumpt(removed,"removed overwrap")
    download_each_block(removed)
    dumpt(removed,"downloaded each_block")
    return combine(removed)
            while is_overwrap(a,b):
                b+=1
                if b == len(blocks)-1:
                    jmp=True
                    break
            if jmp: break
            if b-a == 1:
                ret.append(blocks[b])
                a = b
                b+=1
                continue
            ret.append(blocks[b-1])
            a=b-1
            b=a+1
        ret.append(blocks[-1])
        self.blocks = ret
        return self
    def download_each_block(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._dl_block())
        return self
    async def _dl_block(self):
        futures = []
        async with aiohttp.ClientSession() as session:
            futures = [self._dl_chunk(session, block) for block in self.blocks]
            return await asyncio.gather(*futures,return_exceptions=True)
async def _dl_chunk(self, session, block:Block):
if (block.temp_last != -1 and
block.last > block.temp_last):
return
def get_last_offset(actions):
return parser.get_offset(actions[-1])
continuation = block.continuation
while continuation:
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url,headers = config.headers) as resp:
text = await resp.text()
continuation, actions = parser.parse(json.loads(text))
if actions:
block.chat_data.extend(actions)
last = get_last_offset(actions)
if block.temp_last != -1:
if last > block.temp_last:
block.last = last
break
else:
block.last = last
def remove_duplicate_tail(self):
blocks = self.blocks
if len(blocks) == 1 : return self
def is_same_offset(index):
return (blocks[index-1].last == blocks[index].last)
def is_same_id(index):
id_0 = parser.get_id(blocks[index-1].chat_data[-1])
id_1 = parser.get_id(blocks[index].chat_data[-1])
return (id_0 == id_1)
def is_same_type(index):
type_0 = parser.get_type(blocks[index-1].chat_data[-1])
type_1 = parser.get_type(blocks[index].chat_data[-1])
return (type_0 == type_1)
ret = []
ret.append(blocks[0])
[ret.append(blocks[i]) for i in range(1,len(blocks)-1)
if not ( is_same_offset(i) and is_same_id(i) and is_same_type(i) )]
ret.append(self.blocks[-1])
self.blocks = ret
return self
def combine(self):
line = ''
try:
if len(self.blocks[0].chat_data)>0:
lastline=self.blocks[0].chat_data[-1]
lastline_offset = parser.get_offset(lastline)
else: return None
for i in range(1,len(self.blocks)):
f=self.blocks[i].chat_data
if len(f)==0:
logger.error(f'zero size piece.:{str(i)}')
continue
for row in range(len(f)):
line = f[row]
if parser.get_offset(line) > lastline_offset:
self.blocks[0].chat_data.extend(f[row:])
break
else:
logger.error(
f'Missing connection.: pos:{str(i-1)}->{str(i)}'
f' lastline_offset= {lastline_offset}')
lastline_offset = parser.get_offset( f[-1])
return self.blocks[0].chat_data
except Exception as e:
logger.error(f"{type(e)} {str(e)} {line}")
traceback.print_exc()
def download(self):
return (
self.ready_blocks()
.remove_duplicate_head()
.set_temporary_last()
.remove_overwrap()
.download_each_block()
.remove_duplicate_tail()
.combine()
)
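
The block boundaries produced by ready_blocks come from its nested _divide helper, which spreads seek times evenly but enforces a minimum spacing of 120 (seconds, judging by the seektime argument passed to arcparam.getparam). A standalone copy of that helper, shown here only to illustrate the spacing rule; the sample arguments are hypothetical:

def _divide(start, end, count):
    # seek times are at least 120 apart; if the requested number of blocks
    # would make them closer, the block count is reduced instead
    min_interval = 120
    if (not isinstance(start, int) or not isinstance(end, int)
            or not isinstance(count, int)):
        raise ValueError("start/end/count must be int")
    if start > end:
        raise ValueError("end must be equal to or greater than start.")
    if count < 1:
        raise ValueError("count must be equal to or greater than 1.")
    if (end - start) / count < min_interval:
        count = int((end - start) / min_interval)
        if count == 0:
            count = 1
    interval = (end - start) / count
    if count == 1:
        return [start]
    return sorted(set(int(start + interval * j) for j in range(count)))

_divide(-1, 3600, 10)   # ten evenly spaced seek times: [-1, 359, 719, ...]
_divide(-1, 300, 10)    # collapses to [-1, 149]: each block must span at least 120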

View File

@@ -1,291 +0,0 @@
import asyncio
import aiohttp,async_timeout
import json
from tqdm import tqdm
import traceback,time
from requests.exceptions import ConnectionError
from urllib.parse import quote
from multiprocessing import Process, Queue
from . import dictquery
from .. import config
from .. paramgen import arcparam
from .. import util
logger = config.logger(__name__)
REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
"get_live_chat_replay?continuation="
async def _dl_piece(_session,queue,movie_id,offset_ms,duration_ms,pbar_pos,is_lastpiece,dl_end):
#print(f'_dl_piece:{movie_id},{offset_ms},{duration_ms},{pbar_pos},{is_lastpiece}')
chat_data=[]
if pbar_pos == 0:
#continue_url = construct.encoder.encode(construct.construct_seekinit(movie_id,filter='all'))
continuation = arcparam.getparam(movie_id,-1)
else:
continuation = arcparam.getparam(movie_id,offset_ms/1000)
next_url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
#print(pbar_pos,next_url)
chat_data = await _dl(session=_session,
queue = queue,
next_url =next_url,
absolute_start = offset_ms,
duration = duration_ms,
pbar_pos = pbar_pos,
is_lastpiece = is_lastpiece,
dl_end = dl_end)
return chat_data
def get_init_offset_ms(dics: dict):
n = 0
while(True):
init_offset_ms = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][n].get("replayChatItemAction")['videoOffsetTimeMsec']
if init_offset_ms is None:
n += 1
continue
else:
return int(init_offset_ms)
def get_last_offset_ms(dics: dict):
m = -1
while(True):
last_offset_ms = dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][m].get("replayChatItemAction")['videoOffsetTimeMsec']
if last_offset_ms is None:
m -= 1
continue
else:
return int(last_offset_ms)
async def _dl(session,queue, next_url, absolute_start,duration,pbar_pos,is_lastpiece,dl_end):
async with async_timeout.timeout(1000):
chat_data = []
#print('absolute',absolute_start,'duration',duration,'pos',pbar_pos)
dlerror=False
first = True
rqerr=0
jserr=0
while(True):
try:
                # download and load the chat data in JSON format
async with session.get(next_url,headers=config.headers) as response:
text = await response.text()
util.save(text,"v:/~~/test_",".json")
dics = json.loads(text)
continuation = dics["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["liveChatReplayContinuationData"]["continuation"]
                    # URL of the next live_chat_replay
next_url =f"{REPLAY_URL}{continuation}&pbj=1"
init_offset_ms = get_init_offset_ms(dics)
last_offset_ms = get_last_offset_ms(dics)
length_ms = last_offset_ms - init_offset_ms
#print(f'[{pbar_pos}] length_ms = {length_ms}, total={last_offset_ms}')
if length_ms < 0:
raise Exception('length_ms < 0')
queue.put(length_ms)
if first:
#print(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][0])
#with open(str(pbar_pos)+'FIRST') as f:
# f.writelines(dics)##############################
if pbar_pos > 0:
#print(f'Reset dl_end[{pbar_pos - 1}]:{dl_end[pbar_pos - 1]} -> {init_offset_ms} ({init_offset_ms-dl_end[pbar_pos - 1]})')
dl_end[pbar_pos - 1] = init_offset_ms
first = False
#print(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"][0])
chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
#print(chat_data)
if (last_offset_ms >= dl_end[pbar_pos]) and not(is_lastpiece):
#save(pbar_pos,'LAST ',init_offset_ms,last_offset_ms,dics)###############################
#print(f'break:pbar_pos ={pbar_pos}')
queue.put('quit')
break
            # stop when next_url can no longer be obtained
except KeyError:
queue.put('quit')
break
            # if a JSONDecodeError occurs, fetch the data again
except json.decoder.JSONDecodeError:
time.sleep(1)
jserr+=1
if jserr<20:
continue
else:
logger.error('JSONDecodeError at piece %d' % (pbar_pos))
queue.put(quit)
dlerror = True
break
except ConnectionError:
time.sleep(1)
rqerr+=1
if rqerr<20:
continue
else:
logger.error('ConnectionError at piece %d' % (pbar_pos))
queue.put(quit)
dlerror = True
break
#except KeyboardInterrupt:
# pass
except UnicodeDecodeError as e:
logger.error(f"{type(e)}, {str(e)}")
logger.error(f"{str(e.object)}")
with open('unicodeerror.txt', mode ="w", encoding='utf-8') as f:
f.writelines(str(e.object))
break
except:
                logger.error('\nAn unknown error occurred %d at:' % (pbar_pos))
logger.error('%s\n' % (next_url))
traceback.print_exc()
try:
with open('error.json', mode ="w", encoding='utf-8') as f:
f.writelines(text)
except UnboundLocalError as ule:
pass
queue.put('quit')
dlerror = True
break
#session.close()
if dlerror:
return 'error'
else:
return chat_data
def _debug_save(_pbar_pos,prefix,init_offset_ms,last_offset_ms,dics):
    '''
    Save the chat data at the moment an exception occurred.
    '''
chat_data =[]
init = '{:0>8}'.format(str(init_offset_ms))
last = '{:0>8}'.format(str(last_offset_ms))
chat_data.extend(dics["response"]["continuationContents"]["liveChatContinuation"]["actions"])
with open(f"[{_pbar_pos}]-{prefix}-from_{init}_to_{last}.data",mode ='w',encoding='utf-8') as f:
f.writelines(chat_data)
def _debug_chatblock():
pass
async def _asyncdl(argslist):
promises=[]
async with aiohttp.ClientSession() as session:
promises = [_dl_piece(session,*args) for args in argslist]
return await asyncio.gather(*promises)
def _listener(q,duration,div):
duration_ms =int(duration/1000)
ret = int(div)
pbar = tqdm(total = duration_ms, ncols=80,unit_scale = 1,
bar_format='{desc}{percentage:3.0f}%|{bar}|[{n_fmt:>7}/{total_fmt}]{elapsed}<{remaining}')
    # keep calling get until None is received
for item in iter(q.get, None):
if(item=='quit'):
ret=ret-1
if(ret==0):
if duration_ms>0:
pbar.update(duration_ms)
pbar.close()
else:
item =int(item/1000)
if duration_ms - item >= 0 and item >= 0:
duration_ms -= item
pbar.update(item)
def _combine(chatblocks):
    '''
    Combine the chat data downloaded in separate pieces,
    appending each piece in turn to the first chunk (chatblocks[0]).
    '''
line=''
try:
if len(chatblocks[0])>0:
lastline=chatblocks[0][-1]
#lastline_id = dictquery.getid_replay(json.loads(lastline))
lastline_id = dictquery.getid_replay(lastline)
else: return None
for i in range(1,len(chatblocks)):
f=chatblocks[i]
if len(f)==0:
logger.error(f'zero size piece.:{str(i)}')
continue
        # scan this piece from the first row to find the line it shares with the tail of the previous piece
for row in range(len(f)):
            # data of the row-th line
line = f[row]
            # matches the last line of the previous piece (compare by id,
            # because trackingParams differ when the download timing differs)
#if dictquery.getid_replay(json.loads(line)) == lastline_id:
if dictquery.getid_replay(line) == lastline_id:
                # a common line was found, so append everything after it
#print(f'[{i}][{row}]Find common line {lastline_id}')
chatblocks[0].extend(f[row+1:])
break
if line =='error':
logger.error(f'Error file was saved.: piece:{str(i)}')
return['error']
            else:  # reached when the for loop finished without a break
                # failed to find the junction (common line) between the pieces
logger.error(f'Missing common line.: piece:{str(i-1)}->{str(i)} lastline_id= {lastline_id}')
return ['combination failed']#---------------------------------test
            # update the last-line data
lastline = f[-1]
#dic_lastline=json.loads(lastline)
dic_lastline=lastline
lastline_id = dictquery.getid_replay(dic_lastline)
#print(f'[{i}]lastline_id:{lastline_id}')
print(f"length:{len(chatblocks[0])}")
return chatblocks[0]
except Exception as e:
logger.error(f"{type(e)} {str(e)} {line}")
traceback.print_exc()
# p= json.loads(line)
# with open('v:/~~/error_dic.json',mode ='w',encoding='utf-8') as f:
# f.write(line)
def download(movie_id, duration, divisions):
    # length of the video (in milliseconds)
duration_ms=duration*1000
    # number of download divisions
div = divisions
    # Queue for the progress bar
queue= Queue()
    # process for the progress bar
proc = Process(target=_listener,args=(queue,duration_ms,div))
proc.start()
    # interval of each chat data division (in milliseconds)
term_ms = int(duration_ms / div)
    # start time for fetching chat data
start_ms = 0
    # whether this piece is the last one
is_lastpiece = False
argslist=[]
dl_end=[]
    # prepare the arguments for the split download
for i in range(0,div):
if i==div-1:
is_lastpiece =True
args = (queue, movie_id, start_ms, term_ms, i, is_lastpiece,dl_end)
argslist.append(args)
start_ms+=term_ms
dl_end.append(start_ms)
loop = asyncio.get_event_loop()
chatlist =loop.run_until_complete(_asyncdl(argslist))
    # send None to the Queue to terminate the progress bar process
queue.put(None)
    # combine the separately downloaded chat data and return it
return _combine(chatlist)
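
For comparison, the deleted module above exposed a single procedural entry point that split the download across asyncio tasks, reported progress via a separate process, and merged the pieces by message id through dictquery. A hypothetical call, with placeholder values:

# old, now removed, entry point; duration is in seconds, divisions is the number of pieces
chat_data = download(movie_id="VIDEO_ID", duration=3600, divisions=10)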