常见问题和特殊情况
• 应用程序如何从入站流中读取数据?TCPConnection::inbound stream() 已经在头文件中实现了。 • TCPConnection 是否需要任何花哨的数据结构或算法?不,它真的没有。繁重的工作全部由 TCPSender 和 TCPReceiver 完成你已经实施了。这里的工作实际上只是连接所有东西起来,并处理一些难以轻易解决的挥之不去的连接范围内的微妙之处考虑到发送方和接收方。 • TCPConnection 实际上是如何发送一个段的?与 TCPSender 类似——将其推送到段输出队列。至于你的就 TCPConnection 而言,一旦将其推送到此队列,就认为它已发送。很快,所有者就会出现并弹出它(使用公共段 out() 访问器方法)并真正发送。 • TCPConnection 如何获知时间的流逝?与 TCPSender 类似——tick() 方法将被定期调用。请不要使用任何其他方式来告诉时间——tick 方法是您访问段落的唯一途径时间。这使事情保持确定性和可测试性。 • 如果传入的段设置了 rst 标志,TCPConnection 会做什么?此标志(“重置”)意味着连接立即死亡。如果您收到一个片段使用 rst ,您应该在入站和出站 ByteStreams 上设置错误标志,并且对 TCPConnection::active() 的任何后续调用都应返回 false。 • 我应该何时发送设置了 rst 标志的段? 有两种情况您需要中止整个连接: 1.如果发送方连续多次重传都没有成功(超过 TCPConfig::MAX RETX ATTEMPTS次,即 8)。 2.如果TCPConnection析构函数在连接仍处于活动状态时被调用(active() 返回真)。发送一个带有 rst set 的段与接收一个段有类似的效果:连接是dead 并且不再是 active(),并且两个 ByteStreams 都应该设置为错误状态。 • 等等,但我如何制作一个可以设置 rst 标志的段?是什么序列号?任何传出段都需要有正确的序列号。你可以强制TCPSender 通过调用生成具有正确序列号的空段它的发送空段()方法。或者你可以让它填满窗口(生成如果它有未完成的信息要发送,例如来自流的字节或SYN/FIN) 通过调用它的 fill window() 方法。 • ack 标志的目的是什么?不是总是有一个确认吗? 几乎每个 TCPSegment 都有一个 ackno,并且设置了 ack 标志。例外情况 只是在连接的最开始,在接收者有任何segment之前承认。在传出段上,您需要在以下情况下设置 ackno 和 ack 标志。也就是说,每当 TCPReceiver 的 ackno() 方法返回一个std::optional<WrappingInt32> 有一个值,你可以用has value()测试.在传入的段上,只有当 ack 字段为时,您才需要查看 ackno。如果是这样,请将该确认(和窗口大小)提供给 TCPSender。 • 在接收到一个段时,如果 TCPReceiver 抱怨该段,我该怎么办?没有重叠窗口并且是不可接受的(segment received() 返回 false)?在这种情况下,TCPConnection 需要确保一个段被发送回 peer,给出当前的确认和窗口大小。这可以帮助纠正困惑同行。 • 好的。如果 TCPConnection 收到一个段,并且 TCPSender抱怨 ackno 无效(ack received() 返回 false)?一样的答案! • 如果 TCPConnection 接收到一个段,并且一切正常,如何?我是还需要回复吗?如果该段占用了任何序列号,那么您需要确保它得到确认 - 至少需要将一个段发送回对等方适当的序列号和新的确认和窗口大小。你可能不会需要做任何事情来强制这样做,因为 TCPSender 通常会决定发送一个ack received() 中的新段(因为窗口中打开了更多空间)。但即使 TCPSender 没有更多数据要发送,您也需要确保传入段以某种方式得到确认。 • 如果 TCPConnection 只是确认每个段,即使它没有占用任何序列号?不是个好主意!这两个对等点最终将发送无限数量的 ack来回打乒乓球。 • 如果 TCPReceiver 想要通告窗口大小,我应该发送什么窗口大小 这比 TCPSegment::header().win 字段中的大小要大吗?尽量发送最大值。 • TCP 连接何时最终“完成”?active() 什么时候可以返回 false?请参阅下一节。TCP 连接的结束:共识需要工作
TCPConnection 的一项重要功能是决定何时建立 TCP 连接完全“完成”。发生这种情况时,实现将其独占声明释放给本地端口号,停止发送确认以回复传入的段,考虑连接成为历史,并且其 active() 方法返回 false。连接可以通过两种方式结束。在不干净的关闭中,TCPConnection发送或接收设置了 rst 标志的段。在这种情况下,出站和入站 ByteStreams 应该都处于错误状态,并且 active() 可以返回 false立即地。干净的关闭是我们如何在没有错误的情况下“完成”(active() = false)。这是更复杂,但这是一件美妙的事情,因为它尽可能确保每个两个 ByteStream 中的一个已完全可靠地传送到接收对等方。在里面 下一节(§§5.1),我们给出了干净关闭发生时的实际结果,所以感觉如果您愿意,可以*跳过。太好了,你还在。因为二将军问题,无法保证两个对等点都可以实现干净关闭,但 TCP 非常接近。就是这样。从一个对等点(一个 TCPConnection,我们称之为“本地”对等点)的角度来看,有在与“远程”对等方的连接中干净关闭的四个先决条件: Prereq #1入站流已完全组装并已结束。 Prereq #2出站流已被本地应用程序结束并完全发送(包括 它结束的事实,即带有 fin 的段)到远程对等方。 Prereq #3出站流已被远程对等方完全确认。 Prereq #4本地 TCPConnection 确信远程对等方可以满足先决条件3 有两种替代方式可能会发生这种情况: • 选项A:在两个流结束后徘徊。先决条件 #1 到 #3是真的,并且远程对等方似乎已经得到了本地对等方的确认整个流。本地对等方肯定不知道这一点——TCP 不知道可靠地传送确认(它不确认确认)。但是本地同行还是很有信心的远程对等方已收到确认,因为远程对等方似乎没有正在重新传输任何内容,并且本地对等方已等待一段时间以确保。具体来说,当满足先决条件 #1 到 #3 时,就会建立连接并且已经至少是初始重传超时的 10 倍( cfg.rt 超时) 因为本地对等方已收到来自远程对等体。这在两个流结束后称为“徘徊”,以使确保远程对等方没有尝试重新传输我们需要确认的任何内容- 边缘。这确实意味着 TCPConnection 需要存活一段时间,保持对本地端口号的独占声明并可能发送 ack 作为响应 到传入的段,即使在 TCPSender 和 TCPReceiver 完全完成了他们的工作,两个流都结束了。 • 选项B:被动关闭。先决条件 #1 到 #3 为真,并且本地peer 100% 确定远程 peer 可以满足先决条件 #3。怎么能这是,如果 TCP 不确认确认?因为远程对等 是第一个结束其流的人。⋆为什么这条规则有效?这是脑筋急转弯,你不需要进一步阅读以完成本实验,但思考并进入二将军问题的深层原因和内在约束关于跨不可靠网络的可靠性。这样做的原因是在接收并组装远程对等方的鳍后(先决条件 #1),本地对等方发送了一个比以往更大的序列号的段之前发送(至少,它必须发送自己的鳍段以满足先决条件#2),并且该段也有一个确认远程对等方的 fin 位。远程对等方确认了该段(以满足先决条件#3),这意味着远程对等方必须也有看到本地对等方对远程对等方 fin 的确认。这保证了远程对等方必须能够满足自己的先决条件#3。这些所有意味着本地对等点可以满足先决条件 #4 而不必逗留。哇!我们说这是一个脑筋急转弯。实验室论文中的额外学分:可以你找到更好的方法来解释这个了吗?The bottom line is that if the TCPConnection’s inbound stream ends before the TCPConnection has ever sent a fin segment, then the TCPConnection doesn’t need to linger after both streams finish.
总结:
我方发送的所有数据都已经收到确认ack而且我收到对方的所有数据,为什么还要停留一段时间? 因为单独的ack不会重传。我方发送的ack可能会因为网络故障,不能到达对方,如果我们直接关闭连接,,然后对方就会一直重传,虽然对方会因为多次重传主动断开连接,但我们还是不希望对方一直重传等待,浪费资源,我们想要两方都正确收到对方的所有数据,并且自己发送的数据也得到确认,这样是最完美的. 为什么有时可以不用停留? 因为自己还有没发送的数据,收到了对面数据的fin,下次我放发送数据时,必然会带上该fin的ack序列号,而且对面必然会对这个数据返回ack,不然我就会一直重传, 那就表明,在这次连接停止的时候,对方必然已经确认我收到了他的完整数据,对方也就不可能重传了,满足条件4,那我就直接关闭连接就行。 tcp_connection.hh#ifndef SPONGE_LIBSPONGE_TCP_FACTORED_HH #define SPONGE_LIBSPONGE_TCP_FACTORED_HH #include "tcp_config.hh" #include "tcp_receiver.hh" #include "tcp_sender.hh" #include "tcp_state.hh" //! \brief A complete endpoint of a TCP connection class TCPConnection { private: TCPConfig _cfg; TCPReceiver _receiver{_cfg.recv_capacity}; TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn}; uint32_t last_recive_time{0}; uint32_t now_time{0}; bool close{false}; //! outbound queue of segments that the TCPConnection wants sent std::queue<TCPSegment> _segments_out{}; //! Should the TCPConnection stay active (and keep ACKing) //! for 10 * _cfg.rt_timeout milliseconds after both streams have ended, //! in case the remote TCPConnection doesn't know we've received its whole stream? bool _linger_after_streams_finish{true}; void bad_shutdown(bool send_rst); void clean(); bool stream_both_eof(); void check_close(); public: //! \name "Input" interface for the writer //!@{ //! \brief Initiate a connection by sending a SYN segment void connect(); //! \brief Write data to the outbound byte stream, and send it over TCP if possible //! \returns the number of bytes from `data` that were actually written. size_t write(const std::string &data); //! \returns the number of `bytes` that can be written right now. size_t remaining_outbound_capacity() const; //! \brief Shut down the outbound byte stream (still allows reading incoming data) void end_input_stream(); //!@} //! \name "Output" interface for the reader //!@{ //! \brief The inbound byte stream received from the peer ByteStream &inbound_stream() { return _receiver.stream_out(); } //!@} //! \name Accessors used for testing //!@{ //! \brief number of bytes sent and not yet acknowledged, counting SYN/FIN each as one byte size_t bytes_in_flight() const; //! \brief number of bytes not yet reassembled size_t unassembled_bytes() const; //! \brief Number of milliseconds since the last segment was received size_t time_since_last_segment_received() const; //!< \brief summarize the state of the sender, receiver, and the connection TCPState state() const { return {_sender, _receiver, active(), _linger_after_streams_finish}; }; //!@} //! \name Methods for the owner or operating system to call //!@{ //! Called when a new segment has been received from the network void segment_received(const TCPSegment &seg); //! Called periodically when time elapses void tick(const size_t ms_since_last_tick); //! \brief TCPSegments that the TCPConnection has enqueued for transmission. //! \note The owner or operating system will dequeue these and //! put each one into the payload of a lower-layer datagram (usually Internet datagrams (IP), //! but could also be user datagrams (UDP) or any other kind). std::queue<TCPSegment> &segments_out() { return _segments_out; } //! \brief Is the connection still alive in any way? //! \returns `true` if either stream is still running or if the TCPConnection is lingering //! after both streams have finished (e.g. to ACK retransmissions from the peer) bool active() const; //!@} //! Construct a new connection from a configuration explicit TCPConnection(const TCPConfig &cfg) : _cfg{cfg} {} //! \name construction and destruction //! moving is allowed; copying is disallowed; default construction not possible //!@{ ~TCPConnection(); //!< destructor sends a RST if the connection is still open TCPConnection() = delete; TCPConnection(TCPConnection &&other) = default; TCPConnection &operator=(TCPConnection &&other) = default; TCPConnection(const TCPConnection &other) = delete; TCPConnection &operator=(const TCPConnection &other) = delete; //!@} }; #endif // SPONGE_LIBSPONGE_TCP_FACTORED_HH
tcp_connection.hh
#include "tcp_connection.hh" #include <iostream> // Dummy implementation of a TCP connection // For Lab 4, please replace with a real implementation that passes the // automated checks run by `make check`. template <typename... Targs> void DUMMY_CODE(Targs &&... /* unused */) {} using namespace std; void TCPConnection::bad_shutdown(bool send_rst) { close=true; _receiver.stream_out().set_error(); _sender.stream_in().set_error(); if(send_rst){ TCPSegment seg; seg.header().rst=true; seg.header().seqno=_sender.next_seqno(); _segments_out.push(seg); } } void TCPConnection::check_close(){ if(stream_both_eof()){ if(!_linger_after_streams_finish||now_time-last_recive_time>=10*_cfg.rt_timeout){ close=true; } } } bool TCPConnection::stream_both_eof(){ return _receiver.stream_out().input_ended()&&_sender.stream_in().eof()&&_sender.bytes_in_flight()==0ull; } void TCPConnection::clean(){ queue<TCPSegment>&buffer=_sender.segments_out(); while(!buffer.empty()){ TCPSegment& seg=buffer.front(); const optional<WrappingInt32>& ackno=_receiver.ackno(); if(ackno.has_value()){ seg.header().ack=true; seg.header().ackno=ackno.value(); } seg.header().win=static_cast<uint16_t>(UINT16_MAX>_receiver.window_size()?_receiver.window_size():UINT16_MAX); _segments_out.push(seg); buffer.pop(); } check_close(); } size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); } size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); } size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); } size_t TCPConnection::time_since_last_segment_received() const { return now_time-last_recive_time; } void TCPConnection::segment_received(const TCPSegment &seg) { last_recive_time=now_time; if(seg.header().rst){ bad_shutdown(false); return ; } _receiver.segment_received(seg); //如果己方还有需要发送的数据没有发送就收到了对方的fin,收到自己的fin后就不需要等待 if(_receiver.stream_out().input_ended()&&!_sender.stream_in().eof()){ _linger_after_streams_finish=false; } //reciver处理结束,sender处理 if(seg.header().ack){ _sender.ack_received(seg.header().ackno,seg.header().win); } if(seg.length_in_sequence_space()>0u)_sender.send_empty_ack(); clean(); } bool TCPConnection::active() const { return !close; } size_t TCPConnection::write(const string &data) { size_t size=_sender.stream_in().write(data); _sender.fill_window(); clean(); return size; } //! \param[in] ms_since_last_tick number of milliseconds since the last call to this method void TCPConnection::tick(const size_t ms_since_last_tick) { now_time+=ms_since_last_tick; if(_sender.consecutive_retransmissions()>=TCPConfig::MAX_RETX_ATTEMPTS){ bad_shutdown(true); return ; } _sender.tick(ms_since_last_tick); clean(); } void TCPConnection::end_input_stream() {; _sender.stream_in().end_input(); //发送fin _sender.fill_window(); clean(); } void TCPConnection::connect() { _sender.fill_window(); clean(); } TCPConnection::~TCPConnection() { try { if (active()) { cerr << "Warning: Unclean shutdown of TCPConnection\n"; // Your code here: need to send a RST segment to the peer bad_shutdown(true); } } catch (const exception &e) { std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl; } }