BiliBili视频下载

SESSDATA请不要泄露以免被滥用,若不设置则最高只能下载480P的视频

主函数部分(main.py)


import re
import json
from multiprocessing import thread
import requests
from bs4 import BeautifulSoup
 
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36',
}
 
url = input("请输入视频地址:")
with open('SESSDATA.txt', 'r') as f:
    SESSDATA = f.read()
if SESSDATA == '0':
    print('检测到您未设置SESSDATA,最高只能下载480p画质哦!')
 
# 获取视频名称
r = requests.get(url)
r.encoding = 'utf-8'
soup = BeautifulSoup(r.text, 'html.parser')
name = soup.title.text.split('_')[0]
print(name)
 
# 获取BV号
if '?' in url:
    url = url.split('?')[0]
    print(url)
bvid = re.search(r'BV.*', url).group()
print('BV号:' + bvid + "1111")
# 获取cid
cid_json = requests.get('https://api.bilibili.com/x/player/pagelist?bvid={}&jsonp=jsonp'.format(bvid)).text
print(cid_json)
cid = re.search(r'{"cid":(\d+)', cid_json).group()[7:]
print('CID:' + cid)
# 获取视频的av号
rsp = requests.get(url, headers=headers)
aid = re.search(r'"aid":(.*?),"', rsp.text).group()[6:-2]
print('AV号:' + aid)
 
 
# 抓取视频真实地址,清晰度
qn = 16  # 先设置一个默认低清晰度
headers['Referer'] = url
api_url = 'https://api.bilibili.com/x/player/playurl?cid={}&avid={}&qn={}&otype=json&requestFrom=bilibili-helper'.format(
    cid, aid, qn)
qn_dict = {}  # 用来存放清晰度选择参数
rsp = requests.get(api_url, headers=headers).content
rsp = json.loads(rsp)
qn_accept_description = rsp.get('data').get('accept_description')
qn_accept_quality = rsp.get('data').get('accept_quality')
print('下载视频清晰度选择')
for i, j, xuhao in zip(qn_accept_description, qn_accept_quality, range(len(qn_accept_quality))):
    print(str(xuhao + 1) + ':' + i)
    qn_dict[str(xuhao + 1)] = j
xuhao = input('请选择(输入清晰度前的标号):')
qn = qn_dict[xuhao]
print('清晰度参数qn:' + str(qn))
 
 
api_url = 'https://api.bilibili.com/x/player/playurl?cid={}&avid={}&qn={}&otype=json&requestFrom=bilibili-helper'.format(
    cid, aid, qn)
# print('api_url='+api_url)
cookies = {}
cookies['SESSDATA'] = SESSDATA  # 这里输入你的SESSDATA
rsp = requests.get(api_url, headers=headers, cookies=cookies).content  # 这里代cookies才能得到会员或者登录后才能下载的视频的链接
rsp = json.loads(rsp)
real_url = rsp.get('data').get('durl')[0].get('url')
print('成功获取视频直链!')
print('正在开启多线程极速下载……')
thread(real_url, url, name + '.flv')  # 多线程下载
 
# 把上面那行删掉,把下面注释去掉就是单线程下载
# content = requests.get(real_url, headers=headers).content
# with open('1.flv', 'wb') as f:
#    f.write(content)

多线程下载部分(multiprocessing.py)


import requests
import threading
import datetime
import time
#改headers参数和url就好了
def thread(url, Referer, file_name):
  # print(r.status_code, r.headers)
  headers = {
      'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36',
      'Referer': Referer
  }
  r = requests.get(url, headers=headers, stream=True, timeout=30)
  all_thread = 1
  # 获取视频大小
  file_size = int(r.headers['content-length'])
  # 如果获取到文件大小,创建一个和需要下载文件一样大小的文件
  if file_size:
    fp = open(file_name, 'wb')
    fp.truncate(file_size)
    print('视频大小:' + str(int(file_size / 1024 / 1024)) + "MB")
    fp.close()
  # 每个线程每次下载大小为5M
  size = 5242880
  # 当前文件大小需大于5M
  if file_size > size:
    # 获取总线程数
    all_thread = int(file_size / size)
    # 设最大线程数为10,如总线程数大于10
    # 线程数为10
    if all_thread > 10:
      all_thread = 10
  part = file_size // all_thread
  threads = []
  starttime = datetime.datetime.now().replace(microsecond=0)
  for i in range(all_thread):
    # 获取每个线程开始时的文件位置
    start = part * i
    # 获取每个文件结束位置
    if i == all_thread - 1:
      end = file_size
    else:
      end = start + part
    if i > 0:
      start += 1
    headers = headers.copy()
    headers['Range'] = "bytes=%s-%s" % (start, end)
    t = threading.Thread(target=Handler, name='线程-' + str(i),
               kwargs={'start': start, 'end': end, 'url': url, 'filename': file_name, 'headers': headers})
    t.setDaemon(True)
    threads.append(t)
  # 线程开始
  for t in threads:
    time.sleep(0.2)
    t.start()
  # 等待所有线程结束
  print('正在下载……')
  for t in threads:
    t.join()
  endtime = datetime.datetime.now().replace(microsecond=0)
  print('下载完成!用时:%s' % (endtime - starttime))
def Handler(start, end, url, filename, headers={}):
  tt_name = threading.current_thread().getName()
  print(tt_name + ' 已启动')
  r = requests.get(url, headers=headers, stream=True)
  total_size = end - start
  downsize = 0
  startTime = time.time()
  with open(filename, 'r+b') as fp:
    fp.seek(start)
    var = fp.tell()
    for chunk in r.iter_content(204800):
      if chunk:
        fp.write(chunk)
        downsize += len(chunk)
        line = tt_name + '-downloading %d KB/s - %.2f MB, 共 %.2f MB'
        line = line % (
          downsize / 1024 / (time.time() - startTime), downsize / 1024 / 1024,
          total_size / 1024 / 1024)
        print(line, end='\r')
if __name__ == '__main__':
  url = input('输入视频链接(请输入视频原链):')
  thread(url)

在文件夹中新建SESSDATA.txt,粘贴账号的SESSDATA即可使用

贴吧自动签到

之前看论坛上有人发了很多版本的签到,但是编程水平真的一言难尽。

很多人都是没测试好代码就发帖了,然后编译都怪罪到cookie上。

百度的cookie是一个月一换,应该还算坚挺。


import datetime
import os
import re

import requests
import bs4


# 获取关注的所有贴吧链接
def get_tieba_link():
    url = 'http://tieba.baidu.com/f/like/mylike?&pn='
    page = 1
    links = []
    while True:
        response = requests.get(url + str(page), headers=headers)
        bs = bs4.BeautifulSoup(response.text, 'lxml')
        for a in bs.select('table tr>td:first-child>a'):
            links.append({'href': a.get('href'), 'name': a.string})
        if '下一页' not in str(bs):
            break
        page += 1
    return links


# 从贴吧链接中获取贴吧签到参数
def tieba_batch_sign():
    links = get_tieba_link()
    for link in links:
        response = requests.get('https://tieba.baidu.com/' + link['href'],
                                headers=headers)
        tbs_reg = re.compile(r'\'tbs\':\s"(.*?)"')
        tbs = tbs_reg.search(response.text).group(1)
        if not tbs:  # 如果tbs不存在则跳过本次循环
            continue
        param = {'ie': 'utf-8', 'kw': link['name'], 'tbs': tbs}
        response = requests.post('https://tieba.baidu.com/sign/add',
                                 data=param,
                                 headers=headers)
        result = response.json()
        if result['no'] == 0:
            msg = str(datetime.datetime.now().strftime(
                '%Y-%m-%d %H:%M:%S')) + ' ' + link['name'] + ' 签到成功\n'
        else:
            msg = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                      ) + ' ' + link['name'] + ' ' + result['error'] + '\n'
        print(msg)
        with open(os.path.join(os.getcwd(), 'sign.log'), 'a+') as loghandle:
            loghandle.write(msg)


if __name__ == '__main__':
    headers = {
        'cookie':
        'BDUSS=你的BDUSS;STOKEN=你的Token'
    }
    tieba_batch_sign()

抖音下载无水印视频

访问地址:https://v.douyin.com/J8cHNas/,他帮我们自动跳转到一个页面。

不用管页面如何,我们可以直接打开f12看到请求,稍微看一下,会发现所有的视频信息(包含视频路径、名称、作者等等)都存储在

这个请求中:https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids=6841990853624433920&dytk=

直接从接口获取到video的链接:https://aweme.snssdk.com/aweme/v1/playwm/video_id=v0200f4f0000brpq51lahtm68p8nvh6g&ratio=720p&line=0

把链接中的playwm替换成play(去掉wm就行),就可以获得无水印的地址了,如:https://aweme.snssdk.com/aweme/v1/play/?video_id=v0200f4f0000brpq51lahtm68p8nvh6g&ratio=720p&line=0

注:无水印的视频,只能在移动端播放,所以我们需要在user-agent中模拟手机端的操作。

至于如何得出去掉wm就可以无水印的结论,有经验的可以尝试看抖音时抓包,可以得到他的视频链接,简单对比就可以得出结论了。


import requests
import re
import json
url=input("请输入要下载的抖音视频地址!\n")
userAgent = "Mozilla/5.0 (Linux; Android 8.0.0; Pixel 2 XL Build/OPD1.170816.004) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Mobile Safari/537.36"
header = {
    "Referer": "https://v.douyin.com/",
    'User-Agent': userAgent,
}
#获取要下载的视频id
r = requests.get(url,headers = header)
url=r.url
id=url.split("video/")[1].split("/?")[0]
url="https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids="+id
r = requests.get(url,headers = header)
video=json.loads(r.content)["item_list"][0]
url=video["video"]["play_addr"]["url_list"][0]
url=url.replace("playwm","play")
#开始下载
r = requests.get(url,headers = header)
with open(video["desc"]+".mp4", "wb") as code:
     code.write(r.content)
     print("下载成功")
input("按下回车退出程序")

当然你要是懒得编译,我这有成品

或者若是对Python不感兴趣,你可以使用我的PHP版解析站:DylanC的抖音无水印解析站

微信自动推送天气

对,没错。是用了Server酱
地区的代码可以参考:中国天气网


import requests,json
 
desp=''
sckey='server酱的key'
 
def gettianqi(city):
    global desp
    url=('http://t.weather.sojson.com/api/weather/city/%d'%(city))
    tianqi_res=requests.get(url=url).text
    tianqi_res=json.loads(tianqi_res)
    city=tianqi_res['cityInfo']['city']
    data=tianqi_res['data']
    shidu=data['shidu']
    pm25=data['pm25']
    pm10=data['pm10']
    quality=data['quality']
    tianqi=data['forecast']
    jintiantianqi=tianqi[0]
    jintian_ymd=jintiantianqi['ymd']
    jintian_week=jintiantianqi['week']
    jintian_high=jintiantianqi['high']
    jintian_low=jintiantianqi['low']
    jintian_fx=jintiantianqi['fx']
    jintian_fl=jintiantianqi['fl']
    jintian_type=jintiantianqi['type']
    jintian_notice=jintiantianqi['notice']
    desp=(
        '##'+city+'\n'
        '#####'+'-'*10+jintian_ymd+'-'*5+jintian_week+'-'*10+'\n'
        '####空气质量:'+str(quality)+'\n'
        '#####'+'pm2.5:'+str(pm25)+'\n'
        '#####'+'pm10:'+str(pm10)+'\n'
        '#### 天气状况:\n'
        '#####'+str(jintian_low)+'~'+str(jintian_high)+'\n'
        '####'+str(jintian_type)+'\n'
        '#####'+str(jintian_notice)+'\n'
        '####'+str(jintian_fx)+str(jintian_fl)+'\n')
 
def upserver():
    global desp,sckey
    data={
        'text':'天气预报',
        'desp':desp
    }
    url=('https://sc.ftqq.com/%s.send'%sckey)
    server=requests.post(url=url,data=data)
 
gettianqi(地区id)
upserver()

微博短链生成

利用微博评论将长地址转为微博短链

将博文下的长地址评论删除


import requests
import urllib.parse
import re

headers = {
      'Cookie': 'SUB=你的cookie',
      'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',
      'Referer': 'https://www.weibo.com',
      'Content-Type': 'application/x-www-form-urlencoded'
    }

def get_short_url(long_url):
    url = "https://www.weibo.com/aj/v6/comment/add"

    payload = urllib.parse.urlencode({
        'mid': '微博mid',
        'content': long_url
    })
    response = requests.post(url, headers=headers, data=payload)

    try:
        data = response.json()['data']['comment']
        short_url = re.search(r'(https?)://t.cn/\w+', data).group(0)
        comment_id = re.findall(r'comment_id="(.+\d)"', data)[-1]   # 评论id
        print('微博短链:' + short_url)
        # del_comment(comment_id)  # 需要删除评论,可以取消该行注释
    except:
        pass

# 删除评论
def del_comment(comment_id):
    url = 'https://www.weibo.com/aj/comment/del'

    payload = urllib.parse.urlencode({
        'mid': '微博mid',
        'cid': comment_id  # 评论id
    })
    response = requests.post(url, headers=headers, data=payload)
    try:
        if response.json()['code'] == '100000':
            print('评论已删除')
    except:
        pass

if __name__ == '__main__':
    get_short_url(input('请输入长地址:'))

人工语音模拟输出

利用了百度的语音api,可调整多种方言以及音量语速等


import json
import tkinter as tk
from urllib.request import urlopen
from urllib.request import Request
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.parse import quote_plus
from tkinter import ttk
from playsound import playsound
from EP import *

API_KEY = ''# 替换你的 API_KEY
SECRET_KEY = ''# 替换你的 SECRET_KEY

TTS_URL = 'http://tsn.baidu.com/text2audio'
TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'
角色表={'度小宇':1, '度小美': 0, '度逍遥': 3, '度丫丫': 4, '度博文': 106, '度小童':110 , '度小萌':111 , '度米朵':103 , '度小娇':5}

class PyWinDesign:
    def __init__(self, 启动窗口):
        self.启动窗口 = 启动窗口
        self.启动窗口.title('DylanC的语音合成助手')
        self.启动窗口.resizable(width=False, height=False)
        screenwidth = self.启动窗口.winfo_screenwidth()
        screenheight = self.启动窗口.winfo_screenheight()
        size = '%dx%d+%d+%d' % (305, 137, (screenwidth - 305) / 2, (screenheight - 137) / 2)
        self.启动窗口.geometry(size)

        self.标签1_标题 = tk.StringVar()
        self.标签1_标题.set('内容')
        self.标签1 = tk.Label(self.启动窗口, textvariable=self.标签1_标题, anchor=tk.E)
        self.标签1.place(x=12, y=10, width=35, height=24)

        self.标签2_标题 = tk.StringVar()
        self.标签2_标题.set('语速')
        self.标签2 = tk.Label(self.启动窗口, textvariable=self.标签2_标题, anchor=tk.E)
        self.标签2.place(x=14, y=70, width=35, height=24)

        self.标签3_标题 = tk.StringVar()
        self.标签3_标题.set('音调')
        self.标签3 = tk.Label(self.启动窗口, textvariable=self.标签3_标题, anchor=tk.E)
        self.标签3.place(x=102, y=70, width=35, height=24)

        self.标签4_标题 = tk.StringVar()
        self.标签4_标题.set('音量')
        self.标签4 = tk.Label(self.启动窗口, textvariable=self.标签4_标题, anchor=tk.E)
        self.标签4.place(x=192, y=70, width=35, height=24)

        self.编辑框1_内容 = tk.StringVar()
        self.编辑框1_内容.set('3')
        self.编辑框1 = ttk.Entry(self.启动窗口, textvariable=self.编辑框1_内容, justify=tk.LEFT)
        self.编辑框1.place(x=51, y=71, width=51, height=22)

        self.编辑框2_内容 = tk.StringVar()
        self.编辑框2_内容.set('5')
        self.编辑框2 = ttk.Entry(self.启动窗口, textvariable=self.编辑框2_内容, justify=tk.LEFT)
        self.编辑框2.place(x=139, y=71, width=51, height=22)

        self.编辑框3_内容 = tk.StringVar()
        self.编辑框3_内容.set('5')
        self.编辑框3 = ttk.Entry(self.启动窗口, textvariable=self.编辑框3_内容, justify=tk.LEFT)
        self.编辑框3.place(x=231, y=71, width=51, height=22)

        self.编辑框4 = tk.Text(self.启动窗口, wrap=tk.NONE)
        self.编辑框4.insert(tk.END, '')
        self.编辑框4.place(x=51, y=15, width=233, height=43)

        self.按钮1_标题 = tk.StringVar()
        self.按钮1_标题.set('开始朗读')
        self.按钮1 = ttk.Button(self.启动窗口, textvariable=self.按钮1_标题, command=self.按钮1_被鼠标左键单击)
        self.按钮1.place(x=175, y=101, width=109, height=25)

        self.标签5_标题 = tk.StringVar()
        self.标签5_标题.set('角色')
        self.标签5 = tk.Label(self.启动窗口, textvariable=self.标签5_标题, anchor=tk.E)
        self.标签5.place(x=14, y=101, width=35, height=24)

        self.组合框1 = ttk.Combobox(self.启动窗口, values=('度小宇', '度小美', '度逍遥', '度丫丫', '度博文', '度小童', '度小萌', '度米朵', '度小娇'),
                                 state='readonly')
        self.组合框1.current(0)
        self.组合框1.place(x=51, y=103, width=100, height=20)

    def 按钮1_被鼠标左键单击(self):
        str = self.编辑框4.get(1.0, tk.END)
        列表 = 文件_遍历指定路径文件('./rec/')
        if 加密_MD5(str)+'.mp3' in 列表:
            playsound('./rec/{}.mp3'.format(加密_MD5(str)))  # 播放
            return
        token = self.fetch_token()
        tex = quote_plus(str)  # 此处TEXT需要两次urlencode
        params = {'tok': token, 'tex': tex, 'cuid': "quickstart",
                  'lan': 'zh', 'ctp': 1, 'spd': int(self.编辑框1_内容.get()), 'pit': int(self.编辑框2_内容.get()), 'vol': int(self.编辑框3_内容.get()),'per':角色表[self.组合框1.get()]}  # lan ctp 固定参数 spd是语速0-15 pit音调 vol音量

        data = urlencode(params)
        req = Request(TTS_URL, data.encode('utf-8'))
        has_error = False
        try:
            f = urlopen(req)
            result_str = f.read()

            headers = dict((name.lower(), value) for name, value in f.headers.items())

            has_error = ('content-type' not in headers.keys() or headers['content-type'].find('audio/') < 0)
        except  URLError as err:
            print('http response http code : ' + str(err.code))
            result_str = err.read()
            has_error = True

        save_file = "error.txt" if has_error else './rec/{}.mp3'.format(加密_MD5(str))
        目录, 文件 = 文件_目录文件名分割(save_file)
        if 目录 != '' and os.path.exists(目录) is False:
            os.makedirs(目录)

        with open(save_file, 'wb') as of:
            of.write(result_str)
        if has_error:
            result_str = str(result_str, 'utf-8')
            print("tts api  error:" + result_str)
        print("file saved as : " + save_file)
        playsound('./rec/{}.mp3'.format(加密_MD5(str)))#播放



    def fetch_token(self):
        params = {'grant_type': 'client_credentials',
                  'client_id': API_KEY,
                  'client_secret': SECRET_KEY}
        post_data = urlencode(params)
        post_data = post_data.encode('utf-8')
        req = Request(TOKEN_URL, post_data)
        try:
            f = urlopen(req, timeout=5)
            result_str = f.read()
        except URLError as err:
            print('token http response http code : ' + str(err.code))
            result_str = err.read()

        result_str = result_str.decode()

        result = json.loads(result_str)

        if ('access_token' in result.keys() and 'scope' in result.keys()):
            if not 'audio_tts_post' in result['scope'].split(' '):
                print('please ensure has check the tts ability')
                exit()
            return result['access_token']
        else:
            print('please overwrite the correct API_KEY and SECRET_KEY')
            exit()


if __name__ == '__main__':
    root = tk.Tk()
    app = PyWinDesign(root)
    root.mainloop()

当然也可以直接用成品

QQ推送每日天气

懒得自己搭bot就找了几个接口,用的是CP酷推。


#coding=utf-8

import requests, json
import time

spkey = '去酷推官网获取的个人key'


def get_iciba_everyday():
    icbapi = 'http://open.iciba.com/dsapi/'
    eed = requests.get(icbapi)
    bee = eed.json()
    english = eed.json()['content']
    zh_CN = eed.json()['note']
    str = '\n【WE的英语角】\n' + english + '\n' + zh_CN
    return str


print(get_iciba_everyday())


def main(*args):
    api = 'http://t.weather.sojson.com/api/weather/city/'
    city_code = '101180701'
    tqurl = api + city_code
    response = requests.get(tqurl)
    d = response.json()

    if (d['status'] == 200):
        print("城市:", d["cityInfo"]["parent"], d["cityInfo"]["city"])
        print("更新时间:", d["time"])
        print("日期:", d["data"]["forecast"][0]["ymd"])
        print("星期:", d["data"]["forecast"][0]["week"])
        print("天气:", d["data"]["forecast"][0]["type"])
        print("温度:", d["data"]["forecast"][0]["high"],
              d["data"]["forecast"][0]["low"])
        print("湿度:", d["data"]["shidu"])
        print("PM25:", d["data"]["pm25"])
        print("PM10:", d["data"]["pm10"])
        print("空气质量:", d["data"]["quality"])
        print("风力风向:", d["data"]["forecast"][0]["fx"],
              d["data"]["forecast"][0]["fl"])
        print("感冒指数:", d["data"]["ganmao"])
        print("温馨提示:", d["data"]["forecast"][0]["notice"], "。")

        cpurl = 'https://push.xuthus.cc/group/' + spkey
        tdwt = '【WE的天气推送】\n城市:' + d['cityInfo']['parent'] + ' ' + d[
            'cityInfo']['city'] + '\n日期:' + d["data"]["forecast"][0][
                "ymd"] + ' ' + d["data"]["forecast"][0]["week"] + '\n天气:' + d[
                    "data"]["forecast"][0]["type"] + '\n温度:' + d["data"][
                        "forecast"][0]["high"] + ' ' + d["data"]["forecast"][0][
                            "low"] + '\n湿度:' + d["data"]["shidu"] + '\n空气质量:' + d[
                                "data"]["quality"] + '\n风力风向:' + d["data"][
                                    "forecast"][0]["fx"] + ' ' + d["data"][
                                        "forecast"][0]["fl"] + '\n温馨提示:' + d[
                                            "data"]["forecast"][0][
                                                "notice"] + '。\n[更新时间:' + d[
                                                    "time"] + ']' + get_iciba_everyday(
                                                    )
        requests.post(cpurl, tdwt.encode('utf-8'))
    else:
        error = '【出现错误】\n  今日天气推送错误,请检查服务状态!'
        requests.post(cpurl, error.encode('utf-8'))


def main_handler(event, context):
    return main()


if __name__ == '__main__':
    main()
最后修改:2022 年 05 月 15 日
如果觉得我的文章对你有用,请随意赞赏