BiliBili视频下载
主函数部分(main.py)
import re
import json
from multiprocessing import thread
import requests
from bs4 import BeautifulSoup
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36',
}
url = input("请输入视频地址:")
with open('SESSDATA.txt', 'r') as f:
SESSDATA = f.read()
if SESSDATA == '0':
print('检测到您未设置SESSDATA,最高只能下载480p画质哦!')
# 获取视频名称
r = requests.get(url)
r.encoding = 'utf-8'
soup = BeautifulSoup(r.text, 'html.parser')
name = soup.title.text.split('_')[0]
print(name)
# 获取BV号
if '?' in url:
url = url.split('?')[0]
print(url)
bvid = re.search(r'BV.*', url).group()
print('BV号:' + bvid + "1111")
# 获取cid
cid_json = requests.get('https://api.bilibili.com/x/player/pagelist?bvid={}&jsonp=jsonp'.format(bvid)).text
print(cid_json)
cid = re.search(r'{"cid":(\d+)', cid_json).group()[7:]
print('CID:' + cid)
# 获取视频的av号
rsp = requests.get(url, headers=headers)
aid = re.search(r'"aid":(.*?),"', rsp.text).group()[6:-2]
print('AV号:' + aid)
# 抓取视频真实地址,清晰度
qn = 16 # 先设置一个默认低清晰度
headers['Referer'] = url
api_url = 'https://api.bilibili.com/x/player/playurl?cid={}&avid={}&qn={}&otype=json&requestFrom=bilibili-helper'.format(
cid, aid, qn)
qn_dict = {} # 用来存放清晰度选择参数
rsp = requests.get(api_url, headers=headers).content
rsp = json.loads(rsp)
qn_accept_description = rsp.get('data').get('accept_description')
qn_accept_quality = rsp.get('data').get('accept_quality')
print('下载视频清晰度选择')
for i, j, xuhao in zip(qn_accept_description, qn_accept_quality, range(len(qn_accept_quality))):
print(str(xuhao + 1) + ':' + i)
qn_dict[str(xuhao + 1)] = j
xuhao = input('请选择(输入清晰度前的标号):')
qn = qn_dict[xuhao]
print('清晰度参数qn:' + str(qn))
api_url = 'https://api.bilibili.com/x/player/playurl?cid={}&avid={}&qn={}&otype=json&requestFrom=bilibili-helper'.format(
cid, aid, qn)
# print('api_url='+api_url)
cookies = {}
cookies['SESSDATA'] = SESSDATA # 这里输入你的SESSDATA
rsp = requests.get(api_url, headers=headers, cookies=cookies).content # 这里代cookies才能得到会员或者登录后才能下载的视频的链接
rsp = json.loads(rsp)
real_url = rsp.get('data').get('durl')[0].get('url')
print('成功获取视频直链!')
print('正在开启多线程极速下载……')
thread(real_url, url, name + '.flv') # 多线程下载
# 把上面那行删掉,把下面注释去掉就是单线程下载
# content = requests.get(real_url, headers=headers).content
# with open('1.flv', 'wb') as f:
# f.write(content)
多线程下载部分(multiprocessing.py)
import requests
import threading
import datetime
import time
#改headers参数和url就好了
def thread(url, Referer, file_name):
# print(r.status_code, r.headers)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36',
'Referer': Referer
}
r = requests.get(url, headers=headers, stream=True, timeout=30)
all_thread = 1
# 获取视频大小
file_size = int(r.headers['content-length'])
# 如果获取到文件大小,创建一个和需要下载文件一样大小的文件
if file_size:
fp = open(file_name, 'wb')
fp.truncate(file_size)
print('视频大小:' + str(int(file_size / 1024 / 1024)) + "MB")
fp.close()
# 每个线程每次下载大小为5M
size = 5242880
# 当前文件大小需大于5M
if file_size > size:
# 获取总线程数
all_thread = int(file_size / size)
# 设最大线程数为10,如总线程数大于10
# 线程数为10
if all_thread > 10:
all_thread = 10
part = file_size // all_thread
threads = []
starttime = datetime.datetime.now().replace(microsecond=0)
for i in range(all_thread):
# 获取每个线程开始时的文件位置
start = part * i
# 获取每个文件结束位置
if i == all_thread - 1:
end = file_size
else:
end = start + part
if i > 0:
start += 1
headers = headers.copy()
headers['Range'] = "bytes=%s-%s" % (start, end)
t = threading.Thread(target=Handler, name='线程-' + str(i),
kwargs={'start': start, 'end': end, 'url': url, 'filename': file_name, 'headers': headers})
t.setDaemon(True)
threads.append(t)
# 线程开始
for t in threads:
time.sleep(0.2)
t.start()
# 等待所有线程结束
print('正在下载……')
for t in threads:
t.join()
endtime = datetime.datetime.now().replace(microsecond=0)
print('下载完成!用时:%s' % (endtime - starttime))
def Handler(start, end, url, filename, headers={}):
tt_name = threading.current_thread().getName()
print(tt_name + ' 已启动')
r = requests.get(url, headers=headers, stream=True)
total_size = end - start
downsize = 0
startTime = time.time()
with open(filename, 'r+b') as fp:
fp.seek(start)
var = fp.tell()
for chunk in r.iter_content(204800):
if chunk:
fp.write(chunk)
downsize += len(chunk)
line = tt_name + '-downloading %d KB/s - %.2f MB, 共 %.2f MB'
line = line % (
downsize / 1024 / (time.time() - startTime), downsize / 1024 / 1024,
total_size / 1024 / 1024)
print(line, end='\r')
if __name__ == '__main__':
url = input('输入视频链接(请输入视频原链):')
thread(url)
在文件夹中新建SESSDATA.txt
,粘贴账号的SESSDATA
即可使用
贴吧自动签到
之前看论坛上有人发了很多版本的签到,但是编程水平真的一言难尽。
很多人都是没测试好代码就发帖了,然后编译都怪罪到cookie上。
百度的cookie是一个月一换,应该还算坚挺。
import datetime
import os
import re
import requests
import bs4
# 获取关注的所有贴吧链接
def get_tieba_link():
url = 'http://tieba.baidu.com/f/like/mylike?&pn='
page = 1
links = []
while True:
response = requests.get(url + str(page), headers=headers)
bs = bs4.BeautifulSoup(response.text, 'lxml')
for a in bs.select('table tr>td:first-child>a'):
links.append({'href': a.get('href'), 'name': a.string})
if '下一页' not in str(bs):
break
page += 1
return links
# 从贴吧链接中获取贴吧签到参数
def tieba_batch_sign():
links = get_tieba_link()
for link in links:
response = requests.get('https://tieba.baidu.com/' + link['href'],
headers=headers)
tbs_reg = re.compile(r'\'tbs\':\s"(.*?)"')
tbs = tbs_reg.search(response.text).group(1)
if not tbs: # 如果tbs不存在则跳过本次循环
continue
param = {'ie': 'utf-8', 'kw': link['name'], 'tbs': tbs}
response = requests.post('https://tieba.baidu.com/sign/add',
data=param,
headers=headers)
result = response.json()
if result['no'] == 0:
msg = str(datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S')) + ' ' + link['name'] + ' 签到成功\n'
else:
msg = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
) + ' ' + link['name'] + ' ' + result['error'] + '\n'
print(msg)
with open(os.path.join(os.getcwd(), 'sign.log'), 'a+') as loghandle:
loghandle.write(msg)
if __name__ == '__main__':
headers = {
'cookie':
'BDUSS=你的BDUSS;STOKEN=你的Token'
}
tieba_batch_sign()
抖音下载无水印视频
访问地址:https://v.douyin.com/J8cHNas/
,他帮我们自动跳转到一个页面。
不用管页面如何,我们可以直接打开f12
看到请求,稍微看一下,会发现所有的视频信息(包含视频路径、名称、作者等等)都存储在
这个请求中:https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids=6841990853624433920&dytk=
直接从接口获取到video的链接:https://aweme.snssdk.com/aweme/v1/playwm/video_id=v0200f4f0000brpq51lahtm68p8nvh6g&ratio=720p&line=0
把链接中的playwm
替换成play
(去掉wm就行),就可以获得无水印的地址了,如:https://aweme.snssdk.com/aweme/v1/play/?video_id=v0200f4f0000brpq51lahtm68p8nvh6g&ratio=720p&line=0
注:无水印的视频,只能在移动端播放,所以我们需要在user-agent中模拟手机端的操作。
至于如何得出去掉wm就可以无水印的结论,有经验的可以尝试看抖音时抓包,可以得到他的视频链接,简单对比就可以得出结论了。
import requests
import re
import json
url=input("请输入要下载的抖音视频地址!\n")
userAgent = "Mozilla/5.0 (Linux; Android 8.0.0; Pixel 2 XL Build/OPD1.170816.004) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Mobile Safari/537.36"
header = {
"Referer": "https://v.douyin.com/",
'User-Agent': userAgent,
}
#获取要下载的视频id
r = requests.get(url,headers = header)
url=r.url
id=url.split("video/")[1].split("/?")[0]
url="https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids="+id
r = requests.get(url,headers = header)
video=json.loads(r.content)["item_list"][0]
url=video["video"]["play_addr"]["url_list"][0]
url=url.replace("playwm","play")
#开始下载
r = requests.get(url,headers = header)
with open(video["desc"]+".mp4", "wb") as code:
code.write(r.content)
print("下载成功")
input("按下回车退出程序")
当然你要是懒得编译,我这有成品
或者若是对Python不感兴趣,你可以使用我的PHP版解析站
:DylanC的抖音无水印解析站
微信自动推送天气
对,没错。是用了Server酱
地区的代码可以参考:中国天气网
import requests,json
desp=''
sckey='server酱的key'
def gettianqi(city):
global desp
url=('http://t.weather.sojson.com/api/weather/city/%d'%(city))
tianqi_res=requests.get(url=url).text
tianqi_res=json.loads(tianqi_res)
city=tianqi_res['cityInfo']['city']
data=tianqi_res['data']
shidu=data['shidu']
pm25=data['pm25']
pm10=data['pm10']
quality=data['quality']
tianqi=data['forecast']
jintiantianqi=tianqi[0]
jintian_ymd=jintiantianqi['ymd']
jintian_week=jintiantianqi['week']
jintian_high=jintiantianqi['high']
jintian_low=jintiantianqi['low']
jintian_fx=jintiantianqi['fx']
jintian_fl=jintiantianqi['fl']
jintian_type=jintiantianqi['type']
jintian_notice=jintiantianqi['notice']
desp=(
'##'+city+'\n'
'#####'+'-'*10+jintian_ymd+'-'*5+jintian_week+'-'*10+'\n'
'####空气质量:'+str(quality)+'\n'
'#####'+'pm2.5:'+str(pm25)+'\n'
'#####'+'pm10:'+str(pm10)+'\n'
'#### 天气状况:\n'
'#####'+str(jintian_low)+'~'+str(jintian_high)+'\n'
'####'+str(jintian_type)+'\n'
'#####'+str(jintian_notice)+'\n'
'####'+str(jintian_fx)+str(jintian_fl)+'\n')
def upserver():
global desp,sckey
data={
'text':'天气预报',
'desp':desp
}
url=('https://sc.ftqq.com/%s.send'%sckey)
server=requests.post(url=url,data=data)
gettianqi(地区id)
upserver()
微博短链生成
利用微博评论将长地址转为微博短链
将博文下的长地址评论删除
import requests
import urllib.parse
import re
headers = {
'Cookie': 'SUB=你的cookie',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',
'Referer': 'https://www.weibo.com',
'Content-Type': 'application/x-www-form-urlencoded'
}
def get_short_url(long_url):
url = "https://www.weibo.com/aj/v6/comment/add"
payload = urllib.parse.urlencode({
'mid': '微博mid',
'content': long_url
})
response = requests.post(url, headers=headers, data=payload)
try:
data = response.json()['data']['comment']
short_url = re.search(r'(https?)://t.cn/\w+', data).group(0)
comment_id = re.findall(r'comment_id="(.+\d)"', data)[-1] # 评论id
print('微博短链:' + short_url)
# del_comment(comment_id) # 需要删除评论,可以取消该行注释
except:
pass
# 删除评论
def del_comment(comment_id):
url = 'https://www.weibo.com/aj/comment/del'
payload = urllib.parse.urlencode({
'mid': '微博mid',
'cid': comment_id # 评论id
})
response = requests.post(url, headers=headers, data=payload)
try:
if response.json()['code'] == '100000':
print('评论已删除')
except:
pass
if __name__ == '__main__':
get_short_url(input('请输入长地址:'))
人工语音模拟输出
利用了百度的语音api,可调整多种方言以及音量语速等
import json
import tkinter as tk
from urllib.request import urlopen
from urllib.request import Request
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.parse import quote_plus
from tkinter import ttk
from playsound import playsound
from EP import *
API_KEY = ''# 替换你的 API_KEY
SECRET_KEY = ''# 替换你的 SECRET_KEY
TTS_URL = 'http://tsn.baidu.com/text2audio'
TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'
角色表={'度小宇':1, '度小美': 0, '度逍遥': 3, '度丫丫': 4, '度博文': 106, '度小童':110 , '度小萌':111 , '度米朵':103 , '度小娇':5}
class PyWinDesign:
def __init__(self, 启动窗口):
self.启动窗口 = 启动窗口
self.启动窗口.title('DylanC的语音合成助手')
self.启动窗口.resizable(width=False, height=False)
screenwidth = self.启动窗口.winfo_screenwidth()
screenheight = self.启动窗口.winfo_screenheight()
size = '%dx%d+%d+%d' % (305, 137, (screenwidth - 305) / 2, (screenheight - 137) / 2)
self.启动窗口.geometry(size)
self.标签1_标题 = tk.StringVar()
self.标签1_标题.set('内容')
self.标签1 = tk.Label(self.启动窗口, textvariable=self.标签1_标题, anchor=tk.E)
self.标签1.place(x=12, y=10, width=35, height=24)
self.标签2_标题 = tk.StringVar()
self.标签2_标题.set('语速')
self.标签2 = tk.Label(self.启动窗口, textvariable=self.标签2_标题, anchor=tk.E)
self.标签2.place(x=14, y=70, width=35, height=24)
self.标签3_标题 = tk.StringVar()
self.标签3_标题.set('音调')
self.标签3 = tk.Label(self.启动窗口, textvariable=self.标签3_标题, anchor=tk.E)
self.标签3.place(x=102, y=70, width=35, height=24)
self.标签4_标题 = tk.StringVar()
self.标签4_标题.set('音量')
self.标签4 = tk.Label(self.启动窗口, textvariable=self.标签4_标题, anchor=tk.E)
self.标签4.place(x=192, y=70, width=35, height=24)
self.编辑框1_内容 = tk.StringVar()
self.编辑框1_内容.set('3')
self.编辑框1 = ttk.Entry(self.启动窗口, textvariable=self.编辑框1_内容, justify=tk.LEFT)
self.编辑框1.place(x=51, y=71, width=51, height=22)
self.编辑框2_内容 = tk.StringVar()
self.编辑框2_内容.set('5')
self.编辑框2 = ttk.Entry(self.启动窗口, textvariable=self.编辑框2_内容, justify=tk.LEFT)
self.编辑框2.place(x=139, y=71, width=51, height=22)
self.编辑框3_内容 = tk.StringVar()
self.编辑框3_内容.set('5')
self.编辑框3 = ttk.Entry(self.启动窗口, textvariable=self.编辑框3_内容, justify=tk.LEFT)
self.编辑框3.place(x=231, y=71, width=51, height=22)
self.编辑框4 = tk.Text(self.启动窗口, wrap=tk.NONE)
self.编辑框4.insert(tk.END, '')
self.编辑框4.place(x=51, y=15, width=233, height=43)
self.按钮1_标题 = tk.StringVar()
self.按钮1_标题.set('开始朗读')
self.按钮1 = ttk.Button(self.启动窗口, textvariable=self.按钮1_标题, command=self.按钮1_被鼠标左键单击)
self.按钮1.place(x=175, y=101, width=109, height=25)
self.标签5_标题 = tk.StringVar()
self.标签5_标题.set('角色')
self.标签5 = tk.Label(self.启动窗口, textvariable=self.标签5_标题, anchor=tk.E)
self.标签5.place(x=14, y=101, width=35, height=24)
self.组合框1 = ttk.Combobox(self.启动窗口, values=('度小宇', '度小美', '度逍遥', '度丫丫', '度博文', '度小童', '度小萌', '度米朵', '度小娇'),
state='readonly')
self.组合框1.current(0)
self.组合框1.place(x=51, y=103, width=100, height=20)
def 按钮1_被鼠标左键单击(self):
str = self.编辑框4.get(1.0, tk.END)
列表 = 文件_遍历指定路径文件('./rec/')
if 加密_MD5(str)+'.mp3' in 列表:
playsound('./rec/{}.mp3'.format(加密_MD5(str))) # 播放
return
token = self.fetch_token()
tex = quote_plus(str) # 此处TEXT需要两次urlencode
params = {'tok': token, 'tex': tex, 'cuid': "quickstart",
'lan': 'zh', 'ctp': 1, 'spd': int(self.编辑框1_内容.get()), 'pit': int(self.编辑框2_内容.get()), 'vol': int(self.编辑框3_内容.get()),'per':角色表[self.组合框1.get()]} # lan ctp 固定参数 spd是语速0-15 pit音调 vol音量
data = urlencode(params)
req = Request(TTS_URL, data.encode('utf-8'))
has_error = False
try:
f = urlopen(req)
result_str = f.read()
headers = dict((name.lower(), value) for name, value in f.headers.items())
has_error = ('content-type' not in headers.keys() or headers['content-type'].find('audio/') < 0)
except URLError as err:
print('http response http code : ' + str(err.code))
result_str = err.read()
has_error = True
save_file = "error.txt" if has_error else './rec/{}.mp3'.format(加密_MD5(str))
目录, 文件 = 文件_目录文件名分割(save_file)
if 目录 != '' and os.path.exists(目录) is False:
os.makedirs(目录)
with open(save_file, 'wb') as of:
of.write(result_str)
if has_error:
result_str = str(result_str, 'utf-8')
print("tts api error:" + result_str)
print("file saved as : " + save_file)
playsound('./rec/{}.mp3'.format(加密_MD5(str)))#播放
def fetch_token(self):
params = {'grant_type': 'client_credentials',
'client_id': API_KEY,
'client_secret': SECRET_KEY}
post_data = urlencode(params)
post_data = post_data.encode('utf-8')
req = Request(TOKEN_URL, post_data)
try:
f = urlopen(req, timeout=5)
result_str = f.read()
except URLError as err:
print('token http response http code : ' + str(err.code))
result_str = err.read()
result_str = result_str.decode()
result = json.loads(result_str)
if ('access_token' in result.keys() and 'scope' in result.keys()):
if not 'audio_tts_post' in result['scope'].split(' '):
print('please ensure has check the tts ability')
exit()
return result['access_token']
else:
print('please overwrite the correct API_KEY and SECRET_KEY')
exit()
if __name__ == '__main__':
root = tk.Tk()
app = PyWinDesign(root)
root.mainloop()
当然也可以直接用成品
QQ推送每日天气
懒得自己搭bot就找了几个接口,用的是CP酷推。
#coding=utf-8
import requests, json
import time
spkey = '去酷推官网获取的个人key'
def get_iciba_everyday():
icbapi = 'http://open.iciba.com/dsapi/'
eed = requests.get(icbapi)
bee = eed.json()
english = eed.json()['content']
zh_CN = eed.json()['note']
str = '\n【WE的英语角】\n' + english + '\n' + zh_CN
return str
print(get_iciba_everyday())
def main(*args):
api = 'http://t.weather.sojson.com/api/weather/city/'
city_code = '101180701'
tqurl = api + city_code
response = requests.get(tqurl)
d = response.json()
if (d['status'] == 200):
print("城市:", d["cityInfo"]["parent"], d["cityInfo"]["city"])
print("更新时间:", d["time"])
print("日期:", d["data"]["forecast"][0]["ymd"])
print("星期:", d["data"]["forecast"][0]["week"])
print("天气:", d["data"]["forecast"][0]["type"])
print("温度:", d["data"]["forecast"][0]["high"],
d["data"]["forecast"][0]["low"])
print("湿度:", d["data"]["shidu"])
print("PM25:", d["data"]["pm25"])
print("PM10:", d["data"]["pm10"])
print("空气质量:", d["data"]["quality"])
print("风力风向:", d["data"]["forecast"][0]["fx"],
d["data"]["forecast"][0]["fl"])
print("感冒指数:", d["data"]["ganmao"])
print("温馨提示:", d["data"]["forecast"][0]["notice"], "。")
cpurl = 'https://push.xuthus.cc/group/' + spkey
tdwt = '【WE的天气推送】\n城市:' + d['cityInfo']['parent'] + ' ' + d[
'cityInfo']['city'] + '\n日期:' + d["data"]["forecast"][0][
"ymd"] + ' ' + d["data"]["forecast"][0]["week"] + '\n天气:' + d[
"data"]["forecast"][0]["type"] + '\n温度:' + d["data"][
"forecast"][0]["high"] + ' ' + d["data"]["forecast"][0][
"low"] + '\n湿度:' + d["data"]["shidu"] + '\n空气质量:' + d[
"data"]["quality"] + '\n风力风向:' + d["data"][
"forecast"][0]["fx"] + ' ' + d["data"][
"forecast"][0]["fl"] + '\n温馨提示:' + d[
"data"]["forecast"][0][
"notice"] + '。\n[更新时间:' + d[
"time"] + ']' + get_iciba_everyday(
)
requests.post(cpurl, tdwt.encode('utf-8'))
else:
error = '【出现错误】\n 今日天气推送错误,请检查服务状态!'
requests.post(cpurl, error.encode('utf-8'))
def main_handler(event, context):
return main()
if __name__ == '__main__':
main()