From f1d839397194d6ba472c613206fceb521e439043 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Sun, 2 Feb 2020 00:38:22 +0900 Subject: [PATCH] Divide download module --- pytchat/tool/asyncdl.py | 100 ++++++++++++++++++++++++++++++++++ pytchat/tool/block.py | 9 ++++ pytchat/tool/dlworker.py | 11 ++-- pytchat/tool/downloader.py | 107 +++---------------------------------- 4 files changed, 122 insertions(+), 105 deletions(-) create mode 100644 pytchat/tool/asyncdl.py create mode 100644 pytchat/tool/block.py diff --git a/pytchat/tool/asyncdl.py b/pytchat/tool/asyncdl.py new file mode 100644 index 0000000..d227118 --- /dev/null +++ b/pytchat/tool/asyncdl.py @@ -0,0 +1,100 @@ + +import aiohttp +import asyncio +import json +from . import parser +from . block import Block +from . dlworker import DownloadWorker +from .. paramgen import arcparam +from .. import config +from urllib.parse import quote + +headers = config.headers +REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \ + "get_live_chat_replay?continuation=" + +def ready_blocks(video_id, duration, div, callback): + if div <= 0: raise ValueError + + def _divide(start, end, count): + min_interval = 120 + if (not isinstance(start,int) or + not isinstance(end,int) or + not isinstance(count,int)): + raise ValueError("start/end/count must be int") + if start>end: + raise ValueError("end must be equal to or greater than start.") + if count<1: + raise ValueError("count must be equal to or greater than 1.") + if (end-start)/count < min_interval: + count = int((end-start)/min_interval) + if count == 0 : count = 1 + interval= (end-start)/count + + if count == 1: + return [start] + return sorted(list(set([int(start+interval*j) + for j in range(count) ]))) + + async def _get_blocks( video_id, duration, div, callback): + async with aiohttp.ClientSession() as session: + futures = [_create_block(session, video_id, pos, seektime, callback) + for pos, seektime in enumerate(_divide(-1, duration, div))] + return await asyncio.gather(*futures,return_exceptions=True) + + async def _create_block(session, video_id, pos, seektime, callback): + continuation = arcparam.getparam( + video_id, seektime = seektime) + + url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" + async with session.get(url, headers = headers) as resp: + text = await resp.text() + next_continuation, actions = parser.parse(json.loads(text)) + if actions: + first = parser.get_offset(actions[0]) + last = parser.get_offset(actions[-1]) + if callback: + callback(actions,last-first) + return Block( + pos = pos, + continuation = next_continuation, + chat_data = actions, + first = first, + last = last + ) + + loop = asyncio.get_event_loop() + result = loop.run_until_complete( + _get_blocks(video_id, duration, div, callback)) + return result + +def download_chunk(callback, blocks): + + async def _dl_distribute(): + workers = [ + DownloadWorker( + fetch = _fetch, + block = block + ) + for block in blocks + ] + async with aiohttp.ClientSession() as session: + tasks = [worker.run(session) for worker in workers] + return await asyncio.gather(*tasks,return_exceptions=True) + + async def _fetch(continuation,session): + url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" + async with session.get(url,headers = config.headers) as resp: + text = await resp.text() + continuation, actions = parser.parse(json.loads(text)) + if actions: + last = parser.get_offset(actions[-1]) + first = parser.get_offset(actions[0]) + if callback: + callback(actions,last-first) + return actions,continuation,last + return continuation, [], None + + loop = asyncio.get_event_loop() + loop.run_until_complete( + _dl_distribute()) diff --git a/pytchat/tool/block.py b/pytchat/tool/block.py new file mode 100644 index 0000000..60f650a --- /dev/null +++ b/pytchat/tool/block.py @@ -0,0 +1,9 @@ +class Block: + def __init__(self, pos=0, first=0, last=0, + continuation='', chat_data=[]): + self.pos = pos + self.first = first + self.last = last + self.temp_last = 0 + self.continuation = continuation + self.chat_data = chat_data \ No newline at end of file diff --git a/pytchat/tool/dlworker.py b/pytchat/tool/dlworker.py index 9866323..b70f9ef 100644 --- a/pytchat/tool/dlworker.py +++ b/pytchat/tool/dlworker.py @@ -1,11 +1,10 @@ from . import parser class DownloadWorker: - def __init__(self, dl, block, blocklist): - self.block = block - self.blocklist = blocklist - self.dl = dl - + def __init__(self, fetch, block): + self.block = block + self.fetch = fetch + async def run(self,session): temp_last = self.block.temp_last self.block.chat_data, continuation = self.cut( @@ -14,7 +13,7 @@ class DownloadWorker: self.block.last, temp_last ) while continuation: - data, cont, fetched_last = await self.dl(continuation, session) + data, cont, fetched_last = await self.fetch(continuation, session) data, continuation = self.cut(data, cont, fetched_last, temp_last) self.block.chat_data.extend(data) diff --git a/pytchat/tool/downloader.py b/pytchat/tool/downloader.py index 10df333..81d3b96 100644 --- a/pytchat/tool/downloader.py +++ b/pytchat/tool/downloader.py @@ -3,92 +3,29 @@ import aiohttp import json import traceback from urllib.parse import quote +from . import asyncdl from . import parser from . import videoinfo -from . dlworker import DownloadWorker +from . block import Block from . duplcheck import duplicate_head, duplicate_tail, overwrap from .. import config -from .. import util +from .. exceptions import InvalidVideoIdException from .. paramgen import arcparam -from ..exceptions import InvalidVideoIdException logger = config.logger(__name__) headers=config.headers -REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \ - "get_live_chat_replay?continuation=" -class Block: - def __init__(self, pos=0, first=0, last=0, - continuation='', chat_data=[]): - self.pos = pos - self.first = first - self.last = last - self.temp_last = 0 - self.continuation = continuation - self.chat_data = chat_data - class Downloader: - def __init__(self, video_id, duration, div, callback=None): + def __init__(self, video_id, duration, div, callback): self.video_id = video_id self.duration = duration self.div = div self.blocks = [] self.callback = callback - def ready_blocks(self): - if self.div <= 0: raise ValueError - - def _divide(start, end, count): - min_interval = 120 - if (not isinstance(start,int) or - not isinstance(end,int) or - not isinstance(count,int)): - raise ValueError("start/end/count must be int") - if start>end: - raise ValueError("end must be equal to or greater than start.") - if count<1: - raise ValueError("count must be equal to or greater than 1.") - if (end-start)/count < min_interval: - count = int((end-start)/min_interval) - if count == 0 : count = 1 - interval= (end-start)/count - - if count == 1: - return [start] - return sorted(list(set([int(start+interval*j) - for j in range(count) ]))) - - async def _get_blocks(duration,div): - async with aiohttp.ClientSession() as session: - futures = [_create_block(session, pos, seektime) - for pos, seektime in enumerate(_divide(-1, duration, div))] - return await asyncio.gather(*futures,return_exceptions=True) - - async def _create_block(session, pos, seektime): - continuation = arcparam.getparam( - self.video_id, seektime = seektime) - - url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" - async with session.get(url, headers = headers) as resp: - text = await resp.text() - next_continuation, actions = parser.parse(json.loads(text)) - if actions: - first = parser.get_offset(actions[0]) - last = parser.get_offset(actions[-1]) - if self.callback: - self.callback(actions,last-first) - return Block( - pos = pos, - continuation = next_continuation, - chat_data = actions, - first = first, - last = last - ) - - loop = asyncio.get_event_loop() - result = loop.run_until_complete( - _get_blocks(self.duration, self.div)) + result = asyncdl.ready_blocks( + self.video_id, self.duration, self.div, self.callback) self.blocks = [block for block in result if block] return self @@ -107,36 +44,9 @@ class Downloader: return self def download_blocks(self): - loop = asyncio.get_event_loop() - loop.run_until_complete(self._dl_distribute()) + asyncdl.download_chunk(self.callback, self.blocks) return self - async def _dl_distribute(self): - workers = [ - DownloadWorker( - dl=self.dl_func, - block = block, - blocklist= self.blocks - ) - for pos,block in enumerate(self.blocks) - ] - async with aiohttp.ClientSession() as session: - tasks = [worker.run(session) for worker in workers] - return await asyncio.gather(*tasks,return_exceptions=True) - - async def dl_func(self,continuation,session): - url = f"{REPLAY_URL}{quote(continuation)}&pbj=1" - async with session.get(url,headers = config.headers) as resp: - text = await resp.text() - continuation, actions = parser.parse(json.loads(text)) - if actions: - last = parser.get_offset(actions[-1]) - first = parser.get_offset(actions[0]) - if self.callback: - self.callback(actions,last-first) - return actions,continuation,last - return continuation, [], None - def remove_duplicate_tail(self): self.blocks = duplicate_tail(self.blocks) return self @@ -157,7 +67,6 @@ class Downloader: .remove_duplicate_tail() .combine() ) - def download(video_id, div = 20, callback=None, processor = None): duration = 0 @@ -168,4 +77,4 @@ def download(video_id, div = 20, callback=None, processor = None): if duration == 0: print("video is live.") return - return Downloader(video_id, duration, div, callback).download() \ No newline at end of file + return Downloader(video_id, duration, div, callback).download()