Python3使⽤科⼤讯飞API接⼝实现⾳频⽂件转写
注意事项
操作系统:Windows
Python:3.6
可⽤时长: 免费⽤户时长5⼩时,且⽤且珍惜。
⾳频属性: 采样率16k或8k、位长8bits或16bits、单声道&多声道
⾳频格式: wav/flac/opus/m4a/mp3
⾳频⼤⼩: 不超过500M
⾳频时长: 不超过5⼩时,建议5分钟以上
语⾔种类: 中⽂普通话、英⽂
转写结果保存时长 30天。(同⼀通录⾳不需要重新上传识别,如果你已经上传识别过了,之后只需要使
⽤_result_request(taskid)的⽅式即可再次获取识别结果,taskid是你第⼀次上传录⾳时给你分配的任务ID,避免重复上传浪费可⽤时长)
APP_ID, SECRET_KEY的获取
讯飞的好像不需要API_KEY,开放授权的⽅式和其他⼤⼚的类似:
1、页⾯右上⽅“控制台”点击进⼊,登录讯飞账号(没有就注册⼀个),进⼊讯飞开放平台。
任天堂switch使用教程
2、左侧导航栏上⽅,依次选择 语⾳识别->语⾳转写->离线语⾳转写识别。
3、服务申请。点击“创建应⽤”,“接⼝选择”已默认勾选完成,如⽆其他需求,⽆需勾选,完成其他资料后,点击最下⽅“⽴即创
建”按钮。⾃⼰可以⼿动领取5⼩时免费试⽤体验包。
4、应⽤成功则页⾯显⽰“创建完毕”,点击”返回应⽤列表”, 查看新创建应⽤详情,在服务接⼝认证信息窗⼝就可以看到返回的AppID,SecretKey。
话不多说,直接上代码了
# -*- coding: utf-8 -*-
#
#  author: yanmeng2
#
# ⾮实时转写调⽤demo
import base64
import hashlib
import hmac
import json
import os
import time
import requests
lfasr_host ='raasr.xfyun/api'
# 请求的接⼝名
api_prepare ='/prepare'
api_upload ='/upload'
api_merge ='/merge'
api_get_progress ='/getProgress'
api_get_result ='/getResult'
# ⽂件分⽚⼤⼩10M
file_piece_sice =10485760
# ——————————————————转写可配置参数————————————————
# 参数可在官⽹界⾯(doc.xfyun/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可⾃⾏在gene_params⽅法⾥添加修改
# 转写类型
lfasr_type =0
# 是否开启分词
has_participle ='false'
has_seperate ='true'
# 多候选词个数
# 多候选词个数
max_alternatives =0
# ⼦⽤户标识
suid =''
class SliceIdGenerator:
"""slice id⽣成器"""
def__init__(self):
self.__ch ='aaaaaaaaa`'
def getNextSliceId(self):
ch = self.__ch
j =len(ch)-1
while j >=0:
cj = ch[j]
if cj !='z':
ch = ch[:j]+chr(ord(cj)+1)+ ch[j +1:]
break
else:
ch = ch[:j]+'a'+ ch[j +1:]
j = j -1
self.__ch = ch
return self.__ch
class RequestApi(object):
def__init__(self, appid, secret_key, upload_file_path):
self.appid = appid
self.secret_key = secret_key
self.upload_file_path = upload_file_path
# 根据不同的apiname⽣成不同的参数,本⽰例中未使⽤全部参数您可在官⽹(doc.xfyun/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5% 86%99.html)查看后选择适合业务场景的进⾏更换
def gene_params(self, apiname, taskid=None, slice_id=None):
appid = self.appid
secret_key = self.secret_key
upload_file_path = self.upload_file_path
ts =str(int(time.time()))
m2 = hashlib.md5()
m2.update((appid + ts).encode('utf-8'))
md5 = m2.hexdigest()
md5 =bytes(md5, encoding='utf-8')
# 以secret_key为key, 上⾯的md5为msg,使⽤hashlib.sha1加密结果为signa
signa = w(de('utf-8'), md5, hashlib.sha1).digest()
signa = base64.b64encode(signa)
signa =str(signa,'utf-8')
file_len = size(upload_file_path)
file_name = os.path.basename(upload_file_path)
param_dict ={}
dedev6if apiname == api_prepare:
# slice_num是指分⽚数量,如果您使⽤的⾳频都是较短⾳频也可以不分⽚,直接将slice_num指定为1即可
slice_num =int(file_len / file_piece_sice)+(0if(file_len % file_piece_sice ==0)else1)
param_dict['app_id']= appid
param_dict['signa']= signa
param_dict['ts']= ts
param_dict['file_len']=str(file_len)
param_dict['file_name']= file_name
param_dict['slice_num']=str(slice_num)
elif apiname == api_upload:
param_dict['app_id']= appid
param_dict['signa']= signa
param_dict['ts']= ts
param_dict['task_id']= taskid
param_dict['task_id']= taskid
param_dict['slice_id']= slice_id嗜肉菌别称
elif apiname == api_merge:
param_dict['app_id']= appid
param_dict['signa']= signa
param_dict['ts']= ts
param_dict['task_id']= taskid
param_dict['file_name']= file_name
elif apiname == api_get_progress or apiname == api_get_result:
param_dict['app_id']= appid
param_dict['signa']= signa
param_dict['ts']= ts
param_dict['task_id']= taskid
return param_dict
# 请求和结果解析,结果中各个字段的含义可参考:doc.xfyun/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html def gene_request(self, apiname, data, files=None, headers=None):
response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers)
result = json.)
if result["ok"]==0:
print("{} success:".format(apiname)+str(result))
return result
else:
print("{} error:".format(apiname)+str(result))
exit(0)
return result
# 预处理
def prepare_request(self):
_request(apiname=api_prepare,
_params(api_prepare))
# 上传
def upload_request(self, taskid, upload_file_path):
file_object =open(upload_file_path,'rb')
try:
index =1
sig = SliceIdGenerator()
while True:
content = ad(file_piece_sice)
if not content or len(content)==0:
break
files ={
"filename": _params(api_upload).get("slice_id"),
"content": content
}
response = _request(api_upload,
_params(api_upload, taskid=taskid,
slice_NextSliceId()),
files=files)
('ok')!=0:
# 上传分⽚失败
print('upload slice fail, response: '+str(response))
return False
print('upload slice '+str(index)+' success')
index +=1
finally:
'file index:'+str(ll())
file_object.close()
return True
# 合并
python解析json文件def merge_request(self, taskid):
_request(api_merge, _params(api_merge, taskid=taskid))
# 获取进度
# 获取进度
def get_progress_request(self, taskid):
_request(api_get_progress, _params(api_get_progress, taskid=taskid))
# 获取结果
def get_result_request(self, taskid):
_request(api_get_result, _params(api_get_result, taskid=taskid))
def all_api_request(self):
# 1. 预处理
pre_result = self.prepare_request()
taskid = pre_result["data"]
# 2 . 分⽚上传
self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path)
# 3 . ⽂件合并
自制数据库管理系统
<_request(taskid=taskid)
# 4 . 获取任务进度
while True:
# 每隔20秒获取⼀次任务进度
progress = _progress_request(taskid)
progress_dic = progress
if progress_dic['err_no']!=0and progress_dic['err_no']!=26605:
print('task error: '+ progress_dic['failed'])
return
else:
data = progress_dic['data']
task_status = json.loads(data)
if task_status['status']==9:
print('task '+ taskid +' finished')
break
print('The task '+ taskid +' is in processing, task status: '+str(data))
# 每次获取进度间隔20S
time.sleep(20)
# 5 . 获取结果
<_result_request(taskid=taskid)
# 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0) # 输⼊讯飞开放平台的appid,secret_key和待转写的⽂件路径
if __name__ =='__main__':
APP_ID ="***"
SECRET_KEY ="****"
table array
file_path = r"***.wav"
api = RequestApi(appid=APP_ID, secret_key=SECRET_KEY, upload_file_path=file_path)
api.all_api_request()
当然,你可以根据⾃⼰的需求对demo进⾏改进,⽐如你想并发识别录⾳,你可以添加多线程执⾏的函数,为了获取taskid⽅便,我在class 的初始化⾥边添加了self.taskid = “None”,并在预处理结果返回之后重新对taskid赋值。
def thread_func(wav_file_path, txt_file_path):# 线程函数,⽅便并发识别录⾳
doc =open(txt_file_path,'w', encoding='utf-8')
# doc.close()
api = RequestApi(appid=APP_ID, secret_key=SECRET_KEY, upload_file_path=wav_file_path)
api.all_api_request()# demo中这个函数是完整过程执⾏,但我把提取结果的模块提出来了
print('taskid is: '+ api.taskid,file=doc)
result = _result_request(api.taskid)
result =eval(result['data'])
# print(result)
for x in result:
print(x)
print(x,file=doc)
doc.close()
#主函数写成类似这种
if __name__ =='__main__':
file_list =[
"o2019082112552587460156",
"o2019082115552587460127"
]
APP_ID ="***"
SECRET_KEY ="***"
file_read_path = r"D:\MyProject\Python\Voice_SDK\20190820\\"
file_save_path = r"D:\MyProject\Python\Voice_SDK\20190820_xunfei\\"
for file in file_list:#多并发批量执⾏
wav_file_path = file_read_path +file+".wav"
txt_file_path = file_save_path +file+".txt"
t = threading.Thread(target=thread_func, args=(wav_file_path, txt_file_path))
t.start()