Implement pause/pauser to livechat

This commit is contained in:
taizan-hokuto
2020-01-02 13:15:41 +09:00
parent 9751289eca
commit 7308a87a61
2 changed files with 26 additions and 23 deletions

View File

@@ -8,6 +8,7 @@ import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from queue import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
@@ -63,6 +64,7 @@ class LiveChatAsync:
_setup_finished = False
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
@@ -71,6 +73,7 @@ class LiveChatAsync:
exception_handler = None,
direct_mode = False):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
else:
@@ -82,6 +85,8 @@ class LiveChatAsync:
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not LiveChatAsync._setup_finished:
@@ -126,28 +131,9 @@ class LiveChatAsync:
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = await self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
initial_continuation = liveparam.getparam(self.video_id)
await self._listen(initial_continuation)
async def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = liveparam.getparam(self.video_id)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
Bufferにチャットデータを格納、
@@ -161,6 +147,12 @@ class LiveChatAsync:
try:
async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
#pause
await self._pauser.get()
#resume
#prohibit from blocking by putting None into _pauser.
self._pauser.put_nowait(None)
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
@@ -246,6 +238,15 @@ class LiveChatAsync:
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
@@ -264,7 +265,7 @@ class LiveChatAsync:
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'終了しました:[{self.video_id}]')
logger.info(f'[{self.video_id}]終了しました')
@classmethod
def _set_exception_handler(cls, handler):

View File

@@ -18,8 +18,9 @@ from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
logger = config.logger(__name__)
MAX_RETRY = 10
headers = config.headers
MAX_RETRY = 10
@@ -178,11 +179,12 @@ class ReplayChatAsync:
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")
self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")