Created base files

This commit is contained in:
55448286+taizan-hokuto@users.noreply.github.com
2019-11-03 08:49:05 +09:00
commit 582c1606c1
59 changed files with 4957 additions and 0 deletions

View File

View File

@@ -0,0 +1,28 @@
import asyncio
class Buffer(asyncio.Queue):
'''
チャットデータを格納するバッファの役割を持つLIFOキュー
Parameter
---------
maxsize : int
格納するチャットブロックの最大個数。0の場合は無限。
最大値を超える場合は古いチャットブロックから破棄される。
'''
def __init__(self,maxsize = 0):
super().__init__(maxsize)
async def put(self,item):
if item is None:
return
if super().full():
super().get_nowait()
await super().put(item)
async def get(self):
ret = []
ret.append(await super().get())
while not super().empty():
ret.append(super().get_nowait())
return ret

View File

@@ -0,0 +1,184 @@
import asyncio
from .listener import AsyncListener
from .. import config
from .. import mylogger
import datetime
import os
import aiohttp
import signal
import threading
from .buffer import Buffer
from concurrent.futures import CancelledError
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class ListenManager:
'''
動画IDまたは動画IDのリストを受け取り、
動画IDに対応したListenerを生成・保持する。
#Attributes
----------
_listeners: dict
ListenManegerがつかんでいるListener達のリスト.
key:動画ID value:動画IDに対応するListener
_queue: Queue
動画IDを外部から受け渡しするためのキュー
_queueが空である間は、ンブロッキングで他のタスクを実行
_queueに動画IDが投入されると、_dequeueメソッドで
直ちにListenerを生成し返す。
_event: threading.Event
キーボードのCTRL+Cを検知するためのEventオブジェクト
'''
def __init__(self,interruptable = True):
#チャット監視中の動画リスト
self._listeners={}
self._tasks = []
#外部からvideoを受け取るためのキュー
self._queue = asyncio.Queue()
self._event = threading.Event()
self._ready_queue()
self._is_alive = True
#キーボードのCtrl+cを押したとき、_hundler関数を呼び出すように設定
signal.signal(signal.SIGINT, (lambda a, b: self._handler(self._event, a, b)))
def is_alive(self)->bool:
'''
ListenManagerが稼働中であるか。
True->稼働中
False->Ctrl+Cが押されて終了
'''
logger.debug(f'check is_alive() :{self._is_alive}')
return self._is_alive
def _handler(self, event, sig, handler):
'''
Ctrl+Cが押下されたとき、終了フラグをセットする。
'''
logger.debug('Ctrl+c pushed')
self._is_alive = False
logger.debug('terminating listeners.')
for listener in self._listeners.values():
listener.terminate()
logger.debug('end.')
def _ready_queue(self):
#loop = asyncio.get_event_loop()
self._tasks.append(
asyncio.create_task(self._dequeue())
)
async def set_video_ids(self,video_ids:list):
for video_id in video_ids:
if video_id:
await self._queue.put(video_id)
async def get_listener(self,video_id) -> AsyncListener:
return await self._create_listener(video_id)
# async def getlivechat(self,video_id):
# '''
# 指定された動画IDのチャットデータを返す
# Parameter
# ----------
# video_id: str
# 動画ID
# Return
# ----------
# 引数で受け取った動画IDに対応する
# Listenerオブジェクトへの参照
# '''
# logger.debug('manager get/create listener')
# listener = await self._create_listener(video_id)
# '''
# 上が完了しないうちに、下が呼び出される
# '''
# if not listener._initialized:
# await asyncio.sleep(2)
# return []
# if listener:
# #listener._isfirstrun=False
# return await listener.getlivechat()
async def _dequeue(self):
'''
キューに入った動画IDを
Listener登録に回す。
'''
while True:
video_id = await self._queue.get()
#listenerを登録、タスクとして実行する
logger.debug(f'deque got [{video_id}]')
await self._create_listener(video_id)
async def _create_listener(self, video_id) -> AsyncListener:
'''
Listenerを作成しチャット取得中リストに加え、
Listenerを返す
'''
if video_id is None or not isinstance(video_id, str):
raise TypeError('video_idは文字列でなければなりません')
if video_id in self._listeners:
return self._listeners[video_id]
else:
#listenerを登録する
listener = AsyncListener(video_id,interruptable = False,buffer = Buffer())
self._listeners.setdefault(video_id,listener)
#task = asyncio.ensure_future(listener.initialize())
#await asyncio.gather(listener.initialize())
#task.add_done_callback(self.finish)
#await listener.initialize()
#self._tasks.append(task)
return listener
def finish(self,sender):
try:
if sender.result():
video_id = sender.result()[0]
message = sender.result()[1]
#listener終了時のコールバック
#sender.result()[]でデータを取得できる
logger.info(f'終了しました VIDEO_ID:[{video_id}] message:{message}')
#logger.info(f'終了しました')
if video_id in self._listeners:
self._listeners.pop(video_id)
except CancelledError:
logger.debug('cancelled.')
def get_listeners(self):
return self._listeners
def shutdown(self):
'''
ListenManegerを終了する
'''
logger.debug("start shutdown")
self._is_alive =False
try:
#Listenerを停止する。
for listener in self._listeners.values():
listener.terminate()
#taskをキャンセルする。
for task in self._tasks:
if not task.done():
#print(task)
task.cancel()
except Exception as er:
logger.info(str(er),type(er))
logger.debug("finished.")
def get_tasks(self):
return self._tasks

View File

@@ -0,0 +1,299 @@
import aiohttp, asyncio, async_timeout
import datetime
import json
import random
import signal
import threading
import time
import traceback
import urllib.parse
from aiohttp.client_exceptions import ClientConnectorError
from concurrent.futures import CancelledError
from .buffer import Buffer
from .parser import Parser
from .. import config
from .. import mylogger
from ..exceptions import ChatParseException,IllegalFunctionCall
from ..paramgen import liveparam
from ..processors.default.processor import DefaultProcessor
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
MAX_RETRY = 10
headers = config.headers
class LiveChatAsync:
'''asyncio(aiohttp)を利用してYouTubeのライブ配信のチャットデータを取得する。
Parameter
---------
video_id : str
動画ID
processor : ChatProcessor
チャットデータを加工するオブジェクト
buffer : Buffer(maxsize:20[default])
チャットデータchat_componentを格納するバッファ。
maxsize : 格納できるchat_componentの個数
default値20個。1個で約5~10秒分。
interruptable : bool
Ctrl+Cによる処理中断を行うかどうか。
callback : func
_listen()関数から一定間隔で自動的に呼びだす関数。
done_callback : func
listener終了時に呼び出すコールバック。
exception_handler : func
例外を処理する関数
direct_mode : bool
Trueの場合、bufferを使わずにcallbackを呼ぶ。
Trueの場合、callbackの設定が必須
(設定していない場合IllegalFunctionCall例外を発生させる
Attributes
---------
_is_alive : bool
チャット取得を停止するためのフラグ
'''
_setup_finished = False
def __init__(self, video_id,
processor = DefaultProcessor(),
buffer = None,
interruptable = True,
callback = None,
done_callback = None,
exception_handler = None,
direct_mode = False):
self.video_id = video_id
self.processor = processor
self._buffer = buffer
self._callback = callback
self._done_callback = done_callback
self._exception_handler = exception_handler
self._direct_mode = direct_mode
self._is_alive = True
self._setup()
if not LiveChatAsync._setup_finished:
LiveChatAsync._setup_finished = True
if exception_handler == None:
self._set_exception_handler(self._handle_exception)
else:
self._set_exception_handler(exception_handler)
if interruptable:
signal.signal(signal.SIGINT,
(lambda a, b:asyncio.create_task(
LiveChatAsync.shutdown(None,signal.SIGINT,b))
))
def _setup(self):
#direct modeがTrueでcallback未設定の場合例外発生。
if self._direct_mode:
if self._callback is None:
raise IllegalFunctionCall(
"direct_mode=Trueの場合callbackの設定が必須です。")
else:
#direct modeがFalseでbufferが未設定ならばデフォルトのbufferを作成
if self._buffer is None:
self._buffer = Buffer(maxsize = 20)
#callbackが指定されている場合はcallbackを呼ぶループタスクを作成
if self._callback is None:
pass
else:
#callbackを呼ぶループタスクの開始
loop = asyncio.get_event_loop()
loop.create_task(self._callback_loop(self._callback))
#_listenループタスクの開始
loop = asyncio.get_event_loop()
listen_task = loop.create_task(self._startlisten())
#add_done_callbackの登録
if self._done_callback is None:
listen_task.add_done_callback(self.finish)
else:
listen_task.add_done_callback(self._done_callback)
async def _startlisten(self):
"""最初のcontinuationパラメータを取得し、
_listenループを開始する
"""
initial_continuation = await self._get_initial_continuation()
if initial_continuation is None:
self.terminate()
logger.debug(f"[{self.video_id}]No initial continuation.")
return
await self._listen(initial_continuation)
async def _get_initial_continuation(self):
''' チャットデータ取得に必要な最初のcontinuationを取得する。'''
try:
initial_continuation = liveparam.getparam(self.video_id)
except ChatParseException as e:
self.terminate()
logger.debug(f"[{self.video_id}]Error:{str(e)}")
return
except KeyError:
logger.debug(f"[{self.video_id}]KeyError:"
f"{traceback.format_exc(limit = -1)}")
self.terminate()
return
return initial_continuation
async def _listen(self, continuation):
''' continuationに紐付いたチャットデータを取得し
チャットデータを格納、
次のcontinuaitonを取得してループする。
Parameter
---------
continuation : str
次のチャットデータ取得に必要なパラメータ
'''
try:
async with aiohttp.ClientSession() as session:
while(continuation and self._is_alive):
livechat_json = (await
self._get_livechat_json(continuation, session, headers)
)
metadata, chatdata = Parser.parse( livechat_json )
timeout = metadata['timeoutMs']/1000
chat_component = {
"video_id" : self.video_id,
"timeout" : timeout,
"chatdata" : chatdata
}
time_mark =time.time()
if self._direct_mode:
await self._callback(
self.processor.process([chat_component])
)
else:
await self._buffer.put(chat_component)
diff_time = timeout - (time.time()-time_mark)
await asyncio.sleep(diff_time)
continuation = metadata.get('continuation')
except ChatParseException as e:
logger.info(f"{str(e)}video_id:\"{self.video_id}\"")
return
except (TypeError , json.JSONDecodeError) :
logger.error(f"{traceback.format_exc(limit = -1)}")
return
logger.debug(f"[{self.video_id}]チャット取得を終了しました。")
async def _get_livechat_json(self, continuation, session, headers):
'''
チャットデータが格納されたjsonデータを取得する。
'''
continuation = urllib.parse.quote(continuation)
livechat_json = None
status_code = 0
url =(
f"https://www.youtube.com/live_chat/get_live_chat?"
f"continuation={continuation}&pbj=1")
for _ in range(MAX_RETRY + 1):
async with session.get(url ,headers = headers) as resp:
try:
text = await resp.text()
status_code = resp.status
livechat_json = json.loads(text)
break
except (ClientConnectorError,json.JSONDecodeError) :
await asyncio.sleep(1)
continue
else:
logger.error(f"[{self.video_id}]"
f"Exceeded retry count. status_code={status_code}")
return None
return livechat_json
async def _callback_loop(self,callback):
""" コンストラクタでcallbackを指定している場合、バックグラウンドで
callbackに指定された関数に一定間隔でチャットデータを投げる。
Parameter
---------
callback : func
加工済みのチャットデータを渡す先の関数。
"""
while self.is_alive():
items = await self._buffer.get()
data = self.processor.process(items)
await callback(data)
async def get(self):
""" bufferからデータを取り出し、processorに投げ、
加工済みのチャットデータを返す。
Returns
: Processorによって加工されたチャットデータ
"""
if self._callback is None:
items = await self._buffer.get()
return self.processor.process(items)
raise IllegalFunctionCall(
"既にcallbackを登録済みのため、get()は実行できません。")
def is_alive(self):
return self._is_alive
def finish(self,sender):
'''Listener終了時のコールバック'''
try:
self.terminate()
except CancelledError:
logger.debug(f'[{self.video_id}]cancelled:{sender}')
def terminate(self):
'''
Listenerを終了する。
'''
self._is_alive = False
if self._direct_mode == False:
#bufferにダミーオブジェクトを入れてis_alive()を判定させる
self._buffer.put_nowait({'chatdata':'','timeout':1})
logger.info(f'終了しました:[{self.video_id}]')
@classmethod
def _set_exception_handler(cls, handler):
loop = asyncio.get_event_loop()
#default handler: cls._handle_exception
loop.set_exception_handler(handler)
@classmethod
def _handle_exception(cls, loop, context):
#msg = context.get("exception", context["message"])
if not isinstance(context["exception"],CancelledError):
logger.error(f"Caught exception: {context}")
loop= asyncio.get_event_loop()
loop.create_task(cls.shutdown(None,None,None))
@classmethod
async def shutdown(cls, event, sig = None, handler=None):
logger.debug("シャットダウンしています")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
logger.debug(f"残っているタスクを終了しています")
await asyncio.gather(*tasks,return_exceptions=True)
loop = asyncio.get_event_loop()
loop.stop()

View File

@@ -0,0 +1,40 @@
import json
from .. import config
from .. import mylogger
from .. exceptions import (
ResponseContextError,
NoContentsException,
NoContinuationsException )
logger = mylogger.get_logger(__name__,mode=config.LOGGER_MODE)
class Parser:
@classmethod
def parse(cls, jsn):
if jsn is None:
return {'timeoutMs':0,'continuation':None},[]
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError('動画に接続できません。'
'動画IDが間違っているか、動画が削除非公開の可能性があります。')
contents=jsn['response'].get('continuationContents')
#配信が終了した場合、もしくはチャットデータが取得できない場合
if contents is None:
raise NoContentsException('チャットデータを取得できませんでした。')
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('Continuationがありません。')
metadata = (cont.get('invalidationContinuationData') or
cont.get('timedContinuationData') or
cont.get('reloadContinuationData')
)
if metadata is None:
unknown = list(cont.keys())[0]
if unknown:
logger.error(f"Received unknown continuation type:{unknown}")
metadata = cont.get(unknown)
metadata.setdefault('timeoutMs', 10000)
chatdata = contents['liveChatContinuation'].get('actions')
return metadata, chatdata