在实际工作中,我终于遇到了一些实实在在的面试题:
- 算法题:一个包含海量节点的无序链表,已知里面有多个重复元素,找出重复次数最多的那个,给出时间复杂度。比如20-1-2-3-5-7-3-20-12-3,重复元素有3个3,2个20答案显然是3。
在进行流量分析,DDoS检测与防护,流量清洗等动作时,一个很常见的需求就是“求top N”,与之相关的算法可谓汗牛充栋:
- 排序
- 最大堆
- LRU
- Bitmap counter
- …
理论算法很多,但在实战中总是会碰到各种工程问题,比如说并发操作的锁开销不可避免,那么设计一个装置而不是设计一个算法就是自然而然的想法了。
另外值得注意的的是,检测异常流量是一种模糊操作,未必需要精确的运算和精确的匹配,只需要定位到异常即可,因此还有一种思路就是使用percpu数据结构,读取数据时容忍精度的损失,用精度换效率。
下面是我在下班路上想到的两种方法,求top 1非常适合,稍微也能求top N。这两种方法实现起来非常简单通透。
方法1:多hash链表(类bloom filter的思路)
两个(或多个)使用不同hash算法的hash表链接IP地址结构体,并为per bucket计数:
数据结构如下:
struct bucket {
int hash;
atomic_t count;
spinlock_t lock;
struct list_head hlist;
};
struct IP_item {
struct list_head list;
u32 ipaddr;
};
#define HSIZE 8192
struct bucket hlist[2][HSIZE];
操作:
-
IP进入:以IP地址(源或者目标,取决于配置)分别计算hash1和hash2,分别链入相应的list,递增bucket计数。
-
timer到期或者conntrack销毁:摘除相应的IP结构体,递减bucket计数。
异常检测:
-
两个hash表同时存在某个bucket计数超过平均计数器的 β \beta β倍,且两个最大bucket计数器相差不超过 α \alpha α,视为异常:
L m a x > β L m e a n L_{max}>\beta L_{mean} Lmax>βLmean
其中平均长度计算如下:
L m e a n = Σ n = 0 N b u c k e t s − 1 L n − L m a x N b u c k e t s − 1 L_{mean}=\dfrac{\Sigma_{n=0}^{N_{buckets}-1}{Ln}-L_{max}}{N_{buckets}-1} Lmean=Nbuckets−1Σn=0Nbuckets−1Ln−Lmax
-
遍历两个最大计数器的bucket链表的开始 γ \gamma γ个元素( γ \gamma γ为平均长度的两倍),取两个链表最大重复交集。即为异常IP地址。力扣349:https://leetcode-cn.com/problems/intersection-of-two-arrays/
评价:
-
IP进入需要hash计算操作,O(1)时间复杂度。
-
IP结构体插入hash bucket链表,O(1)时间复杂度。(可能spinlock开销大,可percpu优化)
-
异常检测冒泡两个hash表最大bucket计数器(bucket固定),O(1)时间复杂度。
-
遍历两个链表KaTeX parse error: Undefined control sequence: \gama at position 1: \̲g̲a̲m̲a̲个元素,O(n)时间复杂度,假设hash算法是均匀的, L m e a n L_{mean} Lmean是很小的。
方法2:hash计数器(同样是bloom filter的思路)
将IP地址按照每8位拆分,每8位为256个counter,同时设置两个hash counter:
数据结构如下:
struct bucket {
int hash;
atomic_t count;
};
struct bcounter {
atomic_t counter;
};
#define HSIZE 8192
struct bucket hlist[2][HSIZE];
struct bcounter counter[4][256]
操作:
-
IP进入:以IP地址(源或者目标,取决于配置)分别计算hash1和hash2,分别递增两个hash counter对应的计数器,同时递增per octet counter组对应bit的计数器。
-
timer到期或者conntrack销毁:递减相应计数器。
异常检测:
-
两个hash表同时存在某个bucket计数超过平均计数器的 β \beta β倍,且两个最大bucket计数器相差不超过 α \alpha α,视为异常:
L m a x > β L m e a n L_{max}>\beta L_{mean} Lmax>βLmean
其中平均长度计算如下:
L m e a n = Σ n = 0 N b u c k e t s − 1 L n − L m a x N b u c k e t s − 1 L_{mean}=\dfrac{\Sigma_{n=0}^{N_{buckets}-1}{Ln}-L_{max}}{N_{buckets}-1} Lmean=Nbuckets−1Σn=0Nbuckets−1Ln−Lmax
-
取per octet counter组的最大计数器,按其位置作索引拼接成IP地址,用两个hash表验算。如果同时落到了最大计数器的bucket(即第一步发现的那两个bucket),则该拼接的地址就是异常地址,否则选择次小继续。
不是per octet counter组最大值拼接出来的概率很低,我们假设IP地址是均匀的长尾分布。
评价:
- 省去了hash链表的维护,特别是spinlock的使用。
优化版本
可以多加两个重叠per octet counter组,提高准确性,如此一来,从低位到高位,对于每一个octet,只要能high 4bit和下一个octet的low 4bit能对应拼接的,超级大概率就是top 1地址,除此之外,还能求top N地址:
背后的思路是,正常无突刺的情况下,IP地址的足够散列的,异常情况下,32位IP地址的任何子区间均会出现突刺,把所有的突刺拼接起来,就是一个完整的IP地址。
然而,如果出现了两个或者多个flood程度相当的IP地址出现,便难以检测了,需要排列组合来验算。一种应对这种情景的思路就是对每一个子区间进行top N排序,然后再验算。
总结
大家建议我用bitmap直接做,对于IPv4地址而言,耗费4G的地址空间。然而我是在内核里做这个功能,4G实在是玩不起啊(其实也没什么,但还是觉得不优雅)。bitmap是一个好方法,但如何能少占些内存呢?
于是想到了模糊的求解方法,也就是方法一,这个方法在极端情况下可能会失效,比如hash算法被攻破,hash散列产生了严重的畸变,但大多数情况下是好使的。在review的时候,per bucket的spinlock依然是心头一阵痛。
方法二是直接根据bitmap的思路改造的,好使,无锁。事实上,如果仔细观察和分析这些拆分出来的bits子区间,还是能挖出更多东西的,求top N完全不在话下,只是说目的达到了,也就没有再继续了。
BTW,以后面试千万不要再让人写个查找算法了,应该考察工程实现上的问题,比如并发操作如何优化锁的开销等等,写一个实际可用的spinlock要比写一个算法有用的多!大部分程序员职业生涯中除了面试根本不会再写快速排序,但就我这个成年不写几行代码的不会编程的人,这不也必须写一个在Linux内核里查找top 1 IP地址的实现了么?
浙江温州皮鞋湿,下雨进水不会胖。