diff --git a/pytchat/core_async/livechat copy.py b/pytchat/core_async/livechat copy.py new file mode 100644 index 0000000..82b3bdc --- /dev/null +++ b/pytchat/core_async/livechat copy.py @@ -0,0 +1,297 @@ +import aiohttp, asyncio +import datetime +import json +import random +import signal +import time +import traceback +import urllib.parse +from aiohttp.client_exceptions import ClientConnectorError +from concurrent.futures import CancelledError +from asyncio import Queue +from .buffer import Buffer +from ..parser.live import Parser +from .. import config +from ..exceptions import ChatParseException,IllegalFunctionCall +from ..paramgen import liveparam +from ..processors.default.processor import DefaultProcessor +from ..processors.combinator import Combinator + +logger = config.logger(__name__) +headers = config.headers +MAX_RETRY = 10 + + +class LiveChatAsync: + '''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。 + + Parameter + --------- + video_id : str + 動画ID + + processor : ChatProcessor + チャットデータを加工するオブジェクト + + buffer : Buffer(maxsize:20[default]) + チャットデータchat_componentを格納するバッファ。 + maxsize : 格納できるchat_componentの個数 + default値20個。1個で約5~10秒分。 + + interruptable : bool + Ctrl+Cによる処理中断を行うかどうか。 + + callback : func + _listen()関数から一定間隔で自動的に呼びだす関数。 + + done_callback : func + listener終了時に呼び出すコールバック。 + + exception_handler : func + 例外を処理する関数 + + direct_mode : bool + Trueの場合、bufferを使わずにcallbackを呼ぶ。 + Trueの場合、callbackの設定が必須 + (設定していない場合IllegalFunctionCall例外を発生させる) + + Attributes + --------- + _is_alive : bool + チャット取得を停止するためのフラグ + ''' + + _setup_finished = False + + def __init__(self, video_id, + seektime = 0, + processor = DefaultProcessor(), + buffer = None, + interruptable = True, + callback = None, + done_callback = None, + exception_handler = None, + direct_mode = False): + self.video_id = video_id + self.seektime = seektime + if isinstance(processor, tuple): + self.processor = Combinator(processor) + else: + self.processor = processor + self._buffer = buffer + self._callback = callback + self._done_callback = done_callback + self._exception_handler = exception_handler + self._direct_mode = direct_mode + self._is_alive = True + self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) + self._setup() + + if not LiveChatAsync._setup_finished: + LiveChatAsync._setup_finished = True + if exception_handler == None: + self._set_exception_handler(self._handle_exception) + else: + self._set_exception_handler(exception_handler) + if interruptable: + signal.signal(signal.SIGINT, + (lambda a, b:asyncio.create_task( + LiveChatAsync.shutdown(None,signal.SIGINT,b)) + )) + + def _setup(self): + #direct modeがTrueでcallback未設定の場合例外発生。 + if self._direct_mode: + if self._callback is None: + raise IllegalFunctionCall( + "direct_mode=Trueの場合callbackの設定が必須です。") + else: + #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 + if self._buffer is None: + self._buffer = Buffer(maxsize = 20) + #callbackが指定されている場合はcallbackを呼ぶループタスクを作成 + if self._callback is None: + pass + else: + #callbackを呼ぶループタスクの開始 + loop = asyncio.get_event_loop() + loop.create_task(self._callback_loop(self._callback)) + #_listenループタスクの開始 + loop = asyncio.get_event_loop() + listen_task = loop.create_task(self._startlisten()) + #add_done_callbackの登録 + if self._done_callback is None: + listen_task.add_done_callback(self.finish) + else: + listen_task.add_done_callback(self._done_callback) + + async def _startlisten(self): + """最初のcontinuationパラメータを取得し、 + _listenループのタスクを作成し開始する + """ + initial_continuation = liveparam.getparam(self.video_id,3) + await self._listen(initial_continuation) + + async def _listen(self, continuation): + ''' continuationに紐付いたチャットデータを取得し + Bufferにチャットデータを格納、 + 次のcontinuaitonを取得してループする。 + + Parameter + --------- + continuation : str + 次のチャットデータ取得に必要なパラメータ + ''' + try: + async with aiohttp.ClientSession() as session: + while(continuation and self._is_alive): + if self._pauser.empty(): + '''pause''' + await self._pauser.get() + '''resume: + prohibit from blocking by putting None into _pauser. + ''' + self._pauser.put_nowait(None) + continuation= liveparam.getparam(self.video_id,3) + livechat_json = (await + self._get_livechat_json(continuation, session, headers) + ) + metadata, chatdata = self._parser.parse( livechat_json ) + timeout = metadata['timeoutMs']/1000 + chat_component = { + "video_id" : self.video_id, + "timeout" : timeout, + "chatdata" : chatdata + } + time_mark =time.time() + if self._direct_mode: + await self._callback( + self.processor.process([chat_component]) + ) + else: + await self._buffer.put(chat_component) + diff_time = timeout - (time.time()-time_mark) + await asyncio.sleep(diff_time) + continuation = metadata.get('continuation') + except ChatParseException as e: + self.terminate() + logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") + return + except (TypeError , json.JSONDecodeError) : + self.terminate() + logger.error(f"{traceback.format_exc(limit = -1)}") + return + + logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + self.terminate() + + async def _get_livechat_json(self, continuation, session, headers): + ''' + チャットデータが格納されたjsonデータを取得する。 + ''' + continuation = urllib.parse.quote(continuation) + livechat_json = None + status_code = 0 + url =( + f"https://www.youtube.com/live_chat/get_live_chat?" + f"continuation={continuation}&pbj=1") + for _ in range(MAX_RETRY + 1): + async with session.get(url ,headers = headers) as resp: + try: + text = await resp.text() + status_code = resp.status + livechat_json = json.loads(text) + break + except (ClientConnectorError,json.JSONDecodeError) : + await asyncio.sleep(1) + continue + else: + logger.error(f"[{self.video_id}]" + f"Exceeded retry count. status_code={status_code}") + return None + return livechat_json + + async def _callback_loop(self,callback): + """ コンストラクタでcallbackを指定している場合、バックグラウンドで + callbackに指定された関数に一定間隔でチャットデータを投げる。 + + Parameter + --------- + callback : func + 加工済みのチャットデータを渡す先の関数。 + """ + while self.is_alive(): + items = await self._buffer.get() + data = self.processor.process(items) + await callback(data) + + async def get(self): + """ bufferからデータを取り出し、processorに投げ、 + 加工済みのチャットデータを返す。 + + Returns + : Processorによって加工されたチャットデータ + """ + if self._callback is None: + items = await self._buffer.get() + return self.processor.process(items) + raise IllegalFunctionCall( + "既にcallbackを登録済みのため、get()は実行できません。") + + def pause(self): + if self._callback is None: + return + if not self._pauser.empty(): + self._pauser.get_nowait() + + def resume(self): + if self._callback is None: + return + if self._pauser.empty(): + self._pauser.put_nowait(None) + + def is_alive(self): + return self._is_alive + + def finish(self,sender): + '''Listener終了時のコールバック''' + try: + self.terminate() + except CancelledError: + logger.debug(f'[{self.video_id}]cancelled:{sender}') + + def terminate(self): + ''' + Listenerを終了する。 + ''' + self._is_alive = False + if self._direct_mode == False: + #bufferにダミーオブジェクトを入れてis_alive()を判定させる + self._buffer.put_nowait({'chatdata':'','timeout':1}) + logger.info(f'[{self.video_id}]終了しました') + + @classmethod + def _set_exception_handler(cls, handler): + loop = asyncio.get_event_loop() + loop.set_exception_handler(handler) + + @classmethod + def _handle_exception(cls, loop, context): + if not isinstance(context["exception"],CancelledError): + logger.error(f"Caught exception: {context}") + loop= asyncio.get_event_loop() + loop.create_task(cls.shutdown(None,None,None)) + + @classmethod + async def shutdown(cls, event, sig = None, handler=None): + logger.debug("シャットダウンしています") + tasks = [t for t in asyncio.all_tasks() if t is not + asyncio.current_task()] + [task.cancel() for task in tasks] + + logger.debug(f"残っているタスクを終了しています") + await asyncio.gather(*tasks,return_exceptions=True) + loop = asyncio.get_event_loop() + loop.stop() \ No newline at end of file diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 82b3bdc..1eb7269 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -13,7 +13,7 @@ from .buffer import Buffer from ..parser.live import Parser from .. import config from ..exceptions import ChatParseException,IllegalFunctionCall -from ..paramgen import liveparam +from ..paramgen import liveparam, arcparam from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator @@ -88,7 +88,9 @@ class LiveChatAsync: self._pauser = Queue() self._pauser.put_nowait(None) self._setup() - + #self._paramgen = liveparam + self._first_fetch = True + self._fetch_url = "live_chat/get_live_chat?continuation=" if not LiveChatAsync._setup_finished: LiveChatAsync._setup_finished = True if exception_handler == None: @@ -154,11 +156,26 @@ class LiveChatAsync: prohibit from blocking by putting None into _pauser. ''' self._pauser.put_nowait(None) - continuation= liveparam.getparam(self.video_id,3) + if self._parser.mode == 'LIVE': + continuation = liveparam.getparam(self.video_id,3) livechat_json = (await self._get_livechat_json(continuation, session, headers) ) - metadata, chatdata = self._parser.parse( livechat_json ) + contents = self._parser.get_contents(livechat_json) + '''switch live or replay''' + if self._first_fetch: + if contents is None: + self._parser.mode = 'REPLAY' + self._fetch_url = ("live_chat_replay/" + "get_live_chat_replay?continuation=") + continuation = arcparam.getparam(self.video_id) + livechat_json = (await self._get_livechat_json( + continuation, session, headers)) + contents = self._parser.get_contents(livechat_json) + self._first_fetch = False + + metadata, chatdata = self._parser.parse( contents ) + timeout = metadata['timeoutMs']/1000 chat_component = { "video_id" : self.video_id, @@ -191,12 +208,12 @@ class LiveChatAsync: ''' チャットデータが格納されたjsonデータを取得する。 ''' + continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 url =( - f"https://www.youtube.com/live_chat/get_live_chat?" - f"continuation={continuation}&pbj=1") + f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1") for _ in range(MAX_RETRY + 1): async with session.get(url ,headers = headers) as resp: try: diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index e32177f..0207ac8 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -14,9 +14,24 @@ from .. exceptions import ( logger = config.logger(__name__) - +from .. import util class Parser: - def parse(self, jsn): + URL_LIVE = "live_chat/get_live_chat?continuation=" + URL_REPLAY = "live_chat_replay/get_live_chat_replay?continuation=" + + def __init__(self): + self.mode = 'LIVE' + + def get_contents(self, jsn): + if jsn is None: + return {'timeoutMs':0,'continuation':None},[] + if jsn['response']['responseContext'].get('errors'): + raise ResponseContextError('The video_id would be wrong, or video is deleted or private.') + contents=jsn['response'].get('continuationContents') + return contents + + def parse(self, contents): + util.save(json.dumps(contents,ensure_ascii=False,indent=2),"v:\\~\\test_",".json") """ このparse関数はLiveChat._listen() 関数から定期的に呼び出される。 引数jsnはYoutubeから取得したチャットデータの生JSONであり、 @@ -27,7 +42,7 @@ class Parser: Parameter ---------- - + jsn : dict + + contents : dict + Youtubeから取得したチャットデータのJSONオブジェクト。 (pythonの辞書形式に変換済みの状態で渡される) @@ -38,19 +53,19 @@ class Parser: + chatdata : list[dict] + チャットデータ本体のリスト。 """ - if jsn is None: - return {'timeoutMs':0,'continuation':None},[] - if jsn['response']['responseContext'].get('errors'): - raise ResponseContextError('動画に接続できません。' - '動画IDが間違っているか、動画が削除/非公開の可能性があります。') - contents=jsn['response'].get('continuationContents') - #配信が終了した場合、もしくはチャットデータが取得できない場合 + # if jsn is None: + # return {'timeoutMs':0,'continuation':None},[] + # if jsn['response']['responseContext'].get('errors'): + # raise ResponseContextError('動画に接続できません。' + # '動画IDが間違っているか、動画が削除/非公開の可能性があります。') + # contents=jsn['response'].get('continuationContents') + '''配信が終了した場合、もしくはチャットデータが取得できない場合''' if contents is None: raise NoContentsException('チャットデータを取得できませんでした。') cont = contents['liveChatContinuation']['continuations'][0] if cont is None: - raise NoContinuationsException('Continuationがありません。') + raise NoContinuationsException('No Continuation') metadata = (cont.get('invalidationContinuationData') or cont.get('timedContinuationData') or cont.get('reloadContinuationData') @@ -60,6 +75,23 @@ class Parser: if unknown: logger.debug(f"Received unknown continuation type:{unknown}") metadata = cont.get(unknown) - metadata.setdefault('timeoutMs', 10000) + return self._create_data(metadata, contents) + + def _create_data(self, metadata, contents): chatdata = contents['liveChatContinuation'].get('actions') + if self.mode == 'LIVE': + metadata.setdefault('timeoutMs', 10000) + else: + interval = self._get_interval(chatdata) + metadata.setdefault("timeoutMs",interval) + """アーカイブ済みチャットはライブチャットと構造が異なっているため、以下の行により + ライブチャットと同じ形式にそろえる""" + chatdata = [action["replayChatItemAction"]["actions"][0] for action in chatdata] return metadata, chatdata + + def _get_interval(self, actions: list): + if actions is None: + return 0 + start = int(actions[0]["replayChatItemAction"]["videoOffsetTimeMsec"]) + last = int(actions[-1]["replayChatItemAction"]["videoOffsetTimeMsec"]) + return (last - start) \ No newline at end of file diff --git a/tests/test_compatible_processor.py b/tests/test_compatible_processor.py index fc3051a..efc81b9 100644 --- a/tests/test_compatible_processor.py +++ b/tests/test_compatible_processor.py @@ -20,7 +20,7 @@ def test_textmessage(mocker): _json = _open_file("tests/testdata/compatible/textmessage.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, @@ -57,7 +57,7 @@ def test_newsponcer(mocker): _json = _open_file("tests/testdata/compatible/newSponsor.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, @@ -93,7 +93,7 @@ def test_superchat(mocker): _json = _open_file("tests/testdata/compatible/superchat.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, diff --git a/tests/test_parser.py b/tests/test_parser.py index b338908..b40bee8 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -21,7 +21,7 @@ def test_finishedlive(*mock): _text = json.loads(_text) try: - parser.parse(_text) + parser.parse(parser.get_contents(_text)) assert False except NoContentsException: assert True @@ -34,7 +34,7 @@ def test_parsejson(*mock): _text = json.loads(_text) try: - parser.parse(_text) + parser.parse(parser.get_contents(_text)) jsn = _text timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"] continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"] diff --git a/tests/test_speed_calculator.py b/tests/test_speed_calculator.py index 8a096c8..f137c52 100644 --- a/tests/test_speed_calculator.py +++ b/tests/test_speed_calculator.py @@ -21,7 +21,7 @@ def test_speed_1(mocker): _json = _open_file("tests/testdata/speed/speedtest_normal.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10, @@ -37,7 +37,7 @@ def test_speed_2(mocker): _json = _open_file("tests/testdata/speed/speedtest_undefined.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10, @@ -53,7 +53,7 @@ def test_speed_3(mocker): _json = _open_file("tests/testdata/speed/speedtest_empty.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10,