Linux应用编程实现简单队列功能-改进

queue.h



#ifndef __QUEUE_H__
#define __QUEUE_H__

/*
 * -lpthread
 * 编译时需要线程库
 */
#include <pthread.h>


/*
 * Fixed-capacity queue of equally sized items, protected by a mutex and
 * two condition variables.
 *
 * References:
 * https://www.kernel.org/doc/man-pages/
 * https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
 */
struct queue_buf_t {
	/* Item storage; queue_buf_alloc() points it just past this header. */
	char *data_buf;
	/* Capacity of the queue, in items. */
	int item_num;
	/* Size of a single item, in bytes. */
	int item_size;
	/* Number of items currently stored. */
	int item_cnt;
	/* Index of the slot the next push writes to. */
	int wr;
	/* Index of the slot the next pop/get reads from. */
	int rd;
	/* Held by the queue functions while they read or modify the fields above. */
	pthread_mutex_t mtx;

	/*
	 * Signaled after an item has been pushed successfully, to wake a thread
	 * that is blocked popping from an empty queue.
	 */
	pthread_cond_t pop_cond;

	/*
	 * Signaled after an item has been popped successfully, to wake a thread
	 * that is blocked pushing to a full queue.
	 */
	pthread_cond_t push_cond;
};

/* Return the capacity of the queue (maximum number of items). */
int queue_buf_item_num(struct queue_buf_t *queue);
/* Return the number of items currently stored in the queue. */
int queue_buf_item_cnt(struct queue_buf_t *queue);
/*
 * Copy one item (item_size bytes) from `item` into the queue without waiting.
 * Returns 0 on success, -1 if `queue` is NULL or the queue is full.
 */
int queue_buf_push(struct queue_buf_t *queue, const void *item);
/*
 * Like queue_buf_push(), but waits up to `timeout_ms` milliseconds for free
 * space. Returns 0 on success, -1 if `queue` is NULL or the wait timed out.
 */
int queue_buf_push_wait(struct queue_buf_t *queue, const void *item, unsigned int timeout_ms);
/*
 * Copy the oldest item into `item` and remove it from the queue without waiting.
 * Returns 0 on success, -1 if `queue` is NULL or the queue is empty.
 */
int queue_buf_pop(struct queue_buf_t *queue, void *item);
/*
 * Like queue_buf_pop(), but waits up to `timeout_ms` milliseconds for an item.
 * Returns 0 on success, -1 if `queue` is NULL or the wait timed out.
 */
int queue_buf_pop_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms);
/*
 * Copy the oldest item into `item` but leave it in the queue (peek).
 * Returns 0 on success, -1 if `queue` is NULL or the queue is empty.
 */
int queue_buf_get(struct queue_buf_t *queue, void *item);
/*
 * Like queue_buf_get(), but waits up to `timeout_ms` milliseconds for an item.
 * Returns 0 on success, -1 if `queue` is NULL or the wait timed out.
 */
int queue_buf_get_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms);
/*
 * Allocate a queue holding up to `item_num` items of `item_size` bytes each.
 * Returns NULL if the memory allocation fails. Release with queue_buf_free().
 */
struct queue_buf_t *queue_buf_alloc(int item_num, int item_size);
/* Destroy the queue's mutex and condition variables and free it; NULL is ignored. */
void queue_buf_free(struct queue_buf_t *queue_buf);


#endif





queue.c

/*
 * Copyright (C) 2021, 2021  huohongpeng
 * Author: huohongpeng <1045338804@qq.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Change logs:
 * Date        Author       Notes
 * 2021-05-15  huohongpeng   首次添加
 * 2021-05-17 修改tm_to_ns()在32bit平台溢出问题
 * 2021-05-22 1.将队列元素从long类型修改为一个大小可调整的缓冲区.
 *            2.队列大小由新变量进行记录,不再使用rd和wr进行计算,避免浪费一个无用的空间
 */


#include "queue.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <errno.h>


/*
 * Convert a timespec to a single nanosecond count.
 *
 * The seconds are widened to long long before scaling so the product does
 * not overflow on platforms where time_t/long are 32 bits wide.
 */
static long long tm_to_ns(struct timespec tm)
{
	const long long ns_per_sec = 1000000000LL;
	long long seconds = tm.tv_sec;

	return seconds * ns_per_sec + tm.tv_nsec;
}

/*
 * Split a nanosecond count into a timespec: whole seconds go to tv_sec and
 * the remainder to tv_nsec.
 */
static struct timespec ns_to_tm(long long ns)
{
	const long long ns_per_sec = 1000000000LL;
	struct timespec result;

	result.tv_sec = ns / ns_per_sec;
	result.tv_nsec = ns % ns_per_sec;

	return result;
}


/*
 * Return the capacity of the queue (maximum number of items).
 *
 * Returns -1 if `queue` is NULL, matching the error convention of the other
 * queue functions.
 */
int queue_buf_item_num(struct queue_buf_t *queue)
{
	if (!queue) {
		return -1;
	}

	pthread_mutex_lock(&queue->mtx);
	int num = queue->item_num;
	pthread_mutex_unlock(&queue->mtx);

	return num;
}

/*
 * Return the number of items currently stored in the queue.
 *
 * The value is read under the queue mutex; it may already be stale by the
 * time the caller looks at it if other threads are pushing or popping.
 * Returns -1 if `queue` is NULL, matching the error convention of the other
 * queue functions.
 */
int queue_buf_item_cnt(struct queue_buf_t *queue)
{
	if (!queue) {
		return -1;
	}

	pthread_mutex_lock(&queue->mtx);
	int cnt = queue->item_cnt;
	pthread_mutex_unlock(&queue->mtx);

	return cnt;
}


/*
 * Append one item to the queue without waiting.
 *
 * Copies queue->item_size bytes from `item` into the slot at the write index.
 *
 * Returns 0 on success, -1 if `queue` or `item` is NULL or the queue is full.
 */
int queue_buf_push(struct queue_buf_t *queue, const void *item)
{
	if (!queue || !item) {
		return -1;
	}

	int ret = 0;

	pthread_mutex_lock(&queue->mtx);

	if (queue->item_cnt == queue->item_num) {
		ret = -1;
	} else {
		/* Compute the byte offset in size_t so large buffers cannot overflow int. */
		char *slot = queue->data_buf + (size_t)queue->item_size * (size_t)queue->wr;
		memcpy(slot, item, queue->item_size);
		queue->wr = (queue->wr + 1) % queue->item_num;
		queue->item_cnt++;
	}

	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		/* The queue now holds data: wake a thread blocked waiting to read. */
		pthread_cond_signal(&queue->pop_cond);
	}

	return ret;
}

int queue_buf_push_wait(struct queue_buf_t *queue, const void *item, unsigned int timeout_ms)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;

	struct timespec start_tm;
	struct timespec end_tm;

	clock_gettime(CLOCK_MONOTONIC, &start_tm);
	
	long long tmp = timeout_ms;
	printf("tmp: %lld\n", tmp);
	end_tm = ns_to_tm(tm_to_ns(start_tm) + tmp*1000000);

	printf("st: %lld\n", tm_to_ns(start_tm));
	printf("et: %lld\n", tm_to_ns(end_tm));
	printf("dt: %lld\n", tm_to_ns(end_tm) - tm_to_ns(start_tm));


	pthread_mutex_lock(&queue->mtx);
	
	while (queue->item_cnt == queue->item_num) {
		//printf("2tmp: %lld\n", tmp);
		/*
		 * 队列为满需要等待push_cond有效
		 */
		if (pthread_cond_timedwait(&queue->push_cond, &queue->mtx, &end_tm) == ETIMEDOUT) {
			/*
			 * 如果超时则退出等待
			 */
			ret = -1;
			break;
		}
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->wr;
		memcpy(data, item, queue->item_size);
		queue->wr++;
		queue->wr %= queue->item_num;
		queue->item_cnt++;
	}
	
	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		pthread_cond_signal(&queue->pop_cond);
	}

	return ret;
}

/*
 * Remove the oldest item from the queue without waiting.
 *
 * Copies queue->item_size bytes from the slot at the read index into `item`.
 *
 * Returns 0 on success, -1 if `queue` or `item` is NULL or the queue is empty.
 */
int queue_buf_pop(struct queue_buf_t *queue, void *item)
{
	if (!queue || !item) {
		return -1;
	}

	int ret = 0;

	pthread_mutex_lock(&queue->mtx);

	if (queue->item_cnt == 0) {
		ret = -1;
	} else {
		/* Compute the byte offset in size_t so large buffers cannot overflow int. */
		const char *slot = queue->data_buf + (size_t)queue->item_size * (size_t)queue->rd;
		memcpy(item, slot, queue->item_size);
		queue->rd = (queue->rd + 1) % queue->item_num;
		queue->item_cnt--;
	}

	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		/* A slot was freed: wake a thread blocked waiting to write. */
		pthread_cond_signal(&queue->push_cond);
	}

	return ret;
}

int queue_buf_pop_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;

	struct timespec start_tm;
	struct timespec end_tm;
	
	clock_gettime(CLOCK_MONOTONIC, &start_tm);
	long long tmp = timeout_ms;
	end_tm = ns_to_tm(tm_to_ns(start_tm) + tmp*1000000);
	
	pthread_mutex_lock(&queue->mtx);
	
	while (queue->item_cnt == 0) {
		/*
		 * 队列为空需要等待pop_cond有效
		 */
		if (pthread_cond_timedwait(&queue->pop_cond, &queue->mtx, &end_tm) == ETIMEDOUT) {
			/*
			 * 如果超时则退出等待
			 */
			ret = -1;
			break;
		}
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->rd;
		memcpy(item, data, queue->item_size);

		queue->rd++;
		queue->rd %= queue->item_num;
		queue->item_cnt--;
	}

	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		/*
		 * 通知其他线程队列已经有空间
		 */
		pthread_cond_signal(&queue->push_cond);
	}

	return ret;
}


/*
 * Peek at the oldest item without removing it and without waiting.
 *
 * Copies queue->item_size bytes from the slot at the read index into `item`;
 * the read index and item count are left untouched.
 *
 * Returns 0 on success, -1 if `queue` or `item` is NULL or the queue is empty.
 */
int queue_buf_get(struct queue_buf_t *queue, void *item)
{
	if (!queue || !item) {
		return -1;
	}

	int ret = 0;

	pthread_mutex_lock(&queue->mtx);

	if (queue->item_cnt == 0) {
		ret = -1;
	} else {
		/* Compute the byte offset in size_t so large buffers cannot overflow int. */
		const char *slot = queue->data_buf + (size_t)queue->item_size * (size_t)queue->rd;
		memcpy(item, slot, queue->item_size);
	}

	pthread_mutex_unlock(&queue->mtx);

	return ret;
}


int queue_buf_get_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;

	struct timespec start_tm;
	struct timespec end_tm;

	clock_gettime(CLOCK_MONOTONIC, &start_tm);
	long long tmp = timeout_ms;
	end_tm = ns_to_tm(tm_to_ns(start_tm) + tmp*1000000);
	
	pthread_mutex_lock(&queue->mtx);
	
	while (queue->item_cnt == 0) {
		/*
		 * 队列为空需要等待pop_cond有效
		 */
		if (pthread_cond_timedwait(&queue->pop_cond, &queue->mtx, &end_tm) == ETIMEDOUT) {
			/*
			 * 如果超时则退出等待
			 */
			ret = -1;
			break;
		}
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->rd;
		memcpy(item, data, queue->item_size);
	}

	pthread_mutex_unlock(&queue->mtx);

	return ret;
}



/*
 * Allocate and initialize a queue holding up to `item_num` items of
 * `item_size` bytes each.
 *
 * The control structure and the item storage share one allocation, so
 * queue_buf_free() releases everything with a single free().
 *
 * Returns the new queue, or NULL if either argument is not positive, the
 * total size would overflow, memory cannot be allocated, or a pthread
 * object cannot be initialized.
 */
struct queue_buf_t *queue_buf_alloc(int item_num, int item_size)
{
	struct queue_buf_t *queue;
	pthread_condattr_t attr;
	size_t max_items;

	if (item_num <= 0 || item_size <= 0) {
		return NULL;
	}

	/* Reject sizes whose total (header + storage) would wrap around size_t. */
	max_items = ((size_t)-1 - sizeof(*queue)) / (size_t)item_size;
	if ((size_t)item_num > max_items) {
		return NULL;
	}

	queue = malloc(sizeof(*queue) + (size_t)item_num * (size_t)item_size);
	if (!queue) {
		return NULL;
	}

	memset(queue, 0x00, sizeof(*queue));

	/* Item storage starts immediately after the control structure. */
	queue->data_buf = (char *)queue + sizeof(*queue);
	queue->item_num = item_num;
	queue->item_size = item_size;

	if (pthread_mutex_init(&queue->mtx, NULL) != 0) {
		goto err_free;
	}

	if (pthread_condattr_init(&attr) != 0) {
		goto err_mutex;
	}

	/*
	 * pthread_cond_timedwait() uses CLOCK_REALTIME by default, which is
	 * affected by system time adjustments. Switch the condition variables
	 * to CLOCK_MONOTONIC instead. The *_wait() functions compute their
	 * deadlines on CLOCK_MONOTONIC, so a failure here must be fatal rather
	 * than silently mixing clocks.
	 * See: https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
	 */
	if (pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) != 0) {
		goto err_attr;
	}

	if (pthread_cond_init(&queue->push_cond, &attr) != 0) {
		goto err_attr;
	}

	if (pthread_cond_init(&queue->pop_cond, &attr) != 0) {
		goto err_push_cond;
	}

	pthread_condattr_destroy(&attr);

	return queue;

err_push_cond:
	pthread_cond_destroy(&queue->push_cond);
err_attr:
	pthread_condattr_destroy(&attr);
err_mutex:
	pthread_mutex_destroy(&queue->mtx);
err_free:
	free(queue);
	return NULL;
}


/*
 * Tear down a queue created by queue_buf_alloc(): destroy its mutex and both
 * condition variables, then release the memory. A NULL queue is ignored.
 */
void queue_buf_free(struct queue_buf_t *queue)
{
	if (!queue) {
		return;
	}

	pthread_mutex_destroy(&queue->mtx);
	pthread_cond_destroy(&queue->pop_cond);
	pthread_cond_destroy(&queue->push_cond);

	free(queue);
}



/*
 * Manual smoke test for the queue.
 *
 * Pushes 25 items into a 20-slot queue (the last pushes are expected to time
 * out), pops 25 times, then peeks 25 times on the queue, printing the result
 * and the queue counters after every call. The queue is released at the end.
 */
void test(void)
{
	struct queue_buf_t *q = queue_buf_alloc(20, sizeof(long long));

	if (!q) {
		/* Nothing to exercise if the allocation failed. */
		return;
	}

	/* A base value that does not fit in 32 bits, to exercise full-width items. */
	long long l = 9999999999*100;

	int i;

	for (i = 0; i < 25; i++) {
		long long t = l + i;
		int ret = queue_buf_push_wait(q, &t, 1000);
		printf("push ret: %d\n", ret);
		printf("num: %d\n", queue_buf_item_num(q));
		printf("cnt: %d\n", queue_buf_item_cnt(q));
	}

	for (i = 0; i < 25; i++) {
		long long t = 0;
		int ret = queue_buf_pop_wait(q, &t, 500);
		printf("pop ret: %d, data: %lld\n", ret, t);
		printf("num: %d\n", queue_buf_item_num(q));
		printf("cnt: %d\n", queue_buf_item_cnt(q));
	}

	for (i = 0; i < 25; i++) {
		long long t = 0;
		int ret = queue_buf_get_wait(q, &t, 100);
		printf("get ret: %d, data: %lld\n", ret, t);
		printf("num: %d\n", queue_buf_item_num(q));
		printf("cnt: %d\n", queue_buf_item_cnt(q));
	}

	queue_buf_free(q);
}

 

上一篇:LeetCode 72.编辑距离(动态规划)


下一篇:枚举类型