Merge tag 'termination' into develop

This commit is contained in:
taizan-hokuto
2020-05-05 21:18:46 +09:00
3 changed files with 166 additions and 173 deletions

View File

@@ -11,8 +11,8 @@ from queue import Queue
from .buffer import Buffer from .buffer import Buffer
from ..parser.live import Parser from ..parser.live import Parser
from .. import config from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall from ..exceptions import ChatParseException, IllegalFunctionCall
from ..paramgen import liveparam, arcparam from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator from ..processors.combinator import Combinator
@@ -27,7 +27,7 @@ class LiveChat:
--------- ---------
video_id : str video_id : str
動画ID 動画ID
seektime : int seektime : int
(ライブチャット取得時は無視) (ライブチャット取得時は無視)
取得開始するアーカイブ済みチャットの経過時間(秒) 取得開始するアーカイブ済みチャットの経過時間(秒)
@@ -61,7 +61,7 @@ class LiveChat:
topchat_only : bool topchat_only : bool
Trueの場合、上位チャットのみ取得する。 Trueの場合、上位チャットのみ取得する。
Attributes Attributes
--------- ---------
_executor : ThreadPoolExecutor _executor : ThreadPoolExecutor
@@ -72,22 +72,20 @@ class LiveChat:
''' '''
_setup_finished = False _setup_finished = False
#チャット監視中のListenerのリスト
_listeners = []
def __init__(self, video_id, def __init__(self, video_id,
seektime = 0, seektime=0,
processor = DefaultProcessor(), processor=DefaultProcessor(),
buffer = None, buffer=None,
interruptable = True, interruptable=True,
callback = None, callback=None,
done_callback = None, done_callback=None,
direct_mode = False, direct_mode=False,
force_replay = False, force_replay=False,
topchat_only = False, topchat_only=False,
logger = config.logger(__name__) logger=config.logger(__name__)
): ):
self.video_id = video_id self.video_id = video_id
self.seektime = seektime self.seektime = seektime
if isinstance(processor, tuple): if isinstance(processor, tuple):
self.processor = Combinator(processor) self.processor = Combinator(processor)
@@ -98,57 +96,51 @@ class LiveChat:
self._done_callback = done_callback self._done_callback = done_callback
self._executor = ThreadPoolExecutor(max_workers=2) self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode self._direct_mode = direct_mode
self._is_alive = True self._is_alive = True
self._is_replay = force_replay self._is_replay = force_replay
self._parser = Parser(is_replay = self._is_replay) self._parser = Parser(is_replay=self._is_replay)
self._pauser = Queue() self._pauser = Queue()
self._pauser.put_nowait(None) self._pauser.put_nowait(None)
self._setup()
self._first_fetch = True self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation=" self._fetch_url = "live_chat/get_live_chat?continuation="
self._topchat_only = topchat_only self._topchat_only = topchat_only
self._logger = logger self._logger = logger
LiveChat._logger = logger if interruptable:
if not LiveChat._setup_finished: signal.signal(signal.SIGINT, lambda a, b: self.terminate())
LiveChat._setup_finished = True self._setup()
if interruptable:
signal.signal(signal.SIGINT, (lambda a, b:
(LiveChat.shutdown(None,signal.SIGINT,b))
))
LiveChat._listeners.append(self)
def _setup(self): def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。 # direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode: if self._direct_mode:
if self._callback is None: if self._callback is None:
raise IllegalFunctionCall( raise IllegalFunctionCall(
"When direct_mode=True, callback parameter is required.") "When direct_mode=True, callback parameter is required.")
else: else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 # direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None: if self._buffer is None:
self._buffer = Buffer(maxsize = 20) self._buffer = Buffer(maxsize=20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成 # callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None: if self._callback is None:
pass pass
else: else:
#callbackを呼ぶループタスクの開始 # callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop,self._callback) self._executor.submit(self._callback_loop, self._callback)
#_listenループタスクの開始 # _listenループタスクの開始
listen_task = self._executor.submit(self._startlisten) listen_task = self._executor.submit(self._startlisten)
#add_done_callbackの登録 # add_done_callbackの登録
if self._done_callback is None: if self._done_callback is None:
listen_task.add_done_callback(self.finish) listen_task.add_done_callback(self.finish)
else: else:
listen_task.add_done_callback(self._done_callback) listen_task.add_done_callback(self._done_callback)
def _startlisten(self): def _startlisten(self):
time.sleep(0.1) #sleep shortly to prohibit skipping fetching data time.sleep(0.1) # sleep shortly to prohibit skipping fetching data
"""Fetch first continuation parameter, """Fetch first continuation parameter,
create and start _listen loop. create and start _listen loop.
""" """
initial_continuation = liveparam.getparam(self.video_id,3) initial_continuation = liveparam.getparam(self.video_id, 3)
self._listen(initial_continuation) self._listen(initial_continuation)
def _listen(self, continuation): def _listen(self, continuation):
''' Fetch chat data and store them into buffer, ''' Fetch chat data and store them into buffer,
get next continuaiton parameter and loop. get next continuaiton parameter and loop.
@@ -164,33 +156,34 @@ class LiveChat:
continuation = self._check_pause(continuation) continuation = self._check_pause(continuation)
contents = self._get_contents( contents = self._get_contents(
continuation, session, headers) continuation, session, headers)
metadata, chatdata = self._parser.parse(contents) metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs']/1000 timeout = metadata['timeoutMs']/1000
chat_component = { chat_component = {
"video_id" : self.video_id, "video_id": self.video_id,
"timeout" : timeout, "timeout": timeout,
"chatdata" : chatdata "chatdata": chatdata
} }
time_mark =time.time() time_mark = time.time()
if self._direct_mode: if self._direct_mode:
processed_chat = self.processor.process([chat_component]) processed_chat = self.processor.process(
if isinstance(processed_chat,tuple): [chat_component])
if isinstance(processed_chat, tuple):
self._callback(*processed_chat) self._callback(*processed_chat)
else: else:
self._callback(processed_chat) self._callback(processed_chat)
else: else:
self._buffer.put(chat_component) self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark) diff_time = timeout - (time.time()-time_mark)
time.sleep(diff_time if diff_time > 0 else 0) time.sleep(diff_time if diff_time > 0 else 0)
continuation = metadata.get('continuation') continuation = metadata.get('continuation')
except ChatParseException as e: except ChatParseException as e:
self._logger.debug(f"[{self.video_id}]{str(e)}") self._logger.debug(f"[{self.video_id}]{str(e)}")
return return
except (TypeError , json.JSONDecodeError) : except (TypeError, json.JSONDecodeError):
self._logger.error(f"{traceback.format_exc(limit = -1)}") self._logger.error(f"{traceback.format_exc(limit = -1)}")
return return
self._logger.debug(f"[{self.video_id}]finished fetching chat.") self._logger.debug(f"[{self.video_id}]finished fetching chat.")
def _check_pause(self, continuation): def _check_pause(self, continuation):
@@ -202,7 +195,7 @@ class LiveChat:
''' '''
self._pauser.put_nowait(None) self._pauser.put_nowait(None)
if not self._is_replay: if not self._is_replay:
continuation = liveparam.getparam(self.video_id,3) continuation = liveparam.getparam(self.video_id, 3)
return continuation return continuation
def _get_contents(self, continuation, session, headers): def _get_contents(self, continuation, session, headers):
@@ -214,7 +207,7 @@ class LiveChat:
------- -------
'continuationContents' which includes metadata & chat data. 'continuationContents' which includes metadata & chat data.
''' '''
livechat_json = ( livechat_json = (
self._get_livechat_json(continuation, session, headers) self._get_livechat_json(continuation, session, headers)
) )
contents = self._parser.get_contents(livechat_json) contents = self._parser.get_contents(livechat_json)
@@ -225,7 +218,7 @@ class LiveChat:
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation=" self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam( continuation = arcparam.getparam(
self.video_id, self.seektime, self._topchat_only) self.video_id, self.seektime, self._topchat_only)
livechat_json = ( self._get_livechat_json( livechat_json = (self._get_livechat_json(
continuation, session, headers)) continuation, session, headers))
reload_continuation = self._parser.reload_continuation( reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json)) self._parser.get_contents(livechat_json))
@@ -244,26 +237,26 @@ class LiveChat:
continuation = urllib.parse.quote(continuation) continuation = urllib.parse.quote(continuation)
livechat_json = None livechat_json = None
status_code = 0 status_code = 0
url =f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1" url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1): for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp: with session.get(url, headers=headers) as resp:
try: try:
text = resp.text text = resp.text
livechat_json = json.loads(text) livechat_json = json.loads(text)
break break
except json.JSONDecodeError : except json.JSONDecodeError:
time.sleep(1) time.sleep(1)
continue continue
else: else:
self._logger.error(f"[{self.video_id}]" self._logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}") f"Exceeded retry count. status_code={status_code}")
return None return None
return livechat_json return livechat_json
def _callback_loop(self,callback): def _callback_loop(self, callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで """ コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。 callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter Parameter
--------- ---------
callback : func callback : func
@@ -280,13 +273,13 @@ class LiveChat:
def get(self): def get(self):
""" bufferからデータを取り出し、processorに投げ、 """ bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。 加工済みのチャットデータを返す。
Returns Returns
: Processorによって加工されたチャットデータ : Processorによって加工されたチャットデータ
""" """
if self._callback is None: if self._callback is None:
items = self._buffer.get() items = self._buffer.get()
return self.processor.process(items) return self.processor.process(items)
raise IllegalFunctionCall( raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。") "既にcallbackを登録済みのため、get()は実行できません。")
@@ -304,13 +297,13 @@ class LiveChat:
return return
if self._pauser.empty(): if self._pauser.empty():
self._pauser.put_nowait(None) self._pauser.put_nowait(None)
def is_alive(self): def is_alive(self):
return self._is_alive return self._is_alive
def finish(self,sender): def finish(self, sender):
'''Listener終了時のコールバック''' '''Listener終了時のコールバック'''
try: try:
self.terminate() self.terminate()
except CancelledError: except CancelledError:
self._logger.debug(f'[{self.video_id}]cancelled:{sender}') self._logger.debug(f'[{self.video_id}]cancelled:{sender}')
@@ -319,14 +312,7 @@ class LiveChat:
''' '''
Listenerを終了する。 Listenerを終了する。
''' '''
self._is_alive = False if self.is_alive():
if self._direct_mode == False: self._is_alive = False
#bufferにダミーオブジェクトを入れてis_alive()を判定させる self._buffer.put({})
self._buffer.put({'chatdata':'','timeout':0}) self._logger.info(f'[{self.video_id}]終了しました')
self._logger.info(f'[{self.video_id}]finished.')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
cls._logger.debug("shutdown...")
for t in LiveChat._listeners:
t._is_alive = False

View File

@@ -12,6 +12,7 @@ Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05 ver 0.0.1 2019.10.05
''' '''
def _gen_vid(video_id): def _gen_vid(video_id):
"""generate video_id parameter. """generate video_id parameter.
Parameter Parameter
@@ -23,7 +24,7 @@ def _gen_vid(video_id):
bytes : base64 encoded video_id parameter. bytes : base64 encoded video_id parameter.
""" """
header_magic = b'\x0A\x0F\x1A\x0D\x0A' header_magic = b'\x0A\x0F\x1A\x0D\x0A'
header_id = video_id.encode() header_id = video_id.encode()
header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B' header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B'
header_terminator = b'\x20\x01' header_terminator = b'\x20\x01'
@@ -40,42 +41,46 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode() b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode() ).encode()
def _nval(val): def _nval(val):
"""convert value to byte array""" """convert value to byte array"""
if val<0: raise ValueError if val < 0:
raise ValueError
buf = b'' buf = b''
while val >> 7: while val >> 7:
m = val & 0xFF | 0x80 m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big') buf += m.to_bytes(1, 'big')
val >>= 7 val >>= 7
buf += val.to_bytes(1,'big') buf += val.to_bytes(1, 'big')
return buf return buf
def _build(video_id, seektime, topchat_only): def _build(video_id, seektime, topchat_only):
switch_01 = b'\x04' if topchat_only else b'\x01' switch_01 = b'\x04' if topchat_only else b'\x01'
if seektime < 0: if seektime < 0:
times =_nval(0) times = _nval(0)
switch = b'\x04' switch = b'\x04'
elif seektime == 0: elif seektime == 0:
times =_nval(1) times = _nval(1)
switch = b'\x03' switch = b'\x03'
else: else:
times =_nval(int(seektime*1000000)) times = _nval(int(seektime*1000000))
switch = b'\x03' switch = b'\x03'
parity = b'\x00' parity = b'\x00'
header_magic= b'\xA2\x9D\xB0\xD3\x04' header_magic = b'\xA2\x9D\xB0\xD3\x04'
sep_0 = b'\x1A' sep_0 = b'\x1A'
vid = _gen_vid(video_id) vid = _gen_vid(video_id)
time_tag = b'\x28' time_tag = b'\x28'
timestamp1 = times timestamp1 = times
sep_1 = b'\x30\x00\x38\x00\x40\x00\x48' sep_1 = b'\x30\x00\x38\x00\x40\x00\x48'
sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00' sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40' chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40'
sep_3 = b'\x00\x58\x03\x60' sep_3 = b'\x00\x58\x03\x60'
sep_4 = b'\x68' + parity + b'\x72\x04\x08' sep_4 = b'\x68' + parity + b'\x72\x04\x08'
sep_5 = b'\x10' + parity + b'\x78\x00' sep_5 = b'\x10' + parity + b'\x78\x00'
body = [
body = b''.join([
sep_0, sep_0,
_nval(len(vid)), _nval(len(vid)),
vid, vid,
@@ -90,18 +95,17 @@ def _build(video_id, seektime, topchat_only):
sep_4, sep_4,
switch_01, switch_01,
sep_5 sep_5
] ])
body = reduce(lambda x, y: x+y, body) return urllib.parse.quote(
b64enc(header_magic +
return urllib.parse.quote( _nval(len(body)) +
b64enc( header_magic + body
_nval(len(body)) + ).decode()
body )
).decode()
)
def getparam(video_id, seektime = 0, topchat_only = False):
def getparam(video_id, seektime=0, topchat_only=False):
''' '''
Parameter Parameter
--------- ---------

View File

@@ -11,6 +11,8 @@ Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05 ver 0.0.1 2019.10.05
''' '''
def _gen_vid(video_id): def _gen_vid(video_id):
"""generate video_id parameter. """generate video_id parameter.
Parameter Parameter
@@ -22,11 +24,11 @@ def _gen_vid(video_id):
byte[] : base64 encoded video_id parameter. byte[] : base64 encoded video_id parameter.
""" """
header_magic = b'\x0A\x0F\x0A\x0D\x0A' header_magic = b'\x0A\x0F\x0A\x0D\x0A'
header_id = video_id.encode() header_id = video_id.encode()
header_sep_1 = b'\x1A' header_sep_1 = b'\x1A'
header_sep_2 = b'\x43\xAA\xB9\xC1\xBD\x01\x3D\x0A' header_sep_2 = b'\x43\xAA\xB9\xC1\xBD\x01\x3D\x0A'
header_suburl = ('https://www.youtube.com/live_chat?v=' header_suburl = ('https://www.youtube.com/live_chat?v='
f'{video_id}&is_popout=1').encode() f'{video_id}&is_popout=1').encode()
header_terminator = b'\x20\x02' header_terminator = b'\x20\x02'
item = [ item = [
@@ -44,62 +46,66 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode() b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode() ).encode()
def _tzparity(video_id,times):
t=0 def _tzparity(video_id, times):
for i,s in enumerate(video_id): t = 0
for i, s in enumerate(video_id):
ss = ord(s) ss = ord(s)
if(ss % 2 == 0): if(ss % 2 == 0):
t += ss*(12-i) t += ss*(12-i)
else: else:
t ^= ss*i t ^= ss*i
return ((times^t) % 2).to_bytes(1,'big') return ((times ^ t) % 2).to_bytes(1, 'big')
def _nval(val): def _nval(val):
"""convert value to byte array""" """convert value to byte array"""
if val<0: raise ValueError if val < 0:
raise ValueError
buf = b'' buf = b''
while val >> 7: while val >> 7:
m = val & 0xFF | 0x80 m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big') buf += m.to_bytes(1, 'big')
val >>= 7 val >>= 7
buf += val.to_bytes(1,'big') buf += val.to_bytes(1, 'big')
return buf return buf
def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only): def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only):
#_short_type2 # _short_type2
switch_01 = b'\x04' if topchat_only else b'\x01' switch_01 = b'\x04' if topchat_only else b'\x01'
parity = _tzparity(video_id, _ts1^_ts2^_ts3^_ts4^_ts5) parity = _tzparity(video_id, _ts1 ^ _ts2 ^ _ts3 ^ _ts4 ^ _ts5)
header_magic= b'\xD2\x87\xCC\xC8\x03' header_magic = b'\xD2\x87\xCC\xC8\x03'
sep_0 = b'\x1A' sep_0 = b'\x1A'
vid = _gen_vid(video_id) vid = _gen_vid(video_id)
time_tag = b'\x28' time_tag = b'\x28'
timestamp1 = _nval(_ts1) timestamp1 = _nval(_ts1)
sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A' sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A'
un_len = b'\x2B' un_len = b'\x2B'
sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00' sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D' chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D'
sep_3 = b'\x3A\x00\x40\x00\x4A' sep_3 = b'\x3A\x00\x40\x00\x4A'
sep_4_len = b'\x02' sep_4_len = b'\x02'
sep_4 = b'\x08\x01' sep_4 = b'\x08\x01'
ts_2_start = b'\x50' ts_2_start = b'\x50'
timestamp2 = _nval(_ts2) timestamp2 = _nval(_ts2)
ts_2_end = b'\x58' ts_2_end = b'\x58'
sep_5 = b'\x03' sep_5 = b'\x03'
ts_3_start = b'\x50' ts_3_start = b'\x50'
timestamp3 = _nval(_ts3) timestamp3 = _nval(_ts3)
ts_3_end = b'\x58' ts_3_end = b'\x58'
timestamp4 = _nval(_ts4) timestamp4 = _nval(_ts4)
sep_6 = b'\x68' sep_6 = b'\x68'
#switch # switch
sep_7 = b'\x82\x01\x04\x08' sep_7 = b'\x82\x01\x04\x08'
#switch # switch
sep_8 = b'\x10\x00' sep_8 = b'\x10\x00'
sep_9 = b'\x88\x01\x00\xA0\x01' sep_9 = b'\x88\x01\x00\xA0\x01'
timestamp5 = _nval(_ts5) timestamp5 = _nval(_ts5)
body = [ body = b''.join([
sep_0, sep_0,
_nval(len(vid)), _nval(len(vid)),
vid, vid,
@@ -121,37 +127,35 @@ def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only):
ts_3_end, ts_3_end,
timestamp4, timestamp4,
sep_6, sep_6,
switch_01,# switch_01,
sep_7, sep_7,
switch_01,# switch_01,
sep_8, sep_8,
sep_9, sep_9,
timestamp5 timestamp5
] ])
return urllib.parse.quote(
b64enc(header_magic +
_nval(len(body)) +
body
).decode()
)
body = reduce(lambda x, y: x+y, body)
return urllib.parse.quote(
b64enc( header_magic +
_nval(len(body)) +
body
).decode()
)
def _times(past_sec): def _times(past_sec):
n = int(time.time()) n = int(time.time())
_ts1= n - random.uniform(0,1*3) _ts1 = n - random.uniform(0, 1*3)
_ts2= n - random.uniform(0.01,0.99) _ts2 = n - random.uniform(0.01, 0.99)
_ts3= n - past_sec + random.uniform(0,1) _ts3 = n - past_sec + random.uniform(0, 1)
_ts4= n - random.uniform(10*60,60*60) _ts4 = n - random.uniform(10*60, 60*60)
_ts5= n - random.uniform(0.01,0.99) _ts5 = n - random.uniform(0.01, 0.99)
return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5])) return list(map(lambda x: int(x*1000000), [_ts1, _ts2, _ts3, _ts4, _ts5]))
def getparam(video_id, past_sec = 0, topchat_only = False): def getparam(video_id, past_sec=0, topchat_only=False):
''' '''
Parameter Parameter
--------- ---------
@@ -160,5 +164,4 @@ def getparam(video_id, past_sec = 0, topchat_only = False):
topchat_only : bool topchat_only : bool
if True, fetch only 'top chat' if True, fetch only 'top chat'
''' '''
return _build(video_id,*_times(past_sec),topchat_only) return _build(video_id, *_times(past_sec), topchat_only)