Files
pytchat-fork/pytchat/downloader/downloader.py
2020-01-26 14:21:18 +09:00

248 lines
8.4 KiB
Python

import asyncio
import aiohttp
import json
import traceback
from urllib.parse import quote
from . import parser
from .. import config
from .. import util
from .. paramgen import arcparam
# Module-level logger and the HTTP headers passed to session.get() below.
logger = config.logger(__name__)
headers = config.headers
# Chat-replay endpoint; a URL-quoted continuation token is appended to it.
REPLAY_URL = ("https://www.youtube.com/live_chat_replay/"
              "get_live_chat_replay?continuation=")
class Block:
    """One contiguous stretch of chat replay data handled by Downloader.

    Attributes:
        pos: index of the block's seek point in the original division.
        first: offset of the first chat item in ``chat_data``.
        last: offset of the last chat item fetched so far.
        temp_last: offset past which downloading may stop (set by
            ``Downloader.set_temporary_last`` to the next block's
            ``first``), or -1 for the final block; 0 until set.
        continuation: continuation token used to fetch the next chunk.
        chat_data: list of chat actions collected for this block.
    """

    def __init__(self, pos=0, first=0, last=0,
                 continuation='', chat_data=None):
        self.pos = pos
        self.first = first
        self.last = last
        self.temp_last = 0
        self.continuation = continuation
        # Fresh list per instance: a ``chat_data=[]`` default would be shared
        # by every Block built without chat_data, and chat_data is extended
        # in place while downloading.
        self.chat_data = [] if chat_data is None else chat_data
class Downloader:
    """Download a chat replay in parallel blocks and merge the results.

    The range ``-1 .. duration`` is split into up to ``div`` seek points
    (see ``ready_blocks``). Each point becomes a ``Block`` whose
    continuation chain is followed until it passes the start of the next
    block, and ``combine`` finally concatenates the blocks. ``download()``
    runs the whole pipeline.

    Args:
        video_id: video id passed to ``arcparam.getparam``.
        duration: end of the divided range; must be an ``int`` (the
            internal ``_divide`` helper rejects other types).
        div: requested number of blocks; must be greater than 0.
        callback: optional ``callback(actions, span)`` called after each
            fetched chunk, where ``span`` is the chunk's last offset minus
            its first offset.
    """

    def __init__(self, video_id, duration, div, callback=None):
        self.video_id = video_id
        self.duration = duration
        self.div = div
        self.blocks = []
        self.callback = callback

    def ready_blocks(self):
        """Fetch the first chunk of every block concurrently.

        Fills ``self.blocks`` with one ``Block`` per seek point that was
        fetched successfully. Failed fetches (including an empty first
        chunk) are logged and skipped rather than stored.

        Returns:
            self, for chaining.

        Raises:
            ValueError: if ``div`` <= 0 or the range arguments are invalid.
        """
        if self.div <= 0:
            raise ValueError("div must be greater than 0.")

        def _divide(start, end, count):
            """Return sorted int seek points from ``start`` towards ``end``.

            At most ``count`` points, spaced at least ``min_interval`` apart;
            ``count`` is reduced (to no less than 1) when the range is short.
            """
            min_interval = 120
            if (not isinstance(start, int) or
                    not isinstance(end, int) or
                    not isinstance(count, int)):
                raise ValueError("start/end/count must be int")
            if start > end:
                raise ValueError("end must be equal to or greater than start.")
            if count < 1:
                raise ValueError("count must be equal to or greater than 1.")
            if (end - start) / count < min_interval:
                count = int((end - start) / min_interval)
                if count == 0:
                    count = 1
            interval = (end - start) / count
            if count == 1:
                return [start]
            return sorted({int(start + interval * j) for j in range(count)})

        async def _get_blocks(duration, div):
            async with aiohttp.ClientSession() as session:
                futures = [
                    _create_block(session, pos, seektime)
                    for pos, seektime in enumerate(_divide(-1, duration, div))
                ]
                return await asyncio.gather(*futures, return_exceptions=True)

        async def _create_block(session, pos, seektime):
            continuation = arcparam.getparam(self.video_id, seektime=seektime)
            url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
            async with session.get(url, headers=headers) as resp:
                text = await resp.text()
            next_continuation, actions = parser.parse(json.loads(text))
            # An empty/missing first chunk makes this indexing raise; the
            # exception is collected by gather() and filtered out below.
            first = parser.get_offset(actions[0])
            last = parser.get_offset(actions[-1])
            if self.callback:
                self.callback(actions, last - first)
            return Block(
                pos=pos,
                continuation=next_continuation,
                chat_data=actions,
                first=first,
                last=last,
            )

        loop = asyncio.get_event_loop()
        results = loop.run_until_complete(
            _get_blocks(self.duration, self.div))
        # gather(return_exceptions=True) returns exceptions in place of
        # results; storing them would make every later stage fail on
        # ``.chat_data`` / ``.first`` with AttributeError.
        self.blocks = []
        for result in results:
            if isinstance(result, BaseException):
                logger.error(
                    f"failed to create block: {type(result)} {str(result)}")
                continue
            self.blocks.append(result)
        return self

    def remove_duplicate_head(self):
        """Drop blocks whose first chat item duplicates the next block's.

        Neighbouring blocks are duplicates when their first items share
        offset, id and type. Non-final blocks with empty ``chat_data`` are
        dropped as well; the final block is always kept.

        Returns:
            self, for chaining.
        """
        blocks = self.blocks
        if not blocks:
            return self

        def is_same_offset(index):
            return blocks[index].first == blocks[index + 1].first

        def is_same_id(index):
            id_0 = parser.get_id(blocks[index].chat_data[0])
            id_1 = parser.get_id(blocks[index + 1].chat_data[0])
            return id_0 == id_1

        def is_same_type(index):
            type_0 = parser.get_type(blocks[index].chat_data[0])
            type_1 = parser.get_type(blocks[index + 1].chat_data[0])
            return type_0 == type_1

        ret = [
            blocks[i] for i in range(len(blocks) - 1)
            if blocks[i].chat_data and not (
                # An empty next block cannot be compared (and cannot be a
                # duplicate), so guard it before indexing chat_data[0].
                blocks[i + 1].chat_data
                and is_same_offset(i) and is_same_id(i) and is_same_type(i))
        ]
        ret.append(blocks[-1])
        self.blocks = ret
        return self

    def set_temporary_last(self):
        """Set each block's ``temp_last`` to the next block's ``first``.

        The final block gets -1, meaning it is downloaded to the end.

        Returns:
            self, for chaining.
        """
        for block, next_block in zip(self.blocks, self.blocks[1:]):
            block.temp_last = next_block.first
        if self.blocks:
            self.blocks[-1].temp_last = -1
        return self

    def remove_overwrap(self):
        """Drop blocks made redundant by an overlapping predecessor.

        Block ``a`` overlaps block ``b`` when ``blocks[a].last`` is greater
        than ``blocks[b].first``. Starting from ``a``, the blocks it overlaps
        are skipped and only the last overlapped one is kept, to bridge to
        the first block ``a`` does not overlap. The first and final blocks
        are always kept.

        Returns:
            self, for chaining.
        """
        blocks = self.blocks
        if len(blocks) <= 1:
            return self

        def is_overwrap(a, b):
            return blocks[a].last > blocks[b].first

        ret = [blocks[0]]
        a = 0
        b = 1
        jmp = False
        while a < len(blocks) - 2:
            while is_overwrap(a, b):
                b += 1
                if b == len(blocks) - 1:
                    jmp = True
                    break
            if jmp:
                # The scan reached the final block. Block ``a`` is not
                # extended later (its last already exceeds its temp_last),
                # so unless it also overlaps the final block, keep the last
                # overlapped block to avoid a gap before the final block.
                if not is_overwrap(a, b):
                    ret.append(blocks[b - 1])
                break
            # ``b`` is now the first block that ``a`` does not overlap.
            a = b if b - a == 1 else b - 1
            ret.append(blocks[a])
            b = a + 1
        ret.append(blocks[-1])
        self.blocks = ret
        return self

    def download_each_block(self):
        """Extend every block by following its continuation chain.

        Per-block failures are logged instead of being silently discarded.

        Returns:
            self, for chaining.
        """
        loop = asyncio.get_event_loop()
        results = loop.run_until_complete(self._dl_block())
        for block, result in zip(self.blocks, results):
            if isinstance(result, BaseException):
                logger.error(
                    f"failed to download block pos:{block.pos}: "
                    f"{type(result)} {str(result)}")
        return self

    async def _dl_block(self):
        """Run ``_dl_chunk`` for all blocks concurrently in one session."""
        async with aiohttp.ClientSession() as session:
            futures = [self._dl_chunk(session, block) for block in self.blocks]
            return await asyncio.gather(*futures, return_exceptions=True)

    async def _dl_chunk(self, session, block: "Block"):
        """Follow ``block``'s continuation chain, appending to its chat_data.

        Stops when the chain ends or, for non-final blocks, once the last
        fetched offset passes ``block.temp_last``. A non-final block whose
        initial chunk already passes ``temp_last`` is not fetched at all.
        """
        if block.temp_last != -1 and block.last > block.temp_last:
            return
        continuation = block.continuation
        while continuation:
            url = f"{REPLAY_URL}{quote(continuation)}&pbj=1"
            async with session.get(url, headers=config.headers) as resp:
                text = await resp.text()
            continuation, actions = parser.parse(json.loads(text))
            if not actions:
                continue
            block.chat_data.extend(actions)
            first = parser.get_offset(actions[0])
            last = parser.get_offset(actions[-1])
            if self.callback:
                self.callback(actions, last - first)
            # Record progress on every chunk so ``block.last`` stays accurate
            # even if the chain ends before reaching temp_last;
            # remove_duplicate_tail compares these values.
            block.last = last
            if block.temp_last != -1 and last > block.temp_last:
                break

    def remove_duplicate_tail(self):
        """Drop middle blocks whose last chat item duplicates the previous one.

        Neighbouring blocks are duplicates when their last items share
        offset, id and type. The first and final blocks are always kept.

        Returns:
            self, for chaining.
        """
        blocks = self.blocks
        if len(blocks) <= 1:
            return self

        def is_same_offset(index):
            return blocks[index - 1].last == blocks[index].last

        def is_same_id(index):
            id_0 = parser.get_id(blocks[index - 1].chat_data[-1])
            id_1 = parser.get_id(blocks[index].chat_data[-1])
            return id_0 == id_1

        def is_same_type(index):
            type_0 = parser.get_type(blocks[index - 1].chat_data[-1])
            type_1 = parser.get_type(blocks[index].chat_data[-1])
            return type_0 == type_1

        ret = [blocks[0]]
        ret.extend(
            blocks[i] for i in range(1, len(blocks) - 1)
            if not (is_same_offset(i) and is_same_id(i) and is_same_type(i)))
        ret.append(blocks[-1])
        self.blocks = ret
        return self

    def combine(self):
        """Merge all blocks' chat_data into the first block's list.

        From each later block, only the items from the first one whose
        offset exceeds the last merged offset onward are appended; a block
        with no such item is logged as a missing connection.

        Returns:
            The merged list (``self.blocks[0].chat_data``), or None when
            there are no blocks, the first block is empty, or an error
            occurred (which is logged).
        """
        line = ''
        try:
            if not self.blocks or not self.blocks[0].chat_data:
                return None
            merged = self.blocks[0].chat_data
            lastline_offset = parser.get_offset(merged[-1])
            for i, block in enumerate(self.blocks[1:], start=1):
                piece = block.chat_data
                if not piece:
                    logger.error(f'zero size piece.:{str(i)}')
                    continue
                for row, line in enumerate(piece):
                    if parser.get_offset(line) > lastline_offset:
                        merged.extend(piece[row:])
                        break
                else:
                    logger.error(
                        f'Missing connection.: pos:{str(i-1)}->{str(i)}'
                        f' lastline_offset= {lastline_offset}')
                lastline_offset = parser.get_offset(piece[-1])
            return merged
        except Exception as e:
            # Top-level boundary of the pipeline: log and return None.
            logger.error(f"{type(e)} {str(e)} {line}")
            traceback.print_exc()
            return None

    def download(self):
        """Run the full pipeline and return the merged chat list (or None)."""
        return (
            self.ready_blocks()
            .remove_duplicate_head()
            .set_temporary_last()
            .remove_overwrap()
            .download_each_block()
            .remove_duplicate_tail()
            .combine()
        )