Intgegrate replaychat into livechat (multithread)

This commit is contained in:
taizan-hokuto
2020-01-03 02:09:39 +09:00
parent 30708470f2
commit 2c684d04b5
3 changed files with 95 additions and 49 deletions

View File

@@ -169,16 +169,15 @@ class LiveChatAsync:
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
#self.terminate()
logger.debug(f"[{self.video_id}]{str(e)}")
return
except (TypeError , json.JSONDecodeError) :
self.terminate()
#self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]finished fetching chat.")
self.terminate()
async def _check_pause(self, continuation):
if self._pauser.empty():
@@ -269,6 +268,9 @@ class LiveChatAsync:
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def get_mode(self):
return self._parser.mode
def pause(self):
if self._callback is None:
return

View File

@@ -7,11 +7,12 @@ import time
import traceback
import urllib.parse
from concurrent.futures import CancelledError, ThreadPoolExecutor
from queue import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam
from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
@@ -63,6 +64,7 @@ class LiveChat:
#チャット監視中のListenerのリスト
_listeners= []
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
@@ -71,6 +73,7 @@ class LiveChat:
direct_mode = False
):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
else:
@@ -82,7 +85,11 @@ class LiveChat:
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
if not LiveChat._setup_finished:
LiveChat._setup_finished = True
@@ -93,11 +100,12 @@ class LiveChat:
LiveChat._listeners.append(self)
def _setup(self):
#logger.debug("setup")
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
"When direct_mode=True, callback parameter is required.")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
@@ -117,48 +125,30 @@ class LiveChat:
listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
time.sleep(0.1) #sleep shortly to prohibit skipping fetching data
"""Fetch first continuation parameter,
create and start _listen loop.
"""
initial_continuation = self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
initial_continuation = liveparam.getparam(self.video_id,3)
self._listen(initial_continuation)
def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = liveparam.getparam(self.video_id)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
Bufferにチャットデータを格納、
次のcontinuaitonを取得してループする。
''' Fetch chat data and store them into buffer,
get next continuaiton parameter and loop.
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
parameter for next chat data
'''
try:
with requests.Session() as session:
while(continuation and self._is_alive):
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
continuation = self._check_pause(continuation)
contents = self._get_contents(
continuation, session, headers)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
@@ -173,35 +163,70 @@ class LiveChat:
else:
self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
if diff_time < 0 : diff_time=0
time.sleep(diff_time)
time.sleep(diff_time if diff_time > 0 else 0)
continuation = metadata.get('continuation')
except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
#self.terminate()
logger.debug(f"[{self.video_id}]{str(e)}")
return
except (TypeError , json.JSONDecodeError) :
self.terminate()
#self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
logger.debug(f"[{self.video_id}]finished fetching chat.")
def _check_pause(self, continuation):
if self._pauser.empty():
'''pause'''
self._pauser.get()
'''resume:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
if self._parser.mode == 'LIVE':
continuation = liveparam.getparam(self.video_id,3)
return continuation
def _get_contents(self, continuation, session, headers):
'''Get 'contents' dict from livechat json.
If contents is None at first fetching,
try to fetch archive chat data.
Return:
-------
'contents' dict which includes metadata & chatdata.
'''
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
if contents is None:
'''Try to fetch archive chat data.'''
self._parser.mode = 'REPLAY'
self._fetch_url = ("live_chat_replay/"
"get_live_chat_replay?continuation=")
continuation = arcparam.getparam(self.video_id, self.seektime)
livechat_json = ( self._get_livechat_json(
continuation, session, headers))
contents = self._parser.get_contents(livechat_json)
self._first_fetch = False
return contents
def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
Get json which includes chat data.
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat/get_live_chat?"
f"continuation={continuation}&pbj=1")
f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp:
try:
text = resp.text
status_code = resp.status_code
livechat_json = json.loads(text)
break
except json.JSONDecodeError :
@@ -210,7 +235,7 @@ class LiveChat:
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
self.terminate()
#self.terminate()
return None
return livechat_json
@@ -241,6 +266,21 @@ class LiveChat:
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def get_mode(self):
return self._parser.mode
def pause(self):
if self._callback is None:
return
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._callback is None:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
@@ -259,10 +299,10 @@ class LiveChat:
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':1})
logger.info(f'[{self.video_id}]終了しました')
logger.info(f'[{self.video_id}]finished.')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
logger.debug("shutdown...")
for t in LiveChat._listeners:
t._is_alive = False

View File

@@ -65,10 +65,14 @@ class Parser:
cont.get('liveChatReplayContinuationData')
)
if metadata is None:
if cont.get("playerSeekContinuationData"):
raise ChatParseException('Finished chat data')
unknown = list(cont.keys())[0]
if unknown:
logger.debug(f"Received unknown continuation type:{unknown}")
metadata = cont.get(unknown)
else:
raise ChatParseException('Cannot extract continuation data')
return self._create_data(metadata, contents)
def _create_data(self, metadata, contents):