Modify termination

This commit is contained in:
taizan-hokuto
2020-05-05 21:16:06 +09:00
parent 84bae4ad2a
commit bd32c75833

View File

@@ -11,8 +11,8 @@ from queue import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam, arcparam
from ..exceptions import ChatParseException, IllegalFunctionCall
from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
@@ -72,22 +72,20 @@ class LiveChat:
'''
_setup_finished = False
#チャット監視中のListenerのリスト
_listeners = []
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
direct_mode = False,
force_replay = False,
topchat_only = False,
logger = config.logger(__name__)
):
self.video_id = video_id
seektime=0,
processor=DefaultProcessor(),
buffer=None,
interruptable=True,
callback=None,
done_callback=None,
direct_mode=False,
force_replay=False,
topchat_only=False,
logger=config.logger(__name__)
):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
@@ -98,55 +96,49 @@ class LiveChat:
self._done_callback = done_callback
self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode
self._is_alive = True
self._is_alive = True
self._is_replay = force_replay
self._parser = Parser(is_replay = self._is_replay)
self._parser = Parser(is_replay=self._is_replay)
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
self._topchat_only = topchat_only
self._logger = logger
LiveChat._logger = logger
if not LiveChat._setup_finished:
LiveChat._setup_finished = True
if interruptable:
signal.signal(signal.SIGINT, (lambda a, b:
(LiveChat.shutdown(None,signal.SIGINT,b))
))
LiveChat._listeners.append(self)
if interruptable:
signal.signal(signal.SIGINT, lambda a, b: self.terminate())
self._setup()
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
# direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"When direct_mode=True, callback parameter is required.")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
# direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
self._buffer = Buffer(maxsize=20)
# callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop,self._callback)
#_listenループタスクの開始
# callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop, self._callback)
# _listenループタスクの開始
listen_task = self._executor.submit(self._startlisten)
#add_done_callbackの登録
# add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
time.sleep(0.1) #sleep shortly to prohibit skipping fetching data
time.sleep(0.1) # sleep shortly to prohibit skipping fetching data
"""Fetch first continuation parameter,
create and start _listen loop.
"""
initial_continuation = liveparam.getparam(self.video_id,3)
initial_continuation = liveparam.getparam(self.video_id, 3)
self._listen(initial_continuation)
def _listen(self, continuation):
@@ -164,18 +156,19 @@ class LiveChat:
continuation = self._check_pause(continuation)
contents = self._get_contents(
continuation, session, headers)
metadata, chatdata = self._parser.parse(contents)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
"video_id": self.video_id,
"timeout": timeout,
"chatdata": chatdata
}
time_mark =time.time()
time_mark = time.time()
if self._direct_mode:
processed_chat = self.processor.process([chat_component])
if isinstance(processed_chat,tuple):
processed_chat = self.processor.process(
[chat_component])
if isinstance(processed_chat, tuple):
self._callback(*processed_chat)
else:
self._callback(processed_chat)
@@ -187,7 +180,7 @@ class LiveChat:
except ChatParseException as e:
self._logger.debug(f"[{self.video_id}]{str(e)}")
return
except (TypeError , json.JSONDecodeError) :
except (TypeError, json.JSONDecodeError):
self._logger.error(f"{traceback.format_exc(limit = -1)}")
return
@@ -202,7 +195,7 @@ class LiveChat:
'''
self._pauser.put_nowait(None)
if not self._is_replay:
continuation = liveparam.getparam(self.video_id,3)
continuation = liveparam.getparam(self.video_id, 3)
return continuation
def _get_contents(self, continuation, session, headers):
@@ -225,7 +218,7 @@ class LiveChat:
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam(
self.video_id, self.seektime, self._topchat_only)
livechat_json = ( self._get_livechat_json(
livechat_json = (self._get_livechat_json(
continuation, session, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
@@ -244,23 +237,23 @@ class LiveChat:
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp:
with session.get(url, headers=headers) as resp:
try:
text = resp.text
livechat_json = json.loads(text)
break
except json.JSONDecodeError :
except json.JSONDecodeError:
time.sleep(1)
continue
else:
self._logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
def _callback_loop(self,callback):
def _callback_loop(self, callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
@@ -286,7 +279,7 @@ class LiveChat:
"""
if self._callback is None:
items = self._buffer.get()
return self.processor.process(items)
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
@@ -308,7 +301,7 @@ class LiveChat:
def is_alive(self):
return self._is_alive
def finish(self,sender):
def finish(self, sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
@@ -319,14 +312,7 @@ class LiveChat:
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':0})
self._logger.info(f'[{self.video_id}]finished.')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
cls._logger.debug("shutdown...")
for t in LiveChat._listeners:
t._is_alive = False
if self.is_alive():
self._is_alive = False
self._buffer.put({})
self._logger.info(f'[{self.video_id}]終了しました')