Implement mining

This commit is contained in:
taizan-hokuto
2020-02-22 17:10:40 +09:00
parent eae485b914
commit e770d95fe8
12 changed files with 605 additions and 9 deletions

View File

View File

@@ -0,0 +1,141 @@
import aiohttp
import asyncio
import json
from . import parser
from . block import Block
from . dlworker import DownloadWorker
from . patch import Patch
from ... import config
from ... paramgen import arcparam_mining as arcparam
from concurrent.futures import CancelledError
from urllib.parse import quote
headers = config.headers
REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation="
INTERVAL = 1
def _split(start, end, count, min_interval_sec = 120):
"""
Split section from `start` to `end` into `count` pieces,
and returns the beginning of each piece.
The `count` is adjusted so that the length of each piece
is no smaller than `min_interval`.
Returns:
--------
List of the offset of each block's first chat data.
"""
if not (isinstance(start,int) or isinstance(start,float)) or \
not (isinstance(end,int) or isinstance(end,float)):
raise ValueError("start/end must be int or float")
if not isinstance(count,int):
raise ValueError("count must be int")
if start>end:
raise ValueError("end must be equal to or greater than start.")
if count<1:
raise ValueError("count must be equal to or greater than 1.")
if (end-start)/count < min_interval_sec:
count = int((end-start)/min_interval_sec)
if count == 0 : count = 1
interval= (end-start)/count
if count == 1:
return [start]
return sorted( list(set( [int(start + interval*j)
for j in range(count) ])))
def ready_blocks(video_id, duration, div, callback):
if div <= 0: raise ValueError
async def _get_blocks( video_id, duration, div, callback):
async with aiohttp.ClientSession() as session:
tasks = [_create_block(session, video_id, seektime, callback)
for seektime in _split(0, duration, div)]
return await asyncio.gather(*tasks)
async def _create_block(session, video_id, seektime, callback):
continuation = arcparam.getparam(video_id, seektime = seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url, headers = headers) as resp:
chat_json = await resp.text()
if chat_json is None:
return
continuation, actions = parser.parse(json.loads(chat_json)[1])
first = seektime
seektime += INTERVAL
if callback:
callback(actions, INTERVAL)
return Block(
continuation = continuation,
chat_data = actions,
first = first,
last = seektime,
seektime = seektime
)
"""
fetch initial blocks.
"""
loop = asyncio.get_event_loop()
blocks = loop.run_until_complete(
_get_blocks(video_id, duration, div, callback))
return blocks
def download_patch(callback, blocks, video_id):
async def _allocate_workers():
workers = [
DownloadWorker(
fetch = _fetch, block = block,
blocks = blocks, video_id = video_id
)
for block in blocks
]
async with aiohttp.ClientSession() as session:
tasks = [worker.run(session) for worker in workers]
return await asyncio.gather(*tasks)
async def _fetch(seektime,session) -> Patch:
continuation = arcparam.getparam(video_id, seektime = seektime)
url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs="
f"{int(seektime*1000)}&hidden=false&pbj=1")
async with session.get(url,headers = config.headers) as resp:
chat_json = await resp.text()
actions = []
try:
if chat_json is None:
return Patch()
continuation, actions = parser.parse(json.loads(chat_json)[1])
except json.JSONDecodeError:
pass
if callback:
callback(actions, INTERVAL)
return Patch(chats = actions, continuation = continuation,
seektime = seektime, last = seektime)
"""
allocate workers and assign blocks.
"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_allocate_workers())
except CancelledError:
pass
async def _shutdown():
print("\nshutdown...")
tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
def cancel():
loop = asyncio.get_event_loop()
loop.create_task(_shutdown())

View File

@@ -0,0 +1,59 @@
from . import parser
class Block:
"""Block object represents something like a box
to join chunk of chatdata.
Parameter:
---------
first : int :
videoOffsetTimeMs of the first chat_data
(chat_data[0])
last : int :
videoOffsetTimeMs of the last chat_data.
(chat_data[-1])
this value increases as fetching chatdata progresses.
end : int :
target videoOffsetTimeMs of last chat data for download,
equals to first videoOffsetTimeMs of next block.
when download worker reaches this offset, stop downloading.
continuation : str :
continuation param of last chat data.
chat_data : list
done : bool :
whether this block has been downloaded.
remaining : int :
remaining data to download.
equals end - last.
is_last : bool :
whether this block is the last one in blocklist.
during_split : bool :
whether this block is in the process of during_split.
while True, this block is excluded from duplicate split procedure.
"""
__slots__ = ['first','last','end','continuation','chat_data','remaining',
'done','is_last','during_split','seektime']
def __init__(self, first = 0, last = 0, end = 0,
continuation = '', chat_data = [], is_last = False,
during_split = False,seektime = None):
self.first = first
self.last = last
self.end = end
self.continuation = continuation
self.chat_data = chat_data
self.done = False
self.remaining = self.end - self.last
self.is_last = is_last
self.during_split = during_split
self.seektime = seektime

View File

@@ -0,0 +1,54 @@
from . import parser
from . block import Block
from . patch import Patch, fill
from ... paramgen import arcparam
INTERVAL = 1
class DownloadWorker:
"""
DownloadWorker associates a download session with a block.
When the dlworker finishes downloading, the block
being downloaded is splitted and assigned the free dlworker.
Parameter
----------
fetch : func :
download function of asyncdl
block : Block :
Block object that includes chat_data
blocks : list :
List of Block(s)
video_id : str :
parent_block : Block :
the block from which current block is splitted
"""
__slots__ = ['block', 'fetch', 'blocks', 'video_id', 'parent_block']
def __init__(self, fetch, block, blocks, video_id ):
self.block:Block = block
self.fetch = fetch
self.blocks:list = blocks
self.video_id:str = video_id
self.parent_block:Block = None
async def run(self, session):
while self.block.continuation:
patch = await self.fetch(
self.block.seektime, session)
fill(self.block, patch)
self.block.seektime += INTERVAL
self.block.done = True
def fd(name,mes,src,patch,end):
def offset(chats):
if len(chats)==0:
return None,None
return parser.get_offset(chats[0]),parser.get_offset(chats[-1])
with open("v://tlog.csv",encoding="utf-8",mode="a") as f:
f.write(f"WORKER,{name},mes,{mes},edge,{offset(src)[1]},first,{offset(patch)[0]},last,{offset(patch)[1]},end,{end}\n")

View File

@@ -0,0 +1,72 @@
from . import asyncdl
from . import parser
from .. videoinfo import VideoInfo
from ... import config
from ... exceptions import InvalidVideoIdException
logger = config.logger(__name__)
headers=config.headers
class Downloader:
def __init__(self, video_id, duration, div, callback):
if not isinstance(div ,int) or div < 1:
raise ValueError('div must be positive integer.')
elif div > 10:
div = 10
if not isinstance(duration ,int) or duration < 1:
raise ValueError('duration must be positive integer.')
self.video_id = video_id
self.duration = duration
self.div = div
self.callback = callback
self.blocks = []
def _ready_blocks(self):
blocks = asyncdl.ready_blocks(
self.video_id, self.duration, self.div, self.callback)
self.blocks = [block for block in blocks if block is not None]
return self
def _set_block_end(self):
for i in range(len(self.blocks)-1):
self.blocks[i].end = self.blocks[i+1].first
self.blocks[-1].end = self.duration
self.blocks[-1].is_last =True
return self
def _download_blocks(self):
asyncdl.download_patch(self.callback, self.blocks, self.video_id)
return self
def _combine(self):
ret = []
for block in self.blocks:
ret.extend(block.chat_data)
return ret
def download(self):
return (
self._ready_blocks()
._set_block_end()
._download_blocks()
._combine()
)
def download(video_id, div = 1, callback = None, processor = None):
duration = 0
try:
duration = VideoInfo(video_id).get("duration")
except InvalidVideoIdException:
raise
if duration == 0:
print("video is live.")
return []
data = Downloader(video_id, duration, div, callback).download()
if processor is None:
return data
return processor.process(
[{'video_id':None,'timeout':1,'chatdata' : (action
for action in data)}]
)
def cancel():
asyncdl.cancel()

View File

@@ -0,0 +1,70 @@
import json
from ... import config
from ... exceptions import (
ResponseContextError,
NoContentsException,
NoContinuationsException )
logger = config.logger(__name__)
def parse(jsn):
"""
Parse replay chat data.
Parameter:
----------
jsn : dict
JSON of replay chat data.
Returns:
------
continuation : str
actions : list
"""
if jsn is None:
raise ValueError("parameter JSON is None")
if jsn['response']['responseContext'].get('errors'):
raise ResponseContextError(
'video_id is invalid or private/deleted.')
contents=jsn["response"].get('continuationContents')
if contents is None:
raise NoContentsException('No chat data.')
cont = contents['liveChatContinuation']['continuations'][0]
if cont is None:
raise NoContinuationsException('No Continuation')
metadata = cont.get('liveChatReplayContinuationData')
if metadata:
continuation = metadata.get("continuation")
#print(continuation)
actions = contents['liveChatContinuation'].get('actions')
# print(list(actions[0]['replayChatItemAction']["actions"][0].values()
# )[0]['item'].get("liveChatPaidMessageRenderer"))
if continuation:
return continuation, [action["replayChatItemAction"]["actions"][0]
for action in actions
if list(action['replayChatItemAction']["actions"][0].values()
)[0]['item'].get("liveChatPaidMessageRenderer")
or list(action['replayChatItemAction']["actions"][0].values()
)[0]['item'].get("liveChatPaidStickerRenderer")
]
return None, []
def get_offset(item):
return int(item['replayChatItemAction']["videoOffsetTimeMsec"])
def get_id(item):
return list((list(item['replayChatItemAction']["actions"][0].values()
)[0])['item'].values())[0].get('id')
def get_type(item):
return list((list(item['replayChatItemAction']["actions"][0].values()
)[0])['item'].keys())[0]
import re
_REGEX_YTINIT = re.compile("window\\[\"ytInitialData\"\\]\\s*=\\s*({.+?});\\s+")
def extract(text):
match = re.findall(_REGEX_YTINIT, str(text))
if match:
return match[0]
return None

View File

@@ -0,0 +1,27 @@
from . import parser
from . block import Block
from typing import NamedTuple
class Patch(NamedTuple):
"""
Patch represents chunk of chat data
which is fetched by asyncdl.download_patch._fetch().
"""
chats : list = []
continuation : str = None
seektime : float = None
first : int = None
last : int = None
def fill(block:Block, patch:Patch):
if patch.last < block.end:
set_patch(block, patch)
return
block.continuation = None
def set_patch(block:Block, patch:Patch):
block.continuation = patch.continuation
block.chat_data.extend(patch.chats)
block.last = patch.seektime
block.seektime = patch.seektime