diff --git a/pytchat/api.py b/pytchat/api.py index 516a416..7fa873a 100644 --- a/pytchat/api.py +++ b/pytchat/api.py @@ -1,5 +1,7 @@ -from .core_async.livechat import LiveChatAsync from .core_multithread.livechat import LiveChat +from .core_async.livechat import LiveChatAsync +from .core_multithread.replaychat import ReplayChat +from .core_async.replaychat import ReplayChatAsync from .processors.chat_processor import ChatProcessor from .processors.default.processor import DefaultProcessor from .processors.compatible.processor import CompatibleProcessor diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index aa92adc..15c757c 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -10,12 +10,11 @@ import urllib.parse from aiohttp.client_exceptions import ClientConnectorError from concurrent.futures import CancelledError from .buffer import Buffer -from ..parser import Parser +from ..parser.live import Parser from .. import config from .. import mylogger from ..exceptions import ChatParseException,IllegalFunctionCall from ..paramgen import liveparam - from ..processors.default.processor import DefaultProcessor logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE) diff --git a/pytchat/core_async/replaychat.py b/pytchat/core_async/replaychat.py new file mode 100644 index 0000000..55eafc9 --- /dev/null +++ b/pytchat/core_async/replaychat.py @@ -0,0 +1,306 @@ +import aiohttp, asyncio +import datetime +import json +import random +import signal +import time +import traceback +import urllib.parse +from aiohttp.client_exceptions import ClientConnectorError +from concurrent.futures import CancelledError +from queue import Queue +from .buffer import Buffer +from ..parser.replay import Parser +from .. import config +from .. import mylogger +from ..exceptions import ChatParseException,IllegalFunctionCall +from ..paramgen import arcparam +from ..processors.default.processor import DefaultProcessor + +logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE) +MAX_RETRY = 10 +headers = config.headers + +class ReplayChatAsync: + ''' aiohttpを利用してYouTubeのライブ配信のチャットデータを取得する + + Parameter + --------- + video_id : str + 動画ID + + processor : ChatProcessor + チャットデータを加工するオブジェクト + + buffer : Buffer(maxsize:20[default]) + チャットデータchat_componentを格納するバッファ。 + maxsize : 格納できるchat_componentの個数 + default値20個。1個で約5~10秒分。 + + interruptable : bool + Ctrl+Cによる処理中断を行うかどうか。 + + callback : func + _listen()関数から一定間隔で自動的に呼びだす関数。 + + done_callback : func + listener終了時に呼び出すコールバック。 + + direct_mode : bool + Trueの場合、bufferを使わずにcallbackを呼ぶ。 + Trueの場合、callbackの設定が必須 + (設定していない場合IllegalFunctionCall例外を発生させる) + + Attributes + --------- + _executor : ThreadPoolExecutor + チャットデータ取得ループ(_listen)用のスレッド + + _is_alive : bool + チャット取得を終了したか + ''' + + _setup_finished = False + + def __init__(self, video_id, + seektime =0, + processor = DefaultProcessor(), + buffer = Buffer(maxsize = 20), + interruptable = True, + callback = None, + done_callback = None, + exception_handler = None, + direct_mode = False): + self.video_id = video_id + self.seektime= seektime + self.processor = processor + self._buffer = buffer + self._callback = callback + self._done_callback = done_callback + self._exception_handler = exception_handler + self._direct_mode = direct_mode + self._is_alive = True + self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) + self._setup() + + if not ReplayChatAsync._setup_finished: + ReplayChatAsync._setup_finished = True + if exception_handler == None: + self._set_exception_handler(self._handle_exception) + else: + self._set_exception_handler(exception_handler) + if interruptable: + signal.signal(signal.SIGINT, (lambda a, b: + (ReplayChatAsync.shutdown(None,signal.SIGINT,b)) + )) + + def _setup(self): + #direct modeがTrueでcallback未設定の場合例外発生。 + if self._direct_mode: + if self._callback is None: + raise IllegalFunctionCall( + "direct_mode=Trueの場合callbackの設定が必須です。") + else: + #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 + if self._buffer is None: + self._buffer = Buffer(maxsize = 20) + #callbackが指定されている場合はcallbackを呼ぶループタスクを作成 + if self._callback is None: + pass + else: + #callbackを呼ぶループタスクの開始 + loop = asyncio.get_event_loop() + loop.create_task(self._callback_loop(self._callback)) + #_listenループタスクの開始 + loop = asyncio.get_event_loop() + listen_task = loop.create_task(self._startlisten()) + #add_done_callbackの登録 + if self._done_callback is None: + listen_task.add_done_callback(self.finish) + else: + listen_task.add_done_callback(self._done_callback) + + def _startlisten(self): + """最初のcontinuationパラメータを取得し、 + _listenループのタスクを作成し開始する + """ + initial_continuation = await self._get_initial_continuation() + if initial_continuation is None: + self.terminate() + logger.debug(f"[{self.video_id}]No initial continuation.") + return + await self._listen(initial_continuation) + + async def _get_initial_continuation(self): + ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' + try: + initial_continuation = arcparam.get(self.video_id,self.seektime) + except ChatParseException as e: + self.terminate() + logger.debug(f"[{self.video_id}]Error:{str(e)}") + return + except KeyError: + logger.debug(f"[{self.video_id}]KeyError:" + f"{traceback.format_exc(limit = -1)}") + self.terminate() + return + return initial_continuation + + async def _listen(self, continuation): + ''' continuationに紐付いたチャットデータを取得し + にチャットデータを格納、 + 次のcontinuaitonを取得してループする + + Parameter + --------- + continuation : str + 次のチャットデータ取得に必要なパラメータ + ''' + try: + async with aiohttp.ClientSession() as session: + while(continuation and self._is_alive): + if self._pauser.empty(): + #pauseが呼ばれて_pauserが空状態のときは一時停止する + await self._pauser.get() + #resumeが呼ばれて_pauserにitemが入ったら再開する + #直後に_pauserにitem(None)を入れてブロックを防ぐ + self._pauser.put_nowait(None) + livechat_json = (await + self._get_livechat_json(continuation, session, headers) + ) + metadata, chatdata = self._parser.parse( livechat_json ) + timeout = metadata['timeoutMs']/1000 + chat_component = { + "video_id" : self.video_id, + "timeout" : timeout, + "chatdata" : chatdata + } + time_mark =time.time() + if self._direct_mode: + await self._callback( + self.processor.process([chat_component]) + ) + else: + await self._buffer.put(chat_component) + diff_time = timeout - (time.time()-time_mark) + if diff_time < 0 : diff_time=0 + await asyncio.sleep(diff_time) + continuation = metadata.get('continuation') + except ChatParseException as e: + logger.error(f"{str(e)}(動画ID:\"{self.video_id}\")") + return + except (TypeError , json.JSONDecodeError) : + logger.error(f"{traceback.format_exc(limit = -1)}") + return + + logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + + async def _get_livechat_json(self, continuation, session, headers): + ''' + チャットデータが格納されたjsonデータを取得する。 + ''' + continuation = urllib.parse.quote(continuation) + livechat_json = None + status_code = 0 + url =( + f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?" + f"continuation={continuation}&pbj=1") + for _ in range(MAX_RETRY + 1): + async with session.get(url ,headers = headers) as resp: + try: + text = await resp.text() + status_code = resp.status + livechat_json = json.loads(text) + break + except (ClientConnectorError,json.JSONDecodeError) : + await asyncio.sleep(1) + continue + else: + logger.error(f"[{self.video_id}]" + f"Exceeded retry count. status_code={status_code}") + return None + return livechat_json + + async def _callback_loop(self,callback): + """ コンストラクタでcallbackを指定している場合、バックグラウンドで + callbackに指定された関数に一定間隔でチャットデータを投げる。 + + Parameter + --------- + callback : func + 加工済みのチャットデータを渡す先の関数。 + """ + while self.is_alive(): + items = await self._buffer.get() + data = self.processor.process(items) + await callback(data) + + def get(self): + """ bufferからデータを取り出し、processorに投げ、 + 加工済みのチャットデータを返す。 + + Returns + : Processorによって加工されたチャットデータ + """ + if self._callback is None: + items = await self._buffer.get() + return self.processor.process(items) + raise IllegalFunctionCall( + "既にcallbackを登録済みのため、get()は実行できません。") + + def pause(self): + if not self._pauser.empty(): + self._pauser.get() + + def resume(self): + if self._pauser.empty(): + self._pauser.put_nowait(None) + + + def is_alive(self): + return self._is_alive + + def finish(self,sender): + '''Listener終了時のコールバック''' + try: + self.terminate() + except CancelledError: + logger.debug(f'[{self.video_id}]cancelled:{sender}') + + def terminate(self): + ''' + Listenerを終了する。 + ''' + self._is_alive = False + if self._direct_mode == False: + #bufferにダミーオブジェクトを入れてis_alive()を判定させる + self._buffer.put({'chatdata':'','timeout':1}) + logger.info(f'終了しました:[{self.video_id}]') + + @classmethod + def _set_exception_handler(cls, handler): + loop = asyncio.get_event_loop() + #default handler: cls._handle_exception + loop.set_exception_handler(handler) + + @classmethod + def _handle_exception(cls, loop, context): + #msg = context.get("exception", context["message"]) + if not isinstance(context["exception"],CancelledError): + logger.error(f"Caught exception: {context}") + loop= asyncio.get_event_loop() + loop.create_task(cls.shutdown(None,None,None)) + + @classmethod + async def shutdown(cls, event, sig = None, handler=None): + logger.debug("シャットダウンしています") + tasks = [t for t in asyncio.all_tasks() if t is not + asyncio.current_task()] + [task.cancel() for task in tasks] + + logger.debug(f"残っているタスクを終了しています") + await asyncio.gather(*tasks,return_exceptions=True) + loop = asyncio.get_event_loop() + loop.stop() \ No newline at end of file diff --git a/pytchat/core_multithread/livechat.py b/pytchat/core_multithread/livechat.py index 95e9514..c47cae8 100644 --- a/pytchat/core_multithread/livechat.py +++ b/pytchat/core_multithread/livechat.py @@ -8,7 +8,7 @@ import traceback import urllib.parse from concurrent.futures import CancelledError, ThreadPoolExecutor from .buffer import Buffer -from ..parser import Parser +from ..parser.live import Parser from .. import config from .. import mylogger from ..exceptions import ChatParseException,IllegalFunctionCall diff --git a/pytchat/core_multithread/replaychat.py b/pytchat/core_multithread/replaychat.py index 3738844..f8902b1 100644 --- a/pytchat/core_multithread/replaychat.py +++ b/pytchat/core_multithread/replaychat.py @@ -9,7 +9,7 @@ import urllib.parse from concurrent.futures import CancelledError, ThreadPoolExecutor from queue import Queue from .buffer import Buffer -from .replayparser import Parser +from ..parser.replay import Parser from .. import config from .. import mylogger from ..exceptions import ChatParseException,IllegalFunctionCall diff --git a/pytchat/parser/__init__.py b/pytchat/parser/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pytchat/parser.py b/pytchat/parser/live.py similarity index 95% rename from pytchat/parser.py rename to pytchat/parser/live.py index a31ec40..c4dada8 100644 --- a/pytchat/parser.py +++ b/pytchat/parser/live.py @@ -1,7 +1,7 @@ import json -from . import config -from . import mylogger -from . exceptions import ( +from .. import config +from .. import mylogger +from .. exceptions import ( ResponseContextError, NoContentsException, NoContinuationsException ) diff --git a/pytchat/core_multithread/replayparser.py b/pytchat/parser/replay.py similarity index 100% rename from pytchat/core_multithread/replayparser.py rename to pytchat/parser/replay.py