Compare commits

...

34 Commits

Author SHA1 Message Date
taizan-hokuto
f0a1a509a0 Merge branch 'release/v0.0.7' 2020-05-05 22:59:16 +09:00
taizan-hokuto
5ebca605ac Increment version 2020-05-05 22:58:29 +09:00
taizan-hokuto
a46c82d3c0 Merge branch 'hotfix/membership_renderer' 2020-05-05 22:51:16 +09:00
taizan-hokuto
3826b32ab9 Merge tag 'membership_renderer' into develop 2020-05-05 22:51:16 +09:00
taizan-hokuto
206d052907 Modify parsing membership 2020-05-05 22:47:12 +09:00
taizan-hokuto
04457eaa5c Merge branch 'hotfix/termination' 2020-05-05 21:18:46 +09:00
taizan-hokuto
141d7a9299 Merge tag 'termination' into develop 2020-05-05 21:18:46 +09:00
taizan-hokuto
bd32c75833 Modify termination 2020-05-05 21:16:06 +09:00
taizan-hokuto
84bae4ad2a Modify bytes combination 2020-04-18 00:55:56 +09:00
taizan-hokuto
3243d69d7a Merge branch 'hotfix/json_decode_error' 2020-03-14 09:43:37 +09:00
taizan-hokuto
d72608bf0a Merge tag 'json_decode_error' into develop
v0.0.6.6
2020-03-14 09:43:37 +09:00
taizan-hokuto
6e1b735ebc Increment version 2020-03-14 09:42:53 +09:00
taizan-hokuto
c54481dad5 Add header html and show progress 2020-03-14 09:26:28 +09:00
taizan-hokuto
78604c84d4 Fix testdata path separator 2020-03-14 08:16:19 +09:00
taizan-hokuto
21d93613a2 Handling JSONDecodeError 2020-03-14 08:00:31 +09:00
taizan-hokuto
56bf721330 Merge tag 'argparse' into develop
v0.0.6.5
2020-03-10 01:58:25 +09:00
taizan-hokuto
5f50598f79 Merge branch 'hotfix/argparse' 2020-03-10 01:58:24 +09:00
taizan-hokuto
5e8c438c6b Increment version 2020-03-10 01:57:55 +09:00
taizan-hokuto
23e47f6fb0 Fix parsing video_id which starts with '-' 2020-03-10 01:57:01 +09:00
taizan-hokuto
74dfe0a612 Modify requirements.txt 2020-03-10 01:06:36 +09:00
taizan-hokuto
725af25d81 Merge tag 'v0.0.6.4' into develop
v0.0.6.4
2020-03-08 23:43:01 +09:00
taizan-hokuto
316fc5594a Merge branch 'release/v0.0.6.4' 2020-03-08 23:43:00 +09:00
taizan-hokuto
44dffc7650 Increment version 2020-03-08 23:42:28 +09:00
taizan-hokuto
102d8c48c4 Merge branch 'feature/commandline-tool' into develop 2020-03-08 23:39:47 +09:00
taizan-hokuto
f8822a053f Add desription to README.md 2020-03-08 23:33:50 +09:00
taizan-hokuto
9d624f771a Implement CLI 2020-03-08 23:18:30 +09:00
taizan-hokuto
778d4db28b Merge tag 'fix_resume' into develop
v0.0.6.3
2020-03-08 14:34:08 +09:00
taizan-hokuto
36e0fd5c54 Merge branch 'hotfix/fix_resume' 2020-03-08 14:34:07 +09:00
taizan-hokuto
4252643273 Increment version 2020-03-08 14:31:49 +09:00
taizan-hokuto
c88fd8bc4e Fix resume 2020-03-08 14:31:24 +09:00
taizan-hokuto
af3b6d4271 Merge tag 'full_of_que_exception' into develop
v0.0.6.2
2020-03-07 22:58:13 +09:00
taizan-hokuto
331e825c97 Merge branch 'hotfix/full_of_que_exception' 2020-03-07 22:58:13 +09:00
taizan-hokuto
4019ad4b9d Increment version 2020-03-07 22:49:18 +09:00
taizan-hokuto
1074178afc Fix handling full que exception 2020-03-07 22:16:46 +09:00
30 changed files with 2584 additions and 906 deletions

View File

@@ -7,10 +7,10 @@ pytchat is a python library for fetching youtube live chat.
pytchat is a python library for fetching youtube live chat
without using youtube api, Selenium or BeautifulSoup.
pytchatはAPIを使わずにYouTubeチャットを取得するための軽量pythonライブラリです。
pytchatはAPIを使わずにYouTubeチャットを取得するためのpythonライブラリです。
Other features:
+ Customizable chat data processors including youtube api compatible one.
+ Customizable [chat data processors](https://github.com/taizan-hokuto/pytchat/wiki/ChatProcessor) including youtube api compatible one.
+ Available on asyncio context.
+ Quick fetching of initial chat data by generating continuation params
instead of web scraping.
@@ -22,10 +22,23 @@ For more detailed information, see [wiki](https://github.com/taizan-hokuto/pytch
```python
pip install pytchat
```
## Demo
![demo](https://taizan-hokuto.github.io/statics/demo.gif "demo")
## Examples
### CLI
One-liner command.
Save chat data to html.
```bash
$ pytchat -v ZJ6Q4U_Vg6s -o "c:/temp/"
# options:
# -v : video_id
# -o : output directory (default path: './')
# saved filename is [video_id].html
```
### on-demand mode
```python
from pytchat import LiveChat
@@ -263,6 +276,15 @@ Structure of author object.
[![MIT License](http://img.shields.io/badge/license-MIT-blue.svg?style=flat)](LICENSE)
## Contributes
Great thanks:
Most of source code of CLI refer to:
[PetterKraabol / Twitch-Chat-Downloader](https://github.com/PetterKraabol/Twitch-Chat-Downloader)
## Author
[taizan-hokuto](https://github.com/taizan-hokuto)

View File

@@ -2,7 +2,7 @@
pytchat is a python library for fetching youtube live chat without using yt api, Selenium, or BeautifulSoup.
"""
__copyright__ = 'Copyright (C) 2019 taizan-hokuto'
__version__ = '0.0.6.1'
__version__ = '0.0.7'
__license__ = 'MIT'
__author__ = 'taizan-hokuto'
__author_email__ = '55448286+taizan-hokuto@users.noreply.github.com'
@@ -11,6 +11,7 @@ __url__ = 'https://github.com/taizan-hokuto/pytchat'
__all__ = ["core_async","core_multithread","processors"]
from .api import (
cli,
config,
LiveChat,
LiveChatAsync,
@@ -19,6 +20,8 @@ from .api import (
DummyProcessor,
DefaultProcessor,
Extractor,
HTMLArchiver,
TSVArchiver,
JsonfileArchiver,
SimpleDisplayProcessor,
SpeedCalculator,

View File

@@ -1,3 +1,4 @@
from . import cli
from . import config
from .core_multithread.livechat import LiveChat
from .core_async.livechat import LiveChatAsync
@@ -5,6 +6,8 @@ from .processors.chat_processor import ChatProcessor
from .processors.compatible.processor import CompatibleProcessor
from .processors.default.processor import DefaultProcessor
from .processors.dummy_processor import DummyProcessor
from .processors.html_archiver import HTMLArchiver
from .processors.tsv_archiver import TSVArchiver
from .processors.jsonfile_archiver import JsonfileArchiver
from .processors.simple_display_processor import SimpleDisplayProcessor
from .processors.speed.calculator import SpeedCalculator

60
pytchat/cli/__init__.py Normal file
View File

@@ -0,0 +1,60 @@
import argparse
import os
from pathlib import Path
from typing import List, Callable
from .arguments import Arguments
from .. exceptions import InvalidVideoIdException, NoContentsException
from .. processors.tsv_archiver import TSVArchiver
from .. processors.html_archiver import HTMLArchiver
from .. tool.extract.extractor import Extractor
from .. tool.videoinfo import VideoInfo
from .. import __version__
'''
Most of CLI modules refer to
Petter Kraabøl's Twitch-Chat-Downloader
https://github.com/PetterKraabol/Twitch-Chat-Downloader
(MIT License)
'''
def main():
# Arguments
parser = argparse.ArgumentParser(description=f'pytchat v{__version__}')
parser.add_argument('-v', f'--{Arguments.Name.VIDEO}', type=str,
help='Video IDs separated by commas without space.\n'
'If ID starts with a hyphen (-), enclose the ID in square brackets.')
parser.add_argument('-o', f'--{Arguments.Name.OUTPUT}', type=str,
help='Output directory (end with "/"). default="./"', default='./')
parser.add_argument(f'--{Arguments.Name.VERSION}', action='store_true',
help='Settings version')
Arguments(parser.parse_args().__dict__)
if Arguments().print_version:
print(f'pytchat v{__version__}')
return
# Extractor
if Arguments().video_ids:
for video_id in Arguments().video_ids:
if '[' in video_id:
video_id = video_id.replace('[','').replace(']','')
try:
info = VideoInfo(video_id)
print(f"Extracting...\n"
f" video_id: {video_id}\n"
f" channel: {info.get_channel_name()}\n"
f" title: {info.get_title()}")
path = Path(Arguments().output+video_id+'.html')
print(f"output path: {path.resolve()}")
Extractor(video_id,
processor = HTMLArchiver(Arguments().output+video_id+'.html'),
callback = _disp_progress
).extract()
print("\nExtraction end.\n")
except (InvalidVideoIdException, NoContentsException) as e:
print(e)
return
parser.print_help()
def _disp_progress(a,b):
print('.',end="",flush=True)

39
pytchat/cli/arguments.py Normal file
View File

@@ -0,0 +1,39 @@
from typing import Optional, Dict, Union, List
from .singleton import Singleton
'''
This modules refer to
Petter Kraabøl's Twitch-Chat-Downloader
https://github.com/PetterKraabol/Twitch-Chat-Downloader
(MIT License)
'''
class Arguments(metaclass=Singleton):
"""
Arguments singleton
"""
class Name:
VERSION: str = 'version'
OUTPUT: str = 'output'
VIDEO: str = 'video'
def __init__(self,
arguments: Optional[Dict[str, Union[str, bool, int]]] = None):
"""
Initialize arguments
:param arguments: Arguments from cli
(Optional to call singleton instance without parameters)
"""
if arguments is None:
print('Error: arguments were not provided')
exit()
self.print_version: bool = arguments[Arguments.Name.VERSION]
self.output: str = arguments[Arguments.Name.OUTPUT]
self.video_ids: List[int] = []
# Videos
if arguments[Arguments.Name.VIDEO]:
self.video_ids = [video_id
for video_id in arguments[Arguments.Name.VIDEO].split(',')]

19
pytchat/cli/singleton.py Normal file
View File

@@ -0,0 +1,19 @@
'''
This modules refer to
Petter Kraabøl's Twitch-Chat-Downloader
https://github.com/PetterKraabol/Twitch-Chat-Downloader
(MIT License)
'''
class Singleton(type):
"""
Abstract class for singletons
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
def get_instance(cls, *args, **kwargs):
cls.__call__(*args, **kwargs)

View File

@@ -1,6 +1,6 @@
from logging import NullHandler, getLogger, StreamHandler, FileHandler, Formatter
import logging
import datetime
from datetime import datetime
def get_logger(modname,loglevel=logging.DEBUG):
@@ -28,5 +28,11 @@ def get_logger(modname,loglevel=logging.DEBUG):
class MyFormatter(logging.Formatter):
def format(self, record):
s =(datetime.datetime.fromtimestamp(record.created)).strftime("%m-%d %H:%M:%S")+'| '+ (record.module).ljust(15)+(' { '+record.funcName).ljust(20) +":"+str(record.lineno).rjust(4)+'} - '+record.getMessage()
return s
timestamp = (
datetime.fromtimestamp(record.created)).strftime("%m-%d %H:%M:%S")
module = (record.module).ljust(15)
funcname = (record.funcName).ljust(18)
lineno = str(record.lineno).rjust(4)
message = record.getMessage()
return timestamp+'| '+module+' { '+funcname+':'+lineno+'} - '+message

View File

@@ -20,6 +20,13 @@ class Buffer(asyncio.Queue):
super().get_nowait()
await super().put(item)
def put_nowait(self,item):
if item is None:
return
if super().full():
super().get_nowait()
super().put_nowait(item)
async def get(self):
ret = []
ret.append(await super().get())

View File

@@ -238,6 +238,7 @@ class LiveChatAsync:
livechat_json = (await self._get_livechat_json(
reload_continuation, session, headers))
contents = self._parser.get_contents(livechat_json)
self._is_replay = True
self._first_fetch = False
return contents

View File

@@ -1,317 +0,0 @@
import aiohttp, asyncio
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
import warnings
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from asyncio import Queue
from .buffer import Buffer
from ..parser.replay import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
logger = config.logger(__name__)
headers = config.headers
MAX_RETRY = 10
class ReplayChatAsync:
'''
### -----------------------------------------------------------
### [Warning] ReplayChatAsync is integrated into LiveChatAsync.
### This class is deprecated and will be removed at v0.0.5.0.
### ReplayChatAsyncはLiveChatAsyncに統合しました。
### このクラスはv0.0.5.0で廃止予定です。
### -----------------------------------------------------------
asyncio(aiohttp)を利用してYouTubeのチャットデータを取得する。
Parameter
---------
video_id : str
動画ID
seektime : int
リプレイするチャットデータの開始時間(秒)
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
exception_handler : func
例外を処理する関数
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_is_alive : bool
チャット取得を停止するためのフラグ
'''
_setup_finished = False
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
exception_handler = None,
direct_mode = False):
warnings.warn(""
f"\n{'-'*60}\n[WARNING] ReplayChatAsync is integrated "
f"into LiveChatAsync.\n{' '*5} This is deprecated and will"
f" be removed at v0.0.5.0.\n{'-'*60}\n"
)
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
else:
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not ReplayChatAsync._setup_finished:
ReplayChatAsync._setup_finished = True
if exception_handler == None:
self._set_exception_handler(self._handle_exception)
else:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b:asyncio.create_task(
ReplayChatAsync.shutdown(None,signal.SIGINT,b))
))
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
loop = asyncio.get_event_loop()
loop.create_task(self._callback_loop(self._callback))
#_listenループタスクの開始
loop = asyncio.get_event_loop()
listen_task = loop.create_task(self._startlisten())
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = arcparam.getparam(self.video_id, self.seektime)
await self._listen(initial_continuation)
async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
Bufferにチャットデータを格納、
次のcontinuaitonを取得してループする。
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
'''pause'''
await self._pauser.get()
'''resume:
prohibit from blocking by putting None into _pauser.
'''
self._pauser.put_nowait(None)
#when replay, not reacquire continuation param
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
await self._callback(
self.processor.process([chat_component])
)
else:
await self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
self.terminate()
async def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp:
try:
text = await resp.text()
status_code = resp.status
livechat_json = json.loads(text)
break
except (ClientConnectorError,json.JSONDecodeError) :
await asyncio.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
async def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = await self._buffer.get()
data = self.processor.process(items)
await callback(data)
async def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = await self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if self._callback is None:
return
if not self._pauser.empty():
self._pauser.get_nowait()
def resume(self):
if self._callback is None:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'[{self.video_id}]終了しました')
@classmethod
def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop()
loop.set_exception_handler(handler)
@classmethod
def _handle_exception(cls, loop, context):
if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop()
loop.create_task(cls.shutdown(None,None,None))
@classmethod
async def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
logger.debug(f"残っているタスクを終了しています")
await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop()
loop.stop()

View File

@@ -22,7 +22,14 @@ class Buffer(queue.Queue):
else:
super().put(item)
def put_nowait(self,item):
if item is None:
return
if super().full():
super().get_nowait()
else:
super().put_nowait(item)
def get(self):
ret = []
ret.append(super().get())

View File

@@ -11,8 +11,8 @@ from queue import Queue
from .buffer import Buffer
from ..parser.live import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam, arcparam
from ..exceptions import ChatParseException, IllegalFunctionCall
from ..paramgen import liveparam, arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
@@ -27,7 +27,7 @@ class LiveChat:
---------
video_id : str
動画ID
seektime : int
(ライブチャット取得時は無視)
取得開始するアーカイブ済みチャットの経過時間(秒)
@@ -61,7 +61,7 @@ class LiveChat:
topchat_only : bool
Trueの場合、上位チャットのみ取得する。
Attributes
---------
_executor : ThreadPoolExecutor
@@ -72,22 +72,20 @@ class LiveChat:
'''
_setup_finished = False
#チャット監視中のListenerのリスト
_listeners = []
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
direct_mode = False,
force_replay = False,
topchat_only = False,
logger = config.logger(__name__)
):
self.video_id = video_id
seektime=0,
processor=DefaultProcessor(),
buffer=None,
interruptable=True,
callback=None,
done_callback=None,
direct_mode=False,
force_replay=False,
topchat_only=False,
logger=config.logger(__name__)
):
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
@@ -98,57 +96,51 @@ class LiveChat:
self._done_callback = done_callback
self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode
self._is_alive = True
self._is_alive = True
self._is_replay = force_replay
self._parser = Parser(is_replay = self._is_replay)
self._parser = Parser(is_replay=self._is_replay)
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
self._first_fetch = True
self._fetch_url = "live_chat/get_live_chat?continuation="
self._topchat_only = topchat_only
self._logger = logger
LiveChat._logger = logger
if not LiveChat._setup_finished:
LiveChat._setup_finished = True
if interruptable:
signal.signal(signal.SIGINT, (lambda a, b:
(LiveChat.shutdown(None,signal.SIGINT,b))
))
LiveChat._listeners.append(self)
if interruptable:
signal.signal(signal.SIGINT, lambda a, b: self.terminate())
self._setup()
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
# direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"When direct_mode=True, callback parameter is required.")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
# direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
self._buffer = Buffer(maxsize=20)
# callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
pass
else:
#callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop,self._callback)
#_listenループタスクの開始
# callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop, self._callback)
# _listenループタスクの開始
listen_task = self._executor.submit(self._startlisten)
#add_done_callbackの登録
# add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
time.sleep(0.1) #sleep shortly to prohibit skipping fetching data
time.sleep(0.1) # sleep shortly to prohibit skipping fetching data
"""Fetch first continuation parameter,
create and start _listen loop.
"""
initial_continuation = liveparam.getparam(self.video_id,3)
initial_continuation = liveparam.getparam(self.video_id, 3)
self._listen(initial_continuation)
def _listen(self, continuation):
''' Fetch chat data and store them into buffer,
get next continuaiton parameter and loop.
@@ -164,33 +156,34 @@ class LiveChat:
continuation = self._check_pause(continuation)
contents = self._get_contents(
continuation, session, headers)
metadata, chatdata = self._parser.parse(contents)
metadata, chatdata = self._parser.parse(contents)
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
"video_id": self.video_id,
"timeout": timeout,
"chatdata": chatdata
}
time_mark =time.time()
time_mark = time.time()
if self._direct_mode:
processed_chat = self.processor.process([chat_component])
if isinstance(processed_chat,tuple):
processed_chat = self.processor.process(
[chat_component])
if isinstance(processed_chat, tuple):
self._callback(*processed_chat)
else:
self._callback(processed_chat)
else:
self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
time.sleep(diff_time if diff_time > 0 else 0)
continuation = metadata.get('continuation')
time.sleep(diff_time if diff_time > 0 else 0)
continuation = metadata.get('continuation')
except ChatParseException as e:
self._logger.debug(f"[{self.video_id}]{str(e)}")
return
except (TypeError , json.JSONDecodeError) :
return
except (TypeError, json.JSONDecodeError):
self._logger.error(f"{traceback.format_exc(limit = -1)}")
return
self._logger.debug(f"[{self.video_id}]finished fetching chat.")
def _check_pause(self, continuation):
@@ -202,7 +195,7 @@ class LiveChat:
'''
self._pauser.put_nowait(None)
if not self._is_replay:
continuation = liveparam.getparam(self.video_id,3)
continuation = liveparam.getparam(self.video_id, 3)
return continuation
def _get_contents(self, continuation, session, headers):
@@ -214,7 +207,7 @@ class LiveChat:
-------
'continuationContents' which includes metadata & chat data.
'''
livechat_json = (
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
contents = self._parser.get_contents(livechat_json)
@@ -225,7 +218,7 @@ class LiveChat:
self._fetch_url = "live_chat_replay/get_live_chat_replay?continuation="
continuation = arcparam.getparam(
self.video_id, self.seektime, self._topchat_only)
livechat_json = ( self._get_livechat_json(
livechat_json = (self._get_livechat_json(
continuation, session, headers))
reload_continuation = self._parser.reload_continuation(
self._parser.get_contents(livechat_json))
@@ -233,6 +226,7 @@ class LiveChat:
livechat_json = (self._get_livechat_json(
reload_continuation, session, headers))
contents = self._parser.get_contents(livechat_json)
self._is_replay = True
self._first_fetch = False
return contents
@@ -243,26 +237,26 @@ class LiveChat:
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
url = f"https://www.youtube.com/{self._fetch_url}{continuation}&pbj=1"
for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp:
with session.get(url, headers=headers) as resp:
try:
text = resp.text
livechat_json = json.loads(text)
break
except json.JSONDecodeError :
except json.JSONDecodeError:
time.sleep(1)
continue
else:
self._logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
def _callback_loop(self,callback):
def _callback_loop(self, callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
@@ -279,13 +273,13 @@ class LiveChat:
def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = self._buffer.get()
return self.processor.process(items)
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
@@ -303,13 +297,13 @@ class LiveChat:
return
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
def finish(self, sender):
'''Listener終了時のコールバック'''
try:
try:
self.terminate()
except CancelledError:
self._logger.debug(f'[{self.video_id}]cancelled:{sender}')
@@ -318,14 +312,7 @@ class LiveChat:
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':0})
self._logger.info(f'[{self.video_id}]finished.')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
cls._logger.debug("shutdown...")
for t in LiveChat._listeners:
t._is_alive = False
if self.is_alive():
self._is_alive = False
self._buffer.put({})
self._logger.info(f'[{self.video_id}]終了しました')

View File

@@ -1,309 +0,0 @@
import requests
import datetime
import json
import random
import signal
import time
import traceback
import urllib.parse
import warnings
from concurrent.futures import CancelledError, ThreadPoolExecutor
from queue import Queue
from .buffer import Buffer
from ..parser.replay import Parser
from .. import config
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import arcparam
from ..processors.default.processor import DefaultProcessor
from ..processors.combinator import Combinator
logger = config.logger(__name__)
headers = config.headers
MAX_RETRY = 10
class ReplayChat:
'''
### -----------------------------------------------------------
### [Warning] ReplayChat is integrated into LiveChat.
### This class is deprecated and will be removed at v0.0.5.0.
### ReplayChatはLiveChatに統合しました。
### このクラスはv0.0.5.0で廃止予定です。
### -----------------------------------------------------------
スレッドプールを利用してYouTubeのライブ配信のチャットデータを取得する
Parameter
---------
video_id : str
動画ID
seektime : int
リプレイするチャットデータの開始時間(秒)
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_executor : ThreadPoolExecutor
チャットデータ取得ループ_listen用のスレッド
_is_alive : bool
チャット取得を停止するためのフラグ
'''
_setup_finished = False
#チャット監視中のListenerのリスト
_listeners= []
def __init__(self, video_id,
seektime = 0,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
direct_mode = False
):
warnings.warn(""
f"\n{'-'*60}\n[WARNING] ReplayChat is integrated into LiveChat.\n"
f"{' '*5}This is deprecated and will be removed at v0.0.5.0.\n"
f"{'-'*60}\n"
)
self.video_id = video_id
self.seektime = seektime
if isinstance(processor, tuple):
self.processor = Combinator(processor)
else:
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._executor = ThreadPoolExecutor(max_workers=2)
self._direct_mode = direct_mode
self._is_alive = True
self._parser = Parser()
self._pauser = Queue()
self._pauser.put_nowait(None)
self._setup()
if not ReplayChat._setup_finished:
ReplayChat._setup_finished = True
if interruptable:
signal.signal(signal.SIGINT, (lambda a, b:
(ReplayChat.shutdown(None,signal.SIGINT,b))
))
ReplayChat._listeners.append(self)
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
self._executor.submit(self._callback_loop,self._callback)
#_listenループタスクの開始
listen_task = self._executor.submit(self._startlisten)
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループのタスクを作成し開始する
"""
initial_continuation = self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
self._listen(initial_continuation)
def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = arcparam.getparam(self.video_id,self.seektime)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
BUfferにチャットデータを格納、
次のcontinuaitonを取得してループする
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
with requests.Session() as session:
while(continuation and self._is_alive):
if self._pauser.empty():
#pause
self._pauser.get()
#resume
#prohibit from blocking by putting None into _pauser.
self._pauser.put_nowait(None)
livechat_json = (
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = self._parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
self._callback(
self.processor.process([chat_component])
)
else:
self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
if diff_time < 0 : diff_time=0
time.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
self.terminate()
logger.error(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
self.terminate()
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
with session.get(url ,headers = headers) as resp:
try:
text = resp.text
status_code = resp.status_code
livechat_json = json.loads(text)
break
except json.JSONDecodeError :
time.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
self.terminate()
return None
return livechat_json
def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = self._buffer.get()
data = self.processor.process(items)
callback(data)
def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def pause(self):
if not self._pauser.empty():
self._pauser.get()
def resume(self):
if self._pauser.empty():
self._pauser.put_nowait(None)
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except RuntimeError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put({'chatdata':'','timeout':1})
logger.info(f'[{self.video_id}]終了しました')
@classmethod
def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
for t in ReplayChat._listeners:
t._is_alive = False

View File

@@ -1,46 +1,52 @@
class ChatParseException(Exception):
'''
チャットデータをパースするライブラリが投げる例外の基底クラス
Base exception thrown by the parser
'''
pass
class NoYtinitialdataException(ChatParseException):
'''
配信ページ内にチャットデータurlが見つからないときに投げる例外
Thrown when the video is not found.
'''
pass
class ResponseContextError(ChatParseException):
'''
配信ページでチャットデータ無効の時に投げる例外
Thrown when chat data is invalid.
'''
pass
class NoLivechatRendererException(ChatParseException):
'''
チャットデータのJSON中にlivechatRendererがない時に投げる例外
Thrown when livechatRenderer is missing in JSON.
'''
pass
class NoContentsException(ChatParseException):
'''
チャットデータのJSON中にContinuationContentsがない時に投げる例外
Thrown when ContinuationContents is missing in JSON.
'''
pass
class NoContinuationsException(ChatParseException):
'''
チャットデータのContinuationContents中にcontinuationがない時に投げる例外
Thrown when continuation is missing in ContinuationContents.
'''
pass
class IllegalFunctionCall(Exception):
'''
set_callback()を実行済みにもかかわらず
get()を呼び出した場合の例外
Thrown when get () is called even though
set_callback () has been executed.
'''
pass
class InvalidVideoIdException(Exception):
'''
Thrown when the video_id is not exist (VideoInfo).
'''
pass
class UnknownConnectionError(Exception):
pass

View File

@@ -12,6 +12,7 @@ Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
@@ -23,7 +24,7 @@ def _gen_vid(video_id):
bytes : base64 encoded video_id parameter.
"""
header_magic = b'\x0A\x0F\x1A\x0D\x0A'
header_id = video_id.encode()
header_id = video_id.encode()
header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B'
header_terminator = b'\x20\x01'
@@ -40,42 +41,46 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
if val < 0:
raise ValueError
buf = b''
while val >> 7:
m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big')
buf += m.to_bytes(1, 'big')
val >>= 7
buf += val.to_bytes(1,'big')
buf += val.to_bytes(1, 'big')
return buf
def _build(video_id, seektime, topchat_only):
switch_01 = b'\x04' if topchat_only else b'\x01'
if seektime < 0:
times =_nval(0)
switch = b'\x04'
elif seektime == 0:
times =_nval(1)
switch = b'\x03'
times = _nval(0)
switch = b'\x04'
elif seektime == 0:
times = _nval(1)
switch = b'\x03'
else:
times =_nval(int(seektime*1000000))
times = _nval(int(seektime*1000000))
switch = b'\x03'
parity = b'\x00'
header_magic= b'\xA2\x9D\xB0\xD3\x04'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = times
sep_1 = b'\x30\x00\x38\x00\x40\x00\x48'
sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40'
sep_3 = b'\x00\x58\x03\x60'
sep_4 = b'\x68' + parity + b'\x72\x04\x08'
sep_5 = b'\x10' + parity + b'\x78\x00'
body = [
header_magic = b'\xA2\x9D\xB0\xD3\x04'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = times
sep_1 = b'\x30\x00\x38\x00\x40\x00\x48'
sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40'
sep_3 = b'\x00\x58\x03\x60'
sep_4 = b'\x68' + parity + b'\x72\x04\x08'
sep_5 = b'\x10' + parity + b'\x78\x00'
body = b''.join([
sep_0,
_nval(len(vid)),
vid,
@@ -90,18 +95,17 @@ def _build(video_id, seektime, topchat_only):
sep_4,
switch_01,
sep_5
]
])
body = reduce(lambda x, y: x+y, body)
return urllib.parse.quote(
b64enc( header_magic +
_nval(len(body)) +
body
).decode()
)
return urllib.parse.quote(
b64enc(header_magic +
_nval(len(body)) +
body
).decode()
)
def getparam(video_id, seektime = 0, topchat_only = False):
def getparam(video_id, seektime=0, topchat_only=False):
'''
Parameter
---------

View File

@@ -11,6 +11,8 @@ Author: taizan-hokuto (2019) @taizan205
ver 0.0.1 2019.10.05
'''
def _gen_vid(video_id):
"""generate video_id parameter.
Parameter
@@ -22,11 +24,11 @@ def _gen_vid(video_id):
byte[] : base64 encoded video_id parameter.
"""
header_magic = b'\x0A\x0F\x0A\x0D\x0A'
header_id = video_id.encode()
header_id = video_id.encode()
header_sep_1 = b'\x1A'
header_sep_2 = b'\x43\xAA\xB9\xC1\xBD\x01\x3D\x0A'
header_suburl = ('https://www.youtube.com/live_chat?v='
f'{video_id}&is_popout=1').encode()
f'{video_id}&is_popout=1').encode()
header_terminator = b'\x20\x02'
item = [
@@ -44,62 +46,66 @@ def _gen_vid(video_id):
b64enc(reduce(lambda x, y: x+y, item)).decode()
).encode()
def _tzparity(video_id,times):
t=0
for i,s in enumerate(video_id):
def _tzparity(video_id, times):
t = 0
for i, s in enumerate(video_id):
ss = ord(s)
if(ss % 2 == 0):
t += ss*(12-i)
else:
t ^= ss*i
return ((times^t) % 2).to_bytes(1,'big')
return ((times ^ t) % 2).to_bytes(1, 'big')
def _nval(val):
"""convert value to byte array"""
if val<0: raise ValueError
if val < 0:
raise ValueError
buf = b''
while val >> 7:
m = val & 0xFF | 0x80
buf += m.to_bytes(1,'big')
buf += m.to_bytes(1, 'big')
val >>= 7
buf += val.to_bytes(1,'big')
buf += val.to_bytes(1, 'big')
return buf
def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only):
#_short_type2
# _short_type2
switch_01 = b'\x04' if topchat_only else b'\x01'
parity = _tzparity(video_id, _ts1^_ts2^_ts3^_ts4^_ts5)
parity = _tzparity(video_id, _ts1 ^ _ts2 ^ _ts3 ^ _ts4 ^ _ts5)
header_magic= b'\xD2\x87\xCC\xC8\x03'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = _nval(_ts1)
sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A'
un_len = b'\x2B'
sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D'
sep_3 = b'\x3A\x00\x40\x00\x4A'
sep_4_len = b'\x02'
sep_4 = b'\x08\x01'
ts_2_start = b'\x50'
timestamp2 = _nval(_ts2)
ts_2_end = b'\x58'
sep_5 = b'\x03'
ts_3_start = b'\x50'
timestamp3 = _nval(_ts3)
ts_3_end = b'\x58'
timestamp4 = _nval(_ts4)
sep_6 = b'\x68'
#switch
sep_7 = b'\x82\x01\x04\x08'
#switch
sep_8 = b'\x10\x00'
sep_9 = b'\x88\x01\x00\xA0\x01'
timestamp5 = _nval(_ts5)
header_magic = b'\xD2\x87\xCC\xC8\x03'
sep_0 = b'\x1A'
vid = _gen_vid(video_id)
time_tag = b'\x28'
timestamp1 = _nval(_ts1)
sep_1 = b'\x30\x00\x38\x00\x40\x02\x4A'
un_len = b'\x2B'
sep_2 = b'\x08'+parity+b'\x10\x00\x18\x00\x20\x00'
chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D'
sep_3 = b'\x3A\x00\x40\x00\x4A'
sep_4_len = b'\x02'
sep_4 = b'\x08\x01'
ts_2_start = b'\x50'
timestamp2 = _nval(_ts2)
ts_2_end = b'\x58'
sep_5 = b'\x03'
ts_3_start = b'\x50'
timestamp3 = _nval(_ts3)
ts_3_end = b'\x58'
timestamp4 = _nval(_ts4)
sep_6 = b'\x68'
# switch
sep_7 = b'\x82\x01\x04\x08'
# switch
sep_8 = b'\x10\x00'
sep_9 = b'\x88\x01\x00\xA0\x01'
timestamp5 = _nval(_ts5)
body = [
body = b''.join([
sep_0,
_nval(len(vid)),
vid,
@@ -121,37 +127,35 @@ def _build(video_id, _ts1, _ts2, _ts3, _ts4, _ts5, topchat_only):
ts_3_end,
timestamp4,
sep_6,
switch_01,#
switch_01,
sep_7,
switch_01,#
switch_01,
sep_8,
sep_9,
timestamp5
]
])
return urllib.parse.quote(
b64enc(header_magic +
_nval(len(body)) +
body
).decode()
)
body = reduce(lambda x, y: x+y, body)
return urllib.parse.quote(
b64enc( header_magic +
_nval(len(body)) +
body
).decode()
)
def _times(past_sec):
n = int(time.time())
_ts1= n - random.uniform(0,1*3)
_ts2= n - random.uniform(0.01,0.99)
_ts3= n - past_sec + random.uniform(0,1)
_ts4= n - random.uniform(10*60,60*60)
_ts5= n - random.uniform(0.01,0.99)
return list(map(lambda x:int(x*1000000),[_ts1,_ts2,_ts3,_ts4,_ts5]))
_ts1 = n - random.uniform(0, 1*3)
_ts2 = n - random.uniform(0.01, 0.99)
_ts3 = n - past_sec + random.uniform(0, 1)
_ts4 = n - random.uniform(10*60, 60*60)
_ts5 = n - random.uniform(0.01, 0.99)
return list(map(lambda x: int(x*1000000), [_ts1, _ts2, _ts3, _ts4, _ts5]))
def getparam(video_id, past_sec = 0, topchat_only = False):
def getparam(video_id, past_sec=0, topchat_only=False):
'''
Parameter
---------
@@ -160,5 +164,4 @@ def getparam(video_id, past_sec = 0, topchat_only = False):
topchat_only : bool
if True, fetch only 'top chat'
'''
return _build(video_id,*_times(past_sec),topchat_only)
return _build(video_id, *_times(past_sec), topchat_only)

View File

@@ -4,17 +4,19 @@ from .renderer.textmessage import LiveChatTextMessageRenderer
from .renderer.paidmessage import LiveChatPaidMessageRenderer
from .renderer.paidsticker import LiveChatPaidStickerRenderer
from .renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
from .renderer.membership import LiveChatMembershipItemRenderer
from .. chat_processor import ChatProcessor
from ... import config
logger = config.logger(__name__)
class CompatibleProcessor(ChatProcessor):
def process(self, chat_components: list):
chatlist = []
timeout = 0
ret={}
ret = {}
ret["kind"] = "youtube#liveChatMessageListResponse"
ret["etag"] = ""
ret["nextPageToken"] = ""
@@ -23,20 +25,24 @@ class CompatibleProcessor(ChatProcessor):
for chat_component in chat_components:
timeout += chat_component.get('timeout', 0)
chatdata = chat_component.get('chatdata')
if chatdata is None: break
if chatdata is None:
break
for action in chatdata:
if action is None: continue
if action.get('addChatItemAction') is None: continue
if action['addChatItemAction'].get('item') is None: continue
if action is None:
continue
if action.get('addChatItemAction') is None:
continue
if action['addChatItemAction'].get('item') is None:
continue
chat = self.parse(action)
if chat:
chatlist.append(chat)
ret["pollingIntervalMillis"] = int(timeout*1000)
ret["pageInfo"]={
"totalResults":len(chatlist),
"resultsPerPage":len(chatlist),
ret["pageInfo"] = {
"totalResults": len(chatlist),
"resultsPerPage": len(chatlist),
}
ret["items"] = chatlist
@@ -47,8 +53,9 @@ class CompatibleProcessor(ChatProcessor):
action = sitem.get("addChatItemAction")
if action:
item = action.get("item")
if item is None: return None
rd={}
if item is None:
return None
rd = {}
try:
renderer = self.get_renderer(item)
if renderer == None:
@@ -57,25 +64,26 @@ class CompatibleProcessor(ChatProcessor):
rd["kind"] = "youtube#liveChatMessage"
rd["etag"] = ""
rd["id"] = 'LCC.' + renderer.get_id()
rd["snippet"] = renderer.get_snippet()
rd["snippet"] = renderer.get_snippet()
rd["authorDetails"] = renderer.get_authordetails()
except (KeyError,TypeError,AttributeError) as e:
except (KeyError, TypeError, AttributeError) as e:
logger.error(f"Error: {str(type(e))}-{str(e)}")
logger.error(f"item: {sitem}")
return None
return rd
return rd
def get_renderer(self, item):
if item.get("liveChatTextMessageRenderer"):
renderer = LiveChatTextMessageRenderer(item)
elif item.get("liveChatPaidMessageRenderer"):
renderer = LiveChatPaidMessageRenderer(item)
elif item.get( "liveChatPaidStickerRenderer"):
elif item.get("liveChatPaidStickerRenderer"):
renderer = LiveChatPaidStickerRenderer(item)
elif item.get("liveChatLegacyPaidMessageRenderer"):
renderer = LiveChatLegacyPaidMessageRenderer(item)
elif item.get("liveChatMembershipItemRenderer"):
renderer = LiveChatMembershipItemRenderer(item)
else:
renderer = None
return renderer

View File

@@ -0,0 +1,40 @@
from .base import BaseRenderer
class LiveChatMembershipItemRenderer(BaseRenderer):
def __init__(self, item):
super().__init__(item, "newSponsorEvent")
def get_snippet(self):
message = self.get_message(self.renderer)
return {
"type": self.chattype,
"liveChatId": "",
"authorChannelId": self.renderer.get("authorExternalChannelId"),
"publishedAt": self.get_publishedat(self.renderer.get("timestampUsec", 0)),
"hasDisplayContent": True,
"displayMessage": message,
}
def get_authordetails(self):
authorExternalChannelId = self.renderer.get("authorExternalChannelId")
# parse subscriber type
isVerified, isChatOwner, _, isChatModerator = (
self.get_badges(self.renderer)
)
return {
"channelId": authorExternalChannelId,
"channelUrl": "http://www.youtube.com/channel/"+authorExternalChannelId,
"displayName": self.renderer["authorName"]["simpleText"],
"profileImageUrl": self.renderer["authorPhoto"]["thumbnails"][1]["url"],
"isVerified": isVerified,
"isChatOwner": isChatOwner,
"isChatSponsor": True,
"isChatModerator": isChatModerator
}
def get_message(self, renderer):
message = (renderer["headerSubtext"]["runs"][0]["text"]
)+' / '+(renderer["authorName"]["simpleText"])
return message

View File

@@ -4,15 +4,18 @@ from .renderer.textmessage import LiveChatTextMessageRenderer
from .renderer.paidmessage import LiveChatPaidMessageRenderer
from .renderer.paidsticker import LiveChatPaidStickerRenderer
from .renderer.legacypaid import LiveChatLegacyPaidMessageRenderer
from .renderer.membership import LiveChatMembershipItemRenderer
from .. chat_processor import ChatProcessor
from ... import config
logger = config.logger(__name__)
class Chatdata:
def __init__(self,chatlist:list, timeout:float):
def __init__(self, chatlist: list, timeout: float):
self.items = chatlist
self.interval = timeout
def tick(self):
if self.interval == 0:
time.sleep(1)
@@ -25,6 +28,7 @@ class Chatdata:
return
await asyncio.sleep(self.interval/len(self.items))
class DefaultProcessor(ChatProcessor):
def process(self, chat_components: list):
@@ -35,24 +39,27 @@ class DefaultProcessor(ChatProcessor):
for component in chat_components:
timeout += component.get('timeout', 0)
chatdata = component.get('chatdata')
if chatdata is None: continue
if chatdata is None:
continue
for action in chatdata:
if action is None: continue
if action.get('addChatItemAction') is None: continue
if action['addChatItemAction'].get('item') is None: continue
if action is None:
continue
if action.get('addChatItemAction') is None:
continue
if action['addChatItemAction'].get('item') is None:
continue
chat = self._parse(action)
if chat:
chatlist.append(chat)
return Chatdata(chatlist, float(timeout))
def _parse(self, sitem):
action = sitem.get("addChatItemAction")
if action:
item = action.get("item")
if item is None: return None
if item is None:
return None
try:
renderer = self._get_renderer(item)
if renderer == None:
@@ -60,20 +67,22 @@ class DefaultProcessor(ChatProcessor):
renderer.get_snippet()
renderer.get_authordetails()
except (KeyError,TypeError) as e:
except (KeyError, TypeError) as e:
logger.error(f"{str(type(e))}-{str(e)} sitem:{str(sitem)}")
return None
return renderer
return renderer
def _get_renderer(self, item):
if item.get("liveChatTextMessageRenderer"):
renderer = LiveChatTextMessageRenderer(item)
elif item.get("liveChatPaidMessageRenderer"):
renderer = LiveChatPaidMessageRenderer(item)
elif item.get( "liveChatPaidStickerRenderer"):
elif item.get("liveChatPaidStickerRenderer"):
renderer = LiveChatPaidStickerRenderer(item)
elif item.get("liveChatLegacyPaidMessageRenderer"):
renderer = LiveChatLegacyPaidMessageRenderer(item)
elif item.get("liveChatMembershipItemRenderer"):
renderer = LiveChatMembershipItemRenderer(item)
else:
renderer = None
return renderer
return renderer

View File

@@ -59,6 +59,7 @@ class BaseRenderer:
def get_badges(self,renderer):
self.author.type = ''
isVerified = False
isChatOwner = False
isChatSponsor = False
@@ -68,6 +69,7 @@ class BaseRenderer:
for badge in badges:
if badge["liveChatAuthorBadgeRenderer"].get("icon"):
author_type = badge["liveChatAuthorBadgeRenderer"]["icon"]["iconType"]
self.author.type = author_type
if author_type == 'VERIFIED':
isVerified = True
if author_type == 'OWNER':
@@ -76,6 +78,7 @@ class BaseRenderer:
isChatModerator = True
if badge["liveChatAuthorBadgeRenderer"].get("customThumbnail"):
isChatSponsor = True
self.author.type = 'MEMBER'
self.get_badgeurl(badge)
return isVerified, isChatOwner, isChatSponsor, isChatModerator

View File

@@ -0,0 +1,15 @@
from .base import BaseRenderer
class LiveChatMembershipItemRenderer(BaseRenderer):
def __init__(self, item):
super().__init__(item, "newSponsor")
def get_authordetails(self):
super().get_authordetails()
self.author.isChatSponsor = True
def get_message(self, renderer):
message = (renderer["headerSubtext"]["runs"][0]["text"]
)+' / '+(renderer["authorName"]["simpleText"])
return message

View File

@@ -0,0 +1,98 @@
import csv
import os
import re
from .chat_processor import ChatProcessor
from .default.processor import DefaultProcessor
PATTERN = re.compile(r"(.*)\(([0-9]+)\)$")
fmt_headers = ['datetime','elapsed','authorName','message','superchat'
,'type','authorChannel']
HEADER_HTML = '''
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
<meta http-equiv="Content-Type" content="text/html;charset=UTF-8">
'''
class HTMLArchiver(ChatProcessor):
'''
HtmlArchiver saves chat data as HTML table format.
'''
def __init__(self, save_path):
super().__init__()
self.save_path = self._checkpath(save_path)
with open(self.save_path, mode='a', encoding = 'utf-8') as f:
f.write(HEADER_HTML)
f.write('<table border="1" style="border-collapse: collapse">')
f.writelines(self._parse_html_header(fmt_headers))
self.processor = DefaultProcessor()
def _checkpath(self, filepath):
splitter = os.path.splitext(os.path.basename(filepath))
body = splitter[0]
extention = splitter[1]
newpath = filepath
counter = 0
while os.path.exists(newpath):
match = re.search(PATTERN,body)
if match:
counter=int(match[2])+1
num_with_bracket = f'({str(counter)})'
body = f'{match[1]}{num_with_bracket}'
else:
body = f'{body}({str(counter)})'
newpath = os.path.join(os.path.dirname(filepath),body+extention)
return newpath
def process(self, chat_components: list):
"""
Returns
----------
dict :
save_path : str :
Actual save path of file.
total_lines : int :
count of total lines written to the file.
"""
if chat_components is None or len (chat_components) == 0:
return
with open(self.save_path, mode='a', encoding = 'utf-8') as f:
chats = self.processor.process(chat_components).items
for c in chats:
f.writelines(
self._parse_html_line([
c.datetime,
c.elapsedTime,
c.author.name,
c.message,
c.amountString,
c.author.type,
c.author.channelId]
)
)
'''
#Palliative treatment#
Comment out below line to prevent the table
display from collapsing.
'''
#f.write('</table>')
def _parse_html_line(self, raw_line):
html = ''
html+=' <tr>'
for cell in raw_line:
html+='<td>'+cell+'</td>'
html+='</tr>\n'
return html
def _parse_html_header(self,raw_line):
html = ''
html+='<thead>\n'
html+=' <tr>'
for cell in raw_line:
html+='<th>'+cell+'</th>'
html+='</tr>\n'
html+='</thead>\n'
return html

View File

@@ -0,0 +1,70 @@
import csv
import os
import re
from .chat_processor import ChatProcessor
from .default.processor import DefaultProcessor
PATTERN = re.compile(r"(.*)\(([0-9]+)\)$")
fmt_headers = ['datetime','elapsed','authorName','message','superchatAmount'
,'authorType','authorChannel']
class TSVArchiver(ChatProcessor):
'''
TsvArchiver saves chat data as Tab Separated Values format text.
'''
def __init__(self, save_path):
super().__init__()
self.save_path = self._checkpath(save_path)
with open(self.save_path, mode='a', encoding = 'utf-8') as f:
writer = csv.writer(f, delimiter='\t')
writer.writerow(fmt_headers)
self.processor = DefaultProcessor()
def _checkpath(self, filepath):
splitter = os.path.splitext(os.path.basename(filepath))
body = splitter[0]
extention = splitter[1]
newpath = filepath
counter = 0
while os.path.exists(newpath):
match = re.search(PATTERN,body)
if match:
counter=int(match[2])+1
num_with_bracket = f'({str(counter)})'
body = f'{match[1]}{num_with_bracket}'
else:
body = f'{body}({str(counter)})'
newpath = os.path.join(os.path.dirname(filepath),body+extention)
return newpath
def process(self, chat_components: list):
"""
Returns
----------
dict :
save_path : str :
Actual save path of file.
total_lines : int :
count of total lines written to the file.
"""
if chat_components is None or len (chat_components) == 0:
return
with open(self.save_path, mode='a', encoding = 'utf-8') as f:
writer = csv.writer(f, delimiter='\t')
chats = self.processor.process(chat_components).items
for c in chats:
writer.writerow([
c.datetime,
c.elapsedTime,
c.author.name,
c.message,
c.amountString,
c.author.type,
c.author.channelId
])

View File

@@ -7,12 +7,15 @@ from . worker import ExtractWorker
from . patch import Patch
from ... import config
from ... paramgen import arcparam
from ... exceptions import UnknownConnectionError
from concurrent.futures import CancelledError
from json import JSONDecodeError
from urllib.parse import quote
headers = config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay/" \
"get_live_chat_replay?continuation="
MAX_RETRY_COUNT = 3
def _split(start, end, count, min_interval_sec = 120):
"""
@@ -53,13 +56,22 @@ def ready_blocks(video_id, duration, div, callback):
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(-1, duration, div)]
return await asyncio.gather(*tasks)
async def _create_block(session, video_id, seektime, callback):
continuation = arcparam.getparam(video_id, seektime = seektime)
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url, headers = headers) as resp:
text = await resp.text()
next_continuation, actions = parser.parse(json.loads(text))
for _ in range(MAX_RETRY_COUNT):
try :
async with session.get(url, headers = headers) as resp:
text = await resp.text()
next_continuation, actions = parser.parse(json.loads(text))
break
except JSONDecodeError:
await asyncio.sleep(3)
else:
cancel()
raise UnknownConnectionError("Abort: Unknown connection error.")
if actions:
first = parser.get_offset(actions[0])
last = parser.get_offset(actions[-1])
@@ -71,6 +83,7 @@ def ready_blocks(video_id, duration, div, callback):
first = first,
last = last
)
"""
fetch initial blocks.
"""
@@ -95,9 +108,18 @@ def fetch_patch(callback, blocks, video_id):
async def _fetch(continuation,session) -> Patch:
url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
async with session.get(url,headers = config.headers) as resp:
chat_json = await resp.text()
continuation, actions = parser.parse(json.loads(chat_json))
for _ in range(MAX_RETRY_COUNT):
try:
async with session.get(url,headers = config.headers) as resp:
chat_json = await resp.text()
continuation, actions = parser.parse(json.loads(chat_json))
break
except JSONDecodeError:
await asyncio.sleep(3)
else:
cancel()
raise UnknownConnectionError("Abort: Unknown connection error.")
if actions:
last = parser.get_offset(actions[-1])
first = parser.get_offset(actions[0])
@@ -105,6 +127,7 @@ def fetch_patch(callback, blocks, video_id):
callback(actions, last - first)
return Patch(actions, continuation, first, last)
return Patch(continuation = continuation)
"""
allocate workers and assign blocks.
"""

View File

@@ -68,6 +68,9 @@ def _search_new_block(worker) -> Block:
continuation = continuation,
during_split = True,
is_last = worker.parent_block.is_last)
'''swap last block'''
if worker.parent_block.is_last:
worker.parent_block.is_last = False
worker.blocks.insert(index+1, new_block)
return new_block

View File

@@ -65,8 +65,7 @@ item_moving_thumbnail = [
class VideoInfo:
'''
VideoInfo object retrieves YouTube video informations
from the video page.
VideoInfo object retrieves YouTube video information.
Parameter
---------
@@ -93,13 +92,19 @@ class VideoInfo:
res= json.loads(result.group(1))
response = self._get_item(res, item_response)
if response is None:
raise InvalidVideoIdException(
f"Specified video_id [{self.video_id}] is invalid.")
self._check_video_is_private(res.get("args"))
self._renderer = self._get_item(json.loads(response), item_renderer)
if self._renderer is None:
raise InvalidVideoIdException(
f"No renderer found in video_id: [{self.video_id}].")
def _check_video_is_private(self,args):
if args and args.get("video_id"):
raise InvalidVideoIdException(
f"video_id [{self.video_id}] is private or deleted.")
raise InvalidVideoIdException(
f"video_id [{self.video_id}] is invalid.")
def _get_item(self, dict_body, items: list):
for item in items:
if dict_body is None:

View File

@@ -44,29 +44,29 @@ with open('README.md', encoding='utf-8') as f:
setup(
name=package_name,
packages=find_packages(exclude=['*log.txt','*tests','*testrun']),
version=version,
url=url,
author=author,
author_email=author_email,
long_description=long_description,
long_description_content_type='text/markdown',
license=license,
install_requires=_requirements(),
description="a python library for fetching youtube live chat.",
classifiers=[
'Natural Language :: Japanese',
'Development Status :: 4 - Beta',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'License :: OSI Approved :: MIT License',
],
description="a python library for fetching youtube live chat.",
entry_points=
'''
[console_scripts]
pytchat=pytchat.cli:main
''',
install_requires=_requirements(),
keywords='youtube livechat asyncio',
license=license,
long_description=long_description,
long_description_content_type='text/markdown',
name=package_name,
packages=find_packages(exclude=['*log.txt','*tests','*testrun']),
url=url,
version=version,
)

View File

@@ -36,7 +36,7 @@ def test_process_0():
chat_component = {
'video_id':'',
'timeout':10,
'chatdata':load_chatdata(r"tests\testdata\calculator\superchat_0.json")
'chatdata':load_chatdata(r"tests/testdata/calculator/superchat_0.json")
}
assert SuperchatCalculator().process([chat_component])=={'': 6800.0, '': 2.0}
@@ -47,7 +47,7 @@ def test_process_1():
chat_component = {
'video_id':'',
'timeout':10,
'chatdata':load_chatdata(r"tests\testdata\calculator\text_only.json")
'chatdata':load_chatdata(r"tests/testdata/calculator/text_only.json")
}
assert SuperchatCalculator().process([chat_component])=={}
@@ -59,7 +59,7 @@ def test_process_2():
chat_component = {
'video_id':'',
'timeout':10,
'chatdata':load_chatdata(r"tests\testdata\calculator\replay_end.json")
'chatdata':load_chatdata(r"tests/testdata/calculator/replay_end.json")
}
assert False
SuperchatCalculator().process([chat_component])

View File

@@ -1,10 +1,11 @@
import json
import pytest
import asyncio,aiohttp
import asyncio
import aiohttp
from pytchat.parser.live import Parser
from pytchat.processors.compatible.processor import CompatibleProcessor
from pytchat.exceptions import (
NoLivechatRendererException,NoYtinitialdataException,
NoLivechatRendererException, NoYtinitialdataException,
ResponseContextError, NoContentsException)
from pytchat.processors.compatible.renderer.textmessage import LiveChatTextMessageRenderer
@@ -14,6 +15,7 @@ from pytchat.processors.compatible.renderer.legacypaid import LiveChatLegacyPaid
parser = Parser(is_replay=False)
def test_textmessage(mocker):
'''api互換processorのテスト通常テキストメッセージ'''
processor = CompatibleProcessor()
@@ -22,16 +24,16 @@ def test_textmessage(mocker):
_, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = {
"video_id" : "",
"timeout" : 7,
"chatdata" : chatdata
"video_id": "",
"timeout": 7,
"chatdata": chatdata
}
ret = processor.process([data])
assert ret["kind"]== "youtube#liveChatMessageListResponse"
assert ret["pollingIntervalMillis"]==data["timeout"]*1000
assert ret["kind"] == "youtube#liveChatMessageListResponse"
assert ret["pollingIntervalMillis"] == data["timeout"]*1000
assert ret.keys() == {
"kind", "etag", "pageInfo", "nextPageToken","pollingIntervalMillis","items"
"kind", "etag", "pageInfo", "nextPageToken", "pollingIntervalMillis", "items"
}
assert ret["pageInfo"].keys() == {
"totalResults", "resultsPerPage"
@@ -48,8 +50,9 @@ def test_textmessage(mocker):
assert ret["items"][0]["snippet"]["textMessageDetails"].keys() == {
'messageText'
}
assert "LCC." in ret["items"][0]["id"]
assert ret["items"][0]["snippet"]["type"]=="textMessageEvent"
assert "LCC." in ret["items"][0]["id"]
assert ret["items"][0]["snippet"]["type"] == "textMessageEvent"
def test_newsponcer(mocker):
'''api互換processorのテストメンバ新規登録'''
@@ -59,22 +62,22 @@ def test_newsponcer(mocker):
_, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = {
"video_id" : "",
"timeout" : 7,
"chatdata" : chatdata
"video_id": "",
"timeout": 7,
"chatdata": chatdata
}
ret = processor.process([data])
assert ret["kind"]== "youtube#liveChatMessageListResponse"
assert ret["pollingIntervalMillis"]==data["timeout"]*1000
assert ret["kind"] == "youtube#liveChatMessageListResponse"
assert ret["pollingIntervalMillis"] == data["timeout"]*1000
assert ret.keys() == {
"kind", "etag", "pageInfo", "nextPageToken","pollingIntervalMillis","items"
"kind", "etag", "pageInfo", "nextPageToken", "pollingIntervalMillis", "items"
}
assert ret["pageInfo"].keys() == {
"totalResults", "resultsPerPage"
}
assert ret["items"][0].keys() == {
"kind", "etag", "id", "snippet","authorDetails"
"kind", "etag", "id", "snippet", "authorDetails"
}
assert ret["items"][0]["snippet"].keys() == {
'type', 'liveChatId', 'authorChannelId', 'publishedAt', 'hasDisplayContent', 'displayMessage'
@@ -83,8 +86,44 @@ def test_newsponcer(mocker):
assert ret["items"][0]["authorDetails"].keys() == {
'channelId', 'channelUrl', 'displayName', 'profileImageUrl', 'isVerified', 'isChatOwner', 'isChatSponsor', 'isChatModerator'
}
assert "LCC." in ret["items"][0]["id"]
assert ret["items"][0]["snippet"]["type"]=="newSponsorEvent"
assert "LCC." in ret["items"][0]["id"]
assert ret["items"][0]["snippet"]["type"] == "newSponsorEvent"
def test_newsponcer_rev(mocker):
'''api互換processorのテストメンバ新規登録'''
processor = CompatibleProcessor()
_json = _open_file("tests/testdata/compatible/newSponsor_rev.json")
_, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = {
"video_id": "",
"timeout": 7,
"chatdata": chatdata
}
ret = processor.process([data])
assert ret["kind"] == "youtube#liveChatMessageListResponse"
assert ret["pollingIntervalMillis"] == data["timeout"]*1000
assert ret.keys() == {
"kind", "etag", "pageInfo", "nextPageToken", "pollingIntervalMillis", "items"
}
assert ret["pageInfo"].keys() == {
"totalResults", "resultsPerPage"
}
assert ret["items"][0].keys() == {
"kind", "etag", "id", "snippet", "authorDetails"
}
assert ret["items"][0]["snippet"].keys() == {
'type', 'liveChatId', 'authorChannelId', 'publishedAt', 'hasDisplayContent', 'displayMessage'
}
assert ret["items"][0]["authorDetails"].keys() == {
'channelId', 'channelUrl', 'displayName', 'profileImageUrl', 'isVerified', 'isChatOwner', 'isChatSponsor', 'isChatModerator'
}
assert "LCC." in ret["items"][0]["id"]
assert ret["items"][0]["snippet"]["type"] == "newSponsorEvent"
def test_superchat(mocker):
@@ -95,16 +134,16 @@ def test_superchat(mocker):
_, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = {
"video_id" : "",
"timeout" : 7,
"chatdata" : chatdata
"video_id": "",
"timeout": 7,
"chatdata": chatdata
}
ret = processor.process([data])
assert ret["kind"]== "youtube#liveChatMessageListResponse"
assert ret["pollingIntervalMillis"]==data["timeout"]*1000
assert ret["kind"] == "youtube#liveChatMessageListResponse"
assert ret["pollingIntervalMillis"] == data["timeout"]*1000
assert ret.keys() == {
"kind", "etag", "pageInfo", "nextPageToken","pollingIntervalMillis","items"
"kind", "etag", "pageInfo", "nextPageToken", "pollingIntervalMillis", "items"
}
assert ret["pageInfo"].keys() == {
"totalResults", "resultsPerPage"
@@ -121,8 +160,9 @@ def test_superchat(mocker):
assert ret["items"][0]["snippet"]["superChatDetails"].keys() == {
'amountMicros', 'currency', 'amountDisplayString', 'tier', 'backgroundColor'
}
assert "LCC." in ret["items"][0]["id"]
assert ret["items"][0]["snippet"]["type"]=="superChatEvent"
assert "LCC." in ret["items"][0]["id"]
assert ret["items"][0]["snippet"]["type"] == "superChatEvent"
def test_unregistered_currency(mocker):
processor = CompatibleProcessor()
@@ -132,14 +172,14 @@ def test_unregistered_currency(mocker):
_, chatdata = parser.parse(parser.get_contents(json.loads(_json)))
data = {
"video_id" : "",
"timeout" : 7,
"chatdata" : chatdata
"video_id": "",
"timeout": 7,
"chatdata": chatdata
}
ret = processor.process([data])
assert ret["items"][0]["snippet"]["superChatDetails"]["currency"] == "[UNREGISTERD]"
def _open_file(path):
with open(path,mode ='r',encoding = 'utf-8') as f:
with open(path, mode='r', encoding='utf-8') as f:
return f.read()

File diff suppressed because it is too large Load Diff