VideoHelp Forum
+ Reply to Thread
Results 1 to 11 of 11
Thread
  1. Hello. I need any information regarding this specific DRM from youku. Any info is appreciated. Someone on a different forum mentioned this with vague information in Chinese. Though, from what I could gather from the Google translation is that only blocks of TS files of m3u8 are encrypted and that you need to find a way to discover the location and the length of each (encrypted) block. Did I understand this right? If anyone got an open source project that works on this, that would be appreciated. Thank you.
    Quote Quote  
  2. ts ==> enc_pes ==>aes-ecb decrypt ==> dec_pes ==> ts
    discord chuliaohaibaipiao
    Quote Quote  
  3. Education Student (Grad.) CrymanChen's Avatar
    Join Date
    Apr 2022
    Location
    Mainland China
    Search Comp PM
    Mind refering us a link? I'd like to know how vague could this information be?
    twitter @Cryman_Chen
    email crymanchen@gmail.com
    Quote Quote  
  4. Edit: Never mind. Gave up on it.
    Last edited by wandrermo; 24th Jun 2024 at 01:26.
    Quote Quote  
  5. Originally Posted by CrymanChen View Post
    Mind refering us a link? I'd like to know how vague could this information be?
    This is regarding the TS file structure.
    Quote Quote  
  6. Originally Posted by wandrermo View Post
    Edit: Never mind. Gave up on it.
    I have a fully automated script that supports copyright DRM, Widevine, and HLS. However, it is not available for free, as it is a very complex site.
    Quote Quote  
  7. Member aqzs's Avatar
    Join Date
    Mar 2024
    Location
    Paris
    Search Comp PM
    Here is a script to get the keys on youku :
    HTML Code:
    from pywidevine.cdm import Cdm
    from pywidevine.device import Device
    from pywidevine.pssh import PSSH
    import requests
    import base64
    
    TOKEN = input('token: ')
    VIDID = input('vidid: ')
    PSID = input('psid: ')
    pssh = input('pssh: ')
    
    pssh = PSSH(pssh)
    lic_url = 'https://drm-license.youku.tv/ups/drm.json'
    
    device = Device.load("device.wvd")
    cdm = Cdm.from_device(device)
    session_id = cdm.open()
    challenge = cdm.get_license_challenge(session_id, pssh)
    
    challenge = {
      'drmType': 'widevine',
      'token': TOKEN,
      'vid': VIDID,
      'psid': PSID,
      'ccode': '0597',
      'licenseRequest': base64.b64encode(challenge).decode()
    }
    
    licence = requests.post(lic_url, data=challenge)
    cdm.parse_license(session_id, licence.json()['data'])
    for key in cdm.get_keys(session_id):
        if key.type=='CONTENT':
            print(f"\n--key {key.kid.hex}:{key.key.hex()}")
    cdm.close(session_id)
    As @maherz12 said fully automated script are really hard to made because of the way they sign every api request.
    Quote Quote  
  8. Code:
    import os
    import re, requests, time, json
    from hashlib import md5
    from urllib.parse import parse_qsl, urlsplit
    import base64
    from Crypto.Cipher import AES
    from tabulate import tabulate
    from pywidevine.L3.cdm import deviceconfig
    from pywidevine.L3.decrypt.wvdecryptcustom import WvDecrypt
    
    requests = requests.Session()
    
    
    def dealck(ck: str) -> dict:
        ck = ck.split(";")
        ckdict = {}
        for i in ck:
            i = i.strip()
            i = i.split("=")
            try:
                ckdict[i[0]] = i[1]
            except:
                pass
        return ckdict
    
    
    class YouKu:
        def __init__(self, ck):
            if not ck:
                ck = input("please input the cookie:\n")
            self.cookie = dealck(ck)
            self.r = "xWrtQpP4Z4RsrRCY"
            self.R = "aq1mVooivzaolmJY5NrQ3A=="
            self.key = ""
            self.headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36",
            }
            requests.headers.update(self.headers)
            requests.cookies.update(self.cookie)
            self.ptoken = self.cookie.get("P_pck_rm")
    
        def youku_sign(self, t, data, token):
            appKey = '24679788'  # 固定值
            '''token值在cookie中'''
            sign = token + '&' + t + '&' + appKey + '&' + data
            md = md5()
            md.update(sign.encode('UTF-8'))
            sign = md.hexdigest()
            return sign
    
        def utid(self):
            json_cookie = requests.cookies.get_dict()
            requests.cookies.clear()
            requests.cookies.update(json_cookie)
            utid = json_cookie.get("cna")
            token = json_cookie.get("_m_h5_tk").split("_")[0]
            return {"utid": utid, "token": token}
    
        def redirect(self, url):
            headers = {
                "referer": "https://www.youku.com/",
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36",
            }
            resp = requests.get(url=url)
            return resp.url
    
        def page_parser(self, url):
            vid = re.findall(r"id_(.*?)\.html", url)[0]
            url = "https://openapi.youku.com/v2/videos/show.json"
            params = {
                "client_id": "53e6cc67237fc59a",
                "package": "com.huawei.hwvplayer.youku",
                "ext": "show",
                "video_id": vid
            }
            try:
                response = requests.get(url, params=params).json()
                showid = response["show"]["id"]
                return {"current_showid": showid, "videoId": 0, "vid": vid}
            except Exception as e:
                print(f"获取showid失败:{e}")
                print(f"[red]获取showid失败[/red]")
    
        def get_emb(self, videoId):
            emb = base64.b64encode(("%swww.youku.com/" % videoId).encode('utf-8')).decode('utf-8')
            return emb
    
        # 这个函数用来获取元素的第一个值
        def takeOne(self, elem):
            return float(elem[0])
    
        def save_m3u8(self, video, auto=1):
            title, size, resolution, drm_type, key, stream_type, _, m3u8_url, _ = video
            title = f"{title}_{resolution}_{size}_{stream_type}"
            savepath = os.path.join(os.getcwd(), "/download/yk")
            rm3u8_url = m3u8_url.replace("%", "%%")
            if rm3u8_url.startswith("http"):
                common_args = f"N_m3u8DL-RE.exe \"{rm3u8_url}\" --tmp-dir ./cache --save-name \"{title}\" --save-dir \"{savepath}\" --thread-count 16 --download-retry-count 30  --check-segments-count"
                if auto:
                    common_args += " --auto-select"
                if drm_type == "default":
                    cmd = common_args
                elif drm_type == "cbcs":
                    cmd = f"{common_args} --key {key}  -M format=mp4"
                else:
                    key = key if ":" not in key else base64.b64encode(bytes.fromhex(key.split(":")[1])).decode()
                    txt = f'''
                #OUT,{savepath}
                #DECMETHOD,ECB
                #KEY,{key}
                {title},{m3u8_url}
                                    '''
                    with open("{}.txt".format(title), "a", encoding="gbk") as f:
                        f.write(txt)
                        print(f"The download file has been generated")
                        return
            else:
                m3u8_path = "{}.m3u8".format(title)
                with open(m3u8_path, "w", encoding="utf-8") as f:
                    f.write(m3u8_url)
                common_args = f"N_m3u8DL-RE.exe \"{m3u8_path}\" --tmp-dir ./cache --save-name \"{title}\" --save-dir \"{savepath}\" --thread-count 16 --download-retry-count 30  --check-segments-count"
                if ":" not in key:
                    uri = re.findall(r'(http.*)\n', m3u8_url)[0]
                    m3u8_text = requests.get(uri).text
                    keyid = re.findall(r'KEYID=0x(.*),IV', m3u8_text)[0].lower()
                    key = "--key {}:{}".format(keyid, base64.b64decode(key).hex())
                cmd = f"{common_args}  {key}  -M format=mp4"
            with open("{}.bat".format(title), "a", encoding="gbk") as f:
                f.write(cmd)
                f.write("\n")
            f.close()
            print(f"The download file has been generated")
    
        def m3u8_url(self, t, params_data, sign, vid):
            url = "https://acs.youku.com/h5/mtop.youku.play.ups.appinfo.get/1.1/"
    
            params = {
                "jsv": "2.5.8",
                "appKey": "24679788",
                "t": t,
                "sign": sign,
                "api": "mtop.youku.play.ups.appinfo.get",
                "v": "1.1",
                "timeout": "20000",
                "YKPid": "20160317PLF000211",
                "YKLoginRequest": "true",
                "AntiFlood": "true",
                "AntiCreep": "true",
                "type": "jsonp",
                "dataType": "jsonp",
                "callback": "mtopjsonp1",
                "data": params_data,
            }
            resp = requests.get(url=url, params=params)
            result = resp.text
            # print(result)
            data = json.loads(result[12:-1])
            # print(data)
            ret = data["ret"]
            video_lists = []
            if ret == ["SUCCESS::调用成功"]:
                stream = data["data"]["data"].get("stream", [])
                title = data["data"]["data"].get("video", {}).get("title", "")
                keys = {}
                for video in stream:
                    m3u8_url = video["m3u8_url"]
                    width = video["width"]
                    height = video["height"]
                    size = video.get("size", 0)
                    size = '{:.1f}'.format(float(size) / 1048576)
                    drm_type = video["drm_type"]
                    audio_lang = video["audio_lang"]
                    audio = video['stream_ext'].get("audioGroupId", "") or "default"
                    if audio_lang == "default":
                        audio_lang = "guoyu"
                    language = []
                    language = re.findall(r'LANGUAGE="([\w\s]+)"', m3u8_url)
                    # print("language是------------------>>>>>", language)
                    if 'en' in language:
                        audio_lang = "en"
                    if video['drm_type'] == "default":
                        key = ""
                    elif audio_lang not in keys.keys():
                        if drm_type == "cbcs":
                            license_url = video["stream_ext"]["uri"]
                            key = self.get_cbcs_key(license_url, m3u8_url)
                            if key[0]:
                                key = key[1][0]
                        else:
                            encryptR_server = video['encryptR_server']
                            copyright_key = video['stream_ext']['copyright_key']
                            key = self.copyrightDRM(self.r, encryptR_server, copyright_key)
                        keys[audio_lang] = key
                    else:
                        key = keys[audio_lang]
                    video_lists.append(
                        [title, size + "M", f"{width}x{height}", drm_type, key, video["stream_type"],
                         audio + "_" + audio_lang, m3u8_url,
                         video.get("size", 0)])
                video_lists = sorted(video_lists, key=lambda x: x[-1], reverse=True)
                tb = tabulate([[*video_lists[i][:7]] for i in range(len(video_lists))],
                              headers=["title", "size", "resolution","drm_type", "base64key", "stream_type", "audio"],
                              tablefmt="pretty",
                              showindex=range(1, len(video_lists) + 1))
                ch=input(f"{tb}\nplesae input the video number you want to download") or "1"
                ch = ch.split(",")
                for i in ch:
                    video = video_lists[int(i) - 1]
                    self.save_m3u8(video)
                subtitle=data['data']['data']['subtitle']
                for i in subtitle:
                    print(f"subtitle:{i['subtitle_info_code'][0],i['url']}")
            elif ret == ["FAIL_SYS_ILLEGAL_ACCESS::非法请求"]:
                print("请求参数错误")
            elif ret == ["FAIL_SYS_TOKEN_EXOIRED::令牌过期"]:
                print("Cookie过期")
                return 10086
            else:
                print(ret[0])
            return 0
    
        def copyrightDRM(self, r, encryptR_server, copyright_key):
            try:
                crypto_1 = AES.new(r.encode(), AES.MODE_ECB)
                key_2 = crypto_1.decrypt(base64.b64decode(encryptR_server))
                crypto_2 = AES.new(key_2, AES.MODE_ECB)
                return base64.b64encode(base64.b64decode(crypto_2.decrypt(base64.b64decode(copyright_key)))).decode()
            except:
                return ""
    
        def get_cbcs_key(self, license_url, m3u8_url):
            headers = {
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.82"
            }
            m3u8data = requests.get(m3u8_url, headers=headers).text
            key_url = re.findall(r"URI=\"(.*?)\"", m3u8data)[0]
            response = requests.get(key_url, headers=headers).text
            pssh = response.split("data:text/plain;base64,").pop().split('",')[0]
            wvdecrypt = WvDecrypt(init_data_b64=pssh, cert_data_b64="", device=deviceconfig.device_android_generic)
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.82",
            }
            dic = dict(parse_qsl(urlsplit(license_url).query))
            url = license_url.split("?")[0]
            dic["licenseRequest"] = base64.b64encode(wvdecrypt.get_challenge()).decode()
            dic["drmType"] = "widevine"
            response = requests.post(url, data=dic, headers=headers)
            license_b64 = response.json()["data"]
            wvdecrypt.update_license(license_b64)
            Correct, keyswvdecrypt = wvdecrypt.start_process()
            if Correct:
                return Correct, keyswvdecrypt
    
        def get(self, url):
            t = str(int(time.time() * 1000))
            user_info = self.utid()
            userid = user_info["utid"]
            page_info = self.page_parser(url)
            emb = self.get_emb(page_info["videoId"])
            steal_params = {
                "ccode": "0597",
                "utid": userid,
                "version": "9.4.39",
                "ckey": "DIl58SLFxFNndSV1GFNnMQVYkx1PP5tKe1siZu/86PR1u/Wh1Ptd+WOZsHHWxysSfAOhNJpdVWsdVJNsfJ8Sxd8WKVvNfAS8aS8fAOzYARzPyPc3JvtnPHjTdKfESTdnuTW6ZPvk2pNDh4uFzotgdMEFkzQ5wZVXl2Pf1/Y6hLK0OnCNxBj3+nb0v72gZ6b0td+WOZsHHWxysSo/0y9D2K42SaB8Y/+aD2K42SaB8Y/+ahU+WOZsHcrxysooUeND",
                "client_ip": "192.168.1.1",
                "client_ts": 1698373135
            }
            biz_params = {
                "vid": page_info["vid"],
                "h265": 1,
                "preferClarity": 4,
                "media_type": "standard,subtitle",
                "app_ver": "9.4.39",
                "extag": "EXT-X-PRIVINF",
                "play_ability": 16782592,
                "master_m3u8": 1,
                "drm_type": 19,
                "key_index": "web01",
                "encryptR_client": self.R,
                "skh": 1,
                "last_clarity": 5,
                "clarity_chg_ts": 1689341442,
                "needad": 0,
            }
            ad_params = {
                "vip": 1,
                "needad": 0,
            }
            params_data = {
                "steal_params": json.dumps(steal_params),
                "biz_params": json.dumps(biz_params),
                "ad_params": json.dumps(ad_params),
            }
            params_data = json.dumps(params_data)
            sign = self.youku_sign(t, params_data, user_info["token"])
            return self.m3u8_url(t, params_data, sign, page_info["vid"])
    
        def start(self, url=None):
            url=input("please input the url:") if url is None else url
            url = self.redirect(url) if url.startswith("https://") else f"https://v.youku.com/v_show/id_{url}.html"
            for i in range(3):
                ret = self.get(url)
                if ret:
                    continue
                break
    
    if __name__ == '__main__':
        cookie = ''
        youku = YouKu(cookie)
        youku.start("https://www.youku.tv/v/v_show/id_XNjQwMDc1NjIyNA==.html")

    A simple example

    N_m3u8DL-CLI v2.5.4 can dec copyrighDRM , add --enableYouKuAes
    discord chuliaohaibaipiao
    Quote Quote  
  9. Feels Good Man 2nHxWW6GkN1l916N3ayz8HQoi's Avatar
    Join Date
    Jan 2024
    Location
    Pepe Island
    Search Comp PM
    Nice job @novia.
    --[----->+<]>.++++++++++++.---.--------.
    [*Have questions about widefrog? You can find all your answers here*]
    Quote Quote  
  10. Speaking of chinese services, does anyone know how to download from Tencent video in 4k?
    Quote Quote  
  11. Originally Posted by stupid.345 View Post
    Speaking of chinese services, does anyone know how to download from Tencent video in 4k?
    Tencent video is not encrypted
    Quote Quote  



Similar Threads

Visit our sponsor! Try DVDFab and backup Blu-rays!