Compare commits

..

24 Commits

Author SHA1 Message Date
taizan-hokuto
6d775e5cd0 Merge branch 'hotfix' 2019-11-14 22:39:22 +09:00
taizan-hokuto
53b70ed86b Increment version 2019-11-14 22:38:25 +09:00
taizan-hokuto
68c707b7d6 Update README 2019-11-14 22:37:16 +09:00
taizan-hokuto
30aaa54a2f Change debug mode 2019-11-14 22:34:29 +09:00
taizan-hokuto
d8202daed1 Merge branch 'feature/test' 2019-11-14 22:19:11 +09:00
taizan-hokuto
db8f49f41c Increment version 2019-11-14 22:16:21 +09:00
taizan-hokuto
76f41bbd59 Update README 2019-11-14 01:35:06 +09:00
taizan-hokuto
0aa45109a5 Add 'timestmpText' to DefaultProcessor 2019-11-12 22:14:56 +09:00
taizan-hokuto
3ad0d1a61e Update README 2019-11-11 23:46:56 +09:00
taizan-hokuto
6655e1bce4 Update README 2019-11-11 23:46:04 +09:00
taizan-hokuto
8a0793ea64 Fix test 2019-11-11 23:04:07 +09:00
taizan-hokuto
df33771b10 Fix initializing of handler exception 2019-11-11 22:56:54 +09:00
taizan-hokuto
b17f3ce06e Fix replaychat async startlisten 2019-11-11 22:52:28 +09:00
taizan-hokuto
7f232e8628 Export replaychat 2019-11-11 22:06:28 +09:00
taizan-hokuto
40262de6c9 Move parser to common directory 2019-11-11 21:26:03 +09:00
taizan-hokuto
ba4e75063a Make it possible to pause /resume
fetching chat
2019-11-11 21:04:20 +09:00
taizan-hokuto
194eecec2f Fix temporary interval value 2019-11-11 20:17:25 +09:00
taizan-hokuto
3e0d7617d5 Make sure to quit when failed to get replay chat data 2019-11-11 20:07:18 +09:00
taizan-hokuto
dfa58146f6 Fix test data 2019-11-11 20:06:05 +09:00
taizan-hokuto
47d0464b14 Add a 'parity' 2019-11-11 19:59:25 +09:00
taizan-hokuto
45f3274907 Add testreplay chat 2019-11-10 22:27:06 +09:00
taizan-hokuto
7b704c2b12 Delete unnecessary lines 2019-11-10 21:05:27 +09:00
taizan-hokuto
2fcb9469b3 Fix parameter of util.py 2019-11-10 21:03:35 +09:00
taizan-hokuto
aab7c14d48 Add archived chat retriever 2019-11-10 15:25:25 +09:00
21 changed files with 3962 additions and 47 deletions

View File

@@ -13,7 +13,7 @@ Other features:
+ Quick fetching of initial chat data by generating continuation params
instead of web scraping.
より詳細な説明は [wiki](https://github.com/taizan-hokuto/pytchat/wiki) をご参照ください。
For more detailed information, see [wiki](https://github.com/taizan-hokuto/pytchat/wiki).
## Install
```python
@@ -45,10 +45,10 @@ while chat.is_alive():
#other background operation here.
time.sleep(3)
def func(chatdata):
for c in chatdata.items:
def func(data):
for c in data.items:
print(f"{c.datetime} [{c.author.name}]-{c.message} {c.amountString}")
chat.tick()
data.tick()
```
### asyncio context:
@@ -62,17 +62,17 @@ async def main():
#other background operation here.
await asyncio.sleep(3)
async def func(chat)
for c in chat.items:
async def func(data):
for c in data.items:
print(f"{c.datetime} [{c.author.name}]-{c.message} {c.amountString}")
await chat.tick_async()
await data.tick_async()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
### yt api compatible processor:
### youtube api compatible processor:
```python
from pytchat import LiveChat, CompatibleProcessor
@@ -89,7 +89,30 @@ while chat.is_alive():
time.sleep(polling/len(data["items"]))
```
### replay:
```python
from pytchat import ReplayChatAsync
import asyncio
async def main():
chat = ReplayChatAsync("G1w62uEMZ74", seektime = 1000, callback = func)
while chat.is_alive():
#other background operation here.
await asyncio.sleep(3)
async def func(data):
for count in range(0,len(data.items)):
c= data.items[count]
if count!=len(data.items):
tick=data.items[count+1].timestamp -data.items[count].timestamp
else:
tick=0
print(f"<{c.timestampText}> [{c.author.name}]-{c.message} {c.amountString}")
await asyncio.sleep(tick/1000)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
## Chatdata Structure of Default Processor
Structure of each item which got from items() function.
@@ -122,7 +145,11 @@ Structure of each item which got from items() function.
<tr>
<td>datetime</td>
<td>str</td>
<td></td>
<td>ex. "2019-10-10 12:34:56"</td>
</tr>
<td>timestampText</td>
<td>str</td>
<td>elapsed time. (ex. "1:02:27")</td>
</tr>
<tr>
<td>amountValue</td>
@@ -137,7 +164,7 @@ Structure of each item which got from items() function.
<tr>
<td>currency</td>
<td>str</td>
<td>ISO 4217 currency codes (ex. "USD")</td>
<td><a href="https://en.wikipedia.org/wiki/ISO_4217">ISO 4217 currency codes</a> (ex. "USD")</td>
</tr>
<tr>
<td>bgColor</td>
@@ -175,12 +202,12 @@ Structure of author object.
</tr>
<tr>
<td>imageUrl</td>
<td></td>
<td>str</td>
<td></td>
</tr>
<tr>
<td>badgeUrl</td>
<td></td>
<td>str</td>
<td></td>
</tr>
<tr>

View File

@@ -1,8 +1,8 @@
"""
pytchat is a python library for fetching youtube live chat.
pytchat is a python library for fetching youtube live chat without using yt api, Selenium, or BeautifulSoup.
"""
__copyright__ = 'Copyright (C) 2019 taizan-hokuto'
__version__ = '0.0.2.3'
__version__ = '0.0.3.1'
__license__ = 'MIT'
__author__ = 'taizan-hokuto'
__author_email__ = '55448286+taizan-hokuto@users.noreply.github.com'
@@ -13,6 +13,8 @@ __all__ = ["core_async","core_multithread","processors"]
from .api import (
LiveChat,
LiveChatAsync,
ReplayChat,
ReplayChatAsync,
ChatProcessor,
CompatibleProcessor,
SimpleDisplayProcessor,

View File

@@ -1,5 +1,7 @@
from .core_async.livechat import LiveChatAsync
from .core_multithread.livechat import LiveChat
from .core_async.livechat import LiveChatAsync
from .core_multithread.replaychat import ReplayChat
from .core_async.replaychat import ReplayChatAsync
from .processors.chat_processor import ChatProcessor
from .processors.default.processor import DefaultProcessor
from .processors.compatible.processor import CompatibleProcessor

View File

@@ -10,12 +10,11 @@ import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from .buffer import Buffer
from .parser import Parser
from ..parser.live import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam
from ..processors.default.processor import DefaultProcessor
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
@@ -81,7 +80,7 @@ class LiveChatAsync:
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._setup()
if not LiveChatAsync._setup_finished:
@@ -164,7 +163,7 @@ class LiveChatAsync:
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = Parser.parse( livechat_json )
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,

View File

@@ -0,0 +1,307 @@
import aiohttp, asyncio
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from queue import Queue
from .buffer import Buffer
from ..parser.replay import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import arcparam
from ..processors.default.processor import DefaultProcessor
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
MAX_RETRY = 10
headers = config.headers
class ReplayChatAsync:
''' aiohttpを利用してYouTubeのライブ配信のチャットデータを取得する
Parameter
---------
video_id : str
動画ID
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_executor : ThreadPoolExecutor
チャットデータ取得ループ_listen用のスレッド
_is_alive : bool
チャット取得を終了したか
'''
_setup_finished = False
def __init__(self, video_id,
seektime =0,
processor = DefaultProcessor(),
buffer = Buffer(maxsize = 20),
interruptable = True,
callback = None,
done_callback = None,
exception_handler = None,
direct_mode = False):
self.video_id = video_id
self.seektime= seektime
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not ReplayChatAsync._setup_finished:
ReplayChatAsync._setup_finished = True
if exception_handler == None:
self._set_exception_handler(self._handle_exception)
else:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b:asyncio.create_task(
ReplayChatAsync.shutdown(None,signal.SIGINT,b))
))
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
loop = asyncio.get_event_loop()
loop.create_task(self._callback_loop(self._callback))
#_listenループタスクの開始
loop = asyncio.get_event_loop()
listen_task = loop.create_task(self._startlisten())
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = await self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
await self._listen(initial_continuation)
async def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = arcparam.get(self.video_id,self.seektime)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
にチャットデータを格納、
次のcontinuaitonを取得してループする
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
#pauseが呼ばれて_pauserが空状態のときは一時停止する
await self._pauser.get()
#resumeが呼ばれて_pauserにitemが入ったら再開する
#直後に_pauserにitem(None)を入れてブロックを防ぐ
self._pauser.put_nowait(None)
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
await self._callback(
self.processor.process([chat_component])
)
else:
await self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
if diff_time < 0 : diff_time=0
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
logger.error(f"{str(e)}動画ID:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
async def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp:
try:
text = await resp.text()
status_code = resp.status
livechat_json = json.loads(text)
break
except (ClientConnectorError,json.JSONDecodeError) :
await asyncio.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
async def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = await self._buffer.get()
data = self.processor.process(items)
await callback(data)
async def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = await self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'終了しました:[{self.video_id}]')
@classmethod
def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop()
#default handler: cls._handle_exception
loop.set_exception_handler(handler)
@classmethod
def _handle_exception(cls, loop, context):
#msg = context.get("exception", context["message"])
if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop()
loop.create_task(cls.shutdown(None,None,None))
@classmethod
async def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
logger.debug(f"残っているタスクを終了しています")
await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop()
loop.stop()

View File

@@ -3,13 +3,12 @@ import datetime
import json
import random
import signal
import threading
import time
import traceback
import urllib.parse
from concurrent.futures import CancelledError, ThreadPoolExecutor
from .buffer import Buffer
from .parser import Parser
from ..parser.live import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall

View File

@@ -0,0 +1,284 @@
import requests
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
from concurrent.futures import CancelledError, ThreadPoolExecutor
from queue import Queue
from .buffer import Buffer
from ..parser.replay import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import arcparam
from ..processors.default.processor import DefaultProcessor
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
MAX_RETRY = 10
headers = config.headers
class ReplayChat:
''' スレッドプールを利用してYouTubeのライブ配信のチャットデータを取得する
Parameter
---------
video_id : str
動画ID
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_executor : ThreadPoolExecutor
チャットデータ取得ループ_listen用のスレッド
_is_alive : bool
チャット取得を終了したか
'''
_setup_finished = False
#チャット監視中のListenerのリスト
_listeners= []
def __init__(self, video_id,
seektime =0,
processor = DefaultProcessor(),
buffer = Buffer(maxsize = 20),
interruptable = True,
callback = None,
done_callback = None,
direct_mode = False
):
self.video_id = video_id
self.seektime= seektime
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not ReplayChat._setup_finished:
ReplayChat._setup_finished = True
if interruptable:
signal.signal(signal.SIGINT, (lambda a, b:
(ReplayChat.shutdown(None,signal.SIGINT,b))
))
ReplayChat._listeners.append(self)
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop,self._callback)
#_listenループタスクの開始
listen_task = self._executor.submit(self._startlisten)
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
self._listen(initial_continuation)
def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = arcparam.get(self.video_id,self.seektime)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
にチャットデータを格納、
次のcontinuaitonを取得してループする
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
with requests.Session() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
#pauseが呼ばれて_pauserが空状態のときは一時停止する
self._pauser.get()
#resumeが呼ばれて_pauserにitemが入ったら再開する
#直後に_pauserにitem(None)を入れてブロックを防ぐ
self._pauser.put_nowait(None)
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
#チャットデータを含むコンポーネントを組み立ててbufferに投入する
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
self._callback(
self.processor.process([chat_component])
)
else:
self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
if diff_time < 0 : diff_time=0
time.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
logger.error(f"{str(e)}動画ID:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp:
try:
text = resp.text
status_code = resp.status_code
livechat_json = json.loads(text)
break
except json.JSONDecodeError :
time.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = self._buffer.get()
data = self.processor.process(items)
callback(data)
def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':1})
logger.info(f'終了しました:[{self.video_id}]')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
for t in ReplayChat._listeners:
t._is_alive = False

View File

@@ -0,0 +1,120 @@
from base64 import urlsafe_b64encode as b64enc
from functools import reduce
import calendar, datetime, pytz
import math
import random
import urllib.parse
'''
Generate continuation parameter of youtube replay chat.
Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
---------
video_id : str
Return
---------
byte[] : base64 encoded video_id parameter.
"""
header_magic = b'\x0A\x0F\x1A\x0D\x0A'
header_id = video_id.encode()
header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B'
header_terminator = b'\x20\x01'
item = [
header_magic,
_nval(len(header_id)),
header_id,
header_sep_1,
header_id,
header_terminator
]
return urllib.parse.quote(
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
buf = b''
while val >> 7:
m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big')
val >>= 7
buf += val.to_bytes(1,'big')
return buf
def _tzparity(video_id,times):
t=0
for i,s in enumerate(video_id):
ss = ord(s)
if(ss % 2 == 0):
t += ss*(12-i)
else:
t ^= ss*i
return ((times^t) % 2).to_bytes(1,'big')
def get(video_id, seektime = 0, topchatonly = False):
switch_01 = b'\x04' if topchatonly else b'\x01'
if seektime < 0:
raise ValueError('seektime is 0 or positive number.')
if seektime == 0:
times =_nval(1)
switch = b'\x04'
else:
times =_nval(int(seektime*1000000))
switch = b'\x03'
parity = _tzparity(video_id, seektime)
header_magic= b'\xA2\x9D\xB0\xD3\x04'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = times
sep_1 = b'\x30\x00\x38\x00\x40\x00\x48'
sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40'
sep_3 = b'\x00\x58\x03\x60'
sep_4 = b'\x68'+parity+b'\x72\x04\x08'
sep_5 = b'\x10'+parity+b'\x78\x00'
body = [
sep_0,
_nval(len(vid)),
vid,
time_tag,
timestamp1,
sep_1,
switch,
sep_2,
chkstr,
sep_3,
switch_01,
sep_4,
switch_01,
sep_5
]
body = reduce(lambda x, y: x+y, body)
return urllib.parse.quote(
b64enc( header_magic +
_nval(len(body)) +
body
).decode()
)

View File

@@ -1,11 +1,16 @@
from base64 import urlsafe_b64encode as b64enc
from functools import reduce
import calendar, datetime, pytz
import math
import random
import urllib.parse
'''
Generate continuation parameter of youtube live chat.
Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
@@ -39,6 +44,17 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _tzparity(video_id,times):
t=0
for i,s in enumerate(video_id):
ss = ord(s)
if(ss % 2 == 0):
t += ss*(12-i)
else:
t ^= ss*i
return ((times^t) % 2).to_bytes(1,'big')
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
@@ -53,15 +69,16 @@ def _nval(val):
def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchatonly = False):
#_short_type2
switch_01 = b'\x04' if topchatonly else b'\x01'
header_magic= b'\xD2\x87\xCC\xC8\x03'
parity = _tzparity(video_id, _ts1^_ts2^_ts3^_ts4^_ts5)
header_magic= b'\xD2\x87\xCC\xC8\x03'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = _nval(_ts1)
sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A'
un_len = b'\x2B'
sep_2 = b'\x08\x00\x10\x00\x18\x00\x20\x00'
sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D'
sep_3 = b'\x3A\x00\x40\x00\x4A'
sep_4_len = b'\x02'

View File

View File

@@ -11,8 +11,7 @@ logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class Parser:
@classmethod
def parse(cls, jsn):
def parse(self, jsn):
if jsn is None:
return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'):
@@ -26,15 +25,28 @@ class Parser:
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('Continuationがありません。')
metadata = (cont.get('invalidationContinuationData') or
cont.get('timedContinuationData') or
cont.get('reloadContinuationData')
)
metadata = cont.get('liveChatReplayContinuationData')
if metadata is None:
unknown = list(cont.keys())[0]
if unknown:
logger.error(f"Received unknown continuation type:{unknown}")
metadata = cont.get(unknown)
metadata.setdefault('timeoutMs', 10000)
chatdata = contents['liveChatContinuation'].get('actions')
actions = contents['liveChatContinuation'].get('actions')
if actions is None:
raise NoContentsException('チャットデータを取得できませんでした。')
interval = self.get_interval(actions)
metadata.setdefault("timeoutMs",interval)
chatdata = []
for action in actions:
chatdata.append(action["replayChatItemAction"]["actions"][0])
return metadata, chatdata
def get_interval(self, actions: list):
if actions is None:
return 0
start = int(actions[0]["replayChatItemAction"]["videoOffsetTimeMsec"])
last = int(actions[-1]["replayChatItemAction"]["videoOffsetTimeMsec"])
return (last - start)

View File

@@ -13,13 +13,13 @@ class Chatdata:
def tick(self):
if self.interval == 0:
time.sleep(3)
time.sleep(1)
return
time.sleep(self.interval/len(self.items))
async def tick_async(self):
if self.interval == 0:
await asyncio.sleep(0.5)
await asyncio.sleep(1)
return
await asyncio.sleep(self.interval/len(self.items))

View File

@@ -13,6 +13,11 @@ class BaseRenderer:
self.id = self.renderer.get('id')
timestampUsec = int(self.renderer.get("timestampUsec",0))
self.timestamp = int(timestampUsec/1000)
tst = self.renderer.get("timestampText")
if tst:
self.timestampText = tst.get("simpleText")
else:
self.timestampText = ""
self.datetime = self.get_datetime(timestampUsec)
self.message = self.get_message(self.renderer)
self.id = self.renderer.get('id')

View File

@@ -1,7 +1,7 @@
import requests,json,datetime
from .. import config
def download(cls,url):
def download(url):
_session = requests.Session()
html = _session.get(url, headers=config.headers)
with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
@@ -9,7 +9,7 @@ def download(cls,url):
json.dump(html.json(),f,ensure_ascii=False)
def save(cls,data,filename):
def save(data,filename):
with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
)+filename,mode ='w',encoding='utf-8') as f:
f.writelines(data)

26
tests/test_arcparam.py Normal file
View File

@@ -0,0 +1,26 @@
import pytest
from pytchat.parser.replay import Parser
import pytchat.config as config
import requests, json
from pytchat.paramgen import arcparam
def test_arcparam_0(mocker):
param = arcparam.get("01234567890")
assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAXIECAEQAXgA" == param
def test_arcparam_1(mocker):
param = arcparam.get("01234567890", seektime = 100000)
assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgBcgQIARABeAA%3D" == param
def test_arcparam_2(mocker):
param = arcparam.get("SsjCnHOk-Sk")
url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1"
resp = requests.Session().get(url,headers = config.headers)
jsn = json.loads(resp.text)
parser = Parser()
_ , chatdata = parser.parse(jsn)
test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatTextMessageRenderer"]["id"]
print(test_id)
assert "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w" == test_id

View File

@@ -1,7 +1,7 @@
import json
import pytest
import asyncio,aiohttp
from pytchat.core_async.parser import Parser
from pytchat.parser.live import Parser
from pytchat.processors.compatible.processor import CompatibleProcessor
from pytchat.exceptions import (
NoLivechatRendererException,NoYtinitialdataException,
@@ -12,13 +12,15 @@ from pytchat.processors.compatible.renderer.paidmessage import LiveChatPaidMessa
from pytchat.processors.compatible.renderer.paidsticker import LiveChatPaidStickerRenderer
from pytchat.processors.compatible.renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
parser = Parser()
def test_textmessage(mocker):
'''api互換processorのテスト通常テキストメッセージ'''
processor = CompatibleProcessor()
_json = _open_file("tests/testdata/compatible/textmessage.json")
_, chatdata = Parser.parse(json.loads(_json))
_, chatdata = parser.parse(json.loads(_json))
data = {
"video_id" : "",
"timeout" : 7,
@@ -55,7 +57,7 @@ def test_newsponcer(mocker):
_json = _open_file("tests/testdata/compatible/newSponsor.json")
_, chatdata = Parser.parse(json.loads(_json))
_, chatdata = parser.parse(json.loads(_json))
data = {
"video_id" : "",
"timeout" : 7,
@@ -91,7 +93,7 @@ def test_superchat(mocker):
_json = _open_file("tests/testdata/compatible/superchat.json")
_, chatdata = Parser.parse(json.loads(_json))
_, chatdata = parser.parse(json.loads(_json))
data = {
"video_id" : "",
"timeout" : 7,

View File

@@ -1,6 +1,6 @@
import pytest
from pytchat.core_async.parser import Parser as AsyncParser
from pytchat.core_multithread.parser import Parser as ThreadParser
from pytchat.parser.live import Parser
import json
import asyncio,aiohttp

9
tests/test_liveparam.py Normal file
View File

@@ -0,0 +1,9 @@
import pytest
from pytchat.paramgen import liveparam
def test_liveparam_0(mocker):
_ts1= 1546268400
param = liveparam._build("01234567890",
*([_ts1*1000000 for i in range(5)]))
test_param="0ofMyAPiARp8Q2c4S0RRb0xNREV5TXpRMU5qYzRPVEFhUTZxNXdiMEJQUW83YUhSMGNITTZMeTkzZDNjdWVXOTFkSFZpWlM1amIyMHZiR2wyWlY5amFHRjBQM1k5TURFeU16UTFOamM0T1RBbWFYTmZjRzl3YjNWMFBURWdBZyUzRCUzRCiAuNbVqsrfAjAAOABAAkorCAEQABgAIAAqDnN0YXRpY2NoZWNrc3VtOgBAAEoCCAFQgLjW1arK3wJYA1CAuNbVqsrfAliAuNbVqsrfAmgBggEECAEQAIgBAKABgLjW1arK3wI%3D"
assert test_param == param

View File

@@ -1,5 +1,5 @@
import pytest
from pytchat.core_async.parser import Parser
from pytchat.parser.live import Parser
import json
import asyncio,aiohttp
from aioresponses import aioresponses
@@ -11,7 +11,7 @@ from pytchat.exceptions import (
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
return f.read()
parser = Parser()
@aioresponses()
def test_finishedlive(*mock):
@@ -21,7 +21,7 @@ def test_finishedlive(*mock):
_text = json.loads(_text)
try:
Parser.parse(_text)
parser.parse(_text)
assert False
except NoContentsException:
assert True
@@ -34,7 +34,7 @@ def test_parsejson(*mock):
_text = json.loads(_text)
try:
Parser.parse(_text)
parser.parse(_text)
jsn = _text
timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"]
continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"]

3104
tests/testdata/chatreplay.json vendored Normal file

File diff suppressed because it is too large Load Diff