diff --git a/pytchat/__init__.py b/pytchat/__init__.py index 918a216..92cfde9 100644 --- a/pytchat/__init__.py +++ b/pytchat/__init__.py @@ -26,7 +26,6 @@ from .exceptions import ( ) from .api import ( - cli, config, LiveChat, LiveChatAsync, @@ -34,14 +33,12 @@ from .api import ( CompatibleProcessor, DummyProcessor, DefaultProcessor, - Extractor, HTMLArchiver, TSVArchiver, JsonfileArchiver, SimpleDisplayProcessor, SpeedCalculator, SuperchatCalculator, - VideoInfo, create ) # flake8: noqa \ No newline at end of file diff --git a/pytchat/api.py b/pytchat/api.py index bf64e07..64bf794 100644 --- a/pytchat/api.py +++ b/pytchat/api.py @@ -1,4 +1,3 @@ -from . import cli from . import config from .core import create from .core_multithread.livechat import LiveChat @@ -13,11 +12,9 @@ from .processors.jsonfile_archiver import JsonfileArchiver from .processors.simple_display_processor import SimpleDisplayProcessor from .processors.speed.calculator import SpeedCalculator from .processors.superchat.calculator import SuperchatCalculator -from .tool.extract.extractor import Extractor -from .tool.videoinfo import VideoInfo + __all__ = [ - cli, config, LiveChat, LiveChatAsync, @@ -25,14 +22,12 @@ __all__ = [ CompatibleProcessor, DummyProcessor, DefaultProcessor, - Extractor, HTMLArchiver, TSVArchiver, JsonfileArchiver, SimpleDisplayProcessor, SpeedCalculator, SuperchatCalculator, - VideoInfo, create ] diff --git a/pytchat/cli/__init__.py b/pytchat/cli/__init__.py deleted file mode 100644 index 7a50853..0000000 --- a/pytchat/cli/__init__.py +++ /dev/null @@ -1,71 +0,0 @@ -import argparse -try: - from asyncio import CancelledError -except ImportError: - from asyncio.futures import CancelledError -import os -from .arguments import Arguments -from .echo import Echo -from .. exceptions import InvalidVideoIdException -from .. import __version__ -from .cli_extractor import CLIExtractor - - -''' -Most of CLI modules refer to -Petter Kraabøl's Twitch-Chat-Downloader -https://github.com/PetterKraabol/Twitch-Chat-Downloader -(MIT License) -''' - - -def main(): - # Arguments - parser = argparse.ArgumentParser(description=f'pytchat v{__version__}') - parser.add_argument('-v', f'--{Arguments.Name.VIDEO_IDS}', type=str, - help='Video ID (or URL that includes Video ID). You can specify multiple video IDs by ' - 'separating them with commas without spaces.\n' - 'If ID starts with a hyphen (-), enclose the ID in square brackets.') - parser.add_argument('-o', f'--{Arguments.Name.OUTPUT}', type=str, - help='Output directory (end with "/"). default="./"', default='./') - parser.add_argument(f'--{Arguments.Name.DEBUG}', action='store_true', - help='Debug mode. Stop when exceptions have occurred and save error data (".dat" file).') - parser.add_argument(f'--{Arguments.Name.VERSION}', action='store_true', - help='Show version.') - parser.add_argument(f'--{Arguments.Name.ECHO}', action='store_true', - help='Display chats of specified video.') - - Arguments(parser.parse_args().__dict__) - - if Arguments().print_version: - print(f'pytchat v{__version__} © 2019, 2020 taizan-hokuto') - return - - if not Arguments().video_ids: - parser.print_help() - return - - # Echo - if Arguments().echo: - if len(Arguments().video_ids) > 1: - print("When using --echo option, only one video ID can be specified.") - return - try: - Echo(Arguments().video_ids[0]).run() - except InvalidVideoIdException as e: - print("Invalid video id:", str(e)) - except Exception as e: - print(type(e), str(e)) - if Arguments().debug: - raise - finally: - return - - # Extractor - if not os.path.exists(Arguments().output): - print("\nThe specified directory does not exist.:{}\n".format(Arguments().output)) - return - try: - CLIExtractor().run() - except CancelledError as e: - print(str(e)) diff --git a/pytchat/cli/arguments.py b/pytchat/cli/arguments.py deleted file mode 100644 index 1bad26f..0000000 --- a/pytchat/cli/arguments.py +++ /dev/null @@ -1,45 +0,0 @@ -from typing import Optional, Dict, Union, List -from .singleton import Singleton - -''' -This modules refer to -Petter Kraabøl's Twitch-Chat-Downloader -https://github.com/PetterKraabol/Twitch-Chat-Downloader -(MIT License) -''' - - -class Arguments(metaclass=Singleton): - """ - Arguments singleton - """ - - class Name: - VERSION: str = 'version' - OUTPUT: str = 'output_dir' - VIDEO_IDS: str = 'video_id' - DEBUG: bool = 'debug' - ECHO: bool = 'echo' - - def __init__(self, - arguments: Optional[Dict[str, Union[str, bool, int]]] = None): - """ - Initialize arguments - :param arguments: Arguments from cli - (Optional to call singleton instance without parameters) - """ - - if arguments is None: - print('Error: arguments were not provided') - exit() - - self.print_version: bool = arguments[Arguments.Name.VERSION] - self.output: str = arguments[Arguments.Name.OUTPUT] - self.video_ids: List[int] = [] - self.debug: bool = arguments[Arguments.Name.DEBUG] - self.echo: bool = arguments[Arguments.Name.ECHO] - - # Videos - if arguments[Arguments.Name.VIDEO_IDS]: - self.video_ids = [video_id - for video_id in arguments[Arguments.Name.VIDEO_IDS].split(',')] diff --git a/pytchat/cli/cli_extractor.py b/pytchat/cli/cli_extractor.py deleted file mode 100644 index 4274798..0000000 --- a/pytchat/cli/cli_extractor.py +++ /dev/null @@ -1,120 +0,0 @@ -import asyncio -import os -import signal -import traceback -from httpcore import ReadTimeout as HCReadTimeout, NetworkError as HCNetworkError -from json.decoder import JSONDecodeError -from pathlib import Path -from .arguments import Arguments -from .progressbar import ProgressBar -from .. import util -from .. exceptions import InvalidVideoIdException, NoContents, PatternUnmatchError, UnknownConnectionError -from .. processors.html_archiver import HTMLArchiver -from .. tool.extract.extractor import Extractor -from .. tool.videoinfo import VideoInfo - - -class CLIExtractor: - - def run(self) -> None: - ex = None - pbar = None - for counter, video_id in enumerate(Arguments().video_ids): - if len(Arguments().video_ids) > 1: - print(f"\n{'-' * 10} video:{counter + 1} of {len(Arguments().video_ids)} {'-' * 10}") - - try: - video_id = util.extract_video_id(video_id) - separated_path = str(Path(Arguments().output)) + os.path.sep - path = util.checkpath(separated_path + video_id + '.html') - try: - info = VideoInfo(video_id) - except (PatternUnmatchError, JSONDecodeError) as e: - print("Cannot parse video information.:{} {}".format(video_id, type(e))) - if Arguments().debug: - util.save(str(e.doc), "ERR", ".dat") - continue - except Exception as e: - print("Cannot parse video information.:{} {}".format(video_id, type(e))) - continue - - print(f"\n" - f" video_id: {video_id}\n" - f" channel: {info.get_channel_name()}\n" - f" title: {info.get_title()}\n" - f" output path: {path}") - - duration = info.get_duration() - pbar = ProgressBar(total=(duration * 1000), status_txt="Extracting") - ex = Extractor(video_id, - callback=pbar.disp, - div=10) - signal.signal(signal.SIGINT, (lambda a, b: self.cancel(ex, pbar))) - - data = ex.extract() - if data == [] or data is None: - continue - pbar.reset("#", "=", total=1000, status_txt="Rendering ") - processor = HTMLArchiver(path, callback=pbar.disp) - processor.process( - [{'video_id': None, - 'timeout': 1, - 'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}] - ) - processor.finalize() - pbar.reset('#', '#', status_txt='Completed ') - pbar.close() - print() - if pbar.is_cancelled(): - print("\nThe extraction process has been discontinued.\n") - except InvalidVideoIdException: - print("Invalid Video ID or URL:", video_id) - except NoContents as e: - print(f"Abort:{str(e)}:[{video_id}]") - except (JSONDecodeError, PatternUnmatchError) as e: - print("{}:{}".format(e.msg, video_id)) - if Arguments().debug: - filename = util.save(e.doc, "ERR_", ".dat") - traceback.print_exc() - print(f"Saved error data: {filename}") - except (UnknownConnectionError, HCNetworkError, HCReadTimeout) as e: - if Arguments().debug: - traceback.print_exc() - print(f"An unknown network error occurred during the processing of [{video_id}]. : " + str(e)) - except Exception as e: - print(f"Abort:{str(type(e))} {str(e)[:80]}") - if Arguments().debug: - traceback.print_exc() - finally: - clear_tasks() - - return - - def cancel(self, ex=None, pbar=None) -> None: - '''Called when keyboard interrupted has occurred. - ''' - print("\nKeyboard interrupted.\n") - if ex and pbar: - ex.cancel() - pbar.cancel() - - -def clear_tasks(): - ''' - Clear remained tasks. - Called when internal exception has occurred or - after each extraction process is completed. - ''' - async def _shutdown(): - tasks = [t for t in asyncio.all_tasks() - if t is not asyncio.current_task()] - for task in tasks: - task.cancel() - - try: - loop = asyncio.get_event_loop() - loop.run_until_complete(_shutdown()) - except Exception as e: - print(str(e)) - if Arguments().debug: - traceback.print_exc() diff --git a/pytchat/cli/echo.py b/pytchat/cli/echo.py deleted file mode 100644 index 0c1b56a..0000000 --- a/pytchat/cli/echo.py +++ /dev/null @@ -1,22 +0,0 @@ -import pytchat -from ..exceptions import ChatDataFinished, NoContents -from ..util import extract_video_id - - -class Echo: - def __init__(self, video_id): - self.video_id = extract_video_id(video_id) - - def run(self): - livechat = pytchat.create(self.video_id) - while livechat.is_alive(): - chatdata = livechat.get() - for c in chatdata.sync_items(): - print(f"{c.datetime} [{c.author.name}] {c.message} {c.amountString}") - - try: - livechat.raise_for_status() - except (ChatDataFinished, NoContents): - print("Chat finished.") - except Exception as e: - print(type(e), str(e)) diff --git a/pytchat/cli/progressbar.py b/pytchat/cli/progressbar.py deleted file mode 100644 index ae6174d..0000000 --- a/pytchat/cli/progressbar.py +++ /dev/null @@ -1,54 +0,0 @@ -''' -This code is based on -vladignatyev/progress.py -https://gist.github.com/vladignatyev/06860ec2040cb497f0f3 -(MIT License) -''' -import shutil -import sys - - -class ProgressBar: - def __init__(self, total, status_txt): - self._bar_len = 60 - self._cancelled = False - self.reset(total=total, status_txt=status_txt) - - def reset(self, symbol_done="=", symbol_space=" ", total=100, status_txt=''): - self._console_width = shutil.get_terminal_size(fallback=(80, 24)).columns - self._symbol_done = symbol_done - self._symbol_space = symbol_space - self._total = total - self._status_txt = status_txt - self._count = 0 - - def disp(self, _, fetched): - self._progress(fetched, self._total) - - def _progress(self, fillin, total): - if total == 0 or self._cancelled: - return - self._count += fillin - filled_len = int(round(self._bar_len * self._count / float(total))) - percents = round(100.0 * self._count / float(total), 1) - if percents > 100: - percents = 100.0 - if filled_len > self._bar_len: - filled_len = self._bar_len - - bar = self._symbol_done * filled_len + \ - self._symbol_space * (self._bar_len - filled_len) - disp = f" [{bar}] {percents:>5.1f}% ...{self._status_txt} "[:self._console_width - 1] + '\r' - - sys.stdout.write(disp) - sys.stdout.flush() - - def close(self): - if not self._cancelled: - self._progress(self._total, self._total) - - def cancel(self): - self._cancelled = True - - def is_cancelled(self): - return self._cancelled diff --git a/pytchat/cli/singleton.py b/pytchat/cli/singleton.py deleted file mode 100644 index 53a76f0..0000000 --- a/pytchat/cli/singleton.py +++ /dev/null @@ -1,21 +0,0 @@ -''' -This modules refer to -Petter Kraabøl's Twitch-Chat-Downloader -https://github.com/PetterKraabol/Twitch-Chat-Downloader -(MIT License) -''' - - -class Singleton(type): - """ - Abstract class for singletons - """ - _instances = {} - - def __call__(cls, *args, **kwargs): - if cls not in cls._instances: - cls._instances[cls] = super().__call__(*args, **kwargs) - return cls._instances[cls] - - def get_instance(cls, *args, **kwargs): - cls.__call__(*args, **kwargs) diff --git a/pytchat/tool/__init__.py b/pytchat/tool/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pytchat/tool/extract/__init__.py b/pytchat/tool/extract/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pytchat/tool/extract/asyncdl.py b/pytchat/tool/extract/asyncdl.py deleted file mode 100644 index eea2499..0000000 --- a/pytchat/tool/extract/asyncdl.py +++ /dev/null @@ -1,180 +0,0 @@ -import asyncio -import httpx -import socket -from concurrent.futures import CancelledError -from json import JSONDecodeError -from . import parser -from . block import Block -from . worker import ExtractWorker -from . patch import Patch -from ... import config -from ... paramgen import arcparam -from ... exceptions import UnknownConnectionError -from ... util import get_param - - -headers = config.headers -smr = config._smr - -MAX_RETRY_COUNT = 3 - -# Set to avoid duplicate parameters -aquired_params = set() -dat = '' - - -def _split(start, end, count, min_interval_sec=120): - """ - Split section from `start` to `end` into `count` pieces, - and returns the beginning of each piece. - The `count` is adjusted so that the length of each piece - is no smaller than `min_interval`. - - Returns: - -------- - List of the offset of each block's first chat data. - """ - if not (isinstance(start, int) or isinstance(start, float)) or \ - not (isinstance(end, int) or isinstance(end, float)): - raise ValueError("start/end must be int or float") - if not isinstance(count, int): - raise ValueError("count must be int") - if start > end: - raise ValueError("end must be equal to or greater than start.") - if count < 1: - raise ValueError("count must be equal to or greater than 1.") - if (end - start) / count < min_interval_sec: - count = int((end - start) / min_interval_sec) - if count == 0: - count = 1 - interval = (end - start) / count - - if count == 1: - return [start] - return sorted(list(set([int(start + interval * j) - for j in range(count)]))) - - -def ready_blocks(video_id, duration, div, callback): - aquired_params.clear() - if div <= 0: - raise ValueError - - async def _get_blocks(video_id, duration, div, callback): - async with httpx.AsyncClient(http2=True, headers=headers) as session: - tasks = [_create_block(session, video_id, seektime, callback) - for seektime in _split(-1, duration, div)] - return await asyncio.gather(*tasks) - - async def _create_block(session, video_id, seektime, callback): - continuation = arcparam.getparam(video_id, seektime=seektime) - err = None - last_offset = 0 - global dat - for _ in range(MAX_RETRY_COUNT): - try: - if continuation in aquired_params: - next_continuation, actions = None, [] - break - aquired_params.add(continuation) - param = get_param(continuation, replay=True, offsetms=seektime * 1000, dat=dat) - resp = await session.post(smr, json=param, timeout=10) - next_continuation, actions, last_offset, dat = parser.parse(resp.json()) - break - except JSONDecodeError: - await asyncio.sleep(3) - except httpx.HTTPError as e: - err = e - await asyncio.sleep(3) - else: - cancel() - raise UnknownConnectionError("Abort:" + str(err)) - - if actions: - first_offset = parser.get_offset(actions[0]) - if callback: - callback(actions, last_offset - first_offset) - return Block( - continuation=next_continuation, - chat_data=actions, - first=first_offset, - last=last_offset - ) - - """ - fetch initial blocks. - """ - loop = asyncio.get_event_loop() - blocks = loop.run_until_complete( - _get_blocks(video_id, duration, div, callback)) - return blocks - - -def fetch_patch(callback, blocks, video_id): - - async def _allocate_workers(): - workers = [ - ExtractWorker( - fetch=_fetch, block=block, - blocks=blocks, video_id=video_id - ) - for block in blocks - ] - async with httpx.AsyncClient() as session: - tasks = [worker.run(session) for worker in workers] - return await asyncio.gather(*tasks) - - async def _fetch(continuation, last_offset, session=None) -> Patch: - global dat - err = None - for _ in range(MAX_RETRY_COUNT): - try: - if continuation in aquired_params: - continuation, actions = None, [] - break - aquired_params.add(continuation) - params = get_param(continuation, replay=True, offsetms=last_offset, dat=dat) - # util.save(json.dumps(params, ensure_ascii=False), "v:/~~/param_"+str(last_offset), ".json") - resp = await session.post(smr, json=params) - continuation, actions, last_offset, dat = parser.parse(resp.json()) - break - except JSONDecodeError: - await asyncio.sleep(3) - except httpx.HTTPError as e: - err = e - await asyncio.sleep(3) - except socket.error as error: - print("socket error", error.errno) - await asyncio.sleep(3) - else: - cancel() - raise UnknownConnectionError("Abort:" + str(err)) - - if actions: - last = last_offset - first = parser.get_offset(actions[0]) - if callback: - callback(actions, last - first) - return Patch(actions, continuation, first, last) - return Patch(continuation=continuation) - - """ - allocate workers and assign blocks. - """ - loop = asyncio.get_event_loop() - try: - loop.run_until_complete(_allocate_workers()) - except CancelledError: - pass - - -async def _shutdown(): - tasks = [t for t in asyncio.all_tasks() - if t is not asyncio.current_task()] - for task in tasks: - task.cancel() - - -def cancel(): - loop = asyncio.get_event_loop() - loop.create_task(_shutdown()) diff --git a/pytchat/tool/extract/block.py b/pytchat/tool/extract/block.py deleted file mode 100644 index c827661..0000000 --- a/pytchat/tool/extract/block.py +++ /dev/null @@ -1,56 +0,0 @@ -class Block: - """Block object represents something like a box - to join chunk of chatdata. - - Parameter: - --------- - first : int : - videoOffsetTimeMs of the first chat_data - (chat_data[0]) - - last : int : - videoOffsetTimeMs of the last chat_data. - (chat_data[-1]) - - this value increases as fetching chatdata progresses. - - end : int : - target videoOffsetTimeMs of last chat data for extract, - equals to first videoOffsetTimeMs of next block. - when extract worker reaches this offset, stop fetching. - - continuation : str : - continuation param of last chat data. - - chat_data : list - - done : bool : - whether this block has been fetched. - - remaining : int : - remaining data to extract. - equals end - last. - - is_last : bool : - whether this block is the last one in blocklist. - - during_split : bool : - whether this block is in the process of during_split. - while True, this block is excluded from duplicate split procedure. - """ - - __slots__ = ['first', 'last', 'end', 'continuation', 'chat_data', 'remaining', - 'done', 'is_last', 'during_split'] - - def __init__(self, first=0, last=0, end=0, - continuation='', chat_data=[], is_last=False, - during_split=False): - self.first = first - self.last = last - self.end = end - self.continuation = continuation - self.chat_data = chat_data - self.done = False - self.remaining = self.end - self.last - self.is_last = is_last - self.during_split = during_split diff --git a/pytchat/tool/extract/duplcheck.py b/pytchat/tool/extract/duplcheck.py deleted file mode 100644 index 1ac18c1..0000000 --- a/pytchat/tool/extract/duplcheck.py +++ /dev/null @@ -1,153 +0,0 @@ -from . import parser - - -def check_duplicate(chatdata): - max_range = len(chatdata) - 1 - tbl_offset = [None] * max_range - tbl_id = [None] * max_range - tbl_type = [None] * max_range - - def create_table(chatdata, max_range): - for i in range(max_range): - tbl_offset[i] = parser.get_offset(chatdata[i]) - tbl_id[i] = parser.get_id(chatdata[i]) - tbl_type[i] = parser.get_type(chatdata[i]) - - def is_duplicate(i, j): - return ( - tbl_offset[i] == tbl_offset[j] - and tbl_id[i] == tbl_id[j] - and tbl_type[i] == tbl_type[j] - ) - print("creating table...") - create_table(chatdata, max_range) - print("searching duplicate data...") - return [{"i": { - "index": i, "id": parser.get_id(chatdata[i]), - "offsetTime": parser.get_offset(chatdata[i]), - "type": parser.get_type(chatdata[i]) - }, - "j":{ - "index": j, "id": parser.get_id(chatdata[j]), - "offsetTime": parser.get_offset(chatdata[j]), - "type": parser.get_type(chatdata[j]) - } - } - for i in range(max_range) for j in range(i + 1, max_range) - if is_duplicate(i, j)] - - -def check_duplicate_offset(chatdata): - max_range = len(chatdata) - tbl_offset = [None] * max_range - tbl_id = [None] * max_range - tbl_type = [None] * max_range - - def create_table(chatdata, max_range): - for i in range(max_range): - tbl_offset[i] = parser.get_offset(chatdata[i]) - tbl_id[i] = parser.get_id(chatdata[i]) - tbl_type[i] = parser.get_type(chatdata[i]) - - def is_duplicate(i, j): - return ( - tbl_offset[i] == tbl_offset[j] - and tbl_id[i] == tbl_id[j] - ) - - print("creating table...") - create_table(chatdata, max_range) - print("searching duplicate data...") - - return [{ - "index": i, "id": tbl_id[i], - "offsetTime": tbl_offset[i], - "type:": tbl_type[i] - } - for i in range(max_range - 1) - if is_duplicate(i, i + 1)] - - -def remove_duplicate_head(blocks): - if len(blocks) == 0 or len(blocks) == 1: - return blocks - - def is_duplicate_head(index): - - if len(blocks[index].chat_data) == 0: - return True - elif len(blocks[index + 1].chat_data) == 0: - return False - - id_0 = parser.get_id(blocks[index].chat_data[0]) - id_1 = parser.get_id(blocks[index + 1].chat_data[0]) - type_0 = parser.get_type(blocks[index].chat_data[0]) - type_1 = parser.get_type(blocks[index + 1].chat_data[0]) - return ( - blocks[index].first == blocks[index + 1].first - and id_0 == id_1 - and type_0 == type_1 - ) - ret = [blocks[i] for i in range(len(blocks) - 1) - if (len(blocks[i].chat_data) > 0 - and not is_duplicate_head(i))] - ret.append(blocks[-1]) - return ret - - -def remove_duplicate_tail(blocks): - if len(blocks) == 0 or len(blocks) == 1: - return blocks - - def is_duplicate_tail(index): - if len(blocks[index].chat_data) == 0: - return True - elif len(blocks[index - 1].chat_data) == 0: - return False - id_0 = parser.get_id(blocks[index - 1].chat_data[-1]) - id_1 = parser.get_id(blocks[index].chat_data[-1]) - type_0 = parser.get_type(blocks[index - 1].chat_data[-1]) - type_1 = parser.get_type(blocks[index].chat_data[-1]) - return ( - blocks[index - 1].last == blocks[index].last - and id_0 == id_1 - and type_0 == type_1 - ) - - ret = [blocks[i] for i in range(0, len(blocks)) - if i == 0 or not is_duplicate_tail(i)] - return ret - - -def remove_overlap(blocks): - """ - Fix overlapped blocks after ready_blocks(). - Align the last offset of each block to the first offset - of next block (equals `end` offset of each block). - """ - if len(blocks) == 0 or len(blocks) == 1: - return blocks - - for block in blocks: - if block.is_last: - break - if len(block.chat_data) == 0: - continue - block_end = block.end - if block.last >= block_end: - for line in reversed(block.chat_data): - if parser.get_offset(line) < block_end: - break - block.chat_data.pop() - block.last = parser.get_offset(line) - block.remaining = 0 - block.done = True - block.continuation = None - return blocks - - -def _dump(blocks): - print("---------- first last end---") - for i, block in enumerate(blocks): - print( - f"block[{i:3}] {block.first:>10} {block.last:>10} {block.end:>10}") diff --git a/pytchat/tool/extract/extractor.py b/pytchat/tool/extract/extractor.py deleted file mode 100644 index f722132..0000000 --- a/pytchat/tool/extract/extractor.py +++ /dev/null @@ -1,96 +0,0 @@ -from typing import Generator -from . import asyncdl -from . import duplcheck -from .. videoinfo import VideoInfo -from ... import config -from ... exceptions import InvalidVideoIdException -from ... import util - -logger = config.logger(__name__) -headers = config.headers - - -class Extractor: - def __init__(self, video_id, div=1, callback=None, processor=None): - if not isinstance(div, int) or div < 1: - raise ValueError('div must be positive integer.') - elif div > 10: - div = 10 - self.video_id = util.extract_video_id(video_id) - self.div = div - self.callback = callback - self.processor = processor - self.duration = self._get_duration_of_video(video_id) - self.blocks = [] - - def _get_duration_of_video(self, video_id): - duration = 0 - try: - duration = VideoInfo(video_id).get_duration() - except InvalidVideoIdException: - raise - return duration - - def _ready_blocks(self): - blocks = asyncdl.ready_blocks( - self.video_id, self.duration, self.div, self.callback) - self.blocks = [block for block in blocks if block] - return self - - def _remove_duplicate_head(self): - self.blocks = duplcheck.remove_duplicate_head(self.blocks) - return self - - def _set_block_end(self): - if len(self.blocks) > 0: - for i in range(len(self.blocks) - 1): - self.blocks[i].end = self.blocks[i + 1].first - self.blocks[-1].end = self.duration * 1000 - self.blocks[-1].is_last = True - return self - - def _remove_overlap(self): - self.blocks = duplcheck.remove_overlap(self.blocks) - return self - - def _download_blocks(self): - asyncdl.fetch_patch(self.callback, self.blocks, self.video_id) - return self - - def _remove_duplicate_tail(self): - self.blocks = duplcheck.remove_duplicate_tail(self.blocks) - return self - - def _get_chatdata(self) -> Generator: - for block in self.blocks: - for chatdata in block.chat_data: - yield chatdata - - def _execute_extract_operations(self): - return ( - self._ready_blocks() - ._remove_duplicate_head() - ._set_block_end() - ._remove_overlap() - ._download_blocks() - ._remove_duplicate_tail() - ._get_chatdata() - ) - - def extract(self): - if self.duration == 0: - print("\nCannot extract chat data:\n The specified video has not yet been archived.") - return [] - data = self._execute_extract_operations() - if self.processor is None: - return data - ret = self.processor.process( - [{'video_id': None, - 'timeout': 1, - 'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}] - ) - self.processor.finalize() - return ret - - def cancel(self): - asyncdl.cancel() diff --git a/pytchat/tool/extract/parser.py b/pytchat/tool/extract/parser.py deleted file mode 100644 index d9b2cc8..0000000 --- a/pytchat/tool/extract/parser.py +++ /dev/null @@ -1,57 +0,0 @@ -from ... import config -from ... import exceptions - -logger = config.logger(__name__) - - -def parse(jsn): - """ - Parse replay chat data. - Parameter: - ---------- - jsn : dict - JSON of replay chat data. - Returns: - ------ - continuation : str - actions : list - - """ - if jsn is None: - raise ValueError("parameter JSON is None") - if jsn.get("error") or jsn.get("responseContext", {}).get("errors"): - raise exceptions.ResponseContextError( - 'video_id is invalid or private/deleted.') - contents = jsn.get('continuationContents') - if contents is None: - raise exceptions.NoContents('No chat data.') - - cont = contents['liveChatContinuation']['continuations'][0] - if cont is None: - raise exceptions.NoContinuation('No Continuation') - metadata = cont.get('liveChatReplayContinuationData') - if metadata: - visitor_data = jsn.get("responseContext", {}).get("visitorData", '') - continuation = metadata.get("continuation") - actions: list = contents['liveChatContinuation'].get('actions') - last_offset: int = get_offset(actions[-1]) if actions else 0 - return continuation, actions, last_offset, visitor_data - return None, [], 0, '' - - -def get_offset(item) -> int: - return int(item['replayChatItemAction']["videoOffsetTimeMsec"]) - - -def get_id(item): - a = list(item['replayChatItemAction']["actions"][0].values())[0].get('item') - if a: - return list(a.values())[0].get('id') - return None - - -def get_type(item): - a = list(item['replayChatItemAction']["actions"][0].values())[0].get('item') - if a: - return list(a.keys())[0] - return None diff --git a/pytchat/tool/extract/patch.py b/pytchat/tool/extract/patch.py deleted file mode 100644 index 307bd0b..0000000 --- a/pytchat/tool/extract/patch.py +++ /dev/null @@ -1,55 +0,0 @@ -from . import parser -from . block import Block -from typing import NamedTuple - - -class Patch(NamedTuple): - """ - Patch represents chunk of chat data - which is fetched by asyncdl.fetch_patch._fetch(). - """ - chats: list = [] - continuation: str = None - first: int = None - last: int = None - - -def fill(block: Block, patch: Patch): - block_end = block.end - if patch.last < block_end or block.is_last: - set_patch(block, patch) - return - for line in reversed(patch.chats): - line_offset = parser.get_offset(line) - if line_offset < block_end: - break - patch.chats.pop() - set_patch(block, patch._replace( - continuation=None, - last=line_offset - ) - ) - block.remaining = 0 - block.done = True - - -def split(parent_block: Block, child_block: Block, patch: Patch): - parent_block.during_split = False - if patch.first <= parent_block.last: - ''' When patch overlaps with parent_block, - discard this block. ''' - child_block.continuation = None - ''' Leave child_block.during_split == True - to exclude from during_split sequence. ''' - return - child_block.during_split = False - child_block.first = patch.first - parent_block.end = patch.first - fill(child_block, patch) - - -def set_patch(block: Block, patch: Patch): - block.continuation = patch.continuation - block.chat_data.extend(patch.chats) - block.last = patch.last - block.remaining = block.end - block.last diff --git a/pytchat/tool/extract/worker.py b/pytchat/tool/extract/worker.py deleted file mode 100644 index 5216451..0000000 --- a/pytchat/tool/extract/worker.py +++ /dev/null @@ -1,92 +0,0 @@ -from . block import Block -from . patch import fill, split -from ... paramgen import arcparam -from typing import Tuple - - -class ExtractWorker: - """ - ExtractWorker associates a download session with a block. - When the worker finishes fetching, the block - being fetched is splitted and assigned the free worker. - - Parameter - ---------- - fetch : func : - extract function of asyncdl - - block : Block : - Block object that includes chat_data - - blocks : list : - List of Block(s) - - video_id : str : - - parent_block : Block : - the block from which current block is splitted - """ - __slots__ = ['block', 'fetch', 'blocks', 'video_id', 'parent_block'] - - def __init__(self, fetch, block, blocks, video_id): - self.block = block - self.fetch = fetch - self.blocks = blocks - self.video_id = video_id - self.parent_block = None - - async def run(self, session): - while self.block.continuation: - patch = await self.fetch( - self.block.continuation, self.block.last, session) - if patch.continuation is None: - """TODO : make the worker assigned to the last block - to work more than twice as possible. - """ - break - if self.parent_block: - split(self.parent_block, self.block, patch) - self.parent_block = None - else: - fill(self.block, patch) - if self.block.continuation is None: - """finished fetching this block """ - self.block.done = True - self.block = _search_new_block(self) - - -def _search_new_block(worker) -> Block: - index, undone_block = _get_undone_block(worker.blocks) - if undone_block is None: - return Block(continuation=None) - mean = (undone_block.last + undone_block.end) / 2 - continuation = arcparam.getparam(worker.video_id, seektime=mean / 1000) - worker.parent_block = undone_block - worker.parent_block.during_split = True - new_block = Block( - end=undone_block.end, - chat_data=[], - continuation=continuation, - during_split=True, - is_last=worker.parent_block.is_last) - '''swap last block''' - if worker.parent_block.is_last: - worker.parent_block.is_last = False - worker.blocks.insert(index + 1, new_block) - return new_block - - -def _get_undone_block(blocks) -> Tuple[int, Block]: - min_interval_ms = 120000 - max_remaining = 0 - undone_block = None - index_undone_block = 0 - for index, block in enumerate(blocks): - if block.done or block.during_split: - continue - remaining = block.remaining - if remaining > max_remaining and remaining > min_interval_ms: - index_undone_block = index - undone_block = block - max_remaining = remaining - return index_undone_block, undone_block diff --git a/pytchat/tool/videoinfo.py b/pytchat/tool/videoinfo.py deleted file mode 100644 index 9f8b972..0000000 --- a/pytchat/tool/videoinfo.py +++ /dev/null @@ -1,201 +0,0 @@ -import httpx -import json -import re -import time -from .. import config -from ..exceptions import InvalidVideoIdException, PatternUnmatchError, UnknownConnectionError -from ..util import extract_video_id - - -headers = config.headers -pattern = re.compile(r"['\"]PLAYER_CONFIG['\"]:\s*({.*})") -pattern2 = re.compile(r"yt\.setConfig\((\{[\s\S]*?\})\);") - -item_channel_id = [ - "videoDetails", - "embeddedPlayerOverlayVideoDetailsRenderer", - "channelThumbnailEndpoint", - "channelThumbnailEndpoint", - "urlEndpoint", - "urlEndpoint", - "url" -] - -item_renderer = [ - "embedPreview", - "thumbnailPreviewRenderer" -] - -item_response = [ - "args", - "embedded_player_response" -] - -item_response2 = [ - "PLAYER_VARS", - "embedded_player_response" -] -item_author_image = [ - "videoDetails", - "embeddedPlayerOverlayVideoDetailsRenderer", - "channelThumbnail", - "thumbnails", - 0, - "url" -] - -item_thumbnail = [ - "defaultThumbnail", - "thumbnails", - 2, - "url" -] - -item_channel_name = [ - "videoDetails", - "embeddedPlayerOverlayVideoDetailsRenderer", - "expandedRenderer", - "embeddedPlayerOverlayVideoDetailsExpandedRenderer", - "title", - "runs", - 0, - "text" -] - -item_moving_thumbnail = [ - "movingThumbnail", - "thumbnails", - 0, - "url" -] - - -class VideoInfo: - ''' - VideoInfo object retrieves YouTube video information. - - Parameter - --------- - video_id : str - - Exception - --------- - InvalidVideoIdException : - Occurs when video_id does not exist on YouTube. - ''' - - def __init__(self, video_id): - self.video_id = extract_video_id(video_id) - self.client = httpx.Client(http2=True) - self.new_pattern_text = False - err = None - for _ in range(3): - try: - text = self._get_page_text(self.video_id) - self._parse(text) - break - except (InvalidVideoIdException, UnknownConnectionError) as e: - raise e - except Exception as e: - err = e - time.sleep(2) - pass - else: - raise err - - def _get_page_text(self, video_id): - url = f"https://www.youtube.com/embed/{video_id}" - err = None - for _ in range(3): - try: - resp = self.client.get(url, headers=headers) - resp.raise_for_status() - break - except httpx.HTTPError as e: - err = e - time.sleep(3) - else: - raise UnknownConnectionError(str(err)) - - return resp.text - - def _parse(self, text): - result = re.search(pattern, text) - if result is None: - result = re.search(pattern2, text) - if result is None: - raise PatternUnmatchError(doc=text) - else: - self.new_pattern_text = True - decoder = json.JSONDecoder() - if self.new_pattern_text: - res = decoder.raw_decode(result.group(1))[0] - else: - res = decoder.raw_decode(result.group(1)[:-1])[0] - if self.new_pattern_text: - response = self._get_item(res, item_response2) - else: - response = self._get_item(res, item_response) - if response is None: - if self.new_pattern_text: - self._check_video_is_private(res.get("PLAYER_VARS")) - else: - self._check_video_is_private(res.get("args")) - self._renderer = self._get_item(json.loads(response), item_renderer) - if self._renderer is None: - raise InvalidVideoIdException( - f"No renderer found in video_id: [{self.video_id}].") - - def _check_video_is_private(self, args): - if args and args.get("video_id"): - raise InvalidVideoIdException( - f"video_id [{self.video_id}] is private or deleted.") - raise InvalidVideoIdException( - f"video_id [{self.video_id}] is invalid.") - - def _get_item(self, dict_body, items: list): - for item in items: - if dict_body is None: - break - if isinstance(dict_body, dict): - dict_body = dict_body.get(item) - continue - if isinstance(item, int) and \ - isinstance(dict_body, list) and \ - len(dict_body) > item: - dict_body = dict_body[item] - continue - return None - return dict_body - - def get_duration(self): - duration_seconds = self._renderer.get("videoDurationSeconds") - if duration_seconds: - '''Fetched value is string, so cast to integer.''' - return int(duration_seconds) - '''When key is not found, explicitly returns None.''' - return None - - def get_title(self): - if self._renderer.get("title"): - return [''.join(run["text"]) - for run in self._renderer["title"]["runs"]][0] - return None - - def get_channel_id(self): - channel_url = self._get_item(self._renderer, item_channel_id) - if channel_url: - return channel_url[9:] - return None - - def get_author_image(self): - return self._get_item(self._renderer, item_author_image) - - def get_thumbnail(self): - return self._get_item(self._renderer, item_thumbnail) - - def get_channel_name(self): - return self._get_item(self._renderer, item_channel_name) - - def get_moving_thumbnail(self): - return self._get_item(self._renderer, item_moving_thumbnail) diff --git a/tests/test_extract_duplcheck.py b/tests/test_extract_duplcheck.py deleted file mode 100644 index e02976e..0000000 --- a/tests/test_extract_duplcheck.py +++ /dev/null @@ -1,134 +0,0 @@ -import json -from pytchat.tool.extract import duplcheck -from pytchat.tool.extract import parser -from pytchat.tool.extract.block import Block -from pytchat.tool.extract.duplcheck import _dump - - -def _open_file(path): - with open(path, mode='r', encoding='utf-8') as f: - return f.read() - - -def test_overlap(): - """ - test overlap data - operation : [0] [2] [3] [4] -> last :align to end - [1] , [5] -> no change - - """ - - def load_chatdata(filename): - return parser.parse( - json.loads(_open_file( - "tests/testdata/extract_duplcheck/overlap/" + filename)) - )[1] - - blocks = ( - Block(first=0, last=12771, end=9890, - chat_data=load_chatdata("dp0-0.json")), - Block(first=9890, last=15800, end=20244, - chat_data=load_chatdata("dp0-1.json")), - Block(first=20244, last=45146, end=32476, - chat_data=load_chatdata("dp0-2.json")), - Block(first=32476, last=50520, end=41380, - chat_data=load_chatdata("dp0-3.json")), - Block(first=41380, last=62875, end=52568, - chat_data=load_chatdata("dp0-4.json")), - Block(first=52568, last=62875, end=54000, - chat_data=load_chatdata("dp0-5.json"), is_last=True) - ) - result = duplcheck.remove_overlap(blocks) - # dp0-0.json has item offset time is 9890 (equals block[0].end = block[1].first), - # but must be aligne to the most close and smaller value:9779. - assert result[0].last == 9779 - - assert result[1].last == 15800 - - assert result[2].last == 32196 - - assert result[3].last == 41116 - - assert result[4].last == 52384 - - # the last block must be always added to result. - assert result[5].last == 62875 - - -def test_duplicate_head(): - - def load_chatdata(filename): - return parser.parse( - json.loads(_open_file( - "tests/testdata/extract_duplcheck/head/" + filename)) - )[1] - - """ - test duplicate head data - operation : [0] , [1] -> discard [0] - [1] , [2] -> discard [1] - [2] , [3] -> append [2] - [3] , [4] -> discard [3] - [4] , [5] -> append [4] - append [5] - - result : [2] , [4] , [5] - """ - - # chat data offsets are ignored. - blocks = ( - Block(first=0, last=2500, chat_data=load_chatdata("dp0-0.json")), - Block(first=0, last=38771, chat_data=load_chatdata("dp0-1.json")), - Block(first=0, last=45146, chat_data=load_chatdata("dp0-2.json")), - Block(first=20244, last=60520, chat_data=load_chatdata("dp0-3.json")), - Block(first=20244, last=62875, chat_data=load_chatdata("dp0-4.json")), - Block(first=52568, last=62875, chat_data=load_chatdata("dp0-5.json")) - ) - _dump(blocks) - result = duplcheck.remove_duplicate_head(blocks) - - assert len(result) == 3 - assert result[0].first == blocks[2].first - assert result[0].last == blocks[2].last - assert result[1].first == blocks[4].first - assert result[1].last == blocks[4].last - assert result[2].first == blocks[5].first - assert result[2].last == blocks[5].last - - -def test_duplicate_tail(): - """ - test duplicate tail data - operation : append [0] - [0] , [1] -> discard [1] - [1] , [2] -> append [2] - [2] , [3] -> discard [3] - [3] , [4] -> append [4] - [4] , [5] -> discard [5] - - result : [0] , [2] , [4] - """ - def load_chatdata(filename): - return parser.parse( - json.loads(_open_file( - "tests/testdata/extract_duplcheck/head/" + filename)) - )[1] - # chat data offsets are ignored. - blocks = ( - Block(first=0, last=2500, chat_data=load_chatdata("dp0-0.json")), - Block(first=1500, last=2500, chat_data=load_chatdata("dp0-1.json")), - Block(first=10000, last=45146, chat_data=load_chatdata("dp0-2.json")), - Block(first=20244, last=45146, chat_data=load_chatdata("dp0-3.json")), - Block(first=20244, last=62875, chat_data=load_chatdata("dp0-4.json")), - Block(first=52568, last=62875, chat_data=load_chatdata("dp0-5.json")) - ) - - result = duplcheck.remove_duplicate_tail(blocks) - _dump(result) - assert len(result) == 3 - assert result[0].first == blocks[0].first - assert result[0].last == blocks[0].last - assert result[1].first == blocks[2].first - assert result[1].last == blocks[2].last - assert result[2].first == blocks[4].first - assert result[2].last == blocks[4].last diff --git a/tests/test_extract_patch.py b/tests/test_extract_patch.py deleted file mode 100644 index bb1c7ec..0000000 --- a/tests/test_extract_patch.py +++ /dev/null @@ -1,239 +0,0 @@ -import json - -from pytchat.tool.extract import parser -from pytchat.tool.extract.block import Block -from pytchat.tool.extract.patch import Patch, split - - -def _open_file(path): - with open(path, mode='r', encoding='utf-8') as f: - return f.read() - - -def load_chatdata(filename): - return parser.parse( - json.loads(_open_file("tests/testdata/fetch_patch/" + filename)) - )[1] - - -def test_split_0(): - """ - Normal case - - ~~~~~~ before ~~~~~~ - - @parent_block (# = already fetched) - - first last end - |########----------------------------------------| - - - @child_block - - first = last = 0 end (=parent_end) - | | - - - @fetched patch - |-- patch --| - - - | - | - V - - ~~~~~~ after ~~~~~~ - - - @parent_block - - first last end (after split) - |########------------| - - @child_block - first last end - |###########---------------| - - @fetched patch - |-- patch --| - """ - parent = Block(first=0, last=4000, end=60000, - continuation='parent', during_split=True) - child = Block(first=0, last=0, end=60000, - continuation='mean', during_split=True) - patch = Patch(chats=load_chatdata('pt0-5.json'), - first=32500, last=34000, continuation='patch') - - split(parent, child, patch) - - assert child.continuation == 'patch' - assert parent.last < child.first - assert parent.end == child.first - assert child.first < child.last - assert child.last < child.end - assert parent.during_split is False - assert child.during_split is False - - -def test_split_1(): - """patch.first <= parent_block.last - - While awaiting at run()->asyncdl._fetch() - fetching parent_block proceeds, - and parent.block.last exceeds patch.first. - - In this case, fetched patch is all discarded, - and worker searches other processing block again. - - ~~~~~~ before ~~~~~~ - - patch.first - first | last end - |####################|#####|---------------------| - ^ - @child_block - first = last = 0 end (=parent_end) - | | - - @fetched patch - |-- patch --| - - - | - | - V - - ~~~~~~ after ~~~~~~ - - @parent_block - first last end - |###########################|--------------------| - - @child_block - - .............. ->  discard all data - - """ - parent = Block(first=0, last=33000, end=60000, continuation='parent', during_split=True) - child = Block(first=0, last=0, end=60000, continuation='mean', during_split=True) - patch = Patch(chats=load_chatdata('pt0-5.json'), - first=32500, last=34000, continuation='patch') - - split(parent, child, patch) - - assert parent.last == 33000 # no change - assert parent.end == 60000 # no change - assert child.continuation is None - assert parent.during_split is False - assert child.during_split is True # exclude during_split sequence - - -def test_split_2(): - """child_block.end < patch.last: - - Case the last offset of patch exceeds child_block.end. - In this case, remove overlapped data of patch. - - ~~~~~~ before ~~~~~~ - - @parent_block (# = already fetched) - first last end (before split) - |########------------------------------| - - @child_block - first = last = 0 end (=parent_end) - | | - - continuation:succeed from patch - - @fetched patch - |-------- patch --------| - - - | - | - V - - ~~~~~~ after ~~~~~~ - - @parent_block - first last end (after split) - |########------------| - - @child_block old patch.end - first last=end | - |#################|...... cut extra data. - ^ - continuation : None (extract complete) - - @fetched patch - |-------- patch --------| - """ - parent = Block(first=0, last=4000, end=33500, continuation='parent', during_split=True) - child = Block(first=0, last=0, end=33500, continuation='mean', during_split=True) - patch = Patch(chats=load_chatdata('pt0-5.json'), - first=32500, last=34000, continuation='patch') - - split(parent, child, patch) - - assert child.continuation is None - assert parent.last < child.first - assert parent.end == child.first - assert child.first < child.last - assert child.last < child.end - assert child.continuation is None - assert parent.during_split is False - assert child.during_split is False - - -def test_split_none(): - """patch.last <= parent_block.last - - While awaiting at run()->asyncdl._fetch() - fetching parent_block proceeds, - and parent.block.last exceeds patch.first. - - In this case, fetched patch is all discarded, - and worker searches other processing block again. - - ~~~~~~ before ~~~~~~ - - patch.first - first | last end - |####################|###################|-------| - ^ - @child_block - first = last = 0 end (=parent_end) - | | - - @fetched patch - |-- patch --| - patch.last < parent_block.last. - - | - | - V - - ~~~~~~ after ~~~~~~ - - @parent_block - first last end (before split) - |########################################|-------| - - @child_block - - ............ -> discard all data. - - """ - parent = Block(first=0, last=40000, end=60000, continuation='parent', during_split=True) - child = Block(first=0, last=0, end=60000, continuation='mean', during_split=True) - patch = Patch(chats=load_chatdata('pt0-5.json'), - first=32500, last=34000, continuation='patch') - - split(parent, child, patch) - - assert parent.last == 40000 # no change - assert parent.end == 60000 # no change - assert child.continuation is None - assert parent.during_split is False - assert child.during_split is True # exclude during_split sequence diff --git a/tests/test_videoinfo.py b/tests/test_videoinfo.py deleted file mode 100644 index 7189fef..0000000 --- a/tests/test_videoinfo.py +++ /dev/null @@ -1,101 +0,0 @@ -from json.decoder import JSONDecodeError -from pytchat.tool.videoinfo import VideoInfo -from pytchat.exceptions import InvalidVideoIdException - - -def _open_file(path): - with open(path, mode='r', encoding='utf-8') as f: - return f.read() - - -def _set_test_data(filepath, mocker): - _text = _open_file(filepath) - response_mock = mocker.Mock() - response_mock.status_code = 200 - response_mock.text = _text - mocker.patch('httpx.Client.get').return_value = response_mock - - -def test_archived_page(mocker): - _set_test_data('tests/testdata/videoinfo/archived_page.txt', mocker) - info = VideoInfo('__test_id__') - actual_thumbnail_url = 'https://i.ytimg.com/vi/fzI9FNjXQ0o/hqdefault.jpg' - assert info.video_id == '__test_id__' - assert info.get_channel_name() == 'GitHub' - assert info.get_thumbnail() == actual_thumbnail_url - assert info.get_title() == 'GitHub Arctic Code Vault' - assert info.get_channel_id() == 'UC7c3Kb6jYCRj4JOHHZTxKsQ' - assert info.get_duration() == 148 - - -def test_live_page(mocker): - _set_test_data('tests/testdata/videoinfo/live_page.txt', mocker) - info = VideoInfo('__test_id__') - '''live page: duration==0''' - assert info.get_duration() == 0 - assert info.video_id == '__test_id__' - assert info.get_channel_name() == 'BGM channel' - assert info.get_thumbnail() == \ - 'https://i.ytimg.com/vi/fEvM-OUbaKs/hqdefault_live.jpg' - assert info.get_title() == ( - 'Coffee Jazz Music - Chill Out Lounge Jazz Music Radio' - ' - 24/7 Live Stream - Slow Jazz') - assert info.get_channel_id() == 'UCQINXHZqCU5i06HzxRkujfg' - - -def test_invalid_video_id(mocker): - '''Test case invalid video_id is specified.''' - _set_test_data( - 'tests/testdata/videoinfo/invalid_video_id_page.txt', mocker) - try: - _ = VideoInfo('__test_id__') - assert False - except InvalidVideoIdException: - assert True - - -def test_no_info(mocker): - '''Test case the video page has renderer, but no info.''' - _set_test_data( - 'tests/testdata/videoinfo/no_info_page.txt', mocker) - info = VideoInfo('__test_id__') - assert info.video_id == '__test_id__' - assert info.get_channel_name() is None - assert info.get_thumbnail() is None - assert info.get_title() is None - assert info.get_channel_id() is None - assert info.get_duration() is None - - -def test_collapsed_data(mocker): - '''Test case the video page's info is collapsed.''' - _set_test_data( - 'tests/testdata/videoinfo/collapsed_page.txt', mocker) - try: - _ = VideoInfo('__test_id__') - assert False - except JSONDecodeError: - assert True - - -def test_pattern_unmatch(mocker): - '''Test case the pattern for extraction is unmatched.''' - _set_test_data( - 'tests/testdata/videoinfo/pattern_unmatch.txt', mocker) - try: - _ = VideoInfo('__test_id__') - assert False - except JSONDecodeError: - assert True - - -def test_extradata_handling(mocker): - '''Test case the extracted data are JSON lines.''' - _set_test_data( - 'tests/testdata/videoinfo/extradata_page.txt', mocker) - try: - _ = VideoInfo('__test_id__') - assert True - except JSONDecodeError as e: - print(e.doc) - assert False