Implement force_replay_mode

This commit is contained in:
taizan-hokuto
2020-01-04 13:23:32 +09:00
parent 5d86fb4b71
commit 26fefddddf
3 changed files with 58 additions and 40 deletions

View File

@@ -30,6 +30,11 @@ class LiveChatAsync:
video_id : str
動画ID
seektime : int
(ライブチャット取得時は無視)
取得開始するアーカイブ済みチャットの経過時間(秒)
マイナス値を指定した場合は、配信開始前のチャットも取得する。
processor : ChatProcessor
チャットデータを加工するオブジェクト
@@ -53,7 +58,11 @@ class LiveChatAsync:
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
(設定していない場合IllegalFunctionCall例外を発生させる
force_replay : bool
Trueの場合、ライブチャットが取得できる場合であっても
強制的にアーカイブ済みチャットを取得する。
Attributes
---------
@@ -71,7 +80,9 @@ class LiveChatAsync:
callback = None,
done_callback = None,
exception_handler = None,
direct_mode = False):
direct_mode = False,
force_replay = False
):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
@@ -84,7 +95,8 @@ class LiveChatAsync:
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._is_replay = force_replay
self._parser = Parser(is_replay = self._is_replay)
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
@@ -187,7 +199,7 @@ class LiveChatAsync:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
if self._parser.mode == 'LIVE':
if not self._is_replay:
continuation = liveparam.getparam(self.video_id,3)
return continuation
@@ -205,9 +217,9 @@ class LiveChatAsync:
)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
if contents is None:
if contents is None or self._is_replay:
'''Try to fetch archive chat data.'''
self._parser.mode = 'REPLAY'
self._parser.is_replay = True
self._fetch_url = ("live_chat_replay/"
"get_live_chat_replay?continuation=")
continuation = arcparam.getparam(self.video_id, self.seektime)
@@ -268,8 +280,8 @@ class LiveChatAsync:
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def get_mode(self):
return self._parser.mode
def is_replay(self):
return self._is_replay
def pause(self):
if self._callback is None:
@@ -300,7 +312,7 @@ class LiveChatAsync:
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
self._buffer.put_nowait({'chatdata':'','timeout':0})
logger.info(f'[{self.video_id}]finished.')
@classmethod

View File

@@ -28,6 +28,11 @@ class LiveChat:
---------
video_id : str
動画ID
seektime : int
(ライブチャット取得時は無視)
取得開始するアーカイブ済みチャットの経過時間(秒)
マイナス値を指定した場合は、配信開始前のチャットも取得する。
processor : ChatProcessor
チャットデータを加工するオブジェクト
@@ -51,6 +56,10 @@ class LiveChat:
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
force_replay : bool
Trueの場合、ライブチャットが取得できる場合であっても
強制的にアーカイブ済みチャットを取得する。
Attributes
---------
_executor : ThreadPoolExecutor
@@ -70,7 +79,8 @@ class LiveChat:
interruptable = True,
callback = None,
done_callback = None,
direct_mode = False
direct_mode = False,
force_replay = False
):
self.video_id = video_id
self.seektime = seektime
@@ -84,7 +94,8 @@ class LiveChat:
self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._is_replay = force_replay
self._parser = Parser(is_replay = self._is_replay)
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
@@ -184,7 +195,7 @@ class LiveChat:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
if self._parser.mode == 'LIVE':
if not self._is_replay:
continuation = liveparam.getparam(self.video_id,3)
return continuation
@@ -202,9 +213,9 @@ class LiveChat:
)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
if contents is None:
if contents is None or self._is_replay:
'''Try to fetch archive chat data.'''
self._parser.mode = 'REPLAY'
self._parser.is_replay = True
self._fetch_url = ("live_chat_replay/"
"get_live_chat_replay?continuation=")
continuation = arcparam.getparam(self.video_id, self.seektime)
@@ -235,7 +246,6 @@ class LiveChat:
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
#self.terminate()
return None
return livechat_json
@@ -266,8 +276,8 @@ class LiveChat:
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def get_mode(self):
return self._parser.mode
def is_replay(self):
return self._is_replay
def pause(self):
if self._callback is None:
@@ -298,7 +308,7 @@ class LiveChat:
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':1})
self._buffer.put({'chatdata':'','timeout':0})
logger.info(f'[{self.video_id}]finished.')
@classmethod

View File

@@ -1,7 +1,7 @@
"""
pytchat.parser.live
~~~~~~~~~~~~~~~~~~~
This module is parser of live chat JSON.
Parser of live chat JSON.
"""
import json
@@ -15,11 +15,12 @@ from .. exceptions import (
logger = config.logger(__name__)
from .. import util
class Parser:
def __init__(self):
self.mode = 'LIVE'
__slots__ = ['is_replay']
def __init__(self, is_replay):
self.is_replay = is_replay
def get_contents(self, jsn):
if jsn is None:
@@ -31,29 +32,23 @@ class Parser:
def parse(self, contents):
"""
このparse関数はLiveChat._listen() 関数から定期的に呼び出される。
引数contentsはYoutubeから取得したチャットデータの生JSONであり、
与えられたJSONをチャットデータとメタデータに分割して返す。
Parameter
----------
+ contents : dict
+ Youtubeから取得したチャットデータのJSONオブジェクト。
pythonの辞書形式に変換済みの状態で渡される
+ JSON of chat data from YouTube.
Returns
-------
tuple:
+ metadata : dict  チャットデータに付随するメタデータ
+ metadata : dict
+ timeout
+ video_id
+ continuation
+ chatdata : list[dict]
    チャットデータ本体のリスト。
+ chatdata : List[dict]
"""
if contents is None:
'''配信が終了した場合、もしくはチャットデータが取得できない場合'''
'''Broadcasting end or cannot fetch chat stream'''
raise NoContentsException('Chat data stream is empty.')
cont = contents['liveChatContinuation']['continuations'][0]
@@ -76,15 +71,16 @@ class Parser:
return self._create_data(metadata, contents)
def _create_data(self, metadata, contents):
chatdata = contents['liveChatContinuation'].get('actions')
if self.mode == 'LIVE':
metadata.setdefault('timeoutMs', 10000)
else:
interval = self._get_interval(chatdata)
actions = contents['liveChatContinuation'].get('actions')
if self.is_replay:
interval = self._get_interval(actions)
metadata.setdefault("timeoutMs",interval)
"""アーカイブ済みチャットはライブチャットと構造が異なっているため、以下の行により
ライブチャットと同じ形式にそろえる"""
chatdata = [action["replayChatItemAction"]["actions"][0] for action in chatdata]
"""Archived chat has different structures than live chat,
so make it the same format."""
chatdata = [action["replayChatItemAction"]["actions"][0] for action in actions]
else:
metadata.setdefault('timeoutMs', 10000)
chatdata = actions
return metadata, chatdata
def _get_interval(self, actions: list):