libutp源码简析

libutp是uTorrent使用的类似TCP的传输层实现。它基于UDP提供可信的、有序的点对点的传输,并具有最少的时延。
网上有关libutp实现的介绍几乎没有,因此我打算就其源码做一个简单的分析。这里要注意UTP是基于包的而不像TCP是基于流的,虽然它提供的API还是基于流的。这样有一些影响,例如黏包问题的处理、缓冲区的管理(例如可以去掉PUSH标记)、窗口管理、重新分组等方面。

uTP源码简介

utp.h以C89的形式提供接口。例如utp_write是以proactive的方式实现的。
utp相关的实现大多在utp_internal.cpp文件中。
utp_packedsockaddr.cpp中封装了sockaddr_in结构。
ucat.c基于uTP框架构建了一个基础的应用。
uTP的设计主要是异步的,应用代码不会阻塞在异步IO操作上,而是指定回调函数并立即返回。utp_callbacks.cpp中注册了各种回调函数,utp向外界传输消息都是以这里回调的形式开展的。例如当收到数据包时,ctx->callbacks[UTP_ON_READ]这个回调函数就会被调用。

使用回调函数也体现了libutp总体的设计思路:

  1. 回调函数能够屏蔽掉套接字API的细节
    一个可靠通信协议的主要任务是在不可靠的设施上建立可靠的传输通道,至于使用哪一种不可靠的传输方式并不是核心问题。uTP协议的内部实现能够与UDP套接字等做到隔离,utp不是继承或者封装了UDP套接字描述符,然后提供一个TCP的鸭子类型。而是完全工作在UDP上层,打包了一些对UDP的操作,方便用户调用。
    例如uTP就可以选择不实现sendto等方法,而用户选择使用send还是write还是sendmsg,然后写成回调,uTP只需要在它需要通过UDP发送它构造的数据报时调用这个回调就好了。又例如系统从UDP套接口收到一个消息时,它并不是直接处理,而是调用utp_process_udp函数,在这个函数里面uTP协议根据自己的报头处理了相关消息之后,调用用户设置的回调函数通知收到了消息。
  2. 回调函数方便实现proactive和reactive风格的API
    常见的反射式(reactive)异步IO模型包括select、poll、kqueue、Java NIO等,只会通知到某IO设备上产生了IO事件,然后由用户来发起IO请求,例如调用readrecv等。前摄式(proactive)包括IOCP、Boost Asio等,用户主动发送IO请求(即使现在IO设备还没有准备好)并提前向系统注册一个回调函数,当实际的IO事件发生时由系统处理该IO操作,并在完成后触发指定的回调函数,因此前摄式能够避免用户将数据从内核取回来的开销。因此前摄式强调的是对未来读取事件的预期,抽象程度要高一点,用户可以利用Proactor的回调构造一条执行顺序链,而Reactor必须手动维护接受的状态。
  3. 回调函数能减少处理并发问题的难度

鉴于以上的这几点,在分析uTP协议时必须要将ucat.c纳入考虑范围,不然很难搞懂原理。

ucat简介

ucat使用了poll来维护了两个fd,stdin和套接口,并且设置了500ms的超时时间。

uTP重要数据结构

utp_context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// utp.h
typedef struct struct_utp_context utp_context;
// utp_internal.h
struct struct_utp_context {
void *userdata;
utp_callback_t* callbacks[UTP_ARRAY_SIZE];
uint64 current_ms;
utp_context_stats context_stats;
UTPSocket *last_utp_socket;
Array<UTPSocket*> ack_sockets;
Array<RST_Info> rst_info;
UTPSocketHT *utp_sockets;
size_t target_delay;
size_t opt_sndbuf;
size_t opt_rcvbuf;
uint64 last_check;
// utp_api.cpp
struct_utp_context::struct_utp_context()
: userdata(NULL), current_ms(0), last_utp_socket(NULL), log_normal(false), log_mtu(false), log_debug(false) {
memset(&context_stats, 0, sizeof(context_stats));
memset(callbacks, 0, sizeof(callbacks));
target_delay = CCONTROL_TARGET;
utp_sockets = new UTPSocketHT;
callbacks[UTP_GET_UDP_MTU] = &utp_default_get_udp_mtu;
callbacks[UTP_GET_UDP_OVERHEAD] = &utp_default_get_udp_overhead;
callbacks[UTP_GET_MILLISECONDS] = &utp_default_get_milliseconds;
callbacks[UTP_GET_MICROSECONDS] = &utp_default_get_microseconds;
callbacks[UTP_GET_RANDOM] = &utp_default_get_random;
// 1 MB of receive buffer (i.e. max bandwidth delay product)
// means that from a peer with 200 ms RTT, we cannot receive
// faster than 5 MB/s
// from a peer with 10 ms RTT, we cannot receive faster than
// 100 MB/s. This is assumed to be good enough, since bandwidth
// often is proportional to RTT anyway
// when setting a download rate limit, all sockets should have
// their receive buffer set much lower, to say 60 kiB or so
opt_rcvbuf = opt_sndbuf = 1024 * 1024;
last_check = 0;
}
~struct_utp_context(){
delete this->utp_sockets;
}
void log(int level, utp_socket *socket, char const *fmt, ...);
void log_unchecked(utp_socket *socket, char const *fmt, ...);
bool would_log(int level);
bool log_normal:1; // log normal events?
bool log_mtu:1; // log MTU related events?
bool log_debug:1; // log debugging events? (Must also compile with UTP_DEBUG_LOGGING defined)
};

utp_context的成员

  1. utp_sockets
    utp_sockets指向一个UTPSocketHT : utpHashTable<UTPSocketKey, UTPSocketKeyData>哈希表。这个哈希表维护了所有的套接字:

    • utp_sockets析构时调用UTP_FreeAll释放所有的套接字。
    • 当UDP包被接受时,会调用utp_process_udp这个处理程序。此时我们仅能获得对应的套接字地址const struct sockaddr *,因此需要能够通过这个指针找到对应的套接字。

      UTPSocketHT中的键UTPSocketKey和值UTPSocketKeyData的构造如下:

    • UTPSocketKey中存放了对应UTPSocket中的PackedSockAddr addr以及recv_id字段。
      PackedSockAddr addr字段是在utp_initialize_socket设置的,表示指向的目标地址。
      recv_id对应着套接字里面的conn_id_recv字段,是在utp_initialize_socket中随机生成的。这里的recv_id的主要功能是作为ATP协议中“host端的端口号”来使用。查看相关代码

      1
      2
      3
      4
      5
      6
      // utp_process_udp
      utp_initialize_socket(conn, to, tolen, false, id, id+1, id);
      // void utp_initialize_socket(utp_socket *conn, const struct sockaddr *addr, socklen_t addrlen, bool need_seed_gen, uint32 conn_seed, uint32 conn_id_recv, uint32 conn_id_send){
      // ...
      conn_id_recv += conn_seed;
      conn_id_send += conn_seed;

      这表示conn_id_send始终比conn_id_recv要大1。
      为什么要有两个id,并且选择conn_id_recv而不是conn_id_send来作为哈希值呢?这是因为当数据报到达时,要通过里面的recv_id找到具有特定conn_id_recv的套接字。

    • UTPSocketKeyData中主要持有了对应的UTPSocket *的指针。
  2. opt_sndbufopt_rcvbuf
    这两个size_t表示发送缓冲区和接收缓冲区的默认大小。缓冲区的大小与窗口大小形成协同。在创建套接字时,套接字的opt_sndbufopt_rcvbuf会“继承自”对应的context。
  3. target_delay单位为微秒,初始值为CCONTROL_TARGET = 100 * 1000
  4. current_ms的作用是用来保存当前时间,这样可以避免多次调用获取时间函数的开销。
  5. context_stats是一个utp_context_stats类型的结构,用来统计不同大小的uTP包的数量。
  6. ack_sockets与schedule_ack机制有关,详见超时重传部分。
  7. rst_info维护了RST_INFO_LIMIT个reset信息,详见连接重置部分。

utp_context的用途

  1. 方便集中管理的UTP套接口UTPSocket
    从上面的结构中看到所有的UTPSocket被放到一个哈希表里面。当UTPSocket销毁时,要将哈希表中对应的<UTPSocketKey, UTPSocketKeyData>键值对删掉,在utp_initialize_socket函数中要往context里面注册自己,这些操作实际上都是为了方便集中管理套接字。
    utp_check_timeouts函数为例,这个函数作为每次“时钟中断”的入口,接受的是一个utp_context而不是一个UTPSocket,context里面对所有的UTPSocket调用了check_timeouts,这样避免了为每一个套接字维护一个时钟信号的开销。
  2. 方便实现UTP服务
    utp_process_udp接受到UDP包的时候,他获得的是一个sockaddr地址,所以需要找到对应的UTPSocket套接字,当套接字不存在时,需要发送RST包。当套接字关闭时,需要它来维护2MSL的等待时间,实际上由于UDP的UTPSocketKey包含了recv_id,所以2MSL是不必要的,在UTPSocket::check_timeouts代码中看到只等到rto_timeout

utp_socket

1
2
3
4
5
6
// utp.h
typedef struct UTPSocket utp_socket;
// utp_internal.cpp
struct UTPSocket {
// ...
};

UTPSocket类型用来维护一个套接字的上下文,里面东西比较多,将在下面展开讨论

OutgoingPacket

对于一个(将要)被发出去的包,有一个OutgoingPacket与其对应。

1
2
3
4
5
6
7
8
struct OutgoingPacket{
size_t length; // 总长
size_t payload; // 有效载荷
uint64_t time_sent; // microseconds
uint32_t transmissions; // 总传输次数
bool need_resend;
char data[1];
};

这里的data是个VLA,实际上是包头+数据包的全部内容。注意到最好不要将包头和数据包分开存放,不然又要多一次复制的开销。

PacketFormatV1/PacketFormatAckV1

首先查看基础的PacketFormatV1,这是一个uTP常规数据报的报头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// utp_internal.cpp
struct PACKED_ATTRIBUTE PacketFormatV1 {
// packet_type (4 high bits)
// protocol version (4 low bits)
byte ver_type;
byte version() const { return ver_type & 0xf; }
byte type() const { return ver_type >> 4; }
void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }
// Type of the first extension header
byte ext;
// connection ID
uint16_big connid;
uint32_big tv_usec;
uint32_big reply_micro;
// receive window size in bytes
uint32_big windowsize;
// Sequence number
uint16_big seq_nr;
// Acknowledgment number
uint16_big ack_nr;
};

  1. ver_type
    ver_type标志了数据报的类型,这个类似于压缩了后的TCP报头中的flags字段,包含下面的5种。

    1
    2
    3
    4
    5
    6
    7
    8
    enum {
    ST_DATA = 0, // Data packet.
    ST_FIN = 1, // Finalize the connection. This is the last packet.
    ST_STATE = 2, // State packet. Used to transmit an ACK with no data.
    ST_RESET = 3, // Terminate connection forcefully.
    ST_SYN = 4, // Connect SYN
    ST_NUM_STATES, // used for bounds checking
    };
  2. ext
    这个表示扩展号,默认是0,设为1时表示使用了EACK的扩展,对应着扩展后的PacketFormatAckV1类型的数据包。

  3. connid
    connid的用途已在前面的utp_context进行了论述。
  4. tv_usec
    tv_usec是一个时间戳,表示数据包的发送时间,在send_data中被设置。我们看到相比TCP则保守地用了TCP Timestamps Option这个选项,UTP中强制将其整合了进来。其实时间戳的作用是非常大的,例如借助于时间戳可以更精确地计算出RTT。否则我们只能对非重传的数据包进行采样。时间戳还能方便我们对高带宽下序号迅速耗尽进行PAWS(Protect Againest Wrapped Sequence numbers)防范,不过我检查下代码发现UTP里面并没有PAWS的机制(详见后面收包的部分)。
  5. reply_micro
    reply_microutp_process_incoming中被设置,表示从发送到接受所经历的时间差

这里再提一下PACKED_ATTRIBUTE这个属性,在utp_internal.cpp中已经使用了#pragma pack,这里是为了双重保险。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// utp_internal.cpp
#if (defined(__SVR4) && defined(__sun))
#pragma pack(1)
#else
#pragma pack(push,1)
#endif
// utp_types.h
// Allow libutp consumers or prerequisites to override PACKED_ATTRIBUTE
#ifndef PACKED_ATTRIBUTE
#if defined BROKEN_GCC_STRUCTURE_PACKING && defined __GNUC__
// Used for gcc tool chains accepting but not supporting pragma pack
// See http://gcc.gnu.org/onlinedocs/gcc/Type-Attributes.html
#define PACKED_ATTRIBUTE __attribute__((__packed__))
#else
#define PACKED_ATTRIBUTE
#endif // defined BROKEN_GCC_STRUCTURE_PACKING && defined __GNUC__

PacketFormatAckV1这个包表示当这个包是EACK包时的附加数据,EACK包是类似于SACK的一种机制,用于选择性确认。在UTPSocket::send_ack函数中能看到EACK将acr_nr前最多32个报文的接受情况按位放到长度为4的字节数组里面,这是一个非常巧妙的方法。

1
2
3
4
5
6
struct PACKED_ATTRIBUTE PacketFormatAckV1 {
PacketFormatV1 pf;
byte ext_next;
byte ext_len;
byte acks[4];
};

SizableCircularBuffer

这是一个环形缓冲区,值得注意的是这个缓冲区并不是线程安全的,不过libutp的接口不是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct SizableCircularBuffer {
// This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
size_t mask;
// This is the elements that the circular buffer points to
void **elements;
void *get(size_t i) const { assert(elements); return elements ? elements[i & mask] : NULL; }
void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }
void grow(size_t item, size_t index){
// Figure out the new size.
size_t size = mask + 1; do size *= 2; while (index >= size);
// Allocate the new buffer
void **buf = (void**)calloc(size, sizeof(void*));
size--;
// Copy elements from the old buffer to the new buffer
for (size_t i = 0; i <= mask; i++) {
buf[(item - index + i) & size] = get(item - index + i);
}
// Swap to the newly allocated buffer
mask = size; free(elements); elements = buf;
}
void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
size_t size() { return mask + 1; }
};

环形缓冲区的大小size始终是2的整数幂,这里的mask等于size - 1,因此全部为1。mask起到类似取模的作用。
这里的ensure_sizegrow的参数有点奇怪,其实查看调用情况可以发现item表示要插入的元素的编号,如seq_nr;而index表示当前队列中元素的个数,如cur_window_packets,这样队列就不会出现假溢出的现象。如果说队列中元素比容量size = mask + 1要多了,那么就要扩展队列。由于扩展队列变了模数,不同余了,所以要按照模前的数(item - index + i)进行复制。

环形缓冲区的增长

grow需要提供itemindex,两个变量可以分别理解为缓冲区中最旧的序号和最新的序号,其中item - index表示最老的未确认的ATPPacket的序号。
对于缓冲区中的序号$seq$,有$seq \, mod \, m1 = x$,当mask从$m1$增长到$m2$时,需要求出在$seq$未知的情况下求出$seq \, mod \, m2 = y$。
其实有个简单的开销较大的办法,就是用一个std::pair把原始的序号计算出来。

套接字的连接关闭与读写操作

创建套接字

utp_create_socket用来创建一个套接字,在创建套接字时并不向context进行注册,这也是因为目前已有信息无法计算出哈希值的缘故。
state = CS_UNINITIALIZED

主动连接

实现在函数utp_connect内。

  1. 首先调用utp_initialize_socket
    utp_initialize_socket的作用初始化套接字,这里的操作包括设置dest端的地址/端口,初始化conn_id_recvconn_id_send,初始化套接字的部分字段。向context注册自己。
    在初始化之后,套接字具有状态state = CS_IDLE
  2. 然后调用UTPSocket::send_packet
    这个函数详见下面

发包操作

UTPSocket::send_packet是主要的发包函数

  1. UTPSocket::send_packetUTPSocket::send_acksend_rst
    send_packet函数用来发送构造好的OutgoingPacket::data
    send_ack会就地构造一个ACK包,然后调用send_data发送。
    send_rst直接调用更基础的send_to_addr发送RST包。
  2. UTPSocket::send_data
    这个函数的存在主要是处理一些UTPSocket::send_packetUTPSocket::send_ack的共同部分
  3. send_to_addr
    这个函数位于调用链的最下端,调用了注册的callback函数来发送数据包,同时调用utp_register_sent_packetutp_context::context_stats报告了发送长度用来统计。这个统计信息可以被API函数utp_get_context_stats取得,以供用户分析。

写操作

utp_write

utp_write被作为utp_writev的一个特例来处理。

utp_writev

同UNIX套接口函数writev一样,utp_writev接受一个指向iovec数组的指针iovec_input

1
2
3
4
struct iovec{
void *iov_base;
size_t iov_len;
};

utp_writev按照iovec_input[0 .. num_iovecs-1]的顺序从缓冲区发送数据,并返回成功发送的总字节数。
utp_writev主要做一些检查,如num_iovecs是否超过了UTP_IOV_MAX。然后将iovec_input复制到自己的一块缓存iovec里面(为啥呢),计算所有iovec的大小的总和到bytes。我们实际发送的数据量num_to_sendbytes和连接最多允许的数据包大小packet_size(由MTU决定)两者的最小值。当bytes过大时,就需要分批发送,如下面的代码所示。

1
2
3
4
5
6
size_t packet_size = conn->get_packet_size();
size_t num_to_send = min<size_t>(bytes, packet_size);
while (!conn->is_full(num_to_send)) {
bytes -= num_to_send;
sent += num_to_send;
conn->write_outgoing_packet(num_to_send, ST_DATA, iovec, num_iovecs);

这里的is_full由窗口决定。
utp_writev下面会调用write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs)。我们知道OutgoingPacket用来描述一个数据包的上下文,在第一次握手时由于没有数据需要传输,所以直接调用的utp_send_packet,相当于只发送了一个包头。而对于utp_writev来说,需要在包头后面加上数据。
下面的代码将每个iov[i]中的iov_base复制到。

1
2
3
4
5
6
7
8
9
10
11
12
13
for (size_t i = 0; i < num_iovecs && needed; i++) {
if (iovec[i].iov_len == 0)
continue;
size_t num = min<size_t>(needed, iovec[i].iov_len);
memcpy(p, iovec[i].iov_base, num);
p += num;
iovec[i].iov_len -= num;
iovec[i].iov_base = (byte*)iovec[i].iov_base + num; // iovec[i].iov_base += num, but without void* pointers
needed -= num;
}

write_outgoing_packet

write_outgoing_packet接受一个utp_iovec数组,然后组织数据包结构,并将其放入发送缓存。注意write_outgoing_packet函数并不会直接调用send_packet发送数据包。
write_outgoing_packet主要是一个大循环

1
2
3
4
5
do {
// ...
payload -= added;
} while (payload);
flush_packets();

首先write_outgoing_packet在发送缓冲区中取出前一个seq_nr - 1序号的pkt,试图重用它。注意到如果窗口小于等于0,那么实际的pkt就是NULL,这是为了保证当窗口小于等于0时,不会再往当前的pkt里面放东西了,否则pkt的容量超过窗口导致被缓存。相反的,我们重新开一个包放超出窗口的数据,这样只有这个新开的包会被缓存。

1
2
3
if (cur_window_packets > 0) {
pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
}

下面的代码的上半部分,当数据包pkt尚未满载,并且尚未发送时,在本次循环中会重新使用它,在它的后面续上added长度的空间,供添加数据使用。sizeof(OutgoingPacket) - 1是减去VLA的一个字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// if there's any room left in the last packet in the window
// and it hasn't been sent yet, fill that frame first
if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
// Use the previous unsent packet
added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
pkt = (OutgoingPacket*)realloc(pkt, (sizeof(OutgoingPacket) - 1) + header_size + pkt->payload + added);
outbuf.put(seq_nr - 1, pkt);
append = false;
assert(!pkt->need_resend);
} else {
// Create the packet to send.
added = payload;
pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) + header_size + added);
pkt->payload = 0;
pkt->transmissions = 0;
pkt->need_resend = false;
}

下面的代码紧接着上面,为pkt添加added长度的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (added) {
assert(flags == ST_DATA);
// Fill it with data from the upper layer.
unsigned char *p = pkt->data + header_size + pkt->payload;
size_t needed = added;
for (size_t i = 0; i < num_iovecs && needed; i++) {
if (iovec[i].iov_len == 0)
continue;
size_t num = min<size_t>(needed, iovec[i].iov_len);
memcpy(p, iovec[i].iov_base, num);
p += num;
iovec[i].iov_len -= num;
iovec[i].iov_base = (byte*)iovec[i].iov_base + num; // iovec[i].iov_base += num, but without void* pointers
needed -= num;
}
assert(needed == 0);
}

append表示是否是一个需要被加入缓冲区的新数据包,在上面的if块中被设置

1
2
3
4
5
6
7
8
if (append) {
// Remember the message in the outgoing queue.
outbuf.ensure_size(seq_nr, cur_window_packets);
outbuf.put(seq_nr, pkt);
p1->seq_nr = seq_nr;
seq_nr++;
cur_window_packets++;
}

接下来write_outgoing_packet调用flush_packets来刷新缓冲区

flush_packets

flush_packets函数在发包时和重传计时器超时时被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool UTPSocket::flush_packets()
{
size_t packet_size = get_packet_size();
// send packets that are waiting on the pacer to be sent
// i has to be an unsigned 16 bit counter to wrap correctly
// signed types are not guaranteed to wrap the way you expect
for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
// have we run out of quota?
if (is_full()) return true;
// Nagle check
// don't send the last packet if we have one packet in-flight
// and the current packet is still smaller than packet_size.
if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
cur_window_packets == 1 ||
pkt->payload >= packet_size) {
send_packet(pkt);
}
}
return false;
}

读操作

ucat.c中,当程序收到UDP包之后会交给对应socket的udp_process_incoming函数进行处理,这个函数中涉及和uTP本身有关的很多内容,因此将在下面的章节中进行讨论。

主动关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void utp_close(UTPSocket *conn)
{
switch(conn->state) {
case CS_CONNECTED:
case CS_CONNECTED_FULL:
conn->state = CS_FIN_SENT;
conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
break;
case CS_SYN_SENT:
conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
// fall through
case CS_GOT_FIN:
conn->state = CS_DESTROY_DELAY;
break;
case CS_SYN_RECV:
// fall through
default:
conn->state = CS_DESTROY;
break;
}
}

超时重传与可靠传输

引起发送方超时重传的原因有大致三种:

  1. 分组丢失。这里指报文并没有顺利到达接收方,因此需要发送发进行重传。
  2. 确认丢失。这里报文顺利传送到接收方,但接收方返回的ACK报文丢失了。这种情况下发送方很可能会在超时之后重新发送该分组,而接收方应该选择丢弃并重新确认。
  3. 经受延迟。这里报文和ACK都顺利传送,但整个过程耗时超过了Timeout,这时发送方也会进行重传。

对于发送出去的每个数据包设置一个定时器,等定时器超时之后触发回调进行重传是开销很大的。实际上可以维护一个超时重传计时器,当对方有数据包过来时就重置这个计时器,否则当计时器超时时,就重传发送队列中的所有数据包。

发送ACK

当对方封包过来时,需要根据其序号更新自己的确认号,并进行相关处理,如发送ACK、处理乱序包等。这是一个复杂的流程,本部分介绍如何向对方发送一个ACK。而在此之前的例如接受封包,更新自己的ack_nr则在下面的数据包接收(确认部分)进行介绍。

延迟确认

uTP使用了schedule_ack的机制来实现Delayed ACK特性。首先使用UTPSocket::schedule_ack向context注册socket自己,表示请不要立即发送一个空的ACK包,而是尝试将ACK放到带用户数据的包里面。当计时器超时时,utp_issue_deferred_acks函数会被调用(在ucat.c里面)。这个函数调用ack_sockets里面所有注册了的socket的send_ack()方法,发送ACK包。

1
2
3
4
5
for (size_t i = 0; i < ctx->ack_sockets.GetCount(); i++) {
UTPSocket *conn = ctx->ack_sockets[i];
conn->send_ack();
i--;
}

当新的ACK能够随着带用户数据的包一同发送时,就能免于发送一个空ACK包的开销。这时候会调用removeSocketFromAckList函数将这个socket从ack_sockets列表中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void removeSocketFromAckList(UTPSocket *conn)
{
if (conn->ida >= 0)
{
UTPSocket *last = conn->ctx->ack_sockets[conn->ctx->ack_sockets.GetCount() - 1];
assert(last->ida < (int)(conn->ctx->ack_sockets.GetCount()));
assert(conn->ctx->ack_sockets[last->ida] == last);
last->ida = conn->ida;
conn->ctx->ack_sockets[conn->ida] = last;
conn->ida = -1;
// Decrease the count
conn->ctx->ack_sockets.SetCount(conn->ctx->ack_sockets.GetCount() - 1);
}
}

这个函数会将需要移除的指针和队尾指针互换,并弹出队尾。
需要注意的是延迟确认存在很多特殊情况:

  1. 保活心跳包立即发送
  2. SYN和FIN等关键包立即发送
  3. 当窗口变为0立即发送,因为窗口为0表示一段时间内不能向对方发送数据了

具体实现

从具体实现上看,libutp在void UTPSocket::ack_packet(uint16 seq)函数里面处理对方发过来的ACK,在void UTPSocket::send_ack(bool synack)里面向对方发送自己对对方的ACK。
ack_packet函数有3个返回值,返回0表示正常ACK,返回1表示这个包已经被ACK过了,返回2表示这个包还没有被发送。

数据包接收(确认部分)

TCP协议中使用的是后退N重传(Go-Back-N)协议,即从第一个未确认的包开始全部传送。TCP用ACK号表示小于ACK号的所有字节都已经被接受到。例如A发送了1/2/3/4四个数据包,如果截止到A的RTO超时,B只接受到了1/3,那么它只能ACK到1。这时候A就必须重传2/3/4三个数据包,但其实3是可以不重传的。此时在传输过程中发生了乱序,这里数据包3号早于数据包2号到达了。
uTP使用reorder_count记录数据包乱序抵达的情况。我们查看在utp_process_incoming中有关处理对方序号的部分

1
2
// seqnr is the number of packets past the expected packet this is. ack_nr is the last acked, seq_nr is the current. Subtracring 1 makes 0 mean "this is the next expected packet".
const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;

这里的pk_seq_nr指的是数据包包头中的seq_nr字段,而conn->ack_nr表示我们最后确认的序号,因此seqnr为0时表示这个包是序号紧接着的数据包。注意这个是能够正确处理溢出的情况的。
接下来跳过若干行,在utp_process_incoming函数的最后,对当前的数据包进行确认工作,并调用utp_call_on_read回调。查看代码,这里对seqnr是否为0,也就是是否为乱序包展开了讨论,首先查看不是乱序包的情况,我们直接在代码中进行注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
if (seqnr == 0) {
size_t count = packet_end - data;
if (count > 0 && conn->state != CS_FIN_SENT) {
// Post bytes to the upper layer
utp_call_on_read(conn->ctx, conn, data, count);
}
conn->ack_nr++;
// Check if the next packet has been received too, but waiting in the reorder buffer.
// 这里检查是否可以释放缓存着的乱序包,例如seqnr==1的包可能已经到达,但由于当前seqnr==0的包还未到达,所以无法确认,只能缓存着
for (;;) {
if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
if (conn->state != CS_FIN_SENT) {
conn->state = CS_GOT_FIN;
conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
}
// if the other end wants to close, ack
conn->send_ack();
// reorder_count is not necessarily 0 at this point. even though it is most of the time, the other end may have sent packets with higher sequence numbers than what later end up being eof_pkt since we have received all packets up to eof_pkt just ignore the ones after it.
conn->reorder_count = 0;
}
// 当已经没有乱序包了,就直接退出循环。这里和后面的assert联动
// Quick get-out in case there is nothing to reorder
if (conn->reorder_count == 0)
break;
// Check if there are additional buffers in the reorder buffers
// that need delivery.
byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
if (p == NULL)
break;
conn->inbuf.put(conn->ack_nr+1, NULL);
count = *(uint*)p;
if (count > 0 && conn->state != CS_FIN_SENT) {
// Pass the bytes to the upper layer
utp_call_on_read(conn->ctx, conn, p + sizeof(uint), count);
}
conn->ack_nr++;
// Free the element from the reorder buffer
free(p);
assert(conn->reorder_count > 0);
conn->reorder_count--;
}
// 向context注册一个延迟确认
conn->schedule_ack();
}

下面查看是乱序包的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// if we have received a FIN packet, and the EOF-sequence number is lower than the sequence number of the packet we just received something is wrong.
if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
return 0;
}
// 如果这里接受到一个序号距离ack_nr偏移非常严重的包,选择直接丢弃。注意到这里实际上也处理了一个序号在pk_seq_nr前的包的情况
// if the sequence number is entirely off the expected one, just drop it. We can't allocate buffer space in the inbuf entirely based on untrusted input
if (seqnr > 0x3ff) {
return 0;
}
// we need to grow the circle buffer before we check if the packet is already in here, so that we don't end up looking at an older packet (since the indices wraps around).
conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);
// 一个提前抵达的包同样可能已经被处理过
// Has this packet already been received? (i.e. a duplicate) If that is the case, just discard it.
if (conn->inbuf.get(pk_seq_nr) != NULL) {
return 0;
}
// Allocate memory to fit the packet that needs to re-ordered
byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
*(uint*)mem = (uint)(packet_end - data);
memcpy(mem + sizeof(uint), data, packet_end - data);
// Insert into reorder buffer and increment the count of # of packets to be reordered. we add one to seqnr in order to leave the last entry empty, that way the assert in send_ack is valid. we have to add one to seqnr too, in order to make the circular buffer grow around the correct point (which is conn->ack_nr + 1).
assert(conn->inbuf.get(pk_seq_nr) == NULL);
assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
conn->inbuf.put(pk_seq_nr, mem);
conn->reorder_count++;
#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
#endif
// 向context注册一个延迟确认
conn->schedule_ack();

RTT与RTO的计算

RTT(Round-Trip Time),往返时间。受到链路的传播时间、终端系统的处理时间、路由器缓存与处理时间的影响。
RTO(Retransmission TimeOut),超时重传时间,与RTT有关。RFC793中使用低通过滤器对RTT进行平滑,然后再乘上一个因子$\beta$得到初次重传RTO。此外在往返时间变化起伏较大是,还要根据均值和方差计算RTO。RTO随着重传次数是按照指数增长的,即第二次超时则重传时间变为2倍的RTO。在新的RFC2988/6298中又更新了相关的算法,在此不详述。

uTP中RTT和初始RTO的计算实现在ack_packet函数里面。ack_packet是作用在发送队列上的,当数据包没有被重传的时候,使用当前时间减去它的发送时间来计算出ertt,然后计算出rttrto。而rto_timeout指的是超时的时刻,初始化时有rto_timeout = ctx->current_ms + retransmit_timeout。当ctx->current_ms - rto_timeout时则超时条件触发。下面是ack_packet中具体的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 注意我们只对没有重传的包计算RTT,这在之前也提到过原因
if (pkt->transmissions == 1) {
// Estimate the round trip time.
const uint32 ertt = (uint32)((utp_call_get_microseconds(this->ctx, this) - pkt->time_sent) / 1000);
if (rtt == 0) {
// First round trip time sample
rtt = ertt;
rtt_var = ertt / 2;
// sanity check. rtt should never be more than 6 seconds
// assert(rtt < 6000);
} else {
// Compute new round trip times
const int delta = (int)rtt - ertt;
rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
rtt = rtt - rtt/8 + ertt/8;
// sanity check. rtt should never be more than 6 seconds
// assert(rtt < 6000);
rtt_hist.add_sample(ertt, ctx->current_ms);
}
rto = max<uint>(rtt + rtt_var * 4, 1000);
}
retransmit_timeout = rto;
rto_timeout = ctx->current_ms + rto;

超时重传

TCP的实现超时重传一般是设置一个超时重传定时器icsk->icsk_retransmit_timer,通过inet_csk_init_xmit_timers来注册。常用的超时方式有使用alarm信号、使用select、设置SO_RCVTIMEOSO_SNDTIMEO字段、使用Linux提供的定时器(setitimer等)。但是在uTP的实现里面我并没有发现使用上面定时器的痕迹,这个定时器在哪里呢?我们首先找到uTP的超时重传实现,在void UTPSocket::check_timeouts函数里,而这个函数只被utp_check_timeouts调用。utp_check_timeouts是作为uTP的API函数,在应用程序ucat.c中,每次network_loop中的poll函数超时时,utp_check_timeouts就被调用。其实libutp框架更类似于一个个中断处理程序,而不是一个服务,它需要来自外部的信号才能驱动。
下面我们来查看UTPSocket::check_timeouts这个方法。这个方法中出发了对坚持定时器zerowindow_time、重传定时器rto_timeout、保活定时器last_sent_packet和时间等待定时器rto_timeout(这里复用了)。
首先查看最基本的重传定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
case CS_SYN_SENT:
case CS_SYN_RECV:
case CS_CONNECTED_FULL:
case CS_CONNECTED:
case CS_FIN_SENT: {
if ((int)(ctx->current_ms - rto_timeout) >= 0 && rto_timeout > 0) {
// 这里删去了处理mtu探测的部分内容,移到专门的章节
// 这里删除了一段被注释了的重传次数大于4就重新计算rtt的策略
// Increase RTO
const uint new_timeout = ignore_loss ? retransmit_timeout : retransmit_timeout * 2;
// 这里防范恶意连接的情况,当第三次握手超时时就直接关闭连接
if (state == CS_SYN_RECV) {
state = CS_DESTROY;
utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
return;
}
// 这里删去了处理连接超时的部分内容,移到专门章节
retransmit_timeout = new_timeout;
rto_timeout = ctx->current_ms + new_timeout;
if (!ignore_loss) {
// 此时连接超时,ignore_loss只有当执行mtu探测任务时才为true
duplicate_ack = 0;
int packet_size = get_packet_size();
if ((cur_window_packets == 0) && ((int)max_window > packet_size)) {
// 这时连接处于闲置状态,并不急切需要重置拥塞窗口
max_window = max(max_window * 2 / 3, size_t(packet_size));
} else {
// 此时延迟非常大,因此将拥塞窗口缩小到1个数据包,并开始慢启动算法
max_window = packet_size;
slow_start = true;
}
}
// 这个时候使用后退N协议全部重传
for (int i = 0; i < cur_window_packets; ++i) {
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;
// uTP使用`need_resend`来描述一个包是否需要被重传。
pkt->need_resend = true;
assert(cur_window >= pkt->payload);
cur_window -= pkt->payload;
}
if (cur_window_packets > 0) {
retransmit_count++;
fast_timeout = true;
timeout_seq_nr = seq_nr;
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
assert(pkt);
// Re-send the packet.
send_packet(pkt);
}
}
// Mark the socket as writable. If the cwnd has grown, or if the number of
// bytes in-flight is lower than cwnd, we need to make the socket writable again
// in case it isn't
if (state == CS_CONNECTED_FULL && !is_full()) {
state = CS_CONNECTED;
#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
(uint)max_window, (uint)cur_window, (uint)get_packet_size());
#endif
utp_call_on_state_change(this->ctx, this, UTP_STATE_WRITABLE);
}
if (state >= CS_CONNECTED && state < CS_GOT_FIN) {
if ((int)(ctx->current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
send_keep_alive();
}
}
break;

注意这里对于一个socket而不是一个数据包维护一个retransmit_count
下面查看时间等待定时器,这里并不需要等2MSL的时间,而是3*RTO和60之间的较小值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// check_timeouts
case CS_GOT_FIN:
case CS_DESTROY_DELAY:
if ((int)(ctx->current_ms - rto_timeout) >= 0) {
state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
if (cur_window_packets > 0) {
utp_call_on_error(ctx, this, UTP_ECONNRESET);
}
}
break;
// utp_process_incoming
if (conn->state != CS_FIN_SENT) {
conn->state = CS_GOT_FIN;
conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
}

Fast retransmit

Fast retransmit虽然也是超时重传行为,但实际上是拥塞避免算法中的一部分。因此将在拥塞控制部分论述。

Selective Acknowledgment

在TCP协议中使用SACK选项进行选择确认,使用若干组[start, end)来表示已经接受到数据的区间。SACK能够有效减少重传数据包的数量,对于带宽紧张的网络十分有用。不过需要注意恶意使用SACK对CPU资源造成的损害。
在先前的数据包头构造部分已经提到了UTP的EACK机制,这是一个非常巧妙的方案,即用一个32位的比特串来表示前32个未确认的包中有哪些是已经收到了的。有关SACK的代码在UTPSocket::send_ack方法里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (reorder_count != 0 && state < CS_GOT_FIN) {
// if reorder count > 0, send an EACK. reorder count should always be 0 for synacks, so this should not be as synack
assert(!synack);
pfa.pf.ext = 1;
pfa.ext_next = 0;
pfa.ext_len = 4;
uint m = 0;
// reorder count should only be non-zero if the packet ack_nr + 1 has not yet been received
assert(inbuf.get(ack_nr + 1) == NULL);
size_t window = min<size_t>(14+16, inbuf.size());
// Generate bit mask of segments received.
for (size_t i = 0; i < window; i++) {
if (inbuf.get(ack_nr + i + 2) != NULL) {
m |= 1 << i;
}
}
pfa.acks[0] = (byte)m;
pfa.acks[1] = (byte)(m >> 8);
pfa.acks[2] = (byte)(m >> 16);
pfa.acks[3] = (byte)(m >> 24);
len += 4 + 2;
}

首先我们看到条件是reorder_count != 0 && state < CS_GOT_FIN,这表明在非连接建立/关闭时,当出现数据包乱序抵达时,启动EACK机制。

连接的异常终止

重传失败

在Linux中使用tcp_retries1 = 3tcp_retries2 = 15(计算得到的一个时间戳)来限定普通包的重传次数。
特别地,在握手时设有专门的tcp_syn_retries,这是由于对于连接建立时的重传需要精心设计以防止可能的SYN Flood攻击。
在UTP中的重传失败机制比较简单。

1
2
3
4
5
6
7
8
9
10
11
if (retransmit_count >= 4 || (state == CS_SYN_SENT && retransmit_count >= 2)) {
// 4 consecutive transmissions have timed out. Kill it. If we
// haven't even connected yet, give up after only 2 consecutive
// failed transmissions.
if (state == CS_FIN_SENT)
state = CS_DESTROY;
else
state = CS_RESET;
utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
return;
}

连接重置

流量控制和拥塞控制

流量控制着眼于接收端,保证发送端的发送速率能够匹配接收端的接受速率和缓存大小。流量控制包含滑动窗口、Nagle算法等。拥塞控制着眼于整个网络的性能,是当前发送端的速率匹配当前链路能承载的的限额。拥塞控制包含慢启动、拥塞避免、Fast retransmit和Fast recovery等。

拥塞控制原理简述

首先简单地讨论下TCP进行拥塞控制的原理。拥塞的发生一般有两个预兆,一个是超时,另一个是收到重复的ACK。这里重复的ACK来自于一个超出可靠传输协议的额外约定,也就是当我们收到一个失序的报文段时,我们仍然需要立即回复一个ACK给对方的,而这个ACK在对方看来肯定是重复的了。必须要说明的是,这两者的还可能是由分组损坏导致的对端的丢包,但是这种情况很少,所以我们并不考虑。
在TCP协议中我们考虑发送方和接收方之间存在较多路由器以及较慢链路的情况。在滑动窗口协议的控制下,我们知道只要窗口未被填满,缓冲区有数据,我们就可以往对端发包。但当链路较为复杂时,我们就必须要考虑当流量过大时,中途某个路由器可能无法负担而直接丢包,诸如此类的情况实际上限制了TCP的吞吐量。为了解决这个问题,TCP首先提出了慢启动的算法。慢启动要求我们增加一个拥塞窗口cwnd,当连接刚建立时,我们设置cwnd为1个MSS的大小,并随着对方的每次确认而线性增大。注意在没有约束的情况下这实际上导致了指数增大的过程,例如一开始cwnd是1(个MSS),发送一个包,对方确认之后cwnd到2,我们现在可以最多发送两个包,接着我们真的发送两个包,对方确认后cwnd增长到4。我们能看到称为TCP自计时(self-clocking)的行为,也就是接收方返回的ACK的间隔和发送发送间隔趋于一致,我们可以从TCPIP详解(卷一)中看到一个详细的图示。这个原因是我们假设网络是对称的,并且也不考虑链路上出现的排队。当这个过程收敛后(出现拥塞或者达到慢启动阈值ssthresh),也就是发送方和接收方之间的管道被填满,此时连接饱和,无论拥塞窗口和通告窗口是多少都不能再容纳更多数据,此时只有一个包被确认,发送方才能再发送另一个包。
上面的慢启动过程以拥塞和达到阈值ssthresh(默认为65535)为结束,特别地,当cwnd超过ssthresh后需要执行拥塞避免算法。拥塞避免算法实际上将cwnd从慢启动的指数增长变为线性增长,也就是在一个包的往返时间内只增大cwnd一次,对应到每个确认,我们只增加1/cwnd;而在慢启动中,该RTT中收到几个ACK就增加几次(注意cwnd是按字节计算的,由于成块数据流往往以MSS为单位发送,所以我们按照ACK来简化讨论,但合并的ACK并不影响)。一旦拥塞发生,拥塞避免会减半慢启动阈值ssthresh,然后拥塞窗口cwnd立刻变为1,重新开始慢启动算法。容易看到这个第二次开始的慢启动算法在达到原先阈值的一半就会停止并进入拥塞避免,这类似于用一种二分法的思路寻找稳定的承载量。
下面我们看到一个称为快速重传和快速恢复算法的改进,这个算法要求对于由重复的ACK(至少3次)报告的拥塞,我们在减半慢启动阈值ssthresh后不进行慢启动,而是只将拥塞窗口cwnd下降到新的ssthresh处,并继续执行拥塞避免算法。
还有一类被称为长肥管道的连接,它的带宽(bandwidth)乘以RTT的积很大,在这种管道上传输时由于时延高,往往出现窗口耗尽而报文还没送达对端的问题,对此可以使用扩大窗口选项解决。但丢包问题仍可能造成网络通信速度急剧下降。上面的快速重传与快速恢复能够部分地解决问题,但不管怎么样,吞吐率还是受到了影响。这时候使用SACK可以避免再重传对方已经收到的包,造成冗余的ACK包。

发送窗口

libutp定义了一些有关窗口的变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// the number of packets in the send queue. Packets that haven't
// yet been sent count as well as packets marked as needing resend
// the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
uint16 cur_window_packets;
// how much of the window is used, number of bytes in-flight
// packets that have not yet been sent do not count, packets
// that are marked as needing to be re-sent (due to a timeout)
// don't count either
size_t cur_window;
// maximum window size, in bytes
size_t max_window;
// max receive window for other end, in bytes
size_t max_window_user;

其中cur_window_packets表示数据包的窗口,包含所有在发送队列的数据包,无论是否已经被发送,或者是否需要重传。因此最旧的未被对方确认的序号是seq_nr - cur_window_packets。UTP对cur_window_packets的限制是一定要小于OUTGOING_BUFFER_MAX_SIZE
cur_window就是按字节算的通常意义上的窗口,在计算时不包含需要重传的包。max_window表示最大的窗口,它和max_window_user不同的是max_window还包含了拥塞窗口,而max_window_user来自对方,表示对方缓冲区的大小。
下面的is_full判断从cur_window_packetscur_window角度窗口是否饱和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool UTPSocket::is_full(int bytes)
{
size_t packet_size = get_packet_size();
if (bytes < 0) bytes = packet_size;
else if (bytes > (int)packet_size) bytes = (int)packet_size;
size_t max_send = min(max_window, opt_sndbuf, max_window_user);
// subtract one to save space for the FIN packet
if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) {
last_maxed_out_window = ctx->current_ms;
return true;
}
if (cur_window + bytes > max_send) {
last_maxed_out_window = ctx->current_ms;
return true;
}
return false;
}

Fast retransmit

当发送方连续收到3次相同的ACK,那么就重传可能被丢了的包。这里为什么是至少收到3次而不是2次是因为丢包情况下发送方至少会收到三次重复的ACK。从实现上看,有的快速重传选择只重传最初被丢的包,有的选择重传所有被丢的包。uTP使用duplicate_ack来记录收到重复ACK的次数。在utp_process_incoming函数中对duplicate_ack进行更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (conn->cur_window_packets > 0) {
// 当ack_nr等于最后被对方确认的序号时,这里`conn->seq_nr - conn->cur_window_packets`等于第一个没被对方确认的包
if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK)
// 这里作者强调了当数据包带上了用户数据后,就不应该算入重复ACK中,这和BSD4.4 TCP实现是一致的
&& pk_flags == ST_STATE) {
++conn->duplicate_ack;
if (conn->duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND && conn->mtu_probe_seq) {
// It's likely that the probe was rejected due to its size, but we haven't got an ICMP report back yet
if (pk_ack_nr == ((conn->mtu_probe_seq - 1) & ACK_NR_MASK)) {
conn->mtu_ceiling = conn->mtu_probe_size - 1;
conn->mtu_search_update();
} else {
// A non-probe was blocked before our probe. Can't conclude much, send a new probe
conn->mtu_probe_seq = conn->mtu_probe_size = 0;
}
}
} else {
conn->duplicate_ack = 0;
}
// TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND and fast_resend_seq_nr <= ack_nr + 1, resend ack_nr + 1 also call maybe_decay_win()
}

报文分段

MTU(Maximum Transmission Unit),最大传输单元。通常地,以太网的MTU是1500,而IP是65535(包括头部),Internet的标准MTU是576。所以对于较大的IP包,如果在以太网上传输就需要进行分片。而TCP协议提供了MSS选项用来在建立连接时写上MSS大小,也就是TCP的最大的分段大小,超过这个MSS的数据就需要进行分段传输。MSS的协商在前两次的SYN握手时处理。
而UDP是不带有分片功能的,所以对于较大的数据包是采用IP进行分片的。这其中带来一些问题,IP分片后只有第一个分片带有UDP头部,因此只要有一个IP数据报传输失败,那么整个UDP报文就无法交付(校验和和长度都通不过)。而IP协议本身并没有重传功能,且分片可能发生在链路上的任一路由器上,实际上根本无法知道原数据包是怎么被分片的。因此如果在UDP上层的实现要求重传(UDP本身不带重传),必须整个UDP数据报全部重传。所以说我们重新考虑UDP协议就会发现它头部的2字节的长度显得很不必要,因为根本不会发这么长的数据报。事实上按照TCP的按字节编码省掉一个长度字段也是方便的。因此我们基于UDP实现的传输层协议首先要做的就是避免IP层为我们分片,这样就能保证每个IP数据报中都要带有UDP头和我们的协议头,是个完整的传输层协议包。这就意味着我们需要让我们的基于UDP的协议的MSS + HEAD + UDP_HEAD + IP_HEAD小于可能的链路层的MTU。

MTU探测

UTP通过截取ICMP Type3 Code4(fragmentation needed)来获得分片情况,即在IP首部设置了不可分片标志,但如果UDP报文达到MTU上限则会丢弃该IP报,返回ICMP不可达差错。UTP通过这个机制使用二分法来找到一个合适的MTU。
UTPSocket::mtu_reset函数中,预置了MTU搜寻空间为[576, udp_mtu],也就是default IP maximum datagem size。不过由于以太网的流行,所以将576作为下限,此时对应于TCP的MSS为536。

1
2
3
4
5
6
void UTPSocket::mtu_reset()
{
mtu_ceiling = get_udp_mtu();
mtu_floor = 576;
mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
}

时间测量

延迟测量

在数据报报头的tv_usecreply_micro两个字段用来测量延迟。
发送端S设置tv_usec表示发送时间S1

1
2
3
// send_data
uint64 time = utp_call_get_microseconds(ctx, this);
b1->tv_usec = (uint32)time;

接收端R计算reply_micro表示接受时间R1与发送时间S1的差,粗略地估计了从S到R的经历的时间。注意由于两个主机的时钟不一定一致,所以这个值不精确。

1
2
3
4
5
6
7
// utp_process_incoming
uint64 p = pf1->tv_usec;
// get delay in both directions
// record the delay to report back
const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
conn->reply_micro = their_delay;

为了消除误差,UTP借助了NTP授时协议的机制。这里需要假设S与R之间的网络状况是对等的(这是一个很强的假设),即从S到R的速度不至于显著慢或快于从R到S的速度。这时候从R往S端发送一个回复的数据包,记录下这次的发送时间R2和接收时间S2。可以计算得到仅由网络原因造成的延时为$(S2 - S1) - (R2 - R1)$,还能得到S和R两个主机之间的时间差是$\frac{(R1 - S1) + (R2 - S2)}{2}$。

1
2
// send_data
b1->reply_micro = reply_micro;

DelayHist

DelayHist记录了时间的延迟,具有以下的方法

  1. shift
    用来将所有的delay_base_hist向右偏移一段时间长度

clock drift问题

由于UTP被用来实现一些BT下载软件,这个机制是UTP用来防止用户故意调慢时钟从而霸占带宽设计的,并且不会产生误报(False positive)。

uTP数据包统计

utp_context_statsutp_context_stats中进行context和socket级别的统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Returned by utp_get_context_stats()
typedef struct {
uint32 _nraw_recv[5]; // total packets recieved less than 300/600/1200/MTU bytes fpr all connections (context-wide)
uint32 _nraw_send[5]; // total packets sent less than 300/600/1200/MTU bytes for all connections (context-wide)
} utp_context_stats;
// Returned by utp_get_stats()
typedef struct {
uint64 nbytes_recv; // total bytes received
uint64 nbytes_xmit; // total bytes transmitted
uint32 rexmit; // retransmit counter
uint32 fastrexmit; // fast retransmit counter
uint32 nxmit; // transmit counter
uint32 nrecv; // receive counter (total)
uint32 nduprecv; // duplicate receive counter
uint32 mtu_guess; // Best guess at MTU
} utp_socket_stats;

此外在utp_context中也维护了rst_info等信息。

序号溢出问题

TCP中使用了32位的序号,并且具有PAWS机制防止在大带宽的情况下序号被迅速耗尽后产生回绕。如前文所展示的,在uTP的实现中利用了无符号整数的溢出来避免回绕时序号变为0的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// compare if lhs is less than rhs, taking wrapping
// into account. if lhs is close to UINT_MAX and rhs
// is close to 0, lhs is assumed to have wrapped and
// considered smaller
bool wrapping_compare_less(uint32 lhs, uint32 rhs, uint32 mask)
{
// distance walking from lhs to rhs, downwards
const uint32 dist_down = (lhs - rhs) & mask;
// distance walking from lhs to rhs, upwards
const uint32 dist_up = (rhs - lhs) & mask;
// if the distance walking up is shorter, lhs
// is less than rhs. If the distance walking down
// is shorter, then rhs is less than lhs
return dist_up < dist_down;
}