Merge branch 'hotfix/termination'

This commit is contained in:
taizan-hokuto
2020-05-05 21:18:46 +09:00
3 changed files with 166 additions and 173 deletions

View File

@@ -11,8 +11,8 @@ from queue import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam, arcparam
from ..exceptions import ChatParseException, IllegalFunctionCall
from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
@@ -27,7 +27,7 @@ class LiveChat:
---------
video_id : str
動画ID
seektime : int
(ライブチャット取得時は無視)
取得開始するアーカイブ済みチャットの経過時間(秒)
@@ -61,7 +61,7 @@ class LiveChat:
topchat_only : bool
Trueの場合、上位チャットのみ取得する。
Attributes
---------
_executor : ThreadPoolExecutor
@@ -72,22 +72,20 @@ class LiveChat:
'''
_setup_finished = False
#チャット監視中のListenerのリスト
_listeners = []
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
direct_mode = False,
force_replay = False,
topchat_only = False,
logger = config.logger(__name__)
):
self.video_id = video_id
seektime=0,
processor=DefaultProcessor(),
buffer=None,
interruptable=True,
callback=None,
done_callback=None,
direct_mode=False,
force_replay=False,
topchat_only=False,
logger=config.logger(__name__)
):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
@@ -98,57 +96,51 @@ class LiveChat:
self._done_callback = done_callback
self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode
self._is_alive = True
self._is_alive = True
self._is_replay = force_replay
self._parser = Parser(is_replay = self._is_replay)
self._parser = Parser(is_replay=self._is_replay)
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
self._topchat_only = topchat_only
self._logger = logger
LiveChat._logger = logger
if not LiveChat._setup_finished:
LiveChat._setup_finished = True
if interruptable:
signal.signal(signal.SIGINT, (lambda a, b:
(LiveChat.shutdown(None,signal.SIGINT,b))
))
LiveChat._listeners.append(self)
if interruptable:
signal.signal(signal.SIGINT, lambda a, b: self.terminate())
self._setup()
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
# direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"When direct_mode=True, callback parameter is required.")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
# direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
self._buffer = Buffer(maxsize=20)
# callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
pass
else:
#callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop,self._callback)
#_listenループタスクの開始
# callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop, self._callback)
# _listenループタスクの開始
listen_task = self._executor.submit(self._startlisten)
#add_done_callbackの登録
# add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
time.sleep(0.1) #sleep shortly to prohibit skipping fetching data
time.sleep(0.1) # sleep shortly to prohibit skipping fetching data
"""Fetch first continuation parameter,
create and start _listen loop.
"""
initial_continuation = liveparam.getparam(self.video_id,3)
initial_continuation = liveparam.getparam(self.video_id, 3)
self._listen(initial_continuation)
def _listen(self, continuation):
''' Fetch chat data and store them into buffer,
get next continuaiton parameter and loop.
@@ -164,33 +156,34 @@ class LiveChat:
continuation = self._check_pause(continuation)
contents = self._get_contents(
continuation, session, headers)
metadata, chatdata = self._parser.parse(contents)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
"video_id": self.video_id,
"timeout": timeout,
"chatdata": chatdata
}
time_mark =time.time()
time_mark = time.time()
if self._direct_mode:
processed_chat = self.processor.process([chat_component])
if isinstance(processed_chat,tuple):
processed_chat = self.processor.process(
[chat_component])
if isinstance(processed_chat, tuple):
self._callback(*processed_chat)
else:
self._callback(processed_chat)
else:
self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
time.sleep(diff_time if diff_time > 0 else 0)
continuation = metadata.get('continuation')
time.sleep(diff_time if diff_time > 0 else 0)
continuation = metadata.get('continuation')
except ChatParseException as e:
self._logger.debug(f"[{self.video_id}]{str(e)}")
return
except (TypeError , json.JSONDecodeError) :
return
except (TypeError, json.JSONDecodeError):
self._logger.error(f"{traceback.format_exc(limit = -1)}")
return
self._logger.debug(f"[{self.video_id}]finished fetching chat.")
def _check_pause(self, continuation):
@@ -202,7 +195,7 @@ class LiveChat:
'''
self._pauser.put_nowait(None)
if not self._is_replay:
continuation = liveparam.getparam(self.video_id,3)
continuation = liveparam.getparam(self.video_id, 3)
return continuation
def _get_contents(self, continuation, session, headers):
@@ -214,7 +207,7 @@ class LiveChat:
-------
'continuationContents' which includes metadata & chat data.
'''
livechat_json = (
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
contents = self._parser.get_contents(livechat_json)
@@ -225,7 +218,7 @@ class LiveChat:
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam(
self.video_id, self.seektime, self._topchat_only)
livechat_json = ( self._get_livechat_json(
livechat_json = (self._get_livechat_json(
continuation, session, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
@@ -244,26 +237,26 @@ class LiveChat:
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp:
with session.get(url, headers=headers) as resp:
try:
text = resp.text
livechat_json = json.loads(text)
break
except json.JSONDecodeError :
except json.JSONDecodeError:
time.sleep(1)
continue
else:
self._logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
def _callback_loop(self,callback):
def _callback_loop(self, callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
@@ -280,13 +273,13 @@ class LiveChat:
def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = self._buffer.get()
return self.processor.process(items)
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
@@ -304,13 +297,13 @@ class LiveChat:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
def finish(self, sender):
'''Listener終了時のコールバック'''
try:
try:
self.terminate()
except CancelledError:
self._logger.debug(f'[{self.video_id}]cancelled:{sender}')
@@ -319,14 +312,7 @@ class LiveChat:
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':0})
self._logger.info(f'[{self.video_id}]finished.')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
cls._logger.debug("shutdown...")
for t in LiveChat._listeners:
t._is_alive = False
if self.is_alive():
self._is_alive = False
self._buffer.put({})
self._logger.info(f'[{self.video_id}]終了しました')

View File

@@ -12,6 +12,7 @@ Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
@@ -23,7 +24,7 @@ def _gen_vid(video_id):
bytes : base64 encoded video_id parameter.
"""
header_magic = b'\x0A\x0F\x1A\x0D\x0A'
header_id = video_id.encode()
header_id = video_id.encode()
header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B'
header_terminator = b'\x20\x01'
@@ -40,42 +41,46 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
if val < 0:
raise ValueError
buf = b''
while val >> 7:
m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big')
buf += m.to_bytes(1, 'big')
val >>= 7
buf += val.to_bytes(1,'big')
buf += val.to_bytes(1, 'big')
return buf
def _build(video_id, seektime, topchat_only):
switch_01 = b'\x04' if topchat_only else b'\x01'
if seektime < 0:
times =_nval(0)
switch = b'\x04'
elif seektime == 0:
times =_nval(1)
switch = b'\x03'
times = _nval(0)
switch = b'\x04'
elif seektime == 0:
times = _nval(1)
switch = b'\x03'
else:
times =_nval(int(seektime*1000000))
times = _nval(int(seektime*1000000))
switch = b'\x03'
parity = b'\x00'
header_magic= b'\xA2\x9D\xB0\xD3\x04'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = times
sep_1 = b'\x30\x00\x38\x00\x40\x00\x48'
sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40'
sep_3 = b'\x00\x58\x03\x60'
sep_4 = b'\x68' + parity + b'\x72\x04\x08'
sep_5 = b'\x10' + parity + b'\x78\x00'
body = [
header_magic = b'\xA2\x9D\xB0\xD3\x04'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = times
sep_1 = b'\x30\x00\x38\x00\x40\x00\x48'
sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40'
sep_3 = b'\x00\x58\x03\x60'
sep_4 = b'\x68' + parity + b'\x72\x04\x08'
sep_5 = b'\x10' + parity + b'\x78\x00'
body = b''.join([
sep_0,
_nval(len(vid)),
vid,
@@ -90,18 +95,17 @@ def _build(video_id, seektime, topchat_only):
sep_4,
switch_01,
sep_5
]
])
body = reduce(lambda x, y: x+y, body)
return urllib.parse.quote(
b64enc( header_magic +
_nval(len(body)) +
body
).decode()
)
return urllib.parse.quote(
b64enc(header_magic +
_nval(len(body)) +
body
).decode()
)
def getparam(video_id, seektime = 0, topchat_only = False):
def getparam(video_id, seektime=0, topchat_only=False):
'''
Parameter
---------

View File

@@ -11,6 +11,8 @@ Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
@@ -22,11 +24,11 @@ def _gen_vid(video_id):
byte[] : base64 encoded video_id parameter.
"""
header_magic = b'\x0A\x0F\x0A\x0D\x0A'
header_id = video_id.encode()
header_id = video_id.encode()
header_sep_1 = b'\x1A'
header_sep_2 = b'\x43\xAA\xB9\xC1\xBD\x01\x3D\x0A'
header_suburl = ('https://www.youtube.com/live_chat?v='
f'{video_id}&is_popout=1').encode()
f'{video_id}&is_popout=1').encode()
header_terminator = b'\x20\x02'
item = [
@@ -44,62 +46,66 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _tzparity(video_id,times):
t=0
for i,s in enumerate(video_id):
def _tzparity(video_id, times):
t = 0
for i, s in enumerate(video_id):
ss = ord(s)
if(ss % 2 == 0):
t += ss*(12-i)
else:
t ^= ss*i
return ((times^t) % 2).to_bytes(1,'big')
return ((times ^ t) % 2).to_bytes(1, 'big')
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
if val < 0:
raise ValueError
buf = b''
while val >> 7:
m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big')
buf += m.to_bytes(1, 'big')
val >>= 7
buf += val.to_bytes(1,'big')
buf += val.to_bytes(1, 'big')
return buf
def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only):
#_short_type2
# _short_type2
switch_01 = b'\x04' if topchat_only else b'\x01'
parity = _tzparity(video_id, _ts1^_ts2^_ts3^_ts4^_ts5)
parity = _tzparity(video_id, _ts1 ^ _ts2 ^ _ts3 ^ _ts4 ^ _ts5)
header_magic= b'\xD2\x87\xCC\xC8\x03'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = _nval(_ts1)
sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A'
un_len = b'\x2B'
sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D'
sep_3 = b'\x3A\x00\x40\x00\x4A'
sep_4_len = b'\x02'
sep_4 = b'\x08\x01'
ts_2_start = b'\x50'
timestamp2 = _nval(_ts2)
ts_2_end = b'\x58'
sep_5 = b'\x03'
ts_3_start = b'\x50'
timestamp3 = _nval(_ts3)
ts_3_end = b'\x58'
timestamp4 = _nval(_ts4)
sep_6 = b'\x68'
#switch
sep_7 = b'\x82\x01\x04\x08'
#switch
sep_8 = b'\x10\x00'
sep_9 = b'\x88\x01\x00\xA0\x01'
timestamp5 = _nval(_ts5)
header_magic = b'\xD2\x87\xCC\xC8\x03'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = _nval(_ts1)
sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A'
un_len = b'\x2B'
sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D'
sep_3 = b'\x3A\x00\x40\x00\x4A'
sep_4_len = b'\x02'
sep_4 = b'\x08\x01'
ts_2_start = b'\x50'
timestamp2 = _nval(_ts2)
ts_2_end = b'\x58'
sep_5 = b'\x03'
ts_3_start = b'\x50'
timestamp3 = _nval(_ts3)
ts_3_end = b'\x58'
timestamp4 = _nval(_ts4)
sep_6 = b'\x68'
# switch
sep_7 = b'\x82\x01\x04\x08'
# switch
sep_8 = b'\x10\x00'
sep_9 = b'\x88\x01\x00\xA0\x01'
timestamp5 = _nval(_ts5)
body = [
body = b''.join([
sep_0,
_nval(len(vid)),
vid,
@@ -121,37 +127,35 @@ def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only):
ts_3_end,
timestamp4,
sep_6,
switch_01,#
switch_01,
sep_7,
switch_01,#
switch_01,
sep_8,
sep_9,
timestamp5
]
])
return urllib.parse.quote(
b64enc(header_magic +
_nval(len(body)) +
body
).decode()
)
body = reduce(lambda x, y: x+y, body)
return urllib.parse.quote(
b64enc( header_magic +
_nval(len(body)) +
body
).decode()
)
def _times(past_sec):
n = int(time.time())
_ts1= n - random.uniform(0,1*3)
_ts2= n - random.uniform(0.01,0.99)
_ts3= n - past_sec + random.uniform(0,1)
_ts4= n - random.uniform(10*60,60*60)
_ts5= n - random.uniform(0.01,0.99)
return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5]))
_ts1 = n - random.uniform(0, 1*3)
_ts2 = n - random.uniform(0.01, 0.99)
_ts3 = n - past_sec + random.uniform(0, 1)
_ts4 = n - random.uniform(10*60, 60*60)
_ts5 = n - random.uniform(0.01, 0.99)
return list(map(lambda x: int(x*1000000), [_ts1, _ts2, _ts3, _ts4, _ts5]))
def getparam(video_id, past_sec = 0, topchat_only = False):
def getparam(video_id, past_sec=0, topchat_only=False):
'''
Parameter
---------
@@ -160,5 +164,4 @@ def getparam(video_id, past_sec = 0, topchat_only = False):
topchat_only : bool
if True, fetch only 'top chat'
'''
return _build(video_id,*_times(past_sec),topchat_only)
return _build(video_id, *_times(past_sec), topchat_only)