用百度智能云批量识别微信语音
目录
如果你没有在 cnblog 作者为 carr0t2 中看到此文章,建议访问 原网页以获取更好的排版,图片体验
本人现在不确定是否要继续开发图形化界面,更多的支持,如果确实有需要,请在下方留言。
准备工具及环境
- Python3.7
- silk-v3-decoder https://github.com/kn007/silk-v3-decoder
- 百度智能云账号(用百度账号就行),申请 API Key 和 Secret Key
- 百度短语音识别API Demo 基于官方Demo代码修改 https://github.com/Baidu-AIP/speech-demo/tree/master/rest-api-asr/python
- 本文环境为Windows python3.7
大致思路
-
手机微信找到语音文件保存的位置,导出
-
用silk-v3-decoder将录音转换为wav格式
-
ffmpeg将wav转成pcm,采样频率16000
-
用python识别
-
仅个人处理方法,有问题欢迎指出
具体操作
导出微信语音文件
-
手机微信语音文件一般保存在
内部存储\tencent\MicroMsg\****************************\voice2
星号里是一个很长的包含数字字母的字符串
里面包括这许多这样的文件夹
-
全部复制粘贴到并提取出音频文件
Windows下搜索
.amr
全选复制粘贴到一个新文件夹
-
这些就是录音文件,但是格式比较奇怪,需要处理成常规格式
处理导出语音文件
重命名文件
- 因为要保持相对顺序,而直接进行转换会导致文件修改时间变化,于是无法恢复正常语音顺序
- 用python,提取文件修改时间并重命名
import os
import time
path='.\\lecture'
dirs = os.listdir(path)
for file in dirs:
finfo = os.stat(path+'\\'+file)
timeArray = time.localtime(finfo.st_mtime)
nametime = time.strftime("%Y_%m_%d_%H_%M_%S", timeArray)
os.rename(path+'\\'+file,path+'\\'+nametime+'.amr')
print(nametime)
转换为pcm格式
- python通过命令行调用
silk_v3_decoder.exe
解码,具体命令写在下面 -
pcm
文件好像无法直接播放,Audacity是可以的
修改Demo代码
-
先copy代码
https://github.com/Baidu-AIP/speech-demo/tree/master/rest-api-asr/python
个人觉得json和raw在小规模使用没啥区别
-
填 API Key 和 Secret Key
-
写python,个人只稍微改了一部分,全部代码贴在github
前面的修改
-
silk_v3_decoder.exe
转格式为16k pcm
FORMAT = 'pcm'
pathamr=r'.\amr'
pathpcm=r'.\pcm'
dirs = os.listdir(pathamr)
#dirs.remove('desktop.ini')### Windows可能会有这个文件
for file in dirs:
time.sleep(0.3)
name=file[:-3]
commandstring= ' silk_v3_decoder.exe ' + str(pathamr) + '\\' + name + 'amr ' + str(pathpcm) +'\\'+ str(name) + 'pcm' +' -Fs_API 16000 '
os.system(commandstring)
AUDIO_FILE =str(pathpcm)+'\\'+ str(name) + 'pcm'
后面的修改
- 使输出为追加,并且增加时间字段,后续处理还没有做,所以导出的文件还是json
with open("result.txt","a") as of:
result_dict=eval(result_str)
result_dict["time"]=name
of.write(str(result_dict)+'\n')
后记
刚学python,随便写写,欢迎指出错误- 文件后续处理还没有做好,想做成输出是前面一行时间,后面一行识别内容,如果有识别偏差较大,方便找到位置重新听
- 没有做到全程自动化,还是要手动处理内容的。
- 没用到百度的语音自训练平台
代码(仅供参考)
import sys
import json
import base64
import time
import os
import subprocess
IS_PY3 = sys.version_info.major == 3
if IS_PY3:
from urllib.request import urlopen
from urllib.request import Request
from urllib.error import URLError
from urllib.parse import urlencode
timer = time.perf_counter
else:
from urllib2 import urlopen
from urllib2 import Request
from urllib2 import URLError
from urllib import urlencode
if sys.platform == "win32":
timer = time.clock
else:
# On most other platforms the best timer is time.time()
timer = time.time
API_KEY = '****************'### 填入自己的
SECRET_KEY = '*****************'
# 需要识别的文件
# 文件格式
FORMAT = 'pcm' # 文件后缀只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式
###这里为了方便直接限制死
CUID = '****************'
# 采样率
RATE = 16000 # 固定值
DEV_PID = 1537 # 1537 表示识别普通话,使用输入法模型。根据文档填写PID,选择语言及识别模型
ASR_URL = 'http://vop.baidu.com/server_api'
SCOPE = 'audio_voice_assistant_get' # 有此scope表示有asr能力,没有请在网页里勾选,非常旧的应用可能没有
class DemoError(Exception):
pass
""" TOKEN start """
TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'
def fetch_token():
params = {'grant_type': 'client_credentials',
'client_id': API_KEY,
'client_secret': SECRET_KEY}
post_data = urlencode(params)
if (IS_PY3):
post_data = post_data.encode( 'utf-8')
req = Request(TOKEN_URL, post_data)
try:
f = urlopen(req)
result_str = f.read()
except URLError as err:
print('token http response http code : ' + str(err.code))
result_str = err.read()
if (IS_PY3):
result_str = result_str.decode()
print(result_str)
result = json.loads(result_str)
print(result)
if ('access_token' in result.keys() and 'scope' in result.keys()):
print(SCOPE)
if SCOPE and (not SCOPE in result['scope'].split(' ')): # SCOPE = False 忽略检查
raise DemoError('scope is not correct')
print('SUCCESS WITH TOKEN: %s EXPIRES IN SECONDS: %s' % (result['access_token'], result['expires_in']))
return result['access_token']
else:
raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')
""" TOKEN end """
if __name__ == '__main__':
token = fetch_token()
pathamr=r'.\amr'
pathpcm=r'.\pcm'
dirs = os.listdir(pathamr)
#dirs.remove('desktop.ini')### Windows可能会有这个文件
for file in dirs:
time.sleep(0.2)
name=file[:-3]
commandstring= ' silk_v3_decoder.exe ' + str(pathamr) + '\\' + name + 'amr ' + str(pathpcm) +'\\'+ str(name) + 'pcm' +' -Fs_API 16000 '
os.system(commandstring)
######下面没怎么动过了
AUDIO_FILE =str(pathpcm)+'\\'+ str(name) + 'pcm'
speech_data = []
with open(AUDIO_FILE, 'rb') as speech_file:
speech_data = speech_file.read()
length = len(speech_data)
if length == 0:
raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)
speech = base64.b64encode(speech_data)
if (IS_PY3):
speech = str(speech, 'utf-8')
params = {'dev_pid': DEV_PID,
#"lm_id" : LM_ID, #测试自训练平台开启此项
'format': FORMAT,
'rate': RATE,
'token': token,
'cuid': CUID,
'channel': 1,
'speech': speech,
'len': length
}
post_data = json.dumps(params, sort_keys=False)
# print post_data
req = Request(ASR_URL, post_data.encode('utf-8'))
req.add_header('Content-Type', 'application/json')
try:
begin = timer()
f = urlopen(req)
result_str = f.read()
print ("Request time cost %f" % (timer() - begin))
except URLError as err:
print('asr http response http code : ' + str(err.code))
result_str = err.read()
if (IS_PY3):
result_str = str(result_str, 'utf-8')
print(result_str)
with open("result.txt","a") as of:
result_dict=eval(result_str)
#result_dict["time"]=name
#of.write(str(result_dict)+'\n')
of.write('{'+name+'}'+'\n')
try:
of.write(str(result_dict["result"])[2:-2]+'\n\n')
except:
of.write('Error'+'\n')