Improve dlworker efficiency

This commit is contained in:
taizan-hokuto
2020-02-09 20:26:21 +09:00
parent 2c598bc8f7
commit ee77807dbd
6 changed files with 215 additions and 79 deletions

View File

@@ -49,9 +49,9 @@ def ready_blocks(video_id, duration, div, callback):
async def _get_blocks( video_id, duration, div, callback): async def _get_blocks( video_id, duration, div, callback):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
futures = [_create_block(session, video_id, pos, seektime, callback) tasks = [_create_block(session, video_id, pos, seektime, callback)
for pos, seektime in enumerate(_split(-1, duration, div))] for pos, seektime in enumerate(_split(-1, duration, div))]
return await asyncio.gather(*futures,return_exceptions=True) return await asyncio.gather(*tasks)
async def _create_block(session, video_id, pos, seektime, callback): async def _create_block(session, video_id, pos, seektime, callback):
continuation = arcparam.getparam( continuation = arcparam.getparam(
@@ -67,7 +67,6 @@ def ready_blocks(video_id, duration, div, callback):
if callback: if callback:
callback(actions,last-first) callback(actions,last-first)
return Block( return Block(
pos = pos,
continuation = next_continuation, continuation = next_continuation,
chat_data = actions, chat_data = actions,
first = first, first = first,
@@ -79,19 +78,22 @@ def ready_blocks(video_id, duration, div, callback):
_get_blocks(video_id, duration, div, callback)) _get_blocks(video_id, duration, div, callback))
return result return result
def download_chunk(callback, blocks): def download_chunk(callback, blocks, video_id):
async def _allocate_workers(): async def _allocate_workers():
workers = [ workers = [
DownloadWorker( DownloadWorker(
fetch = _fetch, fetch = _fetch,
block = block block = block,
blocks = blocks,
video_id = video_id
) )
for block in blocks for i,block in enumerate(blocks)
] ]
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
tasks = [worker.run(session) for worker in workers] tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks,return_exceptions=True) return await asyncio.gather(*tasks)
async def _fetch(continuation,session): async def _fetch(continuation,session):
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
@@ -103,8 +105,8 @@ def download_chunk(callback, blocks):
first = parser.get_offset(actions[0]) first = parser.get_offset(actions[0])
if callback: if callback:
callback(actions, last - first) callback(actions, last - first)
return actions, continuation, last return actions, continuation, first, last
return continuation, [], None return [], continuation, None, None
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.run_until_complete(_allocate_workers()) loop.run_until_complete(_allocate_workers())

View File

@@ -17,7 +17,7 @@ class Block:
this value increases as fetching chatdata progresses. this value increases as fetching chatdata progresses.
temp_last : int : end : int :
target videoOffsetTimeMs of last chat data for download, target videoOffsetTimeMs of last chat data for download,
equal to the first videoOffsetTimeMs of the next block. equal to the first videoOffsetTimeMs of the next block.
when download worker reaches this offset, stop downloading. when download worker reaches this offset, stop downloading.
@@ -25,32 +25,35 @@ class Block:
continuation : str : continuation : str :
continuation param of last chat data. continuation param of last chat data.
chat_data : List chat_data : list
done : bool :
whether this block has been downloaded.
remaining : int :
remaining data to download.
equals end - last.
is_last : bool :
whether this block is the last one in blocklist.
splitting : bool :
whether this block is in the process of splitting.
while True, this block is excluded from duplicate split procedure.
""" """
def __init__(self, pos=0, first=0, last=0,
continuation='', chat_data=[]): __slots__ = ['first','last','end','continuation','chat_data','remaining',
self.pos = pos 'done','is_last','splitting']
def __init__(self, first = 0, last = 0, end = 0,
continuation = '', chat_data = [], is_last = False,
splitting = False):
self.first = first self.first = first
self.last = last self.last = last
self.temp_last = 0 self.end = end
self.continuation = continuation self.continuation = continuation
self.chat_data = chat_data self.chat_data = chat_data
self.done = False
def start(self): self.remaining = self.end- self.last
chats, cont = self._cut(self.chat_data, self.continuation, self.last) self.is_last = is_last
self.chat_data = chats self.splitting = splitting
return cont
def fill(self, chats, cont, fetched_last):
chats, cont = self._cut(chats, cont, fetched_last)
self.chat_data.extend(chats)
return cont
def _cut(self, chats, cont, fetched_last):
if fetched_last < self.temp_last or self.temp_last == -1:
return chats, cont
for i, line in enumerate(chats):
line_offset = parser.get_offset(line)
if line_offset >= self.temp_last:
self.last = line_offset
return chats[:i], None

View File

@@ -1,27 +1,144 @@
from . import parser from . import parser
from .. paramgen import arcparam
from . block import Block
class DownloadWorker: class DownloadWorker:
""" """
DownloadWorker associates a download session with a block. DownloadWorker associates a download session with a block.
Parameter Parameter
---------- ----------
fetch : fetch : func :
download function of asyncdl download function of asyncdl
block : block : Block :
Block object that includes chat_data Block object that includes chat_data
blocks : list :
List of Block(s)
video_id : str :
source_block : Block :
the block from which current downloading block is splitted
""" """
def __init__(self, fetch, block): __slots__ = ['fetch', 'block', 'blocks', 'video_id', 'source_block']
def __init__(self, fetch, block, blocks, video_id ):
self.block = block self.block = block
self.fetch = fetch self.fetch = fetch
self.blocks = blocks
self.video_id = video_id
self.source_block = None
async def run(self, session): async def run(self, session):
"""Remove extra chats just after ready_blocks(). """ """Remove extra chats just after ready_blocks(). """
continuation = self.block.start() continuation = initial_fill(self.block)
"""download loop """ """download loop """
while continuation: while continuation:
chats, new_cont, fetched_last = await self.fetch(continuation, session) chats, new_cont, fetched_first, fetched_last = await self.fetch(
continuation = self.block.fill(chats, new_cont, fetched_last ) continuation, session)
if fetched_first is None:
break
if self.source_block:
continuation = after_dividing_process(
self.source_block, self.block, chats, new_cont,
fetched_first, fetched_last)
self.source_block = None
else:
continuation = fill(self.block, chats, new_cont, fetched_last)
if continuation is None:
new_block = get_new_block(self)
self.block = new_block
continuation = new_block.continuation
def get_new_block(worker) -> Block:
    """Finish the worker's current block and hand it a new one.

    Marks the current block as done, then picks the undone block with the
    most remaining data and splits it in half: the chosen block becomes the
    worker's ``source_block`` and a fresh block covering the second half is
    inserted right after it in the block list.  Returns a block whose
    continuation is None when nothing is left to split.
    """
    worker.block.done = True
    index, target = get_undone_block(worker.blocks)
    if target is None:
        # No splittable block remains; signal the worker to stop.
        return Block(continuation=None)
    # Start the new block at the midpoint of the target's remaining span.
    midpoint = (target.end + target.last) / 2
    continuation = arcparam.getparam(worker.video_id, seektime=midpoint / 1000)
    target.splitting = True
    worker.source_block = target
    new_block = Block(
        end=target.end,
        chat_data=[],
        continuation=continuation,
        splitting=True,
        is_last=target.is_last,
    )
    worker.blocks.insert(index + 1, new_block)
    return new_block
def get_undone_block(blocks) -> "tuple[int, Block]":
    """Pick the unfinished block with the most data left to download.

    Scans *blocks* for entries that are neither done nor currently being
    split, and returns the ``(index, block)`` pair with the largest
    ``remaining`` span.  Blocks with 120000 ms (2 minutes) or less left
    are not worth splitting and are ignored.

    Returns ``(0, None)`` when no suitable block exists.
    """
    # Splitting a block with <= 2 minutes of chat left costs more than it saves.
    MIN_SPLITTABLE_MS = 120000
    best_index = 0
    best_block = None
    max_remaining = MIN_SPLITTABLE_MS
    for index, block in enumerate(blocks):
        if block.done or block.splitting:
            continue
        if block.remaining > max_remaining:
            best_index = index
            best_block = block
            max_remaining = block.remaining
    return best_index, best_block
def top_cut(chats, last) -> list:
    """Return the tail of *chats* starting at the first chat whose
    videoOffsetTimeMs is strictly greater than *last*; empty list if none."""
    remaining = []
    for position, chat in enumerate(chats):
        if parser.get_offset(chat) > last:
            remaining = chats[position:]
            break
    return remaining
def bottom_cut(chats, last) -> list:
    """Drop trailing chats whose videoOffsetTimeMs is at or beyond *last*.

    Mutates *chats* in place (popping from the end) and returns it.
    """
    while chats and parser.get_offset(chats[-1]) >= last:
        chats.pop()
    return chats
def after_dividing_process(source_block, block, chats, new_cont,
                           fetched_first, fetched_last):
    """Reconcile a freshly split block with its source after its first fetch.

    Runs once for the first chunk downloaded by a block created via
    get_new_block().  Fixes the boundary between source_block and the new
    block, trims overlapping chats, stores the result on *block*, and
    returns the continuation for the next fetch (None when this block is
    already finished, or when the split yielded nothing new).
    """
    # Source block has already fetched past everything we got: the split
    # produced no new data, so this block stops immediately.
    if fetched_last <= source_block.last:
        return None
    # Both blocks leave the "splitting" state and become candidates for
    # further duplicate-split selection again.
    block.splitting = False
    source_block.splitting = False
    # The boundary between the two blocks is where this fetch started.
    source_block.end = fetched_first
    block.first = fetched_first
    block.last = fetched_last
    continuation = new_cont
    # Overlap at the top: the source block will itself download up to its
    # current `last`, so drop leading chats up to that offset.
    if fetched_first < source_block.last:
        chats = top_cut(chats, source_block.last)
        block.first = source_block.last
    # Overshot at the bottom: trailing chats belong to the next block, so
    # cut them off and stop downloading this block.
    if block.end<fetched_last:
        chats = bottom_cut(chats, block.end)
        block.last = block.end
        continuation = None
    block.chat_data.extend(chats)
    block.continuation = continuation
    return continuation
def initial_fill(block):
    """Trim the chats fetched during ready_blocks() at the block boundary
    and return the continuation for the next fetch."""
    trimmed, continuation = _cut(
        block, block.chat_data, block.continuation, block.last)
    block.chat_data = trimmed
    return continuation
def fill(block, chats, cont, fetched_last):
    """Append newly fetched chats (trimmed at the block end) to the block
    and return the continuation for the next fetch."""
    trimmed, next_cont = _cut(block, chats, cont, fetched_last)
    block.chat_data.extend(trimmed)
    return next_cont
def _cut(block, chats, cont, fetched_last):
block.last = fetched_last
if fetched_last < block.end or block.is_last:
block.last = fetched_last
block.remaining=block.end-block.last
return chats, cont
for i, line in enumerate(chats):
line_offset = parser.get_offset(line)
if line_offset >= block.end:
block.last = line_offset
block.remaining=0
block.done=True
return chats[:i], None

View File

@@ -1,6 +1,5 @@
from . import asyncdl from . import asyncdl
from . import parser from . import parser
from . block import Block
from . duplcheck import duplicate_head, duplicate_tail, overwrap from . duplcheck import duplicate_head, duplicate_tail, overwrap
from . videoinfo import VideoInfo from . videoinfo import VideoInfo
from .. import config from .. import config
@@ -11,6 +10,12 @@ headers=config.headers
class Downloader: class Downloader:
def __init__(self, video_id, duration, div, callback): def __init__(self, video_id, duration, div, callback):
if not isinstance(div ,int) or div < 1:
raise ValueError('div must be positive integer.')
elif div > 10:
div = 10
if not isinstance(duration ,int) or duration < 1:
raise ValueError('duration must be positive integer.')
self.video_id = video_id self.video_id = video_id
self.duration = duration self.duration = duration
self.div = div self.div = div
@@ -29,8 +34,9 @@ class Downloader:
def set_temporary_last(self): def set_temporary_last(self):
for i in range(len(self.blocks)-1): for i in range(len(self.blocks)-1):
self.blocks[i].temp_last = self.blocks[i+1].first self.blocks[i].end = self.blocks[i+1].first
self.blocks[-1].temp_last = -1 self.blocks[-1].end = self.duration*1000
self.blocks[-1].is_last =True
return self return self
def remove_overwrap(self): def remove_overwrap(self):
@@ -38,7 +44,7 @@ class Downloader:
return self return self
def download_blocks(self): def download_blocks(self):
asyncdl.download_chunk(self.callback, self.blocks) asyncdl.download_chunk(self.callback, self.blocks, self.video_id)
return self return self
def remove_duplicate_tail(self): def remove_duplicate_tail(self):
@@ -62,7 +68,7 @@ class Downloader:
.combine() .combine()
) )
def download(video_id, div = 20, callback=None, processor = None): def download(video_id, div = 1, callback = None, processor = None):
duration = 0 duration = 0
try: try:
duration = VideoInfo(video_id).get("duration") duration = VideoInfo(video_id).get("duration")
@@ -70,5 +76,5 @@ def download(video_id, div = 20, callback=None, processor = None):
raise raise
if duration == 0: if duration == 0:
print("video is live.") print("video is live.")
return return []
return Downloader(video_id, duration, div, callback).download() return Downloader(video_id, duration, div, callback).download()

View File

@@ -55,8 +55,6 @@ def check_duplicate_offset(chatdata):
tbl_offset[i] == tbl_offset[j] tbl_offset[i] == tbl_offset[j]
and and
tbl_id[i] == tbl_id[j] tbl_id[i] == tbl_id[j]
# and
# tbl_type[i] == tbl_type[j]
) )
print("creating table...") print("creating table...")
@@ -76,6 +74,12 @@ def duplicate_head(blocks):
if len(blocks) == 1 : return blocks if len(blocks) == 1 : return blocks
def is_duplicate_head(index): def is_duplicate_head(index):
if len(blocks[index].chat_data) == 0:
return True
elif len(blocks[index+1].chat_data) == 0:
return False
id_0 = parser.get_id(blocks[index].chat_data[0]) id_0 = parser.get_id(blocks[index].chat_data[0])
id_1 = parser.get_id(blocks[index+1].chat_data[0]) id_1 = parser.get_id(blocks[index+1].chat_data[0])
type_0 = parser.get_type(blocks[index].chat_data[0]) type_0 = parser.get_type(blocks[index].chat_data[0])
@@ -97,6 +101,12 @@ def duplicate_tail(blocks):
if len(blocks) == 1 : return blocks if len(blocks) == 1 : return blocks
def is_duplicate_tail(index): def is_duplicate_tail(index):
if len(blocks[index].chat_data) == 0:
return True
elif len(blocks[index-1].chat_data) == 0:
return False
id_0 = parser.get_id(blocks[index-1].chat_data[-1]) id_0 = parser.get_id(blocks[index-1].chat_data[-1])
id_1 = parser.get_id(blocks[index].chat_data[-1]) id_1 = parser.get_id(blocks[index].chat_data[-1])
type_0 = parser.get_type(blocks[index-1].chat_data[-1]) type_0 = parser.get_type(blocks[index-1].chat_data[-1])
@@ -140,6 +150,6 @@ def overwrap(blocks):
def _dump(blocks): def _dump(blocks):
print(__name__) print(__name__)
print(f"---------- first last temp_last {'':>3}---") print(f"---------- first last end {'':>3}---")
for i,block in enumerate(blocks): for i,block in enumerate(blocks):
print(f"block[{i:3}] {block.first:>10} {block.last:>10} {block.temp_last:>10}") print(f"block[{i:3}] {block.first:>10} {block.last:>10} {block.end:>10}")

View File

@@ -12,6 +12,11 @@ def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f: with open(path,mode ='r',encoding = 'utf-8') as f:
return f.read() return f.read()
def load_chatdata(filename):
    """Load a duplcheck test fixture and return its parsed chat_data part."""
    path = "tests/testdata/dl_duplcheck/head/" + filename
    parsed = parser.parse(json.loads(_open_file(path)))
    return parsed[1]
def test_overwrap(mocker): def test_overwrap(mocker):
""" """
test overwrap data test overwrap data
@@ -22,12 +27,12 @@ def test_overwrap(mocker):
""" """
blocks = ( blocks = (
Block(0, 0, 38771, "",[]), Block(first = 0,last= 38771, chat_data = load_chatdata("dp0-0.json")),
Block(1, 9890, 38771, "",[]), Block(first = 9890,last= 38771, chat_data = load_chatdata("dp0-1.json")),
Block(2, 20244, 45146, "",[]), Block(first = 20244,last= 45146, chat_data = load_chatdata("dp0-2.json")),
Block(3, 32476, 60520, "",[]), Block(first = 32476,last= 60520, chat_data = load_chatdata("dp0-3.json")),
Block(4, 41380, 62875, "",[]), Block(first = 41380,last= 62875, chat_data = load_chatdata("dp0-4.json")),
Block(5, 52568, 62875, "",[]) Block(first = 52568,last= 62875, chat_data = load_chatdata("dp0-5.json"))
) )
result = duplcheck.overwrap(blocks) result = duplcheck.overwrap(blocks)
assert len(result) == 3 assert len(result) == 3
@@ -50,18 +55,15 @@ def test_duplicate_head(mocker):
result : [0] , [3] , [5] result : [0] , [3] , [5]
""" """
def load_chatdata(filename):
return parser.parse(
json.loads(_open_file("tests/testdata/dl_duplcheck/head/"+filename))
)[1]
blocks = ( blocks = (
Block(0, 0, 2500, "",load_chatdata("dp0-0.json")), Block(first = 0, last = 2500, chat_data = load_chatdata("dp0-0.json")),
Block(1, 0, 38771, "",load_chatdata("dp0-1.json")), Block(first = 0, last =38771, chat_data = load_chatdata("dp0-1.json")),
Block(2, 0, 45146, "",load_chatdata("dp0-2.json")), Block(first = 0, last =45146, chat_data = load_chatdata("dp0-2.json")),
Block(3, 20244, 60520, "",load_chatdata("dp0-3.json")), Block(first = 20244, last =60520, chat_data = load_chatdata("dp0-3.json")),
Block(4, 20244, 62875, "",load_chatdata("dp0-4.json")), Block(first = 20244, last =62875, chat_data = load_chatdata("dp0-4.json")),
Block(5, 52568, 62875, "",load_chatdata("dp0-5.json")) Block(first = 52568, last =62875, chat_data = load_chatdata("dp0-5.json"))
) )
_dump(blocks) _dump(blocks)
result = duplcheck.duplicate_head(blocks) result = duplcheck.duplicate_head(blocks)
@@ -86,18 +88,14 @@ def test_duplicate_tail(mocker):
result : [0] , [2] , [4] result : [0] , [2] , [4]
""" """
def load_chatdata(filename):
return parser.parse(
json.loads(_open_file("tests/testdata/dl_duplcheck/head/"+filename))
)[1]
blocks = ( blocks = (
Block(0, 0, 2500, "",load_chatdata("dp0-0.json")), Block(first = 0,last = 2500, chat_data=load_chatdata("dp0-0.json")),
Block(1, 1500, 2500, "",load_chatdata("dp0-1.json")), Block(first = 1500,last = 2500, chat_data=load_chatdata("dp0-1.json")),
Block(2, 10000, 45146, "",load_chatdata("dp0-2.json")), Block(first = 10000,last = 45146, chat_data=load_chatdata("dp0-2.json")),
Block(3, 20244, 45146, "",load_chatdata("dp0-3.json")), Block(first = 20244,last = 45146, chat_data=load_chatdata("dp0-3.json")),
Block(4, 20244, 62875, "",load_chatdata("dp0-4.json")), Block(first = 20244,last = 62875, chat_data=load_chatdata("dp0-4.json")),
Block(5, 52568, 62875, "",load_chatdata("dp0-5.json")) Block(first = 52568,last = 62875, chat_data=load_chatdata("dp0-5.json"))
) )
result = duplcheck.duplicate_tail(blocks) result = duplcheck.duplicate_tail(blocks)