From 115277e5e10f8187fab690def7e4ad0245b5b8e9 Mon Sep 17 00:00:00 2001 From: taizan-hokouto <55448286+taizan-hokuto@users.noreply.github.com> Date: Tue, 6 Oct 2020 01:19:45 +0900 Subject: [PATCH] Fix handling internal error and keyboard interrupt --- pytchat/cli/__init__.py | 150 ++++++++++++++++++------------ pytchat/cli/progressbar.py | 16 ++-- pytchat/tool/extract/asyncdl.py | 19 ++-- pytchat/tool/extract/extractor.py | 1 - 4 files changed, 109 insertions(+), 77 deletions(-) diff --git a/pytchat/cli/__init__.py b/pytchat/cli/__init__.py index b88f95a..b72867d 100644 --- a/pytchat/cli/__init__.py +++ b/pytchat/cli/__init__.py @@ -1,5 +1,6 @@ import argparse - +import asyncio +from asyncio.exceptions import CancelledError import os import signal from json.decoder import JSONDecodeError @@ -52,67 +53,102 @@ def main(): if not os.path.exists(Arguments().output): print("\nThe specified directory does not exist.:{}\n".format(Arguments().output)) return + try: + Runner().run() + except CancelledError as e: + print(str(e)) - for counter, video_id in enumerate(Arguments().video_ids): - if len(Arguments().video_ids) > 1: - print(f"\n{'-' * 10} video:{counter + 1} of {len(Arguments().video_ids)} {'-' * 10}") - try: - video_id = extract_video_id(video_id) - separated_path = str(Path(Arguments().output)) + os.path.sep - path = util.checkpath(separated_path + video_id + '.html') +class Runner: + + def run(self) -> None: + ex = None + pbar = None + for counter, video_id in enumerate(Arguments().video_ids): + if len(Arguments().video_ids) > 1: + print(f"\n{'-' * 10} video:{counter + 1} of {len(Arguments().video_ids)} {'-' * 10}") + try: - info = VideoInfo(video_id) - except Exception as e: - print("Cannot parse video information.:{} {}".format(video_id, type(e))) + video_id = extract_video_id(video_id) + separated_path = str(Path(Arguments().output)) + os.path.sep + path = util.checkpath(separated_path + video_id + '.html') + try: + info = VideoInfo(video_id) + except Exception as e: + print("Cannot parse video information.:{} {}".format(video_id, type(e))) + if Arguments().save_error_data: + util.save(str(e), "ERR", ".dat") + continue + + print(f"\n" + f" video_id: {video_id}\n" + f" channel: {info.get_channel_name()}\n" + f" title: {info.get_title()}\n" + f" output path: {path}") + + duration = info.get_duration() + pbar = ProgressBar(total=(duration * 1000), status_txt="Extracting") + ex = Extractor(video_id, + callback=pbar.disp, + div=10) + signal.signal(signal.SIGINT, (lambda a, b: self.cancel(ex, pbar))) + + data = ex.extract() + if data == []: + continue + pbar.reset("#", "=", total=len(data), status_txt="Rendering ") + processor = HTMLArchiver(path, callback=pbar.disp) + processor.process( + [{'video_id': None, + 'timeout': 1, + 'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}] + ) + processor.finalize() + pbar.reset('#', '#', status_txt='Completed ') + pbar.close() + print() + if pbar.is_cancelled(): + print("\nThe extraction process has been discontinued.\n") + except InvalidVideoIdException: + print("Invalid Video ID or URL:", video_id) + except NoContents as e: + print(f"Abort:{str(e)}:[{video_id}]") + except (JSONDecodeError, PatternUnmatchError) as e: + print("{}:{}".format(e.msg, video_id)) if Arguments().save_error_data: - util.save(str(e), "ERR", ".dat") - continue + util.save(e.doc, "ERR_", ".dat") + except (UnknownConnectionError, HCNetworkError, HCReadTimeout) as e: + print(f"An unknown network error occurred during the processing of [{video_id}]. : " + str(e)) + except Exception as e: + print(f"Abort:{str(type(e))} {str(e)[:80]}") + finally: + clear_tasks() - print(f"\n" - f" video_id: {video_id}\n" - f" channel: {info.get_channel_name()}\n" - f" title: {info.get_title()}\n" - f" output path: {path}") + return - duration = info.get_duration() - pbar = ProgressBar(total=(duration * 1000), status="Extracting") - ex = Extractor(video_id, - callback=pbar._disp, - div=10) - signal.signal(signal.SIGINT, (lambda a, b: cancel(ex, pbar))) - data = ex.extract() - if data == []: - return False - pbar.reset("#", "=", total=len(data), status="Rendering ") - processor = HTMLArchiver(path, callback=pbar._disp) - processor.process( - [{'video_id': None, - 'timeout': 1, - 'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}] - ) - processor.finalize() - pbar.reset('#', '#', status='Completed ') - pbar.close() - print() - if pbar.is_cancelled(): - print("\nThe extraction process has been discontinued.\n") - except InvalidVideoIdException: - print("Invalid Video ID or URL:", video_id) - except NoContents as e: - print(e) - except (JSONDecodeError, PatternUnmatchError) as e: - print("{}:{}".format(e.msg, video_id)) - if Arguments().save_error_data: - util.save(e.doc, "ERR_", ".dat") - except (UnknownConnectionError, HCNetworkError, HCReadTimeout) as e: - print(f"An unknown network error occurred during the processing of [{video_id}]. : " + str(e)) - except Exception as e: - print(type(e), str(e)) - - return + def cancel(self, ex=None, pbar=None) -> None: + '''Called when keyboard interrupted has occurred. + ''' + print("\nKeyboard interrupted.\n") + if ex and pbar: + ex.cancel() + pbar.cancel() -def cancel(ex, pbar): - ex.cancel() - pbar.cancel() +def clear_tasks(): + ''' + Clear remained tasks. + Called when internal exception has occurred or + after each extraction process is completed. + ''' + async def _shutdown(): + tasks = [t for t in asyncio.all_tasks() + if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + + try: + loop = asyncio.get_event_loop() + loop.run_until_complete(_shutdown()) + except Exception as e: + print(e) diff --git a/pytchat/cli/progressbar.py b/pytchat/cli/progressbar.py index 297119f..fe37591 100644 --- a/pytchat/cli/progressbar.py +++ b/pytchat/cli/progressbar.py @@ -9,21 +9,20 @@ import sys class ProgressBar: - def __init__(self, total, status): + def __init__(self, total, status_txt): self._bar_len = 60 self._cancelled = False - self.reset(total=total, status=status) - self._blinker = 0 + self.reset(total=total, status_txt=status_txt) - def reset(self, symbol_done="=", symbol_space=" ", total=100, status=''): - self.con_width = shutil.get_terminal_size(fallback=(80, 24)).columns + def reset(self, symbol_done="=", symbol_space=" ", total=100, status_txt=''): + self._console_width = shutil.get_terminal_size(fallback=(80, 24)).columns self._symbol_done = symbol_done self._symbol_space = symbol_space self._total = total - self._status = status + self._status_txt = status_txt self._count = 0 - def _disp(self, _, fetched): + def disp(self, _, fetched): self._progress(fetched, self._total) def _progress(self, fillin, total): @@ -39,11 +38,10 @@ class ProgressBar: bar = self._symbol_done * filled_len + \ self._symbol_space * (self._bar_len - filled_len) - disp = f" [{bar}] {percents:>5.1f}% ...{self._status} "[:self.con_width - 1] + '\r' + disp = f" [{bar}] {percents:>5.1f}% ...{self._status_txt} "[:self._console_width - 1] + '\r' sys.stdout.write(disp) sys.stdout.flush() - self._blinker += 1 def close(self): if not self._cancelled: diff --git a/pytchat/tool/extract/asyncdl.py b/pytchat/tool/extract/asyncdl.py index 7dfb1be..ab6fca3 100644 --- a/pytchat/tool/extract/asyncdl.py +++ b/pytchat/tool/extract/asyncdl.py @@ -1,5 +1,6 @@ -import httpx import asyncio +import httpx +import socket from . import parser from . block import Block from . worker import ExtractWorker @@ -8,7 +9,7 @@ from ... import config from ... paramgen import arcparam from ... exceptions import UnknownConnectionError from concurrent.futures import CancelledError -from httpx import NetworkError, ReadTimeout +from httpx import NetworkError, TimeoutException, ConnectError from json import JSONDecodeError from urllib.parse import quote @@ -75,12 +76,12 @@ def ready_blocks(video_id, duration, div, callback): next_continuation, actions = None, [] break param_set.add(continuation) - resp = await session.get(url, headers=headers) + resp = await session.get(url, headers=headers, timeout=10) next_continuation, actions = parser.parse(resp.json()) break except JSONDecodeError: await asyncio.sleep(3) - except (NetworkError, ReadTimeout) as e: + except (NetworkError, TimeoutException, ConnectError) as e: err = e await asyncio.sleep(3) else: @@ -136,9 +137,12 @@ def fetch_patch(callback, blocks, video_id): break except JSONDecodeError: await asyncio.sleep(3) - except (NetworkError, ReadTimeout) as e: + except (NetworkError, TimeoutException, ConnectError) as e: err = e await asyncio.sleep(3) + except socket.error as error: + print("socket error", error.errno) + await asyncio.sleep(3) else: cancel() raise UnknownConnectionError("Abort:" + str(err)) @@ -162,15 +166,10 @@ def fetch_patch(callback, blocks, video_id): async def _shutdown(): - print("\nshutdown...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() - try: - await task - except asyncio.CancelledError: - pass def cancel(): diff --git a/pytchat/tool/extract/extractor.py b/pytchat/tool/extract/extractor.py index bf32d59..4647f9d 100644 --- a/pytchat/tool/extract/extractor.py +++ b/pytchat/tool/extract/extractor.py @@ -93,5 +93,4 @@ class Extractor: return ret def cancel(self): - print("cancel") asyncdl.cancel()