From bd32c758335808d15ab7bea8a4741000134c5a45 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Tue, 5 May 2020 21:16:06 +0900 Subject: [PATCH] Modify termination --- pytchat/core_multithread/livechat.py | 144 ++++++++++++--------------- 1 file changed, 65 insertions(+), 79 deletions(-) diff --git a/pytchat/core_multithread/livechat.py b/pytchat/core_multithread/livechat.py index 3e5d133..8ce84e8 100644 --- a/pytchat/core_multithread/livechat.py +++ b/pytchat/core_multithread/livechat.py @@ -11,8 +11,8 @@ from queue import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config -from ..exceptions import ChatParseException,IllegalFunctionCall -from ..paramgen import liveparam, arcparam +from ..exceptions import ChatParseException, IllegalFunctionCall +from ..paramgen import liveparam, arcparam from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator @@ -27,7 +27,7 @@ class LiveChat: --------- video_id : str 動画ID - + seektime : int (ライブチャット取得時は無視) 取得開始するアーカイブ済みチャットの経過時間(秒) @@ -61,7 +61,7 @@ class LiveChat: topchat_only : bool Trueの場合、上位チャットのみ取得する。 - + Attributes --------- _executor : ThreadPoolExecutor @@ -72,22 +72,20 @@ class LiveChat: ''' _setup_finished = False - #チャット監視中のListenerのリスト - _listeners = [] def __init__(self, video_id, - seektime = 0, - processor = DefaultProcessor(), - buffer = None, - interruptable = True, - callback = None, - done_callback = None, - direct_mode = False, - force_replay = False, - topchat_only = False, - logger = config.logger(__name__) - ): - self.video_id = video_id + seektime=0, + processor=DefaultProcessor(), + buffer=None, + interruptable=True, + callback=None, + done_callback=None, + direct_mode=False, + force_replay=False, + topchat_only=False, + logger=config.logger(__name__) + ): + self.video_id = video_id self.seektime = seektime if isinstance(processor, tuple): self.processor = Combinator(processor) @@ -98,57 +96,51 @@ class LiveChat: self._done_callback = done_callback self._executor = ThreadPoolExecutor(max_workers=2) self._direct_mode = direct_mode - self._is_alive = True + self._is_alive = True self._is_replay = force_replay - self._parser = Parser(is_replay = self._is_replay) + self._parser = Parser(is_replay=self._is_replay) self._pauser = Queue() self._pauser.put_nowait(None) - self._setup() self._first_fetch = True self._fetch_url = "live_chat/get_live_chat?continuation=" self._topchat_only = topchat_only self._logger = logger - LiveChat._logger = logger - if not LiveChat._setup_finished: - LiveChat._setup_finished = True - if interruptable: - signal.signal(signal.SIGINT, (lambda a, b: - (LiveChat.shutdown(None,signal.SIGINT,b)) - )) - LiveChat._listeners.append(self) + if interruptable: + signal.signal(signal.SIGINT, lambda a, b: self.terminate()) + self._setup() def _setup(self): - #direct modeがTrueでcallback未設定の場合例外発生。 + # direct modeがTrueでcallback未設定の場合例外発生。 if self._direct_mode: if self._callback is None: raise IllegalFunctionCall( "When direct_mode=True, callback parameter is required.") else: - #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 + # direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 if self._buffer is None: - self._buffer = Buffer(maxsize = 20) - #callbackが指定されている場合はcallbackを呼ぶループタスクを作成 + self._buffer = Buffer(maxsize=20) + # callbackが指定されている場合はcallbackを呼ぶループタスクを作成 if self._callback is None: - pass + pass else: - #callbackを呼ぶループタスクの開始 - self._executor.submit(self._callback_loop,self._callback) - #_listenループタスクの開始 + # callbackを呼ぶループタスクの開始 + self._executor.submit(self._callback_loop, self._callback) + # _listenループタスクの開始 listen_task = self._executor.submit(self._startlisten) - #add_done_callbackの登録 + # add_done_callbackの登録 if self._done_callback is None: listen_task.add_done_callback(self.finish) else: listen_task.add_done_callback(self._done_callback) def _startlisten(self): - time.sleep(0.1) #sleep shortly to prohibit skipping fetching data + time.sleep(0.1) # sleep shortly to prohibit skipping fetching data """Fetch first continuation parameter, create and start _listen loop. """ - initial_continuation = liveparam.getparam(self.video_id,3) + initial_continuation = liveparam.getparam(self.video_id, 3) self._listen(initial_continuation) - + def _listen(self, continuation): ''' Fetch chat data and store them into buffer, get next continuaiton parameter and loop. @@ -164,33 +156,34 @@ class LiveChat: continuation = self._check_pause(continuation) contents = self._get_contents( continuation, session, headers) - metadata, chatdata = self._parser.parse(contents) + metadata, chatdata = self._parser.parse(contents) timeout = metadata['timeoutMs']/1000 chat_component = { - "video_id" : self.video_id, - "timeout" : timeout, - "chatdata" : chatdata + "video_id": self.video_id, + "timeout": timeout, + "chatdata": chatdata } - time_mark =time.time() + time_mark = time.time() if self._direct_mode: - processed_chat = self.processor.process([chat_component]) - if isinstance(processed_chat,tuple): + processed_chat = self.processor.process( + [chat_component]) + if isinstance(processed_chat, tuple): self._callback(*processed_chat) else: self._callback(processed_chat) else: self._buffer.put(chat_component) diff_time = timeout - (time.time()-time_mark) - time.sleep(diff_time if diff_time > 0 else 0) - continuation = metadata.get('continuation') + time.sleep(diff_time if diff_time > 0 else 0) + continuation = metadata.get('continuation') except ChatParseException as e: self._logger.debug(f"[{self.video_id}]{str(e)}") - return - except (TypeError , json.JSONDecodeError) : + return + except (TypeError, json.JSONDecodeError): self._logger.error(f"{traceback.format_exc(limit = -1)}") return - + self._logger.debug(f"[{self.video_id}]finished fetching chat.") def _check_pause(self, continuation): @@ -202,7 +195,7 @@ class LiveChat: ''' self._pauser.put_nowait(None) if not self._is_replay: - continuation = liveparam.getparam(self.video_id,3) + continuation = liveparam.getparam(self.video_id, 3) return continuation def _get_contents(self, continuation, session, headers): @@ -214,7 +207,7 @@ class LiveChat: ------- 'continuationContents' which includes metadata & chat data. ''' - livechat_json = ( + livechat_json = ( self._get_livechat_json(continuation, session, headers) ) contents = self._parser.get_contents(livechat_json) @@ -225,7 +218,7 @@ class LiveChat: self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation=" continuation = arcparam.getparam( self.video_id, self.seektime, self._topchat_only) - livechat_json = ( self._get_livechat_json( + livechat_json = (self._get_livechat_json( continuation, session, headers)) reload_continuation = self._parser.reload_continuation( self._parser.get_contents(livechat_json)) @@ -244,26 +237,26 @@ class LiveChat: continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 - url =f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1" + url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1" for _ in range(MAX_RETRY + 1): - with session.get(url ,headers = headers) as resp: + with session.get(url, headers=headers) as resp: try: text = resp.text livechat_json = json.loads(text) break - except json.JSONDecodeError : + except json.JSONDecodeError: time.sleep(1) continue else: self._logger.error(f"[{self.video_id}]" - f"Exceeded retry count. status_code={status_code}") + f"Exceeded retry count. status_code={status_code}") return None return livechat_json - - def _callback_loop(self,callback): + + def _callback_loop(self, callback): """ コンストラクタでcallbackを指定している場合、バックグラウンドで callbackに指定された関数に一定間隔でチャットデータを投げる。 - + Parameter --------- callback : func @@ -280,13 +273,13 @@ class LiveChat: def get(self): """ bufferからデータを取り出し、processorに投げ、 加工済みのチャットデータを返す。 - + Returns : Processorによって加工されたチャットデータ """ if self._callback is None: items = self._buffer.get() - return self.processor.process(items) + return self.processor.process(items) raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") @@ -304,13 +297,13 @@ class LiveChat: return if self._pauser.empty(): self._pauser.put_nowait(None) - + def is_alive(self): return self._is_alive - def finish(self,sender): + def finish(self, sender): '''Listener終了時のコールバック''' - try: + try: self.terminate() except CancelledError: self._logger.debug(f'[{self.video_id}]cancelled:{sender}') @@ -319,14 +312,7 @@ class LiveChat: ''' Listenerを終了する。 ''' - self._is_alive = False - if self._direct_mode == False: - #bufferにダミーオブジェクトを入れてis_alive()を判定させる - self._buffer.put({'chatdata':'','timeout':0}) - self._logger.info(f'[{self.video_id}]finished.') - - @classmethod - def shutdown(cls, event, sig = None, handler=None): - cls._logger.debug("shutdown...") - for t in LiveChat._listeners: - t._is_alive = False \ No newline at end of file + if self.is_alive(): + self._is_alive = False + self._buffer.put({}) + self._logger.info(f'[{self.video_id}]終了しました')