Integrate replaychat into livechat

This commit is contained in:
taizan-hokuto
2020-01-02 19:35:58 +09:00
parent 48b6f2c24e
commit 7766a39c9c
6 changed files with 372 additions and 26 deletions

View File

@@ -0,0 +1,297 @@
import aiohttp, asyncio
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from asyncio import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
logger = config.logger(__name__)
headers = config.headers
MAX_RETRY = 10
class LiveChatAsync:
'''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。
Parameter
---------
video_id : str
動画ID
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
exception_handler : func
例外を処理する関数
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_is_alive : bool
チャット取得を停止するためのフラグ
'''
_setup_finished = False
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
exception_handler = None,
direct_mode = False):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
else:
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not LiveChatAsync._setup_finished:
LiveChatAsync._setup_finished = True
if exception_handler == None:
self._set_exception_handler(self._handle_exception)
else:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b:asyncio.create_task(
LiveChatAsync.shutdown(None,signal.SIGINT,b))
))
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
loop = asyncio.get_event_loop()
loop.create_task(self._callback_loop(self._callback))
#_listenループタスクの開始
loop = asyncio.get_event_loop()
listen_task = loop.create_task(self._startlisten())
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = liveparam.getparam(self.video_id,3)
await self._listen(initial_continuation)
async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
Bufferにチャットデータを格納、
次のcontinuaitonを取得してループする。
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
'''pause'''
await self._pauser.get()
'''resume:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
continuation= liveparam.getparam(self.video_id,3)
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
await self._callback(
self.processor.process([chat_component])
)
else:
await self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
self.terminate()
async def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat/get_live_chat?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp:
try:
text = await resp.text()
status_code = resp.status
livechat_json = json.loads(text)
break
except (ClientConnectorError,json.JSONDecodeError) :
await asyncio.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
async def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = await self._buffer.get()
data = self.processor.process(items)
await callback(data)
async def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = await self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if self._callback is None:
return
if not self._pauser.empty():
self._pauser.get_nowait()
def resume(self):
if self._callback is None:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'[{self.video_id}]終了しました')
@classmethod
def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop()
loop.set_exception_handler(handler)
@classmethod
def _handle_exception(cls, loop, context):
if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop()
loop.create_task(cls.shutdown(None,None,None))
@classmethod
async def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
logger.debug(f"残っているタスクを終了しています")
await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop()
loop.stop()

View File

@@ -13,7 +13,7 @@ from .buffer import Buffer
from ..parser.live import Parser from ..parser.live import Parser
from .. import config from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator from ..processors.combinator import Combinator
@@ -88,7 +88,9 @@ class LiveChatAsync:
self._pauser = Queue() self._pauser = Queue()
self._pauser.put_nowait(None) self._pauser.put_nowait(None)
self._setup() self._setup()
#self._paramgen = liveparam
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
if not LiveChatAsync._setup_finished: if not LiveChatAsync._setup_finished:
LiveChatAsync._setup_finished = True LiveChatAsync._setup_finished = True
if exception_handler == None: if exception_handler == None:
@@ -154,11 +156,26 @@ class LiveChatAsync:
prohibit from blocking by putting None into _pauser. prohibit from blocking by putting None into _pauser.
''' '''
self._pauser.put_nowait(None) self._pauser.put_nowait(None)
continuation= liveparam.getparam(self.video_id,3) if self._parser.mode == 'LIVE':
continuation = liveparam.getparam(self.video_id,3)
livechat_json = (await livechat_json = (await
self._get_livechat_json(continuation, session, headers) self._get_livechat_json(continuation, session, headers)
) )
metadata, chatdata = self._parser.parse( livechat_json ) contents = self._parser.get_contents(livechat_json)
'''switch live or replay'''
if self._first_fetch:
if contents is None:
self._parser.mode = 'REPLAY'
self._fetch_url = ("live_chat_replay/"
"get_live_chat_replay?continuation=")
continuation = arcparam.getparam(self.video_id)
livechat_json = (await self._get_livechat_json(
continuation, session, headers))
contents = self._parser.get_contents(livechat_json)
self._first_fetch = False
metadata, chatdata = self._parser.parse( contents )
timeout = metadata['timeoutMs']/1000 timeout = metadata['timeoutMs']/1000
chat_component = { chat_component = {
"video_id" : self.video_id, "video_id" : self.video_id,
@@ -191,12 +208,12 @@ class LiveChatAsync:
''' '''
チャットデータが格納されたjsonデータを取得する。 チャットデータが格納されたjsonデータを取得する。
''' '''
continuation = urllib.parse.quote(continuation) continuation = urllib.parse.quote(continuation)
livechat_json = None livechat_json = None
status_code = 0 status_code = 0
url =( url =(
f"https://www.youtube.com/live_chat/get_live_chat?" f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1")
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1): for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp: async with session.get(url ,headers = headers) as resp:
try: try:

View File

@@ -14,9 +14,24 @@ from .. exceptions import (
logger = config.logger(__name__) logger = config.logger(__name__)
from .. import util
class Parser: class Parser:
def parse(self, jsn): URL_LIVE = "live_chat/get_live_chat?continuation="
URL_REPLAY = "live_chat_replay/get_live_chat_replay?continuation="
def __init__(self):
self.mode = 'LIVE'
def get_contents(self, jsn):
if jsn is None:
return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError('The video_id would be wrong, or video is deleted or private.')
contents=jsn['response'].get('continuationContents')
return contents
def parse(self, contents):
util.save(json.dumps(contents,ensure_ascii=False,indent=2),"v:\\~\\test_",".json")
""" """
このparse関数はLiveChat._listen() 関数から定期的に呼び出される。 このparse関数はLiveChat._listen() 関数から定期的に呼び出される。
引数jsnはYoutubeから取得したチャットデータの生JSONであり、 引数jsnはYoutubeから取得したチャットデータの生JSONであり、
@@ -27,7 +42,7 @@ class Parser:
Parameter Parameter
---------- ----------
+ jsn : dict + contents : dict
+ Youtubeから取得したチャットデータのJSONオブジェクト。 + Youtubeから取得したチャットデータのJSONオブジェクト。
pythonの辞書形式に変換済みの状態で渡される pythonの辞書形式に変換済みの状態で渡される
@@ -38,19 +53,19 @@ class Parser:
+ chatdata : list[dict] + chatdata : list[dict]
+ チャットデータ本体のリスト。 + チャットデータ本体のリスト。
""" """
if jsn is None: # if jsn is None:
return {'timeoutMs':0,'continuation':None},[] # return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'): # if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError('動画に接続できません。' # raise ResponseContextError('動画に接続できません。'
'動画IDが間違っているか、動画が削除非公開の可能性があります。') # '動画IDが間違っているか、動画が削除非公開の可能性があります。')
contents=jsn['response'].get('continuationContents') # contents=jsn['response'].get('continuationContents')
#配信が終了した場合、もしくはチャットデータが取得できない場合 '''配信が終了した場合、もしくはチャットデータが取得できない場合'''
if contents is None: if contents is None:
raise NoContentsException('チャットデータを取得できませんでした。') raise NoContentsException('チャットデータを取得できませんでした。')
cont = contents['liveChatContinuation']['continuations'][0] cont = contents['liveChatContinuation']['continuations'][0]
if cont is None: if cont is None:
raise NoContinuationsException('Continuationがありません。') raise NoContinuationsException('No Continuation')
metadata = (cont.get('invalidationContinuationData') or metadata = (cont.get('invalidationContinuationData') or
cont.get('timedContinuationData') or cont.get('timedContinuationData') or
cont.get('reloadContinuationData') cont.get('reloadContinuationData')
@@ -60,6 +75,23 @@ class Parser:
if unknown: if unknown:
logger.debug(f"Received unknown continuation type:{unknown}") logger.debug(f"Received unknown continuation type:{unknown}")
metadata = cont.get(unknown) metadata = cont.get(unknown)
metadata.setdefault('timeoutMs', 10000) return self._create_data(metadata, contents)
def _create_data(self, metadata, contents):
chatdata = contents['liveChatContinuation'].get('actions') chatdata = contents['liveChatContinuation'].get('actions')
if self.mode == 'LIVE':
metadata.setdefault('timeoutMs', 10000)
else:
interval = self._get_interval(chatdata)
metadata.setdefault("timeoutMs",interval)
"""アーカイブ済みチャットはライブチャットと構造が異なっているため、以下の行により
ライブチャットと同じ形式にそろえる"""
chatdata = [action["replayChatItemAction"]["actions"][0] for action in chatdata]
return metadata, chatdata return metadata, chatdata
def _get_interval(self, actions: list):
if actions is None:
return 0
start = int(actions[0]["replayChatItemAction"]["videoOffsetTimeMsec"])
last = int(actions[-1]["replayChatItemAction"]["videoOffsetTimeMsec"])
return (last - start)

View File

@@ -20,7 +20,7 @@ def test_textmessage(mocker):
_json = _open_file("tests/testdata/compatible/textmessage.json") _json = _open_file("tests/testdata/compatible/textmessage.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 7, "timeout" : 7,
@@ -57,7 +57,7 @@ def test_newsponcer(mocker):
_json = _open_file("tests/testdata/compatible/newSponsor.json") _json = _open_file("tests/testdata/compatible/newSponsor.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 7, "timeout" : 7,
@@ -93,7 +93,7 @@ def test_superchat(mocker):
_json = _open_file("tests/testdata/compatible/superchat.json") _json = _open_file("tests/testdata/compatible/superchat.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 7, "timeout" : 7,

View File

@@ -21,7 +21,7 @@ def test_finishedlive(*mock):
_text = json.loads(_text) _text = json.loads(_text)
try: try:
parser.parse(_text) parser.parse(parser.get_contents(_text))
assert False assert False
except NoContentsException: except NoContentsException:
assert True assert True
@@ -34,7 +34,7 @@ def test_parsejson(*mock):
_text = json.loads(_text) _text = json.loads(_text)
try: try:
parser.parse(_text) parser.parse(parser.get_contents(_text))
jsn = _text jsn = _text
timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"] timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"]
continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"] continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"]

View File

@@ -21,7 +21,7 @@ def test_speed_1(mocker):
_json = _open_file("tests/testdata/speed/speedtest_normal.json") _json = _open_file("tests/testdata/speed/speedtest_normal.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 10, "timeout" : 10,
@@ -37,7 +37,7 @@ def test_speed_2(mocker):
_json = _open_file("tests/testdata/speed/speedtest_undefined.json") _json = _open_file("tests/testdata/speed/speedtest_undefined.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 10, "timeout" : 10,
@@ -53,7 +53,7 @@ def test_speed_3(mocker):
_json = _open_file("tests/testdata/speed/speedtest_empty.json") _json = _open_file("tests/testdata/speed/speedtest_empty.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 10, "timeout" : 10,