Fix handling of internal errors and keyboard interrupts

This commit is contained in:
taizan-hokouto
2020-10-06 01:19:45 +09:00
parent ebf0e7c181
commit 115277e5e1
4 changed files with 109 additions and 77 deletions

View File

@@ -1,5 +1,6 @@
import argparse import argparse
import asyncio
from asyncio.exceptions import CancelledError
import os import os
import signal import signal
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
@@ -52,67 +53,102 @@ def main():
if not os.path.exists(Arguments().output): if not os.path.exists(Arguments().output):
print("\nThe specified directory does not exist.:{}\n".format(Arguments().output)) print("\nThe specified directory does not exist.:{}\n".format(Arguments().output))
return return
try:
Runner().run()
except CancelledError as e:
print(str(e))
for counter, video_id in enumerate(Arguments().video_ids):
if len(Arguments().video_ids) > 1:
print(f"\n{'-' * 10} video:{counter + 1} of {len(Arguments().video_ids)} {'-' * 10}")
try: class Runner:
video_id = extract_video_id(video_id)
separated_path = str(Path(Arguments().output)) + os.path.sep def run(self) -> None:
path = util.checkpath(separated_path + video_id + '.html') ex = None
pbar = None
for counter, video_id in enumerate(Arguments().video_ids):
if len(Arguments().video_ids) > 1:
print(f"\n{'-' * 10} video:{counter + 1} of {len(Arguments().video_ids)} {'-' * 10}")
try: try:
info = VideoInfo(video_id) video_id = extract_video_id(video_id)
except Exception as e: separated_path = str(Path(Arguments().output)) + os.path.sep
print("Cannot parse video information.:{} {}".format(video_id, type(e))) path = util.checkpath(separated_path + video_id + '.html')
try:
info = VideoInfo(video_id)
except Exception as e:
print("Cannot parse video information.:{} {}".format(video_id, type(e)))
if Arguments().save_error_data:
util.save(str(e), "ERR", ".dat")
continue
print(f"\n"
f" video_id: {video_id}\n"
f" channel: {info.get_channel_name()}\n"
f" title: {info.get_title()}\n"
f" output path: {path}")
duration = info.get_duration()
pbar = ProgressBar(total=(duration * 1000), status_txt="Extracting")
ex = Extractor(video_id,
callback=pbar.disp,
div=10)
signal.signal(signal.SIGINT, (lambda a, b: self.cancel(ex, pbar)))
data = ex.extract()
if data == []:
continue
pbar.reset("#", "=", total=len(data), status_txt="Rendering ")
processor = HTMLArchiver(path, callback=pbar.disp)
processor.process(
[{'video_id': None,
'timeout': 1,
'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}]
)
processor.finalize()
pbar.reset('#', '#', status_txt='Completed ')
pbar.close()
print()
if pbar.is_cancelled():
print("\nThe extraction process has been discontinued.\n")
except InvalidVideoIdException:
print("Invalid Video ID or URL:", video_id)
except NoContents as e:
print(f"Abort:{str(e)}:[{video_id}]")
except (JSONDecodeError, PatternUnmatchError) as e:
print("{}:{}".format(e.msg, video_id))
if Arguments().save_error_data: if Arguments().save_error_data:
util.save(str(e), "ERR", ".dat") util.save(e.doc, "ERR_", ".dat")
continue except (UnknownConnectionError, HCNetworkError, HCReadTimeout) as e:
print(f"An unknown network error occurred during the processing of [{video_id}]. : " + str(e))
except Exception as e:
print(f"Abort:{str(type(e))} {str(e)[:80]}")
finally:
clear_tasks()
print(f"\n" return
f" video_id: {video_id}\n"
f" channel: {info.get_channel_name()}\n"
f" title: {info.get_title()}\n"
f" output path: {path}")
duration = info.get_duration() def cancel(self, ex=None, pbar=None) -> None:
pbar = ProgressBar(total=(duration * 1000), status="Extracting") '''Called when keyboard interrupted has occurred.
ex = Extractor(video_id, '''
callback=pbar._disp, print("\nKeyboard interrupted.\n")
div=10) if ex and pbar:
signal.signal(signal.SIGINT, (lambda a, b: cancel(ex, pbar))) ex.cancel()
data = ex.extract() pbar.cancel()
if data == []:
return False
pbar.reset("#", "=", total=len(data), status="Rendering ")
processor = HTMLArchiver(path, callback=pbar._disp)
processor.process(
[{'video_id': None,
'timeout': 1,
'chatdata': (action["replayChatItemAction"]["actions"][0] for action in data)}]
)
processor.finalize()
pbar.reset('#', '#', status='Completed ')
pbar.close()
print()
if pbar.is_cancelled():
print("\nThe extraction process has been discontinued.\n")
except InvalidVideoIdException:
print("Invalid Video ID or URL:", video_id)
except NoContents as e:
print(e)
except (JSONDecodeError, PatternUnmatchError) as e:
print("{}:{}".format(e.msg, video_id))
if Arguments().save_error_data:
util.save(e.doc, "ERR_", ".dat")
except (UnknownConnectionError, HCNetworkError, HCReadTimeout) as e:
print(f"An unknown network error occurred during the processing of [{video_id}]. : " + str(e))
except Exception as e:
print(type(e), str(e))
return
def cancel(ex, pbar): def clear_tasks():
ex.cancel() '''
pbar.cancel() Clear remained tasks.
Called when internal exception has occurred or
after each extraction process is completed.
'''
async def _shutdown():
tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(_shutdown())
except Exception as e:
print(e)

View File

@@ -9,21 +9,20 @@ import sys
class ProgressBar: class ProgressBar:
def __init__(self, total, status): def __init__(self, total, status_txt):
self._bar_len = 60 self._bar_len = 60
self._cancelled = False self._cancelled = False
self.reset(total=total, status=status) self.reset(total=total, status_txt=status_txt)
self._blinker = 0
def reset(self, symbol_done="=", symbol_space=" ", total=100, status=''): def reset(self, symbol_done="=", symbol_space=" ", total=100, status_txt=''):
self.con_width = shutil.get_terminal_size(fallback=(80, 24)).columns self._console_width = shutil.get_terminal_size(fallback=(80, 24)).columns
self._symbol_done = symbol_done self._symbol_done = symbol_done
self._symbol_space = symbol_space self._symbol_space = symbol_space
self._total = total self._total = total
self._status = status self._status_txt = status_txt
self._count = 0 self._count = 0
def _disp(self, _, fetched): def disp(self, _, fetched):
self._progress(fetched, self._total) self._progress(fetched, self._total)
def _progress(self, fillin, total): def _progress(self, fillin, total):
@@ -39,11 +38,10 @@ class ProgressBar:
bar = self._symbol_done * filled_len + \ bar = self._symbol_done * filled_len + \
self._symbol_space * (self._bar_len - filled_len) self._symbol_space * (self._bar_len - filled_len)
disp = f" [{bar}] {percents:>5.1f}% ...{self._status} "[:self.con_width - 1] + '\r' disp = f" [{bar}] {percents:>5.1f}% ...{self._status_txt} "[:self._console_width - 1] + '\r'
sys.stdout.write(disp) sys.stdout.write(disp)
sys.stdout.flush() sys.stdout.flush()
self._blinker += 1
def close(self): def close(self):
if not self._cancelled: if not self._cancelled:

View File

@@ -1,5 +1,6 @@
import httpx
import asyncio import asyncio
import httpx
import socket
from . import parser from . import parser
from . block import Block from . block import Block
from . worker import ExtractWorker from . worker import ExtractWorker
@@ -8,7 +9,7 @@ from ... import config
from ... paramgen import arcparam from ... paramgen import arcparam
from ... exceptions import UnknownConnectionError from ... exceptions import UnknownConnectionError
from concurrent.futures import CancelledError from concurrent.futures import CancelledError
from httpx import NetworkError, ReadTimeout from httpx import NetworkError, TimeoutException, ConnectError
from json import JSONDecodeError from json import JSONDecodeError
from urllib.parse import quote from urllib.parse import quote
@@ -75,12 +76,12 @@ def ready_blocks(video_id, duration, div, callback):
next_continuation, actions = None, [] next_continuation, actions = None, []
break break
param_set.add(continuation) param_set.add(continuation)
resp = await session.get(url, headers=headers) resp = await session.get(url, headers=headers, timeout=10)
next_continuation, actions = parser.parse(resp.json()) next_continuation, actions = parser.parse(resp.json())
break break
except JSONDecodeError: except JSONDecodeError:
await asyncio.sleep(3) await asyncio.sleep(3)
except (NetworkError, ReadTimeout) as e: except (NetworkError, TimeoutException, ConnectError) as e:
err = e err = e
await asyncio.sleep(3) await asyncio.sleep(3)
else: else:
@@ -136,9 +137,12 @@ def fetch_patch(callback, blocks, video_id):
break break
except JSONDecodeError: except JSONDecodeError:
await asyncio.sleep(3) await asyncio.sleep(3)
except (NetworkError, ReadTimeout) as e: except (NetworkError, TimeoutException, ConnectError) as e:
err = e err = e
await asyncio.sleep(3) await asyncio.sleep(3)
except socket.error as error:
print("socket error", error.errno)
await asyncio.sleep(3)
else: else:
cancel() cancel()
raise UnknownConnectionError("Abort:" + str(err)) raise UnknownConnectionError("Abort:" + str(err))
@@ -162,15 +166,10 @@ def fetch_patch(callback, blocks, video_id):
async def _shutdown(): async def _shutdown():
print("\nshutdown...")
tasks = [t for t in asyncio.all_tasks() tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()] if t is not asyncio.current_task()]
for task in tasks: for task in tasks:
task.cancel() task.cancel()
try:
await task
except asyncio.CancelledError:
pass
def cancel(): def cancel():

View File

@@ -93,5 +93,4 @@ class Extractor:
return ret return ret
def cancel(self): def cancel(self):
print("cancel")
asyncdl.cancel() asyncdl.cancel()