Compare commits

...

57 Commits

Author SHA1 Message Date
taizan-hokuto
de6ef2490e Merge branch 'develop' 2019-12-19 02:03:09 +09:00
taizan-hokuto
b8bdbdc36f Increment version 2019-12-19 02:02:32 +09:00
taizan-hokuto
9f5d3f323e Fix README 2019-12-19 02:00:41 +09:00
taizan-hokuto
cf9aae3322 Merge branch 'develop' 2019-12-19 01:57:08 +09:00
taizan-hokuto
6ac2315936 Increment version 2019-12-19 01:53:55 +09:00
taizan-hokuto
50c8e34080 Fix README 2019-12-19 01:51:31 +09:00
taizan-hokuto
2d3da91d51 Fix calculation algorithm 2019-12-19 01:21:49 +09:00
taizan-hokuto
3ac71985ff Implement SpeedCalculator 2019-12-17 21:29:20 +09:00
taizan-hokuto
13bdf0376b Merge branch 'develop' 2019-12-01 22:58:53 +09:00
taizan-hokuto
b2ffdaec0c Increment version 2019-12-01 22:54:43 +09:00
taizan-hokuto
c85786679f Merge branch 'feature/1' into develop 2019-12-01 22:50:06 +09:00
taizan-hokuto
c7a7886672 Fix superSticker rendering 2019-12-01 22:47:01 +09:00
taizan-hokuto
12996fb44d Merge branch 'feature/1' 2019-11-22 00:17:15 +09:00
taizan-hokuto
c884ef7288 Merge branch 'feature/1' into develop 2019-11-22 00:16:20 +09:00
taizan-hokuto
2cd9e98fc2 Increment version 2019-11-22 00:15:57 +09:00
taizan-hokuto
2ac4c99ab4 Increment version 2019-11-22 00:04:18 +09:00
taizan-hokuto
51bf8ad738 Update README 2019-11-21 23:37:35 +09:00
taizan-hokuto
2e70e74bcd Update README 2019-11-21 23:04:12 +09:00
taizan-hokuto
a39d6cb420 Use list comprehension 2019-11-21 22:46:15 +09:00
taizan-hokuto
5dd0cb45b7 Implement messageEx 2019-11-21 22:35:27 +09:00
taizan-hokuto
24873651a6 Fix comments 2019-11-21 20:47:42 +09:00
taizan-hokuto
0e060bf998 Use logger when errors occur 2019-11-20 23:59:16 +09:00
taizan-hokuto
817fed9d1d Make functions private. 2019-11-19 20:53:37 +09:00
taizan-hokuto
823f7fefa4 Fix comments 2019-11-19 20:36:54 +09:00
taizan-hokuto
aa894fc52b Fix comments 2019-11-15 00:58:36 +09:00
taizan-hokuto
6d775e5cd0 Merge branch 'hotfix' 2019-11-14 22:39:22 +09:00
taizan-hokuto
53b70ed86b Increment version 2019-11-14 22:38:25 +09:00
taizan-hokuto
68c707b7d6 Update README 2019-11-14 22:37:16 +09:00
taizan-hokuto
30aaa54a2f Change debug mode 2019-11-14 22:34:29 +09:00
taizan-hokuto
d8202daed1 Merge branch 'feature/test' 2019-11-14 22:19:11 +09:00
taizan-hokuto
db8f49f41c Increment version 2019-11-14 22:16:21 +09:00
taizan-hokuto
76f41bbd59 Update README 2019-11-14 01:35:06 +09:00
taizan-hokuto
0aa45109a5 Add 'timestmpText' to DefaultProcessor 2019-11-12 22:14:56 +09:00
taizan-hokuto
3ad0d1a61e Update README 2019-11-11 23:46:56 +09:00
taizan-hokuto
6655e1bce4 Update README 2019-11-11 23:46:04 +09:00
taizan-hokuto
8a0793ea64 Fix test 2019-11-11 23:04:07 +09:00
taizan-hokuto
df33771b10 Fix initializing of handler exception 2019-11-11 22:56:54 +09:00
taizan-hokuto
b17f3ce06e Fix replaychat async startlisten 2019-11-11 22:52:28 +09:00
taizan-hokuto
7f232e8628 Export replaychat 2019-11-11 22:06:28 +09:00
taizan-hokuto
40262de6c9 Move parser to common directory 2019-11-11 21:26:03 +09:00
taizan-hokuto
ba4e75063a Make it possible to pause /resume
fetching chat
2019-11-11 21:04:20 +09:00
taizan-hokuto
194eecec2f Fix temporary interval value 2019-11-11 20:17:25 +09:00
taizan-hokuto
3e0d7617d5 Make sure to quit when failed to get replay chat data 2019-11-11 20:07:18 +09:00
taizan-hokuto
dfa58146f6 Fix test data 2019-11-11 20:06:05 +09:00
taizan-hokuto
47d0464b14 Add a 'parity' 2019-11-11 19:59:25 +09:00
taizan-hokuto
45f3274907 Add testreplay chat 2019-11-10 22:27:06 +09:00
taizan-hokuto
7b704c2b12 Delete unnecessary lines 2019-11-10 21:05:27 +09:00
taizan-hokuto
2fcb9469b3 Fix parameter of util.py 2019-11-10 21:03:35 +09:00
taizan-hokuto
aab7c14d48 Add archived chat retriever 2019-11-10 15:25:25 +09:00
55448286+taizan-hokuto@users.noreply.github.com
365964d88c Merge branch 'develop' 2019-11-04 23:50:58 +09:00
55448286+taizan-hokuto@users.noreply.github.com
517f41f5fe Increment version 2019-11-04 23:50:41 +09:00
55448286+taizan-hokuto@users.noreply.github.com
432825b5ed Use logger when errors occur 2019-11-04 23:45:10 +09:00
55448286+taizan-hokuto@users.noreply.github.com
64ec413bca Move parser functions to processor 2019-11-04 23:43:00 +09:00
55448286+taizan-hokuto@users.noreply.github.com
7c6e12cbe5 Change format of multithread parser to class 2019-11-04 23:29:00 +09:00
55448286+taizan-hokuto@users.noreply.github.com
3912758a52 Update README 2019-11-03 22:43:41 +09:00
55448286+taizan-hokuto@users.noreply.github.com
8bc209fde8 Fix renderer type check 2019-11-03 22:26:47 +09:00
55448286+taizan-hokuto@users.noreply.github.com
3b580690c7 Delete listen_manager 2019-11-03 21:57:30 +09:00
30 changed files with 4404 additions and 389 deletions

View File

@@ -13,7 +13,7 @@ Other features:
+ Quick fetching of initial chat data by generating continuation params
instead of web scraping.
For more detailed information, see [wiki](https://github.com/taizan-hokuto/pytchat/wiki)
For more detailed information, see [wiki](https://github.com/taizan-hokuto/pytchat/wiki).
## Install
```python
@@ -40,15 +40,17 @@ while chat.is_alive():
from pytchat import LiveChat
import time
chat = LiveChat("G1w62uEMZ74", callback = func)
while chat.is_alive():
#other background operation here.
time.sleep(3)
def main()
chat = LiveChat("G1w62uEMZ74", callback = func)
while chat.is_alive():
time.sleep(3)
#other background operation.
def func(chatdata):
for c in chatdata.items:
#callback function is automatically called periodically.
def func(data):
for c in data.items:
print(f"{c.datetime} [{c.author.name}]-{c.message} {c.amountString}")
chat.tick()
data.tick()
```
### asyncio context:
@@ -59,20 +61,21 @@ import asyncio
async def main():
chat = LiveChatAsync("G1w62uEMZ74", callback = func)
while chat.is_alive():
#other background operation here.
await asyncio.sleep(3)
await asyncio.sleep(3)
#other background operation.
async def func(chat)
for c in chat.items:
#callback function is automatically called periodically.
async def func(data):
for c in data.items:
print(f"{c.datetime} [{c.author.name}]-{c.message} {c.amountString}")
await chat.tick_async()
await data.tick_async()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
### yt api compatible processor:
### youtube api compatible processor:
```python
from pytchat import LiveChat, CompatibleProcessor
@@ -89,10 +92,34 @@ while chat.is_alive():
time.sleep(polling/len(data["items"]))
```
### replay:
```python
from pytchat import ReplayChatAsync
import asyncio
async def main():
chat = ReplayChatAsync("G1w62uEMZ74", seektime = 1000, callback = func)
while chat.is_alive():
await asyncio.sleep(3)
#other background operation here.
## Chatdata Structure of Default Processor
Structure of each item which got from items() function.
#callback function is automatically called periodically.
async def func(data):
for count in range(0,len(data.items)):
c= data.items[count]
if count!=len(data.items):
tick=data.items[count+1].timestamp -data.items[count].timestamp
else:
tick=0
print(f"<{c.elapsedTime}> [{c.author.name}]-{c.message} {c.amountString}")
await asyncio.sleep(tick/1000)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
## Structure of Default Processor
Each item can be got with items() function.
<table>
<tr>
<th>name</th>
@@ -114,6 +141,11 @@ Structure of each item which got from items() function.
<td>str</td>
<td>emojis are represented by ":(shortcut text):"</td>
</tr>
<tr>
<td>messageEx</td>
<td>str</td>
<td>list of message texts and emoji URLs.</td>
</tr>
<tr>
<td>timestamp</td>
<td>int</td>
@@ -122,7 +154,11 @@ Structure of each item which got from items() function.
<tr>
<td>datetime</td>
<td>str</td>
<td></td>
<td>ex. "2019-10-10 12:34:56"</td>
</tr>
<td>elapsedTime</td>
<td>str</td>
<td>elapsed time. (ex. "1:02:27") *Replay Only.</td>
</tr>
<tr>
<td>amountValue</td>
@@ -137,7 +173,7 @@ Structure of each item which got from items() function.
<tr>
<td>currency</td>
<td>str</td>
<td>ISO 4217 currency codes (ex. "USD")</td>
<td><a href="https://en.wikipedia.org/wiki/ISO_4217">ISO 4217 currency codes</a> (ex. "USD")</td>
</tr>
<tr>
<td>bgColor</td>
@@ -166,7 +202,7 @@ Structure of author object.
<tr>
<td>channelId</td>
<td>str</td>
<td></td>
<td>*chatter's channel ID. NOT broadcasting video's channel ID.</td>
</tr>
<tr>
<td>channelUrl</td>
@@ -175,12 +211,12 @@ Structure of author object.
</tr>
<tr>
<td>imageUrl</td>
<td></td>
<td>str</td>
<td></td>
</tr>
<tr>
<td>badgeUrl</td>
<td></td>
<td>str</td>
<td></td>
</tr>
<tr>

View File

@@ -1,8 +1,8 @@
"""
pytchat is a python library for fetching youtube live chat.
pytchat is a python library for fetching youtube live chat without using yt api, Selenium, or BeautifulSoup.
"""
__copyright__ = 'Copyright (C) 2019 taizan-hokuto'
__version__ = '0.0.2.2'
__version__ = '0.0.3.6'
__license__ = 'MIT'
__author__ = 'taizan-hokuto'
__author_email__ = '55448286+taizan-hokuto@users.noreply.github.com'
@@ -13,6 +13,8 @@ __all__ = ["core_async","core_multithread","processors"]
from .api import (
LiveChat,
LiveChatAsync,
ReplayChat,
ReplayChatAsync,
ChatProcessor,
CompatibleProcessor,
SimpleDisplayProcessor,

View File

@@ -1,5 +1,7 @@
from .core_async.livechat import LiveChatAsync
from .core_multithread.livechat import LiveChat
from .core_async.livechat import LiveChatAsync
from .core_multithread.replaychat import ReplayChat
from .core_async.replaychat import ReplayChatAsync
from .processors.chat_processor import ChatProcessor
from .processors.default.processor import DefaultProcessor
from .processors.compatible.processor import CompatibleProcessor

View File

@@ -1,184 +0,0 @@
import asyncio
from .listener import AsyncListener
from .. import config
from .. import mylogger
import datetime
import os
import aiohttp
import signal
import threading
from .buffer import Buffer
from concurrent.futures import CancelledError
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class ListenManager:
'''
動画IDまたは動画IDのリストを受け取り、
動画IDに対応したListenerを生成・保持する。
#Attributes
----------
_listeners: dict
ListenManegerがつかんでいるListener達のリスト.
key:動画ID value:動画IDに対応するListener
_queue: Queue
動画IDを外部から受け渡しするためのキュー
_queueが空である間は、ンブロッキングで他のタスクを実行
_queueに動画IDが投入されると、_dequeueメソッドで
直ちにListenerを生成し返す。
_event: threading.Event
キーボードのCTRL+Cを検知するためのEventオブジェクト
'''
def __init__(self,interruptable = True):
#チャット監視中の動画リスト
self._listeners={}
self._tasks = []
#外部からvideoを受け取るためのキュー
self._queue = asyncio.Queue()
self._event = threading.Event()
self._ready_queue()
self._is_alive = True
#キーボードのCtrl+cを押したとき、_hundler関数を呼び出すように設定
signal.signal(signal.SIGINT, (lambda a, b: self._handler(self._event, a, b)))
def is_alive(self)->bool:
'''
ListenManagerが稼働中であるか。
True->稼働中
False->Ctrl+Cが押されて終了
'''
logger.debug(f'check is_alive() :{self._is_alive}')
return self._is_alive
def _handler(self, event, sig, handler):
'''
Ctrl+Cが押下されたとき、終了フラグをセットする。
'''
logger.debug('Ctrl+c pushed')
self._is_alive = False
logger.debug('terminating listeners.')
for listener in self._listeners.values():
listener.terminate()
logger.debug('end.')
def _ready_queue(self):
#loop = asyncio.get_event_loop()
self._tasks.append(
asyncio.create_task(self._dequeue())
)
async def set_video_ids(self,video_ids:list):
for video_id in video_ids:
if video_id:
await self._queue.put(video_id)
async def get_listener(self,video_id) -> AsyncListener:
return await self._create_listener(video_id)
# async def getlivechat(self,video_id):
# '''
# 指定された動画IDのチャットデータを返す
# Parameter
# ----------
# video_id: str
# 動画ID
# Return
# ----------
# 引数で受け取った動画IDに対応する
# Listenerオブジェクトへの参照
# '''
# logger.debug('manager get/create listener')
# listener = await self._create_listener(video_id)
# '''
# 上が完了しないうちに、下が呼び出される
# '''
# if not listener._initialized:
# await asyncio.sleep(2)
# return []
# if listener:
# #listener._isfirstrun=False
# return await listener.getlivechat()
async def _dequeue(self):
'''
キューに入った動画IDを
Listener登録に回す。
'''
while True:
video_id = await self._queue.get()
#listenerを登録、タスクとして実行する
logger.debug(f'deque got [{video_id}]')
await self._create_listener(video_id)
async def _create_listener(self, video_id) -> AsyncListener:
'''
Listenerを作成しチャット取得中リストに加え、
Listenerを返す
'''
if video_id is None or not isinstance(video_id, str):
raise TypeError('video_idは文字列でなければなりません')
if video_id in self._listeners:
return self._listeners[video_id]
else:
#listenerを登録する
listener = AsyncListener(video_id,interruptable = False,buffer = Buffer())
self._listeners.setdefault(video_id,listener)
#task = asyncio.ensure_future(listener.initialize())
#await asyncio.gather(listener.initialize())
#task.add_done_callback(self.finish)
#await listener.initialize()
#self._tasks.append(task)
return listener
def finish(self,sender):
try:
if sender.result():
video_id = sender.result()[0]
message = sender.result()[1]
#listener終了時のコールバック
#sender.result()[]でデータを取得できる
logger.info(f'終了しました VIDEO_ID:[{video_id}] message:{message}')
#logger.info(f'終了しました')
if video_id in self._listeners:
self._listeners.pop(video_id)
except CancelledError:
logger.debug('cancelled.')
def get_listeners(self):
return self._listeners
def shutdown(self):
'''
ListenManegerを終了する
'''
logger.debug("start shutdown")
self._is_alive =False
try:
#Listenerを停止する。
for listener in self._listeners.values():
listener.terminate()
#taskをキャンセルする。
for task in self._tasks:
if not task.done():
#print(task)
task.cancel()
except Exception as er:
logger.info(str(er),type(er))
logger.debug("finished.")
def get_tasks(self):
return self._tasks

View File

@@ -3,19 +3,17 @@ import datetime
import json
import random
import signal
import threading
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from .buffer import Buffer
from .parser import Parser
from ..parser.live import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam
from ..processors.default.processor import DefaultProcessor
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
@@ -81,7 +79,7 @@ class LiveChatAsync:
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._setup()
if not LiveChatAsync._setup_finished:
@@ -124,7 +122,7 @@ class LiveChatAsync:
async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループ開始する
_listenループのタスクを作成し開始する
"""
initial_continuation = await self._get_initial_continuation()
if initial_continuation is None:
@@ -164,7 +162,7 @@ class LiveChatAsync:
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = Parser.parse( livechat_json )
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
@@ -288,12 +286,3 @@ class LiveChatAsync:
await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop()
loop.stop()

View File

@@ -1,40 +0,0 @@
import json
from .. import config
from .. import mylogger
from .. exceptions import (
ResponseContextError,
NoContentsException,
NoContinuationsException )
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class Parser:
@classmethod
def parse(cls, jsn):
if jsn is None:
return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError('動画に接続できません。'
'動画IDが間違っているか、動画が削除非公開の可能性があります。')
contents=jsn['response'].get('continuationContents')
#配信が終了した場合、もしくはチャットデータが取得できない場合
if contents is None:
raise NoContentsException('チャットデータを取得できませんでした。')
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('Continuationがありません。')
metadata = (cont.get('invalidationContinuationData') or
cont.get('timedContinuationData') or
cont.get('reloadContinuationData')
)
if metadata is None:
unknown = list(cont.keys())[0]
if unknown:
logger.error(f"Received unknown continuation type:{unknown}")
metadata = cont.get(unknown)
metadata.setdefault('timeoutMs', 10000)
chatdata = contents['liveChatContinuation'].get('actions')
return metadata, chatdata

View File

@@ -0,0 +1,311 @@
import aiohttp, asyncio
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from queue import Queue
from .buffer import Buffer
from ..parser.replay import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import arcparam
from ..processors.default.processor import DefaultProcessor
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
MAX_RETRY = 10
headers = config.headers
class ReplayChatAsync:
'''asyncio(aiohttp)を利用してYouTubeのチャットデータを取得する。
Parameter
---------
video_id : str
動画ID
seektime : int
リプレイするチャットデータの開始時間(秒)
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
exception_handler : func
例外を処理する関数
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_is_alive : bool
チャット取得を停止するためのフラグ
'''
_setup_finished = False
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
exception_handler = None,
direct_mode = False):
self.video_id = video_id
self.seektime = seektime
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not ReplayChatAsync._setup_finished:
ReplayChatAsync._setup_finished = True
if exception_handler == None:
self._set_exception_handler(self._handle_exception)
else:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b:asyncio.create_task(
ReplayChatAsync.shutdown(None,signal.SIGINT,b))
))
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
loop = asyncio.get_event_loop()
loop.create_task(self._callback_loop(self._callback))
#_listenループタスクの開始
loop = asyncio.get_event_loop()
listen_task = loop.create_task(self._startlisten())
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = await self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
await self._listen(initial_continuation)
async def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = arcparam.get(self.video_id,self.seektime)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
Bufferにチャットデータを格納、
次のcontinuaitonを取得してループする。
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
#pause
await self._pauser.get()
#resume
#prohibit from blocking by putting None into _pauser.
self._pauser.put_nowait(None)
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
await self._callback(
self.processor.process([chat_component])
)
else:
await self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
logger.info(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
async def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp:
try:
text = await resp.text()
status_code = resp.status
livechat_json = json.loads(text)
break
except (ClientConnectorError,json.JSONDecodeError) :
await asyncio.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
async def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = await self._buffer.get()
data = self.processor.process(items)
await callback(data)
async def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = await self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'終了しました:[{self.video_id}]')
@classmethod
def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop()
#default handler: cls._handle_exception
loop.set_exception_handler(handler)
@classmethod
def _handle_exception(cls, loop, context):
#msg = context.get("exception", context["message"])
if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop()
loop.create_task(cls.shutdown(None,None,None))
@classmethod
async def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
logger.debug(f"残っているタスクを終了しています")
await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop()
loop.stop()

View File

@@ -3,13 +3,12 @@ import datetime
import json
import random
import signal
import threading
import time
import traceback
import urllib.parse
from concurrent.futures import CancelledError, ThreadPoolExecutor
from .buffer import Buffer
from .parser import Parser
from ..parser.live import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
@@ -58,7 +57,7 @@ class LiveChat:
チャットデータ取得ループ_listen用のスレッド
_is_alive : bool
チャット取得を終了したか
チャット取得を停止するためのフラグ
'''
_setup_finished = False
@@ -143,7 +142,7 @@ class LiveChat:
def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
にチャットデータを格納、
BUfferにチャットデータを格納、
次のcontinuaitonを取得してループする
Parameter
@@ -158,7 +157,6 @@ class LiveChat:
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
#チャットデータを含むコンポーネントを組み立ててbufferに投入する
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
@@ -172,16 +170,12 @@ class LiveChat:
)
else:
self._buffer.put(chat_component)
#次のchatを取得するまでsleepする
diff_time = timeout - (time.time()-time_mark)
if diff_time < 0 : diff_time=0
time.sleep(diff_time)
#次のチャットデータのcontinuationパラメータを取り出す。
continuation = metadata.get('continuation')
#whileループ先頭に戻る
except ChatParseException as e:
logger.error(f"{str(e)}動画ID:\"{self.video_id}\"")
logger.info(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")

View File

@@ -0,0 +1,286 @@
import requests
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
from concurrent.futures import CancelledError, ThreadPoolExecutor
from queue import Queue
from .buffer import Buffer
from ..parser.replay import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import arcparam
from ..processors.default.processor import DefaultProcessor
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
MAX_RETRY = 10
headers = config.headers
class ReplayChat:
''' スレッドプールを利用してYouTubeのライブ配信のチャットデータを取得する
Parameter
---------
video_id : str
動画ID
seektime : int
リプレイするチャットデータの開始時間(秒)
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_executor : ThreadPoolExecutor
チャットデータ取得ループ_listen用のスレッド
_is_alive : bool
チャット取得を終了したか
'''
_setup_finished = False
#チャット監視中のListenerのリスト
_listeners= []
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = Buffer(maxsize = 20),
interruptable = True,
callback = None,
done_callback = None,
direct_mode = False
):
self.video_id = video_id
self.seektime = seektime
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not ReplayChat._setup_finished:
ReplayChat._setup_finished = True
if interruptable:
signal.signal(signal.SIGINT, (lambda a, b:
(ReplayChat.shutdown(None,signal.SIGINT,b))
))
ReplayChat._listeners.append(self)
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop,self._callback)
#_listenループタスクの開始
listen_task = self._executor.submit(self._startlisten)
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
self._listen(initial_continuation)
def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = arcparam.get(self.video_id,self.seektime)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
にチャットデータを格納、
次のcontinuaitonを取得してループする
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
with requests.Session() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
#pause
self._pauser.get()
#resume
#prohibit from blocking by putting None into _pauser.
self._pauser.put_nowait(None)
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
self._callback(
self.processor.process([chat_component])
)
else:
self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
if diff_time < 0 : diff_time=0
time.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
logger.error(f"{str(e)}動画ID:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp:
try:
text = resp.text
status_code = resp.status_code
livechat_json = json.loads(text)
break
except json.JSONDecodeError :
time.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = self._buffer.get()
data = self.processor.process(items)
callback(data)
def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':1})
logger.info(f'終了しました:[{self.video_id}]')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
for t in ReplayChat._listeners:
t._is_alive = False

View File

@@ -17,7 +17,7 @@ def get_logger(modname,mode=logging.DEBUG):
logger.addHandler(handler1)
#create handler2 for recording log file
if mode <= logging.DEBUG:
handler2 = logging.FileHandler(filename="log.txt")
handler2 = logging.FileHandler(filename="log.txt", encoding='utf-8')
handler2.setLevel(logging.ERROR)
handler2.setFormatter(my_formatter)

View File

@@ -0,0 +1,120 @@
from base64 import urlsafe_b64encode as b64enc
from functools import reduce
import calendar, datetime, pytz
import math
import random
import urllib.parse
'''
Generate continuation parameter of youtube replay chat.
Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
---------
video_id : str
Return
---------
byte[] : base64 encoded video_id parameter.
"""
header_magic = b'\x0A\x0F\x1A\x0D\x0A'
header_id = video_id.encode()
header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B'
header_terminator = b'\x20\x01'
item = [
header_magic,
_nval(len(header_id)),
header_id,
header_sep_1,
header_id,
header_terminator
]
return urllib.parse.quote(
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
buf = b''
while val >> 7:
m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big')
val >>= 7
buf += val.to_bytes(1,'big')
return buf
def _tzparity(video_id,times):
t=0
for i,s in enumerate(video_id):
ss = ord(s)
if(ss % 2 == 0):
t += ss*(12-i)
else:
t ^= ss*i
return ((times^t) % 2).to_bytes(1,'big')
def get(video_id, seektime = 0, topchatonly = False):
switch_01 = b'\x04' if topchatonly else b'\x01'
if seektime < 0:
raise ValueError('seektime is 0 or positive number.')
if seektime == 0:
times =_nval(1)
switch = b'\x04'
else:
times =_nval(int(seektime*1000000))
switch = b'\x03'
parity = _tzparity(video_id, seektime)
header_magic= b'\xA2\x9D\xB0\xD3\x04'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = times
sep_1 = b'\x30\x00\x38\x00\x40\x00\x48'
sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40'
sep_3 = b'\x00\x58\x03\x60'
sep_4 = b'\x68'+parity+b'\x72\x04\x08'
sep_5 = b'\x10'+parity+b'\x78\x00'
body = [
sep_0,
_nval(len(vid)),
vid,
time_tag,
timestamp1,
sep_1,
switch,
sep_2,
chkstr,
sep_3,
switch_01,
sep_4,
switch_01,
sep_5
]
body = reduce(lambda x, y: x+y, body)
return urllib.parse.quote(
b64enc( header_magic +
_nval(len(body)) +
body
).decode()
)

View File

@@ -1,11 +1,16 @@
from base64 import urlsafe_b64encode as b64enc
from functools import reduce
import calendar, datetime, pytz
import math
import random
import urllib.parse
'''
Generate continuation parameter of youtube live chat.
Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
@@ -39,6 +44,17 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _tzparity(video_id,times):
t=0
for i,s in enumerate(video_id):
ss = ord(s)
if(ss % 2 == 0):
t += ss*(12-i)
else:
t ^= ss*i
return ((times^t) % 2).to_bytes(1,'big')
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
@@ -53,15 +69,16 @@ def _nval(val):
def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchatonly = False):
#_short_type2
switch_01 = b'\x04' if topchatonly else b'\x01'
header_magic= b'\xD2\x87\xCC\xC8\x03'
parity = _tzparity(video_id, _ts1^_ts2^_ts3^_ts4^_ts5)
header_magic= b'\xD2\x87\xCC\xC8\x03'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = _nval(_ts1)
sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A'
un_len = b'\x2B'
sep_2 = b'\x08\x00\x10\x00\x18\x00\x20\x00'
sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D'
sep_3 = b'\x3A\x00\x40\x00\x4A'
sep_4_len = b'\x02'

View File

View File

@@ -1,3 +1,9 @@
"""
pytchat.parser.live
~~~~~~~~~~~~~~~~~~~
This module is parser of live chat JSON.
"""
import json
from .. import config
from .. import mylogger
@@ -12,6 +18,27 @@ logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class Parser:
def parse(self, jsn):
"""
このparse関数はLiveChat._listen() 関数から定期的に呼び出される
引数jsnはYoutubeから取得したチャットデータの生JSONであり
このparse関数によって与えられたJSONを以下に分割して返す
+ timeout (次のチャットデータ取得までのインターバル)
+ chat dataチャットデータ本体
+ continuation 次のチャットデータ取得に必要となるパラメータ.
Parameter
----------
+ jsn : dict
+ Youtubeから取得したチャットデータのJSONオブジェクト
pythonの辞書形式に変換済みの状態で渡される
Returns
-------
+ metadata : dict
+ チャットデータに付随するメタデータtimeout 動画IDcontinuationパラメータで構成される
+ chatdata : list[dict]
+ チャットデータ本体のリスト
"""
if jsn is None:
return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'):

77
pytchat/parser/replay.py Normal file
View File

@@ -0,0 +1,77 @@
import json
from .. import config
from .. import mylogger
from .. exceptions import (
ResponseContextError,
NoContentsException,
NoContinuationsException )
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class Parser:
def parse(self, jsn):
"""
このparse関数はReplayChat._listen() 関数から定期的に呼び出される。
引数jsnはYoutubeから取得したアーカイブ済みチャットデータの生JSONであり、
このparse関数によって与えられたJSONを以下に分割して返す。
+ timeout (次のチャットデータ取得までのインターバル)
+ chat dataチャットデータ本体
+ continuation (次のチャットデータ取得に必要となるパラメータ).
ライブ配信のチャットとアーカイブ済み動画のチャットは構造が若干異なっているが、
ライブチャットと同じデータ形式に変換することにより、
同じprocessorでライブとリプレイどちらでも利用できるようにしている。
Parameter
----------
+ jsn : dict
+ Youtubeから取得したチャットデータのJSONオブジェクト。
pythonの辞書形式に変換済みの状態で渡される
Returns
-------
+ metadata : dict
+ チャットデータに付随するメタデータ。timeout、 動画ID、continuationパラメータで構成される。
+ chatdata : list[dict]
+ チャットデータ本体のリスト。
"""
if jsn is None:
return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError('動画に接続できません。'
'動画IDが間違っているか、動画が削除非公開の可能性があります。')
contents=jsn['response'].get('continuationContents')
#配信が終了した場合、もしくはチャットデータが取得できない場合
if contents is None:
raise NoContentsException('チャットデータを取得できませんでした。')
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('Continuationがありません。')
metadata = cont.get('liveChatReplayContinuationData')
if metadata is None:
unknown = list(cont.keys())[0]
if unknown:
metadata = cont.get(unknown)
actions = contents['liveChatContinuation'].get('actions')
if actions is None:
raise NoContentsException('チャットデータを取得できませんでした。')
interval = self.get_interval(actions)
metadata.setdefault("timeoutMs",interval)
"""アーカイブ済みチャットはライブチャットと構造が異なっているため、以下の行により
ライブチャットと同じ形式にそろえる"""
chatdata = [action["replayChatItemAction"]["actions"][0] for action in actions]
return metadata, chatdata
def get_interval(self, actions: list):
if actions is None:
return 0
start = int(actions[0]["replayChatItemAction"]["videoOffsetTimeMsec"])
last = int(actions[-1]["replayChatItemAction"]["videoOffsetTimeMsec"])
return (last - start)

View File

@@ -1,43 +0,0 @@
from .renderer.textmessage import LiveChatTextMessageRenderer
from .renderer.paidmessage import LiveChatPaidMessageRenderer
from .renderer.paidsticker import LiveChatPaidStickerRenderer
from .renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
def parse(sitem):
action = sitem.get("addChatItemAction")
if action:
item = action.get("item")
if item is None: return None
rd={}
try:
renderer = get_renderer(item)
if renderer == None:
return None
rd["kind"] = "youtube#liveChatMessage"
rd["etag"] = ""
rd["id"] = 'LCC.' + renderer.get_id()
rd["snippet"] = renderer.get_snippet()
rd["authorDetails"] = renderer.get_authordetails()
except (KeyError,TypeError,AttributeError) as e:
print(f"------{str(type(e))}-{str(e)}----------")
print(sitem)
return None
return rd
def get_renderer(item):
if item.get("liveChatTextMessageRenderer"):
renderer = LiveChatTextMessageRenderer(item)
elif item.get("liveChatPaidMessageRenderer"):
renderer = LiveChatPaidMessageRenderer(item)
elif item.get( "liveChatPaidStickerRenderer"):
renderer = LiveChatPaidStickerRenderer(item)
elif item.get("liveChatLegacyPaidMessageRenderer"):
renderer = LiveChatLegacyPaidMessageRenderer(item)
else:
renderer = None
return renderer

View File

@@ -1,10 +1,14 @@
from . import parser
import json
import os
import traceback
import datetime
import time
class CompatibleProcessor():
from .renderer.textmessage import LiveChatTextMessageRenderer
from .renderer.paidmessage import LiveChatPaidMessageRenderer
from .renderer.paidsticker import LiveChatPaidStickerRenderer
from .renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
from ... import mylogger
from ... import config
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class CompatibleProcessor:
def process(self, chat_components: list):
@@ -26,7 +30,7 @@ class CompatibleProcessor():
if action.get('addChatItemAction') is None: continue
if action['addChatItemAction'].get('item') is None: continue
chat = parser.parse(action)
chat = self.parse(action)
if chat:
chatlist.append(chat)
ret["pollingIntervalMillis"] = int(timeout*1000)
@@ -37,3 +41,41 @@ class CompatibleProcessor():
ret["items"] = chatlist
return ret
def parse(self, sitem):
action = sitem.get("addChatItemAction")
if action:
item = action.get("item")
if item is None: return None
rd={}
try:
renderer = self.get_renderer(item)
if renderer == None:
return None
rd["kind"] = "youtube#liveChatMessage"
rd["etag"] = ""
rd["id"] = 'LCC.' + renderer.get_id()
rd["snippet"] = renderer.get_snippet()
rd["authorDetails"] = renderer.get_authordetails()
except (KeyError,TypeError,AttributeError) as e:
logger.error(f"Error: {str(type(e))}-{str(e)}")
logger.error(f"item: {sitem}")
return None
return rd
def get_renderer(self, item):
if item.get("liveChatTextMessageRenderer"):
renderer = LiveChatTextMessageRenderer(item)
elif item.get("liveChatPaidMessageRenderer"):
renderer = LiveChatPaidMessageRenderer(item)
elif item.get( "liveChatPaidStickerRenderer"):
renderer = LiveChatPaidStickerRenderer(item)
elif item.get("liveChatLegacyPaidMessageRenderer"):
renderer = LiveChatLegacyPaidMessageRenderer(item)
else:
renderer = None
return renderer

View File

@@ -1,39 +0,0 @@
from .renderer.textmessage import LiveChatTextMessageRenderer
from .renderer.paidmessage import LiveChatPaidMessageRenderer
from .renderer.paidsticker import LiveChatPaidStickerRenderer
from .renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
def parse(sitem):
action = sitem.get("addChatItemAction")
if action:
item = action.get("item")
if item is None: return None
try:
renderer = get_renderer(item)
if renderer == None:
return None
renderer.get_snippet()
renderer.get_authordetails()
except (KeyError,TypeError,AttributeError) as e:
print(f"------{str(type(e))}-{str(e)}----------")
print(sitem)
return None
return renderer
def get_renderer(item):
if item.get("liveChatTextMessageRenderer"):
renderer = LiveChatTextMessageRenderer(item)
elif item.get("liveChatPaidMessageRenderer"):
renderer = LiveChatPaidMessageRenderer(item)
elif item.get( "liveChatPaidStickerRenderer"):
renderer = LiveChatPaidStickerRenderer(item)
elif item.get("liveChatLegacyPaidMessageRenderer"):
renderer = LiveChatLegacyPaidMessageRenderer(item)
else:
renderer = None
return renderer

View File

@@ -1,7 +1,12 @@
from . import parser
import asyncio
import time
from .renderer.textmessage import LiveChatTextMessageRenderer
from .renderer.paidmessage import LiveChatPaidMessageRenderer
from .renderer.paidsticker import LiveChatPaidStickerRenderer
from .renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
from ... import config
from ... import mylogger
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class Chatdata:
def __init__(self,chatlist:list, timeout:float):
@@ -10,13 +15,13 @@ class Chatdata:
def tick(self):
if self.interval == 0:
time.sleep(3)
time.sleep(1)
return
time.sleep(self.interval/len(self.items))
async def tick_async(self):
if self.interval == 0:
await asyncio.sleep(0.5)
await asyncio.sleep(1)
return
await asyncio.sleep(self.interval/len(self.items))
@@ -37,8 +42,39 @@ class DefaultProcessor:
if action.get('addChatItemAction') is None: continue
if action['addChatItemAction'].get('item') is None: continue
chat = parser.parse(action)
chat = self._parse(action)
if chat:
chatlist.append(chat)
return Chatdata(chatlist, float(timeout))
def _parse(self, sitem):
action = sitem.get("addChatItemAction")
if action:
item = action.get("item")
if item is None: return None
try:
renderer = self._get_renderer(item)
if renderer == None:
return None
renderer.get_snippet()
renderer.get_authordetails()
except (KeyError,TypeError,AttributeError) as e:
logger.error(f"{str(type(e))}-{str(e)} sitem:{str(sitem)}")
return None
return renderer
def _get_renderer(self, item):
if item.get("liveChatTextMessageRenderer"):
renderer = LiveChatTextMessageRenderer(item)
elif item.get("liveChatPaidMessageRenderer"):
renderer = LiveChatPaidMessageRenderer(item)
elif item.get( "liveChatPaidStickerRenderer"):
renderer = LiveChatPaidStickerRenderer(item)
elif item.get("liveChatLegacyPaidMessageRenderer"):
renderer = LiveChatLegacyPaidMessageRenderer(item)
else:
renderer = None
return renderer

View File

@@ -13,8 +13,14 @@ class BaseRenderer:
self.id = self.renderer.get('id')
timestampUsec = int(self.renderer.get("timestampUsec",0))
self.timestamp = int(timestampUsec/1000)
tst = self.renderer.get("timestampText")
if tst:
self.elapsedTime = tst.get("simpleText")
else:
self.elapsedTime = ""
self.datetime = self.get_datetime(timestampUsec)
self.message = self.get_message(self.renderer)
self.messageEx = self.get_message_ex(self.renderer)
self.id = self.renderer.get('id')
self.amountValue= 0.0
self.amountString = ""
@@ -49,6 +55,19 @@ class BaseRenderer:
message += r.get('text','')
return message
def get_message_ex(self,renderer):
message = []
if renderer.get("message"):
runs=renderer["message"].get("runs")
if runs:
for r in runs:
if r:
if r.get('emoji'):
message.append(r['emoji']['image']['thumbnails'][1].get('url'))
else:
message.append(r.get('text',''))
return message
def get_badges(self,renderer):
isVerified = False
isChatOwner = False

View File

@@ -1,12 +1,42 @@
import re
from . import currency
from .paidmessage import LiveChatPaidMessageRenderer
from .base import BaseRenderer
superchat_regex = re.compile(r"^(\D*)(\d{1,3}(,\d{3})*(\.\d*)*\b)$")
class LiveChatPaidStickerRenderer(LiveChatPaidMessageRenderer):
class LiveChatPaidStickerRenderer(BaseRenderer):
def __init__(self, item):
super().__init__(item, "superSticker")
def get_snippet(self):
super().get_snippet()
self.author.name = self.renderer["authorName"]["simpleText"]
amountDisplayString, symbol, amount =(
self.get_amountdata(self.renderer)
)
self.message = ""
self.amountValue = amount
self.amountString = amountDisplayString
self.currency = currency.symbols[symbol]["fxtext"] if currency.symbols.get(symbol) else symbol
self.bgColor = self.renderer.get("moneyChipBackgroundColor", 0)
self.sticker = "https:"+self.renderer["sticker"]["thumbnails"][0]["url"]
def get_amountdata(self,renderer):
amountDisplayString = renderer["purchaseAmountText"]["simpleText"]
m = superchat_regex.search(amountDisplayString)
if m:
symbol = m.group(1)
amount = float(m.group(2).replace(',',''))
else:
symbol = ""
amount = 0.0
return amountDisplayString, symbol, amount

View File

@@ -31,7 +31,7 @@ class SimpleDisplayProcessor(ChatProcessor):
purchase_amount_text = ''
else:
root = ( action['addChatItemAction']['item'].get('liveChatPaidMessageRenderer') or
action['addChatItemAction']['item'].get('liveChatPaidMessageRenderer') )
action['addChatItemAction']['item'].get('liveChatPaidStickerRenderer') )
if root:
author_name = root['authorName']['simpleText']
message = self._parse_message(root.get('message'))

View File

@@ -0,0 +1,192 @@
"""
speedmeter.py
チャットの勢いを算出するChatProcessor
Calculate speed of chat.
"""
import calendar, datetime, pytz
class RingQueue:
"""
リング型キュー
Attributes
----------
items : list
格納されているアイテムのリスト。
first_pos : int
キュー内の一番古いアイテムを示すリストのインデックス。
last_pos : int
キュー内の一番新しいアイテムを示すリストのインデックス。
mergin : boolean
キュー内に余裕があるか。キュー内のアイテム個数が、キューの最大個数未満であればTrue。
"""
def __init__(self, capacity = 10):
"""
コンストラクタ
Parameter
----------
capacity:このキューに格納するアイテムの最大個数。
格納時に最大個数を超える場合は一番古いアイテムから
上書きする。
"""
if capacity <= 0:
raise ValueError
self.items = list()
self.capacity = capacity
self.first_pos = 0
self.last_pos = 0
self.mergin = True
def put(self, item):
"""
引数itemに指定されたアイテムをこのキューに格納する。
キューの最大個数を超える場合は、一番古いアイテムの位置に上書きする。
Parameter
----------
item:格納するアイテム
"""
if self.mergin:
self.items.append(item)
self.last_pos = len(self.items)-1
if self.last_pos == self.capacity-1:
self.mergin = False
return
self.last_pos += 1
if self.last_pos > self.capacity-1:
self.last_pos = 0
self.items[self.last_pos] = item
self.first_pos += 1
if self.first_pos > self.capacity-1:
self.first_pos = 0
def get(self):
"""
キュー内の一番古いアイテムへの参照を返す
(アイテムは削除しない)
Return
----------
キュー内の一番古いアイテムへの参照
"""
return self.items[self.first_pos]
def item_count(self):
return len(self.items)
class SpeedCalculator(RingQueue):
"""
チャットの勢いを計算するクラス
Parameter
----------
格納するチャットブロックの数
"""
def __init__(self, capacity, video_id):
super().__init__(capacity)
self.video_id=video_id
self.speed = 0
def process(self, chat_components: list):
if chat_components:
for component in chat_components:
chatdata = component.get('chatdata')
if chatdata is None:
return self.speed
self.speed = self.calc(chatdata)
return self.speed
def _value(self):
"""
ActionsQueue内のチャットデータリストから、
チャット速度を計算して返す
Return
---------------------------
チャット速度(1分間で換算したチャット数)
"""
try:
#キュー内のactionsの総チャット数
total = sum(item['chat_count'] for item in self.items)
#キュー内の最初と最後のチャットの時間差
duration = (self.items[self.last_pos]['endtime']
- self.items[self.first_pos]['starttime'])
if duration != 0:
return int(total*60/duration)
return 0
except IndexError:
return 0
def _get_timestamp(self, action :dict):
"""
チャットデータのtimestampUsecを読み取る
liveChatTickerSponsorItemRenderer等のtickerデータは時刻格納位置が
異なるため、時刻データなしとして扱う
"""
try:
item = action['addChatItemAction']['item']
timestamp = int(item[list(item.keys())[0]]['timestampUsec'])
except (KeyError,TypeError):
return None
return timestamp
def calc(self,actions):
def empty_data():
'''
データがない場合にゼロのデータをリングキューに入れる
'''
timestamp_now = calendar.timegm(datetime.datetime.
now(pytz.utc).utctimetuple())
self.put({
'chat_count':0,
'starttime':int(timestamp_now),
'endtime':int(timestamp_now)
})
return self._value()
if actions is None or len(actions)==0:
return empty_data
#actions内の時刻データを持つチャットデータの数tickerは除く
counter=0
#actions内の最初のチャットデータの時刻
starttime= None
#actions内の最後のチャットデータの時刻
endtime=None
for action in actions:
#チャットデータからtimestampUsecを読み取る
gettime = self._get_timestamp(action)
#時刻のないデータだった場合は次の行のデータで読み取り試行
if gettime is None:
continue
#最初に有効な時刻を持つデータのtimestampをstarttimeに設定
if starttime is None:
starttime = gettime
#最後のtimestampを設定(途中で時刻のないデータの場合もあるので上書きしていく)
endtime = gettime
#チャットの数をインクリメント
counter+=1
#チャット速度用のデータをリングキューに送る
if starttime is None or endtime is None:
return empty_data
self.put({
'chat_count':counter,
'starttime':int(starttime/1000000),
'endtime':int(endtime/1000000)
})
return self._value()

View File

@@ -1,7 +1,7 @@
import requests,json,datetime
from .. import config
def download(cls,url):
def download(url):
_session = requests.Session()
html = _session.get(url, headers=config.headers)
with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
@@ -9,7 +9,7 @@ def download(cls,url):
json.dump(html.json(),f,ensure_ascii=False)
def save(cls,data,filename):
def save(data,filename):
with open(str(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
)+filename,mode ='w',encoding='utf-8') as f:
f.writelines(data)

26
tests/test_arcparam.py Normal file
View File

@@ -0,0 +1,26 @@
import pytest
from pytchat.parser.replay import Parser
import pytchat.config as config
import requests, json
from pytchat.paramgen import arcparam
def test_arcparam_0(mocker):
param = arcparam.get("01234567890")
assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAXIECAEQAXgA" == param
def test_arcparam_1(mocker):
param = arcparam.get("01234567890", seektime = 100000)
assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgBcgQIARABeAA%3D" == param
def test_arcparam_2(mocker):
param = arcparam.get("SsjCnHOk-Sk")
url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1"
resp = requests.Session().get(url,headers = config.headers)
jsn = json.loads(resp.text)
parser = Parser()
_ , chatdata = parser.parse(jsn)
test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatTextMessageRenderer"]["id"]
print(test_id)
assert "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w" == test_id

View File

@@ -1,7 +1,7 @@
import json
import pytest
import asyncio,aiohttp
from pytchat.core_async.parser import Parser
from pytchat.parser.live import Parser
from pytchat.processors.compatible.processor import CompatibleProcessor
from pytchat.exceptions import (
NoLivechatRendererException,NoYtinitialdataException,
@@ -12,13 +12,15 @@ from pytchat.processors.compatible.renderer.paidmessage import LiveChatPaidMessa
from pytchat.processors.compatible.renderer.paidsticker import LiveChatPaidStickerRenderer
from pytchat.processors.compatible.renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
parser = Parser()
def test_textmessage(mocker):
'''api互換processorのテスト通常テキストメッセージ'''
processor = CompatibleProcessor()
_json = _open_file("tests/testdata/compatible/textmessage.json")
_, chatdata = Parser.parse(json.loads(_json))
_, chatdata = parser.parse(json.loads(_json))
data = {
"video_id" : "",
"timeout" : 7,
@@ -55,7 +57,7 @@ def test_newsponcer(mocker):
_json = _open_file("tests/testdata/compatible/newSponsor.json")
_, chatdata = Parser.parse(json.loads(_json))
_, chatdata = parser.parse(json.loads(_json))
data = {
"video_id" : "",
"timeout" : 7,
@@ -91,7 +93,7 @@ def test_superchat(mocker):
_json = _open_file("tests/testdata/compatible/superchat.json")
_, chatdata = Parser.parse(json.loads(_json))
_, chatdata = parser.parse(json.loads(_json))
data = {
"video_id" : "",
"timeout" : 7,

View File

@@ -1,6 +1,6 @@
import pytest
from pytchat.core_async.parser import Parser as AsyncParser
from pytchat.core_multithread.parser import Parser as ThreadParser
from pytchat.parser.live import Parser
import json
import asyncio,aiohttp

9
tests/test_liveparam.py Normal file
View File

@@ -0,0 +1,9 @@
import pytest
from pytchat.paramgen import liveparam
def test_liveparam_0(mocker):
_ts1= 1546268400
param = liveparam._build("01234567890",
*([_ts1*1000000 for i in range(5)]))
test_param="0ofMyAPiARp8Q2c4S0RRb0xNREV5TXpRMU5qYzRPVEFhUTZxNXdiMEJQUW83YUhSMGNITTZMeTkzZDNjdWVXOTFkSFZpWlM1amIyMHZiR2wyWlY5amFHRjBQM1k5TURFeU16UTFOamM0T1RBbWFYTmZjRzl3YjNWMFBURWdBZyUzRCUzRCiAuNbVqsrfAjAAOABAAkorCAEQABgAIAAqDnN0YXRpY2NoZWNrc3VtOgBAAEoCCAFQgLjW1arK3wJYA1CAuNbVqsrfAliAuNbVqsrfAmgBggEECAEQAIgBAKABgLjW1arK3wI%3D"
assert test_param == param

View File

@@ -1,5 +1,5 @@
import pytest
from pytchat.core_async.parser import Parser
from pytchat.parser.live import Parser
import json
import asyncio,aiohttp
from aioresponses import aioresponses
@@ -11,7 +11,7 @@ from pytchat.exceptions import (
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
return f.read()
parser = Parser()
@aioresponses()
def test_finishedlive(*mock):
@@ -21,7 +21,7 @@ def test_finishedlive(*mock):
_text = json.loads(_text)
try:
Parser.parse(_text)
parser.parse(_text)
assert False
except NoContentsException:
assert True
@@ -34,7 +34,7 @@ def test_parsejson(*mock):
_text = json.loads(_text)
try:
Parser.parse(_text)
parser.parse(_text)
jsn = _text
timeout = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["timeoutMs"]
continuation = jsn["response"]["continuationContents"]["liveChatContinuation"]["continuations"][0]["timedContinuationData"]["continuation"]

3104
tests/testdata/chatreplay.json vendored Normal file

File diff suppressed because it is too large Load Diff