diff --git a/pytchat/paramgen/arcparam.py b/pytchat/paramgen/arcparam.py index e91ba74..2cd1829 100644 --- a/pytchat/paramgen/arcparam.py +++ b/pytchat/paramgen/arcparam.py @@ -20,7 +20,7 @@ def _gen_vid(video_id): Return --------- - byte[] : base64 encoded video_id parameter. + bytes : base64 encoded video_id parameter. """ header_magic = b'\x0A\x0F\x1A\x0D\x0A' header_id = video_id.encode() diff --git a/pytchat/paramgen/arcparam_mining.py b/pytchat/paramgen/arcparam_mining.py new file mode 100644 index 0000000..e6694e2 --- /dev/null +++ b/pytchat/paramgen/arcparam_mining.py @@ -0,0 +1,133 @@ +from base64 import urlsafe_b64encode as b64enc +from functools import reduce +import math +import random +import urllib.parse + +''' +Generate continuation parameter of youtube replay chat. + +Author: taizan-hokuto (2019) @taizan205 + +ver 0.0.1 2019.10.05 +''' + +def _gen_vid_long(video_id): + """generate video_id parameter. + Parameter + --------- + video_id : str + + Return + --------- + byte[] : base64 encoded video_id parameter. + """ + header_magic = b'\x0A\x0F\x1A\x0D\x0A' + header_id = video_id.encode() + header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B' + header_terminator = b'\x20\x01' + + item = [ + header_magic, + _nval(len(header_id)), + header_id, + header_sep_1, + header_id, + header_terminator + ] + + return urllib.parse.quote( + b64enc(reduce(lambda x, y: x+y, item)).decode() + ).encode() + +def _gen_vid(video_id): + """generate video_id parameter. + Parameter + --------- + video_id : str + + Return + --------- + bytes : base64 encoded video_id parameter. + """ + header_magic = b'\x0A\x0F\x1A\x0D\x0A' + header_id = video_id.encode() + header_terminator = b'\x20\x01' + + item = [ + header_magic, + _nval(len(header_id)), + header_id, + header_terminator + ] + + return urllib.parse.quote( + b64enc(reduce(lambda x, y: x+y, item)).decode() + ).encode() + +def _nval(val): + """convert value to byte array""" + if val<0: raise ValueError + buf = b'' + while val >> 7: + m = val & 0xFF | 0x80 + buf += m.to_bytes(1,'big') + val >>= 7 + buf += val.to_bytes(1,'big') + return buf + +def _build(video_id, seektime, topchat_only): + switch_01 = b'\x04' if topchat_only else b'\x01' + if seektime < 0: + raise ValueError("seektime must be greater than or equal to zero.") + if seektime == 0: + times = b'' + else: + times =_nval(int(seektime*1000)) + if seektime > 0: + _len_time = ( b'\x5A' + + (len(times)+1).to_bytes(1,'big') + + b'\x10') + else: + _len_time = b'' + + header_magic = b'\xA2\x9D\xB0\xD3\x04' + sep_0 = b'\x1A' + vid = _gen_vid(video_id) + _tag = b'\x40\x01' + timestamp1 = times + sep_1 = b'\x60\x04\x72\x02\x08' + terminator = b'\x78\x01' + + body = [ + sep_0, + _nval(len(vid)), + vid, + _tag, + _len_time, + timestamp1, + sep_1, + switch_01, + terminator + ] + + body = reduce(lambda x, y: x+y, body) + + return urllib.parse.quote( + b64enc( header_magic + + _nval(len(body)) + + body + ).decode() + ) + +def getparam(video_id, seektime = 0.0, topchat_only = False): + ''' + Parameter + --------- + seektime : int + unit:seconds + start position of fetching chat data. + topchat_only : bool + if True, fetch only 'top chat' + ''' + return _build(video_id, seektime, topchat_only) diff --git a/pytchat/tool/download/downloader.py b/pytchat/tool/download/downloader.py index 23a4fad..15ab152 100644 --- a/pytchat/tool/download/downloader.py +++ b/pytchat/tool/download/downloader.py @@ -81,8 +81,8 @@ def download(video_id, div = 1, callback = None, processor = None): if processor is None: return data return processor.process( - [{'video_id':None,'timeout':1,'chatdata' : [action - ["replayChatItemAction"]["actions"][0] for action in data]}] + [{'video_id':None,'timeout':1,'chatdata' : (action + ["replayChatItemAction"]["actions"][0] for action in data)}] ) def cancel(): diff --git a/pytchat/tool/mining/__init__.py b/pytchat/tool/mining/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pytchat/tool/mining/asyncdl.py b/pytchat/tool/mining/asyncdl.py new file mode 100644 index 0000000..fd62b56 --- /dev/null +++ b/pytchat/tool/mining/asyncdl.py @@ -0,0 +1,141 @@ + +import aiohttp +import asyncio +import json +from . import parser +from . block import Block +from . dlworker import DownloadWorker +from . patch import Patch +from ... import config +from ... paramgen import arcparam_mining as arcparam +from concurrent.futures import CancelledError +from urllib.parse import quote + +headers = config.headers +REPLAY_URL = "https://www.youtube.com/live_chat_replay?continuation=" +INTERVAL = 1 +def _split(start, end, count, min_interval_sec = 120): + """ + Split section from `start` to `end` into `count` pieces, + and returns the beginning of each piece. + The `count` is adjusted so that the length of each piece + is no smaller than `min_interval`. + + Returns: + -------- + List of the offset of each block's first chat data. + """ + + if not (isinstance(start,int) or isinstance(start,float)) or \ + not (isinstance(end,int) or isinstance(end,float)): + raise ValueError("start/end must be int or float") + if not isinstance(count,int): + raise ValueError("count must be int") + if start>end: + raise ValueError("end must be equal to or greater than start.") + if count<1: + raise ValueError("count must be equal to or greater than 1.") + if (end-start)/count < min_interval_sec: + count = int((end-start)/min_interval_sec) + if count == 0 : count = 1 + interval= (end-start)/count + + if count == 1: + return [start] + return sorted( list(set( [int(start + interval*j) + for j in range(count) ]))) + +def ready_blocks(video_id, duration, div, callback): + if div <= 0: raise ValueError + + async def _get_blocks( video_id, duration, div, callback): + async with aiohttp.ClientSession() as session: + tasks = [_create_block(session, video_id, seektime, callback) + for seektime in _split(0, duration, div)] + return await asyncio.gather(*tasks) + + + + async def _create_block(session, video_id, seektime, callback): + continuation = arcparam.getparam(video_id, seektime = seektime) + url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs=" + f"{int(seektime*1000)}&hidden=false&pbj=1") + async with session.get(url, headers = headers) as resp: + chat_json = await resp.text() + if chat_json is None: + return + continuation, actions = parser.parse(json.loads(chat_json)[1]) + first = seektime + seektime += INTERVAL + if callback: + callback(actions, INTERVAL) + return Block( + continuation = continuation, + chat_data = actions, + first = first, + last = seektime, + seektime = seektime + ) + """ + fetch initial blocks. + """ + loop = asyncio.get_event_loop() + blocks = loop.run_until_complete( + _get_blocks(video_id, duration, div, callback)) + return blocks + +def download_patch(callback, blocks, video_id): + + async def _allocate_workers(): + workers = [ + DownloadWorker( + fetch = _fetch, block = block, + blocks = blocks, video_id = video_id + ) + for block in blocks + ] + async with aiohttp.ClientSession() as session: + tasks = [worker.run(session) for worker in workers] + return await asyncio.gather(*tasks) + + async def _fetch(seektime,session) -> Patch: + continuation = arcparam.getparam(video_id, seektime = seektime) + url=(f"{REPLAY_URL}{quote(continuation)}&playerOffsetMs=" + f"{int(seektime*1000)}&hidden=false&pbj=1") + async with session.get(url,headers = config.headers) as resp: + chat_json = await resp.text() + actions = [] + try: + if chat_json is None: + return Patch() + continuation, actions = parser.parse(json.loads(chat_json)[1]) + except json.JSONDecodeError: + pass + if callback: + callback(actions, INTERVAL) + return Patch(chats = actions, continuation = continuation, + seektime = seektime, last = seektime) + """ + allocate workers and assign blocks. + """ + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(_allocate_workers()) + except CancelledError: + pass + +async def _shutdown(): + print("\nshutdown...") + tasks = [t for t in asyncio.all_tasks() + if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + +def cancel(): + loop = asyncio.get_event_loop() + loop.create_task(_shutdown()) + \ No newline at end of file diff --git a/pytchat/tool/mining/block.py b/pytchat/tool/mining/block.py new file mode 100644 index 0000000..cbd0f68 --- /dev/null +++ b/pytchat/tool/mining/block.py @@ -0,0 +1,59 @@ +from . import parser +class Block: + """Block object represents something like a box + to join chunk of chatdata. + + Parameter: + --------- + first : int : + videoOffsetTimeMs of the first chat_data + (chat_data[0]) + + last : int : + videoOffsetTimeMs of the last chat_data. + (chat_data[-1]) + + this value increases as fetching chatdata progresses. + + end : int : + target videoOffsetTimeMs of last chat data for download, + equals to first videoOffsetTimeMs of next block. + when download worker reaches this offset, stop downloading. + + continuation : str : + continuation param of last chat data. + + chat_data : list + + done : bool : + whether this block has been downloaded. + + remaining : int : + remaining data to download. + equals end - last. + + is_last : bool : + whether this block is the last one in blocklist. + + during_split : bool : + whether this block is in the process of during_split. + while True, this block is excluded from duplicate split procedure. + """ + + __slots__ = ['first','last','end','continuation','chat_data','remaining', + 'done','is_last','during_split','seektime'] + + def __init__(self, first = 0, last = 0, end = 0, + continuation = '', chat_data = [], is_last = False, + during_split = False,seektime = None): + self.first = first + self.last = last + self.end = end + self.continuation = continuation + self.chat_data = chat_data + self.done = False + self.remaining = self.end - self.last + self.is_last = is_last + self.during_split = during_split + self.seektime = seektime + diff --git a/pytchat/tool/mining/dlworker.py b/pytchat/tool/mining/dlworker.py new file mode 100644 index 0000000..b20ec42 --- /dev/null +++ b/pytchat/tool/mining/dlworker.py @@ -0,0 +1,54 @@ +from . import parser +from . block import Block +from . patch import Patch, fill +from ... paramgen import arcparam +INTERVAL = 1 +class DownloadWorker: + """ + DownloadWorker associates a download session with a block. + + When the dlworker finishes downloading, the block + being downloaded is splitted and assigned the free dlworker. + + Parameter + ---------- + fetch : func : + download function of asyncdl + + block : Block : + Block object that includes chat_data + + blocks : list : + List of Block(s) + + video_id : str : + + parent_block : Block : + the block from which current block is splitted + """ + __slots__ = ['block', 'fetch', 'blocks', 'video_id', 'parent_block'] + def __init__(self, fetch, block, blocks, video_id ): + self.block:Block = block + self.fetch = fetch + self.blocks:list = blocks + self.video_id:str = video_id + self.parent_block:Block = None + + + async def run(self, session): + while self.block.continuation: + patch = await self.fetch( + self.block.seektime, session) + fill(self.block, patch) + self.block.seektime += INTERVAL + self.block.done = True + + +def fd(name,mes,src,patch,end): + def offset(chats): + if len(chats)==0: + return None,None + return parser.get_offset(chats[0]),parser.get_offset(chats[-1]) + + with open("v://tlog.csv",encoding="utf-8",mode="a") as f: + f.write(f"WORKER,{name},mes,{mes},edge,{offset(src)[1]},first,{offset(patch)[0]},last,{offset(patch)[1]},end,{end}\n") diff --git a/pytchat/tool/mining/downloader.py b/pytchat/tool/mining/downloader.py new file mode 100644 index 0000000..0df892b --- /dev/null +++ b/pytchat/tool/mining/downloader.py @@ -0,0 +1,72 @@ +from . import asyncdl +from . import parser +from .. videoinfo import VideoInfo +from ... import config +from ... exceptions import InvalidVideoIdException +logger = config.logger(__name__) +headers=config.headers + +class Downloader: + def __init__(self, video_id, duration, div, callback): + if not isinstance(div ,int) or div < 1: + raise ValueError('div must be positive integer.') + elif div > 10: + div = 10 + if not isinstance(duration ,int) or duration < 1: + raise ValueError('duration must be positive integer.') + self.video_id = video_id + self.duration = duration + self.div = div + self.callback = callback + self.blocks = [] + + def _ready_blocks(self): + blocks = asyncdl.ready_blocks( + self.video_id, self.duration, self.div, self.callback) + self.blocks = [block for block in blocks if block is not None] + return self + + def _set_block_end(self): + for i in range(len(self.blocks)-1): + self.blocks[i].end = self.blocks[i+1].first + self.blocks[-1].end = self.duration + self.blocks[-1].is_last =True + return self + + def _download_blocks(self): + asyncdl.download_patch(self.callback, self.blocks, self.video_id) + return self + + def _combine(self): + ret = [] + for block in self.blocks: + ret.extend(block.chat_data) + return ret + + def download(self): + return ( + self._ready_blocks() + ._set_block_end() + ._download_blocks() + ._combine() + ) + +def download(video_id, div = 1, callback = None, processor = None): + duration = 0 + try: + duration = VideoInfo(video_id).get("duration") + except InvalidVideoIdException: + raise + if duration == 0: + print("video is live.") + return [] + data = Downloader(video_id, duration, div, callback).download() + if processor is None: + return data + return processor.process( + [{'video_id':None,'timeout':1,'chatdata' : (action + for action in data)}] + ) + +def cancel(): + asyncdl.cancel() \ No newline at end of file diff --git a/pytchat/tool/mining/parser.py b/pytchat/tool/mining/parser.py new file mode 100644 index 0000000..36c915c --- /dev/null +++ b/pytchat/tool/mining/parser.py @@ -0,0 +1,70 @@ +import json +from ... import config +from ... exceptions import ( + ResponseContextError, + NoContentsException, + NoContinuationsException ) + +logger = config.logger(__name__) + +def parse(jsn): + """ + Parse replay chat data. + Parameter: + ---------- + jsn : dict + JSON of replay chat data. + Returns: + ------ + continuation : str + actions : list + + """ + if jsn is None: + raise ValueError("parameter JSON is None") + if jsn['response']['responseContext'].get('errors'): + raise ResponseContextError( + 'video_id is invalid or private/deleted.') + contents=jsn["response"].get('continuationContents') + if contents is None: + raise NoContentsException('No chat data.') + + cont = contents['liveChatContinuation']['continuations'][0] + if cont is None: + raise NoContinuationsException('No Continuation') + metadata = cont.get('liveChatReplayContinuationData') + if metadata: + continuation = metadata.get("continuation") + #print(continuation) + actions = contents['liveChatContinuation'].get('actions') + # print(list(actions[0]['replayChatItemAction']["actions"][0].values() + # )[0]['item'].get("liveChatPaidMessageRenderer")) + if continuation: + return continuation, [action["replayChatItemAction"]["actions"][0] + for action in actions + if list(action['replayChatItemAction']["actions"][0].values() + )[0]['item'].get("liveChatPaidMessageRenderer") + or list(action['replayChatItemAction']["actions"][0].values() + )[0]['item'].get("liveChatPaidStickerRenderer") + ] + return None, [] + + +def get_offset(item): + return int(item['replayChatItemAction']["videoOffsetTimeMsec"]) + +def get_id(item): + return list((list(item['replayChatItemAction']["actions"][0].values() + )[0])['item'].values())[0].get('id') + +def get_type(item): + return list((list(item['replayChatItemAction']["actions"][0].values() + )[0])['item'].keys())[0] +import re +_REGEX_YTINIT = re.compile("window\\[\"ytInitialData\"\\]\\s*=\\s*({.+?});\\s+") +def extract(text): + + match = re.findall(_REGEX_YTINIT, str(text)) + if match: + return match[0] + return None diff --git a/pytchat/tool/mining/patch.py b/pytchat/tool/mining/patch.py new file mode 100644 index 0000000..186cc82 --- /dev/null +++ b/pytchat/tool/mining/patch.py @@ -0,0 +1,27 @@ +from . import parser +from . block import Block +from typing import NamedTuple + +class Patch(NamedTuple): + """ + Patch represents chunk of chat data + which is fetched by asyncdl.download_patch._fetch(). + """ + chats : list = [] + continuation : str = None + seektime : float = None + first : int = None + last : int = None + +def fill(block:Block, patch:Patch): + if patch.last < block.end: + set_patch(block, patch) + return + block.continuation = None + +def set_patch(block:Block, patch:Patch): + block.continuation = patch.continuation + block.chat_data.extend(patch.chats) + block.last = patch.seektime + block.seektime = patch.seektime + diff --git a/tests/test_arcparam.py b/tests/test_arcparam.py index e30466e..e05d85c 100644 --- a/tests/test_arcparam.py +++ b/tests/test_arcparam.py @@ -1,29 +1,29 @@ import pytest -from pytchat.parser.replay import Parser +from pytchat.parser.live import Parser import pytchat.config as config import requests, json from pytchat.paramgen import arcparam def test_arcparam_0(mocker): param = arcparam.getparam("01234567890",-1) - assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoADAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAHIECAEQAHgA" == param + assert param == "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoADAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAHIECAEQAHgA" def test_arcparam_1(mocker): param = arcparam.getparam("01234567890", seektime = 100000) - assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgAcgQIARAAeAA%3D" == param + assert param == "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgAcgQIARAAeAA%3D" def test_arcparam_2(mocker): param = arcparam.getparam("SsjCnHOk-Sk") url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1" resp = requests.Session().get(url,headers = config.headers) jsn = json.loads(resp.text) - parser = Parser() - _ , chatdata = parser.parse(jsn) + parser = Parser(is_replay=True) + contents= parser.get_contents(jsn) + _ , chatdata = parser.parse(contents) test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatTextMessageRenderer"]["id"] - print(test_id) - assert "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w" == test_id + assert test_id == "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w" def test_arcparam_3(mocker): param = arcparam.getparam("01234567890") - assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgDUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAHIECAEQAHgA" == param + assert param == "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgDUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAHIECAEQAHgA" diff --git a/tests/test_arcparam_mining.py b/tests/test_arcparam_mining.py new file mode 100644 index 0000000..7cfcee4 --- /dev/null +++ b/tests/test_arcparam_mining.py @@ -0,0 +1,40 @@ +import pytest +from pytchat.tool.mining import parser +import pytchat.config as config +import requests, json +from pytchat.paramgen import arcparam_mining as arcparam + +def test_arcparam_e(mocker): + try: + arcparam.getparam("01234567890",-1) + assert False + except ValueError: + assert True + + + + +def test_arcparam_0(mocker): + param = arcparam.getparam("01234567890",0) + + assert param =="op2w0wQsGiBDZzhhRFFvTE1ERXlNelExTmpjNE9UQWdBUSUzRCUzREABYARyAggBeAE%3D" + + +def test_arcparam_1(mocker): + param = arcparam.getparam("01234567890", seektime = 100000) + print(param) + assert param == "op2w0wQzGiBDZzhhRFFvTE1ERXlNelExTmpjNE9UQWdBUSUzRCUzREABWgUQgMLXL2AEcgIIAXgB" + +def test_arcparam_2(mocker): + param = arcparam.getparam("PZz9NB0-Z64",1) + url=f"https://www.youtube.com/live_chat_replay?continuation={param}&playerOffsetMs=1000&pbj=1" + resp = requests.Session().get(url,headers = config.headers) + jsn = json.loads(resp.text) + _ , chatdata = parser.parse(jsn[1]) + test_id = chatdata[0]["addChatItemAction"]["item"]["liveChatPaidMessageRenderer"]["id"] + print(test_id) + assert test_id == "ChwKGkNKSGE0YnFJeWVBQ0ZWcUF3Z0VkdGIwRm9R" + +def test_arcparam_3(mocker): + param = arcparam.getparam("01234567890") + assert param == "op2w0wQsGiBDZzhhRFFvTE1ERXlNelExTmpjNE9UQWdBUSUzRCUzREABYARyAggBeAE%3D" diff --git a/tests/test_patch.py b/tests/test_dl_patch.py similarity index 100% rename from tests/test_patch.py rename to tests/test_dl_patch.py