DPDK-Writing Efficient Code

DPDK的编程规范,用来指导如何写高性能程序,觉得挺有意思的,所以翻译过来。

内存

Memory Copy

不要在数据面上使用 libc 函数,例如 memcpy 和 strcpy 等,推荐用 rte_memcpy 这样的优化实现,实际上就是 SIMD 的版本。
为了使用 SIMD,需要保证地址不是虚拟地址,例如 malloc 等产生的是虚拟地址,那么在物理地址上可能是分散的,就做不了 SIMD 优化。

下面介绍 rte_memcpy,它要求地址不连续。

rte_mov15_or_less 处理小内存

下面这段代码处理小内存的复制。这里压根不考虑内存对齐了,直接复制。

  1. 如果 n & 8,就一次性复制头 64 bit。
  2. 如果剩下来的还满足 n & 4,就一次性复制下面 32 bit。
  3. 由此类推。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static __rte_always_inline void *
rte_mov15_or_less(void *dst, const void *src, size_t n)
{
/**
* Use the following structs to avoid violating C standard
* alignment requirements and to avoid strict aliasing bugs
*/
struct rte_uint64_alias {
uint64_t val;
} __rte_packed __rte_may_alias;
struct rte_uint32_alias {
uint32_t val;
} __rte_packed __rte_may_alias;
struct rte_uint16_alias {
uint16_t val;
} __rte_packed __rte_may_alias;

void *ret = dst;
if (n & 8) { // 1000b
((struct rte_uint64_alias *)dst)->val =
((const struct rte_uint64_alias *)src)->val;
src = (const uint64_t *)src + 1;
dst = (uint64_t *)dst + 1;
}
if (n & 4) { // 0100b
((struct rte_uint32_alias *)dst)->val =
((const struct rte_uint32_alias *)src)->val;
src = (const uint32_t *)src + 1;
dst = (uint32_t *)dst + 1;
}
if (n & 2) { // 0010b
((struct rte_uint16_alias *)dst)->val =
((const struct rte_uint16_alias *)src)->val;
src = (const uint16_t *)src + 1;
dst = (uint16_t *)dst + 1;
}
if (n & 1) // 0001b
*(uint8_t *)dst = *(const uint8_t *)src;
return ret;
}

AVX512 实现

首先这里使用的都是带 u,也就是非对齐的版本。其中:

  1. 16 byte 数据对应 128 bit,对应一个 xmm 寄存器。
  2. 32 byte 数据对应 256 bit,对应一个 ymm 寄存器。
  3. 64 byte 数据对应 512 bit,对应一个 zmm 寄存器。
    注意,这里是 64 byte 而不是 64 bit,64 bit 等于普通寄存器了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* AVX512 implementation below
*/

/**
* Copy 16 bytes from one location to another,
* locations should not overlap.
*/
static __rte_always_inline void
rte_mov16(uint8_t *dst, const uint8_t *src)
{
__m128i xmm0;

xmm0 = _mm_loadu_si128((const __m128i *)src);
_mm_storeu_si128((__m128i *)dst, xmm0);
}

/**
* Copy 32 bytes from one location to another,
* locations should not overlap.
*/
static __rte_always_inline void
rte_mov32(uint8_t *dst, const uint8_t *src)
{
__m256i ymm0;

ymm0 = _mm256_loadu_si256((const __m256i *)src);
_mm256_storeu_si256((__m256i *)dst, ymm0);
}

/**
* Copy 64 bytes from one location to another,
* locations should not overlap.
*/
static __rte_always_inline void
rte_mov64(uint8_t *dst, const uint8_t *src)
{
__m512i zmm0;

zmm0 = _mm512_loadu_si512((const void *)src);
_mm512_storeu_si512((void *)dst, zmm0);
}

/**
* Copy 128 bytes from one location to another,
* locations should not overlap.
*/
static __rte_always_inline void
rte_mov128(uint8_t *dst, const uint8_t *src)
{
rte_mov64(dst + 0 * 64, src + 0 * 64);
rte_mov64(dst + 1 * 64, src + 1 * 64);
}

/**
* Copy 256 bytes from one location to another,
* locations should not overlap.
*/
static __rte_always_inline void
rte_mov256(uint8_t *dst, const uint8_t *src)
{
rte_mov64(dst + 0 * 64, src + 0 * 64);
rte_mov64(dst + 1 * 64, src + 1 * 64);
rte_mov64(dst + 2 * 64, src + 2 * 64);
rte_mov64(dst + 3 * 64, src + 3 * 64);
}

下面是逐 128 byte 搬数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Copy 128-byte blocks from one location to another,
* locations should not overlap.
*/
static __rte_always_inline void
rte_mov128blocks(uint8_t *dst, const uint8_t *src, size_t n)
{
__m512i zmm0, zmm1;

while (n >= 128) {
zmm0 = _mm512_loadu_si512((const void *)(src + 0 * 64));
n -= 128;
zmm1 = _mm512_loadu_si512((const void *)(src + 1 * 64));
src = src + 128;
_mm512_storeu_si512((void *)(dst + 0 * 64), zmm0);
_mm512_storeu_si512((void *)(dst + 1 * 64), zmm1);
dst = dst + 128;
}
}

下面是逐 512 byte 搬数据。这个是用在对齐逻辑上的,不知道为什么还带 u。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Copy 512-byte blocks from one location to another,
* locations should not overlap.
*/
static inline void
rte_mov512blocks(uint8_t *dst, const uint8_t *src, size_t n)
{
__m512i zmm0, zmm1, zmm2, zmm3, zmm4, zmm5, zmm6, zmm7;

while (n >= 512) {
zmm0 = _mm512_loadu_si512((const void *)(src + 0 * 64));
n -= 512;
zmm1 = _mm512_loadu_si512((const void *)(src + 1 * 64));
zmm2 = _mm512_loadu_si512((const void *)(src + 2 * 64));
zmm3 = _mm512_loadu_si512((const void *)(src + 3 * 64));
zmm4 = _mm512_loadu_si512((const void *)(src + 4 * 64));
zmm5 = _mm512_loadu_si512((const void *)(src + 5 * 64));
zmm6 = _mm512_loadu_si512((const void *)(src + 6 * 64));
zmm7 = _mm512_loadu_si512((const void *)(src + 7 * 64));
src = src + 512;
_mm512_storeu_si512((void *)(dst + 0 * 64), zmm0);
_mm512_storeu_si512((void *)(dst + 1 * 64), zmm1);
_mm512_storeu_si512((void *)(dst + 2 * 64), zmm2);
_mm512_storeu_si512((void *)(dst + 3 * 64), zmm3);
_mm512_storeu_si512((void *)(dst + 4 * 64), zmm4);
_mm512_storeu_si512((void *)(dst + 5 * 64), zmm5);
_mm512_storeu_si512((void *)(dst + 6 * 64), zmm6);
_mm512_storeu_si512((void *)(dst + 7 * 64), zmm7);
dst = dst + 512;
}
}

下面来看 rte_memcpy_generic
首先是处理非对齐的部分:

  1. 如果长度小于 16 bytes,走 rte_mov15_or_less
  2. 如果长度小于等于 32 bytes,走两次 rte_mov16
    令 n=17,则
    第一次从 src[0..16] 复制到 dst[0..16]。
    第二次从 src[1..17] 复制到 dst[1..17]。
  3. 如果长度小于等于 64 bytes,类似上面。
  4. 如果长度小于等于 512 bytes,则是一个类似于 rte_mov15_or_less 的实现
    但是在剩余长度小于 128 bytes 后,如果大于 64 bytes,就走两次 rte_mov64 解决战斗。
    如果小于 64 bytes,就走一次 rte_mov64 解决战斗。

如果需要复制的长度大于 512 bytes,就需要处理对齐的部分。这里检查 dst 是否按照 64 bytes 对齐,即 0x3F 对齐。如果不对齐,则先把前面的部分给复制完毕。

【Q】为什么是考虑 dst 对齐而不是是 src 对齐?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

static __rte_always_inline void *
rte_memcpy_generic(void *dst, const void *src, size_t n)
{
void *ret = dst;
size_t dstofss;
size_t bits;

/**
* Copy less than 16 bytes
*/
if (n < 16) {
return rte_mov15_or_less(dst, src, n);
}

/**
* Fast way when copy size doesn't exceed 512 bytes
*/
if (n <= 32) {
rte_mov16((uint8_t *)dst, (const uint8_t *)src);
rte_mov16((uint8_t *)dst - 16 + n,
(const uint8_t *)src - 16 + n);
return ret;
}
if (n <= 64) {
rte_mov32((uint8_t *)dst, (const uint8_t *)src);
rte_mov32((uint8_t *)dst - 32 + n,
(const uint8_t *)src - 32 + n);
return ret;
}
if (n <= 512) {
if (n >= 256) {
n -= 256;
rte_mov256((uint8_t *)dst, (const uint8_t *)src);
src = (const uint8_t *)src + 256;
dst = (uint8_t *)dst + 256;
}
if (n >= 128) {
n -= 128;
rte_mov128((uint8_t *)dst, (const uint8_t *)src);
src = (const uint8_t *)src + 128;
dst = (uint8_t *)dst + 128;
}
COPY_BLOCK_128_BACK63:
if (n > 64) {
rte_mov64((uint8_t *)dst, (const uint8_t *)src);
rte_mov64((uint8_t *)dst - 64 + n,
(const uint8_t *)src - 64 + n);
return ret;
}
if (n > 0)
rte_mov64((uint8_t *)dst - 64 + n,
(const uint8_t *)src - 64 + n);
return ret;
}

/**
* Make store aligned when copy size exceeds 512 bytes
*/
dstofss = ((uintptr_t)dst & 0x3F); // 111111b = 64
if (dstofss > 0) {
dstofss = 64 - dstofss;
n -= dstofss;
rte_mov64((uint8_t *)dst, (const uint8_t *)src);
src = (const uint8_t *)src + dstofss;
dst = (uint8_t *)dst + dstofss;
}

/**
* Copy 512-byte blocks.
* Use copy block function for better instruction order control,
* which is important when load is unaligned.
*/
rte_mov512blocks((uint8_t *)dst, (const uint8_t *)src, n);
bits = n;
n = n & 511;
bits -= n;
src = (const uint8_t *)src + bits;
dst = (uint8_t *)dst + bits;

/**
* Copy 128-byte blocks.
* Use copy block function for better instruction order control,
* which is important when load is unaligned.
*/
if (n >= 128) {
rte_mov128blocks((uint8_t *)dst, (const uint8_t *)src, n);
bits = n;
n = n & 127;
bits -= n;
src = (const uint8_t *)src + bits;
dst = (uint8_t *)dst + bits;
}

/**
* Copy whatever left
*/
goto COPY_BLOCK_128_BACK63;
}

Memory Allocation

避免使用 malloc 等在堆上分配内存,毕竟维护堆还是比较麻烦的,CSAPP 的 Data Lab 令我记忆犹新,并且也不容易做 parallel allocation。
更为推荐的做法是对固定大小的对象构建内存池,例如librte_mempool/rte_malloc的实现。在这样的实现中需要考虑内存对齐,无锁访问,NUMA感知,批量读写,每个核心的Cache。

对同一内存的并发访问

NUMA

Distribution Across Memory Channels

各个核心之间的通信

PMD

锁和原子操作

Reference

  1. https://doc.dpdk.org/guides/prog_guide/env_abstraction_layer.html
  2. https://github.com/DPDK/dpdk/blob/main/lib/eal/x86/include/rte_memcpy.h
    rte_memcpy 的源码