异步请求池

异步请求池

同步异步的区别?

异步请求池

概念:是两者之间的关系。例如进程和进程,或者客户端和服务器。调用或发送请求后是否会挂起等待返回。

同步:进程调用接口后会挂起等待返回,此时两个线程是同时进行这个过程的,称为同步

异步:进程调用后不会挂起等待,转而去执行其他的任务,等到有结果返回时,会通知此进程再接收处理。此时两个进程不是同时进行这个过程的,称为异步

应用场景

当业务器访问需要等待的业务时,业务访问线程需要等待挂起直到服务器给出反馈,例如对mysql,redis,http,dns等服务的请求,这些请求的返回需要等待一段时间,大大加深了业务服务器的承载压力,也会影响其总体性能,异步请求池就是为解决这个问题应运而生的。

如何实现一个异步请求池?

异步请求池实现关注的问题?

  1. 异步请求是一个连接还是多个连接
  2. 连接的网络I/O管理问题
  3. 是多个连接的话,发送完请求后还未接收到响应的时候,fd的管理问题
  4. 请求和响应是否放在一个线程里面

实现思路:

init

  • 职责是创建线程,管理fd,接收响应。接收响应是通过回调函数实现

commint

  • 建立好网络连接
  • 组织好对应的协议
  • 发送到对应的服务器
  • fd是否可读,fd加入到epoll

callback

  • epoll_wait()检测那些fd可读recv(fd)
  • 读出所有数据。就是接收响应

destory

  • close
  • pthread_cancel

代码实现

#include<stdlib.h>
#include <unistd.h>
#include<stdio.h>
#include<sys/epoll.h>
#include<pthread.h>
#include<sys/socket.h>
#include<sys/un.h>
#include <sys/types.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>

struct ep_arg{
    int sockfd;
    void(*callback)(void*,int);
};

struct async_context{
    int epfd;
    pthread_t thread_id;
};

typedef void(*async_result_cb)(void*,int);


int dns_client_commit(struct async_context* ctx,async_result_cb cb){//功能是发生请求出去
    int sockfd = socket(AF_INET,SOCK_DGRAM,0);//UDP
    if(sockfd < 0){
        perror("create sockfd failed!\n");
        exit(-1);
    }

    struct sockaddr_in dest;
    bzero(&dest, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_port = htons(5555);
    dest.sin_addr.s_addr = inet_addr("127.0.0.1");

    int ret = connect(sockfd, (struct sockaddr*)&dest, sizeof(dest));//连接
    printf("connect :%d\n", ret);  

    //自定义协议类型
#if 0
      struct dns_header header = {0};
      dns_create_header(&header);
     
      struct dns_requestion question = {0};
      dns_create_quetion(&question);

      char request[1024] = {0};
      int req_len = dns_build_request(&header,&question,request);
      
      int slen = sendto(sockfd,request,req_len,0,(struct sockaddr*)&dest,sizeof(struct sockaddr));
#endif
    char buf[10] = "hello";
    ret = sendto(sockfd,buf,10,0,(struct sockaddr*)&dest,sizeof(struct sockaddr));
    
    //请求发送完,管理fd,并且注册读事件处理器
    struct ep_arg* ptr = (struct ep_arg*)calloc(1,sizeof(struct ep_arg));
    ptr->sockfd = sockfd;
    ptr->callback = cb;

    struct epoll_event evs;
    evs.data.fd = sockfd;
    evs.events = EPOLLIN;
    epoll_ctl(ctx->epfd,EPOLL_CTL_ADD,sockfd,&evs);

}

void* dns_async_callback(void* arg){
    struct async_context* ctx = (struct async_context*)arg;
    while(1){
        struct epoll_event events[128] = {0};

        int nready = epoll_wait(ctx->epfd,events,128,-1);
        if(nready < 0) continue;

        int i = 0;
        for(i = 0;i < nready;++i){
            struct ep_arg* ptr = (struct ep_arg*)events[i].data.ptr;
            int sockfd = ptr->sockfd;

            char buffer[1024] = {0};
            struct sockaddr_in addr;
            size_t addr_len = sizeof(struct sockaddr_in);

            int n = recvfrom(sockfd,buffer,1024,0,(struct sockaddr*)&addr,(socklen_t*)&addr_len);
            printf("recvfrom n : %d\n",n);

            /* struct dns_item* domains = NULL; */
            /* int count = dns_parse_response(buffer,&domains); */

            /* ptr->callback(domains,count); */

            close(sockfd);

            free(ptr);
        }
    
    }
}

void* dns_client_init(const char* ){//创建接收响应的线程
    int epfd = epoll_create(1);
    if(epfd < 0)    return NULL;

    struct async_context* ctx = (struct async_context*)calloc(1,sizeof(struct async_context));
    ctx->epfd = epfd;

    int ret = pthread_create(&ctx->thread_id,NULL,dns_async_callback,ctx);
    if(ret < 0){
        close(epfd);
        free(ctx);
        return NULL;
    }

    return ctx;
}

------以上整理自观看零声学院课程

上一篇:nginx 负载均衡配置


下一篇:linux 安装及配置DNS服务器