Files
pytchat-fork/pytchat/processors/speed_calculator.py
2019-12-19 01:21:49 +09:00

193 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
speedmeter.py
チャットの勢いを算出するChatProcessor
Calculate speed of chat.
"""
import calendar, datetime, pytz
class RingQueue:
"""
リング型キュー
Attributes
----------
items : list
格納されているアイテムのリスト。
first_pos : int
キュー内の一番古いアイテムを示すリストのインデックス。
last_pos : int
キュー内の一番新しいアイテムを示すリストのインデックス。
mergin : boolean
キュー内に余裕があるか。キュー内のアイテム個数が、キューの最大個数未満であればTrue。
"""
def __init__(self, capacity = 10):
"""
コンストラクタ
Parameter
----------
capacity:このキューに格納するアイテムの最大個数。
格納時に最大個数を超える場合は一番古いアイテムから
上書きする。
"""
if capacity <= 0:
raise ValueError
self.items = list()
self.capacity = capacity
self.first_pos = 0
self.last_pos = 0
self.mergin = True
def put(self, item):
"""
引数itemに指定されたアイテムをこのキューに格納する。
キューの最大個数を超える場合は、一番古いアイテムの位置に上書きする。
Parameter
----------
item:格納するアイテム
"""
if self.mergin:
self.items.append(item)
self.last_pos = len(self.items)-1
if self.last_pos == self.capacity-1:
self.mergin = False
return
self.last_pos += 1
if self.last_pos > self.capacity-1:
self.last_pos = 0
self.items[self.last_pos] = item
self.first_pos += 1
if self.first_pos > self.capacity-1:
self.first_pos = 0
def get(self):
"""
キュー内の一番古いアイテムへの参照を返す
(アイテムは削除しない)
Return
----------
キュー内の一番古いアイテムへの参照
"""
return self.items[self.first_pos]
def item_count(self):
return len(self.items)
class SpeedCalculator(RingQueue):
"""
チャットの勢いを計算するクラス
Parameter
----------
格納するチャットブロックの数
"""
def __init__(self, capacity, video_id):
super().__init__(capacity)
self.video_id=video_id
self.speed = 0
def process(self, chat_components: list):
if chat_components:
for component in chat_components:
chatdata = component.get('chatdata')
if chatdata is None:
return self.speed
self.speed = self.calc(chatdata)
return self.speed
def _value(self):
"""
ActionsQueue内のチャットデータリストから、
チャット速度を計算して返す
Return
---------------------------
チャット速度(1分間で換算したチャット数)
"""
try:
#キュー内のactionsの総チャット数
total = sum(item['chat_count'] for item in self.items)
#キュー内の最初と最後のチャットの時間差
duration = (self.items[self.last_pos]['endtime']
- self.items[self.first_pos]['starttime'])
if duration != 0:
return int(total*60/duration)
return 0
except IndexError:
return 0
def _get_timestamp(self, action :dict):
"""
チャットデータのtimestampUsecを読み取る
liveChatTickerSponsorItemRenderer等のtickerデータは時刻格納位置が
異なるため、時刻データなしとして扱う
"""
try:
item = action['addChatItemAction']['item']
timestamp = int(item[list(item.keys())[0]]['timestampUsec'])
except (KeyError,TypeError):
return None
return timestamp
def calc(self,actions):
def empty_data():
'''
データがない場合にゼロのデータをリングキューに入れる
'''
timestamp_now = calendar.timegm(datetime.datetime.
now(pytz.utc).utctimetuple())
self.put({
'chat_count':0,
'starttime':int(timestamp_now),
'endtime':int(timestamp_now)
})
return self._value()
if actions is None or len(actions)==0:
return empty_data
#actions内の時刻データを持つチャットデータの数tickerは除く
counter=0
#actions内の最初のチャットデータの時刻
starttime= None
#actions内の最後のチャットデータの時刻
endtime=None
for action in actions:
#チャットデータからtimestampUsecを読み取る
gettime = self._get_timestamp(action)
#時刻のないデータだった場合は次の行のデータで読み取り試行
if gettime is None:
continue
#最初に有効な時刻を持つデータのtimestampをstarttimeに設定
if starttime is None:
starttime = gettime
#最後のtimestampを設定(途中で時刻のないデータの場合もあるので上書きしていく)
endtime = gettime
#チャットの数をインクリメント
counter+=1
#チャット速度用のデータをリングキューに送る
if starttime is None or endtime is None:
return empty_data
self.put({
'chat_count':counter,
'starttime':int(starttime/1000000),
'endtime':int(endtime/1000000)
})
return self._value()