2
0

client.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. #include <st.h>
  2. #include <sys/socket.h>
  3. #include <netinet/in.h>
  4. #include <arpa/inet.h>
  5. #include <string.h>
  6. #include <assert.h>
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <getopt.h>
  10. #include <linux/version.h>
  11. // @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception
  12. #include <sys/epoll.h>
  13. // @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c
  14. #include <linux/errqueue.h>
  15. #ifndef SO_EE_ORIGIN_ZEROCOPY
  16. #define SO_EE_ORIGIN_ZEROCOPY 5
  17. #endif
  18. #ifndef SO_ZEROCOPY
  19. #define SO_ZEROCOPY 60
  20. #endif
  21. #ifndef SO_EE_CODE_ZEROCOPY_COPIED
  22. #define SO_EE_CODE_ZEROCOPY_COPIED 1
  23. #endif
  24. #ifndef MSG_ZEROCOPY
  25. #define MSG_ZEROCOPY 0x4000000
  26. #endif
  27. #include <netinet/udp.h>
  28. // Define macro for UDP GSO.
  29. // @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/udpgso.c
  30. #ifndef UDP_SEGMENT
  31. #define UDP_SEGMENT 103
  32. #endif
  33. void* receiver(void* arg)
  34. {
  35. st_netfd_t stfd = (st_netfd_t)arg;
  36. for (;;) {
  37. sockaddr_in peer;
  38. memset(&peer, 0, sizeof(sockaddr_in));
  39. char buf[1500];
  40. memset(buf, 0, sizeof(buf));
  41. iovec iov;
  42. iov.iov_base = buf;
  43. iov.iov_len = sizeof(buf);
  44. msghdr msg;
  45. memset(&msg, 0, sizeof(msghdr));
  46. msg.msg_name = (sockaddr_in*)&peer;
  47. msg.msg_namelen = sizeof(sockaddr_in);
  48. msg.msg_iov = &iov;
  49. msg.msg_iovlen = 1;
  50. int r0 = st_recvmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT);
  51. assert(r0 > 0);
  52. printf("Pong %s:%d %d bytes, flags %#x, %s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0,
  53. msg.msg_flags, msg.msg_iov->iov_base);
  54. }
  55. return NULL;
  56. }
  57. void parse_reception(st_netfd_t stfd, int nn_confirm)
  58. {
  59. int left = nn_confirm;
  60. while (left > 0) {
  61. msghdr msg;
  62. memset(&msg, 0, sizeof(msghdr));
  63. // Reception from kernel, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception
  64. // See do_recv_completion at https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c#L393
  65. char control[100];
  66. msg.msg_control = control;
  67. msg.msg_controllen = sizeof(control);
  68. // Note that the r0 is 0, the reception is in the control.
  69. int r0 = st_recvmsg(stfd, &msg, MSG_ERRQUEUE, ST_UTIME_NO_TIMEOUT);
  70. assert(r0 >= 0);
  71. assert(msg.msg_flags == MSG_ERRQUEUE);
  72. // Notification parsing, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-parsing
  73. cmsghdr* cm = CMSG_FIRSTHDR(&msg);
  74. assert(cm->cmsg_level == SOL_IP || cm->cmsg_type == IP_RECVERR);
  75. sock_extended_err* serr = (sock_extended_err*)(void*)CMSG_DATA(cm);
  76. assert(serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);
  77. uint32_t hi = serr->ee_data;
  78. uint32_t lo = serr->ee_info;
  79. uint32_t range = hi - lo + 1;
  80. left -= range;
  81. printf("Reception %d bytes, flags %#x, cmsg(level %#x, type %#x), serr(errno %#x, origin %#x, code %#x), range %d [%d, %d]\n",
  82. msg.msg_controllen, msg.msg_flags, cm->cmsg_level, cm->cmsg_type, serr->ee_errno, serr->ee_origin, serr->ee_code, range, lo, hi);
  83. // Defered Copies, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#deferred-copies
  84. if (serr->ee_code == SO_EE_CODE_ZEROCOPY_COPIED) {
  85. printf("Warning: Defered copies, should stop zerocopy\n");
  86. }
  87. }
  88. }
  // Print the command-line help of the client to stdout.
  // @param argc Not read; present so the signature mirrors main().
  // @param argv Only argv[0] is read, as the program name shown in the help.
  void usage(int argc, char** argv)
  {
      // The fixed part of the help, one entry per output line.
      static const char* const option_help[] = {
          "Options:\n",
          " --help Print this help and exit.\n",
          " --host=string The host to send to.\n",
          " --port=int The port to send to.\n",
          " --pong=bool Whether response pong, true|false\n",
          " --zerocopy=bool Whether use zerocopy to sendmsg, true|false\n",
          " --copy=int The copies of message, 1 means sendmmsg(msg+msg)\n",
          " --loop=int The number of loop to send out messages\n",
          " --batch=bool Whether read reception by batch, true|false\n",
          " --mix=bool Whether mix msg with zerocopy and those without, true|false\n",
          " --size=int Each message size in bytes.\n",
          " --gso=int The GSO size in bytes, 0 to disable it.\n",
          " --iovs=int The number of iovs to send, at least 1.\n",
          " --sndbuf=int The SO_SNDBUF size in bytes, 0 to ignore.\n",
          "For example:\n",
      };
      const size_t nn_lines = sizeof(option_help) / sizeof(option_help[0]);
      const char* program = argv[0];

      printf("Usage: %s <options>\n", program);

      for (size_t i = 0; i < nn_lines; i++) {
          fputs(option_help[i], stdout);
      }

      printf(" %s --host=127.0.0.1 --port=8000 --pong=true --zerocopy=true --copy=0 --loop=1 --batch=true --mix=true --size=1400 --gso=0 --iovs=1 --sndbuf=0\n", program);
  }
  109. int main(int argc, char** argv)
  110. {
  111. option longopts[] = {
  112. { "host", required_argument, NULL, 'o' },
  113. { "port", required_argument, NULL, 'p' },
  114. { "pong", required_argument, NULL, 'n' },
  115. { "zerocopy", required_argument, NULL, 'z' },
  116. { "copy", required_argument, NULL, 'c' },
  117. { "loop", required_argument, NULL, 'l' },
  118. { "batch", required_argument, NULL, 'b' },
  119. { "mix", required_argument, NULL, 'm' },
  120. { "size", required_argument, NULL, 's' },
  121. { "gso", required_argument, NULL, 'g' },
  122. { "iovs", required_argument, NULL, 'i' },
  123. { "sndbuf", required_argument, NULL, 'u' },
  124. { "help", no_argument, NULL, 'h' },
  125. { NULL, 0, NULL, 0 }
  126. };
  127. char* host = NULL; char ch;
  128. int port = 0; int nn_copies = 0; int loop = 1; int size = 1500; int gso = 0; int nn_iovs = 0; int sndbuf = 0;
  129. bool pong = false; bool zerocopy = false; bool batch = false; bool mix = false;
  130. while ((ch = getopt_long(argc, argv, "o:p:n:z:c:l:b:m:s:g:u:h", longopts, NULL)) != -1) {
  131. switch (ch) {
  132. case 'o': host = (char*)optarg; break;
  133. case 'p': port = atoi(optarg); break;
  134. case 'n': pong = !strcmp(optarg,"true"); break;
  135. case 'z': zerocopy = !strcmp(optarg,"true"); break;
  136. case 'c': nn_copies = atoi(optarg); break;
  137. case 'l': loop = atoi(optarg); break;
  138. case 'b': batch = !strcmp(optarg,"true"); break;
  139. case 'm': mix = !strcmp(optarg,"true"); break;
  140. case 's': size = atoi(optarg); break;
  141. case 'g': gso = atoi(optarg); break;
  142. case 'i': nn_iovs = atoi(optarg); break;
  143. case 'u': sndbuf = atoi(optarg); break;
  144. case 'h': usage(argc, argv); exit(0);
  145. default: usage(argc, argv); exit(-1);
  146. }
  147. }
  148. printf("Server listen %s:%d, pong %d, zerocopy %d, copies %d, loop %d, batch %d, mix %d, size %d, gso %d, iovs %d, sndbuf %d\n",
  149. host, port, pong, zerocopy, nn_copies, loop, batch, mix, size, gso, nn_iovs, sndbuf);
  150. if (!host || !port || !nn_iovs) {
  151. usage(argc, argv);
  152. exit(-1);
  153. }
  154. assert(!st_set_eventsys(ST_EVENTSYS_ALT));
  155. assert(!st_init());
  156. int fd = socket(PF_INET, SOCK_DGRAM, 0);
  157. assert(fd > 0);
  158. // @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c
  159. if (zerocopy) {
  160. int one = 1;
  161. int r0 = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one));
  162. // MSG_ZEROCOPY for UDP was added in commit b5947e5d1e71 ("udp: msg_zerocopy") in Linux 5.0.
  163. // @see https://lore.kernel.org/netdev/CA+FuTSfBFqRViKfG5crEv8xLMgAkp3cZ+yeuELK5TVv61xT=Yw@mail.gmail.com/
  164. #if LINUX_VERSION_CODE < KERNEL_VERSION(5,0,0)
  165. if (r0 == -1) {
  166. printf("MSG_ZEROCOPY should be kernel 5.0+, kernel %#x, errno=%d\n", LINUX_VERSION_CODE, 524);
  167. exit(-1);
  168. }
  169. #endif
  170. assert(!r0);
  171. printf("epoll events EPOLLERR=%#x, EPOLLHUP=%#x\n", EPOLLERR, EPOLLHUP);
  172. }
  173. if (true) {
  174. int dv = 0;
  175. socklen_t len = sizeof(dv);
  176. int r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &dv, &len);
  177. int r1 = 0;
  178. if (sndbuf > 0) {
  179. r1 = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf));
  180. }
  181. int nv = 0;
  182. int r2 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nv, &len);
  183. printf("socket SO_SNDBUF default=%d, user=%d, now=%d, r0=%d, r1=%d, r2=%d\n", dv, sndbuf, nv, r0, r1, r2);
  184. }
  185. st_netfd_t stfd = st_netfd_open_socket(fd);
  186. assert(stfd);
  187. printf("Client fd=%d\n", fd);
  188. if (pong) {
  189. st_thread_t r0 = st_thread_create(receiver, stfd, 0, 0);
  190. assert(r0);
  191. }
  192. sockaddr_in peer;
  193. memset(&peer, 0, sizeof(sockaddr_in));
  194. peer.sin_family = AF_INET;
  195. peer.sin_port = htons(port);
  196. peer.sin_addr.s_addr = inet_addr(host);
  197. char* buf = new char[size];
  198. memset(buf, 0, size);
  199. memcpy(buf, "Hello", size < 5? size : 5);
  200. iovec iov;
  201. iov.iov_base = buf;
  202. iov.iov_len = size;
  203. int nn_confirm = 0;
  204. for (int k = 0; k < loop; k++) {
  205. msghdr msg;
  206. memset(&msg, 0, sizeof(msghdr));
  207. msg.msg_name = (sockaddr_in*)&peer;
  208. msg.msg_namelen = sizeof(sockaddr_in);
  209. msg.msg_iov = new iovec[nn_iovs];
  210. msg.msg_iovlen = nn_iovs;
  211. for (int i = 0; i < nn_iovs; i++) {
  212. iovec* p = msg.msg_iov + i;
  213. memcpy(p, &iov, sizeof(iovec));
  214. }
  215. if (gso > 0) {
  216. msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
  217. if (!msg.msg_control) {
  218. msg.msg_control = new char[msg.msg_controllen];
  219. }
  220. cmsghdr* cm = CMSG_FIRSTHDR(&msg);
  221. cm->cmsg_level = SOL_UDP;
  222. cm->cmsg_type = UDP_SEGMENT;
  223. cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
  224. *((uint16_t*)CMSG_DATA(cm)) = gso;
  225. }
  226. int r0;
  227. if (nn_copies == 0) {
  228. if (zerocopy) {
  229. r0 = st_sendmsg(stfd, &msg, MSG_ZEROCOPY, ST_UTIME_NO_TIMEOUT);
  230. } else {
  231. r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT);
  232. }
  233. } else {
  234. mmsghdr* hdrs = new mmsghdr[nn_copies + 1];
  235. for (int i = 0; i < nn_copies + 1; i++) {
  236. mmsghdr* p = hdrs + i;
  237. memcpy(&p->msg_hdr, &msg, sizeof(msghdr));
  238. p->msg_len = 0;
  239. }
  240. // The sendmmsg is removed by https://github.com/ossrs/srs/commit/34dae0fe0ddf2e95353ca8cbdc799f4abf96aead
  241. if (zerocopy) {
  242. r0 = st_sendmmsg(stfd, hdrs, nn_copies + 1, MSG_ZEROCOPY, ST_UTIME_NO_TIMEOUT);
  243. } else {
  244. r0 = st_sendmmsg(stfd, hdrs, nn_copies + 1, 0, ST_UTIME_NO_TIMEOUT);
  245. }
  246. }
  247. if (r0 > 0) {
  248. printf("Ping %s:%d %d bytes, control %d, copies=%d, r0=%d, %s\n", host, port, iov.iov_len * nn_iovs,
  249. msg.msg_controllen, nn_copies, r0, msg.msg_iov->iov_base);
  250. } else {
  251. printf("Ping %d bytes, error r0=%d, errno=%d\n", iov.iov_len * nn_iovs, r0, errno); exit(1);
  252. }
  253. if (zerocopy && !batch) {
  254. parse_reception(stfd, r0);
  255. } else {
  256. nn_confirm += r0;
  257. }
  258. if (mix) {
  259. r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT);
  260. assert(r0 > 0);
  261. printf("Mix %s:%d %d bytes, r0=%d, %s\n", host, port, iov.iov_len * nn_iovs, r0, msg.msg_iov->iov_base);
  262. }
  263. }
  264. // @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-batching
  265. if (batch) {
  266. parse_reception(stfd, nn_confirm);
  267. }
  268. st_sleep(-1);
  269. return 0;
  270. }