Fix comments
This commit is contained in:
@@ -4,13 +4,13 @@ import asyncio
|
|||||||
|
|
||||||
class Buffer(asyncio.Queue):
|
class Buffer(asyncio.Queue):
|
||||||
'''
|
'''
|
||||||
チャットデータを格納するバッファの役割を持つFIFOキュー
|
Buffer for storing chat data.
|
||||||
|
|
||||||
Parameter
|
Parameter
|
||||||
---------
|
---------
|
||||||
maxsize : int
|
maxsize : int
|
||||||
格納するチャットブロックの最大個数。0の場合は無限。
|
Maximum number of chat blocks to be stored.
|
||||||
最大値を超える場合は古いチャットブロックから破棄される。
|
If it exceeds the maximum, the oldest chat block will be discarded.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, maxsize=0):
|
def __init__(self, maxsize=0):
|
||||||
|
|||||||
@@ -22,54 +22,51 @@ MAX_RETRY = 10
|
|||||||
|
|
||||||
|
|
||||||
class LiveChatAsync:
|
class LiveChatAsync:
|
||||||
'''asyncioを利用してYouTubeのライブ配信のチャットデータを取得する。
|
'''LiveChatAsync object fetches chat data and stores them
|
||||||
|
in a buffer with asyncio.
|
||||||
|
|
||||||
Parameter
|
Parameter
|
||||||
---------
|
---------
|
||||||
video_id : str
|
video_id : str
|
||||||
動画ID
|
|
||||||
|
|
||||||
seektime : int
|
seektime : int
|
||||||
(ライブチャット取得時は無視)
|
start position of fetching chat (seconds).
|
||||||
取得開始するアーカイブ済みチャットの経過時間(秒)
|
This option is valid for archived chat only.
|
||||||
マイナス値を指定した場合は、配信開始前のチャットも取得する。
|
If negative value, chat data posted before the start of the broadcast
|
||||||
|
will be retrieved as well.
|
||||||
|
|
||||||
processor : ChatProcessor
|
processor : ChatProcessor
|
||||||
チャットデータを加工するオブジェクト
|
|
||||||
|
|
||||||
buffer : Buffer(maxsize:20[default])
|
buffer : Buffer
|
||||||
チャットデータchat_componentを格納するバッファ。
|
buffer of chat data fetched background.
|
||||||
maxsize : 格納できるchat_componentの個数
|
|
||||||
default値20個。1個で約5~10秒分。
|
|
||||||
|
|
||||||
interruptable : bool
|
interruptable : bool
|
||||||
Ctrl+Cによる処理中断を行うかどうか。
|
Allows keyboard interrupts.
|
||||||
|
Set this parameter to False if your own threading program causes
|
||||||
|
the problem.
|
||||||
|
|
||||||
callback : func
|
callback : func
|
||||||
_listen()関数から一定間隔で自動的に呼びだす関数。
|
function called periodically from _listen().
|
||||||
|
|
||||||
done_callback : func
|
done_callback : func
|
||||||
listener終了時に呼び出すコールバック。
|
function called when listener ends.
|
||||||
|
|
||||||
exception_handler : func
|
exception_handler : func
|
||||||
例外を処理する関数
|
|
||||||
|
|
||||||
direct_mode : bool
|
direct_mode : bool
|
||||||
Trueの場合、bufferを使わずにcallbackを呼ぶ。
|
If True, invoke specified callback function without using buffer.
|
||||||
Trueの場合、callbackの設定が必須
|
callback is required. If not, IllegalFunctionCall will be raised.
|
||||||
(設定していない場合IllegalFunctionCall例外を発生させる)
|
|
||||||
|
|
||||||
force_replay : bool
|
force_replay : bool
|
||||||
Trueの場合、ライブチャットが取得できる場合であっても
|
force to fetch archived chat data, even if specified video is live.
|
||||||
強制的にアーカイブ済みチャットを取得する。
|
|
||||||
|
|
||||||
topchat_only : bool
|
topchat_only : bool
|
||||||
Trueの場合、上位チャットのみ取得する。
|
If True, get only top chat.
|
||||||
|
|
||||||
Attributes
|
Attributes
|
||||||
---------
|
---------
|
||||||
_is_alive : bool
|
_is_alive : bool
|
||||||
チャット取得を停止するためのフラグ
|
Flag to stop getting chat.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
_setup_finished = False
|
_setup_finished = False
|
||||||
@@ -119,26 +116,26 @@ class LiveChatAsync:
|
|||||||
self._setup()
|
self._setup()
|
||||||
|
|
||||||
def _setup(self):
|
def _setup(self):
|
||||||
# direct modeがTrueでcallback未設定の場合例外発生。
|
# An exception is raised when direct mode is true and no callback is set.
|
||||||
if self._direct_mode:
|
if self._direct_mode:
|
||||||
if self._callback is None:
|
if self._callback is None:
|
||||||
raise exceptions.IllegalFunctionCall(
|
raise exceptions.IllegalFunctionCall(
|
||||||
"When direct_mode=True, callback parameter is required.")
|
"When direct_mode=True, callback parameter is required.")
|
||||||
else:
|
else:
|
||||||
# direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
|
# Create a default buffer if `direct_mode` is False and buffer is not set.
|
||||||
if self._buffer is None:
|
if self._buffer is None:
|
||||||
self._buffer = Buffer(maxsize=20)
|
self._buffer = Buffer(maxsize=20)
|
||||||
# callbackが指定されている場合はcallbackを呼ぶループタスクを作成
|
# Create a loop task to call callback if the `callback` param is specified.
|
||||||
if self._callback is None:
|
if self._callback is None:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
# callbackを呼ぶループタスクの開始
|
# Create a loop task to call callback if the `callback` param is specified.
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
loop.create_task(self._callback_loop(self._callback))
|
loop.create_task(self._callback_loop(self._callback))
|
||||||
# _listenループタスクの開始
|
# Start a loop task for _listen()
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
self.listen_task = loop.create_task(self._startlisten())
|
self.listen_task = loop.create_task(self._startlisten())
|
||||||
# add_done_callbackの登録
|
# Register add_done_callback
|
||||||
if self._done_callback is None:
|
if self._done_callback is None:
|
||||||
self.listen_task.add_done_callback(self._finish)
|
self.listen_task.add_done_callback(self._finish)
|
||||||
else:
|
else:
|
||||||
@@ -263,13 +260,14 @@ class LiveChatAsync:
|
|||||||
return livechat_json
|
return livechat_json
|
||||||
|
|
||||||
async def _callback_loop(self, callback):
|
async def _callback_loop(self, callback):
|
||||||
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
|
""" If a callback is specified in the constructor,
|
||||||
callbackに指定された関数に一定間隔でチャットデータを投げる。
|
it throws chat data at regular intervals to the
|
||||||
|
function specified in the callback in the backgroun
|
||||||
|
|
||||||
Parameter
|
Parameter
|
||||||
---------
|
---------
|
||||||
callback : func
|
callback : func
|
||||||
加工済みのチャットデータを渡す先の関数。
|
function to which the processed chat data is passed.
|
||||||
"""
|
"""
|
||||||
while self.is_alive():
|
while self.is_alive():
|
||||||
items = await self._buffer.get()
|
items = await self._buffer.get()
|
||||||
@@ -280,11 +278,13 @@ class LiveChatAsync:
|
|||||||
await self._callback(processed_chat)
|
await self._callback(processed_chat)
|
||||||
|
|
||||||
async def get(self):
|
async def get(self):
|
||||||
""" bufferからデータを取り出し、processorに投げ、
|
"""
|
||||||
加工済みのチャットデータを返す。
|
Retrieves data from the buffer,
|
||||||
|
throws it to the processor,
|
||||||
|
and returns the processed chat data.
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
: Processorによって加工されたチャットデータ
|
: Chat data processed by the Processor
|
||||||
"""
|
"""
|
||||||
if self._callback is None:
|
if self._callback is None:
|
||||||
if self.is_alive():
|
if self.is_alive():
|
||||||
@@ -293,7 +293,7 @@ class LiveChatAsync:
|
|||||||
else:
|
else:
|
||||||
return []
|
return []
|
||||||
raise exceptions.IllegalFunctionCall(
|
raise exceptions.IllegalFunctionCall(
|
||||||
"既にcallbackを登録済みのため、get()は実行できません。")
|
"Callback parameter is already set, so get() cannot be performed.")
|
||||||
|
|
||||||
def is_replay(self):
|
def is_replay(self):
|
||||||
return self._is_replay
|
return self._is_replay
|
||||||
@@ -314,7 +314,7 @@ class LiveChatAsync:
|
|||||||
return self._is_alive
|
return self._is_alive
|
||||||
|
|
||||||
def _finish(self, sender):
|
def _finish(self, sender):
|
||||||
'''Listener終了時のコールバック'''
|
'''Called when the _listen() task finished.'''
|
||||||
try:
|
try:
|
||||||
self._task_finished()
|
self._task_finished()
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
@@ -329,7 +329,7 @@ class LiveChatAsync:
|
|||||||
|
|
||||||
def _task_finished(self):
|
def _task_finished(self):
|
||||||
'''
|
'''
|
||||||
Listenerを終了する。
|
Terminate fetching chats.
|
||||||
'''
|
'''
|
||||||
if self.is_alive():
|
if self.is_alive():
|
||||||
self.terminate()
|
self.terminate()
|
||||||
@@ -339,7 +339,7 @@ class LiveChatAsync:
|
|||||||
self.exception = e
|
self.exception = e
|
||||||
if not isinstance(e, exceptions.ChatParseException):
|
if not isinstance(e, exceptions.ChatParseException):
|
||||||
self._logger.error(f'Internal exception - {type(e)}{str(e)}')
|
self._logger.error(f'Internal exception - {type(e)}{str(e)}')
|
||||||
self._logger.info(f'[{self._video_id}]終了しました')
|
self._logger.info(f'[{self._video_id}] finished.')
|
||||||
|
|
||||||
def raise_for_status(self):
|
def raise_for_status(self):
|
||||||
if self.exception is not None:
|
if self.exception is not None:
|
||||||
|
|||||||
@@ -4,13 +4,13 @@ import queue
|
|||||||
|
|
||||||
class Buffer(queue.Queue):
|
class Buffer(queue.Queue):
|
||||||
'''
|
'''
|
||||||
チャットデータを格納するバッファの役割を持つFIFOキュー
|
Buffer for storing chat data.
|
||||||
|
|
||||||
Parameter
|
Parameter
|
||||||
---------
|
---------
|
||||||
max_size : int
|
maxsize : int
|
||||||
格納するチャットブロックの最大個数。0の場合は無限。
|
Maximum number of chat blocks to be stored.
|
||||||
最大値を超える場合は古いチャットブロックから破棄される。
|
If it exceeds the maximum, the oldest chat block will be discarded.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, maxsize=0):
|
def __init__(self, maxsize=0):
|
||||||
|
|||||||
@@ -21,54 +21,53 @@ MAX_RETRY = 10
|
|||||||
|
|
||||||
|
|
||||||
class LiveChat:
|
class LiveChat:
|
||||||
''' スレッドプールを利用してYouTubeのライブ配信のチャットデータを取得する
|
'''
|
||||||
|
LiveChat object fetches chat data and stores them
|
||||||
|
in a buffer with ThreadpoolExecutor.
|
||||||
|
|
||||||
Parameter
|
Parameter
|
||||||
---------
|
---------
|
||||||
video_id : str
|
video_id : str
|
||||||
動画ID
|
|
||||||
|
|
||||||
seektime : int
|
seektime : int
|
||||||
(ライブチャット取得時は無視)
|
start position of fetching chat (seconds).
|
||||||
取得開始するアーカイブ済みチャットの経過時間(秒)
|
This option is valid for archived chat only.
|
||||||
マイナス値を指定した場合は、配信開始前のチャットも取得する。
|
If negative value, chat data posted before the start of the broadcast
|
||||||
|
will be retrieved as well.
|
||||||
|
|
||||||
processor : ChatProcessor
|
processor : ChatProcessor
|
||||||
チャットデータを加工するオブジェクト
|
|
||||||
|
|
||||||
buffer : Buffer(maxsize:20[default])
|
buffer : Buffer
|
||||||
チャットデータchat_componentを格納するバッファ。
|
buffer of chat data fetched background.
|
||||||
maxsize : 格納できるchat_componentの個数
|
|
||||||
default値20個。1個で約5~10秒分。
|
|
||||||
|
|
||||||
interruptable : bool
|
interruptable : bool
|
||||||
Ctrl+Cによる処理中断を行うかどうか。
|
Allows keyboard interrupts.
|
||||||
|
Set this parameter to False if your own threading program causes
|
||||||
|
the problem.
|
||||||
|
|
||||||
callback : func
|
callback : func
|
||||||
_listen()関数から一定間隔で自動的に呼びだす関数。
|
function called periodically from _listen().
|
||||||
|
|
||||||
done_callback : func
|
done_callback : func
|
||||||
listener終了時に呼び出すコールバック。
|
function called when listener ends.
|
||||||
|
|
||||||
direct_mode : bool
|
direct_mode : bool
|
||||||
Trueの場合、bufferを使わずにcallbackを呼ぶ。
|
If True, invoke specified callback function without using buffer.
|
||||||
Trueの場合、callbackの設定が必須
|
callback is required. If not, IllegalFunctionCall will be raised.
|
||||||
(設定していない場合IllegalFunctionCall例外を発生させる)
|
|
||||||
|
|
||||||
force_replay : bool
|
force_replay : bool
|
||||||
Trueの場合、ライブチャットが取得できる場合であっても
|
force to fetch archived chat data, even if specified video is live.
|
||||||
強制的にアーカイブ済みチャットを取得する。
|
|
||||||
|
|
||||||
topchat_only : bool
|
topchat_only : bool
|
||||||
Trueの場合、上位チャットのみ取得する。
|
If True, get only top chat.
|
||||||
|
|
||||||
Attributes
|
Attributes
|
||||||
---------
|
---------
|
||||||
_executor : ThreadPoolExecutor
|
_executor : ThreadPoolExecutor
|
||||||
チャットデータ取得ループ(_listen)用のスレッド
|
This is used for _listen() loop.
|
||||||
|
|
||||||
_is_alive : bool
|
_is_alive : bool
|
||||||
チャット取得を停止するためのフラグ
|
Flag to stop getting chat.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
_setup_finished = False
|
_setup_finished = False
|
||||||
@@ -112,24 +111,24 @@ class LiveChat:
|
|||||||
self._setup()
|
self._setup()
|
||||||
|
|
||||||
def _setup(self):
|
def _setup(self):
|
||||||
# direct modeがTrueでcallback未設定の場合例外発生。
|
# An exception is raised when direct mode is true and no callback is set.
|
||||||
if self._direct_mode:
|
if self._direct_mode:
|
||||||
if self._callback is None:
|
if self._callback is None:
|
||||||
raise exceptions.IllegalFunctionCall(
|
raise exceptions.IllegalFunctionCall(
|
||||||
"When direct_mode=True, callback parameter is required.")
|
"When direct_mode=True, callback parameter is required.")
|
||||||
else:
|
else:
|
||||||
# direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
|
# Create a default buffer if `direct_mode` is False and buffer is not set.
|
||||||
if self._buffer is None:
|
if self._buffer is None:
|
||||||
self._buffer = Buffer(maxsize=20)
|
self._buffer = Buffer(maxsize=20)
|
||||||
# callbackが指定されている場合はcallbackを呼ぶループタスクを作成
|
# Create a loop task to call callback if the `callback` param is specified.
|
||||||
if self._callback is None:
|
if self._callback is None:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
# callbackを呼ぶループタスクの開始
|
# Start a loop task calling callback function.
|
||||||
self._executor.submit(self._callback_loop, self._callback)
|
self._executor.submit(self._callback_loop, self._callback)
|
||||||
# _listenループタスクの開始
|
# Start a loop task for _listen()
|
||||||
self.listen_task = self._executor.submit(self._startlisten)
|
self.listen_task = self._executor.submit(self._startlisten)
|
||||||
# add_done_callbackの登録
|
# Register add_done_callback
|
||||||
if self._done_callback is None:
|
if self._done_callback is None:
|
||||||
self.listen_task.add_done_callback(self._finish)
|
self.listen_task.add_done_callback(self._finish)
|
||||||
else:
|
else:
|
||||||
@@ -243,8 +242,8 @@ class LiveChat:
|
|||||||
try:
|
try:
|
||||||
livechat_json = client.get(url, headers=headers).json()
|
livechat_json = client.get(url, headers=headers).json()
|
||||||
break
|
break
|
||||||
except json.JSONDecodeError:
|
except (json.JSONDecodeError, httpx.TimeoutError):
|
||||||
time.sleep(1)
|
time.sleep(2)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
self._logger.error(f"[{self._video_id}]"
|
self._logger.error(f"[{self._video_id}]"
|
||||||
@@ -253,13 +252,14 @@ class LiveChat:
|
|||||||
return livechat_json
|
return livechat_json
|
||||||
|
|
||||||
def _callback_loop(self, callback):
|
def _callback_loop(self, callback):
|
||||||
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
|
""" If a callback is specified in the constructor,
|
||||||
callbackに指定された関数に一定間隔でチャットデータを投げる。
|
it throws chat data at regular intervals to the
|
||||||
|
function specified in the callback in the backgroun
|
||||||
|
|
||||||
Parameter
|
Parameter
|
||||||
---------
|
---------
|
||||||
callback : func
|
callback : func
|
||||||
加工済みのチャットデータを渡す先の関数。
|
function to which the processed chat data is passed.
|
||||||
"""
|
"""
|
||||||
while self.is_alive():
|
while self.is_alive():
|
||||||
items = self._buffer.get()
|
items = self._buffer.get()
|
||||||
@@ -270,11 +270,13 @@ class LiveChat:
|
|||||||
self._callback(processed_chat)
|
self._callback(processed_chat)
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
""" bufferからデータを取り出し、processorに投げ、
|
"""
|
||||||
加工済みのチャットデータを返す。
|
Retrieves data from the buffer,
|
||||||
|
throws it to the processor,
|
||||||
|
and returns the processed chat data.
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
: Processorによって加工されたチャットデータ
|
: Chat data processed by the Processor
|
||||||
"""
|
"""
|
||||||
if self._callback is None:
|
if self._callback is None:
|
||||||
if self.is_alive():
|
if self.is_alive():
|
||||||
@@ -283,7 +285,7 @@ class LiveChat:
|
|||||||
else:
|
else:
|
||||||
return []
|
return []
|
||||||
raise exceptions.IllegalFunctionCall(
|
raise exceptions.IllegalFunctionCall(
|
||||||
"既にcallbackを登録済みのため、get()は実行できません。")
|
"Callback parameter is already set, so get() cannot be performed.")
|
||||||
|
|
||||||
def is_replay(self):
|
def is_replay(self):
|
||||||
return self._is_replay
|
return self._is_replay
|
||||||
@@ -304,13 +306,16 @@ class LiveChat:
|
|||||||
return self._is_alive
|
return self._is_alive
|
||||||
|
|
||||||
def _finish(self, sender):
|
def _finish(self, sender):
|
||||||
'''Listener終了時のコールバック'''
|
'''Called when the _listen() task finished.'''
|
||||||
try:
|
try:
|
||||||
self._task_finished()
|
self._task_finished()
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
self._logger.debug(f'[{self._video_id}] cancelled:{sender}')
|
self._logger.debug(f'[{self._video_id}] cancelled:{sender}')
|
||||||
|
|
||||||
def terminate(self):
|
def terminate(self):
|
||||||
|
'''
|
||||||
|
Terminate fetching chats.
|
||||||
|
'''
|
||||||
if self._pauser.empty():
|
if self._pauser.empty():
|
||||||
self._pauser.put_nowait(None)
|
self._pauser.put_nowait(None)
|
||||||
self._is_alive = False
|
self._is_alive = False
|
||||||
@@ -319,9 +324,6 @@ class LiveChat:
|
|||||||
self.processor.finalize()
|
self.processor.finalize()
|
||||||
|
|
||||||
def _task_finished(self):
|
def _task_finished(self):
|
||||||
'''
|
|
||||||
Listenerを終了する。
|
|
||||||
'''
|
|
||||||
if self.is_alive():
|
if self.is_alive():
|
||||||
self.terminate()
|
self.terminate()
|
||||||
try:
|
try:
|
||||||
@@ -330,7 +332,7 @@ class LiveChat:
|
|||||||
self.exception = e
|
self.exception = e
|
||||||
if not isinstance(e, exceptions.ChatParseException):
|
if not isinstance(e, exceptions.ChatParseException):
|
||||||
self._logger.error(f'Internal exception - {type(e)}{str(e)}')
|
self._logger.error(f'Internal exception - {type(e)}{str(e)}')
|
||||||
self._logger.info(f'[{self._video_id}]終了しました')
|
self._logger.info(f'[{self._video_id}] finished.')
|
||||||
|
|
||||||
def raise_for_status(self):
|
def raise_for_status(self):
|
||||||
if self.exception is not None:
|
if self.exception is not None:
|
||||||
|
|||||||
Reference in New Issue
Block a user