静态函数调用线程并发

添加信号异常处理

void   handle_pipe(int sig)   
{   
    printf( "[%s:%d] enter into handle_pipe.", __FILE__, __LINE__);

定义常用变量

#ifndef _SERVER_H
#define _SERVER_H

/* Upper bound for a result string, in bytes (users are not visible in this file). */
#define GRS_RESULT_STR_MAX_LEN    (4096)

/*
 * Command codes carried in asr_head_t::cmd.
 * testProcThread treats 1 as "start session", 2 as "stop", 3 as "finish"
 * (triggers the result reply) and 4 as "stream data packet".
 */
#define TD_CMD_START 1
#define TD_CMD_STOP 2
#define TD_CMD_FINISH 3
#define TD_CMD_DATA 4

/*
 * Decoder status codes. Negative values are parenthesized so the macros
 * stay a single operand wherever they are expanded.
 */
#define DECODER_SUCC 0
#define DECODER_FAIL (-1)
#define DECODER_TOOSHORT (-2)
#define DECODER_TOOLONG (-3)
#define DECODER_LOWSCORE (-4)


/* Size of asr_head_t::provider. */
#define MAX_PROVIDER_LEN 16
/*
 * Size of asr_head_t::sid (presumably 32 characters plus a terminating NUL).
 * The parentheses are required: the previous unparenthesized `32+1` made
 * expressions such as `SID_LENGTH * 2` evaluate to 34 instead of 66.
 */
#define SID_LENGTH (32 + 1)
 
#include <queue> 
using namespace std;

/*
 * Packet header exchanged with the decoder.
 * testProcThread reads and writes this struct as raw bytes through
 * MyReadO/MyWriteO, so field order and field sizes must not be changed
 * without updating the peer as well.
 */
typedef struct _asr_head_t
{
    union {
        short       cmd;                        // command code sent to the decoder
        short       err_no;                     // processing status returned by the decoder
    };
    unsigned int    idx;                        // sequence number of the transmitted audio packet
    unsigned short  pid;                        // product id
    unsigned short  time_out;                   // decode timeout in ms (original note: maximum 50 seconds)
    unsigned short  version;                    // version number
    unsigned int    time_stamp;                 // timestamp
    unsigned int    log_id;                     // usually generated by the web server
    char            provider[MAX_PROVIDER_LEN]; // name of the sending module
    int             res_num;                    // number of recognition results
    unsigned int    body_len;                   // length in bytes of the data buffer that follows the header
    char            sid[SID_LENGTH];            // session id string; NOTE(review): not guaranteed NUL-terminated when received from the network
} asr_head_t;


// Buffer-size constants. MAX_LARGE_RST sizes every text field of rec_t and
// MAX_PATH_SIZE sizes asr_grammar_t::gramName; MAX_RES_LEN is not used in the visible code.
#define MAX_RES_LEN             256
#define MAX_LARGE_RST            2048            // values tried earlier: 256, 4096, 2048; original note: "lenovo needs 256"
#define MAX_PATH_SIZE    1024 // previously 512
                                                
typedef struct _rec_t
{                                                                                
    char                res[MAX_LARGE_RST];       // 识别的文字结果
    char                 phone[MAX_LARGE_RST];     // phoneme string
    char                 segTime[MAX_LARGE_RST];   // segTime string
    char                 confident[MAX_LARGE_RST]; // 置信度
    int                 reserved1;                // 保留,作为扩展用
    int                 reserved2;                // 保留,作为扩展用
    float               data_time_stamp;          //
    float               end_time_stamp;           //

    _rec_t() : data_time_stamp(0), end_time_stamp(0)
    {
        memset(res, 0x00, MAX_LARGE_RST);
        memset(phone, 0x00, MAX_LARGE_RST);
        memset(segTime, 0x00, MAX_LARGE_RST);
        memset(confident, 0x00, MAX_LARGE_RST);
    }
}rec_t;

// Response sent from the decoder to the ts: in its header, err_no carries the
// recognition status, which is one of two cases: recognition succeeded or failed.
typedef struct _asr_resp_t
{
    asr_head_t      header;                      // request header
    rec_t           *rec_array;     // array of recognition results
 }asr_resp_t;


// Per-session parameters handed to the worker thread (testProcThread receives
// a pointer to one of these as its thread argument).
typedef struct _asr_param_t
{
    int            sessId;        // session index; used to index tParam[] and socketCli[]
    int            sockHandle;    // listening socket passed to accept()
    int            port;          // peer port, filled in after accept()
    char           hostIp[32];    // peer IP as dotted-quad text, filled in after accept()
    bool           isFirstRecog;  // set to true when the worker thread starts
    rec_t          *rec_array;    // NOTE(review): ownership/length not visible in this file
    queue<rec_t* > mRecArray;     // NOTE(review): producer/consumer not visible in this file
    int            res_num;       // copied into the result header's res_num on the finish command
    char           *gmr_buf;      // presumably a grammar buffer — TODO confirm
    int            gmr_len;       // presumably the length of gmr_buf — TODO confirm
}asr_param_t;

// Pairs a grammar name with an integer id.
typedef struct _asr_grammar_t
{
    char gramName[MAX_PATH_SIZE]; // sized like a path buffer; presumably a file path — TODO confirm
    int  gramId;
}asr_grammar_t;

// Result status codes. NOTE(review): the per-value notes below are read off the
// enumerator names only; no user of this enum is visible in this file.
// -2 and -3 match the values of DECODER_TOOSHORT / DECODER_TOOLONG.
enum ResultStatus{
    DATA_IS_EMPTY = -4,   // presumably: no data received
    DATA_TOO_LONG = -3,   // presumably: data exceeds the accepted length
    DATA_TOO_SHORT = -2,  // presumably: data below the accepted length
    DATA_REG_ABORT = -1,  // presumably: recognition aborted
    DATA_IS_IDLE = 0
};

// Helper declarations; the definitions are not part of this file, so the
// notes below are assumptions drawn from the names — confirm against the
// implementations.
// NOTE(review): u_int is a BSD-style typedef normally provided by <sys/types.h>,
// which this header does not include itself.
int WriteRecLog(u_int logID, char * path, char * wavName, char * result); // presumably appends one recognition record to a log
int CloseRecLog();                                                       // presumably closes the log opened by WriteRecLog
int GetIpTail(char * in, char * out);                                    // presumably extracts the last part of an IP string; argument order unclear
bool CheckDCdata(char * buf, int len);                                   // presumably validates a data buffer of len bytes
void GetLocalTime(char *locTime);                                        // presumably writes the local time as text; required buffer size unknown

#endif // _SERVER_H

static void *testProcThread(void *parameter)
{
	int sessId=((asr_param_t *)parameter)->sessId;
	
	int sockHandle = tParam[sessId].sockHandle;
    struct sockaddr_in addrClient;
    int len = sizeof(sockaddr);
    ((asr_param_t *)parameter)->isFirstRecog = true;
	printf( "[SessionId:%d] enter the decoder thread.\n", sessId);
	do
    {
		pthread_mutex_lock(&mutexaccept);
        LOG_NOTICE( "[SessionId:%d] before accept: socketCli[sessId]:%d,socksvr:%d\n", sessId, socketCli[sessId], sockHandle);
        socketCli[sessId] = accept(sockHandle,(sockaddr *) &addrClient, (socklen_t *)&len);
        LOG_NOTICE( "[SessionId:%d] after accept: socketCli[sessId]:%d is connected,socksvr is:%d.[client] sessId:%d, hostIp:%s, port:%d\n",
            sessId,socketCli[sessId], sockHandle, sessId,tParam[sessId].hostIp,tParam[sessId].port);
        tParam[sessId].port = addrClient.sin_port;
        strcpy(tParam[sessId].hostIp,inet_ntoa(addrClient.sin_addr));
        if(hostIpTail[0] == '\0' || 5 <= strlen(hostIpTail))
        {
            /*if(GetIpTail(hostIpTail, tParam[handleId[sessId]].hostIp)<0)
            {
                strcpy(hostIpTail, "0");
            }*/
        }

        if(errno == EINTR)
        {
			LOG_WARN("socket accept EINTR\n");
            pthread_mutex_unlock(&mutexaccept);
            continue;
        }
        if(socketCli[sessId] == -1)
        {
            LOG_WARN("socket accept fail\n");
            pthread_mutex_unlock(&mutexaccept);
            continue;
        }
        pthread_mutex_unlock(&mutexaccept);
		
		asr_head_t inviteHead;
		int readSize =0;
		int timeout =40;
		char name[16] = "";
		int opt_BufferSize = 2048;
	
        int readLen = 0;
		int recBytes = 0;
		char *recBuf = new char[opt_BufferSize];
		
		while(1)
		{
			readSize = 0;
            memset(&inviteHead, 0, sizeof(asr_head_t));
            timeout = 3000000;
			readSize = MyReadO(socketCli[sessId], &inviteHead, sizeof(asr_head_t), &timeout, name);
            LOG_INFO( "[SessionId:%d] receive head package,%d bytes, cmd = %d,idx = %d,res_num:%d,body_len:%d\n",
                        sessId, readSize, inviteHead.cmd, inviteHead.idx, inviteHead.res_num, inviteHead.body_len);
			 if(inviteHead.body_len >= 2048 && inviteHead.cmd != 5)
            {
                LOG_ERROR( "[SessionId:%d]  data is too long. Buffer size:%d, inviteHead.body_len:%d.\n", sessId, opt_BufferSize, inviteHead.body_len);
                inviteHead.body_len = opt_BufferSize - 1;
            }
			if(readSize == sizeof(asr_head_t))
            {
				//start cmd
                if(inviteHead.cmd == 1)
                {
					LOG_ERROR("[SessionId:%d] cmd =1 new session start.\n", sessId);
					if(inviteHead.body_len > 0 && inviteHead.body_len < opt_BufferSize){
						//the loop of file name receiving
                        memset(recBuf, 0, opt_BufferSize);
                        readLen = MyReadO(socketCli[sessId], recBuf, inviteHead.body_len, &timeout, name);
                        if(readLen != inviteHead.body_len)
                        {
                            LOG_ERROR("cmd=1[Id:%d] receive data error,readLen:%d,bodyLen:%d.", sessId , readLen, inviteHead.body_len);
                        }
                        else
						{
							recBuf[readLen+1] = '\0';
                            LOG_INFO( "[Id:%d] receive the first data,readLen:%d.", sessId, readLen);
							if( readLen == inviteHead.body_len)
                            {
                                //strcpy(audiokeeperName, recBuf);//the loop of file name receiving
                            }
                            readLen = 0;
						}
						
					}
					
					int sendSize = 0;
                    memset(&inviteHead, 0, sizeof(asr_head_t));
                    inviteHead.err_no = 9;
                    inviteHead.pid = sessId + 1;//sessid starts from 0, pid from 1;
                    sendSize = MyWriteO(socketCli[sessId], &inviteHead, sizeof(asr_head_t), &timeout, name);

                    if(sendSize != sizeof(asr_head_t)){
                        LOG_ERROR("cmd=1[SessionId:%d] sid=%s send ACK failed.", sessId,inviteHead.sid);
                    }

                    //isStart = true;
                    //sendLen = decodeLen = readLen = dataSize = 0;
                    //isCheckHeader = 0;
				}
				//stop cmd
                else if(inviteHead.cmd == 2)
                {
					LOG_ERROR("[SessionId:%d]sid=%s stop command,type:%d.\n", sessId,inviteHead.sid, inviteHead.cmd);
					break;
				}
				//stream packet cmd
                else if(inviteHead.cmd == 4)
                {
					LOG_ERROR("[SessionId:%d] stream packet command,type:%d.\n", sessId,inviteHead.cmd);
					//stream packet process
					if(inviteHead.cmd == 4)
					{
						//loop of packet receiving
						char sid[1024];//接收auk的sid
						while(1)
						{
							 memcpy(sid, inviteHead.sid, SID_LENGTH);
							 memset(recBuf, 0, opt_BufferSize);
							 readLen = MyReadO(socketCli[sessId], recBuf, inviteHead.body_len, &timeout, name);
							if(readLen != inviteHead.body_len)
							{
								LOG_ERROR("cmd=4[SessionId:%d] not get the valid data,readLen:%d,bodyLen:%d,sid=%s\n",
										sessId,readLen,inviteHead.body_len,sid);
							}
							recBytes += readLen;
							LOG_INFO( "[SessionId:%d] receive the data[%s],readLen=%d,total=%d,sid=%s\n",sessId,recBuf, readLen, recBytes,sid);
							//LOG_INFO( "[SessionId:%d] receive the data,=%s\n\n",sessId,recBuf);
					
					
							 break;//finish the loop of stream packet receiving
						}
					}
					
					
				}
				 //finish cmd
                else if(inviteHead.cmd == 3)
                {
					LOG_ERROR("[SessionId:%d] finish command 3,type:%d.\n\n", sessId, inviteHead.cmd);
					//break;//完成一路会话,逻辑上由客户端来断开连接
				}
				//err control
                else
                {
					LOG_ERROR("[SessionId:%d] unknown command,type:%d.\n", sessId, inviteHead.cmd);
					break;
				}
				
			}
			else if(readSize != sizeof(asr_head_t) && (inviteHead.cmd == 1 ||inviteHead.cmd == 4|| inviteHead.cmd == 3))//读数据超时
            {
                LOG_ERROR("[SessionId:%d] sid=%d,not get correct head,readSize:%d.\n",sessId, inviteHead.sid, readSize);
 
                break;
            }
            else
            {
				LOG_ERROR("[SessionId:%d] sid=%s,unknown error,readSize:%d.\n", sessId,inviteHead.sid, readSize);
                //isAbort = true;
                break;
            }
			
			if( inviteHead.cmd == 3 )
            {
				asr_head_t inviteHead;
				memset(&inviteHead, 0, sizeof(asr_head_t));
				inviteHead.cmd = 0;//标识返回结果包数据
				inviteHead.res_num = tParam[sessId].res_num;
				strcpy(inviteHead.provider, "TESR");//Audiokeeper will detect this provider for xml directed return.
				
				char *sendResultBuff = new char[opt_BufferSize];

				strcpy( sendResultBuff,
						"<\?xml version=\"1.0\" encoding=\"gb2312\"\?>"
						"<results><errors>"
						"<code>0</code><desc>Success Reco</desc>"
						"</errors>");
				char pcmBuf[1024] = "";
				sprintf(pcmBuf,
						"<result><no>%d</no>"
						"<name>%s</name>"
						"<confidence>%s</confidence></result></results>",
						1, "车洋",
						"testMain");
				AddStr(sendResultBuff, pcmBuf, opt_BufferSize);
				inviteHead.body_len = strlen(sendResultBuff);
                //memcpy(inviteHead.sid, sid, SID_LENGTH);
				int sendSize = 0;
                sendSize = MyWriteO(socketCli[sessId], &inviteHead, sizeof(asr_head_t), &timeout, name);
				if(sendSize == sizeof(asr_head_t))
				{
					//LOG_NOTICE("cmd=3 result==%s\n",sendResultBuff);
					//timeout = opt_NetTimeout;
					sendSize = MyWriteO(socketCli[sessId], sendResultBuff, inviteHead.body_len, &timeout, name);
					if(sendSize != inviteHead.body_len)
					{
						LOG_WARN("[SessionId:%d] send result body error,sendSize:%d,inviteHead.body_len:%d\n",
									 sessId, sendSize, inviteHead.body_len);
					}
				}
				else{
					LOG_WARN("cmd=3 send result head fail");
				}
							
			}
		}
		
		MyClose(socketCli[sessId]);
        socketCli[sessId] = -1;
		LOG_INFO( "[SessionId:%d] Socket connection closed.\n\n", sessId);
	}while(1);

	 printf( "[SessionId:%d] exit the decoder thread.\n", sessId);
}

程序创建多个线程后,每个线程都以上面的静态函数作为入口(主逻辑)。所有线程共享主线程创建的监听 socket fd,并通过互斥锁竞争 accept 的执行权:抢到锁的线程调用 accept 获取一个客户端连接,然后释放锁并独立处理该连接。

 

 

静态函数调用线程并发静态函数调用线程并发 yangzai77 发布了9 篇原创文章 · 获赞 4 · 访问量 1万+ 私信 关注
上一篇:设计方案--如何设计移动端高清方案


下一篇:C# winform写入和读取TXT文件