继续接着第一篇写:使用C#实现DHT磁力搜索的BT种子后端管理程序+数据库设计(开源)[搜片神器]
谢谢园子朋友的支持,已经找到个VPS进行测试,国外的服务器: http://www.sosobta.com 大家可以给提点意见...
开源地址:https://github.com/h31h31/H31DHTMgr
程序下载:H31DHT下载
看大家对昨天此类文章的兴趣没有第一篇高,今天就简单的对支持的朋友进行交流.园子里的朋友希望授大家以渔,所以这部分代码就先不放出来.希望大家更多的加入进来.
也希望谁有能力将C++的代码转换成C#的,添加到我们的搜片神器工具里面.
昨天通过向大家介绍DHT的工作原理,相信大家大概明白怎么回事,不明白的朋友可以继续分享接下来的文章.
本人借鉴的代码是C++版本的:transmission里面的DHT代码,大家可以访问网站下载:http://www.transmissionbt.com/
不过里面的代码环境是LINUX下的,需要自己转换到相应的WIN平台上来.
有兴趣使用C#来完成DHT功能的朋友可以借鉴mono-monotorrent,里面的框架代码比较多,不如C++的transmission里面就三个文件来得明白.
transmission里面只有三个文件就可以实现dht的功能: dht.c dht.h dht-example.c,并且接口很简单,复用性很好。
下面介绍进入DHT网络主要功能步骤
dht.c dht.h代码分成三部分:
1、路由表的插入操作。
1)如果节点已经在路由表中,则更新节点,返回。
2)如果桶没有满,则插入,返回。
3)如果发现失效节点,替换,返回。
4)发现可疑节点,则保存新节点到缓存中并且如果该可疑节点没有ping,发出ping_node操作,返回。
5)现在,桶已经充满了好的节点,如果自己的ID没有落在这个桶中,返回。
6)将桶空间分成两半。跳到步骤1)。
2、KAD远程处理调用。
这部分又分成3种,
1)ping/pong操作。
所有的包的tid都使用pg\0\0
2)find_node操作。
所有的包的tid都使用fn\0\0
3)get_peers/annouce_peer操作。
对同一个HASH的一次递归查询中,tid保持不变。
其中只有3)种实现bittorrent的DHT规范里面提到的递归查询操作,1)和2)仅仅用来维护路由表,并且不保存状态。
3、定时器处理:
为了检测路由表中节点的有效性(根据规范,路由表中应该只保存有效节点),在代码中,在执行krpc操作时如果发现时对路由表中的节点操作,那么则保存操作的开始时间
pinged_time,通过操作的开始时间来判断操作是否超时。
expire_stuff_time
超时时,会执行下面的操作:
1、检查路由表中失效的节点(根据pinged_time来判定),并将该节点删除。
2、检查用来保存annoounce_peer的节点是否超过30分钟(这个不打算深入讨论,故不做解析)。
3、检查递归查询操作超时。
rotate_secrets_time
定时器。
用来每隔大约15分左右就更换token(见DHT规范).
confirm_nodes_time
定时器。
查找长期没有活动的桶,然后通过执行一个find_node的krpc操作来刷新它。
search_time定时器。
有可能出现发出的所有的get_peers操作,都没有应答,那么search_time定时器遇到这种情形时负责重发所有请求。(注意:
get_peers操作最大未决的krpc请求数是3)
用于维持路由表的ping/pong操作:
在试图插入节点时,发现桶已经满,而存在可疑节点时会触发ping_node操作。未响应的节点会有可疑最终变为失效节点,而被替换。
下面介绍我们是如何进入DHT网络
- DHT必须把自己电脑当服务器,别人才能够知道自己是谁,所以需要通过UDP绑定端口,参考代码里面支持IPV6,个人觉得可以过滤掉.WIN平台代码如下:
//初始化socket
m_soListen =(int)socket(PF_INET, SOCK_DGRAM, IPPROTO_IP);
if (m_soListen == INVALID_SOCKET) {
m_iErrorNo=WSAGetLastError();
_dout(_T("CH31CarMonitorDlg Start Error(%d).\n"),m_iErrorNo);
return -;
}
//初始化服务器地址
SOCKADDR_IN addr;
memset(&addr, , sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
//绑定端口监听
if (bind(m_soListen, (SOCKADDR*)&addr, sizeof(addr)) == SOCKET_ERROR) {
m_iErrorNo=WSAGetLastError();
_dout(_T("CH31CarMonitorDlg Start Error(%d).\n"),m_iErrorNo);
return -;
}UDP端口绑定
- DHT需要生成一个自己的20位ID号,当然可以通过随机一个数值,然后通过SHA1来生成20位的ID号,WIN平台代码如下:
unsigned char p[];
CSHA1 sha1;
sha1.Reset();
sha1.Update((const unsigned char *)m_myID.GetBuffer(), m_myID.GetLength());
sha1.Final();
sha1.GetHash(p);SHA1生成ID号
- 初始化他人服务器的IP信息,这样我们就可以从他们那里查询我们要的信息,借鉴代码如下:
rc = getaddrinfo("router.utorrent.com","", &hints1, &info);
//rc = getaddrinfo("router.bittorrent.com","6881", &hints1, &info);
//rc = getaddrinfo("dht.transmissionbt.com","6881", &hints1, &info);
if(rc != ) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rc));
exit();
}
infop = info;
while(infop&&m_bDataThread)
{
memcpy(&bootstrap_nodes[num_bootstrap_nodes],infop->ai_addr, infop->ai_addrlen);
infop = infop->ai_next;
num_bootstrap_nodes++;
}
freeaddrinfo(info);服务器信息
- 现在就可以初始化我们的DHT类了.由于此类使用C写的,大家可以自行封装成C++类使用.
rc = m_dht.dht_init(s, s6, m_myid,NULL);
if(rc < ) {
perror("dht_init");
exit();
}初始化DHT类
- 对服务器进行PING操作,服务器就会回应PONG操作,这样就表明服务器活动正常.
for(int i = ; i < num_bootstrap_nodes&&m_bDataThread; i++)
{
m_dht.dht_ping_node((struct sockaddr*)&bootstrap_nodes[i],sizeof(bootstrap_nodes[i]));
Sleep(m_dht.random() % );
}PING服务器
- 下面就可以使用搜索类进行操作,查询我们要的HASH值的BT种子文件代码.借鉴代码如下:
if(searching) {
if(s >= )
dht_search(hash, , AF_INET, callback, NULL);
if(s6 >= )
dht_search(hash, , AF_INET6, callback, NULL);
searching = ;
}dht_search
- 大家可以借鉴dht-example.c里面接下来的Search函数的操作,不过我们不是这样来的,我们需要直接向服务器发送Findnode和Get_Peer操作.
unsigned char tid[];
m_dht.make_tid(tid, "fn", );
m_dht.send_find_node(&ipRecvPingList[ipListPOS].fromaddr,sizeof(sockaddr),tid,,ipRecvPingList[ipListPOS].ID,,);
Sleep();
memset(tid,,sizeof(tid));
m_dht.make_tid(tid, "gp", );
m_dht.send_get_peers(&ipRecvPingList[ipListPOS].fromaddr,sizeof(sockaddr),tid,,hashList[],,);发送FINDNODE和GET_PEER操作
- 接下来的事情就是等待别人返回的信息进行分析就可以了,当然DHT类代码已经全部为我们做好的.
FD_ZERO(&readfds);
if(m_soListen >= )
FD_SET(m_soListen, &readfds);
if(s6 >= )
FD_SET(s6, &readfds);
rc = select(m_soListen > s6 ? m_soListen + : s6 + , &readfds, NULL, NULL, &tv);
if(rc <&&m_bDataThread)
{
if(errno != EINTR) {
perror("select");
Sleep();
}
} if(!m_bDataThread)
break; if(rc > &&m_bDataThread)
{
fromlen = sizeof(from1);
memset(buf,,sizeof(buf));
if(m_soListen >= && FD_ISSET(m_soListen, &readfds))
rc = recvfrom(m_soListen, buf, sizeof(buf) - , ,&from1, &fromlen);
else if(s6 >= && FD_ISSET(s6, &readfds))
rc = recvfrom(s6, buf, sizeof(buf) - , ,&from1, &fromlen);
else
abort();
} if(rc > &&m_bDataThread)
{
buf[rc] = '\0';
rc = m_dht.dht_periodic(buf, rc, &from1, fromlen,&tosleep, DHT_callback, this); }
else
{
rc = m_dht.dht_periodic(NULL, , NULL, , &tosleep, DHT_callback, this);
}等待返回DHT网络信息
- 如何解析信息DHT代码已经有了,如何别人的请求,代码也已经有了,大家可以分析DHT.c就知道是怎么回事.
int CDHT::dht_periodic(const void *buf, size_t buflen,const struct sockaddr *fromAddr, int fromlen,time_t *tosleep,dht_callback *callback, void *closure)
{
gettimeofday(&nowTime, NULL); if(buflen > )
{
int message;
unsigned char tid[], id[], info_hash[], target[];
unsigned char nodes[], nodes6[], token[];
int tid_len = , token_len = ;
int nodes_len = , nodes6_len = ;
unsigned short port;
unsigned char values[], values6[];
int values_len = , values6_len = ;
int want;
unsigned short ttid; struct sockaddr_in* tempip=(struct sockaddr_in *)fromAddr; if(is_martian(fromAddr))
goto dontread; if(node_blacklisted(fromAddr, fromlen)) {
_dout("Received packet from blacklisted node.\n");
goto dontread;
} if(((char*)buf)[buflen] != '\0') {
_dout("Unterminated message.\n");
errno = EINVAL;
return -;
} message = parse_message((unsigned char *)buf, buflen, tid, &tid_len, id, info_hash,target, &port, token, &token_len,nodes, &nodes_len, nodes6, &nodes6_len,values, &values_len, values6, &values6_len,&want); if(token_len>)
{
int a=;
}
if(message < || message == ERROR || id_cmp(id, zeroes) == )
{
_dout("Unparseable message: ");
debug_printable((const unsigned char *)buf, buflen);
_dout("\n");
goto dontread;
} if(id_cmp(id, myid) == ) {
_dout("Received message from self.\n");
goto dontread;
} if(message > REPLY) {
/* Rate limit requests. */
if(!token_bucket()) {
_dout("Dropping request due to rate limiting.\n");
goto dontread;
}
} switch(message)
{
case REPLY:
if(tid_len != )
{
_dout("Broken node truncates transaction ids: ");
debug_printable((const unsigned char *)buf, buflen);
_dout("\n");
/* This is really annoying, as it means that we will
time-out all our searches that go through this node.
Kill it. */
blacklist_node(id, fromAddr, fromlen);
goto dontread;
}
if(tid_match(tid, "pn", NULL))
{
_dout("Pong!From IP:%s:[%d] id:[%s]\n",inet_ntoa(tempip->sin_addr),tempip->sin_port,id);
new_node(id, fromAddr, fromlen, );
(*callback)(closure, DHT_EVENT_PONG_VALUES,id,(void*)fromAddr, fromlen);
//send_find_node(from,fromlen,tid,4,id,0,0);
}
else if(tid_match(tid, "fn", NULL) ||tid_match(tid, "gp", NULL))
{
int gp = ;
struct search *sr = NULL;
if(tid_match(tid, "gp", &ttid))
{
gp = ;
sr = find_search(ttid, fromAddr->sa_family);
}
_dout("Nodes found (%d+%d)%s!From IP:%s:[%d]\n", nodes_len/, nodes6_len/,gp ? " for get_peers" : "",inet_ntoa(tempip->sin_addr),tempip->sin_port);
if(nodes_len % != || nodes6_len % != )
{
_dout("Unexpected length for node info!\n");
blacklist_node(id, fromAddr, fromlen);
}
//else if(gp && sr == NULL)
//{
// _dout("Unknown search!\n");
// new_node(id, fromAddr, fromlen, 1);
// }
else
{
int i;
new_node(id, fromAddr, fromlen, );
for(i = ; i < nodes_len / ; i++)
{
unsigned char *ni = nodes + i * ;
struct sockaddr_in sin;
if(id_cmp(ni, myid) == )
continue;
memset(&sin, , sizeof(sin));
sin.sin_family = AF_INET;
memcpy(&sin.sin_addr, ni + , );
memcpy(&sin.sin_port, ni + , );
new_node(ni, (struct sockaddr*)&sin, sizeof(sin), );
(*callback)(closure, DHT_EVENT_FINDNODE_VALUES, ni,(void*)&sin, sizeof(sin));
if(sr && sr->af == AF_INET)
{
insert_search_node(ni,(struct sockaddr*)&sin,sizeof(sin),sr, , NULL, );
}
//send_get_peers((struct sockaddr*)&sin,sizeof(sockaddr),tid,4,ni,0,0);
}
for(i = ; i < nodes6_len / ; i++)
{
unsigned char *ni = nodes6 + i * ;
struct sockaddr_in6 sinip6;
if(id_cmp(ni, myid) == )
continue;
memset(&sinip6, , sizeof(sinip6));
sinip6.sin6_family = AF_INET6;
memcpy(&sinip6.sin6_addr, ni + , );
memcpy(&sinip6.sin6_port, ni + , );
new_node(ni, (struct sockaddr*)&sinip6, sizeof(sinip6), );
if(sr && sr->af == AF_INET6)
{
insert_search_node(ni,(struct sockaddr*)&sinip6,sizeof(sinip6),sr, , NULL, );
}
}
if(sr)
/* Since we received a reply, the number of requests in flight has decreased. Let's push another request. */
search_send_get_peers(sr, NULL);
}
//if(sr)
{
// insert_search_node(id, fromAddr, fromlen, sr,1, token, token_len);
if(values_len > || values6_len > )
{
_dout("Got values (%d+%d)!\n", values_len / , values6_len / );
if(callback) {
if(values_len > )
(*callback)(closure, DHT_EVENT_VALUES, sr->id,(void*)values, values_len); if(values6_len > )
(*callback)(closure, DHT_EVENT_VALUES6, sr->id,(void*)values6, values6_len);
}
}
}
}
else if(tid_match(tid, "ap", &ttid))
{
struct search *sr;
_dout("Got reply to announce_peer.\n");
sr = find_search(ttid, fromAddr->sa_family);
if(!sr) {
_dout("Unknown search!\n");
new_node(id, fromAddr, fromlen, );
}
else
{
int i;
new_node(id, fromAddr, fromlen, );
for(i = ; i < sr->numnodes; i++)
{
if(id_cmp(sr->nodes[i].id, id) == )
{
sr->nodes[i].request_time = ;
sr->nodes[i].reply_time = nowTime.tv_sec;
sr->nodes[i].acked = ;
sr->nodes[i].pinged = ;
break;
}
}
/* See comment for gp above. */
search_send_get_peers(sr, NULL);
}
}
else
{
_dout("Unexpected reply: ");
debug_printable((const unsigned char *)buf, buflen);
_dout("\n");
}
break;
case PING:
_dout("Ping (%d)!From IP:%s:%d\n", tid_len,inet_ntoa(tempip->sin_addr),tempip->sin_port);
new_node(id, fromAddr, fromlen, );
_dout("Sending pong.\n");
send_pong(fromAddr, fromlen, tid, tid_len);
break;
case FIND_NODE:
_dout("Find node!From IP:%s:%d\n",inet_ntoa(tempip->sin_addr),tempip->sin_port);
new_node(id, fromAddr, fromlen, );
_dout("Sending closest nodes (%d).\n", want);
send_closest_nodes(fromAddr, fromlen,tid, tid_len, target, want,, NULL, NULL, );
break;
case GET_PEERS:
_dout("Get_peers!From IP:%s:%d\n",inet_ntoa(tempip->sin_addr),tempip->sin_port);
new_node(id, fromAddr, fromlen, );
if(id_cmp(info_hash, zeroes) == )
{
_dout("Eek! Got get_peers with no info_hash.\n");
send_error(fromAddr, fromlen, tid, tid_len,, "Get_peers with no info_hash");
break;
}
else
{
struct storage *st = find_storage(info_hash);
unsigned char token[TOKEN_SIZE];
make_token(fromAddr, , token);
if(st && st->numpeers > )
{
_dout("Sending found%s peers.\n",fromAddr->sa_family == AF_INET6 ? " IPv6" : "");
send_closest_nodes(fromAddr, fromlen,tid, tid_len,info_hash, want,fromAddr->sa_family, st,token, TOKEN_SIZE);
}
else
{
_dout("Sending nodes for get_peers.\n");
send_closest_nodes(fromAddr, fromlen,tid, tid_len, info_hash, want,, NULL, token, TOKEN_SIZE);
}
if(callback)
{
(*callback)(closure, DHT_EVENT_GET_PEER_VALUES, info_hash,(void *)fromAddr, fromlen);
}
} break;
case ANNOUNCE_PEER:
_dout("Announce peer!From IP:%s:%d\n",inet_ntoa(tempip->sin_addr),tempip->sin_port);
new_node(id, fromAddr, fromlen, ); if(id_cmp(info_hash, zeroes) == )
{
_dout("Announce_peer with no info_hash.\n");
send_error(fromAddr, fromlen, tid, tid_len,, "Announce_peer with no info_hash");
break;
}
if(!token_match(token, token_len, fromAddr)) {
_dout("Incorrect token for announce_peer.\n");
send_error(fromAddr, fromlen, tid, tid_len,, "Announce_peer with wrong token");
break;
}
if(port == ) {
_dout("Announce_peer with forbidden port %d.\n", port);
send_error(fromAddr, fromlen, tid, tid_len,, "Announce_peer with forbidden port number");
break;
}
if(callback)
{
(*callback)(closure, DHT_EVENT_ANNOUNCE_PEER_VALUES, info_hash,(void *)fromAddr, fromlen);
}
storage_store(info_hash, fromAddr, port);
/* Note that if storage_store failed, we lie to the requestor.
This is to prevent them from backtracking, and hence polluting the DHT. */
_dout("Sending peer announced.\n");
send_peer_announced(fromAddr, fromlen, tid, tid_len);
}
} dontread:
if(nowTime.tv_sec >= rotate_secrets_time)
rotate_secrets(); if(nowTime.tv_sec >= expire_stuff_time) {
expire_buckets(buckets);
expire_buckets(buckets6);
expire_storage();
expire_searches();
} if(search_time > && nowTime.tv_sec >= search_time) {
struct search *sr;
sr = searches;
while(sr) {
if(!sr->done && sr->step_time + <= nowTime.tv_sec)
{
search_step(sr, callback, closure);
}
sr = sr->next;
} search_time = ; sr = searches;
while(sr) {
if(!sr->done) {
time_t tm = sr->step_time + + random() % ;
if(search_time == || search_time > tm)
search_time = tm;
}
sr = sr->next;
}
} if(nowTime.tv_sec >= confirm_nodes_time) {
int soon = ; soon |= bucket_maintenance(AF_INET);
soon |= bucket_maintenance(AF_INET6); if(!soon)
{
if(mybucket_grow_time >= nowTime.tv_sec - )
soon |= neighbourhood_maintenance(AF_INET);
if(mybucket6_grow_time >= nowTime.tv_sec - )
soon |= neighbourhood_maintenance(AF_INET6);
} /* In order to maintain all buckets' age within 600 seconds, worst
case is roughly 27 seconds, assuming the table is 22 bits deep.
We want to keep a margin for neighborhood maintenance, so keep
this within 25 seconds. */
if(soon)
confirm_nodes_time = nowTime.tv_sec + + random() % ;
else
confirm_nodes_time = nowTime.tv_sec + + random() % ;
} if(confirm_nodes_time > nowTime.tv_sec)
*tosleep = confirm_nodes_time - nowTime.tv_sec;
else
*tosleep = ; if(search_time > ) {
if(search_time <= nowTime.tv_sec)
*tosleep = ;
else if(*tosleep > search_time - nowTime.tv_sec)
*tosleep = search_time - nowTime.tv_sec;
} return ;
}dht_periodic
- 至于节点如何进行桶操作,调试过一次代码就会明白对应的原理,当然上面也介绍了如何进行桶分裂的原理.
- 接下来就是将上面的操作步骤进行循环.
通过上面的流程,了解DHT的工作方法后,如何增加更多的返回信息就需要下一篇的技术性问题的介绍,希望大家一起修改我们的开源程序.
大家有不明白的地方,可以一起讨论.
大家的推荐才是下一篇介绍的动力...