Linux内核结构体--kfifo 环状缓冲区

转载链接:http://blog.csdn.net/yusiguyuan/article/details/41985907

1、前言

  最近项目中用到一个环形缓冲区(ring buffer),代码是由Linux内核的kfifo改过来的。缓冲区在文件系统中经常用到,通过缓冲区缓解cpu读写内存和读写磁盘的速度。例如一个进程A产生数据发给另外一个进程B,进程B需要对进程A传的数据进行处理并写入文件,如果B没有处理完,则A要延迟发送。为了保证进程A减少等待时间,可以在A和B之间采用一个缓冲区,A每次将数据存放在缓冲区中,B每次冲缓冲区中取。这是典型的生产者和消费者模型,缓冲区中数据满足FIFO特性,因此可以采用队列进行实现。Linux内核的kfifo正好是一个环形队列,可以用来当作环形缓冲区。生产者与消费者使用缓冲区如下图所示:

Linux内核结构体--kfifo 环状缓冲区

  环形缓冲区的详细介绍及实现方法可以参考http://en.wikipedia.org/wiki/Circular_buffer,介绍的非常详细,列举了实现环形队列的几种方法。环形队列的不便之处在于如何判断队列是空还是满。*上给三种实现方法。

2、linux 内核kfifo

  kfifo设计的非常巧妙,代码很精简,对于入队和出对处理的出人意料。首先看一下kfifo的数据结构

  1. struct kfifo {
  2. unsigned char *buffer;     /* the buffer holding the data */
  3. unsigned int size;         /* the size of the allocated buffer */
  4. unsigned int in;           /* data is added at offset (in % size) */
  5. unsigned int out;          /* data is extracted from off. (out % size) */
  6. spinlock_t *lock;          /* protects concurrent modifications */
  7. };

kfifo提供的方法有:

  1. //根据给定buffer创建一个kfifo
  2. struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
  3. gfp_t gfp_mask, spinlock_t *lock);
  4. //给定size分配buffer和kfifo
  5. struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask,
  6. spinlock_t *lock);
  7. //释放kfifo空间
  8. void kfifo_free(struct kfifo *fifo)
  9. //向kfifo中添加数据
  10. unsigned int kfifo_put(struct kfifo *fifo,
  11. const unsigned char *buffer, unsigned int len)
  12. //从kfifo中取数据
  13. unsigned int kfifo_put(struct kfifo *fifo,
  14. const unsigned char *buffer, unsigned int len)
  15. //获取kfifo中有数据的buffer大小
  16. unsigned int kfifo_len(struct kfifo *fifo)

定义自旋锁的目的为了防止多进程/线程并发使用kfifo。因为in和out在每次get和out时,发生改变。初始化和创建kfifo的源代码如下:

  1. struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
  2. gfp_t gfp_mask, spinlock_t *lock)
  3. {
  4. struct kfifo *fifo;
  5. /* size must be a power of 2 */
  6. BUG_ON(!is_power_of_2(size));
  7. fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
  8. if (!fifo)
  9. return ERR_PTR(-ENOMEM);
  10. fifo->buffer = buffer;
  11. fifo->size = size;
  12. fifo->in = fifo->out = 0;
  13. fifo->lock = lock;
  14. return fifo;
  15. }
  16. struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
  17. {
  18. unsigned char *buffer;
  19. struct kfifo *ret;
  20. if (!is_power_of_2(size)) {
  21. BUG_ON(size > 0x80000000);
  22. size = roundup_pow_of_two(size);
  23. }
  24. buffer = kmalloc(size, gfp_mask);
  25. if (!buffer)
  26. return ERR_PTR(-ENOMEM);
  27. ret = kfifo_init(buffer, size, gfp_mask, lock);
  28. if (IS_ERR(ret))
  29. kfree(buffer);
  30. return ret;
  31. }

在kfifo_init和kfifo_calloc中,kfifo->size的值总是在调用者传进来的size参数的基础上向2的幂扩展,这是内核一贯的做法。这样的好处不言而喻--对kfifo->size取模运算可以转化为与运算,如:kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1)

kfifo的巧妙之处在于in和out定义为无符号类型,在put和get时,in和out都是增加,当达到最大值时,产生溢出,使得从0开始,进行循环使用。put和get代码如下所示:

  1. static inline unsigned int kfifo_put(struct kfifo *fifo,
  2. const unsigned char *buffer, unsigned int len)
  3. {
  4. unsigned long flags;
  5. unsigned int ret;
  6. spin_lock_irqsave(fifo->lock, flags);
  7. ret = __kfifo_put(fifo, buffer, len);
  8. spin_unlock_irqrestore(fifo->lock, flags);
  9. return ret;
  10. }
  11. static inline unsigned int kfifo_get(struct kfifo *fifo,
  12. unsigned char *buffer, unsigned int len)
  13. {
  14. unsigned long flags;
  15. unsigned int ret;
  16. spin_lock_irqsave(fifo->lock, flags);
  17. ret = __kfifo_get(fifo, buffer, len);
  18. //当fifo->in == fifo->out时,buufer为空
  19. if (fifo->in == fifo->out)
  20. fifo->in = fifo->out = 0;
  21. spin_unlock_irqrestore(fifo->lock, flags);
  22. return ret;
  23. }
  24. unsigned int __kfifo_put(struct kfifo *fifo,
  25. const unsigned char *buffer, unsigned int len)
  26. {
  27. unsigned int l;
  28. //buffer中空的长度
  29. len = min(len, fifo->size - fifo->in + fifo->out);
  30. /*
  31. * Ensure that we sample the fifo->out index -before- we
  32. * start putting bytes into the kfifo.
  33. */
  34. smp_mb();
  35. /* first put the data starting from fifo->in to buffer end */
  36. l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
  37. memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
  38. /* then put the rest (if any) at the beginning of the buffer */
  39. memcpy(fifo->buffer, buffer + l, len - l);
  40. /*
  41. * Ensure that we add the bytes to the kfifo -before-
  42. * we update the fifo->in index.
  43. */
  44. smp_wmb();
  45. fifo->in += len;  //每次累加,到达最大值后溢出,自动转为0
  46. return len;
  47. }
  48. unsigned int __kfifo_get(struct kfifo *fifo,
  49. unsigned char *buffer, unsigned int len)
  50. {
  51. unsigned int l;
  52. //有数据的缓冲区的长度
  53. len = min(len, fifo->in - fifo->out);
  54. /*
  55. * Ensure that we sample the fifo->in index -before- we
  56. * start removing bytes from the kfifo.
  57. */
  58. smp_rmb();
  59. /* first get the data from fifo->out until the end of the buffer */
  60. l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
  61. memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
  62. /* then get the rest (if any) from the beginning of the buffer */
  63. memcpy(buffer + l, fifo->buffer, len - l);
  64. /*
  65. * Ensure that we remove the bytes from the kfifo -before-
  66. * we update the fifo->out index.
  67. */
  68. smp_mb();
  69. fifo->out += len; //每次累加,到达最大值后溢出,自动转为0
  70. return len;
  71. }

put和get在调用__put和__get过程都进行加锁,防止并发。从代码中可以看出put和get都调用两次memcpy,这针对的是边界条件。例如下图:蓝色表示空闲,红色表示占用。

(1)空的kfifo,

Linux内核结构体--kfifo 环状缓冲区

(2)put一个buffer后

Linux内核结构体--kfifo 环状缓冲区

(3)get一个buffer后

Linux内核结构体--kfifo 环状缓冲区

(4)当此时put的buffer长度超出in到末尾长度时,则将剩下的移到头部去

Linux内核结构体--kfifo 环状缓冲区

3、测试程序

仿照kfifo编写一个ring_buffer,现有线程互斥量进行并发控制。设计的ring_buffer如下所示:

  1. /**@brief 仿照linux kfifo写的ring buffer
  2. *@atuher Anker  date:2013-12-18
  3. * ring_buffer.h
  4. * */
  5. #ifndef KFIFO_HEADER_H
  6. #define KFIFO_HEADER_H
  7. #include <inttypes.h>
  8. #include <string.h>
  9. #include <stdlib.h>
  10. #include <stdio.h>
  11. #include <errno.h>
  12. #include <assert.h>
  13. //判断x是否是2的次方
  14. #define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
  15. //取a和b中最小值
  16. #define min(a, b) (((a) < (b)) ? (a) : (b))
  17. struct ring_buffer
  18. {
  19. void         *buffer;     //缓冲区
  20. uint32_t     size;       //大小
  21. uint32_t     in;         //入口位置
  22. uint32_t       out;        //出口位置
  23. pthread_mutex_t *f_lock;    //互斥锁
  24. };
  25. //初始化缓冲区
  26. struct ring_buffer* ring_buffer_init(void *buffer, uint32_t size, pthread_mutex_t *f_lock)
  27. {
  28. assert(buffer);
  29. struct ring_buffer *ring_buf = NULL;
  30. if (!is_power_of_2(size))
  31. {
  32. fprintf(stderr,"size must be power of 2.\n");
  33. return ring_buf;
  34. }
  35. ring_buf = (struct ring_buffer *)malloc(sizeof(struct ring_buffer));
  36. if (!ring_buf)
  37. {
  38. fprintf(stderr,"Failed to malloc memory,errno:%u,reason:%s",
  39. errno, strerror(errno));
  40. return ring_buf;
  41. }
  42. memset(ring_buf, 0, sizeof(struct ring_buffer));
  43. ring_buf->buffer = buffer;
  44. ring_buf->size = size;
  45. ring_buf->in = 0;
  46. ring_buf->out = 0;
  47. ring_buf->f_lock = f_lock;
  48. return ring_buf;
  49. }
  50. //释放缓冲区
  51. void ring_buffer_free(struct ring_buffer *ring_buf)
  52. {
  53. if (ring_buf)
  54. {
  55. if (ring_buf->buffer)
  56. {
  57. free(ring_buf->buffer);
  58. ring_buf->buffer = NULL;
  59. }
  60. free(ring_buf);
  61. ring_buf = NULL;
  62. }
  63. }
  64. //缓冲区的长度
  65. uint32_t __ring_buffer_len(const struct ring_buffer *ring_buf)
  66. {
  67. return (ring_buf->in - ring_buf->out);
  68. }
  69. //从缓冲区中取数据
  70. uint32_t __ring_buffer_get(struct ring_buffer *ring_buf, void * buffer, uint32_t size)
  71. {
  72. assert(ring_buf || buffer);
  73. uint32_t len = 0;
  74. size  = min(size, ring_buf->in - ring_buf->out);
  75. /* first get the data from fifo->out until the end of the buffer */
  76. len = min(size, ring_buf->size - (ring_buf->out & (ring_buf->size - 1)));
  77. memcpy(buffer, ring_buf->buffer + (ring_buf->out & (ring_buf->size - 1)), len);
  78. /* then get the rest (if any) from the beginning of the buffer */
  79. memcpy(buffer + len, ring_buf->buffer, size - len);
  80. ring_buf->out += size;
  81. return size;
  82. }
  83. //向缓冲区中存放数据
  84. uint32_t __ring_buffer_put(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
  85. {
  86. assert(ring_buf || buffer);
  87. uint32_t len = 0;
  88. size = min(size, ring_buf->size - ring_buf->in + ring_buf->out);
  89. /* first put the data starting from fifo->in to buffer end */
  90. len  = min(size, ring_buf->size - (ring_buf->in & (ring_buf->size - 1)));
  91. memcpy(ring_buf->buffer + (ring_buf->in & (ring_buf->size - 1)), buffer, len);
  92. /* then put the rest (if any) at the beginning of the buffer */
  93. memcpy(ring_buf->buffer, buffer + len, size - len);
  94. ring_buf->in += size;
  95. return size;
  96. }
  97. uint32_t ring_buffer_len(const struct ring_buffer *ring_buf)
  98. {
  99. uint32_t len = 0;
  100. pthread_mutex_lock(ring_buf->f_lock);
  101. len = __ring_buffer_len(ring_buf);
  102. pthread_mutex_unlock(ring_buf->f_lock);
  103. return len;
  104. }
  105. uint32_t ring_buffer_get(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
  106. {
  107. uint32_t ret;
  108. pthread_mutex_lock(ring_buf->f_lock);
  109. ret = __ring_buffer_get(ring_buf, buffer, size);
  110. //buffer中没有数据
  111. if (ring_buf->in == ring_buf->out)
  112. ring_buf->in = ring_buf->out = 0;
  113. pthread_mutex_unlock(ring_buf->f_lock);
  114. return ret;
  115. }
  116. uint32_t ring_buffer_put(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
  117. {
  118. uint32_t ret;
  119. pthread_mutex_lock(ring_buf->f_lock);
  120. ret = __ring_buffer_put(ring_buf, buffer, size);
  121. pthread_mutex_unlock(ring_buf->f_lock);
  122. return ret;
  123. }
  124. #endif

采用多线程模拟生产者和消费者编写测试程序,如下所示:

  1. /**@brief ring buffer测试程序,创建两个线程,一个生产者,一个消费者。
  2. * 生产者每隔1秒向buffer中投入数据,消费者每隔2秒去取数据。
  3. *@atuher Anker  date:2013-12-18
  4. * */
  5. #include "ring_buffer.h"
  6. #include <pthread.h>
  7. #include <time.h>
  8. #define BUFFER_SIZE  1024 * 1024
  9. typedef struct student_info
  10. {
  11. uint64_t stu_id;
  12. uint32_t age;
  13. uint32_t score;
  14. }student_info;
  15. void print_student_info(const student_info *stu_info)
  16. {
  17. assert(stu_info);
  18. printf("id:%lu\t",stu_info->stu_id);
  19. printf("age:%u\t",stu_info->age);
  20. printf("score:%u\n",stu_info->score);
  21. }
  22. student_info * get_student_info(time_t timer)
  23. {
  24. student_info *stu_info = (student_info *)malloc(sizeof(student_info));
  25. if (!stu_info)
  26. {
  27. fprintf(stderr, "Failed to malloc memory.\n");
  28. return NULL;
  29. }
  30. srand(timer);
  31. stu_info->stu_id = 10000 + rand() % 9999;
  32. stu_info->age = rand() % 30;
  33. stu_info->score = rand() % 101;
  34. print_student_info(stu_info);
  35. return stu_info;
  36. }
  37. void * consumer_proc(void *arg)
  38. {
  39. struct ring_buffer *ring_buf = (struct ring_buffer *)arg;
  40. student_info stu_info;
  41. while(1)
  42. {
  43. sleep(2);
  44. printf("------------------------------------------\n");
  45. printf("get a student info from ring buffer.\n");
  46. ring_buffer_get(ring_buf, (void *)&stu_info, sizeof(student_info));
  47. printf("ring buffer length: %u\n", ring_buffer_len(ring_buf));
  48. print_student_info(&stu_info);
  49. printf("------------------------------------------\n");
  50. }
  51. return (void *)ring_buf;
  52. }
  53. void * producer_proc(void *arg)
  54. {
  55. time_t cur_time;
  56. struct ring_buffer *ring_buf = (struct ring_buffer *)arg;
  57. while(1)
  58. {
  59. time(&cur_time);
  60. srand(cur_time);
  61. int seed = rand() % 11111;
  62. printf("******************************************\n");
  63. student_info *stu_info = get_student_info(cur_time + seed);
  64. printf("put a student info to ring buffer.\n");
  65. ring_buffer_put(ring_buf, (void *)stu_info, sizeof(student_info));
  66. printf("ring buffer length: %u\n", ring_buffer_len(ring_buf));
  67. printf("******************************************\n");
  68. sleep(1);
  69. }
  70. return (void *)ring_buf;
  71. }
  72. int consumer_thread(void *arg)
  73. {
  74. int err;
  75. pthread_t tid;
  76. err = pthread_create(&tid, NULL, consumer_proc, arg);
  77. if (err != 0)
  78. {
  79. fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
  80. errno, strerror(errno));
  81. return -1;
  82. }
  83. return tid;
  84. }
  85. int producer_thread(void *arg)
  86. {
  87. int err;
  88. pthread_t tid;
  89. err = pthread_create(&tid, NULL, producer_proc, arg);
  90. if (err != 0)
  91. {
  92. fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
  93. errno, strerror(errno));
  94. return -1;
  95. }
  96. return tid;
  97. }
  98. int main()
  99. {
  100. void * buffer = NULL;
  101. uint32_t size = 0;
  102. struct ring_buffer *ring_buf = NULL;
  103. pthread_t consume_pid, produce_pid;
  104. pthread_mutex_t *f_lock = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
  105. if (pthread_mutex_init(f_lock, NULL) != 0)
  106. {
  107. fprintf(stderr, "Failed init mutex,errno:%u,reason:%s\n",
  108. errno, strerror(errno));
  109. return -1;
  110. }
  111. buffer = (void *)malloc(BUFFER_SIZE);
  112. if (!buffer)
  113. {
  114. fprintf(stderr, "Failed to malloc memory.\n");
  115. return -1;
  116. }
  117. size = BUFFER_SIZE;
  118. ring_buf = ring_buffer_init(buffer, size, f_lock);
  119. if (!ring_buf)
  120. {
  121. fprintf(stderr, "Failed to init ring buffer.\n");
  122. return -1;
  123. }
  124. #if 0
  125. student_info *stu_info = get_student_info(638946124);
  126. ring_buffer_put(ring_buf, (void *)stu_info, sizeof(student_info));
  127. stu_info = get_student_info(976686464);
  128. ring_buffer_put(ring_buf, (void *)stu_info, sizeof(student_info));
  129. ring_buffer_get(ring_buf, (void *)stu_info, sizeof(student_info));
  130. print_student_info(stu_info);
  131. #endif
  132. printf("multi thread test.......\n");
  133. produce_pid  = producer_thread((void*)ring_buf);
  134. consume_pid  = consumer_thread((void*)ring_buf);
  135. pthread_join(produce_pid, NULL);
  136. pthread_join(consume_pid, NULL);
  137. ring_buffer_free(ring_buf);
  138. free(f_lock);
  139. return 0;
  140. }

总结:

len = min(len, fifo->size - fifo->in + fifo->out); 
      在 len 和 (fifo->size - fifo->in + fifo->out) 之间取一个较小的值赋给len。注意,当 (fifo->in == fifo->out+fifo->size) 时,表示缓冲区已满,此时得到的较小值一定是0,后面实际写入的字节数也全为0。
      另一种边界情况是当 len 很大时(因为len是无符号的,负数对它来说也是一个很大的正数),这一句也能保证len取到一个较小的值,因为    fifo->in总是大于等于 fifo->out ,所以后面的那个表达式 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); 的值不会超过fifo->size的大小。
      smp_mb();  smp_wmb(); 是加内存屏障,这里不是我们讨论的范围,你可以忽略它。 
      l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));    是把上一步决定的要写入的字节数len “切开”,这里又使用了一个技巧。注意:实际分配给fifo->buffer 的字节数 fifo->size,必须是2的幂,否则这里就会出错。既然 fifo->size 是2的幂,那么 (fifo->size-1) 也就是一个后面几位全为1的数,也就能保证(fifo->in & (fifo->size - 1)) 总为不超过 (fifo->size - 1) 的那一部分,和 (fifo->in)% (fifo->size - 1) 的效果一样。 
      这样后面的代码就不难理解了,它先向  fifo->in  到缓冲区末端这一块写数据,如果还没写完,在从缓冲区头开始写入剩下的,从而实现了循环缓冲。最后,把写指针后移 len 个字节,并返回len。
       从上面可以看出,fifo->in的值可以从0变化到超过fifo->size的数值,fifo->out也如此,但它们的差不会超过fifo->size。

上一篇:linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】


下一篇:环形缓冲区-模仿linux kfifo【转】