/* ========================= HyperLogLog algorithm ========================= */ /* Our hash function is MurmurHash2, 64 bit version. * It was modified for Redis in order to provide the same result in * big and little endian archs (endian neutral). */ 我们的hash函数用的是64位版本的MurmurHash2。 Redis使用时为了在大端和小端的结构中提供更好的结果修改了该算法 uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) { const uint64_t m = 0xc6a4a7935bd1e995; 初始化的值 const int r = 47; 常用轮数 uint64_t h = seed ^ (len * m); 种子 异或 ( 长度 乘以 初始化值 ) const uint8_t *data = (const uint8_t *)key; 获取数据初始位置 const uint8_t *end = data + (len-(len&7)); 获取模8结束位置 while(data != end) { 不到结尾 uint64_t k; #if (BYTE_ORDER == LITTLE_ENDIAN) 小段情况 #ifdef USE_ALIGNED_ACCESS 字节对齐 memcpy(&k,data,sizeof(uint64_t)); #else k = *((uint64_t*)data); #endif #else 大端情况 k = (uint64_t) data[0]; k |= (uint64_t) data[1] << 8; k |= (uint64_t) data[2] << 16; k |= (uint64_t) data[3] << 24; k |= (uint64_t) data[4] << 32; k |= (uint64_t) data[5] << 40; k |= (uint64_t) data[6] << 48; k |= (uint64_t) data[7] << 56; #endif k *= m; k 乘以 m k ^= k >> r; k向右移动r位 k *= m; k 乘以 m h ^= k; h 异或 k h *= m; h 乘以 m data += 8; 下一个8字节数据 } switch(len & 7) { 剩下的不对齐8字节的数据处理 case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */ case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */ case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */ case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */ case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */ case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */ case 1: h ^= (uint64_t)data[0]; h *= m; /* fall-thru */ }; h ^= h >> r; h 异或 h向右移动r位 h *= m; h 乘以 m h ^= h >> r; h 异或 h向右移动r位 return h; 返回最终h值 } ****************************************************************************************** /* Given a string element to add to the HyperLogLog, returns the length * of the pattern 000..1 of the element hash. As a side effect 'regp' is * set to the register index this element hashes to. */ 给定一个字符串元素,添加到HyperLogLog,返回元素hash值的 按照模式000..1的长度。 作为一个副作用,变量regp被设置为这个元素所在寄存器的hash索引,即用regp标记寄存器 int hllPatLen(unsigned char *ele, size_t elesize, long *regp) { uint64_t hash, bit, index; int count; /* Count the number of zeroes starting from bit HLL_REGISTERS * (that is a power of two corresponding to the first bit we don't use * as index). The max run can be 64-P+1 = Q+1 bits. 对HLL_REGISTERS的开始位置的0进行计数(这是一个2的指数,对应第一个我们不作为索引使用的比特位) * Note that the final "1" ending the sequence of zeroes must be * included in the count, so if we find "001" the count is 3, and * the smallest count possible is no zeroes at all, just a 1 bit * at the first position, that is a count of 1. *注意到最后的1,结束序列0必须包含在计数中,所以我们找到001的计数是3, 最小的计数是可能一个前导零也没有,第一个比特位就是1,那么计数就是1 * This may sound like inefficient, but actually in the average case * there are high probabilities to find a 1 after a few iterations. */ 这个听起来不是那么有效,实际上在平均情况下,有很高的概率只需要少量迭代就可以发现一个为1的比特位 hash = MurmurHash64A(ele,elesize,0xadc83b19ULL); index = hash & HLL_P_MASK; /* Register index. */ 寄存器的索引 HLL_P_MASK=2^14-1 hash >>= HLL_P; /* Remove bits used to address the register. */ 移出14个用于标记寄存器的比特位 hash |= ((uint64_t)1<<HLL_Q); /* Make sure the loop terminates and count will be <= Q+1. */ 确保循环终止,并且计数小于等于Q+1 bit = 1; count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */ 初始化为1,因为我们计数为"00000...1"的模式 while((hash & bit) == 0) { 这里需要注意hash是小端表示,所以低地址放的是高位的数据,所以得到的模式是000...1的模式 count++; bit <<= 1; } *regp = (int) index; return count; } ****************************************************************************************** /* ================== Dense representation implementation ================== */ 密集表示的实现 /* Low level function to set the dense HLL register at 'index' to the * specified value if the current value is smaller than 'count'. 低层级的函数,如果当前值比计数值小的情况下,用来设置在索引位置的密集HLL寄存器为特定的值。 * 'registers' is expected to have room for HLL_REGISTERS plus an * additional byte on the right. This requirement is met by sds strings * automatically since they are implicitly null terminated. 寄存器期望在右边有一个字节的额外空间,这个需求sds字符串可以自动满足,因为隐式的含有一个null结尾字节 * The function always succeed, however if as a result of the operation * the approximated cardinality changed, 1 is returned. Otherwise 0 * is returned. */ 这个函数总是成功的,除了 操作导致基数值改变了就返回1,其它情况返回0 int hllDenseSet(uint8_t *registers, long index, uint8_t count) { uint8_t oldcount; HLL_DENSE_GET_REGISTER(oldcount,registers,index); 获取index所在寄存器的值 if (count > oldcount) { HLL_DENSE_SET_REGISTER(registers,index,count); 设置index所在寄存器的值 return 1; } else { return 0; } } ****************************************************************************************** /* "Add" the element in the dense hyperloglog data structure. * Actually nothing is added, but the max 0 pattern counter of the subset * the element belongs to is incremented if needed. 在密集表示的HLL数据结构中添加元素。实际上没有添加任何内容,不过是最大的子集中模式0的基树可能增加 * This is just a wrapper to hllDenseSet(), performing the hashing of the * element in order to retrieve the index and zero-run count. */ 这个函数是函数hllDenseSet的封装,执行元素的hash为了获取索引和零行程的计数 int hllDenseAdd(uint8_t *registers, unsigned char *ele, size_t elesize) { long index; uint8_t count = hllPatLen(ele,elesize,&index); /* Update the register if this element produced a longer run of zeroes. */ 更新寄存器 如果元素产生了一个更长的零行程 return hllDenseSet(registers,index,count); } ****************************************************************************************** /* Compute the register histogram in the dense representation. */ 计算密集表示中寄存器的直方图 void hllDenseRegHisto(uint8_t *registers, int* reghisto) { int j; /* Redis default is to use 16384 registers 6 bits each. The code works * with other values by modifying the defines, but for our target value * we take a faster path with unrolled loops. */ redis默认使用16384个6比特的寄存器。通过修改定义这个代码可以工作在其它的值下, 但是对我们目标值来说,采用了更快的展开循环的方式进行(获取更快的速度) if (HLL_REGISTERS == 16384 && HLL_BITS == 6) { uint8_t *r = registers; unsigned long r0, r1, r2, r3, r4, r5, r6, r7, r8, r9, r10, r11, r12, r13, r14, r15; for (j = 0; j < 1024; j++) { /* Handle 16 registers per iteration. */ r0 = r[0] & 63; r1 = (r[0] >> 6 | r[1] << 2) & 63; r2 = (r[1] >> 4 | r[2] << 4) & 63; r3 = (r[2] >> 2) & 63; r4 = r[3] & 63; r5 = (r[3] >> 6 | r[4] << 2) & 63; r6 = (r[4] >> 4 | r[5] << 4) & 63; r7 = (r[5] >> 2) & 63; r8 = r[6] & 63; r9 = (r[6] >> 6 | r[7] << 2) & 63; r10 = (r[7] >> 4 | r[8] << 4) & 63; r11 = (r[8] >> 2) & 63; r12 = r[9] & 63; r13 = (r[9] >> 6 | r[10] << 2) & 63; r14 = (r[10] >> 4 | r[11] << 4) & 63; r15 = (r[11] >> 2) & 63; reghisto[r0]++; reghisto[r1]++; reghisto[r2]++; reghisto[r3]++; reghisto[r4]++; reghisto[r5]++; reghisto[r6]++; reghisto[r7]++; reghisto[r8]++; reghisto[r9]++; reghisto[r10]++; reghisto[r11]++; reghisto[r12]++; reghisto[r13]++; reghisto[r14]++; reghisto[r15]++; r += 12; 12 * 8 = 16 * 6 一个字节是8比特,所以12个字节对应 16个6比特的寄存器 } } else { //其它值的模式下,采用定义 for(j = 0; j < HLL_REGISTERS; j++) { unsigned long reg; HLL_DENSE_GET_REGISTER(reg,registers,j); reghisto[reg]++; } } } ****************************************************************************************** /* ================== Sparse representation implementation ================= */ 稀疏表示实现 /* Convert the HLL with sparse representation given as input in its dense * representation. Both representations are represented by SDS strings, and * the input representation is freed as a side effect. 对于输入为稀疏表示的HLL转化为密集表示。两种表示都使用SDS字符串,作为一种副作用,输入表示被释放(即函数调用之后,输入的表示不存了) * The function returns C_OK if the sparse representation was valid, * otherwise C_ERR is returned if the representation was corrupted. */ 这个函数返回C_OK,如果稀疏表示是有效的,否则返回C_ERR,如果稀疏表示奔溃了 typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj; int hllSparseToDense(robj *o) { sds sparse = o->ptr, dense; struct hllhdr *hdr, *oldhdr = (struct hllhdr*)sparse; 旧HLL结构 int idx = 0, runlen, regval; uint8_t *p = (uint8_t*)sparse, *end = p+sdslen(sparse); 获取结尾 /* If the representation is already the right one return ASAP. */ 如果表示是我们需要的那个,立即返回 hdr = (struct hllhdr*) sparse; if (hdr->encoding == HLL_DENSE) return C_OK; /* Create a string of the right size filled with zero bytes. * Note that the cached cardinality is set to 0 as a side effect * that is exactly the cardinality of an empty HLL. */ 创建一个用零填充的大小合适的字符串 注意这个缓存的基数被设置为0是一个副作用,正好是空HLL的基数 dense = sdsnewlen(NULL,HLL_DENSE_SIZE); 根据长度创建一个新字符串 hdr = (struct hllhdr*) dense; *hdr = *oldhdr; /* This will copy the magic and cached cardinality. */ 将旧值赋值给新创建的字符串 hdr->encoding = HLL_DENSE; 编码类型设置为密集 /* Now read the sparse representation and set non-zero registers accordingly. */ 现在读取稀疏表示并且设置对应非0寄存器 p += HLL_HDR_SIZE; 跳过结构体头部,指向数据部分 while(p < end) { 没有到结尾 if (HLL_SPARSE_IS_ZERO(p)) { 是零操作符 runlen = HLL_SPARSE_ZERO_LEN(p); 获取长度 idx += runlen; p++; } else if (HLL_SPARSE_IS_XZERO(p)) { X零操作符 runlen = HLL_SPARSE_XZERO_LEN(p); idx += runlen; p += 2; } else { runlen = HLL_SPARSE_VAL_LEN(p); regval = HLL_SPARSE_VAL_VALUE(p); if ((runlen + idx) > HLL_REGISTERS) break; /* Overflow. */ while(runlen--) { HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval);设置寄存器的值 idx++; } p++; } } /* If the sparse representation was valid, we expect to find idx * set to HLL_REGISTERS. */ 如果稀疏表示是有效的,我们期望找到idx设置为HLL_REGISTERS if (idx != HLL_REGISTERS) { 不是有效表示 sdsfree(dense); return C_ERR; } /* Free the old representation and set the new one. */ 释放旧的表示 设置新值 sdsfree(o->ptr); o->ptr = dense; return C_OK; } ****************************************************************************************** /* Low level function to set the sparse HLL register at 'index' to the * specified value if the current value is smaller than 'count'. 底层的函数 用来设置索引位置的HLL稀疏寄存器的值,如果当前值小于计数值 * The object 'o' is the String object holding the HLL. The function requires * a reference to the object in order to be able to enlarge the string if * needed. 对象o是保存hll的字符串。这个函数需要一个对象的引用,为了能够在需要的时候扩大字符串 * On success, the function returns 1 if the cardinality changed, or 0 * if the register for this element was not updated. * On error (if the representation is invalid) -1 is returned. 成功的情况下,如果基数改变了函数返回1,否则返回0如果这个元素的寄存器没有更新。 错误的情况下(如果这个表达式是无效的),返回-1 * As a side effect the function may promote the HLL representation from * sparse to dense: this happens when a register requires to be set to a value * not representable with the sparse representation, or when the resulting * size would be greater than server.hll_sparse_max_bytes. */ 作为一个副作用,这个函数可能促进HLL的表示从稀疏转向密集: 这种情况发生在一个寄存器需要设置一个非稀疏表示的值, 或者这个结果的大小将大于定义的server.hll_sparse_max_bytes的值 int hllSparseSet(robj *o, long index, uint8_t count) { struct hllhdr *hdr; uint8_t oldcount, *sparse, *end, *p, *prev, *next; long first, span; long is_zero = 0, is_xzero = 0, is_val = 0, runlen = 0; /* If the count is too big to be representable by the sparse representation * switch to dense representation. */ 如果计数太大了导致不能稀疏表示,转向密集表示 if (count > HLL_SPARSE_VAL_MAX_VALUE) goto promote; 如果超过了稀疏表示能表示的最大值,转向密集表示 /* When updating a sparse representation, sometimes we may need to * enlarge the buffer for up to 3 bytes in the worst case (XZERO split * into XZERO-VAL-XZERO). Make sure there is enough space right now * so that the pointers we take during the execution of the function * will be valid all the time. */ 当更新稀疏表示时候,有时候在最差的情况下,我们需要扩大缓存增加3个字节(XZERO分裂为XZERO-VAL-XZERO). 请确保现在有足够的空间,以便我们在函数执行期间使用的指针始终有效。 o->ptr = sdsMakeRoomFor(o->ptr,3); 扩大了3个字节,确保指针有效 /* Step 1: we need to locate the opcode we need to modify to check * if a value update is actually needed. */ 步骤1,我们需要定位需要修改的操作符,检查是否需要更新值 sparse = p = ((uint8_t*)o->ptr) + HLL_HDR_SIZE; 开始位置 end = p + sdslen(o->ptr) - HLL_HDR_SIZE; 结束位置 first = 0; prev = NULL; /* Points to previous opcode at the end of the loop. */ 在循环结束的时候,指向前一个操作码 next = NULL; /* Points to the next opcode at the end of the loop. */ 在循环结束的时候,指向下一个操作码 span = 0; while(p < end) { 还没有结束 long oplen; /* Set span to the number of registers covered by this opcode. 将span变量设置为这个操作码覆盖的寄存器 * This is the most performance critical loop of the sparse * representation. Sorting the conditionals from the most to the * least frequent opcode in many-bytes sparse HLLs is faster. */ 这是稀疏表示中性能最关键的循环。将条件从多字节稀疏HLL中的最频繁操作码排序到最不频繁操作码达到最快的速度。 oplen = 1; if (HLL_SPARSE_IS_ZERO(p)) { 短途的零操作最多 span = HLL_SPARSE_ZERO_LEN(p); } else if (HLL_SPARSE_IS_VAL(p)) { 然后蚕食值操作 span = HLL_SPARSE_VAL_LEN(p); } else { /* XZERO. */ span = HLL_SPARSE_XZERO_LEN(p); 最不频繁的就是长途的零 oplen = 2; } /* Break if this opcode covers the register as 'index'. */ 如果当前操作码包含了寄存器index所在的位置,那么就不用往下找了 if (index <= first+span-1) break; prev = p; 保存前一个操作码 p += oplen; 指向下一个操作码 first += span; 变更初始值 } if (span == 0 || p >= end) return -1; /* Invalid format. */ 无效格式 跨度为0或者超过结尾,格式错误 next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1; 指向下一个操作码 if (next >= end) next = NULL; 如果下一个操作码超过结尾,赋值为空 /* Cache current opcode type to avoid using the macro again and * again for something that will not change. * Also cache the run-length of the opcode. */ 缓存当前操作码类型,以避免对不会更改的内容反复使用宏。同时缓存操作码的行程长度。 if (HLL_SPARSE_IS_ZERO(p)) { is_zero = 1; runlen = HLL_SPARSE_ZERO_LEN(p); } else if (HLL_SPARSE_IS_XZERO(p)) { is_xzero = 1; runlen = HLL_SPARSE_XZERO_LEN(p); } else { is_val = 1; runlen = HLL_SPARSE_VAL_LEN(p); } /* Step 2: After the loop: * * 'first' stores to the index of the first register covered * by the current opcode, which is pointed by 'p'. first保存p指向的 当前寄存器中保存的操作码 * 'next' ad 'prev' store respectively the next and previous opcode, * or NULL if the opcode at 'p' is respectively the last or first. *next和prev分辨保存下一个和前一个的操作码或者空,如果操作在p分别是最后一个或者第一个 * 'span' is set to the number of registers covered by the current * opcode. *span被设置为本当前操作码覆盖的寄存器数目(就是当前) * There are different cases in order to update the data structure * in place without generating it from scratch: 有多种不同的情况, 可以更新对应位置的数据结构而不用破坏原来的结构 * A) If it is a VAL opcode already set to a value >= our 'count' * no update is needed, regardless of the VAL run-length field. * In this case PFADD returns 0 since no changes are performed. 情况A,如果对应位置是一个值,并且大于我们需要更新的值count,不需要更新, 无论值的行程有多长。在这种情况下,PFADD返回0,因为没有任何更新需要执行 * B) If it is a VAL opcode with len = 1 (representing only our * register) and the value is less than 'count', we just update it * since this is a trivial case. */ 情况B,如果恰好是一个值操作码,而且长度为1(只表示了我们当前所在的寄存器,就是p指向的寄存器) 并且值比我们的count小,我们只需要更新它即可,因为这也是一种平凡的情况,不需要改变原来的数据结构. if (is_val) { oldcount = HLL_SPARSE_VAL_VALUE(p); /* Case A. */ 情况A,无需改变 if (oldcount >= count) return 0; /* Case B. */ 情况B,值操作码长度为1,只需要更新值即可 if (runlen == 1) { HLL_SPARSE_VAL_SET(p,count,1); goto updated; } } /* C) Another trivial to handle case is a ZERO opcode with a len of 1. * We can just replace it with a VAL opcode with our value and len of 1. */ 情况C,另外一个平凡的情况是 需要更新的值是一个长度为1的零操作码,我们只需要用长度为1的值操作码替代即可 if (is_zero && runlen == 1) { 需要更新的是长度为1的零操作码,直接用长度为1的值操作码更新即可 HLL_SPARSE_VAL_SET(p,count,1); goto updated; } /* D) General case. *一般情况,就是需要改变数据结构的情况 * The other cases are more complex: our register requires to be updated * and is either currently represented by a VAL opcode with len > 1, * by a ZERO opcode with len > 1, or by an XZERO opcode. 其它情况比较复杂:我们的寄存器需要更新,并且当前表达式是一个长度大于1的值操作码, 或者长度大于1的零操作码,或者XZERO操作码。 * In those cases the original opcode must be split into multiple * opcodes. The worst case is an XZERO split in the middle resuling into * XZERO - VAL - XZERO, so the resulting sequence max length is 5 bytes. *在这些情况中,原始的操作码需要分裂成多个操作码。最坏的情况是一个XZERO在中间分裂成XZERO-VAL-XZERO, 这样导致序列最大长度为5个字节。 * We perform the split writing the new sequence into the 'new' buffer * with 'newlen' as length. Later the new sequence is inserted in place * of the old one, possibly moving what is on the right a few bytes * if the new sequence is longer than the older one. */ 我们执行分裂操作,将新序列写入到名为new实际长度为newlen的缓存.后续再将新序列插入到旧值原来的地方, 如果新值的长度超过旧值,那么可能需要向右移动几个字节。 uint8_t seq[5], *n = seq; int last = first+span-1; /* Last register covered by the sequence. */ 最后一个被序列覆盖的寄存器 int len; if (is_zero || is_xzero) { 原本是0序列, 短的零序列 和长的零序列 /* Handle splitting of ZERO / XZERO. */ 分裂 ZERO 或者 XZERO序列. if (index != first) { 不是头部 len = index-first; 偏离头部长度 if (len > HLL_SPARSE_ZERO_MAX_LEN) { 超过了ZERO的最大长度,那么就是XZERO HLL_SPARSE_XZERO_SET(n,len); n += 2; } else { HLL_SPARSE_ZERO_SET(n,len); n++; } } HLL_SPARSE_VAL_SET(n,count,1); n++; if (index != last) { 如果不是尾部 len = last-index;偏离尾部的距离 if (len > HLL_SPARSE_ZERO_MAX_LEN) { 超过了ZERO的最大长度,那么就是XZERO HLL_SPARSE_XZERO_SET(n,len); n += 2; } else { HLL_SPARSE_ZERO_SET(n,len); n++; } } } else { 原本是个值 /* Handle splitting of VAL. */ 处理值的分裂情况,多个连续相同的原值 int curval = HLL_SPARSE_VAL_VALUE(p); 获取原值 if (index != first) { 不是头部 len = index-first; HLL_SPARSE_VAL_SET(n,curval,len); n++; } HLL_SPARSE_VAL_SET(n,count,1); 新值 n++; if (index != last) { len = last-index; HLL_SPARSE_VAL_SET(n,curval,len);原值 n++; } } /* Step 3: substitute the new sequence with the old one. 步骤3,用新的序列代替老的序列 * Note that we already allocated space on the sds string * calling sdsMakeRoomFor(). */ 注意到我们已经在sds字符串调用函数sdsMakeRoomFor()中分配空间 int seqlen = n-seq; 新值的长度 int oldlen = is_xzero ? 2 : 1; 旧值的长度 int deltalen = seqlen-oldlen; 两者差值 if (deltalen > 0 && sdslen(o->ptr)+deltalen > server.hll_sparse_max_bytes) goto promote; 如果超出了最大稀疏表示,就改用密集表示 if (deltalen && next) memmove(next+deltalen,next,end-next); 如果有新旧值长度有差值并且原值有后续操作码,那么向右移动后续的操作码,腾出新值所需的空间 sdsIncrLen(o->ptr,deltalen); 需改字符串的空间大小 memcpy(p,seq,seqlen); 将新值拷贝到上面腾出的空间中 end += deltalen; 尾部位置需要将新增加的空间计算上 updated: /* Step 4: Merge adjacent values if possible. *步骤4 如果可能就合并相邻的值 * The representation was updated, however the resulting representation * may not be optimal: adjacent VAL opcodes can sometimes be merged into * a single one. */ 表达式被更新了,然而这个结果表达式可能不是最优的:相邻的值操作码能被合并到一个新的值操作码 p = prev ? prev : sparse; 指向前一个操作码,如果没有就从头开始 int scanlen = 5; /* Scan up to 5 upcodes starting from prev. */从前一个值开始扫描5个更新操作码字节,因为最长也就是5个 while (p < end && scanlen--) { if (HLL_SPARSE_IS_XZERO(p)) { p += 2; continue; } else if (HLL_SPARSE_IS_ZERO(p)) { p++; continue; } /* We need two adjacent VAL opcodes to try a merge, having * the same value, and a len that fits the VAL opcode max len. */ 我们需要合并相邻的值操作码,拥有相同的值,并且长度符合值操作码的最大长度 if (p+1 < end && HLL_SPARSE_IS_VAL(p+1)) { int v1 = HLL_SPARSE_VAL_VALUE(p); int v2 = HLL_SPARSE_VAL_VALUE(p+1); if (v1 == v2) { int len = HLL_SPARSE_VAL_LEN(p)+HLL_SPARSE_VAL_LEN(p+1); 将两个同样值的长度相加 if (len <= HLL_SPARSE_VAL_MAX_LEN) { 小于最大长度 HLL_SPARSE_VAL_SET(p+1,v1,len); 设置到p+1的位置 memmove(p,p+1,end-p); 向前移动一个位置 sdsIncrLen(o->ptr,-1); 字符串长度减去一个位置 end--; /* After a merge we reiterate without incrementing 'p' * in order to try to merge the just merged value with * a value on its right. */ 经过合并,我们在不增加p的情况下,重复尝试合并刚合并的值和它右边的值 continue; } } } p++; 前后两个都是值,不用合并的情况下,向后移动一位,继续尝试合并 } /* Invalidate the cached cardinality. */ 使得存储的基数无效 hdr = o->ptr; HLL_INVALIDATE_CACHE(hdr); return 1; promote: /* Promote to dense representation. */ 变化为密集表示 if (hllSparseToDense(o) == C_ERR) return -1; /* Corrupted HLL. */ 变成密集表示失败 hdr = o->ptr; /* We need to call hllDenseAdd() to perform the operation after the * conversion. However the result must be 1, since if we need to * convert from sparse to dense a register requires to be updated. *在转化为密集表示后,我们需要调用hllDenseAdd去执行操作修改旧值, 这个结果必须为1,因为我们从稀疏转向密集,必定有寄存器需要修改,否则不需要转密集 * Note that this in turn means that PFADD will make sure the command * is propagated to slaves / AOF, so if there is a sparse -> dense * conversion, it will be performed in all the slaves as well. */ 注意,这反过来意味着PFADD将确保命令被传播到slaves/AOF, 因此如果存在稀疏->密集转换,它也将在所有slave中执行 int dense_retval = hllDenseSet(hdr->registers,index,count); serverAssert(dense_retval == 1); 确保为1 return dense_retval; } ****************************************************************************************** /* "Add" the element in the sparse hyperloglog data structure. * Actually nothing is added, but the max 0 pattern counter of the subset * the element belongs to is incremented if needed. 在稀疏表示的HLL数据结构中添加元素。实际上什么也没有添加。但是如果所属子集的最大0模式的计数增加,将会做出变动。 * This function is actually a wrapper for hllSparseSet(), it only performs * the hashshing of the elmenet to obtain the index and zeros run length. */ 这个函数实际上是对hllSparseSet的封装。它只是执行元素hash函数来获取位置和零行程的长度 int hllSparseAdd(robj *o, unsigned char *ele, size_t elesize) { long index; uint8_t count = hllPatLen(ele,elesize,&index); 获取统计的零行程长度 /* Update the register if this element produced a longer run of zeroes. */ 更新寄存器如果元素馋了一个更长的零行程 return hllSparseSet(o,index,count); } ****************************************************************************************** /* Compute the register histogram in the sparse representation. */ 计算稀疏表示中寄存器的直方图 void hllSparseRegHisto(uint8_t *sparse, int sparselen, int *invalid, int* reghisto) { int idx = 0, runlen, regval; uint8_t *end = sparse+sparselen, *p = sparse; while(p < end) { 序列没有结束 if (HLL_SPARSE_IS_ZERO(p)) { 是0行程 runlen = HLL_SPARSE_ZERO_LEN(p); idx += runlen; reghisto[0] += runlen; 零个数的统计 p++; } else if (HLL_SPARSE_IS_XZERO(p)) { runlen = HLL_SPARSE_XZERO_LEN(p); idx += runlen; reghisto[0] += runlen; 零个数的统计 p += 2; } else { runlen = HLL_SPARSE_VAL_LEN(p); regval = HLL_SPARSE_VAL_VALUE(p); idx += runlen; reghisto[regval] += runlen; 值个数的统计 p++; } } if (idx != HLL_REGISTERS && invalid) *invalid = 1; 寄存器个数正确并且传入的invalid非空,赋值invalid为1 } ******************************************************************************************