From 7308a87a611bf77fab113f7d2bc9b79a42a23199 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Thu, 2 Jan 2020 13:15:41 +0900 Subject: [PATCH] Implement pause/pauser to livechat --- pytchat/core_async/livechat.py | 43 ++++++++++++++++---------------- pytchat/core_async/replaychat.py | 6 +++-- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/pytchat/core_async/livechat.py b/pytchat/core_async/livechat.py index c671d1a..09f1c69 100644 --- a/pytchat/core_async/livechat.py +++ b/pytchat/core_async/livechat.py @@ -8,6 +8,7 @@ import traceback import urllib.parse from aiohttp.client_exceptions import ClientConnectorError from concurrent.futures import CancelledError +from queue import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config @@ -63,6 +64,7 @@ class LiveChatAsync: _setup_finished = False def __init__(self, video_id, + seektime = 0, processor = DefaultProcessor(), buffer = None, interruptable = True, @@ -71,6 +73,7 @@ class LiveChatAsync: exception_handler = None, direct_mode = False): self.video_id = video_id + self.seektime = seektime if isinstance(processor, tuple): self.processor = Combinator(processor) else: @@ -82,6 +85,8 @@ class LiveChatAsync: self._direct_mode = direct_mode self._is_alive = True self._parser = Parser() + self._pauser = Queue() + self._pauser.put_nowait(None) self._setup() if not LiveChatAsync._setup_finished: @@ -126,28 +131,9 @@ class LiveChatAsync: """最初のcontinuationパラメータを取得し、 _listenループのタスクを作成し開始する """ - initial_continuation = await self._get_initial_continuation() - if initial_continuation is None: - self.terminate() - logger.debug(f"[{self.video_id}]No initial continuation.") - return + initial_continuation = liveparam.getparam(self.video_id) await self._listen(initial_continuation) - async def _get_initial_continuation(self): - ''' チャットデータ取得に必要な最初のcontinuationを取得する。''' - try: - initial_continuation = liveparam.getparam(self.video_id) - except ChatParseException as e: - self.terminate() - logger.debug(f"[{self.video_id}]Error:{str(e)}") - return - except KeyError: - logger.debug(f"[{self.video_id}]KeyError:" - f"{traceback.format_exc(limit = -1)}") - self.terminate() - return - return initial_continuation - async def _listen(self, continuation): ''' continuationに紐付いたチャットデータを取得し Bufferにチャットデータを格納、 @@ -161,6 +147,12 @@ class LiveChatAsync: try: async with aiohttp.ClientSession() as session: while(continuation and self._is_alive): + if self._pauser.empty(): + #pause + await self._pauser.get() + #resume + #prohibit from blocking by putting None into _pauser. + self._pauser.put_nowait(None) livechat_json = (await self._get_livechat_json(continuation, session, headers) ) @@ -246,6 +238,15 @@ class LiveChatAsync: raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") + def pause(self): + if not self._pauser.empty(): + self._pauser.get() + + def resume(self): + if self._pauser.empty(): + self._pauser.put_nowait(None) + + def is_alive(self): return self._is_alive @@ -264,7 +265,7 @@ class LiveChatAsync: if self._direct_mode == False: #bufferにダミーオブジェクトを入れてis_alive()を判定させる self._buffer.put_nowait({'chatdata':'','timeout':1}) - logger.info(f'終了しました:[{self.video_id}]') + logger.info(f'[{self.video_id}]終了しました') @classmethod def _set_exception_handler(cls, handler): diff --git a/pytchat/core_async/replaychat.py b/pytchat/core_async/replaychat.py index 9479458..d66a2f6 100644 --- a/pytchat/core_async/replaychat.py +++ b/pytchat/core_async/replaychat.py @@ -18,8 +18,9 @@ from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator logger = config.logger(__name__) -MAX_RETRY = 10 headers = config.headers +MAX_RETRY = 10 + @@ -178,11 +179,12 @@ class ReplayChatAsync: await asyncio.sleep(diff_time) continuation = metadata.get('continuation') except ChatParseException as e: + self.terminate() logger.error(f"{str(e)}(video_id:\"{self.video_id}\")") return except (TypeError , json.JSONDecodeError) : - logger.error(f"{traceback.format_exc(limit = -1)}") self.terminate() + logger.error(f"{traceback.format_exc(limit = -1)}") return logger.debug(f"[{self.video_id}]チャット取得を終了しました。")