添加信号异常处理
/*
 * Signal handler that only reports that it was entered.
 *
 * The name suggests it is meant for SIGPIPE, but the registration call is not
 * visible here - confirm against the caller.
 *
 * NOTE(review): printf() is not on the POSIX list of async-signal-safe
 * functions; if this handler can interrupt code that is itself inside stdio,
 * consider replacing it with write(2) or with SIG_IGN.
 */
void handle_pipe(int sig)
{
    (void)sig; /* the signal number is not needed */
    /* Terminate the line so the message is flushed on line-buffered output
       and does not run into the next log line. */
    printf("[%s:%d] enter into handle_pipe.\n", __FILE__, __LINE__);
}
定义常用变量
#ifndef _SERVER_H
#define _SERVER_H
/* Upper bound for a GRS result string (not referenced in the visible code). */
#define GRS_RESULT_STR_MAX_LEN (4096)

/* Command codes; they match the values testProcThread compares
   asr_head_t.cmd against (1 start, 2 stop, 3 finish, 4 stream data). */
#define TD_CMD_START 1
#define TD_CMD_STOP 2
#define TD_CMD_FINISH 3
#define TD_CMD_DATA 4

/* Decoder result codes. Negative values are parenthesized so the macros
   expand safely inside larger expressions. */
#define DECODER_SUCC 0
#define DECODER_FAIL (-1)
#define DECODER_TOOSHORT (-2)
#define DECODER_TOOLONG (-3)
#define DECODER_LOWSCORE (-4)

/* Size of asr_head_t.provider. */
#define MAX_PROVIDER_LEN 16

/* Size of asr_head_t.sid: 32 characters plus one extra byte (presumably the
   terminating NUL). Parenthesized: the bare "32+1" made expressions such as
   SID_LENGTH * 2 evaluate to 34 instead of 66. */
#define SID_LENGTH (32 + 1)
#include <queue>
using namespace std;
// Fixed-size packet header exchanged with the client.
// testProcThread reads and writes it as raw sizeof(asr_head_t) bytes, so the
// field order and types below define the layout on the wire.
typedef struct _asr_head_t
{
// cmd and err_no share the same storage.
union {
short cmd; // command code for the decoder
short err_no; // processing result status returned by the decoder
};
unsigned int idx; // sequence number of the transmitted voice packet
unsigned short pid; // product id
unsigned short time_out; // decode timeout in ms, at most 50 seconds
unsigned short version; // version number
unsigned int time_stamp; // time stamp
unsigned int log_id; // generally generated by the web server
char provider[MAX_PROVIDER_LEN]; // name of the sending module
int res_num; // number of recognition results
unsigned int body_len; // length of the data buffer (the payload that follows this header)
char sid[SID_LENGTH]; // session id string - NOTE(review): presumably NUL-terminated, confirm with the sender
} asr_head_t;
#define MAX_RES_LEN 256
// Size of every text buffer in rec_t. Original author's note on earlier
// values: 256 / 4096 / 2048, "lenovo needs 256".
#define MAX_LARGE_RST 2048
// Maximum path length (an earlier value was 512).
#define MAX_PATH_SIZE 1024

// One recognition result.
typedef struct _rec_t
{
    char res[MAX_LARGE_RST];        // recognized text
    char phone[MAX_LARGE_RST];      // phoneme string
    char segTime[MAX_LARGE_RST];    // segTime string
    char confident[MAX_LARGE_RST];  // confidence
    int reserved1;                  // reserved for future extension
    int reserved2;                  // reserved for future extension
    float data_time_stamp;
    float end_time_stamp;

    // Start from a fully zeroed record. The reserved fields are cleared too;
    // they were previously left uninitialized, so copying or sending a fresh
    // rec_t exposed indeterminate bytes.
    _rec_t() : reserved1(0), reserved2(0), data_time_stamp(0), end_time_stamp(0)
    {
        // sizeof keeps each length tied to the array it clears.
        memset(res, 0x00, sizeof(res));
        memset(phone, 0x00, sizeof(phone));
        memset(segTime, 0x00, sizeof(segTime));
        memset(confident, 0x00, sizeof(confident));
    }
}rec_t;
// In the header sent from the decoder to ts, err_no gives the status of the
// recognition result; there are two cases: recognition succeeded or failed.
typedef struct _asr_resp_t
{
asr_head_t header; // header (the original comment calls it the request header)
rec_t *rec_array; // array of recognition results
}asr_resp_t;
// Per-session parameters. testProcThread receives a pointer to one of these
// as its thread argument and also accesses the entries through tParam[sessId].
typedef struct _asr_param_t
{
int sessId; // session index; used to index tParam[] and socketCli[]
int sockHandle; // socket that testProcThread passes to accept()
int port; // client port, taken from the accepted peer address
char hostIp[32]; // client IP string, filled from inet_ntoa() after accept()
bool isFirstRecog; // set to true when the session thread starts
rec_t *rec_array; // NOTE(review): presumably the recognition results of this session - not used in the visible code
queue<rec_t* > mRecArray; // NOTE(review): presumably queued results; ownership of the pointers is not visible here
int res_num; // copied into the res_num field of the result header
char *gmr_buf; // NOTE(review): presumably a grammar buffer - confirm
int gmr_len; // NOTE(review): presumably the length of gmr_buf - confirm
}asr_param_t;
// Pairs a grammar name with a numeric id.
// NOTE(review): not used in the visible code; gramName is sized like a path
// (MAX_PATH_SIZE), so it may hold a file path - confirm.
typedef struct _asr_grammar_t
{
char gramName[MAX_PATH_SIZE]; // grammar name
int gramId; // grammar id
}asr_grammar_t;
/*
 * Status of the data for a result. Zero is the idle state; every other
 * state is negative. DATA_TOO_SHORT and DATA_TOO_LONG carry the same numeric
 * values as DECODER_TOOSHORT and DECODER_TOOLONG.
 */
enum ResultStatus {
    DATA_IS_IDLE   =  0,
    DATA_REG_ABORT = -1,
    DATA_TOO_SHORT = -2,
    DATA_TOO_LONG  = -3,
    DATA_IS_EMPTY  = -4
};
// Helper prototypes; the definitions are not part of this header, so the
// notes below are assumptions to be confirmed against the implementations.

// NOTE(review): presumably appends one recognition log entry (log id, path,
// wav file name, result text); meaning of the return value not visible here.
int WriteRecLog(u_int logID, char * path, char * wavName, char * result);
// NOTE(review): presumably closes the log opened by WriteRecLog.
int CloseRecLog();
// NOTE(review): presumably extracts the last part of the IP string `in` into
// `out`; the required size of `out` is not visible here.
int GetIpTail(char * in, char * out);
// NOTE(review): presumably validates `len` bytes of data in `buf`.
bool CheckDCdata(char * buf, int len);
// NOTE(review): presumably writes the local time as text into `locTime`; the
// required buffer size is not visible here.
void GetLocalTime(char *locTime);
#endif // _SERVER_H
static void *testProcThread(void *parameter)
{
int sessId=((asr_param_t *)parameter)->sessId;
int sockHandle = tParam[sessId].sockHandle;
struct sockaddr_in addrClient;
int len = sizeof(sockaddr);
((asr_param_t *)parameter)->isFirstRecog = true;
printf( "[SessionId:%d] enter the decoder thread.\n", sessId);
do
{
pthread_mutex_lock(&mutexaccept);
LOG_NOTICE( "[SessionId:%d] before accept: socketCli[sessId]:%d,socksvr:%d\n", sessId, socketCli[sessId], sockHandle);
socketCli[sessId] = accept(sockHandle,(sockaddr *) &addrClient, (socklen_t *)&len);
LOG_NOTICE( "[SessionId:%d] after accept: socketCli[sessId]:%d is connected,socksvr is:%d.[client] sessId:%d, hostIp:%s, port:%d\n",
sessId,socketCli[sessId], sockHandle, sessId,tParam[sessId].hostIp,tParam[sessId].port);
tParam[sessId].port = addrClient.sin_port;
strcpy(tParam[sessId].hostIp,inet_ntoa(addrClient.sin_addr));
if(hostIpTail[0] == '\0' || 5 <= strlen(hostIpTail))
{
/*if(GetIpTail(hostIpTail, tParam[handleId[sessId]].hostIp)<0)
{
strcpy(hostIpTail, "0");
}*/
}
if(errno == EINTR)
{
LOG_WARN("socket accept EINTR\n");
pthread_mutex_unlock(&mutexaccept);
continue;
}
if(socketCli[sessId] == -1)
{
LOG_WARN("socket accept fail\n");
pthread_mutex_unlock(&mutexaccept);
continue;
}
pthread_mutex_unlock(&mutexaccept);
asr_head_t inviteHead;
int readSize =0;
int timeout =40;
char name[16] = "";
int opt_BufferSize = 2048;
int readLen = 0;
int recBytes = 0;
char *recBuf = new char[opt_BufferSize];
while(1)
{
readSize = 0;
memset(&inviteHead, 0, sizeof(asr_head_t));
timeout = 3000000;
readSize = MyReadO(socketCli[sessId], &inviteHead, sizeof(asr_head_t), &timeout, name);
LOG_INFO( "[SessionId:%d] receive head package,%d bytes, cmd = %d,idx = %d,res_num:%d,body_len:%d\n",
sessId, readSize, inviteHead.cmd, inviteHead.idx, inviteHead.res_num, inviteHead.body_len);
if(inviteHead.body_len >= 2048 && inviteHead.cmd != 5)
{
LOG_ERROR( "[SessionId:%d] data is too long. Buffer size:%d, inviteHead.body_len:%d.\n", sessId, opt_BufferSize, inviteHead.body_len);
inviteHead.body_len = opt_BufferSize - 1;
}
if(readSize == sizeof(asr_head_t))
{
//start cmd
if(inviteHead.cmd == 1)
{
LOG_ERROR("[SessionId:%d] cmd =1 new session start.\n", sessId);
if(inviteHead.body_len > 0 && inviteHead.body_len < opt_BufferSize){
//the loop of file name receiving
memset(recBuf, 0, opt_BufferSize);
readLen = MyReadO(socketCli[sessId], recBuf, inviteHead.body_len, &timeout, name);
if(readLen != inviteHead.body_len)
{
LOG_ERROR("cmd=1[Id:%d] receive data error,readLen:%d,bodyLen:%d.", sessId , readLen, inviteHead.body_len);
}
else
{
recBuf[readLen+1] = '\0';
LOG_INFO( "[Id:%d] receive the first data,readLen:%d.", sessId, readLen);
if( readLen == inviteHead.body_len)
{
//strcpy(audiokeeperName, recBuf);//the loop of file name receiving
}
readLen = 0;
}
}
int sendSize = 0;
memset(&inviteHead, 0, sizeof(asr_head_t));
inviteHead.err_no = 9;
inviteHead.pid = sessId + 1;//sessid starts from 0, pid from 1;
sendSize = MyWriteO(socketCli[sessId], &inviteHead, sizeof(asr_head_t), &timeout, name);
if(sendSize != sizeof(asr_head_t)){
LOG_ERROR("cmd=1[SessionId:%d] sid=%s send ACK failed.", sessId,inviteHead.sid);
}
//isStart = true;
//sendLen = decodeLen = readLen = dataSize = 0;
//isCheckHeader = 0;
}
//stop cmd
else if(inviteHead.cmd == 2)
{
LOG_ERROR("[SessionId:%d]sid=%s stop command,type:%d.\n", sessId,inviteHead.sid, inviteHead.cmd);
break;
}
//stream packet cmd
else if(inviteHead.cmd == 4)
{
LOG_ERROR("[SessionId:%d] stream packet command,type:%d.\n", sessId,inviteHead.cmd);
//stream packet process
if(inviteHead.cmd == 4)
{
//loop of packet receiving
char sid[1024];//接收auk的sid
while(1)
{
memcpy(sid, inviteHead.sid, SID_LENGTH);
memset(recBuf, 0, opt_BufferSize);
readLen = MyReadO(socketCli[sessId], recBuf, inviteHead.body_len, &timeout, name);
if(readLen != inviteHead.body_len)
{
LOG_ERROR("cmd=4[SessionId:%d] not get the valid data,readLen:%d,bodyLen:%d,sid=%s\n",
sessId,readLen,inviteHead.body_len,sid);
}
recBytes += readLen;
LOG_INFO( "[SessionId:%d] receive the data[%s],readLen=%d,total=%d,sid=%s\n",sessId,recBuf, readLen, recBytes,sid);
//LOG_INFO( "[SessionId:%d] receive the data,=%s\n\n",sessId,recBuf);
break;//finish the loop of stream packet receiving
}
}
}
//finish cmd
else if(inviteHead.cmd == 3)
{
LOG_ERROR("[SessionId:%d] finish command 3,type:%d.\n\n", sessId, inviteHead.cmd);
//break;//完成一路会话,逻辑上由客户端来断开连接
}
//err control
else
{
LOG_ERROR("[SessionId:%d] unknown command,type:%d.\n", sessId, inviteHead.cmd);
break;
}
}
else if(readSize != sizeof(asr_head_t) && (inviteHead.cmd == 1 ||inviteHead.cmd == 4|| inviteHead.cmd == 3))//读数据超时
{
LOG_ERROR("[SessionId:%d] sid=%d,not get correct head,readSize:%d.\n",sessId, inviteHead.sid, readSize);
break;
}
else
{
LOG_ERROR("[SessionId:%d] sid=%s,unknown error,readSize:%d.\n", sessId,inviteHead.sid, readSize);
//isAbort = true;
break;
}
if( inviteHead.cmd == 3 )
{
asr_head_t inviteHead;
memset(&inviteHead, 0, sizeof(asr_head_t));
inviteHead.cmd = 0;//标识返回结果包数据
inviteHead.res_num = tParam[sessId].res_num;
strcpy(inviteHead.provider, "TESR");//Audiokeeper will detect this provider for xml directed return.
char *sendResultBuff = new char[opt_BufferSize];
strcpy( sendResultBuff,
"<\?xml version=\"1.0\" encoding=\"gb2312\"\?>"
"<results><errors>"
"<code>0</code><desc>Success Reco</desc>"
"</errors>");
char pcmBuf[1024] = "";
sprintf(pcmBuf,
"<result><no>%d</no>"
"<name>%s</name>"
"<confidence>%s</confidence></result></results>",
1, "车洋",
"testMain");
AddStr(sendResultBuff, pcmBuf, opt_BufferSize);
inviteHead.body_len = strlen(sendResultBuff);
//memcpy(inviteHead.sid, sid, SID_LENGTH);
int sendSize = 0;
sendSize = MyWriteO(socketCli[sessId], &inviteHead, sizeof(asr_head_t), &timeout, name);
if(sendSize == sizeof(asr_head_t))
{
//LOG_NOTICE("cmd=3 result==%s\n",sendResultBuff);
//timeout = opt_NetTimeout;
sendSize = MyWriteO(socketCli[sessId], sendResultBuff, inviteHead.body_len, &timeout, name);
if(sendSize != inviteHead.body_len)
{
LOG_WARN("[SessionId:%d] send result body error,sendSize:%d,inviteHead.body_len:%d\n",
sessId, sendSize, inviteHead.body_len);
}
}
else{
LOG_WARN("cmd=3 send result head fail");
}
}
}
MyClose(socketCli[sessId]);
socketCli[sessId] = -1;
LOG_INFO( "[SessionId:%d] Socket connection closed.\n\n", sessId);
}while(1);
printf( "[SessionId:%d] exit the decoder thread.\n", sessId);
}
程序创建线程后,以上面的静态函数 testProcThread 作为每个线程的主逻辑;多个线程共享主线程创建的监听 socket fd,并通过互斥锁(mutexaccept)竞争调用 accept 的机会,抢到锁的线程负责接受并处理一个客户端连接。
yangzai77 发布了9 篇原创文章 · 获赞 4 · 访问量 1万+ 私信 关注