diff --git a/pytchat/config/__init__.py b/pytchat/config/__init__.py index 1a84b59..542551e 100644 --- a/pytchat/config/__init__.py +++ b/pytchat/config/__init__.py @@ -1,7 +1,7 @@ import logging from . import mylogger -LOGGER_MODE = None +LOGGER_MODE = logging.DEBUG headers = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'} diff --git a/pytchat/core_async/livechat copy.py b/pytchat/core_async/livechat copy.py new file mode 100644 index 0000000..82b3bdc --- /dev/null +++ b/pytchat/core_async/livechat copy.py @@ -0,0 +1,297 @@ +import aiohttp, asyncio +import datetime +import json +import random +import signal +import time +import traceback +import urllib.parse +from aiohttp.client_exceptions import ClientConnectorError +from concurrent.futures import CancelledError +from asyncio import Queue +from .buffer import Buffer +from ..parser.live import Parser +from .. import config +from ..exceptions import ChatParseException,IllegalFunctionCall +from ..paramgen import liveparam +from ..processors.default.processor import DefaultProcessor +from ..processors.combinator import Combinator + +logger = config.logger(__name__) +headers = config.headers +MAX_RETRY = 10 + + +class LiveChatAsync: + '''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。 + + Parameter + --------- + video_id : str + 動画ID + + processor : ChatProcessor + チャットデータを加工するオブジェクト + + buffer : Buffer(maxsize:20[default]) + チャットデータchat_componentを格納するバッファ。 + maxsize : 格納できるchat_componentの個数 + default値20個。1個で約5~10秒分。 + + interruptable : bool + Ctrl+Cによる処理中断を行うかどうか。 + + callback : func + _listen()関数から一定間隔で自動的に呼びだす関数。 + + done_callback : func + listener終了時に呼び出すコールバック。 + + exception_handler : func + 例外を処理する関数 + + direct_mode : bool + Trueの場合、bufferを使わずにcallbackを呼ぶ。 + Trueの場合、callbackの設定が必須 + (設定していない場合IllegalFunctionCall例外を発生させる) + + Attributes + --------- + _is_alive : bool + チャット取得を停止するためのフラグ + ''' + + _setup_finished = False + + def __init__(self, video_id, + seektime = 0, + processor = DefaultProcessor(), + buffer = None, + interruptable = True, + callback = None, + done_callback = None, + exception_handler = None, + direct_mode = False): + self.video_id = video_id + self.seektime = seektime + if isinstance(processor, tuple): + self.processor = Combinator(processor) + else: + self.processor = processor + self._buffer = buffer + self._callback = callback + self._done_callback = done_callback + self._exception_handler = exception_handler + self._direct_mode = direct_mode + self._is_alive = True + self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) + self._setup() + + if not LiveChatAsync._setup_finished: + LiveChatAsync._setup_finished = True + if exception_handler == None: + self._set_exception_handler(self._handle_exception) + else: + self._set_exception_handler(exception_handler) + if interruptable: + signal.signal(signal.SIGINT, + (lambda a, b:asyncio.create_task( + LiveChatAsync.shutdown(None,signal.SIGINT,b)) + )) + + def _setup(self): + #direct modeがTrueでcallback未設定の場合例外発生。 + if self._direct_mode: + if self._callback is None: + raise IllegalFunctionCall( + "direct_mode=Trueの場合callbackの設定が必須です。") + else: + #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 + if self._buffer is None: + self._buffer = Buffer(maxsize = 20) + #callbackが指定されている場合はcallbackを呼ぶループタスクを作成 + if self._callback is None: + pass + else: + #callbackを呼ぶループタスクの開始 + loop = asyncio.get_event_loop() + loop.create_task(self._callback_loop(self._callback)) + #_listenループタスクの開始 + loop = asyncio.get_event_loop() + listen_task = loop.create_task(self._startlisten()) + #add_done_callbackの登録 + if self._done_callback is None: + listen_task.add_done_callback(self.finish) + else: + listen_task.add_done_callback(self._done_callback) + + async def _startlisten(self): + """最初のcontinuationパラメータを取得し、 + _listenループのタスクを作成し開始する + """ + initial_continuation = liveparam.getparam(self.video_id,3) + await self._listen(initial_continuation) + + async def _listen(self, continuation): + ''' continuationに紐付いたチャットデータを取得し + Bufferにチャットデータを格納、 + 次のcontinuaitonを取得してループする。 + + Parameter + --------- + continuation : str + 次のチャットデータ取得に必要なパラメータ + ''' + try: + async with aiohttp.ClientSession() as session: + while(continuation and self._is_alive): + if self._pauser.empty(): + '''pause''' + await self._pauser.get() + '''resume: + prohibit from blocking by putting None into _pauser. + ''' + self._pauser.put_nowait(None) + continuation= liveparam.getparam(self.video_id,3) + livechat_json = (await + self._get_livechat_json(continuation, session, headers) + ) + metadata, chatdata = self._parser.parse( livechat_json ) + timeout = metadata['timeoutMs']/1000 + chat_component = { + "video_id" : self.video_id, + "timeout" : timeout, + "chatdata" : chatdata + } + time_mark =time.time() + if self._direct_mode: + await self._callback( + self.processor.process([chat_component]) + ) + else: + await self._buffer.put(chat_component) + diff_time = timeout - (time.time()-time_mark) + await asyncio.sleep(diff_time) + continuation = metadata.get('continuation') + except ChatParseException as e: + self.terminate() + logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") + return + except (TypeError , json.JSONDecodeError) : + self.terminate() + logger.error(f"{traceback.format_exc(limit = -1)}") + return + + logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + self.terminate() + + async def _get_livechat_json(self, continuation, session, headers): + ''' + チャットデータが格納されたjsonデータを取得する。 + ''' + continuation = urllib.parse.quote(continuation) + livechat_json = None + status_code = 0 + url =( + f"https://www.youtube.com/live_chat/get_live_chat?" + f"continuation={continuation}&pbj=1") + for _ in range(MAX_RETRY + 1): + async with session.get(url ,headers = headers) as resp: + try: + text = await resp.text() + status_code = resp.status + livechat_json = json.loads(text) + break + except (ClientConnectorError,json.JSONDecodeError) : + await asyncio.sleep(1) + continue + else: + logger.error(f"[{self.video_id}]" + f"Exceeded retry count. status_code={status_code}") + return None + return livechat_json + + async def _callback_loop(self,callback): + """ コンストラクタでcallbackを指定している場合、バックグラウンドで + callbackに指定された関数に一定間隔でチャットデータを投げる。 + + Parameter + --------- + callback : func + 加工済みのチャットデータを渡す先の関数。 + """ + while self.is_alive(): + items = await self._buffer.get() + data = self.processor.process(items) + await callback(data) + + async def get(self): + """ bufferからデータを取り出し、processorに投げ、 + 加工済みのチャットデータを返す。 + + Returns + : Processorによって加工されたチャットデータ + """ + if self._callback is None: + items = await self._buffer.get() + return self.processor.process(items) + raise IllegalFunctionCall( + "既にcallbackを登録済みのため、get()は実行できません。") + + def pause(self): + if self._callback is None: + return + if not self._pauser.empty(): + self._pauser.get_nowait() + + def resume(self): + if self._callback is None: + return + if self._pauser.empty(): + self._pauser.put_nowait(None) + + def is_alive(self): + return self._is_alive + + def finish(self,sender): + '''Listener終了時のコールバック''' + try: + self.terminate() + except CancelledError: + logger.debug(f'[{self.video_id}]cancelled:{sender}') + + def terminate(self): + ''' + Listenerを終了する。 + ''' + self._is_alive = False + if self._direct_mode == False: + #bufferにダミーオブジェクトを入れてis_alive()を判定させる + self._buffer.put_nowait({'chatdata':'','timeout':1}) + logger.info(f'[{self.video_id}]終了しました') + + @classmethod + def _set_exception_handler(cls, handler): + loop = asyncio.get_event_loop() + loop.set_exception_handler(handler) + + @classmethod + def _handle_exception(cls, loop, context): + if not isinstance(context["exception"],CancelledError): + logger.error(f"Caught exception: {context}") + loop= asyncio.get_event_loop() + loop.create_task(cls.shutdown(None,None,None)) + + @classmethod + async def shutdown(cls, event, sig = None, handler=None): + logger.debug("シャットダウンしています") + tasks = [t for t in asyncio.all_tasks() if t is not + asyncio.current_task()] + [task.cancel() for task in tasks] + + logger.debug(f"残っているタスクを終了しています") + await asyncio.gather(*tasks,return_exceptions=True) + loop = asyncio.get_event_loop() + loop.stop() \ No newline at end of file diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index c671d1a..a78dc87 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -8,11 +8,12 @@ import traceback import urllib.parse from aiohttp.client_exceptions import ClientConnectorError from concurrent.futures import CancelledError +from asyncio import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config from ..exceptions import ChatParseException,IllegalFunctionCall -from ..paramgen import liveparam +from ..paramgen import liveparam, arcparam from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator @@ -63,6 +64,7 @@ class LiveChatAsync: _setup_finished = False def __init__(self, video_id, + seektime = 0, processor = DefaultProcessor(), buffer = None, interruptable = True, @@ -71,6 +73,7 @@ class LiveChatAsync: exception_handler = None, direct_mode = False): self.video_id = video_id + self.seektime = seektime if isinstance(processor, tuple): self.processor = Combinator(processor) else: @@ -82,8 +85,11 @@ class LiveChatAsync: self._direct_mode = direct_mode self._is_alive = True self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) self._setup() - + self._first_fetch = True + self._fetch_url = "live_chat/get_live_chat?continuation=" if not LiveChatAsync._setup_finished: LiveChatAsync._setup_finished = True if exception_handler == None: @@ -101,7 +107,7 @@ class LiveChatAsync: if self._direct_mode: if self._callback is None: raise IllegalFunctionCall( - "direct_mode=Trueの場合callbackの設定が必須です。") + "When direct_mode=True, callback parameter is required.") else: #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 if self._buffer is None: @@ -123,48 +129,29 @@ class LiveChatAsync: listen_task.add_done_callback(self._done_callback) async def _startlisten(self): - """最初のcontinuationパラメータを取得し、 - _listenループのタスクを作成し開始する + """Fetch first continuation parameter, + create and start _listen loop. """ - initial_continuation = await self._get_initial_continuation() - if initial_continuation is None: - self.terminate() - logger.debug(f"[{self.video_id}]No initial continuation.") - return + initial_continuation = liveparam.getparam(self.video_id,3) await self._listen(initial_continuation) - async def _get_initial_continuation(self): - ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' - try: - initial_continuation = liveparam.getparam(self.video_id) - except ChatParseException as e: - self.terminate() - logger.debug(f"[{self.video_id}]Error:{str(e)}") - return - except KeyError: - logger.debug(f"[{self.video_id}]KeyError:" - f"{traceback.format_exc(limit = -1)}") - self.terminate() - return - return initial_continuation - async def _listen(self, continuation): - ''' continuationに紐付いたチャットデータを取得し - Bufferにチャットデータを格納、 - 次のcontinuaitonを取得してループする。 + ''' Fetch chat data and store them into buffer, + get next continuaiton parameter and loop. Parameter --------- continuation : str - 次のチャットデータ取得に必要なパラメータ + parameter for next chat data ''' try: async with aiohttp.ClientSession() as session: while(continuation and self._is_alive): - livechat_json = (await - self._get_livechat_json(continuation, session, headers) - ) - metadata, chatdata = self._parser.parse( livechat_json ) + continuation = await self._check_pause(continuation) + contents = await self._get_contents( + continuation, session, headers) + metadata, chatdata = self._parser.parse(contents) + timeout = metadata['timeoutMs']/1000 chat_component = { "video_id" : self.video_id, @@ -182,31 +169,67 @@ class LiveChatAsync: await asyncio.sleep(diff_time) continuation = metadata.get('continuation') except ChatParseException as e: - self.terminate() - logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") + #self.terminate() + logger.debug(f"[{self.video_id}]{str(e)}") return except (TypeError , json.JSONDecodeError) : - self.terminate() + #self.terminate() logger.error(f"{traceback.format_exc(limit = -1)}") return - logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + logger.debug(f"[{self.video_id}]finished fetching chat.") + + async def _check_pause(self, continuation): + if self._pauser.empty(): + '''pause''' + await self._pauser.get() + '''resume: + prohibit from blocking by putting None into _pauser. + ''' + self._pauser.put_nowait(None) + if self._parser.mode == 'LIVE': + continuation = liveparam.getparam(self.video_id,3) + return continuation + + async def _get_contents(self, continuation, session, headers): + '''Get 'contents' dict from livechat json. + If contents is None at first fetching, + try to fetch archive chat data. + + Return: + ------- + 'contents' dict which includes metadata & chatdata. + ''' + livechat_json = (await + self._get_livechat_json(continuation, session, headers) + ) + contents = self._parser.get_contents(livechat_json) + if self._first_fetch: + if contents is None: + '''Try to fetch archive chat data.''' + self._parser.mode = 'REPLAY' + self._fetch_url = ("live_chat_replay/" + "get_live_chat_replay?continuation=") + continuation = arcparam.getparam(self.video_id, self.seektime) + livechat_json = (await self._get_livechat_json( + continuation, session, headers)) + contents = self._parser.get_contents(livechat_json) + self._first_fetch = False + return contents async def _get_livechat_json(self, continuation, session, headers): ''' - チャットデータが格納されたjsonデータを取得する。 + Get json which includes chat data. ''' continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 url =( - f"https://www.youtube.com/live_chat/get_live_chat?" - f"continuation={continuation}&pbj=1") + f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1") for _ in range(MAX_RETRY + 1): async with session.get(url ,headers = headers) as resp: try: text = await resp.text() - status_code = resp.status livechat_json = json.loads(text) break except (ClientConnectorError,json.JSONDecodeError) : @@ -215,7 +238,6 @@ class LiveChatAsync: else: logger.error(f"[{self.video_id}]" f"Exceeded retry count. status_code={status_code}") - self.terminate() return None return livechat_json @@ -246,6 +268,21 @@ class LiveChatAsync: raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") + def get_mode(self): + return self._parser.mode + + def pause(self): + if self._callback is None: + return + if not self._pauser.empty(): + self._pauser.get_nowait() + + def resume(self): + if self._callback is None: + return + if self._pauser.empty(): + self._pauser.put_nowait(None) + def is_alive(self): return self._is_alive @@ -264,17 +301,15 @@ class LiveChatAsync: if self._direct_mode == False: #bufferにダミーオブジェクトを入れてis_alive()を判定させる self._buffer.put_nowait({'chatdata':'','timeout':1}) - logger.info(f'終了しました:[{self.video_id}]') + logger.info(f'[{self.video_id}]finished.') @classmethod def _set_exception_handler(cls, handler): loop = asyncio.get_event_loop() - #default handler: cls._handle_exception loop.set_exception_handler(handler) @classmethod def _handle_exception(cls, loop, context): - #msg = context.get("exception", context["message"]) if not isinstance(context["exception"],CancelledError): logger.error(f"Caught exception: {context}") loop= asyncio.get_event_loop() @@ -282,12 +317,12 @@ class LiveChatAsync: @classmethod async def shutdown(cls, event, sig = None, handler=None): - logger.debug("シャットダウンしています") + logger.debug("shutdown...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] [task.cancel() for task in tasks] - logger.debug(f"残っているタスクを終了しています") + logger.debug(f"complete remaining tasks...") await asyncio.gather(*tasks,return_exceptions=True) loop = asyncio.get_event_loop() loop.stop() \ No newline at end of file diff --git a/pytchat/core_async/replaychat.py b/pytchat/core_async/replaychat.py index 95499fe..45f0c6e 100644 --- a/pytchat/core_async/replaychat.py +++ b/pytchat/core_async/replaychat.py @@ -8,7 +8,7 @@ import traceback import urllib.parse from aiohttp.client_exceptions import ClientConnectorError from concurrent.futures import CancelledError -from queue import Queue +from asyncio import Queue from .buffer import Buffer from ..parser.replay import Parser from .. import config @@ -18,8 +18,9 @@ from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator logger = config.logger(__name__) -MAX_RETRY = 10 headers = config.headers +MAX_RETRY = 10 + @@ -135,28 +136,9 @@ class ReplayChatAsync: """最初のcontinuationパラメータを取得し、 _listenループのタスクを作成し開始する """ - initial_continuation = await self._get_initial_continuation() - if initial_continuation is None: - self.terminate() - logger.debug(f"[{self.video_id}]No initial continuation.") - return + initial_continuation = arcparam.getparam(self.video_id, self.seektime) await self._listen(initial_continuation) - async def _get_initial_continuation(self): - ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' - try: - initial_continuation = arcparam.get(self.video_id,self.seektime) - except ChatParseException as e: - self.terminate() - logger.debug(f"[{self.video_id}]Error:{str(e)}") - return - except KeyError: - logger.debug(f"[{self.video_id}]KeyError:" - f"{traceback.format_exc(limit = -1)}") - self.terminate() - return - return initial_continuation - async def _listen(self, continuation): ''' continuationに紐付いたチャットデータを取得し Bufferにチャットデータを格納、 @@ -171,11 +153,13 @@ class ReplayChatAsync: async with aiohttp.ClientSession() as session: while(continuation and self._is_alive): if self._pauser.empty(): - #pause + '''pause''' await self._pauser.get() - #resume - #prohibit from blocking by putting None into _pauser. + '''resume: + prohibit from blocking by putting None into _pauser. + ''' self._pauser.put_nowait(None) + #when replay, not reacquire continuation param livechat_json = (await self._get_livechat_json(continuation, session, headers) ) @@ -197,11 +181,12 @@ class ReplayChatAsync: await asyncio.sleep(diff_time) continuation = metadata.get('continuation') except ChatParseException as e: + self.terminate() logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") return except (TypeError , json.JSONDecodeError) : - logger.error(f"{traceback.format_exc(limit = -1)}") self.terminate() + logger.error(f"{traceback.format_exc(limit = -1)}") return logger.debug(f"[{self.video_id}]チャット取得を終了しました。") @@ -261,14 +246,17 @@ class ReplayChatAsync: "既にcallbackを登録済みのため、get()は実行できません。") def pause(self): + if self._callback is None: + return if not self._pauser.empty(): - self._pauser.get() + self._pauser.get_nowait() def resume(self): + if self._callback is None: + return if self._pauser.empty(): self._pauser.put_nowait(None) - def is_alive(self): return self._is_alive @@ -292,12 +280,10 @@ class ReplayChatAsync: @classmethod def _set_exception_handler(cls, handler): loop = asyncio.get_event_loop() - #default handler: cls._handle_exception loop.set_exception_handler(handler) @classmethod def _handle_exception(cls, loop, context): - #msg = context.get("exception", context["message"]) if not isinstance(context["exception"],CancelledError): logger.error(f"Caught exception: {context}") loop= asyncio.get_event_loop() diff --git a/pytchat/core_multithread/livechat.py b/pytchat/core_multithread/livechat.py index 30b9249..a07a439 100644 --- a/pytchat/core_multithread/livechat.py +++ b/pytchat/core_multithread/livechat.py @@ -7,11 +7,12 @@ import time import traceback import urllib.parse from concurrent.futures import CancelledError, ThreadPoolExecutor +from queue import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config from ..exceptions import ChatParseException,IllegalFunctionCall -from ..paramgen import liveparam +from ..paramgen import liveparam, arcparam from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator @@ -63,6 +64,7 @@ class LiveChat: #チャット監視中のListenerのリスト _listeners= [] def __init__(self, video_id, + seektime = 0, processor = DefaultProcessor(), buffer = None, interruptable = True, @@ -71,6 +73,7 @@ class LiveChat: direct_mode = False ): self.video_id = video_id + self.seektime = seektime if isinstance(processor, tuple): self.processor = Combinator(processor) else: @@ -82,7 +85,11 @@ class LiveChat: self._direct_mode = direct_mode self._is_alive = True self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) self._setup() + self._first_fetch = True + self._fetch_url = "live_chat/get_live_chat?continuation=" if not LiveChat._setup_finished: LiveChat._setup_finished = True @@ -93,11 +100,12 @@ class LiveChat: LiveChat._listeners.append(self) def _setup(self): + #logger.debug("setup") #direct modeがTrueでcallback未設定の場合例外発生。 if self._direct_mode: if self._callback is None: raise IllegalFunctionCall( - "direct_mode=Trueの場合callbackの設定が必須です。") + "When direct_mode=True, callback parameter is required.") else: #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 if self._buffer is None: @@ -117,48 +125,30 @@ class LiveChat: listen_task.add_done_callback(self._done_callback) def _startlisten(self): - """最初のcontinuationパラメータを取得し、 - _listenループのタスクを作成し開始する + time.sleep(0.1) #sleep shortly to prohibit skipping fetching data + """Fetch first continuation parameter, + create and start _listen loop. """ - initial_continuation = self._get_initial_continuation() - if initial_continuation is None: - self.terminate() - logger.debug(f"[{self.video_id}]No initial continuation.") - return + initial_continuation = liveparam.getparam(self.video_id,3) self._listen(initial_continuation) - def _get_initial_continuation(self): - ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' - try: - initial_continuation = liveparam.getparam(self.video_id) - except ChatParseException as e: - self.terminate() - logger.debug(f"[{self.video_id}]Error:{str(e)}") - return - except KeyError: - logger.debug(f"[{self.video_id}]KeyError:" - f"{traceback.format_exc(limit = -1)}") - self.terminate() - return - return initial_continuation - def _listen(self, continuation): - ''' continuationに紐付いたチャットデータを取得し - Bufferにチャットデータを格納、 - 次のcontinuaitonを取得してループする。 + ''' Fetch chat data and store them into buffer, + get next continuaiton parameter and loop. Parameter --------- continuation : str - 次のチャットデータ取得に必要なパラメータ + parameter for next chat data ''' try: with requests.Session() as session: while(continuation and self._is_alive): - livechat_json = ( - self._get_livechat_json(continuation, session, headers) - ) - metadata, chatdata = self._parser.parse( livechat_json ) + continuation = self._check_pause(continuation) + contents = self._get_contents( + continuation, session, headers) + metadata, chatdata = self._parser.parse(contents) + timeout = metadata['timeoutMs']/1000 chat_component = { "video_id" : self.video_id, @@ -173,35 +163,70 @@ class LiveChat: else: self._buffer.put(chat_component) diff_time = timeout - (time.time()-time_mark) - if diff_time < 0 : diff_time=0 - time.sleep(diff_time) + time.sleep(diff_time if diff_time > 0 else 0) continuation = metadata.get('continuation') except ChatParseException as e: - self.terminate() - logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") + #self.terminate() + logger.debug(f"[{self.video_id}]{str(e)}") return except (TypeError , json.JSONDecodeError) : - self.terminate() + #self.terminate() logger.error(f"{traceback.format_exc(limit = -1)}") return - logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + logger.debug(f"[{self.video_id}]finished fetching chat.") + + def _check_pause(self, continuation): + if self._pauser.empty(): + '''pause''' + self._pauser.get() + '''resume: + prohibit from blocking by putting None into _pauser. + ''' + self._pauser.put_nowait(None) + if self._parser.mode == 'LIVE': + continuation = liveparam.getparam(self.video_id,3) + return continuation + + def _get_contents(self, continuation, session, headers): + '''Get 'contents' dict from livechat json. + If contents is None at first fetching, + try to fetch archive chat data. + + Return: + ------- + 'contents' dict which includes metadata & chatdata. + ''' + livechat_json = ( + self._get_livechat_json(continuation, session, headers) + ) + contents = self._parser.get_contents(livechat_json) + if self._first_fetch: + if contents is None: + '''Try to fetch archive chat data.''' + self._parser.mode = 'REPLAY' + self._fetch_url = ("live_chat_replay/" + "get_live_chat_replay?continuation=") + continuation = arcparam.getparam(self.video_id, self.seektime) + livechat_json = ( self._get_livechat_json( + continuation, session, headers)) + contents = self._parser.get_contents(livechat_json) + self._first_fetch = False + return contents def _get_livechat_json(self, continuation, session, headers): ''' - チャットデータが格納されたjsonデータを取得する。 + Get json which includes chat data. ''' continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 url =( - f"https://www.youtube.com/live_chat/get_live_chat?" - f"continuation={continuation}&pbj=1") + f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1") for _ in range(MAX_RETRY + 1): with session.get(url ,headers = headers) as resp: try: text = resp.text - status_code = resp.status_code livechat_json = json.loads(text) break except json.JSONDecodeError : @@ -210,7 +235,7 @@ class LiveChat: else: logger.error(f"[{self.video_id}]" f"Exceeded retry count. status_code={status_code}") - self.terminate() + #self.terminate() return None return livechat_json @@ -241,6 +266,21 @@ class LiveChat: raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") + def get_mode(self): + return self._parser.mode + + def pause(self): + if self._callback is None: + return + if not self._pauser.empty(): + self._pauser.get() + + def resume(self): + if self._callback is None: + return + if self._pauser.empty(): + self._pauser.put_nowait(None) + def is_alive(self): return self._is_alive @@ -259,10 +299,10 @@ class LiveChat: if self._direct_mode == False: #bufferにダミーオブジェクトを入れてis_alive()を判定させる self._buffer.put({'chatdata':'','timeout':1}) - logger.info(f'[{self.video_id}]終了しました') + logger.info(f'[{self.video_id}]finished.') @classmethod def shutdown(cls, event, sig = None, handler=None): - logger.debug("シャットダウンしています") + logger.debug("shutdown...") for t in LiveChat._listeners: t._is_alive = False \ No newline at end of file diff --git a/pytchat/core_multithread/replaychat.py b/pytchat/core_multithread/replaychat.py index 43b5e2e..10106ca 100644 --- a/pytchat/core_multithread/replaychat.py +++ b/pytchat/core_multithread/replaychat.py @@ -139,7 +139,7 @@ class ReplayChat: def _get_initial_continuation(self): ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' try: - initial_continuation = arcparam.get(self.video_id,self.seektime) + initial_continuation = arcparam.getparam(self.video_id,self.seektime) except ChatParseException as e: self.terminate() logger.debug(f"[{self.video_id}]Error:{str(e)}") diff --git a/pytchat/paramgen/arcparam.py b/pytchat/paramgen/arcparam.py index 6cc7650..99154d7 100644 --- a/pytchat/paramgen/arcparam.py +++ b/pytchat/paramgen/arcparam.py @@ -65,7 +65,7 @@ def _tzparity(video_id,times): return ((times^t) % 2).to_bytes(1,'big') -def get(video_id, seektime = 0, topchatonly = False): +def _build(video_id, seektime, topchatonly = False): switch_01 = b'\x04' if topchatonly else b'\x01' @@ -116,5 +116,12 @@ def get(video_id, seektime = 0, topchatonly = False): ).decode() ) - - +def getparam(video_id, seektime = 0): + ''' + Parameter + --------- + seektime : int + unit:seconds + start position of fetching chat data. + ''' + return _build(video_id, seektime) diff --git a/pytchat/paramgen/liveparam.py b/pytchat/paramgen/liveparam.py index f53fdda..f3bb8b7 100644 --- a/pytchat/paramgen/liveparam.py +++ b/pytchat/paramgen/liveparam.py @@ -155,7 +155,7 @@ def _times(past_sec): return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5])) -def getparam(video_id,past_sec = 60): +def getparam(video_id,past_sec = 0): ''' Parameter --------- diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index e32177f..b58ab3d 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -9,57 +9,87 @@ from .. import config from .. exceptions import ( ResponseContextError, NoContentsException, - NoContinuationsException ) + NoContinuationsException, + ChatParseException ) logger = config.logger(__name__) - +from .. import util class Parser: - def parse(self, jsn): + + def __init__(self): + self.mode = 'LIVE' + + def get_contents(self, jsn): + if jsn is None: + raise ChatParseException('Called with none JSON object.') + if jsn['response']['responseContext'].get('errors'): + raise ResponseContextError('The video_id would be wrong, or video is deleted or private.') + contents=jsn['response'].get('continuationContents') + return contents + + def parse(self, contents): """ このparse関数はLiveChat._listen() 関数から定期的に呼び出される。 - 引数jsnはYoutubeから取得したチャットデータの生JSONであり、 - このparse関数によって与えられたJSONを以下に分割して返す。 - + timeout (次のチャットデータ取得までのインターバル) - + chat data(チャットデータ本体) - + continuation (次のチャットデータ取得に必要となるパラメータ). + 引数contentsはYoutubeから取得したチャットデータの生JSONであり、 + 与えられたJSONをチャットデータとメタデータに分割して返す。 Parameter ---------- - + jsn : dict + + contents : dict + Youtubeから取得したチャットデータのJSONオブジェクト。 (pythonの辞書形式に変換済みの状態で渡される) Returns ------- - + metadata : dict - + チャットデータに付随するメタデータ。timeout、 動画ID、continuationパラメータで構成される。 + tuple: + + metadata : dict  チャットデータに付随するメタデータ + + timeout + + video_id + + continuation + chatdata : list[dict] - + チャットデータ本体のリスト。 +     チャットデータ本体のリスト。 """ - if jsn is None: - return {'timeoutMs':0,'continuation':None},[] - if jsn['response']['responseContext'].get('errors'): - raise ResponseContextError('動画に接続できません。' - '動画IDが間違っているか、動画が削除/非公開の可能性があります。') - contents=jsn['response'].get('continuationContents') - #配信が終了した場合、もしくはチャットデータが取得できない場合 + if contents is None: - raise NoContentsException('チャットデータを取得できませんでした。') + '''配信が終了した場合、もしくはチャットデータが取得できない場合''' + raise NoContentsException('Chat data stream is empty.') cont = contents['liveChatContinuation']['continuations'][0] if cont is None: - raise NoContinuationsException('Continuationがありません。') + raise NoContinuationsException('No Continuation') metadata = (cont.get('invalidationContinuationData') or cont.get('timedContinuationData') or - cont.get('reloadContinuationData') + cont.get('reloadContinuationData') or + cont.get('liveChatReplayContinuationData') ) if metadata is None: + if cont.get("playerSeekContinuationData"): + raise ChatParseException('Finished chat data') unknown = list(cont.keys())[0] if unknown: logger.debug(f"Received unknown continuation type:{unknown}") metadata = cont.get(unknown) - metadata.setdefault('timeoutMs', 10000) + else: + raise ChatParseException('Cannot extract continuation data') + return self._create_data(metadata, contents) + + def _create_data(self, metadata, contents): chatdata = contents['liveChatContinuation'].get('actions') + if self.mode == 'LIVE': + metadata.setdefault('timeoutMs', 10000) + else: + interval = self._get_interval(chatdata) + metadata.setdefault("timeoutMs",interval) + """アーカイブ済みチャットはライブチャットと構造が異なっているため、以下の行により + ライブチャットと同じ形式にそろえる""" + chatdata = [action["replayChatItemAction"]["actions"][0] for action in chatdata] return metadata, chatdata + + def _get_interval(self, actions: list): + if actions is None: + return 0 + start = int(actions[0]["replayChatItemAction"]["videoOffsetTimeMsec"]) + last = int(actions[-1]["replayChatItemAction"]["videoOffsetTimeMsec"]) + return (last - start) \ No newline at end of file diff --git a/tests/test_arcparam.py b/tests/test_arcparam.py index 926dd59..e53eda3 100644 --- a/tests/test_arcparam.py +++ b/tests/test_arcparam.py @@ -5,16 +5,16 @@ import requests, json from pytchat.paramgen import arcparam def test_arcparam_0(mocker): - param = arcparam.get("01234567890") + param = arcparam.getparam("01234567890") assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAXIECAEQAXgA" == param def test_arcparam_1(mocker): - param = arcparam.get("01234567890", seektime = 100000) + param = arcparam.getparam("01234567890", seektime = 100000) assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgBcgQIARABeAA%3D" == param def test_arcparam_2(mocker): - param = arcparam.get("SsjCnHOk-Sk") + param = arcparam.getparam("SsjCnHOk-Sk") url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1" resp = requests.Session().get(url,headers = config.headers) jsn = json.loads(resp.text) diff --git a/tests/test_compatible_processor.py b/tests/test_compatible_processor.py index fc3051a..efc81b9 100644 --- a/tests/test_compatible_processor.py +++ b/tests/test_compatible_processor.py @@ -20,7 +20,7 @@ def test_textmessage(mocker): _json = _open_file("tests/testdata/compatible/textmessage.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, @@ -57,7 +57,7 @@ def test_newsponcer(mocker): _json = _open_file("tests/testdata/compatible/newSponsor.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, @@ -93,7 +93,7 @@ def test_superchat(mocker): _json = _open_file("tests/testdata/compatible/superchat.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, diff --git a/tests/test_parser.py b/tests/test_parser.py index b338908..b40bee8 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -21,7 +21,7 @@ def test_finishedlive(*mock): _text = json.loads(_text) try: - parser.parse(_text) + parser.parse(parser.get_contents(_text)) assert False except NoContentsException: assert True @@ -34,7 +34,7 @@ def test_parsejson(*mock): _text = json.loads(_text) try: - parser.parse(_text) + parser.parse(parser.get_contents(_text)) jsn = _text timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"] continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"] diff --git a/tests/test_speed_calculator.py b/tests/test_speed_calculator.py index 8a096c8..f137c52 100644 --- a/tests/test_speed_calculator.py +++ b/tests/test_speed_calculator.py @@ -21,7 +21,7 @@ def test_speed_1(mocker): _json = _open_file("tests/testdata/speed/speedtest_normal.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10, @@ -37,7 +37,7 @@ def test_speed_2(mocker): _json = _open_file("tests/testdata/speed/speedtest_undefined.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10, @@ -53,7 +53,7 @@ def test_speed_3(mocker): _json = _open_file("tests/testdata/speed/speedtest_empty.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10,