From aab7c14d4815a8c424021f3b040440e77f6cf2c3 Mon Sep 17 00:00:00 2001 From: taizan-hokuto <55448286+taizan-hokuto@users.noreply.github.com> Date: Sun, 10 Nov 2019 15:25:25 +0900 Subject: [PATCH] Add archived chat retriever --- pytchat/__init__.py | 2 +- pytchat/paramgen/arcparam.py | 128 +++++++++++++++++++++++++++++++++++ tests/test_arcparam.py | 26 +++++++ tests/test_liveparam.py | 12 ++++ 4 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 pytchat/paramgen/arcparam.py create mode 100644 tests/test_arcparam.py create mode 100644 tests/test_liveparam.py diff --git a/pytchat/__init__.py b/pytchat/__init__.py index 60da97f..dbb580f 100644 --- a/pytchat/__init__.py +++ b/pytchat/__init__.py @@ -1,5 +1,5 @@ """ -pytchat is a python library for fetching youtube live chat. +pytchat is a python library for fetching youtube live chat without using yt api, Selenium, or BeautifulSoup. """ __copyright__ = 'Copyright (C) 2019 taizan-hokuto' __version__ = '0.0.2.3' diff --git a/pytchat/paramgen/arcparam.py b/pytchat/paramgen/arcparam.py new file mode 100644 index 0000000..31f1c55 --- /dev/null +++ b/pytchat/paramgen/arcparam.py @@ -0,0 +1,128 @@ +from base64 import urlsafe_b64encode as b64enc +from functools import reduce +import calendar, datetime, pytz +import math +import random +import urllib.parse + + +def _gen_vid(video_id): + """generate video_id parameter. + Parameter + --------- + video_id : str + + Return + --------- + byte[] : base64 encoded video_id parameter. + """ + header_magic = b'\x0A\x0F\x1A\x0D\x0A' + header_id = video_id.encode() + header_sep_1 = b'\x1A\x13\xEA\xA8\xDD\xB9\x01\x0D\x0A\x0B' + header_terminator = b'\x20\x01' + + item = [ + header_magic, + _nval(len(header_id)), + header_id, + header_sep_1, + header_id, + header_terminator + ] + + return urllib.parse.quote( + b64enc(reduce(lambda x, y: x+y, item)).decode() + ).encode() + +def _nval(val): + """convert value to byte array""" + if val<0: raise ValueError + buf = b'' + while val >> 7: + m = val & 0xFF | 0x80 + buf += m.to_bytes(1,'big') + val >>= 7 + buf += val.to_bytes(1,'big') + return buf + +def get(video_id, pos = 0, topchatonly = False): + switch_01 = b'\x04' if topchatonly else b'\x01' + + + if pos<0: + raise ValueError('pos is 0 or positive number.') + if pos == 0: + times =_nval(1) + switch = b'\x04' + else: + times =_nval(int(pos*1000000)) + switch = b'\x03' + header_magic= b'\xA2\x9D\xB0\xD3\x04' + sep_0 = b'\x1A' + vid = _gen_vid(video_id) + time_tag = b'\x28' + timestamp1 = times + sep_1 = b'\x30\x00\x38\x00\x40\x00\x48' + sep_2 = b'\x52\x1C\x08\x00\x10\x00\x18\x00\x20\x00' + chkstr = b'\x2A\x0E\x73\x74\x61\x74\x69\x63\x63\x68\x65\x63\x6B\x73\x75\x6D\x40' + sep_3 = b'\x00\x58\x03\x60' + sep_4 = b'\x68\x00\x72\x04\x08' + sep_5 = b'\x10\x00\x78\x00' + + body = [ + sep_0, + _nval(len(vid)), + vid, + time_tag, + timestamp1, + sep_1, + switch, + sep_2, + chkstr, + sep_3, + switch_01, + sep_4, + switch_01, + sep_5 + ] + + body = reduce(lambda x, y: x+y, body) + + return urllib.parse.quote( + b64enc( header_magic + + _nval(len(body)) + + body + ).decode() + ) + +import requests +import json +from .. core_multithread.parser import Parser +if __name__=='__main__': + headers = { + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36'} + + #"sirpY2XUktI" + # print(_gen_vid("KeytykF37jY")) + param = get("OzBmHAOA2rA",pos = 0) + # target = "op2w0wRyGjxDZzhhRFFvTFMyVjVkSGxyUmpNM2Fsa2FFLXFvM2JrQkRRb0xTMlY1ZEhsclJqTTNhbGtnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAHIECAEQAHgA" + print(param) + # if param == target: + # print('ok') + # else: + # print('ng') + # print(target) + # for i,t in enumerate(target): + # if(param[i]!=t): + # print('^',end = '') + # else: + # print(' ',end = '') + url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1" + print(url) + resp = requests.Session().get(url,headers = headers) + print(resp) + jsn = json.loads(resp.text) + print(jsn) + parser = Parser() + metadata , chatdata = parser.parse(jsn) + print(chatdata[0]) diff --git a/tests/test_arcparam.py b/tests/test_arcparam.py new file mode 100644 index 0000000..d5d1c54 --- /dev/null +++ b/tests/test_arcparam.py @@ -0,0 +1,26 @@ +import pytest +from pytchat.core_multithread.parser import Parser +import pytchat.config as config +import requests, json +from pytchat.paramgen import arcparam + +def test_arcparam_0(mocker): + param = arcparam.get("01234567890") + assert "op2w0wRyGjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QoATAAOABAAEgEUhwIABAAGAAgACoOc3RhdGljY2hlY2tzdW1AAFgDYAFoAHIECAEQAHgA" == param + + +def test_arcparam_1(mocker): + param = arcparam.get("01234567890", pos = 100000) + assert "op2w0wR3GjxDZzhhRFFvTE1ERXlNelExTmpjNE9UQWFFLXFvM2JrQkRRb0xNREV5TXpRMU5qYzRPVEFnQVElM0QlM0QogNDbw_QCMAA4AEAASANSHAgAEAAYACAAKg5zdGF0aWNjaGVja3N1bUAAWANgAWgAcgQIARAAeAA%3D" == param + +def test_arcparam_2(mocker): + param = arcparam.get("SsjCnHOk-Sk") + url=f"https://www.youtube.com/live_chat_replay/get_live_chat_replay?continuation={param}&pbj=1" + resp = requests.Session().get(url,headers = config.headers) + jsn = json.loads(resp.text) + parser = Parser() + metadata , chatdata = parser.parse(jsn) + test_id = chatdata[0]["replayChatItemAction"]["actions"][0]["addChatItemAction"]["item"]["liveChatTextMessageRenderer"]["id"] + print(test_id) + assert "CjoKGkNMYXBzZTdudHVVQ0Zjc0IxZ0FkTnFnQjVREhxDSnlBNHV2bnR1VUNGV0dnd2dvZDd3NE5aZy0w" == test_id + \ No newline at end of file diff --git a/tests/test_liveparam.py b/tests/test_liveparam.py new file mode 100644 index 0000000..7490bde --- /dev/null +++ b/tests/test_liveparam.py @@ -0,0 +1,12 @@ +import pytest +from pytchat.core_multithread.parser import Parser +import pytchat.config as config +import requests, json +from pytchat.paramgen import liveparam + +def test_liveparam_0(mocker): + _ts1= 1546268400 + param = liveparam._build("01234567890", + *([_ts1*1000000 for i in range(5)])) + test_param="0ofMyAPiARp8Q2c4S0RRb0xNREV5TXpRMU5qYzRPVEFhUTZxNXdiMEJQUW83YUhSMGNITTZMeTkzZDNjdWVXOTFkSFZpWlM1amIyMHZiR2wyWlY5amFHRjBQM1k5TURFeU16UTFOamM0T1RBbWFYTmZjRzl3YjNWMFBURWdBZyUzRCUzRCiAuNbVqsrfAjAAOABAAkorCAAQABgAIAAqDnN0YXRpY2NoZWNrc3VtOgBAAEoCCAFQgLjW1arK3wJYA1CAuNbVqsrfAliAuNbVqsrfAmgBggEECAEQAIgBAKABgLjW1arK3wI%3D" + assert test_param == param \ No newline at end of file