Merge branch 'feature/integrate' into develop

This commit is contained in:
taizan-hokuto
2020-01-03 22:12:15 +09:00
13 changed files with 557 additions and 162 deletions

View File

@@ -1,7 +1,7 @@
import logging import logging
from . import mylogger from . import mylogger
LOGGER_MODE = None LOGGER_MODE = logging.DEBUG
headers = { headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'} 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'}

View File

@@ -0,0 +1,297 @@
import aiohttp, asyncio
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from asyncio import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
logger = config.logger(__name__)
headers = config.headers
MAX_RETRY = 10
class LiveChatAsync:
'''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。
Parameter
---------
video_id : str
動画ID
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
exception_handler : func
例外を処理する関数
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_is_alive : bool
チャット取得を停止するためのフラグ
'''
_setup_finished = False
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
exception_handler = None,
direct_mode = False):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
else:
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not LiveChatAsync._setup_finished:
LiveChatAsync._setup_finished = True
if exception_handler == None:
self._set_exception_handler(self._handle_exception)
else:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b:asyncio.create_task(
LiveChatAsync.shutdown(None,signal.SIGINT,b))
))
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
loop = asyncio.get_event_loop()
loop.create_task(self._callback_loop(self._callback))
#_listenループタスクの開始
loop = asyncio.get_event_loop()
listen_task = loop.create_task(self._startlisten())
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = liveparam.getparam(self.video_id,3)
await self._listen(initial_continuation)
async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
Bufferにチャットデータを格納、
次のcontinuaitonを取得してループする。
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
'''pause'''
await self._pauser.get()
'''resume:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
continuation= liveparam.getparam(self.video_id,3)
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
await self._callback(
self.processor.process([chat_component])
)
else:
await self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
self.terminate()
async def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat/get_live_chat?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp:
try:
text = await resp.text()
status_code = resp.status
livechat_json = json.loads(text)
break
except (ClientConnectorError,json.JSONDecodeError) :
await asyncio.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
async def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = await self._buffer.get()
data = self.processor.process(items)
await callback(data)
async def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = await self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if self._callback is None:
return
if not self._pauser.empty():
self._pauser.get_nowait()
def resume(self):
if self._callback is None:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'[{self.video_id}]終了しました')
@classmethod
def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop()
loop.set_exception_handler(handler)
@classmethod
def _handle_exception(cls, loop, context):
if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop()
loop.create_task(cls.shutdown(None,None,None))
@classmethod
async def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
logger.debug(f"残っているタスクを終了しています")
await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop()
loop.stop()

View File

@@ -8,11 +8,12 @@ import traceback
import urllib.parse import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError from concurrent.futures import CancelledError
from asyncio import Queue
from .buffer import Buffer from .buffer import Buffer
from ..parser.live import Parser from ..parser.live import Parser
from .. import config from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator from ..processors.combinator import Combinator
@@ -63,6 +64,7 @@ class LiveChatAsync:
_setup_finished = False _setup_finished = False
def __init__(self, video_id, def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(), processor = DefaultProcessor(),
buffer = None, buffer = None,
interruptable = True, interruptable = True,
@@ -71,6 +73,7 @@ class LiveChatAsync:
exception_handler = None, exception_handler = None,
direct_mode = False): direct_mode = False):
self.video_id = video_id self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple): if isinstance(processor, tuple):
self.processor = Combinator(processor) self.processor = Combinator(processor)
else: else:
@@ -82,8 +85,11 @@ class LiveChatAsync:
self._direct_mode = direct_mode self._direct_mode = direct_mode
self._is_alive = True self._is_alive = True
self._parser = Parser() self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup() self._setup()
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
if not LiveChatAsync._setup_finished: if not LiveChatAsync._setup_finished:
LiveChatAsync._setup_finished = True LiveChatAsync._setup_finished = True
if exception_handler == None: if exception_handler == None:
@@ -101,7 +107,7 @@ class LiveChatAsync:
if self._direct_mode: if self._direct_mode:
if self._callback is None: if self._callback is None:
raise IllegalFunctionCall( raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。") "When direct_mode=True, callback parameter is required.")
else: else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None: if self._buffer is None:
@@ -123,48 +129,29 @@ class LiveChatAsync:
listen_task.add_done_callback(self._done_callback) listen_task.add_done_callback(self._done_callback)
async def _startlisten(self): async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、 """Fetch first continuation parameter,
_listenループのタスクを作成し開始する create and start _listen loop.
""" """
initial_continuation = await self._get_initial_continuation() initial_continuation = liveparam.getparam(self.video_id,3)
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
await self._listen(initial_continuation) await self._listen(initial_continuation)
async def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = liveparam.getparam(self.video_id)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
async def _listen(self, continuation): async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し ''' Fetch chat data and store them into buffer,
Bufferにチャットデータを格納、 get next continuaiton parameter and loop.
次のcontinuaitonを取得してループする。
Parameter Parameter
--------- ---------
continuation : str continuation : str
次のチャットデータ取得に必要なパラメータ parameter for next chat data
''' '''
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive): while(continuation and self._is_alive):
livechat_json = (await continuation = await self._check_pause(continuation)
self._get_livechat_json(continuation, session, headers) contents = await self._get_contents(
) continuation, session, headers)
metadata, chatdata = self._parser.parse( livechat_json ) metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs']/1000 timeout = metadata['timeoutMs']/1000
chat_component = { chat_component = {
"video_id" : self.video_id, "video_id" : self.video_id,
@@ -182,31 +169,67 @@ class LiveChatAsync:
await asyncio.sleep(diff_time) await asyncio.sleep(diff_time)
continuation = metadata.get('continuation') continuation = metadata.get('continuation')
except ChatParseException as e: except ChatParseException as e:
self.terminate() #self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"") logger.debug(f"[{self.video_id}]{str(e)}")
return return
except (TypeError , json.JSONDecodeError) : except (TypeError , json.JSONDecodeError) :
self.terminate() #self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}") logger.error(f"{traceback.format_exc(limit = -1)}")
return return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。") logger.debug(f"[{self.video_id}]finished fetching chat.")
async def _check_pause(self, continuation):
if self._pauser.empty():
'''pause'''
await self._pauser.get()
'''resume:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
if self._parser.mode == 'LIVE':
continuation = liveparam.getparam(self.video_id,3)
return continuation
async def _get_contents(self, continuation, session, headers):
'''Get 'contents' dict from livechat json.
If contents is None at first fetching,
try to fetch archive chat data.
Return:
-------
'contents' dict which includes metadata & chatdata.
'''
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
if contents is None:
'''Try to fetch archive chat data.'''
self._parser.mode = 'REPLAY'
self._fetch_url = ("live_chat_replay/"
"get_live_chat_replay?continuation=")
continuation = arcparam.getparam(self.video_id, self.seektime)
livechat_json = (await self._get_livechat_json(
continuation, session, headers))
contents = self._parser.get_contents(livechat_json)
self._first_fetch = False
return contents
async def _get_livechat_json(self, continuation, session, headers): async def _get_livechat_json(self, continuation, session, headers):
''' '''
チャットデータが格納されたjsonデータを取得する。 Get json which includes chat data.
''' '''
continuation = urllib.parse.quote(continuation) continuation = urllib.parse.quote(continuation)
livechat_json = None livechat_json = None
status_code = 0 status_code = 0
url =( url =(
f"https://www.youtube.com/live_chat/get_live_chat?" f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1")
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1): for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp: async with session.get(url ,headers = headers) as resp:
try: try:
text = await resp.text() text = await resp.text()
status_code = resp.status
livechat_json = json.loads(text) livechat_json = json.loads(text)
break break
except (ClientConnectorError,json.JSONDecodeError) : except (ClientConnectorError,json.JSONDecodeError) :
@@ -215,7 +238,6 @@ class LiveChatAsync:
else: else:
logger.error(f"[{self.video_id}]" logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}") f"Exceeded retry count. status_code={status_code}")
self.terminate()
return None return None
return livechat_json return livechat_json
@@ -246,6 +268,21 @@ class LiveChatAsync:
raise IllegalFunctionCall( raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。") "既にcallbackを登録済みのため、get()は実行できません。")
def get_mode(self):
return self._parser.mode
def pause(self):
if self._callback is None:
return
if not self._pauser.empty():
self._pauser.get_nowait()
def resume(self):
if self._callback is None:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self): def is_alive(self):
return self._is_alive return self._is_alive
@@ -264,17 +301,15 @@ class LiveChatAsync:
if self._direct_mode == False: if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる #bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1}) self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'終了しました:[{self.video_id}]') logger.info(f'[{self.video_id}]finished.')
@classmethod @classmethod
def _set_exception_handler(cls, handler): def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
#default handler: cls._handle_exception
loop.set_exception_handler(handler) loop.set_exception_handler(handler)
@classmethod @classmethod
def _handle_exception(cls, loop, context): def _handle_exception(cls, loop, context):
#msg = context.get("exception", context["message"])
if not isinstance(context["exception"],CancelledError): if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}") logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop() loop= asyncio.get_event_loop()
@@ -282,12 +317,12 @@ class LiveChatAsync:
@classmethod @classmethod
async def shutdown(cls, event, sig = None, handler=None): async def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています") logger.debug("shutdown...")
tasks = [t for t in asyncio.all_tasks() if t is not tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()] asyncio.current_task()]
[task.cancel() for task in tasks] [task.cancel() for task in tasks]
logger.debug(f"残っているタスクを終了しています") logger.debug(f"complete remaining tasks...")
await asyncio.gather(*tasks,return_exceptions=True) await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.stop() loop.stop()

View File

@@ -8,7 +8,7 @@ import traceback
import urllib.parse import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError from concurrent.futures import CancelledError
from queue import Queue from asyncio import Queue
from .buffer import Buffer from .buffer import Buffer
from ..parser.replay import Parser from ..parser.replay import Parser
from .. import config from .. import config
@@ -18,8 +18,9 @@ from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator from ..processors.combinator import Combinator
logger = config.logger(__name__) logger = config.logger(__name__)
MAX_RETRY = 10
headers = config.headers headers = config.headers
MAX_RETRY = 10
@@ -135,28 +136,9 @@ class ReplayChatAsync:
"""最初のcontinuationパラメータを取得し、 """最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する _listenループのタスクを作成し開始する
""" """
initial_continuation = await self._get_initial_continuation() initial_continuation = arcparam.getparam(self.video_id, self.seektime)
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
await self._listen(initial_continuation) await self._listen(initial_continuation)
async def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = arcparam.get(self.video_id,self.seektime)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
async def _listen(self, continuation): async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し ''' continuationに紐付いたチャットデータを取得し
Bufferにチャットデータを格納、 Bufferにチャットデータを格納、
@@ -171,11 +153,13 @@ class ReplayChatAsync:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive): while(continuation and self._is_alive):
if self._pauser.empty(): if self._pauser.empty():
#pause '''pause'''
await self._pauser.get() await self._pauser.get()
#resume '''resume:
#prohibit from blocking by putting None into _pauser. prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None) self._pauser.put_nowait(None)
#when replay, not reacquire continuation param
livechat_json = (await livechat_json = (await
self._get_livechat_json(continuation, session, headers) self._get_livechat_json(continuation, session, headers)
) )
@@ -197,11 +181,12 @@ class ReplayChatAsync:
await asyncio.sleep(diff_time) await asyncio.sleep(diff_time)
continuation = metadata.get('continuation') continuation = metadata.get('continuation')
except ChatParseException as e: except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"") logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
return return
except (TypeError , json.JSONDecodeError) : except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")
self.terminate() self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。") logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
@@ -261,14 +246,17 @@ class ReplayChatAsync:
"既にcallbackを登録済みのため、get()は実行できません。") "既にcallbackを登録済みのため、get()は実行できません。")
def pause(self): def pause(self):
if self._callback is None:
return
if not self._pauser.empty(): if not self._pauser.empty():
self._pauser.get() self._pauser.get_nowait()
def resume(self): def resume(self):
if self._callback is None:
return
if self._pauser.empty(): if self._pauser.empty():
self._pauser.put_nowait(None) self._pauser.put_nowait(None)
def is_alive(self): def is_alive(self):
return self._is_alive return self._is_alive
@@ -292,12 +280,10 @@ class ReplayChatAsync:
@classmethod @classmethod
def _set_exception_handler(cls, handler): def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
#default handler: cls._handle_exception
loop.set_exception_handler(handler) loop.set_exception_handler(handler)
@classmethod @classmethod
def _handle_exception(cls, loop, context): def _handle_exception(cls, loop, context):
#msg = context.get("exception", context["message"])
if not isinstance(context["exception"],CancelledError): if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}") logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop() loop= asyncio.get_event_loop()

View File

@@ -7,11 +7,12 @@ import time
import traceback import traceback
import urllib.parse import urllib.parse
from concurrent.futures import CancelledError, ThreadPoolExecutor from concurrent.futures import CancelledError, ThreadPoolExecutor
from queue import Queue
from .buffer import Buffer from .buffer import Buffer
from ..parser.live import Parser from ..parser.live import Parser
from .. import config from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator from ..processors.combinator import Combinator
@@ -63,6 +64,7 @@ class LiveChat:
#チャット監視中のListenerのリスト #チャット監視中のListenerのリスト
_listeners= [] _listeners= []
def __init__(self, video_id, def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(), processor = DefaultProcessor(),
buffer = None, buffer = None,
interruptable = True, interruptable = True,
@@ -71,6 +73,7 @@ class LiveChat:
direct_mode = False direct_mode = False
): ):
self.video_id = video_id self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple): if isinstance(processor, tuple):
self.processor = Combinator(processor) self.processor = Combinator(processor)
else: else:
@@ -82,7 +85,11 @@ class LiveChat:
self._direct_mode = direct_mode self._direct_mode = direct_mode
self._is_alive = True self._is_alive = True
self._parser = Parser() self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup() self._setup()
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
if not LiveChat._setup_finished: if not LiveChat._setup_finished:
LiveChat._setup_finished = True LiveChat._setup_finished = True
@@ -93,11 +100,12 @@ class LiveChat:
LiveChat._listeners.append(self) LiveChat._listeners.append(self)
def _setup(self): def _setup(self):
#logger.debug("setup")
#direct modeがTrueでcallback未設定の場合例外発生。 #direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode: if self._direct_mode:
if self._callback is None: if self._callback is None:
raise IllegalFunctionCall( raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。") "When direct_mode=True, callback parameter is required.")
else: else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None: if self._buffer is None:
@@ -117,48 +125,30 @@ class LiveChat:
listen_task.add_done_callback(self._done_callback) listen_task.add_done_callback(self._done_callback)
def _startlisten(self): def _startlisten(self):
"""最初のcontinuationパラメータを取得し、 time.sleep(0.1) #sleep shortly to prohibit skipping fetching data
_listenループのタスクを作成し開始する """Fetch first continuation parameter,
create and start _listen loop.
""" """
initial_continuation = self._get_initial_continuation() initial_continuation = liveparam.getparam(self.video_id,3)
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
self._listen(initial_continuation) self._listen(initial_continuation)
def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = liveparam.getparam(self.video_id)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
def _listen(self, continuation): def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し ''' Fetch chat data and store them into buffer,
Bufferにチャットデータを格納、 get next continuaiton parameter and loop.
次のcontinuaitonを取得してループする。
Parameter Parameter
--------- ---------
continuation : str continuation : str
次のチャットデータ取得に必要なパラメータ parameter for next chat data
''' '''
try: try:
with requests.Session() as session: with requests.Session() as session:
while(continuation and self._is_alive): while(continuation and self._is_alive):
livechat_json = ( continuation = self._check_pause(continuation)
self._get_livechat_json(continuation, session, headers) contents = self._get_contents(
) continuation, session, headers)
metadata, chatdata = self._parser.parse( livechat_json ) metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs']/1000 timeout = metadata['timeoutMs']/1000
chat_component = { chat_component = {
"video_id" : self.video_id, "video_id" : self.video_id,
@@ -173,35 +163,70 @@ class LiveChat:
else: else:
self._buffer.put(chat_component) self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark) diff_time = timeout - (time.time()-time_mark)
if diff_time < 0 : diff_time=0 time.sleep(diff_time if diff_time > 0 else 0)
time.sleep(diff_time)
continuation = metadata.get('continuation') continuation = metadata.get('continuation')
except ChatParseException as e: except ChatParseException as e:
self.terminate() #self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"") logger.debug(f"[{self.video_id}]{str(e)}")
return return
except (TypeError , json.JSONDecodeError) : except (TypeError , json.JSONDecodeError) :
self.terminate() #self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}") logger.error(f"{traceback.format_exc(limit = -1)}")
return return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。") logger.debug(f"[{self.video_id}]finished fetching chat.")
def _check_pause(self, continuation):
if self._pauser.empty():
'''pause'''
self._pauser.get()
'''resume:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
if self._parser.mode == 'LIVE':
continuation = liveparam.getparam(self.video_id,3)
return continuation
def _get_contents(self, continuation, session, headers):
'''Get 'contents' dict from livechat json.
If contents is None at first fetching,
try to fetch archive chat data.
Return:
-------
'contents' dict which includes metadata & chatdata.
'''
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
contents = self._parser.get_contents(livechat_json)
if self._first_fetch:
if contents is None:
'''Try to fetch archive chat data.'''
self._parser.mode = 'REPLAY'
self._fetch_url = ("live_chat_replay/"
"get_live_chat_replay?continuation=")
continuation = arcparam.getparam(self.video_id, self.seektime)
livechat_json = ( self._get_livechat_json(
continuation, session, headers))
contents = self._parser.get_contents(livechat_json)
self._first_fetch = False
return contents
def _get_livechat_json(self, continuation, session, headers): def _get_livechat_json(self, continuation, session, headers):
''' '''
チャットデータが格納されたjsonデータを取得する。 Get json which includes chat data.
''' '''
continuation = urllib.parse.quote(continuation) continuation = urllib.parse.quote(continuation)
livechat_json = None livechat_json = None
status_code = 0 status_code = 0
url =( url =(
f"https://www.youtube.com/live_chat/get_live_chat?" f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1")
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1): for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp: with session.get(url ,headers = headers) as resp:
try: try:
text = resp.text text = resp.text
status_code = resp.status_code
livechat_json = json.loads(text) livechat_json = json.loads(text)
break break
except json.JSONDecodeError : except json.JSONDecodeError :
@@ -210,7 +235,7 @@ class LiveChat:
else: else:
logger.error(f"[{self.video_id}]" logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}") f"Exceeded retry count. status_code={status_code}")
self.terminate() #self.terminate()
return None return None
return livechat_json return livechat_json
@@ -241,6 +266,21 @@ class LiveChat:
raise IllegalFunctionCall( raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。") "既にcallbackを登録済みのため、get()は実行できません。")
def get_mode(self):
return self._parser.mode
def pause(self):
if self._callback is None:
return
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._callback is None:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self): def is_alive(self):
return self._is_alive return self._is_alive
@@ -259,10 +299,10 @@ class LiveChat:
if self._direct_mode == False: if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる #bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':1}) self._buffer.put({'chatdata':'','timeout':1})
logger.info(f'[{self.video_id}]終了しました') logger.info(f'[{self.video_id}]finished.')
@classmethod @classmethod
def shutdown(cls, event, sig = None, handler=None): def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています") logger.debug("shutdown...")
for t in LiveChat._listeners: for t in LiveChat._listeners:
t._is_alive = False t._is_alive = False

View File

@@ -139,7 +139,7 @@ class ReplayChat:
def _get_initial_continuation(self): def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。''' ''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try: try:
initial_continuation = arcparam.get(self.video_id,self.seektime) initial_continuation = arcparam.getparam(self.video_id,self.seektime)
except ChatParseException as e: except ChatParseException as e:
self.terminate() self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}") logger.debug(f"[{self.video_id}]Error:{str(e)}")

View File

@@ -65,7 +65,7 @@ def _tzparity(video_id,times):
return ((times^t) % 2).to_bytes(1,'big') return ((times^t) % 2).to_bytes(1,'big')
def get(video_id, seektime = 0, topchatonly = False): def _build(video_id, seektime, topchatonly = False):
switch_01 = b'\x04' if topchatonly else b'\x01' switch_01 = b'\x04' if topchatonly else b'\x01'
@@ -116,5 +116,12 @@ def get(video_id, seektime = 0, topchatonly = False):
).decode() ).decode()
) )
def getparam(video_id, seektime = 0):
'''
Parameter
---------
seektime : int
unit:seconds
start position of fetching chat data.
'''
return _build(video_id, seektime)

View File

@@ -155,7 +155,7 @@ def _times(past_sec):
return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5])) return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5]))
def getparam(video_id,past_sec = 60): def getparam(video_id,past_sec = 0):
''' '''
Parameter Parameter
--------- ---------

View File

@@ -9,57 +9,87 @@ from .. import config
from .. exceptions import ( from .. exceptions import (
ResponseContextError, ResponseContextError,
NoContentsException, NoContentsException,
NoContinuationsException ) NoContinuationsException,
ChatParseException )
logger = config.logger(__name__) logger = config.logger(__name__)
from .. import util
class Parser: class Parser:
def parse(self, jsn):
def __init__(self):
self.mode = 'LIVE'
def get_contents(self, jsn):
if jsn is None:
raise ChatParseException('Called with none JSON object.')
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError('The video_id would be wrong, or video is deleted or private.')
contents=jsn['response'].get('continuationContents')
return contents
def parse(self, contents):
""" """
このparse関数はLiveChat._listen() 関数から定期的に呼び出される。 このparse関数はLiveChat._listen() 関数から定期的に呼び出される。
引数jsnはYoutubeから取得したチャットデータの生JSONであり、 引数contentsはYoutubeから取得したチャットデータの生JSONであり、
このparse関数によって与えられたJSONを以下に分割して返す。 与えられたJSONをチャットデータとメタデータに分割して返す。
+ timeout (次のチャットデータ取得までのインターバル)
+ chat dataチャットデータ本体
+ continuation (次のチャットデータ取得に必要となるパラメータ).
Parameter Parameter
---------- ----------
+ jsn : dict + contents : dict
+ Youtubeから取得したチャットデータのJSONオブジェクト。 + Youtubeから取得したチャットデータのJSONオブジェクト。
pythonの辞書形式に変換済みの状態で渡される pythonの辞書形式に変換済みの状態で渡される
Returns Returns
------- -------
+ metadata : dict tuple:
+ チャットデータに付随するメタデータ。timeout、 動画ID、continuationパラメータで構成される。 + metadata : dict  チャットデータに付随するメタデータ
+ timeout
+ video_id
+ continuation
+ chatdata : list[dict] + chatdata : list[dict]
+ チャットデータ本体のリスト。     チャットデータ本体のリスト。
""" """
if jsn is None:
return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError('動画に接続できません。'
'動画IDが間違っているか、動画が削除非公開の可能性があります。')
contents=jsn['response'].get('continuationContents')
#配信が終了した場合、もしくはチャットデータが取得できない場合
if contents is None: if contents is None:
raise NoContentsException('チャットデータ取得できませんでした。') '''配信が終了した場合、もしくはチャットデータ取得できない場合'''
raise NoContentsException('Chat data stream is empty.')
cont = contents['liveChatContinuation']['continuations'][0] cont = contents['liveChatContinuation']['continuations'][0]
if cont is None: if cont is None:
raise NoContinuationsException('Continuationがありません。') raise NoContinuationsException('No Continuation')
metadata = (cont.get('invalidationContinuationData') or metadata = (cont.get('invalidationContinuationData') or
cont.get('timedContinuationData') or cont.get('timedContinuationData') or
cont.get('reloadContinuationData') cont.get('reloadContinuationData') or
cont.get('liveChatReplayContinuationData')
) )
if metadata is None: if metadata is None:
if cont.get("playerSeekContinuationData"):
raise ChatParseException('Finished chat data')
unknown = list(cont.keys())[0] unknown = list(cont.keys())[0]
if unknown: if unknown:
logger.debug(f"Received unknown continuation type:{unknown}") logger.debug(f"Received unknown continuation type:{unknown}")
metadata = cont.get(unknown) metadata = cont.get(unknown)
metadata.setdefault('timeoutMs', 10000) else:
raise ChatParseException('Cannot extract continuation data')
return self._create_data(metadata, contents)
def _create_data(self, metadata, contents):
chatdata = contents['liveChatContinuation'].get('actions') chatdata = contents['liveChatContinuation'].get('actions')
if self.mode == 'LIVE':
metadata.setdefault('timeoutMs', 10000)
else:
interval = self._get_interval(chatdata)
metadata.setdefault("timeoutMs",interval)
"""アーカイブ済みチャットはライブチャットと構造が異なっているため、以下の行により
ライブチャットと同じ形式にそろえる"""
chatdata = [action["replayChatItemAction"]["actions"][0] for action in chatdata]
return metadata, chatdata return metadata, chatdata
def _get_interval(self, actions: list):
if actions is None:
return 0
start = int(actions[0]["replayChatItemAction"]["videoOffsetTimeMsec"])
last = int(actions[-1]["replayChatItemAction"]["videoOffsetTimeMsec"])
return (last - start)

View File

@@ -5,16 +5,16 @@ import requests, json
from pytchat.paramgen import arcparam from pytchat.paramgen import arcparam
def test_arcparam_0(mocker): def test_arcparam_0(mocker):
param = arcparam.get("01234567890") param = arcparam.getparam("01234567890")
assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAXIECAEQAXgA" == param assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAXIECAEQAXgA" == param
def test_arcparam_1(mocker): def test_arcparam_1(mocker):
param = arcparam.get("01234567890", seektime = 100000) param = arcparam.getparam("01234567890", seektime = 100000)
assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgBcgQIARABeAA%3D" == param assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgBcgQIARABeAA%3D" == param
def test_arcparam_2(mocker): def test_arcparam_2(mocker):
param = arcparam.get("SsjCnHOk-Sk") param = arcparam.getparam("SsjCnHOk-Sk")
url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1" url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1"
resp = requests.Session().get(url,headers = config.headers) resp = requests.Session().get(url,headers = config.headers)
jsn = json.loads(resp.text) jsn = json.loads(resp.text)

View File

@@ -20,7 +20,7 @@ def test_textmessage(mocker):
_json = _open_file("tests/testdata/compatible/textmessage.json") _json = _open_file("tests/testdata/compatible/textmessage.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 7, "timeout" : 7,
@@ -57,7 +57,7 @@ def test_newsponcer(mocker):
_json = _open_file("tests/testdata/compatible/newSponsor.json") _json = _open_file("tests/testdata/compatible/newSponsor.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 7, "timeout" : 7,
@@ -93,7 +93,7 @@ def test_superchat(mocker):
_json = _open_file("tests/testdata/compatible/superchat.json") _json = _open_file("tests/testdata/compatible/superchat.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 7, "timeout" : 7,

View File

@@ -21,7 +21,7 @@ def test_finishedlive(*mock):
_text = json.loads(_text) _text = json.loads(_text)
try: try:
parser.parse(_text) parser.parse(parser.get_contents(_text))
assert False assert False
except NoContentsException: except NoContentsException:
assert True assert True
@@ -34,7 +34,7 @@ def test_parsejson(*mock):
_text = json.loads(_text) _text = json.loads(_text)
try: try:
parser.parse(_text) parser.parse(parser.get_contents(_text))
jsn = _text jsn = _text
timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"] timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"]
continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"] continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"]

View File

@@ -21,7 +21,7 @@ def test_speed_1(mocker):
_json = _open_file("tests/testdata/speed/speedtest_normal.json") _json = _open_file("tests/testdata/speed/speedtest_normal.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 10, "timeout" : 10,
@@ -37,7 +37,7 @@ def test_speed_2(mocker):
_json = _open_file("tests/testdata/speed/speedtest_undefined.json") _json = _open_file("tests/testdata/speed/speedtest_undefined.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 10, "timeout" : 10,
@@ -53,7 +53,7 @@ def test_speed_3(mocker):
_json = _open_file("tests/testdata/speed/speedtest_empty.json") _json = _open_file("tests/testdata/speed/speedtest_empty.json")
_, chatdata = parser.parse(json.loads(_json)) _, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = { data = {
"video_id" : "", "video_id" : "",
"timeout" : 10, "timeout" : 10,