1.前言
共享内存让ngx工作进程能基于同一个对象工作,但共享内存不提供资源管理,所以ngx使用slab算法管理共享内存的使用。
那么ngx大致的工作流程是,master进程申请共享内存,使用按照slab格式化内存,fork,work进程按照slab申请内存,如此同一个slab内存上的对象,所有的worker都能看见。
ngx是简化版的slab,若要学习slab得看kernel.
2.设计
文件系统管理磁盘资源,在格式化磁盘时,建立inode表维护磁盘的使用,内存管理也使用同样思路,不过不同于磁盘data块的大小固定(因为磁盘IO慢),内存的data块大小有多个级别(为了不浪费)。
ngx将多个级别内存分配分为两类级别,page级别和slot级别,page级别分配页大小的内存,slot级别分配2^N的内存,逻辑上ngx的内存分配为两层
若用伪代码描述slot和page分配的关系,则如下所示,slot的分配是建立在page的分配的基础上
ngx_slab_alloc(size_t size)
{
ngx_page_t *page;
if (size >= page_size) { // 如果size够page_size,则使用page分配
page = ngx_slab_page_alloc(size); // 根据size,分配page描述
return ngx_page_to_addr(page); //将page转换成对应的 void *
}
return ngx_slab_slot_alloc(size); // 如果size不过page_size,则使用slot分配
}
void *ngx_slab_slot_alloc(size_t size)
{
ngx_slot_t *slot, *head;
head = ngx_slab_slot_find(size); // 根据 size的大小,找到合适的 slot 头节点
while (1) {
for (slot = head->next; slot; slot = slot->next) { // 遍历此slot链表
ret = _ngx_slab_slot_alloc(slot, size); // 若本slot节点能分配到足够的内存,则返回
if (ret != NULL)
return ret;
}
// 如果slot没有足够的空间,则在分配一个page
page = ngx_slab_page_alloc(1); // 分配一个page
if (page == NULL) {
break;
}
ngx_slab_insert(head, page); // 将此page加入到slot链表
}
return NULL;
}
3. 实现
具体在ngx中将 inode 称为 struct ngx_slab_page_s,若是依照面向对象的思想此结构体应该分为ngx_slab_s 和 ngx_page_s ,分别描述小内存分配和大内存分配,在实现上ngx更像硬件模块的寄存器复用
struct ngx_slab_page_s {
uintptr_t slab; // 当用于page分配时,表示数组长度,当用slot分配时,表示位图
ngx_slab_page_t *next; // 只在slot分配使用
uintptr_t prev;
};
slab上下文如下,其中最必须的成员是 pages, free, start, end
typedef struct {
ngx_atomic_t lock; // 给下面的mutex用的
size_t min_size; // 最小划分块大小,固定值为 8,1 << pool->min_shift
size_t min_shift; // 固定值为3
ngx_slab_page_t *pages; // 无头结点的数组链表
ngx_slab_page_t free; // 带头结点的链表
u_char *start; // 用于分配的缓存起始地址
u_char *end; // 用于分配的缓存结束地址
ngx_shmtx_t mutex; // 互斥锁
u_char *log_ctx; // 自定义 注释文本
u_char zero;
void *data; // 通常一个slab池用于某种目标的数据,data用于记录该数据的首地址(如红黑树的首地址)
void *addr; // slab池 位于 缓冲池上,addr用于记录 缓存池的首地址
} ngx_slab_pool_t;
若用户已经保证了互斥就使用 xxx_locked
// pool由主调函数分配,用户需要分配整个共享内存块,并用pool指向首地址
void ngx_slab_init(ngx_slab_pool_t *pool);
void *ngx_slab_alloc(ngx_slab_pool_t *pool, size_t size);
void *ngx_slab_alloc_locked(ngx_slab_pool_t *pool, size_t size);
void ngx_slab_free(ngx_slab_pool_t *pool, void *p);
void ngx_slab_free_locked(ngx_slab_pool_t *pool, void *p);
经过init,整个内存被划分为
具体,每个slot链表最小分配单位不同
/* 常量值 */
// 大于ngx_slab_max_size 的 内存分配,使用 pages 分配,否则使用 slots分配
static ngx_uint_t ngx_slab_max_size;
// 当slab字段全为位图时,slot 块大小
// 使用 uintptr_t 类型 位图 表示页划分。比如 4KB 内存页, 32位系统环境,
// 一个 uintptr_t 位图变量最多可以对应 32个 划分块的状态,
// 所以 4KB内存页划分为 32块,一块为 ngx_slab_exact_size = 4096/32 = 128
static ngx_uint_t ngx_slab_exact_size;
// 当slab字段全为位图时,slot 块大小
// 位偏移方式表示, 若 ngx_slab_exact_size 为 128,则 ngx_slab_exact_shift 为 7
static ngx_uint_t ngx_slab_exact_shift;
void
ngx_slab_init(ngx_slab_pool_t *pool)
{
u_char *p;
size_t size;
ngx_int_t m;
ngx_uint_t i, n, pages;
ngx_slab_page_t *slots;
/* 运行时,根据系统环境,确定 常量值 */
/* STUB */
if (ngx_slab_max_size == 0) {
ngx_slab_max_size = ngx_pagesize / 2;
ngx_slab_exact_size = ngx_pagesize / (8 * sizeof(uintptr_t)); // 8*sizeof(uintptr_t) 的意思是 sizeof(uintptr_t) 个字节 乘以 每个字节8bit
// 所以 excat_size 是以 uintptr_t 做位图划分
for (n = ngx_slab_exact_size; n >>= 1; ngx_slab_exact_shift++) {
/* void */
}
}
/**/
pool->min_size = 1 << pool->min_shift; // min_size = 8, min_shift = 3
p = (u_char *) pool + sizeof(ngx_slab_pool_t); // pool - pool+sizeof(ngx_slab_pool_t) 用于存储 ngx_slab_pool_t 对象
size = pool->end - p; // p 内存池开始
// size 为整个缓存大小
ngx_slab_junk(p, size); // 内存涂鸦 为 0xD0
slots = (ngx_slab_page_t *) p; // 缓冲池 开始为 ngx_slab_page_t * 数组
n = ngx_pagesize_shift - pool->min_shift; // pagesize_size / min_size 得到 slot 的数量为 n
// slot 为 最小分配单元 ,为 8字节
// 2^min_shift - 2^(ngx_pagesize_shift - 1) 为slot链表各节点 分配内存块大小,所以slot一共有 ngx_pagesize_shift - min_shift 个
for (i = 0; i < n; i++) {
slots[i].slab = 0;
slots[i].next = &slots[i];
slots[i].prev = 0;
}
p += n * sizeof(ngx_slab_page_t); // slots 数组初始化完成
pages = (ngx_uint_t) (size / (ngx_pagesize + sizeof(ngx_slab_page_t))); // 计算 缓冲池 最多 提供的 page 数量
// 一个Page 消耗page空间和 描述空间(ngx_slab_page_t)
ngx_memzero(p, pages * sizeof(ngx_slab_page_t)); // 初始化 pages 数组
pool->pages = (ngx_slab_page_t *) p; // 设置pages数组
pool->free.prev = 0; // 设置空闲的 内存块 链表
pool->free.next = (ngx_slab_page_t *) p;
pool->pages->slab = pages;
pool->pages->next = &pool->free;
pool->pages->prev = (uintptr_t) &pool->free;
// 指向 用于分配的 页
pool->start = (u_char *)
ngx_align_ptr((uintptr_t) p + pages * sizeof(ngx_slab_page_t),
ngx_pagesize);
// 由于对齐等原因,实际的 page 数量不足,所以重新计算
m = pages - (pool->end - pool->start) / ngx_pagesize;
if (m > 0) {
pages -= m;
pool->pages->slab = pages;
}
pool->log_ctx = &pool->zero;
pool->zero = '\0';
}
注意alloc中的如下语句,通过page找到具体地址,分配地址的关键是分配 pages,而分配pages使用ngx_slab_alloc_pages
p = (page - pool->pages) << ngx_pagesize_shift;
p += (uintptr_t) pool->start;
根据使用位图占用空间和 unsigned int 空间的关系,slot分配被分为三类,目标小于ngx_slab_exact_size,等于ngx_slab_exact_size,大于ngx_slab_exact_size
void *
ngx_slab_alloc_locked(ngx_slab_pool_t *pool, size_t size)
{
size_t s;
uintptr_t p, n, m, mask, *bitmap;
ngx_uint_t i, slot, shift, map;
ngx_slab_page_t *page, *prev, *slots;
if (size >= ngx_slab_max_size) { // 若size过大,使用pages分配
ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, ngx_cycle->log, 0,
"slab alloc: %uz", size);
page = ngx_slab_alloc_pages(pool, (size + ngx_pagesize - 1)
>> ngx_pagesize_shift);
if (page) {
// 根据偏移量求 缓存地址
p = (page - pool->pages) << ngx_pagesize_shift;
p += (uintptr_t) pool->start;
} else {
p = 0;
}
goto done;
}
// 使用 slot 分配
// 确定 shift , 将 size 转换为 2 进制, shift表示 为1 的最高位,如 6 的 shift 为 3, 9的shfit 为 4
// 确定 slot, 每个 slot 链表 的 最小分配单位不同,且以 2 的幂数增加
// 如, slot[0] 链表的 最小分配单位为 8, slot[1] 最小单位为 16, slot[2] 最小单位 为 32
if (size > pool->min_size) { // 若分配大小大于 min_size,则计算 slot,
shift = 1;
for (s = size - 1; s >>= 1; shift++) { /* void */ }
slot = shift - pool->min_shift; // slot 是以 min_size 为单位
} else {
size = pool->min_size;
shift = pool->min_shift;
slot = 0;
}
ngx_log_debug2(NGX_LOG_DEBUG_ALLOC, ngx_cycle->log, 0,
"slab alloc: %uz slot: %ui", size, slot);
slots = (ngx_slab_page_t *) ((u_char *) pool + sizeof(ngx_slab_pool_t));
page = slots[slot].next;
if (page->next != page) { // 若 slot 已经 关联了 page
if (shift < ngx_slab_exact_shift) { // 若 需要分配的大小 小于 ngx_slab_exact_size
do {
p = (page - pool->pages) << ngx_pagesize_shift; // 找到对应的page
bitmap = (uintptr_t *) (pool->start + p); // 一个 page 分为多个 bitmap
map = (1 << (ngx_pagesize_shift - shift))
/ (sizeof(uintptr_t) * 8);
for (n = 0; n < map; n++) {
if (bitmap[n] != NGX_SLAB_BUSY) { // 若本bitmap没有全部使用
for (m = 1, i = 0; m; m <<= 1, i++) {
if ((bitmap[n] & m)) { // 找到空闲的块
continue;
}
bitmap[n] |= m;
i = ((n * sizeof(uintptr_t) * 8) << shift) // 找到缓存偏移量
+ (i << shift);
if (bitmap[n] == NGX_SLAB_BUSY) { // 若本bitmap已经消耗完, 需要判断本page是否全部使用
for (n = n + 1; n < map; n++) { // 若本page未消耗完,则直接返回
if (bitmap[n] != NGX_SLAB_BUSY) {
p = (uintptr_t) bitmap + i;
goto done;
}
}
// 若本page已经消耗完了,则移除本 slot 链表
prev = (ngx_slab_page_t *)
(page->prev & ~NGX_SLAB_PAGE_MASK); // 过滤末尾标记位,以获得正确的前一节点地址
prev->next = page->next;
page->next->prev = page->prev;
page->next = NULL;
page->prev = NGX_SLAB_SMALL;
}
p = (uintptr_t) bitmap + i;
goto done;
}
}
}
page = page->next;
} while (page);
} else if (shift == ngx_slab_exact_shift) {
do {
if (page->slab != NGX_SLAB_BUSY) {
for (m = 1, i = 0; m; m <<= 1, i++) {
if ((page->slab & m)) {
continue;
}
page->slab |= m;
if (page->slab == NGX_SLAB_BUSY) {
prev = (ngx_slab_page_t *)
(page->prev & ~NGX_SLAB_PAGE_MASK);
prev->next = page->next;
page->next->prev = page->prev;
page->next = NULL;
page->prev = NGX_SLAB_EXACT;
}
p = (page - pool->pages) << ngx_pagesize_shift;
p += i << shift;
p += (uintptr_t) pool->start;
goto done;
}
}
page = page->next;
} while (page);
} else { /* shift > ngx_slab_exact_shift */
n = ngx_pagesize_shift - (page->slab & NGX_SLAB_SHIFT_MASK);
n = 1 << n;
n = ((uintptr_t) 1 << n) - 1;
mask = n << NGX_SLAB_MAP_SHIFT;
do {
if ((page->slab & NGX_SLAB_MAP_MASK) != mask) {
for (m = (uintptr_t) 1 << NGX_SLAB_MAP_SHIFT, i = 0;
m & mask;
m <<= 1, i++)
{
if ((page->slab & m)) {
continue;
}
page->slab |= m;
if ((page->slab & NGX_SLAB_MAP_MASK) == mask) {
prev = (ngx_slab_page_t *)
(page->prev & ~NGX_SLAB_PAGE_MASK);
prev->next = page->next;
page->next->prev = page->prev;
page->next = NULL;
page->prev = NGX_SLAB_BIG;
}
p = (page - pool->pages) << ngx_pagesize_shift;
p += i << shift;
p += (uintptr_t) pool->start;
goto done;
}
}
page = page->next;
} while (page);
}
}
// 若 page->next == page 意味,该 slot 没有关联 page,则使用 ngx_slab_alloc_pages 分配一块page
page = ngx_slab_alloc_pages(pool, 1);
if (page) {
if (shift < ngx_slab_exact_shift) {
p = (page - pool->pages) << ngx_pagesize_shift;
bitmap = (uintptr_t *) (pool->start + p);
s = 1 << shift;
n = (1 << (ngx_pagesize_shift - shift)) / 8 / s;
if (n == 0) {
n = 1;
}
bitmap[0] = (2 << n) - 1;
map = (1 << (ngx_pagesize_shift - shift)) / (sizeof(uintptr_t) * 8);
for (i = 1; i < map; i++) {
bitmap[i] = 0;
}
page->slab = shift;
page->next = &slots[slot];
page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_SMALL;
slots[slot].next = page;
p = ((page - pool->pages) << ngx_pagesize_shift) + s * n;
p += (uintptr_t) pool->start;
goto done;
} else if (shift == ngx_slab_exact_shift) {
page->slab = 1;
page->next = &slots[slot];
page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_EXACT;
slots[slot].next = page;
p = (page - pool->pages) << ngx_pagesize_shift;
p += (uintptr_t) pool->start;
goto done;
} else { /* shift > ngx_slab_exact_shift */
page->slab = ((uintptr_t) 1 << NGX_SLAB_MAP_SHIFT) | shift;
page->next = &slots[slot];
page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_BIG;
slots[slot].next = page;
p = (page - pool->pages) << ngx_pagesize_shift;
p += (uintptr_t) pool->start;
goto done;
}
}
p = 0;
done:
ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, ngx_cycle->log, 0, "slab alloc: %p", p);
return (void *) p;
}
若page已经分配,无法通过ngx_slab_pool_t找到,但是可以通过返回的地址找到
// pages : 需要的 page 数量
static ngx_slab_page_t *
ngx_slab_alloc_pages(ngx_slab_pool_t *pool, ngx_uint_t pages)
{
ngx_slab_page_t *page, *p;
for (page = pool->free.next; page != &pool->free; page = page->next) {
if (page->slab >= pages) {
if (page->slab > pages) {
page[pages].slab = page->slab - pages; // 设置剩余 数组的长度
page[pages].next = page->next; // 将剩余数组加入链表
page[pages].prev = page->prev;
p = (ngx_slab_page_t *) page->prev; // 将剩余数组加入链表
p->next = &page[pages];
page->next->prev = (uintptr_t) &page[pages];
} else {
p = (ngx_slab_page_t *) page->prev; // 将剩余数组加入链表
p->next = page->next;
page->next->prev = page->prev;
}
page->slab = pages | NGX_SLAB_PAGE_START; // 设置 返回数组的元素数量 并标记起始元素
page->next = NULL; // 断开返回数组节点和 链表连接
page->prev = NGX_SLAB_PAGE;
if (--pages == 0) {
return page;
}
for (p = page + 1; pages; pages--) { // 初始化 返回数组的其它元素
p->slab = NGX_SLAB_PAGE_BUSY; // 将 slab 标记为占用状态
p->next = NULL; // 断开链表关系
p->prev = NGX_SLAB_PAGE;
p++;
}
return page; // 返回数组节点
}
}
// 没有合适的缓存
ngx_slab_error(pool, NGX_LOG_CRIT, "ngx_slab_alloc() failed: no memory");
return NULL;
}
通过p可以找到相关page,若Page释放完全,则加入pool->pages数组,pool->pages数组不会合并,slot会合并
void
ngx_slab_free_locked(ngx_slab_pool_t *pool, void *p)
{
size_t size;
uintptr_t slab, m, *bitmap;
ngx_uint_t n, type, slot, shift, map;
ngx_slab_page_t *slots, *page;
...
n = ((u_char *) p - pool->start) >> ngx_pagesize_shift;
page = &pool->pages[n];
slab = page->slab;
type = page->prev & NGX_SLAB_PAGE_MASK;
...
}