Implement raise_for_status()

This commit is contained in:
taizan-hokuto
2020-06-17 23:56:07 +09:00
parent 2474207691
commit 94d4eebd0f
13 changed files with 326 additions and 286 deletions

View File

@@ -1,7 +1,7 @@
import argparse
from pathlib import Path
from .arguments import Arguments
from .. exceptions import InvalidVideoIdException, NoContentsException
from .. exceptions import InvalidVideoIdException, NoContents
from .. processors.html_archiver import HTMLArchiver
from .. tool.extract.extractor import Extractor
from .. tool.videoinfo import VideoInfo
@@ -50,7 +50,7 @@ def main():
callback=_disp_progress
).extract()
print("\nExtraction end.\n")
except (InvalidVideoIdException, NoContentsException) as e:
except (InvalidVideoIdException, NoContents) as e:
print(e)
return
parser.print_help()

View File

@@ -11,7 +11,7 @@ from asyncio import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException, IllegalFunctionCall
from .. import exceptions
from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
@@ -86,7 +86,7 @@ class LiveChatAsync:
topchat_only=False,
logger=config.logger(__name__),
):
self.video_id = video_id
self._video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
@@ -102,28 +102,26 @@ class LiveChatAsync:
self._parser = Parser(is_replay=self._is_replay)
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
self._topchat_only = topchat_only
self._logger = logger
self.exception = None
LiveChatAsync._logger = logger
if not LiveChatAsync._setup_finished:
LiveChatAsync._setup_finished = True
if exception_handler:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b: asyncio.create_task(
LiveChatAsync.shutdown(None, signal.SIGINT, b))
))
if exception_handler:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b: asyncio.create_task(
LiveChatAsync.shutdown(None, signal.SIGINT, b))))
self._setup()
def _setup(self):
# direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
raise exceptions.IllegalFunctionCall(
"When direct_mode=True, callback parameter is required.")
else:
# direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
@@ -138,18 +136,18 @@ class LiveChatAsync:
loop.create_task(self._callback_loop(self._callback))
# _listenループタスクの開始
loop = asyncio.get_event_loop()
listen_task = loop.create_task(self._startlisten())
self.listen_task = loop.create_task(self._startlisten())
# add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
self.listen_task.add_done_callback(self._finish)
else:
listen_task.add_done_callback(self._done_callback)
self.listen_task.add_done_callback(self._done_callback)
async def _startlisten(self):
"""Fetch first continuation parameter,
create and start _listen loop.
"""
initial_continuation = liveparam.getparam(self.video_id, 3)
initial_continuation = liveparam.getparam(self._video_id, 3)
await self._listen(initial_continuation)
async def _listen(self, continuation):
@@ -171,7 +169,7 @@ class LiveChatAsync:
timeout = metadata['timeoutMs'] / 1000
chat_component = {
"video_id": self.video_id,
"video_id": self._video_id,
"timeout": timeout,
"chatdata": chatdata
}
@@ -188,14 +186,15 @@ class LiveChatAsync:
diff_time = timeout - (time.time() - time_mark)
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
self._logger.debug(f"[{self.video_id}]{str(e)}")
return
except exceptions.ChatParseException as e:
self._logger.debug(f"[{self._video_id}]{str(e)}")
raise
except (TypeError, json.JSONDecodeError):
self._logger.error(f"{traceback.format_exc(limit = -1)}")
return
raise
self._logger.debug(f"[{self.video_id}]finished fetching chat.")
self._logger.debug(f"[{self._video_id}]finished fetching chat.")
raise exceptions.ChatDataFinished
async def _check_pause(self, continuation):
if self._pauser.empty():
@@ -207,7 +206,7 @@ class LiveChatAsync:
self._pauser.put_nowait(None)
if not self._is_replay:
continuation = liveparam.getparam(
self.video_id, 3, self._topchat_only)
self._video_id, 3, self._topchat_only)
return continuation
async def _get_contents(self, continuation, session, headers):
@@ -227,7 +226,7 @@ class LiveChatAsync:
self._parser.is_replay = True
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam(
self.video_id, self.seektime, self._topchat_only)
self._video_id, self.seektime, self._topchat_only)
livechat_json = (await self._get_livechat_json(
continuation, session, headers))
reload_continuation = self._parser.reload_continuation(
@@ -258,7 +257,7 @@ class LiveChatAsync:
await asyncio.sleep(1)
continue
else:
self._logger.error(f"[{self.video_id}]"
self._logger.error(f"[{self._video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
@@ -288,9 +287,12 @@ class LiveChatAsync:
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = await self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
if self.is_alive():
items = await self._buffer.get()
return self.processor.process(items)
else:
return []
raise exceptions.IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def is_replay(self):
@@ -311,22 +313,36 @@ class LiveChatAsync:
def is_alive(self):
    """Return the current value of the ``_is_alive`` flag.

    ``terminate()`` sets this flag to ``False``; ``get()`` consults this
    method before reading from the buffer.
    """
    return self._is_alive
def finish(self, sender):
def _finish(self, sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
self._task_finished()
except CancelledError:
self._logger.debug(f'[{self.video_id}]cancelled:{sender}')
self._logger.debug(f'[{self._video_id}]cancelled:{sender}')
def terminate(self):
    """Stop the listener and mark this instance as no longer alive.

    Steps, in order:
      1. If the pause queue is empty, put a ``None`` token back into it
         (presumably so a listener waiting in ``_check_pause`` can
         proceed -- TODO confirm against ``_check_pause``).
      2. Clear the ``_is_alive`` flag.
      3. Push an empty dict into the buffer as a dummy item.
    """
    if self._pauser.empty():
        self._pauser.put_nowait(None)
    self._is_alive = False
    # NOTE(review): the buffer is written unconditionally; if ``_buffer`` can
    # be unset in direct mode this would fail -- verify against ``_setup``.
    self._buffer.put_nowait({})
def _task_finished(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode is False:
# bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata': '', 'timeout': 0})
self._logger.info(f'[{self.video_id}]finished.')
if self.is_alive():
self.terminate()
try:
self.listen_task.result()
except Exception as e:
self.exception = e
if not isinstance(e, exceptions.ChatParseException):
self._logger.error(f'Internal exception - {type(e)}{str(e)}')
self._logger.info(f'[{self._video_id}]終了しました')
def raise_for_status(self):
    """Re-raise the exception recorded for the listen task, if any.

    ``self.exception`` starts as ``None`` and is assigned in
    ``_task_finished`` when ``listen_task.result()`` raises.

    Raises:
        Exception: the stored exception object itself, unchanged.
    """
    if self.exception is not None:
        raise self.exception
@classmethod
def _set_exception_handler(cls, handler):

View File

@@ -6,10 +6,11 @@ import traceback
import urllib.parse
from concurrent.futures import CancelledError, ThreadPoolExecutor
from queue import Queue
from threading import Event
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException, IllegalFunctionCall
from .. import exceptions
from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
@@ -83,7 +84,7 @@ class LiveChat:
topchat_only=False,
logger=config.logger(__name__)
):
self.video_id = video_id
self._video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
@@ -102,7 +103,9 @@ class LiveChat:
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
self._topchat_only = topchat_only
self._event = Event()
self._logger = logger
self.exception = None
if interruptable:
signal.signal(signal.SIGINT, lambda a, b: self.terminate())
self._setup()
@@ -111,7 +114,7 @@ class LiveChat:
# direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
raise exceptions.IllegalFunctionCall(
"When direct_mode=True, callback parameter is required.")
else:
# direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
@@ -124,19 +127,19 @@ class LiveChat:
# callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop, self._callback)
# _listenループタスクの開始
listen_task = self._executor.submit(self._startlisten)
self.listen_task = self._executor.submit(self._startlisten)
# add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
self.listen_task.add_done_callback(self._finish)
else:
listen_task.add_done_callback(self._done_callback)
self.listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
time.sleep(0.1) # sleep shortly to prohibit skipping fetching data
"""Fetch first continuation parameter,
create and start _listen loop.
"""
initial_continuation = liveparam.getparam(self.video_id, 3)
initial_continuation = liveparam.getparam(self._video_id, 3)
self._listen(initial_continuation)
def _listen(self, continuation):
@@ -152,13 +155,11 @@ class LiveChat:
with requests.Session() as session:
while(continuation and self._is_alive):
continuation = self._check_pause(continuation)
contents = self._get_contents(
continuation, session, headers)
contents = self._get_contents(continuation, session, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs'] / 1000
chat_component = {
"video_id": self.video_id,
"video_id": self._video_id,
"timeout": timeout,
"chatdata": chatdata
}
@@ -173,16 +174,17 @@ class LiveChat:
else:
self._buffer.put(chat_component)
diff_time = timeout - (time.time() - time_mark)
time.sleep(diff_time if diff_time > 0 else 0)
self._event.wait(diff_time if diff_time > 0 else 0)
continuation = metadata.get('continuation')
except ChatParseException as e:
self._logger.debug(f"[{self.video_id}]{str(e)}")
return
except exceptions.ChatParseException as e:
self._logger.debug(f"[{self._video_id}]{str(e)}")
raise
except (TypeError, json.JSONDecodeError):
self._logger.error(f"{traceback.format_exc(limit=-1)}")
return
raise
self._logger.debug(f"[{self.video_id}]finished fetching chat.")
self._logger.debug(f"[{self._video_id}]finished fetching chat.")
raise exceptions.ChatDataFinished
def _check_pause(self, continuation):
if self._pauser.empty():
@@ -193,7 +195,7 @@ class LiveChat:
'''
self._pauser.put_nowait(None)
if not self._is_replay:
continuation = liveparam.getparam(self.video_id, 3)
continuation = liveparam.getparam(self._video_id, 3)
return continuation
def _get_contents(self, continuation, session, headers):
@@ -215,9 +217,8 @@ class LiveChat:
self._parser.is_replay = True
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam(
self.video_id, self.seektime, self._topchat_only)
livechat_json = (self._get_livechat_json(
continuation, session, headers))
self._video_id, self.seektime, self._topchat_only)
livechat_json = (self._get_livechat_json(continuation, session, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
if reload_continuation:
@@ -246,9 +247,9 @@ class LiveChat:
time.sleep(1)
continue
else:
self._logger.error(f"[{self.video_id}]"
self._logger.error(f"[{self._video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
raise exceptions.RetryExceedMaxCount()
return livechat_json
def _callback_loop(self, callback):
@@ -276,9 +277,12 @@ class LiveChat:
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
if self.is_alive():
items = self._buffer.get()
return self.processor.process(items)
else:
return []
raise exceptions.IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def is_replay(self):
@@ -299,18 +303,34 @@ class LiveChat:
def is_alive(self):
    """Return the current value of the ``_is_alive`` flag.

    ``terminate()`` sets this flag to ``False``; the ``_listen`` loop and
    ``get()`` both check it before continuing.
    """
    return self._is_alive
def finish(self, sender):
def _finish(self, sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
self._task_finished()
except CancelledError:
self._logger.debug(f'[{self.video_id}]cancelled:{sender}')
self._logger.debug(f'[{self._video_id}]cancelled:{sender}')
def terminate(self):
    """Stop the listener and mark this instance as no longer alive.

    Steps, in order:
      1. If the pause queue is empty, put a ``None`` token back into it
         (presumably so a listener waiting in ``_check_pause`` can
         proceed -- TODO confirm against ``_check_pause``).
      2. Clear the ``_is_alive`` flag, which ends the ``_listen`` loop.
      3. Push an empty dict into the buffer as a dummy item.
      4. Set ``_event`` so the ``self._event.wait(...)`` used as the
         inter-fetch delay in ``_listen`` returns immediately.
    """
    if self._pauser.empty():
        self._pauser.put_nowait(None)
    self._is_alive = False
    # NOTE(review): the buffer is written unconditionally; if ``_buffer`` can
    # be unset in direct mode this would fail -- verify against ``_setup``.
    self._buffer.put({})
    self._event.set()
def _task_finished(self):
'''
Listenerを終了する。
'''
if self.is_alive():
self._is_alive = False
self._buffer.put({})
self._logger.info(f'[{self.video_id}]終了しました')
self.terminate()
try:
self.listen_task.result()
except Exception as e:
self.exception = e
if not isinstance(e, exceptions.ChatParseException):
self._logger.error(f'Internal exception - {type(e)}{str(e)}')
self._logger.info(f'[{self._video_id}]終了しました')
def raise_for_status(self):
    """Re-raise the exception recorded for the listen task, if any.

    ``self.exception`` starts as ``None`` and is assigned in
    ``_task_finished`` when ``listen_task.result()`` raises.

    Raises:
        Exception: the stored exception object itself, unchanged.
    """
    if self.exception is not None:
        raise self.exception

View File

@@ -5,13 +5,6 @@ class ChatParseException(Exception):
pass
class NoYtinitialdataException(ChatParseException):
'''
Thrown when the video is not found.
'''
pass
class ResponseContextError(ChatParseException):
'''
Thrown when chat data is invalid.
@@ -19,21 +12,14 @@ class ResponseContextError(ChatParseException):
pass
class NoLivechatRendererException(ChatParseException):
'''
Thrown when livechatRenderer is missing in JSON.
'''
pass
class NoContentsException(ChatParseException):
class NoContents(ChatParseException):
'''
Thrown when ContinuationContents is missing in JSON.
'''
pass
class NoContinuationsException(ChatParseException):
class NoContinuation(ChatParseException):
'''
Thrown when continuation is missing in ContinuationContents.
'''
@@ -42,8 +28,8 @@ class NoContinuationsException(ChatParseException):
class IllegalFunctionCall(Exception):
'''
Thrown when get () is called even though
set_callback () has been executed.
Thrown when get() is called even though
set_callback() has been executed.
'''
pass
@@ -57,3 +43,22 @@ class InvalidVideoIdException(Exception):
class UnknownConnectionError(Exception):
    '''
    Connection-related error of unknown cause.

    NOTE(review): no raise site is visible in this diff; the meaning is
    taken from the class name only -- confirm against callers.
    '''
    pass
class RetryExceedMaxCount(Exception):
    '''
    Thrown when the number of retries exceeds the maximum value.

    Derives directly from Exception, not from ChatParseException.
    '''
    pass
class ChatDataFinished(ChatParseException):
    '''
    Thrown when there is no more chat data to fetch.

    Raised by the live parser with the message 'Finished chat data', and
    by the listen loops once they stop fetching.
    '''
    pass
class ReceivedUnknownContinuation(ChatParseException):
    '''
    Thrown when the continuation entry holds none of the continuation
    types the live parser recognises; the message names the unknown type.
    '''
    pass
class FailedExtractContinuation(ChatDataFinished):
    '''
    Thrown when continuation data cannot be extracted from the response.

    Subclass of ChatDataFinished, so handlers for that type catch it too.
    '''
    pass

View File

@@ -4,11 +4,7 @@ pytchat.parser.live
Parser of live chat JSON.
"""
from .. exceptions import (
ResponseContextError,
NoContentsException,
NoContinuationsException,
ChatParseException)
from .. import exceptions
class Parser:
@@ -20,9 +16,9 @@ class Parser:
def get_contents(self, jsn):
if jsn is None:
raise ChatParseException('Called with none JSON object.')
raise exceptions.IllegalFunctionCall('Called with none JSON object.')
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError(
raise exceptions.ResponseContextError(
'The video_id would be wrong, or video is deleted or private.')
contents = jsn['response'].get('continuationContents')
return contents
@@ -46,11 +42,11 @@ class Parser:
if contents is None:
'''Broadcasting end or cannot fetch chat stream'''
raise NoContentsException('Chat data stream is empty.')
raise exceptions.NoContents('Chat data stream is empty.')
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('No Continuation')
raise exceptions.NoContinuation('No Continuation')
metadata = (cont.get('invalidationContinuationData')
or cont.get('timedContinuationData')
or cont.get('reloadContinuationData')
@@ -58,22 +54,25 @@ class Parser:
)
if metadata is None:
if cont.get("playerSeekContinuationData"):
raise ChatParseException('Finished chat data')
raise exceptions.ChatDataFinished('Finished chat data')
unknown = list(cont.keys())[0]
if unknown:
raise ChatParseException(
raise exceptions.ReceivedUnknownContinuation(
f"Received unknown continuation type:{unknown}")
else:
raise ChatParseException('Cannot extract continuation data')
raise exceptions.FailedExtractContinuation('Cannot extract continuation data')
return self._create_data(metadata, contents)
def reload_continuation(self, contents):
"""
When `seektime = 0` or seektime is abbreviated ,
When `seektime == 0` or seektime is abbreviated ,
check if fetched chat json has no chat data.
If so, try to fetch playerSeekContinuationData.
This function must be run only first fetching.
"""
if contents is None:
'''Broadcasting end or cannot fetch chat stream'''
raise exceptions.NoContents('Chat data stream is empty.')
cont = contents['liveChatContinuation']['continuations'][0]
if cont.get("liveChatReplayContinuationData"):
# chat data exist.
@@ -82,7 +81,7 @@ class Parser:
init_cont = cont.get("playerSeekContinuationData")
if init_cont:
return init_cont.get("continuation")
raise ChatParseException('Finished chat data')
raise exceptions.ChatDataFinished('Finished chat data')
def _create_data(self, metadata, contents):
actions = contents['liveChatContinuation'].get('actions')

View File

@@ -1,8 +1,5 @@
from ... import config
from ... exceptions import (
ResponseContextError,
NoContentsException,
NoContinuationsException)
from ... import exceptions
logger = config.logger(__name__)
@@ -23,15 +20,15 @@ def parse(jsn):
if jsn is None:
raise ValueError("parameter JSON is None")
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError(
raise exceptions.ResponseContextError(
'video_id is invalid or private/deleted.')
contents = jsn['response'].get('continuationContents')
if contents is None:
raise NoContentsException('No chat data.')
raise exceptions.NoContents('No chat data.')
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('No Continuation')
raise exceptions.NoContinuation('No Continuation')
metadata = cont.get('liveChatReplayContinuationData')
if metadata:
continuation = metadata.get("continuation")

View File

@@ -1,12 +1,12 @@
import json
import re
from ... import config
from ... exceptions import (
ResponseContextError,
NoContentsException,
NoContinuationsException )
from ... exceptions import (
ResponseContextError,
NoContents, NoContinuation)
logger = config.logger(__name__)
def parse(jsn):
"""
Parse replay chat data.
@@ -20,45 +20,51 @@ def parse(jsn):
actions : list
"""
if jsn is None:
if jsn is None:
raise ValueError("parameter JSON is None")
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError(
'video_id is invalid or private/deleted.')
contents=jsn["response"].get('continuationContents')
'video_id is invalid or private/deleted.')
contents = jsn["response"].get('continuationContents')
if contents is None:
raise NoContentsException('No chat data.')
raise NoContents('No chat data.')
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('No Continuation')
raise NoContinuation('No Continuation')
metadata = cont.get('liveChatReplayContinuationData')
if metadata:
continuation = metadata.get("continuation")
actions = contents['liveChatContinuation'].get('actions')
if continuation:
return continuation, [action["replayChatItemAction"]["actions"][0]
for action in actions
if list(action['replayChatItemAction']["actions"][0].values()
)[0]['item'].get("liveChatPaidMessageRenderer")
or list(action['replayChatItemAction']["actions"][0].values()
)[0]['item'].get("liveChatPaidStickerRenderer")
]
return continuation, [action["replayChatItemAction"]["actions"][0]
for action in actions
if list(action['replayChatItemAction']["actions"][0].values()
)[0]['item'].get("liveChatPaidMessageRenderer")
or list(action['replayChatItemAction']["actions"][0].values()
)[0]['item'].get("liveChatPaidStickerRenderer")
]
return None, []
def get_offset(item):
    """Return the item's ``videoOffsetTimeMsec`` value converted to ``int``.

    Args:
        item: mapping with a ``'replayChatItemAction'`` entry that contains
            ``"videoOffsetTimeMsec"`` (milliseconds, going by the key name).

    Raises:
        KeyError: if either key is missing.
        ValueError: if the value cannot be converted by ``int()``.
    """
    return int(item['replayChatItemAction']["videoOffsetTimeMsec"])
def get_id(item):
    """Return the ``'id'`` of the renderer inside a replay chat item.

    Path followed: the first action of ``item['replayChatItemAction']``,
    then that action's first value, then the first value of its ``'item'``
    dict (the renderer), then ``renderer.get('id')``.

    Returns:
        The renderer's id, or ``None`` when the renderer has no ``'id'``.

    Raises:
        KeyError / IndexError: if the expected nesting is absent.
    """
    first_action = item['replayChatItemAction']["actions"][0]
    # Each level is a single-entry dict keyed by a type name, so the first
    # value is taken without knowing the key.
    action_body = list(first_action.values())[0]
    renderer = list(action_body['item'].values())[0]
    return renderer.get('id')
def get_type(item):
    """Return the renderer type name of a replay chat item.

    Path followed: the first action of ``item['replayChatItemAction']``,
    then that action's first value, then the first *key* of its ``'item'``
    dict (for example ``"liveChatPaidMessageRenderer"``).

    Raises:
        KeyError / IndexError: if the expected nesting is absent.
    """
    first_action = item['replayChatItemAction']["actions"][0]
    action_body = list(first_action.values())[0]
    return list(action_body['item'].keys())[0]
import re
_REGEX_YTINIT = re.compile("window\\[\"ytInitialData\"\\]\\s*=\\s*({.+?});\\s+")
)[0])['item'].keys())[0]
_REGEX_YTINIT = re.compile(
"window\\[\"ytInitialData\"\\]\\s*=\\s*({.+?});\\s+")
def extract(text):
match = re.findall(_REGEX_YTINIT, str(text))