DPDK系列之四十二DPDK应用网络编程UDP编程

一、UDP编程

UDP编程的应用和TCP编程的应用同样非常广泛。如果说真的只是想使用UDP编程,一般情况下还真的不至于运用DPDK这种重量级的框架。但一个框架的优秀与否,不仅仅在于自身的整体设计是否优秀,更重要的在于其对应用的支持是否完善。
正如DPDK对TCP的支持一样,其实对于DPDK这种更侧重于底层的应用来说,实现UDP和TCP没有本质的区别,只是套的一层解析的协议不同罢了。同样,UDP与TCP的不同及其协议的内容如有不明白可自行查看相关资料,此处不再赘述。

二、DPDK实现UDP源码分析

下面看一下例程(代码来源与TCP相同):
1、数据结构和协议

// A single entry of the ARP table: one IPv4 address -> MAC address mapping.
// Entries are chained into a doubly linked list through next/prev.
struct arp_entry
{
	uint32_t ip;                              // IPv4 address used as the lookup key
	unsigned char hwaddr[RTE_ETHER_ADDR_LEN]; // hardware (MAC) address mapped to ip

	unsigned char type;                       // entry type; ng_arp_entry_insert() stores 0 here

	struct arp_entry *next;                   // next entry in the table's list
	struct arp_entry *prev;                   // previous entry in the table's list
};

// The ARP table: head of the entry list plus bookkeeping.
struct arp_table
{
	struct arp_entry *entries;   // head of the linked list of entries
	int count;                   // number of entries; incremented on every insert

	pthread_spinlock_t spinlock; // held while an entry is linked into the list
};
// UDP control block: per-socket state created by nsocket() for SOCK_DGRAM
// sockets and linked into the global host list.
struct localhost
{
	int fd;                                     // descriptor handed back to the caller of nsocket()

	//unsigned int status; //
	uint32_t localip; // ip --> mac             (bound local IPv4 address, copied from sockaddr_in by nbind())
	unsigned char localmac[RTE_ETHER_ADDR_LEN]; // local MAC address, copied from g_stCpuMac by nbind()
	uint16_t localport;                         // bound local port, stored exactly as given in sin_port

	unsigned char protocol;                     // IPPROTO_UDP for blocks created by nsocket()

	struct rte_ring *sndbuf;                    // ring of struct offload* queued by nsendto()
	struct rte_ring *rcvbuf;                    // ring of struct offload* consumed by nrecvfrom()

	struct localhost *prev;                     // previous control block in the list
	struct localhost *next;                     // next control block in the list

	pthread_cond_t cond;                        // nrecvfrom() waits on this while rcvbuf is empty
	pthread_mutex_t mutex;                      // mutex paired with cond
};
// One UDP datagram travelling through a socket's send or receive ring.
// Addresses and ports are stored exactly as they appear in struct sockaddr_in
// (nsendto()/nrecvfrom() copy them without any byte-order conversion).
struct offload
{
	uint32_t sip;        // source IPv4 address
	uint32_t dip;        // destination IPv4 address

	uint16_t sport;      // source port
	uint16_t dport;      // destination port

	int protocol;        // protocol identifier; not written by nsendto() in this file

	unsigned char *data; // payload buffer allocated with rte_malloc(); released with rte_free()
	uint16_t length;     // number of payload bytes in data

};
//common.c
/*
 * Add an IP -> MAC mapping to the global ARP table unless one already exists.
 *
 * ip:  IPv4 address used as the lookup key.
 * mac: hardware address; RTE_ETHER_ADDR_LEN bytes are copied from it.
 *
 * Returns 1 when a new entry was linked into the table, 0 when an entry for
 * ip was already present, and -1 when the new entry could not be allocated.
 * (The allocation-failure case used to return 1 although nothing was added.)
 */
int ng_arp_entry_insert(uint32_t ip, unsigned char *mac)
{
    struct arp_table *pstTbl = arp_table_instance();
    struct arp_entry *pstEntry;

    /*
     * NOTE(review): the existence check runs before the spinlock is taken, so
     * two concurrent callers could both insert the same ip - confirm whether
     * callers are serialized.
     */
    if (ng_get_dst_macaddr(ip) != NULL)
        return 0;

    pstEntry = rte_malloc("arp_entry", sizeof(struct arp_entry), 0);
    if (pstEntry == NULL)
        return -1;

    memset(pstEntry, 0, sizeof(struct arp_entry));
    pstEntry->ip = ip;
    rte_memcpy(pstEntry->hwaddr, mac, RTE_ETHER_ADDR_LEN);
    pstEntry->type = 0;

    /* Only the list manipulation itself is protected by the table lock. */
    pthread_spin_lock(&pstTbl->spinlock);
    LL_ADD(pstEntry, pstTbl->entries);
    pstTbl->count++;
    pthread_spin_unlock(&pstTbl->spinlock);

    return 1;
}

/*
 * Return the process-wide ARP table, creating it on first use.
 *
 * The table is allocated with rte_malloc(), zero-filled, and its spinlock is
 * initialised. If the allocation fails the application is terminated through
 * rte_exit().
 */
static struct arp_table *arp_table_instance(void)
{
    /* Fast path: the table has already been created. */
    if (g_pstArpTbl != NULL)
        return g_pstArpTbl;

    g_pstArpTbl = rte_malloc("arp table", sizeof(struct arp_table), 0);
    if (g_pstArpTbl == NULL)
    {
        rte_exit(EXIT_FAILURE, "rte_malloc arp table failed\n");
    }

    memset(g_pstArpTbl, 0, sizeof(struct arp_table));
    pthread_spin_init(&g_pstArpTbl->spinlock, PTHREAD_PROCESS_SHARED);

    return g_pstArpTbl;
}

/*
 * Look up the hardware address stored for an IPv4 address.
 *
 * dip: IPv4 address to search for.
 *
 * Returns a pointer to the hwaddr array inside the matching table entry, or
 * NULL when no entry matches. The walk stops after `count` entries or at the
 * end of the list, whichever comes first.
 */
unsigned char* ng_get_dst_macaddr(uint32_t dip)
{
    struct arp_table *pstTable = arp_table_instance();
    int iRemaining = pstTable->count;
    struct arp_entry *pstNode = pstTable->entries;

    while (iRemaining != 0 && pstNode != NULL)
    {
        if (pstNode->ip == dip)
        {
            return pstNode->hwaddr;
        }

        pstNode = pstNode->next;
        iRemaining--;
    }

    return NULL;
}

UDP数据处理分为两类:一类是控制包,一类是负载包(也就是数据包)。arp表的作用类似于快递小哥的作用:查找IP与MAC的映射,并实现数据包的准确传输。它有两个出口:一个是数据发送时的出口,保存发送时的IP和MAC(没有则先广播);一个是接收时的出口,用来保存收到的数据包对应的IP和MAC。代码没有什么难度,大家对照着相关的协议实现就明白了。

int nsocket(__attribute__((unused)) int domain, int type, __attribute__((unused))  int protocol)
{
    int iFd;
    struct localhost *pstHost;
    pthread_cond_t pctCond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t pmtMutex = PTHREAD_MUTEX_INITIALIZER;

    iFd = get_fd_frombitmap();
    if(type == SOCK_DGRAM) // udp
    {
        pstHost = rte_malloc("localhost", sizeof(struct localhost), 0);
        if(pstHost == NULL)
        {
            printf("[%s][%d]: rte_malloc fail!\n", __FUNCTION__, __LINE__);
            return -1;
        }

        memset(pstHost, 0x00, sizeof(struct localhost));
        pstHost->fd = iFd;
        pstHost->protocol = IPPROTO_UDP;
        pstHost->rcvbuf = rte_ring_create("recv buffer", D_RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (pstHost->rcvbuf == NULL)
        {
            printf("[%s][%d]: rte_ring_create fail!\n", __FUNCTION__, __LINE__);
			rte_free(pstHost);
			return -1;
		}
        pstHost->sndbuf = rte_ring_create("send buffer", D_RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (pstHost->sndbuf == NULL)
        {
            printf("[%s][%d]: rte_ring_create fail!\n", __FUNCTION__, __LINE__);
            rte_ring_free(pstHost->rcvbuf);
			rte_free(pstHost);
			return -1;
		}

		rte_memcpy(&pstHost->cond, &pctCond, sizeof(pthread_cond_t));

		rte_memcpy(&pstHost->mutex, &pmtMutex, sizeof(pthread_mutex_t));

		LL_ADD(pstHost, g_pstHost);
    }
    else if(type == SOCK_STREAM) // tcp
    {
        struct tcp_stream *pstStream = rte_malloc("tcp_stream", sizeof(struct tcp_stream), 0);
		if (pstStream == NULL)
			return -1;

		memset(pstStream, 0, sizeof(struct tcp_stream));
        pstStream->fd = iFd;
        pstStream->protocol = IPPROTO_TCP;
		pstStream->next = pstStream->prev = NULL;

        pstStream->rcvbuf = rte_ring_create("tcp recv buffer", D_RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
		if (pstStream->rcvbuf == NULL)
        {
			rte_free(pstStream);
			return -1;
		}
		pstStream->sndbuf = rte_ring_create("tcp send buffer", D_RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
		if (pstStream->sndbuf == NULL)
        {
			rte_ring_free(pstStream->rcvbuf);
			rte_free(pstStream);
			return -1;
		}

        pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
		rte_memcpy(&pstStream->cond, &blank_cond, sizeof(pthread_cond_t));

		pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
		rte_memcpy(&pstStream->mutex, &blank_mutex, sizeof(pthread_mutex_t));

        g_pstTcpTbl = tcpInstance();
		LL_ADD(pstStream, g_pstTcpTbl->tcb_set);           // todo :hash
    }

    return iFd;
}
/*
 * Bind a socket created by nsocket() to the address in addr.
 *
 * sockfd:  descriptor returned by nsocket().
 * addr:    interpreted as a struct sockaddr_in; port and address are stored
 *          exactly as given (no byte-order conversion).
 * addrlen: ignored.
 *
 * For a UDP control block the local port, IP and MAC are recorded. For a TCP
 * stream the port and IP are stored in dport/dip, the local MAC is recorded
 * and the status becomes TCP_STATUS_CLOSED.
 *
 * Returns 0 on success (including an unrecognised protocol, which is left
 * untouched) and -1 when sockfd does not resolve to a socket object.
 */
int nbind(int sockfd, const struct sockaddr *addr, __attribute__((unused))  socklen_t addrlen)
{
    const struct sockaddr_in *pstIn = (const struct sockaddr_in *)addr;
    struct localhost *pstHost = (struct localhost *)get_hostinfo_fromfd(sockfd);

    if (pstHost == NULL)
    {
        return -1;
    }

    switch (pstHost->protocol)
    {
    case IPPROTO_UDP:
        pstHost->localport = pstIn->sin_port;
        pstHost->localip = pstIn->sin_addr.s_addr;
        rte_memcpy(pstHost->localmac, &g_stCpuMac, RTE_ETHER_ADDR_LEN);
        break;

    case IPPROTO_TCP:
    {
        /* The same object is reinterpreted as a TCP stream for this protocol. */
        struct tcp_stream *pstTcp = (struct tcp_stream *)pstHost;

        pstTcp->dport = pstIn->sin_port;
        rte_memcpy(&pstTcp->dip, &pstIn->sin_addr.s_addr, sizeof(uint32_t));
        rte_memcpy(pstTcp->localmac, &g_stCpuMac, RTE_ETHER_ADDR_LEN);

        pstTcp->status = TCP_STATUS_CLOSED;
        break;
    }

    default:
        break;
    }

    return 0;
}
/*
 * Receive one datagram from a UDP socket, blocking until one is available.
 *
 * sockfd:   descriptor returned by nsocket().
 * buf/len:  destination buffer and its capacity.
 * flags:    ignored.
 * src_addr: if non-NULL, filled as a struct sockaddr_in with the sender's
 *           port and address exactly as stored in the datagram record.
 * addrlen:  ignored.
 *
 * Returns the number of bytes copied into buf, or -1 when sockfd does not
 * resolve to a socket object. When buf is smaller than the datagram, the
 * first len bytes are delivered and the remainder is put back on the receive
 * ring for a later call.
 *
 * Changes compared with the previous version:
 *  - a NULL src_addr is accepted instead of being dereferenced;
 *  - the unread tail is shifted in place with memmove(), removing an
 *    rte_malloc() whose result was dereferenced unchecked;
 *  - when the remainder cannot be re-queued (ring full) the record is freed
 *    instead of being leaked.
 */
ssize_t nrecvfrom(int sockfd, void *buf, size_t len, __attribute__((unused))  int flags,
                        struct sockaddr *src_addr, __attribute__((unused))  socklen_t *addrlen)
{
    struct localhost *pstHostInfo = NULL;
    struct offload *pstOffLoad = NULL;
    ssize_t iLen = 0;

    pstHostInfo = (struct localhost *)get_hostinfo_fromfd(sockfd);
    if (pstHostInfo == NULL)
        return -1;

    /* Sleep on the socket's condition variable until the ring yields a record. */
    pthread_mutex_lock(&pstHostInfo->mutex);
    while (rte_ring_mc_dequeue(pstHostInfo->rcvbuf, (void **)&pstOffLoad) < 0)
    {
        pthread_cond_wait(&pstHostInfo->cond, &pstHostInfo->mutex);
    }
    pthread_mutex_unlock(&pstHostInfo->mutex);

    if (src_addr != NULL)
    {
        struct sockaddr_in *pstAddr = (struct sockaddr_in *)src_addr;

        pstAddr->sin_port = pstOffLoad->sport;
        rte_memcpy(&pstAddr->sin_addr.s_addr, &pstOffLoad->sip, sizeof(uint32_t));
    }

    if (len < pstOffLoad->length)
    {
        /* len < length <= UINT16_MAX, so the remainder fits in uint16_t. */
        uint16_t usRemain = (uint16_t)(pstOffLoad->length - len);

        rte_memcpy(buf, pstOffLoad->data, len);

        /* Keep the unread tail at the start of the existing buffer (regions overlap). */
        memmove(pstOffLoad->data, pstOffLoad->data + len, usRemain);
        pstOffLoad->length = usRemain;

        if (rte_ring_mp_enqueue(pstHostInfo->rcvbuf, pstOffLoad) != 0)
        {
            /* Ring is full: the remainder cannot be kept, release it. */
            rte_free(pstOffLoad->data);
            rte_free(pstOffLoad);
        }

        return (ssize_t)len;
    }

    iLen = pstOffLoad->length;
    rte_memcpy(buf, pstOffLoad->data, pstOffLoad->length);

    rte_free(pstOffLoad->data);
    rte_free(pstOffLoad);

    return iLen;
}

/*
 * Queue one datagram for transmission on a UDP socket.
 *
 * sockfd:    descriptor returned by nsocket().
 * buf/len:   payload to send; it is copied, so the caller keeps ownership.
 * flags:     ignored.
 * dest_addr: destination, interpreted as a struct sockaddr_in; port and
 *            address are stored exactly as given.
 * addrlen:   ignored.
 *
 * Returns len once the datagram record is on the socket's send ring, or -1
 * on failure (unknown descriptor, NULL dest_addr, len larger than the 16-bit
 * length field can hold, allocation failure, or a full send ring).
 *
 * Changes compared with the previous version:
 *  - the enqueue result is checked; a full ring used to leak the record and
 *    still report success;
 *  - len values above UINT16_MAX are rejected instead of being silently
 *    truncated into the uint16_t length field;
 *  - the record is zero-filled so the protocol field is not left
 *    uninitialised;
 *  - dest_addr is checked for NULL before it is dereferenced;
 *  - the log line printed the destination but labelled it "src".
 */
ssize_t nsendto(int sockfd, const void *buf, size_t len, __attribute__((unused))  int flags,
                      const struct sockaddr *dest_addr, __attribute__((unused))  socklen_t addrlen)
{
    struct localhost *pstHostInfo = NULL;
    struct offload *pstOffLoad = NULL;
    const struct sockaddr_in *pstAddr = (const struct sockaddr_in *)dest_addr;
    struct in_addr stDst;

    if (pstAddr == NULL || len > UINT16_MAX)
        return -1;

    pstHostInfo = (struct localhost *)get_hostinfo_fromfd(sockfd);
    if (pstHostInfo == NULL)
        return -1;

    pstOffLoad = rte_malloc("offload", sizeof(struct offload), 0);
    if (pstOffLoad == NULL)
        return -1;

    memset(pstOffLoad, 0, sizeof(struct offload));
    pstOffLoad->dip = pstAddr->sin_addr.s_addr;
    pstOffLoad->dport = pstAddr->sin_port;
    pstOffLoad->sip = pstHostInfo->localip;
    pstOffLoad->sport = pstHostInfo->localport;
    pstOffLoad->length = (uint16_t)len;

    stDst.s_addr = pstOffLoad->dip;
    printf("nsendto ---> dst: %s:%d \n", inet_ntoa(stDst), ntohs(pstOffLoad->dport));

    pstOffLoad->data = rte_malloc("unsigned char *", len, 0);
    if (pstOffLoad->data == NULL)
    {
        rte_free(pstOffLoad);
        return -1;
    }

    rte_memcpy(pstOffLoad->data, buf, len);

    puts("rte_ring_mp_enqueue before !");
    if (rte_ring_mp_enqueue(pstHostInfo->sndbuf, pstOffLoad) != 0)
    {
        /* Ring is full: nothing was queued, so release the record and report failure. */
        rte_free(pstOffLoad->data);
        rte_free(pstOffLoad);
        return -1;
    }
    puts("rte_ring_mp_enqueue after !");

    return (ssize_t)len;
}

int nclose(int fd)
{
    void *info = NULL;

    info = (struct localhost *)get_hostinfo_fromfd(fd);
    if(info == NULL)
        return -1;

    struct localhost *pstHostInfo = (struct localhost *)info;
    if(pstHostInfo->protocol == IPPROTO_UDP)
    {
        LL_REMOVE(pstHostInfo, g_pstHost);

        if (pstHostInfo->rcvbuf)
			rte_ring_free(pstHostInfo->rcvbuf);
		if (pstHostInfo->sndbuf)
			rte_ring_free(pstHostInfo->sndbuf);

		rte_free(pstHostInfo);

		set_fd_frombitmap(fd);
    }
    else if(pstHostInfo->protocol == IPPROTO_TCP)
    {
        struct tcp_stream *pstStream = (struct tcp_stream*)info;
        if (pstStream->status != TCP_STATUS_LISTEN)
        {
            struct tcp_fragment *pstFragment = rte_malloc("tcp_fragment", sizeof(struct tcp_fragment), 0);
			if (pstFragment == NULL)
                return -1;

            memset(pstFragment, 0x00, sizeof(struct tcp_fragment));
            pstFragment->data = NULL;
			pstFragment->length = 0;
			pstFragment->sport = pstStream->dport;
			pstF
上一篇:node.js 常用命令


下一篇:算法设计与分析实验报告python实现(排序算法、三壶谜题、交替放置的碟子、带锁的门)