From 9751289ecae91c8dedbf9bf44b1423b7a844875f Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 12:47:40 +0900 Subject: [PATCH 01/19] Integrate _get_initial_continuation --- pytchat/core_async/replaychat.py | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/pytchat/core_async/replaychat.py b/pytchat/core_async/replaychat.py index 95499fe..9479458 100644 --- a/pytchat/core_async/replaychat.py +++ b/pytchat/core_async/replaychat.py @@ -135,28 +135,9 @@ class ReplayChatAsync: """最初のcontinuationパラメータを取得し、 _listenループのタスクを作成し開始する """ - initial_continuation = await self._get_initial_continuation() - if initial_continuation is None: - self.terminate() - logger.debug(f"[{self.video_id}]No initial continuation.") - return + initial_continuation = arcparam.get(self.video_id,self.seektime) await self._listen(initial_continuation) - async def _get_initial_continuation(self): - ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' - try: - initial_continuation = arcparam.get(self.video_id,self.seektime) - except ChatParseException as e: - self.terminate() - logger.debug(f"[{self.video_id}]Error:{str(e)}") - return - except KeyError: - logger.debug(f"[{self.video_id}]KeyError:" - f"{traceback.format_exc(limit = -1)}") - self.terminate() - return - return initial_continuation - async def _listen(self, continuation): ''' continuationに紐付いたチャットデータを取得し Bufferにチャットデータを格納、 From 7308a87a611bf77fab113f7d2bc9b79a42a23199 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 13:15:41 +0900 Subject: [PATCH 02/19] Implement pause/pauser to livechat --- pytchat/core_async/livechat.py | 43 ++++++++++++++++---------------- pytchat/core_async/replaychat.py | 6 +++-- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index c671d1a..09f1c69 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -8,6 +8,7 @@ import traceback import urllib.parse from aiohttp.client_exceptions import ClientConnectorError from concurrent.futures import CancelledError +from queue import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config @@ -63,6 +64,7 @@ class LiveChatAsync: _setup_finished = False def __init__(self, video_id, + seektime = 0, processor = DefaultProcessor(), buffer = None, interruptable = True, @@ -71,6 +73,7 @@ class LiveChatAsync: exception_handler = None, direct_mode = False): self.video_id = video_id + self.seektime = seektime if isinstance(processor, tuple): self.processor = Combinator(processor) else: @@ -82,6 +85,8 @@ class LiveChatAsync: self._direct_mode = direct_mode self._is_alive = True self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) self._setup() if not LiveChatAsync._setup_finished: @@ -126,28 +131,9 @@ class LiveChatAsync: """最初のcontinuationパラメータを取得し、 _listenループのタスクを作成し開始する """ - initial_continuation = await self._get_initial_continuation() - if initial_continuation is None: - self.terminate() - logger.debug(f"[{self.video_id}]No initial continuation.") - return + initial_continuation = liveparam.getparam(self.video_id) await self._listen(initial_continuation) - async def _get_initial_continuation(self): - ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' - try: - initial_continuation = liveparam.getparam(self.video_id) - except ChatParseException as e: - self.terminate() - logger.debug(f"[{self.video_id}]Error:{str(e)}") - return - except KeyError: - logger.debug(f"[{self.video_id}]KeyError:" - f"{traceback.format_exc(limit = -1)}") - self.terminate() - return - return initial_continuation - async def _listen(self, continuation): ''' continuationに紐付いたチャットデータを取得し Bufferにチャットデータを格納、 @@ -161,6 +147,12 @@ class LiveChatAsync: try: async with aiohttp.ClientSession() as session: while(continuation and self._is_alive): + if self._pauser.empty(): + #pause + await self._pauser.get() + #resume + #prohibit from blocking by putting None into _pauser. + self._pauser.put_nowait(None) livechat_json = (await self._get_livechat_json(continuation, session, headers) ) @@ -246,6 +238,15 @@ class LiveChatAsync: raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") + def pause(self): + if not self._pauser.empty(): + self._pauser.get() + + def resume(self): + if self._pauser.empty(): + self._pauser.put_nowait(None) + + def is_alive(self): return self._is_alive @@ -264,7 +265,7 @@ class LiveChatAsync: if self._direct_mode == False: #bufferにダミーオブジェクトを入れてis_alive()を判定させる self._buffer.put_nowait({'chatdata':'','timeout':1}) - logger.info(f'終了しました:[{self.video_id}]') + logger.info(f'[{self.video_id}]終了しました') @classmethod def _set_exception_handler(cls, handler): diff --git a/pytchat/core_async/replaychat.py b/pytchat/core_async/replaychat.py index 9479458..d66a2f6 100644 --- a/pytchat/core_async/replaychat.py +++ b/pytchat/core_async/replaychat.py @@ -18,8 +18,9 @@ from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator logger = config.logger(__name__) -MAX_RETRY = 10 headers = config.headers +MAX_RETRY = 10 + @@ -178,11 +179,12 @@ class ReplayChatAsync: await asyncio.sleep(diff_time) continuation = metadata.get('continuation') except ChatParseException as e: + self.terminate() logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") return except (TypeError , json.JSONDecodeError) : - logger.error(f"{traceback.format_exc(limit = -1)}") self.terminate() + logger.error(f"{traceback.format_exc(limit = -1)}") return logger.debug(f"[{self.video_id}]チャット取得を終了しました。") From 2bb481a228d5856c4c2f9579d51184d1e13a40ee Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 14:27:36 +0900 Subject: [PATCH 03/19] Disable _pauser when callback is unset --- pytchat/config/__init__.py | 2 +- pytchat/core_async/livechat.py | 19 +++++++++++-------- pytchat/paramgen/liveparam.py | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pytchat/config/__init__.py b/pytchat/config/__init__.py index 1a84b59..542551e 100644 --- a/pytchat/config/__init__.py +++ b/pytchat/config/__init__.py @@ -1,7 +1,7 @@ import logging from . import mylogger -LOGGER_MODE = None +LOGGER_MODE = logging.DEBUG headers = { 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'} diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 09f1c69..172a96d 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -8,7 +8,7 @@ import traceback import urllib.parse from aiohttp.client_exceptions import ClientConnectorError from concurrent.futures import CancelledError -from queue import Queue +from asyncio import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config @@ -148,11 +148,13 @@ class LiveChatAsync: async with aiohttp.ClientSession() as session: while(continuation and self._is_alive): if self._pauser.empty(): - #pause + '''pause''' await self._pauser.get() - #resume - #prohibit from blocking by putting None into _pauser. + '''resume: + prohibit from blocking by putting None into _pauser. + ''' self._pauser.put_nowait(None) + continuation= liveparam.getparam(self.video_id) livechat_json = (await self._get_livechat_json(continuation, session, headers) ) @@ -239,14 +241,17 @@ class LiveChatAsync: "既にcallbackを登録済みのため、get()は実行できません。") def pause(self): + if self._callback is None: + return if not self._pauser.empty(): - self._pauser.get() + self._pauser.get_nowait() def resume(self): + if self._callback is None: + return if self._pauser.empty(): self._pauser.put_nowait(None) - def is_alive(self): return self._is_alive @@ -270,12 +275,10 @@ class LiveChatAsync: @classmethod def _set_exception_handler(cls, handler): loop = asyncio.get_event_loop() - #default handler: cls._handle_exception loop.set_exception_handler(handler) @classmethod def _handle_exception(cls, loop, context): - #msg = context.get("exception", context["message"]) if not isinstance(context["exception"],CancelledError): logger.error(f"Caught exception: {context}") loop= asyncio.get_event_loop() diff --git a/pytchat/paramgen/liveparam.py b/pytchat/paramgen/liveparam.py index f53fdda..f3bb8b7 100644 --- a/pytchat/paramgen/liveparam.py +++ b/pytchat/paramgen/liveparam.py @@ -155,7 +155,7 @@ def _times(past_sec): return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5])) -def getparam(video_id,past_sec = 60): +def getparam(video_id,past_sec = 0): ''' Parameter --------- From d6ea673f98b4523d224d326095193db569138121 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 15:12:48 +0900 Subject: [PATCH 04/19] Fix getting arcparam when resume --- pytchat/core_async/livechat.py | 4 ++-- pytchat/core_async/replaychat.py | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 172a96d..19aadd3 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -154,7 +154,7 @@ class LiveChatAsync: prohibit from blocking by putting None into _pauser. ''' self._pauser.put_nowait(None) - continuation= liveparam.getparam(self.video_id) + continuation= liveparam.getparam(self.video_id,3) livechat_json = (await self._get_livechat_json(continuation, session, headers) ) @@ -185,6 +185,7 @@ class LiveChatAsync: return logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + self.terminate() async def _get_livechat_json(self, continuation, session, headers): ''' @@ -209,7 +210,6 @@ class LiveChatAsync: else: logger.error(f"[{self.video_id}]" f"Exceeded retry count. status_code={status_code}") - self.terminate() return None return livechat_json diff --git a/pytchat/core_async/replaychat.py b/pytchat/core_async/replaychat.py index d66a2f6..ab5fd5b 100644 --- a/pytchat/core_async/replaychat.py +++ b/pytchat/core_async/replaychat.py @@ -8,7 +8,7 @@ import traceback import urllib.parse from aiohttp.client_exceptions import ClientConnectorError from concurrent.futures import CancelledError -from queue import Queue +from asyncio import Queue from .buffer import Buffer from ..parser.replay import Parser from .. import config @@ -153,11 +153,13 @@ class ReplayChatAsync: async with aiohttp.ClientSession() as session: while(continuation and self._is_alive): if self._pauser.empty(): - #pause + '''pause''' await self._pauser.get() - #resume - #prohibit from blocking by putting None into _pauser. + '''resume: + prohibit from blocking by putting None into _pauser. + ''' self._pauser.put_nowait(None) + #continuation= arcparam.get(self.video_id) livechat_json = (await self._get_livechat_json(continuation, session, headers) ) @@ -244,14 +246,17 @@ class ReplayChatAsync: "既にcallbackを登録済みのため、get()は実行できません。") def pause(self): + if self._callback is None: + return if not self._pauser.empty(): - self._pauser.get() + self._pauser.get_nowait() def resume(self): + if self._callback is None: + return if self._pauser.empty(): self._pauser.put_nowait(None) - def is_alive(self): return self._is_alive @@ -275,12 +280,10 @@ class ReplayChatAsync: @classmethod def _set_exception_handler(cls, handler): loop = asyncio.get_event_loop() - #default handler: cls._handle_exception loop.set_exception_handler(handler) @classmethod def _handle_exception(cls, loop, context): - #msg = context.get("exception", context["message"]) if not isinstance(context["exception"],CancelledError): logger.error(f"Caught exception: {context}") loop= asyncio.get_event_loop() From 2616e4c4b53a947e8d398b56841cb3b1660d0cf3 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 15:20:34 +0900 Subject: [PATCH 05/19] Adjust amount of first fetching chat --- pytchat/core_async/livechat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 19aadd3..82b3bdc 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -131,7 +131,7 @@ class LiveChatAsync: """最初のcontinuationパラメータを取得し、 _listenループのタスクを作成し開始する """ - initial_continuation = liveparam.getparam(self.video_id) + initial_continuation = liveparam.getparam(self.video_id,3) await self._listen(initial_continuation) async def _listen(self, continuation): From 907f8aba0b37051d7237bda915f551932a1a09d8 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 15:42:32 +0900 Subject: [PATCH 06/19] Rename function name --- pytchat/core_async/replaychat.py | 4 ++-- pytchat/core_multithread/replaychat.py | 2 +- pytchat/paramgen/arcparam.py | 12 +++++++++--- tests/test_arcparam.py | 6 +++--- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pytchat/core_async/replaychat.py b/pytchat/core_async/replaychat.py index ab5fd5b..45f0c6e 100644 --- a/pytchat/core_async/replaychat.py +++ b/pytchat/core_async/replaychat.py @@ -136,7 +136,7 @@ class ReplayChatAsync: """最初のcontinuationパラメータを取得し、 _listenループのタスクを作成し開始する """ - initial_continuation = arcparam.get(self.video_id,self.seektime) + initial_continuation = arcparam.getparam(self.video_id, self.seektime) await self._listen(initial_continuation) async def _listen(self, continuation): @@ -159,7 +159,7 @@ class ReplayChatAsync: prohibit from blocking by putting None into _pauser. ''' self._pauser.put_nowait(None) - #continuation= arcparam.get(self.video_id) + #when replay, not reacquire continuation param livechat_json = (await self._get_livechat_json(continuation, session, headers) ) diff --git a/pytchat/core_multithread/replaychat.py b/pytchat/core_multithread/replaychat.py index 43b5e2e..10106ca 100644 --- a/pytchat/core_multithread/replaychat.py +++ b/pytchat/core_multithread/replaychat.py @@ -139,7 +139,7 @@ class ReplayChat: def _get_initial_continuation(self): ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' try: - initial_continuation = arcparam.get(self.video_id,self.seektime) + initial_continuation = arcparam.getparam(self.video_id,self.seektime) except ChatParseException as e: self.terminate() logger.debug(f"[{self.video_id}]Error:{str(e)}") diff --git a/pytchat/paramgen/arcparam.py b/pytchat/paramgen/arcparam.py index 6cc7650..45da2c7 100644 --- a/pytchat/paramgen/arcparam.py +++ b/pytchat/paramgen/arcparam.py @@ -65,7 +65,7 @@ def _tzparity(video_id,times): return ((times^t) % 2).to_bytes(1,'big') -def get(video_id, seektime = 0, topchatonly = False): +def _build(video_id, seektime, topchatonly = False): switch_01 = b'\x04' if topchatonly else b'\x01' @@ -116,5 +116,11 @@ def get(video_id, seektime = 0, topchatonly = False): ).decode() ) - - +def getparam(video_id, seektime = 0): + ''' + Parameter + --------- + seektime : int + seconds to load past chat data + ''' + return _build(video_id, seektime) diff --git a/tests/test_arcparam.py b/tests/test_arcparam.py index 926dd59..e53eda3 100644 --- a/tests/test_arcparam.py +++ b/tests/test_arcparam.py @@ -5,16 +5,16 @@ import requests, json from pytchat.paramgen import arcparam def test_arcparam_0(mocker): - param = arcparam.get("01234567890") + param = arcparam.getparam("01234567890") assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAXIECAEQAXgA" == param def test_arcparam_1(mocker): - param = arcparam.get("01234567890", seektime = 100000) + param = arcparam.getparam("01234567890", seektime = 100000) assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgBcgQIARABeAA%3D" == param def test_arcparam_2(mocker): - param = arcparam.get("SsjCnHOk-Sk") + param = arcparam.getparam("SsjCnHOk-Sk") url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1" resp = requests.Session().get(url,headers = config.headers) jsn = json.loads(resp.text) From 48b6f2c24e5b4c9672320a06be811148522691ae Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 15:46:45 +0900 Subject: [PATCH 07/19] Add comment --- pytchat/paramgen/arcparam.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytchat/paramgen/arcparam.py b/pytchat/paramgen/arcparam.py index 45da2c7..99154d7 100644 --- a/pytchat/paramgen/arcparam.py +++ b/pytchat/paramgen/arcparam.py @@ -121,6 +121,7 @@ def getparam(video_id, seektime = 0): Parameter --------- seektime : int - seconds to load past chat data + unit:seconds + start position of fetching chat data. ''' return _build(video_id, seektime) From 7766a39c9c0d8236114adb665247ee5d34ad6f20 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 19:35:58 +0900 Subject: [PATCH 08/19] Integrate replaychat into livechat --- pytchat/core_async/livechat copy.py | 297 ++++++++++++++++++++++++++++ pytchat/core_async/livechat.py | 29 ++- pytchat/parser/live.py | 56 ++++-- tests/test_compatible_processor.py | 6 +- tests/test_parser.py | 4 +- tests/test_speed_calculator.py | 6 +- 6 files changed, 372 insertions(+), 26 deletions(-) create mode 100644 pytchat/core_async/livechat copy.py diff --git a/pytchat/core_async/livechat copy.py b/pytchat/core_async/livechat copy.py new file mode 100644 index 0000000..82b3bdc --- /dev/null +++ b/pytchat/core_async/livechat copy.py @@ -0,0 +1,297 @@ +import aiohttp, asyncio +import datetime +import json +import random +import signal +import time +import traceback +import urllib.parse +from aiohttp.client_exceptions import ClientConnectorError +from concurrent.futures import CancelledError +from asyncio import Queue +from .buffer import Buffer +from ..parser.live import Parser +from .. import config +from ..exceptions import ChatParseException,IllegalFunctionCall +from ..paramgen import liveparam +from ..processors.default.processor import DefaultProcessor +from ..processors.combinator import Combinator + +logger = config.logger(__name__) +headers = config.headers +MAX_RETRY = 10 + + +class LiveChatAsync: + '''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。 + + Parameter + --------- + video_id : str + 動画ID + + processor : ChatProcessor + チャットデータを加工するオブジェクト + + buffer : Buffer(maxsize:20[default]) + チャットデータchat_componentを格納するバッファ。 + maxsize : 格納できるchat_componentの個数 + default値20個。1個で約5~10秒分。 + + interruptable : bool + Ctrl+Cによる処理中断を行うかどうか。 + + callback : func + _listen()関数から一定間隔で自動的に呼びだす関数。 + + done_callback : func + listener終了時に呼び出すコールバック。 + + exception_handler : func + 例外を処理する関数 + + direct_mode : bool + Trueの場合、bufferを使わずにcallbackを呼ぶ。 + Trueの場合、callbackの設定が必須 + (設定していない場合IllegalFunctionCall例外を発生させる) + + Attributes + --------- + _is_alive : bool + チャット取得を停止するためのフラグ + ''' + + _setup_finished = False + + def __init__(self, video_id, + seektime = 0, + processor = DefaultProcessor(), + buffer = None, + interruptable = True, + callback = None, + done_callback = None, + exception_handler = None, + direct_mode = False): + self.video_id = video_id + self.seektime = seektime + if isinstance(processor, tuple): + self.processor = Combinator(processor) + else: + self.processor = processor + self._buffer = buffer + self._callback = callback + self._done_callback = done_callback + self._exception_handler = exception_handler + self._direct_mode = direct_mode + self._is_alive = True + self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) + self._setup() + + if not LiveChatAsync._setup_finished: + LiveChatAsync._setup_finished = True + if exception_handler == None: + self._set_exception_handler(self._handle_exception) + else: + self._set_exception_handler(exception_handler) + if interruptable: + signal.signal(signal.SIGINT, + (lambda a, b:asyncio.create_task( + LiveChatAsync.shutdown(None,signal.SIGINT,b)) + )) + + def _setup(self): + #direct modeがTrueでcallback未設定の場合例外発生。 + if self._direct_mode: + if self._callback is None: + raise IllegalFunctionCall( + "direct_mode=Trueの場合callbackの設定が必須です。") + else: + #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 + if self._buffer is None: + self._buffer = Buffer(maxsize = 20) + #callbackが指定されている場合はcallbackを呼ぶループタスクを作成 + if self._callback is None: + pass + else: + #callbackを呼ぶループタスクの開始 + loop = asyncio.get_event_loop() + loop.create_task(self._callback_loop(self._callback)) + #_listenループタスクの開始 + loop = asyncio.get_event_loop() + listen_task = loop.create_task(self._startlisten()) + #add_done_callbackの登録 + if self._done_callback is None: + listen_task.add_done_callback(self.finish) + else: + listen_task.add_done_callback(self._done_callback) + + async def _startlisten(self): + """最初のcontinuationパラメータを取得し、 + _listenループのタスクを作成し開始する + """ + initial_continuation = liveparam.getparam(self.video_id,3) + await self._listen(initial_continuation) + + async def _listen(self, continuation): + ''' continuationに紐付いたチャットデータを取得し + Bufferにチャットデータを格納、 + 次のcontinuaitonを取得してループする。 + + Parameter + --------- + continuation : str + 次のチャットデータ取得に必要なパラメータ + ''' + try: + async with aiohttp.ClientSession() as session: + while(continuation and self._is_alive): + if self._pauser.empty(): + '''pause''' + await self._pauser.get() + '''resume: + prohibit from blocking by putting None into _pauser. + ''' + self._pauser.put_nowait(None) + continuation= liveparam.getparam(self.video_id,3) + livechat_json = (await + self._get_livechat_json(continuation, session, headers) + ) + metadata, chatdata = self._parser.parse( livechat_json ) + timeout = metadata['timeoutMs']/1000 + chat_component = { + "video_id" : self.video_id, + "timeout" : timeout, + "chatdata" : chatdata + } + time_mark =time.time() + if self._direct_mode: + await self._callback( + self.processor.process([chat_component]) + ) + else: + await self._buffer.put(chat_component) + diff_time = timeout - (time.time()-time_mark) + await asyncio.sleep(diff_time) + continuation = metadata.get('continuation') + except ChatParseException as e: + self.terminate() + logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") + return + except (TypeError , json.JSONDecodeError) : + self.terminate() + logger.error(f"{traceback.format_exc(limit = -1)}") + return + + logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + self.terminate() + + async def _get_livechat_json(self, continuation, session, headers): + ''' + チャットデータが格納されたjsonデータを取得する。 + ''' + continuation = urllib.parse.quote(continuation) + livechat_json = None + status_code = 0 + url =( + f"https://www.youtube.com/live_chat/get_live_chat?" + f"continuation={continuation}&pbj=1") + for _ in range(MAX_RETRY + 1): + async with session.get(url ,headers = headers) as resp: + try: + text = await resp.text() + status_code = resp.status + livechat_json = json.loads(text) + break + except (ClientConnectorError,json.JSONDecodeError) : + await asyncio.sleep(1) + continue + else: + logger.error(f"[{self.video_id}]" + f"Exceeded retry count. status_code={status_code}") + return None + return livechat_json + + async def _callback_loop(self,callback): + """ コンストラクタでcallbackを指定している場合、バックグラウンドで + callbackに指定された関数に一定間隔でチャットデータを投げる。 + + Parameter + --------- + callback : func + 加工済みのチャットデータを渡す先の関数。 + """ + while self.is_alive(): + items = await self._buffer.get() + data = self.processor.process(items) + await callback(data) + + async def get(self): + """ bufferからデータを取り出し、processorに投げ、 + 加工済みのチャットデータを返す。 + + Returns + : Processorによって加工されたチャットデータ + """ + if self._callback is None: + items = await self._buffer.get() + return self.processor.process(items) + raise IllegalFunctionCall( + "既にcallbackを登録済みのため、get()は実行できません。") + + def pause(self): + if self._callback is None: + return + if not self._pauser.empty(): + self._pauser.get_nowait() + + def resume(self): + if self._callback is None: + return + if self._pauser.empty(): + self._pauser.put_nowait(None) + + def is_alive(self): + return self._is_alive + + def finish(self,sender): + '''Listener終了時のコールバック''' + try: + self.terminate() + except CancelledError: + logger.debug(f'[{self.video_id}]cancelled:{sender}') + + def terminate(self): + ''' + Listenerを終了する。 + ''' + self._is_alive = False + if self._direct_mode == False: + #bufferにダミーオブジェクトを入れてis_alive()を判定させる + self._buffer.put_nowait({'chatdata':'','timeout':1}) + logger.info(f'[{self.video_id}]終了しました') + + @classmethod + def _set_exception_handler(cls, handler): + loop = asyncio.get_event_loop() + loop.set_exception_handler(handler) + + @classmethod + def _handle_exception(cls, loop, context): + if not isinstance(context["exception"],CancelledError): + logger.error(f"Caught exception: {context}") + loop= asyncio.get_event_loop() + loop.create_task(cls.shutdown(None,None,None)) + + @classmethod + async def shutdown(cls, event, sig = None, handler=None): + logger.debug("シャットダウンしています") + tasks = [t for t in asyncio.all_tasks() if t is not + asyncio.current_task()] + [task.cancel() for task in tasks] + + logger.debug(f"残っているタスクを終了しています") + await asyncio.gather(*tasks,return_exceptions=True) + loop = asyncio.get_event_loop() + loop.stop() \ No newline at end of file diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 82b3bdc..1eb7269 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -13,7 +13,7 @@ from .buffer import Buffer from ..parser.live import Parser from .. import config from ..exceptions import ChatParseException,IllegalFunctionCall -from ..paramgen import liveparam +from ..paramgen import liveparam, arcparam from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator @@ -88,7 +88,9 @@ class LiveChatAsync: self._pauser = Queue() self._pauser.put_nowait(None) self._setup() - + #self._paramgen = liveparam + self._first_fetch = True + self._fetch_url = "live_chat/get_live_chat?continuation=" if not LiveChatAsync._setup_finished: LiveChatAsync._setup_finished = True if exception_handler == None: @@ -154,11 +156,26 @@ class LiveChatAsync: prohibit from blocking by putting None into _pauser. ''' self._pauser.put_nowait(None) - continuation= liveparam.getparam(self.video_id,3) + if self._parser.mode == 'LIVE': + continuation = liveparam.getparam(self.video_id,3) livechat_json = (await self._get_livechat_json(continuation, session, headers) ) - metadata, chatdata = self._parser.parse( livechat_json ) + contents = self._parser.get_contents(livechat_json) + '''switch live or replay''' + if self._first_fetch: + if contents is None: + self._parser.mode = 'REPLAY' + self._fetch_url = ("live_chat_replay/" + "get_live_chat_replay?continuation=") + continuation = arcparam.getparam(self.video_id) + livechat_json = (await self._get_livechat_json( + continuation, session, headers)) + contents = self._parser.get_contents(livechat_json) + self._first_fetch = False + + metadata, chatdata = self._parser.parse( contents ) + timeout = metadata['timeoutMs']/1000 chat_component = { "video_id" : self.video_id, @@ -191,12 +208,12 @@ class LiveChatAsync: ''' チャットデータが格納されたjsonデータを取得する。 ''' + continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 url =( - f"https://www.youtube.com/live_chat/get_live_chat?" - f"continuation={continuation}&pbj=1") + f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1") for _ in range(MAX_RETRY + 1): async with session.get(url ,headers = headers) as resp: try: diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index e32177f..0207ac8 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -14,9 +14,24 @@ from .. exceptions import ( logger = config.logger(__name__) - +from .. import util class Parser: - def parse(self, jsn): + URL_LIVE = "live_chat/get_live_chat?continuation=" + URL_REPLAY = "live_chat_replay/get_live_chat_replay?continuation=" + + def __init__(self): + self.mode = 'LIVE' + + def get_contents(self, jsn): + if jsn is None: + return {'timeoutMs':0,'continuation':None},[] + if jsn['response']['responseContext'].get('errors'): + raise ResponseContextError('The video_id would be wrong, or video is deleted or private.') + contents=jsn['response'].get('continuationContents') + return contents + + def parse(self, contents): + util.save(json.dumps(contents,ensure_ascii=False,indent=2),"v:\\~\\test_",".json") """ このparse関数はLiveChat._listen() 関数から定期的に呼び出される。 引数jsnはYoutubeから取得したチャットデータの生JSONであり、 @@ -27,7 +42,7 @@ class Parser: Parameter ---------- - + jsn : dict + + contents : dict + Youtubeから取得したチャットデータのJSONオブジェクト。 (pythonの辞書形式に変換済みの状態で渡される) @@ -38,19 +53,19 @@ class Parser: + chatdata : list[dict] + チャットデータ本体のリスト。 """ - if jsn is None: - return {'timeoutMs':0,'continuation':None},[] - if jsn['response']['responseContext'].get('errors'): - raise ResponseContextError('動画に接続できません。' - '動画IDが間違っているか、動画が削除/非公開の可能性があります。') - contents=jsn['response'].get('continuationContents') - #配信が終了した場合、もしくはチャットデータが取得できない場合 + # if jsn is None: + # return {'timeoutMs':0,'continuation':None},[] + # if jsn['response']['responseContext'].get('errors'): + # raise ResponseContextError('動画に接続できません。' + # '動画IDが間違っているか、動画が削除/非公開の可能性があります。') + # contents=jsn['response'].get('continuationContents') + '''配信が終了した場合、もしくはチャットデータが取得できない場合''' if contents is None: raise NoContentsException('チャットデータを取得できませんでした。') cont = contents['liveChatContinuation']['continuations'][0] if cont is None: - raise NoContinuationsException('Continuationがありません。') + raise NoContinuationsException('No Continuation') metadata = (cont.get('invalidationContinuationData') or cont.get('timedContinuationData') or cont.get('reloadContinuationData') @@ -60,6 +75,23 @@ class Parser: if unknown: logger.debug(f"Received unknown continuation type:{unknown}") metadata = cont.get(unknown) - metadata.setdefault('timeoutMs', 10000) + return self._create_data(metadata, contents) + + def _create_data(self, metadata, contents): chatdata = contents['liveChatContinuation'].get('actions') + if self.mode == 'LIVE': + metadata.setdefault('timeoutMs', 10000) + else: + interval = self._get_interval(chatdata) + metadata.setdefault("timeoutMs",interval) + """アーカイブ済みチャットはライブチャットと構造が異なっているため、以下の行により + ライブチャットと同じ形式にそろえる""" + chatdata = [action["replayChatItemAction"]["actions"][0] for action in chatdata] return metadata, chatdata + + def _get_interval(self, actions: list): + if actions is None: + return 0 + start = int(actions[0]["replayChatItemAction"]["videoOffsetTimeMsec"]) + last = int(actions[-1]["replayChatItemAction"]["videoOffsetTimeMsec"]) + return (last - start) \ No newline at end of file diff --git a/tests/test_compatible_processor.py b/tests/test_compatible_processor.py index fc3051a..efc81b9 100644 --- a/tests/test_compatible_processor.py +++ b/tests/test_compatible_processor.py @@ -20,7 +20,7 @@ def test_textmessage(mocker): _json = _open_file("tests/testdata/compatible/textmessage.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, @@ -57,7 +57,7 @@ def test_newsponcer(mocker): _json = _open_file("tests/testdata/compatible/newSponsor.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, @@ -93,7 +93,7 @@ def test_superchat(mocker): _json = _open_file("tests/testdata/compatible/superchat.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 7, diff --git a/tests/test_parser.py b/tests/test_parser.py index b338908..b40bee8 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -21,7 +21,7 @@ def test_finishedlive(*mock): _text = json.loads(_text) try: - parser.parse(_text) + parser.parse(parser.get_contents(_text)) assert False except NoContentsException: assert True @@ -34,7 +34,7 @@ def test_parsejson(*mock): _text = json.loads(_text) try: - parser.parse(_text) + parser.parse(parser.get_contents(_text)) jsn = _text timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"] continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"] diff --git a/tests/test_speed_calculator.py b/tests/test_speed_calculator.py index 8a096c8..f137c52 100644 --- a/tests/test_speed_calculator.py +++ b/tests/test_speed_calculator.py @@ -21,7 +21,7 @@ def test_speed_1(mocker): _json = _open_file("tests/testdata/speed/speedtest_normal.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10, @@ -37,7 +37,7 @@ def test_speed_2(mocker): _json = _open_file("tests/testdata/speed/speedtest_undefined.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10, @@ -53,7 +53,7 @@ def test_speed_3(mocker): _json = _open_file("tests/testdata/speed/speedtest_empty.json") - _, chatdata = parser.parse(json.loads(_json)) + _, chatdata = parser.parse(parser.get_contents(json.loads(_json))) data = { "video_id" : "", "timeout" : 10, From 347707a5144e990f540d69fd11f1138074dbb42d Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 20:08:45 +0900 Subject: [PATCH 09/19] Delete unnecessary lines --- pytchat/core_async/livechat.py | 1 - pytchat/parser/live.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 1eb7269..09e3c5f 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -218,7 +218,6 @@ class LiveChatAsync: async with session.get(url ,headers = headers) as resp: try: text = await resp.text() - status_code = resp.status livechat_json = json.loads(text) break except (ClientConnectorError,json.JSONDecodeError) : diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index 0207ac8..f362be9 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -16,8 +16,6 @@ logger = config.logger(__name__) from .. import util class Parser: - URL_LIVE = "live_chat/get_live_chat?continuation=" - URL_REPLAY = "live_chat_replay/get_live_chat_replay?continuation=" def __init__(self): self.mode = 'LIVE' From f4dc5e9d4a5f56d0b94bf5994dd651f2ae36d099 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 20:31:51 +0900 Subject: [PATCH 10/19] Delete unnecessary lines --- pytchat/core_async/livechat.py | 37 ++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 09e3c5f..16e3367 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -161,19 +161,7 @@ class LiveChatAsync: livechat_json = (await self._get_livechat_json(continuation, session, headers) ) - contents = self._parser.get_contents(livechat_json) - '''switch live or replay''' - if self._first_fetch: - if contents is None: - self._parser.mode = 'REPLAY' - self._fetch_url = ("live_chat_replay/" - "get_live_chat_replay?continuation=") - continuation = arcparam.getparam(self.video_id) - livechat_json = (await self._get_livechat_json( - continuation, session, headers)) - contents = self._parser.get_contents(livechat_json) - self._first_fetch = False - + contents = await self._get_contents(livechat_json, session, headers) metadata, chatdata = self._parser.parse( contents ) timeout = metadata['timeoutMs']/1000 @@ -204,6 +192,29 @@ class LiveChatAsync: logger.debug(f"[{self.video_id}]チャット取得を終了しました。") self.terminate() + async def _get_contents(self, livechat_json, session, headers): + '''Get 'contents' dict from livechat json. + If contents is None at first fetching, + try to fetch archive chat data. + + Return: + ------- + 'contents' dict which includes metadata & chatdata. + ''' + contents = self._parser.get_contents(livechat_json) + if self._first_fetch: + if contents is None: + '''Try to fetch archive chat data.''' + self._parser.mode = 'REPLAY' + self._fetch_url = ("live_chat_replay/" + "get_live_chat_replay?continuation=") + continuation = arcparam.getparam(self.video_id) + livechat_json = (await self._get_livechat_json( + continuation, session, headers)) + contents = self._parser.get_contents(livechat_json) + self._first_fetch = False + return contents + async def _get_livechat_json(self, continuation, session, headers): ''' チャットデータが格納されたjsonデータを取得する。 From fc5979c025a79511027cccb784546b780828e13c Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 20:51:59 +0900 Subject: [PATCH 11/19] Moved livechat_json part --- pytchat/core_async/livechat.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 16e3367..e178b46 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -158,11 +158,9 @@ class LiveChatAsync: self._pauser.put_nowait(None) if self._parser.mode == 'LIVE': continuation = liveparam.getparam(self.video_id,3) - livechat_json = (await - self._get_livechat_json(continuation, session, headers) - ) - contents = await self._get_contents(livechat_json, session, headers) - metadata, chatdata = self._parser.parse( contents ) + contents = await self._get_contents( + continuation, session, headers) + metadata, chatdata = self._parser.parse(contents) timeout = metadata['timeoutMs']/1000 chat_component = { @@ -192,7 +190,7 @@ class LiveChatAsync: logger.debug(f"[{self.video_id}]チャット取得を終了しました。") self.terminate() - async def _get_contents(self, livechat_json, session, headers): + async def _get_contents(self, continuation, session, headers): '''Get 'contents' dict from livechat json. If contents is None at first fetching, try to fetch archive chat data. @@ -201,6 +199,10 @@ class LiveChatAsync: ------- 'contents' dict which includes metadata & chatdata. ''' + + livechat_json = (await + self._get_livechat_json(continuation, session, headers) + ) contents = self._parser.get_contents(livechat_json) if self._first_fetch: if contents is None: From 7b7323abf846732967880d680e6c793758900611 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 20:52:36 +0900 Subject: [PATCH 12/19] Delete debug line --- pytchat/parser/live.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index f362be9..c445780 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -29,7 +29,6 @@ class Parser: return contents def parse(self, contents): - util.save(json.dumps(contents,ensure_ascii=False,indent=2),"v:\\~\\test_",".json") """ このparse関数はLiveChat._listen() 関数から定期的に呼び出される。 引数jsnはYoutubeから取得したチャットデータの生JSONであり、 From 18400724b1d8b790953902d5a50a53a3eb0d44ed Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 21:08:53 +0900 Subject: [PATCH 13/19] Modify metadata selection --- pytchat/core_async/livechat.py | 1 - pytchat/parser/live.py | 28 ++++++++++++---------------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index e178b46..cc1ab46 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -199,7 +199,6 @@ class LiveChatAsync: ------- 'contents' dict which includes metadata & chatdata. ''' - livechat_json = (await self._get_livechat_json(continuation, session, headers) ) diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index c445780..031c031 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -31,11 +31,8 @@ class Parser: def parse(self, contents): """ このparse関数はLiveChat._listen() 関数から定期的に呼び出される。 - 引数jsnはYoutubeから取得したチャットデータの生JSONであり、 - このparse関数によって与えられたJSONを以下に分割して返す。 - + timeout (次のチャットデータ取得までのインターバル) - + chat data(チャットデータ本体) - + continuation (次のチャットデータ取得に必要となるパラメータ). + 引数contentsはYoutubeから取得したチャットデータの生JSONであり、 + 与えられたJSONをチャットデータとメタデータに分割して返す。 Parameter ---------- @@ -45,19 +42,17 @@ class Parser: Returns ------- - + metadata : dict - + チャットデータに付随するメタデータ。timeout、 動画ID、continuationパラメータで構成される。 + tuple: + + metadata : dict  チャットデータに付随するメタデータ + + timeout + + video_id + + continuation + chatdata : list[dict] - + チャットデータ本体のリスト。 +     チャットデータ本体のリスト。 """ - # if jsn is None: - # return {'timeoutMs':0,'continuation':None},[] - # if jsn['response']['responseContext'].get('errors'): - # raise ResponseContextError('動画に接続できません。' - # '動画IDが間違っているか、動画が削除/非公開の可能性があります。') - # contents=jsn['response'].get('continuationContents') - '''配信が終了した場合、もしくはチャットデータが取得できない場合''' + if contents is None: + '''配信が終了した場合、もしくはチャットデータが取得できない場合''' raise NoContentsException('チャットデータを取得できませんでした。') cont = contents['liveChatContinuation']['continuations'][0] @@ -65,7 +60,8 @@ class Parser: raise NoContinuationsException('No Continuation') metadata = (cont.get('invalidationContinuationData') or cont.get('timedContinuationData') or - cont.get('reloadContinuationData') + cont.get('reloadContinuationData') or + cont.get('liveChatReplayContinuationData') ) if metadata is None: unknown = list(cont.keys())[0] From 0fc9d14780ebe567de880d8ba5b4ea50d0714455 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 21:14:22 +0900 Subject: [PATCH 14/19] Fix handling exception --- pytchat/parser/live.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index 031c031..f0fd176 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -9,7 +9,8 @@ from .. import config from .. exceptions import ( ResponseContextError, NoContentsException, - NoContinuationsException ) + NoContinuationsException, + ChatParseException ) logger = config.logger(__name__) @@ -22,7 +23,7 @@ class Parser: def get_contents(self, jsn): if jsn is None: - return {'timeoutMs':0,'continuation':None},[] + raise ChatParseException('Called with none JSON object.') if jsn['response']['responseContext'].get('errors'): raise ResponseContextError('The video_id would be wrong, or video is deleted or private.') contents=jsn['response'].get('continuationContents') From 4c558491a39ad2f3e82f21729bf1c5af0663ceab Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 21:17:37 +0900 Subject: [PATCH 15/19] Change exception message --- pytchat/parser/live.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index f0fd176..1cc1125 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -54,7 +54,7 @@ class Parser: if contents is None: '''配信が終了した場合、もしくはチャットデータが取得できない場合''' - raise NoContentsException('チャットデータを取得できませんでした。') + raise NoContentsException('Chat data stream is empty.') cont = contents['liveChatContinuation']['continuations'][0] if cont is None: From 2fdd834caf6fbb6e1885cf043c9cfdbdeae88ed1 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 22:08:26 +0900 Subject: [PATCH 16/19] Extract method _check_pause() --- pytchat/core_async/livechat.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index cc1ab46..7b69b4d 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -149,15 +149,7 @@ class LiveChatAsync: try: async with aiohttp.ClientSession() as session: while(continuation and self._is_alive): - if self._pauser.empty(): - '''pause''' - await self._pauser.get() - '''resume: - prohibit from blocking by putting None into _pauser. - ''' - self._pauser.put_nowait(None) - if self._parser.mode == 'LIVE': - continuation = liveparam.getparam(self.video_id,3) + continuation = await self._check_pause(continuation) contents = await self._get_contents( continuation, session, headers) metadata, chatdata = self._parser.parse(contents) @@ -190,6 +182,18 @@ class LiveChatAsync: logger.debug(f"[{self.video_id}]チャット取得を終了しました。") self.terminate() + async def _check_pause(self, continuation): + if self._pauser.empty(): + '''pause''' + await self._pauser.get() + '''resume: + prohibit from blocking by putting None into _pauser. + ''' + self._pauser.put_nowait(None) + if self._parser.mode == 'LIVE': + continuation = liveparam.getparam(self.video_id,3) + return continuation + async def _get_contents(self, continuation, session, headers): '''Get 'contents' dict from livechat json. If contents is None at first fetching, From d742a9fdf35fc409b33c999e10e02534841da014 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 22:27:21 +0900 Subject: [PATCH 17/19] Fix seek time param --- pytchat/core_async/livechat.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 7b69b4d..72e7d10 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -88,7 +88,6 @@ class LiveChatAsync: self._pauser = Queue() self._pauser.put_nowait(None) self._setup() - #self._paramgen = liveparam self._first_fetch = True self._fetch_url = "live_chat/get_live_chat?continuation=" if not LiveChatAsync._setup_finished: @@ -213,7 +212,7 @@ class LiveChatAsync: self._parser.mode = 'REPLAY' self._fetch_url = ("live_chat_replay/" "get_live_chat_replay?continuation=") - continuation = arcparam.getparam(self.video_id) + continuation = arcparam.getparam(self.video_id, self.seektime) livechat_json = (await self._get_livechat_json( continuation, session, headers)) contents = self._parser.get_contents(livechat_json) @@ -222,9 +221,8 @@ class LiveChatAsync: async def _get_livechat_json(self, continuation, session, headers): ''' - チャットデータが格納されたjsonデータを取得する。 + Get json which includes chat data. ''' - continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 From 30708470f26b4c2ba69d9e7eb59acc1449751ca7 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 22:43:23 +0900 Subject: [PATCH 18/19] Change comments --- pytchat/core_async/livechat.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index 72e7d10..e0d1f8b 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -107,7 +107,7 @@ class LiveChatAsync: if self._direct_mode: if self._callback is None: raise IllegalFunctionCall( - "direct_mode=Trueの場合callbackの設定が必須です。") + "When direct_mode=True, callback parameter is required.") else: #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 if self._buffer is None: @@ -129,21 +129,20 @@ class LiveChatAsync: listen_task.add_done_callback(self._done_callback) async def _startlisten(self): - """最初のcontinuationパラメータを取得し、 - _listenループのタスクを作成し開始する + """Fetch first continuation parameter, + create and start _listen loop. """ initial_continuation = liveparam.getparam(self.video_id,3) await self._listen(initial_continuation) async def _listen(self, continuation): - ''' continuationに紐付いたチャットデータを取得し - Bufferにチャットデータを格納、 - 次のcontinuaitonを取得してループする。 + ''' Fetch chat data and store them into buffer, + get next continuaiton parameter and loop. Parameter --------- continuation : str - 次のチャットデータ取得に必要なパラメータ + parameter for next chat data ''' try: async with aiohttp.ClientSession() as session: @@ -178,7 +177,7 @@ class LiveChatAsync: logger.error(f"{traceback.format_exc(limit = -1)}") return - logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + logger.debug(f"[{self.video_id}]finished fetching chat.") self.terminate() async def _check_pause(self, continuation): @@ -300,7 +299,7 @@ class LiveChatAsync: if self._direct_mode == False: #bufferにダミーオブジェクトを入れてis_alive()を判定させる self._buffer.put_nowait({'chatdata':'','timeout':1}) - logger.info(f'[{self.video_id}]終了しました') + logger.info(f'[{self.video_id}]finished.') @classmethod def _set_exception_handler(cls, handler): @@ -316,12 +315,12 @@ class LiveChatAsync: @classmethod async def shutdown(cls, event, sig = None, handler=None): - logger.debug("シャットダウンしています") + logger.debug("shutdown...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] [task.cancel() for task in tasks] - logger.debug(f"残っているタスクを終了しています") + logger.debug(f"complete remaining tasks...") await asyncio.gather(*tasks,return_exceptions=True) loop = asyncio.get_event_loop() loop.stop() \ No newline at end of file From 2c684d04b50fe1d5a5d48bf649a160fb64ce99db Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Fri, 3 Jan 2020 02:09:39 +0900 Subject: [PATCH 19/19] Intgegrate replaychat into livechat (multithread) --- pytchat/core_async/livechat.py | 10 ++- pytchat/core_multithread/livechat.py | 130 +++++++++++++++++---------- pytchat/parser/live.py | 4 + 3 files changed, 95 insertions(+), 49 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index e0d1f8b..a78dc87 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -169,16 +169,15 @@ class LiveChatAsync: await asyncio.sleep(diff_time) continuation = metadata.get('continuation') except ChatParseException as e: - self.terminate() - logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") + #self.terminate() + logger.debug(f"[{self.video_id}]{str(e)}") return except (TypeError , json.JSONDecodeError) : - self.terminate() + #self.terminate() logger.error(f"{traceback.format_exc(limit = -1)}") return logger.debug(f"[{self.video_id}]finished fetching chat.") - self.terminate() async def _check_pause(self, continuation): if self._pauser.empty(): @@ -269,6 +268,9 @@ class LiveChatAsync: raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") + def get_mode(self): + return self._parser.mode + def pause(self): if self._callback is None: return diff --git a/pytchat/core_multithread/livechat.py b/pytchat/core_multithread/livechat.py index 30b9249..a07a439 100644 --- a/pytchat/core_multithread/livechat.py +++ b/pytchat/core_multithread/livechat.py @@ -7,11 +7,12 @@ import time import traceback import urllib.parse from concurrent.futures import CancelledError, ThreadPoolExecutor +from queue import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config from ..exceptions import ChatParseException,IllegalFunctionCall -from ..paramgen import liveparam +from ..paramgen import liveparam, arcparam from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator @@ -63,6 +64,7 @@ class LiveChat: #チャット監視中のListenerのリスト _listeners= [] def __init__(self, video_id, + seektime = 0, processor = DefaultProcessor(), buffer = None, interruptable = True, @@ -71,6 +73,7 @@ class LiveChat: direct_mode = False ): self.video_id = video_id + self.seektime = seektime if isinstance(processor, tuple): self.processor = Combinator(processor) else: @@ -82,7 +85,11 @@ class LiveChat: self._direct_mode = direct_mode self._is_alive = True self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) self._setup() + self._first_fetch = True + self._fetch_url = "live_chat/get_live_chat?continuation=" if not LiveChat._setup_finished: LiveChat._setup_finished = True @@ -93,11 +100,12 @@ class LiveChat: LiveChat._listeners.append(self) def _setup(self): + #logger.debug("setup") #direct modeがTrueでcallback未設定の場合例外発生。 if self._direct_mode: if self._callback is None: raise IllegalFunctionCall( - "direct_mode=Trueの場合callbackの設定が必須です。") + "When direct_mode=True, callback parameter is required.") else: #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 if self._buffer is None: @@ -117,48 +125,30 @@ class LiveChat: listen_task.add_done_callback(self._done_callback) def _startlisten(self): - """最初のcontinuationパラメータを取得し、 - _listenループのタスクを作成し開始する + time.sleep(0.1) #sleep shortly to prohibit skipping fetching data + """Fetch first continuation parameter, + create and start _listen loop. """ - initial_continuation = self._get_initial_continuation() - if initial_continuation is None: - self.terminate() - logger.debug(f"[{self.video_id}]No initial continuation.") - return + initial_continuation = liveparam.getparam(self.video_id,3) self._listen(initial_continuation) - def _get_initial_continuation(self): - ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' - try: - initial_continuation = liveparam.getparam(self.video_id) - except ChatParseException as e: - self.terminate() - logger.debug(f"[{self.video_id}]Error:{str(e)}") - return - except KeyError: - logger.debug(f"[{self.video_id}]KeyError:" - f"{traceback.format_exc(limit = -1)}") - self.terminate() - return - return initial_continuation - def _listen(self, continuation): - ''' continuationに紐付いたチャットデータを取得し - Bufferにチャットデータを格納、 - 次のcontinuaitonを取得してループする。 + ''' Fetch chat data and store them into buffer, + get next continuaiton parameter and loop. Parameter --------- continuation : str - 次のチャットデータ取得に必要なパラメータ + parameter for next chat data ''' try: with requests.Session() as session: while(continuation and self._is_alive): - livechat_json = ( - self._get_livechat_json(continuation, session, headers) - ) - metadata, chatdata = self._parser.parse( livechat_json ) + continuation = self._check_pause(continuation) + contents = self._get_contents( + continuation, session, headers) + metadata, chatdata = self._parser.parse(contents) + timeout = metadata['timeoutMs']/1000 chat_component = { "video_id" : self.video_id, @@ -173,35 +163,70 @@ class LiveChat: else: self._buffer.put(chat_component) diff_time = timeout - (time.time()-time_mark) - if diff_time < 0 : diff_time=0 - time.sleep(diff_time) + time.sleep(diff_time if diff_time > 0 else 0) continuation = metadata.get('continuation') except ChatParseException as e: - self.terminate() - logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") + #self.terminate() + logger.debug(f"[{self.video_id}]{str(e)}") return except (TypeError , json.JSONDecodeError) : - self.terminate() + #self.terminate() logger.error(f"{traceback.format_exc(limit = -1)}") return - logger.debug(f"[{self.video_id}]チャット取得を終了しました。") + logger.debug(f"[{self.video_id}]finished fetching chat.") + + def _check_pause(self, continuation): + if self._pauser.empty(): + '''pause''' + self._pauser.get() + '''resume: + prohibit from blocking by putting None into _pauser. + ''' + self._pauser.put_nowait(None) + if self._parser.mode == 'LIVE': + continuation = liveparam.getparam(self.video_id,3) + return continuation + + def _get_contents(self, continuation, session, headers): + '''Get 'contents' dict from livechat json. + If contents is None at first fetching, + try to fetch archive chat data. + + Return: + ------- + 'contents' dict which includes metadata & chatdata. + ''' + livechat_json = ( + self._get_livechat_json(continuation, session, headers) + ) + contents = self._parser.get_contents(livechat_json) + if self._first_fetch: + if contents is None: + '''Try to fetch archive chat data.''' + self._parser.mode = 'REPLAY' + self._fetch_url = ("live_chat_replay/" + "get_live_chat_replay?continuation=") + continuation = arcparam.getparam(self.video_id, self.seektime) + livechat_json = ( self._get_livechat_json( + continuation, session, headers)) + contents = self._parser.get_contents(livechat_json) + self._first_fetch = False + return contents def _get_livechat_json(self, continuation, session, headers): ''' - チャットデータが格納されたjsonデータを取得する。 + Get json which includes chat data. ''' continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 url =( - f"https://www.youtube.com/live_chat/get_live_chat?" - f"continuation={continuation}&pbj=1") + f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1") for _ in range(MAX_RETRY + 1): with session.get(url ,headers = headers) as resp: try: text = resp.text - status_code = resp.status_code livechat_json = json.loads(text) break except json.JSONDecodeError : @@ -210,7 +235,7 @@ class LiveChat: else: logger.error(f"[{self.video_id}]" f"Exceeded retry count. status_code={status_code}") - self.terminate() + #self.terminate() return None return livechat_json @@ -241,6 +266,21 @@ class LiveChat: raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") + def get_mode(self): + return self._parser.mode + + def pause(self): + if self._callback is None: + return + if not self._pauser.empty(): + self._pauser.get() + + def resume(self): + if self._callback is None: + return + if self._pauser.empty(): + self._pauser.put_nowait(None) + def is_alive(self): return self._is_alive @@ -259,10 +299,10 @@ class LiveChat: if self._direct_mode == False: #bufferにダミーオブジェクトを入れてis_alive()を判定させる self._buffer.put({'chatdata':'','timeout':1}) - logger.info(f'[{self.video_id}]終了しました') + logger.info(f'[{self.video_id}]finished.') @classmethod def shutdown(cls, event, sig = None, handler=None): - logger.debug("シャットダウンしています") + logger.debug("shutdown...") for t in LiveChat._listeners: t._is_alive = False \ No newline at end of file diff --git a/pytchat/parser/live.py b/pytchat/parser/live.py index 1cc1125..b58ab3d 100644 --- a/pytchat/parser/live.py +++ b/pytchat/parser/live.py @@ -65,10 +65,14 @@ class Parser: cont.get('liveChatReplayContinuationData') ) if metadata is None: + if cont.get("playerSeekContinuationData"): + raise ChatParseException('Finished chat data') unknown = list(cont.keys())[0] if unknown: logger.debug(f"Received unknown continuation type:{unknown}") metadata = cont.get(unknown) + else: + raise ChatParseException('Cannot extract continuation data') return self._create_data(metadata, contents) def _create_data(self, metadata, contents):