Suricata6.0流表管理源码注释六:流的老化02

        流老化入口函数FlowManger调用函数FlowTimeoutHash完成流老化处理,这个函数完成了大部分老化工作,把一个老化的流从flow_hash上移走,移入了回收队列,由回收线程取出来放入空闲全局flow内存池。

1. FlowTimeoutHash 函数

FlowManager-》FlowTimeoutHash

函数有两个参数表示flow bucket的范围值,const uint32_t hash_min, const uint32_t hash_max,先把老化的flow移入参数td->aside_queue队列,再从aside_queue队列移入回收线程的队列flow_recycle_q,如此玩法,flow必晕。

/**
 *  \brief time out flows from the hash
 *
 *  \param ts timestamp
 *  \param hash_min min hash index to consider
 *  \param hash_max max hash index to consider
 *  \param counters ptr to FlowTimeoutCounters structure
 *
 *  \retval cnt number of timed out flow
 */

//新版本的老化策略比旧版本复杂了很多,流分配那块修改应该是有效率提升,
//但是这个函数应该没有效率提升,
//这个函数总思路就是先设置一个位图,每1位表示一个bucket是否有需要老化的flow,
//还是和旧版一样判断每个bucket的next_ts与最新时间,先循环32或64次检查每个
//bucket是否有超时的flow,有超时的放入一个32或64位的整数,
//循环完成,再循环这个整数,检查每一位同时检查next_ts,都符合超时条件则进行老化,好复杂,没毛用
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
        struct timeval *ts,
        const uint32_t hash_min, const uint32_t hash_max,
        FlowTimeoutCounters *counters)
{
    uint32_t cnt = 0;
    //获取紧急模式标志
    const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
    //计算检查的bucket数量
    const uint32_t rows_checked = hash_max - hash_min;
    uint32_t rows_skipped = 0;
    uint32_t rows_empty = 0;

//那个一次取32还是64个bucket就是这个系统位数决定的,以下都认为64位吧
#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif

    for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
        TYPE check_bits = 0; //64位整数
        //取最小的数,万一bucket数量不够64个呢
        const uint32_t check = MIN(BITS, (hash_max - idx));
        //循环64次,检查64个bucket是否有超时的flow,有则设置一位到check_bits
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;
        }
        if (check_bits == 0) //没有一个超时的则继续下一轮64个bucket的超时判断
            continue;

        //多循环一次
        for (uint32_t i = 0; i < check; i++) {
            //有超时的,则循环检测这个64位数,然后老化之
            FlowBucket *fb = &flow_hash[idx+i];
            //多比较一次,何苦呢
            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {
                FBLOCK_LOCK(fb);
                Flow *evicted = NULL;
                if (fb->evicted != NULL || fb->head != NULL) {
                    if (fb->evicted != NULL) {
                        //这个evicted队列不为空,说明在建立流获取流时检查到超时流,把那个超时
                        //放到有条件的放到这个队列了,也需要老化回收
                        /* transfer out of bucket so we can do additional work outside
                         * of the bucket lock */
                        //这里把这个evicted队列放到临时变量,后边代码处理这个evicted队列
                        evicted = fb->evicted;
                        fb->evicted = NULL;
                    }
                    if (fb->head != NULL) {
                        //这个flow bucket上有超时flow,所以调用FlowManagerHashRowTimeout
                        //进行超时处理,超时的流放到了td->aside_queue这个队列,这bucket老化
                        //就是把flow移入些队列,然后再将这些队列的flow以如回收线程的队列,繁琐
                        int32_t next_ts = 0;
                        FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);
                            //设置bucket的最小超时时间,就是下一次超时时间
                            if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
                            SC_ATOMIC_SET(fb->next_ts, next_ts);
                    }
                    //如果bucket的evicted队列没有flow, 且head为空不包含flow了,说明这个
                    //bucket的flow都被老化了,则设置下次老化时间为INT_MAX,就是不需要老化了
                    if (fb->evicted == NULL && fb->head == NULL) {
                        SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                    }
                } else {
                    //如果bucket的evicted队列没有flow,且head为空不包含flow了,说明这个
                    //bucket的flow都被老化了,则设置下次老化时间为INT_MAX,就是不需要老化了
                    SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                    rows_empty++;
                }
                FBLOCK_UNLOCK(fb);
                /* processed evicted list */
                if (evicted) {
                    //这个bucket的超时处理完成,现在处理移入evicted队列的超时flow
                    //其实,这个函数又把这些flow移入了td->aside_queue,尴尬
                    FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
                }
            } else {
                //没有超时的bucket,跳过计数增1
                rows_skipped++;
            }
        }
        if (td->aside_queue.len) {
            //这个是最后真正处理超时的flow,把td->aside_queue的flow移入回收线程的回收队列,
            //何苦呢
            cnt += ProcessAsideQueue(td, counters);
        }
    }

    counters->rows_checked += rows_checked;
    counters->rows_skipped += rows_skipped;
    counters->rows_empty += rows_empty;

    if (td->aside_queue.len) {
        //如果有未处理的flow,则继续把td->aside_queue的flow移入
        //回收线程的回收队列flow_recycle_q
        cnt += ProcessAsideQueue(td, counters);
    }
    counters->flows_removed += cnt;
    /* coverity[missing_unlock : FALSE] */
    return cnt;
}

2. FlowManagerHashRowTimeout 函数

FlowManager-》FlowTimeoutHash-》FlowManagerHashRowTimeout

这函数实际执行了老化操作,其实就调用FlowManagerFlowTimeout检查是否超时,是就把老化的flow从flow_hash移除,然后移入参数td->aside_queue,别的什么没干。

/**
 *  \internal
 *
 *  \brief check all flows in a hash row for timing out
 *
 *  \param f last flow in the hash row
 *  \param ts timestamp
 *  \param emergency bool indicating emergency mode
 *  \param counters ptr to FlowTimeoutCounters structure
 */
static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,
        Flow *f, struct timeval *ts,
        int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
{
    uint32_t checked = 0;
    Flow *prev_f = NULL;

    //循环检查bucket上的每个flow
    do {
        checked++;

        /* check flow timeout based on lastts and state. Both can be
         * accessed w/o Flow lock as we do have the hash row lock (so flow
         * can't disappear) and flow_state is atomic. lastts can only
         * be modified when we have both the flow and hash row lock */

        /* timeout logic goes here */
        //检查flow是否超时
        if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
            //未超时继续检查下个flow
            counters->flows_notimeout++;

            prev_f = f;
            f = f->next;
            continue;
        }

        FMFlowLock(f); //FLOWLOCK_WRLOCK(f);

        Flow *next_flow = f->next;

        /* never prune a flow that is used by a packet we
         * are currently processing in one of the threads */
        //到这里说明flow超时了,引用计数如果大于0则不能老化
        //FlowBypassedTimeout这个函数还需要再看看
        if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {
            FLOWLOCK_UNLOCK(f);
            prev_f = f;
            if (f->use_cnt > 0) {
                counters->flows_timeout_inuse++;
            }
            f = f->next;
            continue;
        }

        //设置老化标志为超时老化
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;

        counters->flows_timeout++;

        //从bucket上移除flow
        RemoveFromHash(f, prev_f);

        //将超时的flow移入td->aside_queue队列
        //函数返回后,会将这个队列的flow移入回收线程的回收队列
        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);

    counters->flows_checked += checked;
    if (checked > counters->rows_maxlen)
        counters->rows_maxlen = checked;
}

上一篇:P1601 A+B Problem(高精)


下一篇:TS贪吃蛇项目详解