diff --git a/pytchat/tool/mining/__init__.py b/pytchat/tool/mining/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pytchat/tool/mining/asyncdl.py b/pytchat/tool/mining/asyncdl.py deleted file mode 100644 index 8bb1bc3..0000000 --- a/pytchat/tool/mining/asyncdl.py +++ /dev/null @@ -1,146 +0,0 @@ - -import httpx -import asyncio -import json -from . import parser -from . block import Block -from . worker import ExtractWorker -from . patch import Patch -from ... import config -from ... paramgen import arcparam_mining as arcparam -from concurrent.futures import CancelledError -from urllib.parse import quote - -headers = config.headers -REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation=" -INTERVAL = 1 - - -def _split(start, end, count, min_interval_sec=120): - """ - Split section from `start` to `end` into `count` pieces, - and returns the beginning of each piece. - The `count` is adjusted so that the length of each piece - is no smaller than `min_interval`. - - Returns: - -------- - List of the offset of each block's first chat data. - """ - - if not (isinstance(start, int) or isinstance(start, float)) or \ - not (isinstance(end, int) or isinstance(end, float)): - raise ValueError("start/end must be int or float") - if not isinstance(count, int): - raise ValueError("count must be int") - if start > end: - raise ValueError("end must be equal to or greater than start.") - if count < 1: - raise ValueError("count must be equal to or greater than 1.") - if (end - start) / count < min_interval_sec: - count = int((end - start) / min_interval_sec) - if count == 0: - count = 1 - interval = (end - start) / count - - if count == 1: - return [start] - return sorted(list(set([int(start + interval * j) - for j in range(count)]))) - - -def ready_blocks(video_id, duration, div, callback): - if div <= 0: - raise ValueError - - async def _get_blocks(video_id, duration, div, callback): - async with httpx.ClientSession() as session: - tasks = [_create_block(session, video_id, seektime, callback) - for seektime in _split(0, duration, div)] - return await asyncio.gather(*tasks) - - async def _create_block(session, video_id, seektime, callback): - continuation = arcparam.getparam(video_id, seektime=seektime) - url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs=" - f"{int(seektime*1000)}&hidden=false&pbj=1") - async with session.get(url, headers=headers) as resp: - chat_json = await resp.text() - if chat_json is None: - return - continuation, actions = parser.parse(json.loads(chat_json)[1]) - first = seektime - seektime += INTERVAL - if callback: - callback(actions, INTERVAL) - return Block( - continuation=continuation, - chat_data=actions, - first=first, - last=seektime, - seektime=seektime - ) - """ - fetch initial blocks. - """ - loop = asyncio.get_event_loop() - blocks = loop.run_until_complete( - _get_blocks(video_id, duration, div, callback)) - return blocks - - -def fetch_patch(callback, blocks, video_id): - - async def _allocate_workers(): - workers = [ - ExtractWorker( - fetch=_fetch, block=block, - blocks=blocks, video_id=video_id - ) - for block in blocks - ] - async with httpx.ClientSession() as session: - tasks = [worker.run(session) for worker in workers] - return await asyncio.gather(*tasks) - - async def _fetch(seektime, session) -> Patch: - continuation = arcparam.getparam(video_id, seektime=seektime) - url = (f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs=" - f"{int(seektime*1000)}&hidden=false&pbj=1") - async with session.get(url, headers=config.headers) as resp: - chat_json = await resp.text() - actions = [] - try: - if chat_json is None: - return Patch() - continuation, actions = parser.parse(json.loads(chat_json)[1]) - except json.JSONDecodeError: - pass - if callback: - callback(actions, INTERVAL) - return Patch(chats=actions, continuation=continuation, - seektime=seektime, last=seektime) - """ - allocate workers and assign blocks. - """ - loop = asyncio.get_event_loop() - try: - loop.run_until_complete(_allocate_workers()) - except CancelledError: - pass - - -async def _shutdown(): - print("\nshutdown...") - tasks = [t for t in asyncio.all_tasks() - if t is not asyncio.current_task()] - for task in tasks: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - -def cancel(): - loop = asyncio.get_event_loop() - loop.create_task(_shutdown()) diff --git a/pytchat/tool/mining/block.py b/pytchat/tool/mining/block.py deleted file mode 100644 index 40c95d1..0000000 --- a/pytchat/tool/mining/block.py +++ /dev/null @@ -1,62 +0,0 @@ -from . import parser -class Block: - """Block object represents something like a box - to join chunk of chatdata. - - Parameter: - --------- - first : int : - videoOffsetTimeMs of the first chat_data - (chat_data[0]) - - last : int : - videoOffsetTimeMs of the last chat_data. - (chat_data[-1]) - - this value increases as fetching chatdata progresses. - - end : int : - target videoOffsetTimeMs of last chat data for extract, - equals to first videoOffsetTimeMs of next block. - when extract worker reaches this offset, stop fetching. - - continuation : str : - continuation param of last chat data. - - chat_data : list - - done : bool : - whether this block has been fetched. - - remaining : int : - remaining data to extract. - equals end - last. - - is_last : bool : - whether this block is the last one in blocklist. - - during_split : bool : - whether this block is in the process of during_split. - while True, this block is excluded from duplicate split procedure. - - seektime : float : - the last position of this block(seconds) already fetched. - """ - - __slots__ = ['first','last','end','continuation','chat_data','remaining', - 'done','is_last','during_split','seektime'] - - def __init__(self, first = 0, last = 0, end = 0, - continuation = '', chat_data = [], is_last = False, - during_split = False, seektime = None): - self.first = first - self.last = last - self.end = end - self.continuation = continuation - self.chat_data = chat_data - self.done = False - self.remaining = self.end - self.last - self.is_last = is_last - self.during_split = during_split - self.seektime = seektime - diff --git a/pytchat/tool/mining/parser.py b/pytchat/tool/mining/parser.py deleted file mode 100644 index f9a692f..0000000 --- a/pytchat/tool/mining/parser.py +++ /dev/null @@ -1,73 +0,0 @@ -import re -from ... import config -from ... exceptions import ( - ResponseContextError, - NoContents, NoContinuation) - -logger = config.logger(__name__) - - -def parse(jsn): - """ - Parse replay chat data. - Parameter: - ---------- - jsn : dict - JSON of replay chat data. - Returns: - ------ - continuation : str - actions : list - - """ - if jsn is None: - raise ValueError("parameter JSON is None") - if jsn['response']['responseContext'].get('errors'): - raise ResponseContextError( - 'video_id is invalid or private/deleted.') - contents = jsn["response"].get('continuationContents') - if contents is None: - raise NoContents('No chat data.') - - cont = contents['liveChatContinuation']['continuations'][0] - if cont is None: - raise NoContinuation('No Continuation') - metadata = cont.get('liveChatReplayContinuationData') - if metadata: - continuation = metadata.get("continuation") - actions = contents['liveChatContinuation'].get('actions') - if continuation: - return continuation, [action["replayChatItemAction"]["actions"][0] - for action in actions - if list(action['replayChatItemAction']["actions"][0].values() - )[0]['item'].get("liveChatPaidMessageRenderer") - or list(action['replayChatItemAction']["actions"][0].values() - )[0]['item'].get("liveChatPaidStickerRenderer") - ] - return None, [] - - -def get_offset(item): - return int(item['replayChatItemAction']["videoOffsetTimeMsec"]) - - -def get_id(item): - return list((list(item['replayChatItemAction']["actions"][0].values() - )[0])['item'].values())[0].get('id') - - -def get_type(item): - return list((list(item['replayChatItemAction']["actions"][0].values() - )[0])['item'].keys())[0] - - -_REGEX_YTINIT = re.compile( - "window\\[\"ytInitialData\"\\]\\s*=\\s*({.+?});\\s+") - - -def extract(text): - - match = re.findall(_REGEX_YTINIT, str(text)) - if match: - return match[0] - return None diff --git a/pytchat/tool/mining/patch.py b/pytchat/tool/mining/patch.py deleted file mode 100644 index 7666a52..0000000 --- a/pytchat/tool/mining/patch.py +++ /dev/null @@ -1,27 +0,0 @@ -from . import parser -from . block import Block -from typing import NamedTuple - -class Patch(NamedTuple): - """ - Patch represents chunk of chat data - which is fetched by asyncdl.fetch_patch._fetch(). - """ - chats : list = [] - continuation : str = None - seektime : float = None - first : int = None - last : int = None - -def fill(block:Block, patch:Patch): - if patch.last < block.end: - set_patch(block, patch) - return - block.continuation = None - -def set_patch(block:Block, patch:Patch): - block.continuation = patch.continuation - block.chat_data.extend(patch.chats) - block.last = patch.seektime - block.seektime = patch.seektime - diff --git a/pytchat/tool/mining/superchat_miner.py b/pytchat/tool/mining/superchat_miner.py deleted file mode 100644 index 8a5b3bd..0000000 --- a/pytchat/tool/mining/superchat_miner.py +++ /dev/null @@ -1,72 +0,0 @@ -from . import asyncdl -from . import parser -from .. videoinfo import VideoInfo -from ... import config -from ... exceptions import InvalidVideoIdException -logger = config.logger(__name__) -headers=config.headers - -class SuperChatMiner: - def __init__(self, video_id, duration, div, callback): - if not isinstance(div ,int) or div < 1: - raise ValueError('div must be positive integer.') - elif div > 10: - div = 10 - if not isinstance(duration ,int) or duration < 1: - raise ValueError('duration must be positive integer.') - self.video_id = video_id - self.duration = duration - self.div = div - self.callback = callback - self.blocks = [] - - def _ready_blocks(self): - blocks = asyncdl.ready_blocks( - self.video_id, self.duration, self.div, self.callback) - self.blocks = [block for block in blocks if block is not None] - return self - - def _set_block_end(self): - for i in range(len(self.blocks)-1): - self.blocks[i].end = self.blocks[i+1].first - self.blocks[-1].end = self.duration - self.blocks[-1].is_last =True - return self - - def _download_blocks(self): - asyncdl.fetch_patch(self.callback, self.blocks, self.video_id) - return self - - def _combine(self): - ret = [] - for block in self.blocks: - ret.extend(block.chat_data) - return ret - - def extract(self): - return ( - self._ready_blocks() - ._set_block_end() - ._download_blocks() - ._combine() - ) - -def extract(video_id, div = 1, callback = None, processor = None): - duration = 0 - try: - duration = VideoInfo(video_id).get_duration() - except InvalidVideoIdException: - raise - if duration == 0: - print("video is live.") - return [] - data = SuperChatMiner(video_id, duration, div, callback).extract() - if processor is None: - return data - return processor.process( - [{'video_id':None,'timeout':1,'chatdata' : (action - for action in data)}] - ) - -def cancel(): - asyncdl.cancel() \ No newline at end of file diff --git a/pytchat/tool/mining/worker.py b/pytchat/tool/mining/worker.py deleted file mode 100644 index 3a53e40..0000000 --- a/pytchat/tool/mining/worker.py +++ /dev/null @@ -1,45 +0,0 @@ -from . import parser -from . block import Block -from . patch import Patch, fill -from ... paramgen import arcparam -INTERVAL = 1 -class ExtractWorker: - """ - ExtractWorker associates a download session with a block. - - When the worker finishes fetching, the block - being fetched is splitted and assigned the free worker. - - Parameter - ---------- - fetch : func : - extract function of asyncdl - - block : Block : - Block object that includes chat_data - - blocks : list : - List of Block(s) - - video_id : str : - - parent_block : Block : - the block from which current block is splitted - """ - __slots__ = ['block', 'fetch', 'blocks', 'video_id', 'parent_block'] - def __init__(self, fetch, block, blocks, video_id ): - self.block:Block = block - self.fetch = fetch - self.blocks:list = blocks - self.video_id:str = video_id - self.parent_block:Block = None - - async def run(self, session): - while self.block.continuation: - patch = await self.fetch( - self.block.seektime, session) - fill(self.block, patch) - self.block.seektime += INTERVAL - self.block.done = True - -