异步请求池
同步异步的区别?
概念:是两者之间的关系。例如进程和进程,或者客户端和服务器。调用或发送请求后是否会挂起等待返回。
同步:进程调用接口后会挂起等待返回,此时两个线程是同时进行这个过程的,称为同步
异步:进程调用后不会挂起等待,转而去执行其他的任务,等到有结果返回时,会通知此进程再接收处理。此时两个进程不是同时进行这个过程的,称为异步
应用场景
当业务器访问需要等待的业务时,业务访问线程需要等待挂起直到服务器给出反馈,例如对mysql,redis,http,dns等服务的请求,这些请求的返回需要等待一段时间,大大加深了业务服务器的承载压力,也会影响其总体性能,异步请求池就是为解决这个问题应运而生的。
如何实现一个异步请求池?
异步请求池实现关注的问题?
- 异步请求是一个连接还是多个连接
- 连接的网络I/O管理问题
- 是多个连接的话,发送完请求后还未接收到响应的时候,fd的管理问题
- 请求和响应是否放在一个线程里面
实现思路:
init
- 职责是创建线程,管理fd,接收响应。接收响应是通过回调函数实现
commint
- 建立好网络连接
- 组织好对应的协议
- 发送到对应的服务器
- fd是否可读,fd加入到epoll
callback
- epoll_wait()检测那些fd可读recv(fd)
- 读出所有数据。就是接收响应
destory
- close
- pthread_cancel
代码实现
#include<stdlib.h>
#include <unistd.h>
#include<stdio.h>
#include<sys/epoll.h>
#include<pthread.h>
#include<sys/socket.h>
#include<sys/un.h>
#include <sys/types.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
struct ep_arg{
int sockfd;
void(*callback)(void*,int);
};
struct async_context{
int epfd;
pthread_t thread_id;
};
typedef void(*async_result_cb)(void*,int);
int dns_client_commit(struct async_context* ctx,async_result_cb cb){//功能是发生请求出去
int sockfd = socket(AF_INET,SOCK_DGRAM,0);//UDP
if(sockfd < 0){
perror("create sockfd failed!\n");
exit(-1);
}
struct sockaddr_in dest;
bzero(&dest, sizeof(dest));
dest.sin_family = AF_INET;
dest.sin_port = htons(5555);
dest.sin_addr.s_addr = inet_addr("127.0.0.1");
int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));//连接
printf("connect :%d\n", ret);
//自定义协议类型
#if 0
struct dns_header header = {0};
dns_create_header(&header);
struct dns_requestion question = {0};
dns_create_quetion(&question);
char request[1024] = {0};
int req_len = dns_build_request(&header,&question,request);
int slen = sendto(sockfd,request,req_len,0,(struct sockaddr*)&dest,sizeof(struct sockaddr));
#endif
char buf[10] = "hello";
ret = sendto(sockfd,buf,10,0,(struct sockaddr*)&dest,sizeof(struct sockaddr));
//请求发送完,管理fd,并且注册读事件处理器
struct ep_arg* ptr = (struct ep_arg*)calloc(1,sizeof(struct ep_arg));
ptr->sockfd = sockfd;
ptr->callback = cb;
struct epoll_event evs;
evs.data.fd = sockfd;
evs.events = EPOLLIN;
epoll_ctl(ctx->epfd,EPOLL_CTL_ADD,sockfd,&evs);
}
void* dns_async_callback(void* arg){
struct async_context* ctx = (struct async_context*)arg;
while(1){
struct epoll_event events[128] = {0};
int nready = epoll_wait(ctx->epfd,events,128,-1);
if(nready < 0) continue;
int i = 0;
for(i = 0;i < nready;++i){
struct ep_arg* ptr = (struct ep_arg*)events[i].data.ptr;
int sockfd = ptr->sockfd;
char buffer[1024] = {0};
struct sockaddr_in addr;
size_t addr_len = sizeof(struct sockaddr_in);
int n = recvfrom(sockfd,buffer,1024,0,(struct sockaddr*)&addr,(socklen_t*)&addr_len);
printf("recvfrom n : %d\n",n);
/* struct dns_item* domains = NULL; */
/* int count = dns_parse_response(buffer,&domains); */
/* ptr->callback(domains,count); */
close(sockfd);
free(ptr);
}
}
}
void* dns_client_init(const char* ){//创建接收响应的线程
int epfd = epoll_create(1);
if(epfd < 0) return NULL;
struct async_context* ctx = (struct async_context*)calloc(1,sizeof(struct async_context));
ctx->epfd = epfd;
int ret = pthread_create(&ctx->thread_id,NULL,dns_async_callback,ctx);
if(ret < 0){
close(epfd);
free(ctx);
return NULL;
}
return ctx;
}
------以上整理自观看零声学院课程