流老化入口函数FlowManger调用函数FlowTimeoutHash完成流老化处理,这个函数完成了大部分老化工作,把一个老化的流从flow_hash上移走,移入了回收队列,由回收线程取出来放入空闲全局flow内存池。
1. FlowTimeoutHash 函数
FlowManager-》FlowTimeoutHash
函数有两个参数表示flow bucket的范围值,const uint32_t hash_min, const uint32_t hash_max,先把老化的flow移入参数td->aside_queue队列,再从aside_queue队列移入回收线程的队列flow_recycle_q,如此玩法,flow必晕。
/**
* \brief time out flows from the hash
*
* \param ts timestamp
* \param hash_min min hash index to consider
* \param hash_max max hash index to consider
* \param counters ptr to FlowTimeoutCounters structure
*
* \retval cnt number of timed out flow
*/
//新版本的老化策略比旧版本复杂了很多,流分配那块修改应该是有效率提升,
//但是这个函数应该没有效率提升,
//这个函数总思路就是先设置一个位图,每1位表示一个bucket是否有需要老化的flow,
//还是和旧版一样判断每个bucket的next_ts与最新时间,先循环32或64次检查每个
//bucket是否有超时的flow,有超时的放入一个32或64位的整数,
//循环完成,再循环这个整数,检查每一位同时检查next_ts,都符合超时条件则进行老化,好复杂,没毛用
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
struct timeval *ts,
const uint32_t hash_min, const uint32_t hash_max,
FlowTimeoutCounters *counters)
{
uint32_t cnt = 0;
//获取紧急模式标志
const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
//计算检查的bucket数量
const uint32_t rows_checked = hash_max - hash_min;
uint32_t rows_skipped = 0;
uint32_t rows_empty = 0;
//那个一次取32还是64个bucket就是这个系统位数决定的,以下都认为64位吧
#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif
for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
TYPE check_bits = 0; //64位整数
//取最小的数,万一bucket数量不够64个呢
const uint32_t check = MIN(BITS, (hash_max - idx));
//循环64次,检查64个bucket是否有超时的flow,有则设置一位到check_bits
for (uint32_t i = 0; i < check; i++) {
FlowBucket *fb = &flow_hash[idx+i];
check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;
}
if (check_bits == 0) //没有一个超时的则继续下一轮64个bucket的超时判断
continue;
//多循环一次
for (uint32_t i = 0; i < check; i++) {
//有超时的,则循环检测这个64位数,然后老化之
FlowBucket *fb = &flow_hash[idx+i];
//多比较一次,何苦呢
if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {
FBLOCK_LOCK(fb);
Flow *evicted = NULL;
if (fb->evicted != NULL || fb->head != NULL) {
if (fb->evicted != NULL) {
//这个evicted队列不为空,说明在建立流获取流时检查到超时流,把那个超时
//放到有条件的放到这个队列了,也需要老化回收
/* transfer out of bucket so we can do additional work outside
* of the bucket lock */
//这里把这个evicted队列放到临时变量,后边代码处理这个evicted队列
evicted = fb->evicted;
fb->evicted = NULL;
}
if (fb->head != NULL) {
//这个flow bucket上有超时flow,所以调用FlowManagerHashRowTimeout
//进行超时处理,超时的流放到了td->aside_queue这个队列,这bucket老化
//就是把flow移入些队列,然后再将这些队列的flow以如回收线程的队列,繁琐
int32_t next_ts = 0;
FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
//设置bucket的最小超时时间,就是下一次超时时间
if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
SC_ATOMIC_SET(fb->next_ts, next_ts);
}
//如果bucket的evicted队列没有flow, 且head为空不包含flow了,说明这个
//bucket的flow都被老化了,则设置下次老化时间为INT_MAX,就是不需要老化了
if (fb->evicted == NULL && fb->head == NULL) {
SC_ATOMIC_SET(fb->next_ts, INT_MAX);
}
} else {
//如果bucket的evicted队列没有flow,且head为空不包含flow了,说明这个
//bucket的flow都被老化了,则设置下次老化时间为INT_MAX,就是不需要老化了
SC_ATOMIC_SET(fb->next_ts, INT_MAX);
rows_empty++;
}
FBLOCK_UNLOCK(fb);
/* processed evicted list */
if (evicted) {
//这个bucket的超时处理完成,现在处理移入evicted队列的超时flow
//其实,这个函数又把这些flow移入了td->aside_queue,尴尬
FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
}
} else {
//没有超时的bucket,跳过计数增1
rows_skipped++;
}
}
if (td->aside_queue.len) {
//这个是最后真正处理超时的flow,把td->aside_queue的flow移入回收线程的回收队列,
//何苦呢
cnt += ProcessAsideQueue(td, counters);
}
}
counters->rows_checked += rows_checked;
counters->rows_skipped += rows_skipped;
counters->rows_empty += rows_empty;
if (td->aside_queue.len) {
//如果有未处理的flow,则继续把td->aside_queue的flow移入
//回收线程的回收队列flow_recycle_q
cnt += ProcessAsideQueue(td, counters);
}
counters->flows_removed += cnt;
/* coverity[missing_unlock : FALSE] */
return cnt;
}
2. FlowManagerHashRowTimeout 函数
FlowManager-》FlowTimeoutHash-》FlowManagerHashRowTimeout
这函数实际执行了老化操作,其实就调用FlowManagerFlowTimeout检查是否超时,是就把老化的flow从flow_hash移除,然后移入参数td->aside_queue,别的什么没干。
/**
* \internal
*
* \brief check all flows in a hash row for timing out
*
* \param f last flow in the hash row
* \param ts timestamp
* \param emergency bool indicating emergency mode
* \param counters ptr to FlowTimeoutCounters structure
*/
static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,
Flow *f, struct timeval *ts,
int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
{
uint32_t checked = 0;
Flow *prev_f = NULL;
//循环检查bucket上的每个flow
do {
checked++;
/* check flow timeout based on lastts and state. Both can be
* accessed w/o Flow lock as we do have the hash row lock (so flow
* can't disappear) and flow_state is atomic. lastts can only
* be modified when we have both the flow and hash row lock */
/* timeout logic goes here */
//检查flow是否超时
if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
//未超时继续检查下个flow
counters->flows_notimeout++;
prev_f = f;
f = f->next;
continue;
}
FMFlowLock(f); //FLOWLOCK_WRLOCK(f);
Flow *next_flow = f->next;
/* never prune a flow that is used by a packet we
* are currently processing in one of the threads */
//到这里说明flow超时了,引用计数如果大于0则不能老化
//FlowBypassedTimeout这个函数还需要再看看
if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {
FLOWLOCK_UNLOCK(f);
prev_f = f;
if (f->use_cnt > 0) {
counters->flows_timeout_inuse++;
}
f = f->next;
continue;
}
//设置老化标志为超时老化
f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;
counters->flows_timeout++;
//从bucket上移除flow
RemoveFromHash(f, prev_f);
//将超时的flow移入td->aside_queue队列
//函数返回后,会将这个队列的flow移入回收线程的回收队列
FlowQueuePrivateAppendFlow(&td->aside_queue, f);
/* flow is still locked in the queue */
f = next_flow;
} while (f != NULL);
counters->flows_checked += checked;
if (checked > counters->rows_maxlen)
counters->rows_maxlen = checked;
}