diff --git a/pytchat/core_multithread/livechat.py b/pytchat/core_multithread/livechat.py index 3e5d133..8ce84e8 100644 --- a/pytchat/core_multithread/livechat.py +++ b/pytchat/core_multithread/livechat.py @@ -11,8 +11,8 @@ from queue import Queue from .buffer import Buffer from ..parser.live import Parser from .. import config -from ..exceptions import ChatParseException,IllegalFunctionCall -from ..paramgen import liveparam, arcparam +from ..exceptions import ChatParseException, IllegalFunctionCall +from ..paramgen import liveparam, arcparam from ..processors.default.processor import DefaultProcessor from ..processors.combinator import Combinator @@ -27,7 +27,7 @@ class LiveChat: --------- video_id : str 動画ID - + seektime : int (ライブチャット取得時は無視) 取得開始するアーカイブ済みチャットの経過時間(秒) @@ -61,7 +61,7 @@ class LiveChat: topchat_only : bool Trueの場合、上位チャットのみ取得する。 - + Attributes --------- _executor : ThreadPoolExecutor @@ -72,22 +72,20 @@ class LiveChat: ''' _setup_finished = False - #チャット監視中のListenerのリスト - _listeners = [] def __init__(self, video_id, - seektime = 0, - processor = DefaultProcessor(), - buffer = None, - interruptable = True, - callback = None, - done_callback = None, - direct_mode = False, - force_replay = False, - topchat_only = False, - logger = config.logger(__name__) - ): - self.video_id = video_id + seektime=0, + processor=DefaultProcessor(), + buffer=None, + interruptable=True, + callback=None, + done_callback=None, + direct_mode=False, + force_replay=False, + topchat_only=False, + logger=config.logger(__name__) + ): + self.video_id = video_id self.seektime = seektime if isinstance(processor, tuple): self.processor = Combinator(processor) @@ -98,57 +96,51 @@ class LiveChat: self._done_callback = done_callback self._executor = ThreadPoolExecutor(max_workers=2) self._direct_mode = direct_mode - self._is_alive = True + self._is_alive = True self._is_replay = force_replay - self._parser = Parser(is_replay = self._is_replay) + self._parser = Parser(is_replay=self._is_replay) self._pauser = Queue() self._pauser.put_nowait(None) - self._setup() self._first_fetch = True self._fetch_url = "live_chat/get_live_chat?continuation=" self._topchat_only = topchat_only self._logger = logger - LiveChat._logger = logger - if not LiveChat._setup_finished: - LiveChat._setup_finished = True - if interruptable: - signal.signal(signal.SIGINT, (lambda a, b: - (LiveChat.shutdown(None,signal.SIGINT,b)) - )) - LiveChat._listeners.append(self) + if interruptable: + signal.signal(signal.SIGINT, lambda a, b: self.terminate()) + self._setup() def _setup(self): - #direct modeがTrueでcallback未設定の場合例外発生。 + # direct modeがTrueでcallback未設定の場合例外発生。 if self._direct_mode: if self._callback is None: raise IllegalFunctionCall( "When direct_mode=True, callback parameter is required.") else: - #direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 + # direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成 if self._buffer is None: - self._buffer = Buffer(maxsize = 20) - #callbackが指定されている場合はcallbackを呼ぶループタスクを作成 + self._buffer = Buffer(maxsize=20) + # callbackが指定されている場合はcallbackを呼ぶループタスクを作成 if self._callback is None: - pass + pass else: - #callbackを呼ぶループタスクの開始 - self._executor.submit(self._callback_loop,self._callback) - #_listenループタスクの開始 + # callbackを呼ぶループタスクの開始 + self._executor.submit(self._callback_loop, self._callback) + # _listenループタスクの開始 listen_task = self._executor.submit(self._startlisten) - #add_done_callbackの登録 + # add_done_callbackの登録 if self._done_callback is None: listen_task.add_done_callback(self.finish) else: listen_task.add_done_callback(self._done_callback) def _startlisten(self): - time.sleep(0.1) #sleep shortly to prohibit skipping fetching data + time.sleep(0.1) # sleep shortly to prohibit skipping fetching data """Fetch first continuation parameter, create and start _listen loop. """ - initial_continuation = liveparam.getparam(self.video_id,3) + initial_continuation = liveparam.getparam(self.video_id, 3) self._listen(initial_continuation) - + def _listen(self, continuation): ''' Fetch chat data and store them into buffer, get next continuaiton parameter and loop. @@ -164,33 +156,34 @@ class LiveChat: continuation = self._check_pause(continuation) contents = self._get_contents( continuation, session, headers) - metadata, chatdata = self._parser.parse(contents) + metadata, chatdata = self._parser.parse(contents) timeout = metadata['timeoutMs']/1000 chat_component = { - "video_id" : self.video_id, - "timeout" : timeout, - "chatdata" : chatdata + "video_id": self.video_id, + "timeout": timeout, + "chatdata": chatdata } - time_mark =time.time() + time_mark = time.time() if self._direct_mode: - processed_chat = self.processor.process([chat_component]) - if isinstance(processed_chat,tuple): + processed_chat = self.processor.process( + [chat_component]) + if isinstance(processed_chat, tuple): self._callback(*processed_chat) else: self._callback(processed_chat) else: self._buffer.put(chat_component) diff_time = timeout - (time.time()-time_mark) - time.sleep(diff_time if diff_time > 0 else 0) - continuation = metadata.get('continuation') + time.sleep(diff_time if diff_time > 0 else 0) + continuation = metadata.get('continuation') except ChatParseException as e: self._logger.debug(f"[{self.video_id}]{str(e)}") - return - except (TypeError , json.JSONDecodeError) : + return + except (TypeError, json.JSONDecodeError): self._logger.error(f"{traceback.format_exc(limit = -1)}") return - + self._logger.debug(f"[{self.video_id}]finished fetching chat.") def _check_pause(self, continuation): @@ -202,7 +195,7 @@ class LiveChat: ''' self._pauser.put_nowait(None) if not self._is_replay: - continuation = liveparam.getparam(self.video_id,3) + continuation = liveparam.getparam(self.video_id, 3) return continuation def _get_contents(self, continuation, session, headers): @@ -214,7 +207,7 @@ class LiveChat: ------- 'continuationContents' which includes metadata & chat data. ''' - livechat_json = ( + livechat_json = ( self._get_livechat_json(continuation, session, headers) ) contents = self._parser.get_contents(livechat_json) @@ -225,7 +218,7 @@ class LiveChat: self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation=" continuation = arcparam.getparam( self.video_id, self.seektime, self._topchat_only) - livechat_json = ( self._get_livechat_json( + livechat_json = (self._get_livechat_json( continuation, session, headers)) reload_continuation = self._parser.reload_continuation( self._parser.get_contents(livechat_json)) @@ -244,26 +237,26 @@ class LiveChat: continuation = urllib.parse.quote(continuation) livechat_json = None status_code = 0 - url =f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1" + url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1" for _ in range(MAX_RETRY + 1): - with session.get(url ,headers = headers) as resp: + with session.get(url, headers=headers) as resp: try: text = resp.text livechat_json = json.loads(text) break - except json.JSONDecodeError : + except json.JSONDecodeError: time.sleep(1) continue else: self._logger.error(f"[{self.video_id}]" - f"Exceeded retry count. status_code={status_code}") + f"Exceeded retry count. status_code={status_code}") return None return livechat_json - - def _callback_loop(self,callback): + + def _callback_loop(self, callback): """ コンストラクタでcallbackを指定している場合、バックグラウンドで callbackに指定された関数に一定間隔でチャットデータを投げる。 - + Parameter --------- callback : func @@ -280,13 +273,13 @@ class LiveChat: def get(self): """ bufferからデータを取り出し、processorに投げ、 加工済みのチャットデータを返す。 - + Returns : Processorによって加工されたチャットデータ """ if self._callback is None: items = self._buffer.get() - return self.processor.process(items) + return self.processor.process(items) raise IllegalFunctionCall( "既にcallbackを登録済みのため、get()は実行できません。") @@ -304,13 +297,13 @@ class LiveChat: return if self._pauser.empty(): self._pauser.put_nowait(None) - + def is_alive(self): return self._is_alive - def finish(self,sender): + def finish(self, sender): '''Listener終了時のコールバック''' - try: + try: self.terminate() except CancelledError: self._logger.debug(f'[{self.video_id}]cancelled:{sender}') @@ -319,14 +312,7 @@ class LiveChat: ''' Listenerを終了する。 ''' - self._is_alive = False - if self._direct_mode == False: - #bufferにダミーオブジェクトを入れてis_alive()を判定させる - self._buffer.put({'chatdata':'','timeout':0}) - self._logger.info(f'[{self.video_id}]finished.') - - @classmethod - def shutdown(cls, event, sig = None, handler=None): - cls._logger.debug("shutdown...") - for t in LiveChat._listeners: - t._is_alive = False \ No newline at end of file + if self.is_alive(): + self._is_alive = False + self._buffer.put({}) + self._logger.info(f'[{self.video_id}]終了しました') diff --git a/pytchat/paramgen/arcparam.py b/pytchat/paramgen/arcparam.py index 2cd1829..f62cb40 100644 --- a/pytchat/paramgen/arcparam.py +++ b/pytchat/paramgen/arcparam.py @@ -12,6 +12,7 @@ Author: taizan-hokuto (2019) @taizan205 ver 0.0.1 2019.10.05 ''' + def _gen_vid(video_id): """generate video_id parameter. Parameter @@ -23,7 +24,7 @@ def _gen_vid(video_id): bytes : base64 encoded video_id parameter. """ header_magic = b'\x0A\x0F\x1A\x0D\x0A' - header_id = video_id.encode() + header_id = video_id.encode() header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B' header_terminator = b'\x20\x01' @@ -40,42 +41,46 @@ def _gen_vid(video_id): b64enc(reduce(lambda x, y: x+y, item)).decode() ).encode() + def _nval(val): """convert value to byte array""" - if val<0: raise ValueError + if val < 0: + raise ValueError buf = b'' while val >> 7: m = val & 0xFF | 0x80 - buf += m.to_bytes(1,'big') + buf += m.to_bytes(1, 'big') val >>= 7 - buf += val.to_bytes(1,'big') + buf += val.to_bytes(1, 'big') return buf + def _build(video_id, seektime, topchat_only): switch_01 = b'\x04' if topchat_only else b'\x01' if seektime < 0: - times =_nval(0) - switch = b'\x04' - elif seektime == 0: - times =_nval(1) - switch = b'\x03' + times = _nval(0) + switch = b'\x04' + elif seektime == 0: + times = _nval(1) + switch = b'\x03' else: - times =_nval(int(seektime*1000000)) + times = _nval(int(seektime*1000000)) switch = b'\x03' parity = b'\x00' - header_magic= b'\xA2\x9D\xB0\xD3\x04' - sep_0 = b'\x1A' - vid = _gen_vid(video_id) - time_tag = b'\x28' - timestamp1 = times - sep_1 = b'\x30\x00\x38\x00\x40\x00\x48' - sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00' - chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40' - sep_3 = b'\x00\x58\x03\x60' - sep_4 = b'\x68' + parity + b'\x72\x04\x08' - sep_5 = b'\x10' + parity + b'\x78\x00' - body = [ + header_magic = b'\xA2\x9D\xB0\xD3\x04' + sep_0 = b'\x1A' + vid = _gen_vid(video_id) + time_tag = b'\x28' + timestamp1 = times + sep_1 = b'\x30\x00\x38\x00\x40\x00\x48' + sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00' + chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40' + sep_3 = b'\x00\x58\x03\x60' + sep_4 = b'\x68' + parity + b'\x72\x04\x08' + sep_5 = b'\x10' + parity + b'\x78\x00' + + body = b''.join([ sep_0, _nval(len(vid)), vid, @@ -90,18 +95,17 @@ def _build(video_id, seektime, topchat_only): sep_4, switch_01, sep_5 - ] + ]) - body = reduce(lambda x, y: x+y, body) - - return urllib.parse.quote( - b64enc( header_magic + - _nval(len(body)) + - body - ).decode() - ) + return urllib.parse.quote( + b64enc(header_magic + + _nval(len(body)) + + body + ).decode() + ) -def getparam(video_id, seektime = 0, topchat_only = False): + +def getparam(video_id, seektime=0, topchat_only=False): ''' Parameter --------- diff --git a/pytchat/paramgen/liveparam.py b/pytchat/paramgen/liveparam.py index a091b11..09d8273 100644 --- a/pytchat/paramgen/liveparam.py +++ b/pytchat/paramgen/liveparam.py @@ -11,6 +11,8 @@ Author: taizan-hokuto (2019) @taizan205 ver 0.0.1 2019.10.05 ''' + + def _gen_vid(video_id): """generate video_id parameter. Parameter @@ -22,11 +24,11 @@ def _gen_vid(video_id): byte[] : base64 encoded video_id parameter. """ header_magic = b'\x0A\x0F\x0A\x0D\x0A' - header_id = video_id.encode() + header_id = video_id.encode() header_sep_1 = b'\x1A' header_sep_2 = b'\x43\xAA\xB9\xC1\xBD\x01\x3D\x0A' header_suburl = ('https://www.youtube.com/live_chat?v=' - f'{video_id}&is_popout=1').encode() + f'{video_id}&is_popout=1').encode() header_terminator = b'\x20\x02' item = [ @@ -44,62 +46,66 @@ def _gen_vid(video_id): b64enc(reduce(lambda x, y: x+y, item)).decode() ).encode() -def _tzparity(video_id,times): - t=0 - for i,s in enumerate(video_id): + +def _tzparity(video_id, times): + t = 0 + for i, s in enumerate(video_id): ss = ord(s) if(ss % 2 == 0): t += ss*(12-i) else: t ^= ss*i - return ((times^t) % 2).to_bytes(1,'big') + return ((times ^ t) % 2).to_bytes(1, 'big') + def _nval(val): """convert value to byte array""" - if val<0: raise ValueError + if val < 0: + raise ValueError buf = b'' while val >> 7: m = val & 0xFF | 0x80 - buf += m.to_bytes(1,'big') + buf += m.to_bytes(1, 'big') val >>= 7 - buf += val.to_bytes(1,'big') + buf += val.to_bytes(1, 'big') return buf + def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only): - #_short_type2 + # _short_type2 switch_01 = b'\x04' if topchat_only else b'\x01' - parity = _tzparity(video_id, _ts1^_ts2^_ts3^_ts4^_ts5) + parity = _tzparity(video_id, _ts1 ^ _ts2 ^ _ts3 ^ _ts4 ^ _ts5) - header_magic= b'\xD2\x87\xCC\xC8\x03' - sep_0 = b'\x1A' - vid = _gen_vid(video_id) - time_tag = b'\x28' - timestamp1 = _nval(_ts1) - sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A' - un_len = b'\x2B' - sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00' - chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D' - sep_3 = b'\x3A\x00\x40\x00\x4A' - sep_4_len = b'\x02' - sep_4 = b'\x08\x01' - ts_2_start = b'\x50' - timestamp2 = _nval(_ts2) - ts_2_end = b'\x58' - sep_5 = b'\x03' - ts_3_start = b'\x50' - timestamp3 = _nval(_ts3) - ts_3_end = b'\x58' - timestamp4 = _nval(_ts4) - sep_6 = b'\x68' - #switch - sep_7 = b'\x82\x01\x04\x08' - #switch - sep_8 = b'\x10\x00' - sep_9 = b'\x88\x01\x00\xA0\x01' - timestamp5 = _nval(_ts5) + header_magic = b'\xD2\x87\xCC\xC8\x03' + sep_0 = b'\x1A' + vid = _gen_vid(video_id) + time_tag = b'\x28' + timestamp1 = _nval(_ts1) + sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A' + un_len = b'\x2B' + sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00' + chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D' + sep_3 = b'\x3A\x00\x40\x00\x4A' + sep_4_len = b'\x02' + sep_4 = b'\x08\x01' + ts_2_start = b'\x50' + timestamp2 = _nval(_ts2) + ts_2_end = b'\x58' + sep_5 = b'\x03' + ts_3_start = b'\x50' + timestamp3 = _nval(_ts3) + ts_3_end = b'\x58' + timestamp4 = _nval(_ts4) + sep_6 = b'\x68' + # switch + sep_7 = b'\x82\x01\x04\x08' + # switch + sep_8 = b'\x10\x00' + sep_9 = b'\x88\x01\x00\xA0\x01' + timestamp5 = _nval(_ts5) - body = [ + body = b''.join([ sep_0, _nval(len(vid)), vid, @@ -121,37 +127,35 @@ def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only): ts_3_end, timestamp4, sep_6, - switch_01,# + switch_01, sep_7, - switch_01,# + switch_01, sep_8, sep_9, timestamp5 - ] + ]) + + return urllib.parse.quote( + b64enc(header_magic + + _nval(len(body)) + + body + ).decode() + ) - body = reduce(lambda x, y: x+y, body) - - return urllib.parse.quote( - b64enc( header_magic + - _nval(len(body)) + - body - ).decode() - ) - def _times(past_sec): - + n = int(time.time()) - - _ts1= n - random.uniform(0,1*3) - _ts2= n - random.uniform(0.01,0.99) - _ts3= n - past_sec + random.uniform(0,1) - _ts4= n - random.uniform(10*60,60*60) - _ts5= n - random.uniform(0.01,0.99) - return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5])) + + _ts1 = n - random.uniform(0, 1*3) + _ts2 = n - random.uniform(0.01, 0.99) + _ts3 = n - past_sec + random.uniform(0, 1) + _ts4 = n - random.uniform(10*60, 60*60) + _ts5 = n - random.uniform(0.01, 0.99) + return list(map(lambda x: int(x*1000000), [_ts1, _ts2, _ts3, _ts4, _ts5])) -def getparam(video_id, past_sec = 0, topchat_only = False): +def getparam(video_id, past_sec=0, topchat_only=False): ''' Parameter --------- @@ -160,5 +164,4 @@ def getparam(video_id, past_sec = 0, topchat_only = False): topchat_only : bool if True, fetch only 'top chat' ''' - return _build(video_id,*_times(past_sec),topchat_only) - + return _build(video_id, *_times(past_sec), topchat_only)