diff --git a/trunk/research/msg_zerocopy/client.cpp b/trunk/research/msg_zerocopy/client.cpp index cdfb18bc6..92f30f604 100644 --- a/trunk/research/msg_zerocopy/client.cpp +++ b/trunk/research/msg_zerocopy/client.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -30,6 +31,13 @@ #define MSG_ZEROCOPY 0x4000000 #endif +#include +// Define macro for UDP GSO. +// @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/udpgso.c +#ifndef UDP_SEGMENT +#define UDP_SEGMENT 103 +#endif + void* receiver(void* arg) { st_netfd_t stfd = (st_netfd_t)arg; @@ -61,63 +69,112 @@ void* receiver(void* arg) return NULL; } -void parse_reception(st_netfd_t stfd) +void parse_reception(st_netfd_t stfd, int nn_confirm) { - msghdr msg; - memset(&msg, 0, sizeof(msghdr)); - - // Reception from kernel, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception - // See do_recv_completion at https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c#L393 - char control[100]; - msg.msg_control = control; - msg.msg_controllen = sizeof(control); - // Note that the r0 is 0, the reception is in the control. - int r0 = st_recvmsg(stfd, &msg, MSG_ERRQUEUE, ST_UTIME_NO_TIMEOUT); - assert(r0 >= 0); - assert(msg.msg_flags == MSG_ERRQUEUE); - - // Notification parsing, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-parsing - cmsghdr* cm = CMSG_FIRSTHDR(&msg); - assert(cm->cmsg_level == SOL_IP || cm->cmsg_type == IP_RECVERR); - - sock_extended_err* serr = (sock_extended_err*)(void*)CMSG_DATA(cm); - assert(serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY); - - uint32_t hi = serr->ee_data; - uint32_t lo = serr->ee_info; - uint32_t range = hi - lo + 1; - printf("Reception %d bytes, flags %#x, cmsg(level %#x, type %#x), serr(errno %#x, origin %#x, code %#x), range %d [%d, %d]\n", - msg.msg_controllen, msg.msg_flags, cm->cmsg_level, cm->cmsg_type, serr->ee_errno, serr->ee_origin, serr->ee_code, range, lo, hi); - - // Defered Copies, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#deferred-copies - if (serr->ee_code == SO_EE_CODE_ZEROCOPY_COPIED) { - printf("Warning: Defered copies, should stop zerocopy\n"); + int left = nn_confirm; + while (left > 0) { + msghdr msg; + memset(&msg, 0, sizeof(msghdr)); + + // Reception from kernel, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception + // See do_recv_completion at https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c#L393 + char control[100]; + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + // Note that the r0 is 0, the reception is in the control. + int r0 = st_recvmsg(stfd, &msg, MSG_ERRQUEUE, ST_UTIME_NO_TIMEOUT); + assert(r0 >= 0); + assert(msg.msg_flags == MSG_ERRQUEUE); + + // Notification parsing, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-parsing + cmsghdr* cm = CMSG_FIRSTHDR(&msg); + assert(cm->cmsg_level == SOL_IP || cm->cmsg_type == IP_RECVERR); + + sock_extended_err* serr = (sock_extended_err*)(void*)CMSG_DATA(cm); + assert(serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY); + + uint32_t hi = serr->ee_data; + uint32_t lo = serr->ee_info; + uint32_t range = hi - lo + 1; + left -= range; + printf("Reception %d bytes, flags %#x, cmsg(level %#x, type %#x), serr(errno %#x, origin %#x, code %#x), range %d [%d, %d]\n", + msg.msg_controllen, msg.msg_flags, cm->cmsg_level, cm->cmsg_type, serr->ee_errno, serr->ee_origin, serr->ee_code, range, lo, hi); + + // Defered Copies, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#deferred-copies + if (serr->ee_code == SO_EE_CODE_ZEROCOPY_COPIED) { + printf("Warning: Defered copies, should stop zerocopy\n"); + } } } +void usage(int argc, char** argv) +{ + printf("Usage: %s \n", argv[0]); + printf("Options:\n"); + printf(" --help Print this help and exit.\n"); + printf(" --host=string The host to send to.\n"); + printf(" --port=int The port to send to.\n"); + printf(" --pong=bool Whether response pong, true|false\n"); + printf(" --zerocopy=bool Whether use zerocopy to sendmsg, true|false\n"); + printf(" --copy=int The copies of message, 1 means sendmmsg(msg+msg)\n"); + printf(" --loop=int The number of loop to send out messages\n"); + printf(" --batch=bool Whether read reception by batch, true|false\n"); + printf(" --mix=bool Whether mix msg with zerocopy and those without, true|false\n"); + printf(" --size=int Each message size in bytes.\n"); + printf(" --gso=int The GSO size in bytes, 0 to disable it.\n"); + printf(" --iovs=int The number of iovs to send, at least 1.\n"); + printf(" --sndbuf=int The SO_SNDBUF size in bytes, 0 to ignore.\n"); + printf("For example:\n"); + printf(" %s --host=127.0.0.1 --port=8000 --pong=true --zerocopy=true --copy=0 --loop=1 --batch=true --mix=true --size=1400 --gso=0 --iovs=1 --sndbuf=0\n", argv[0]); +} + int main(int argc, char** argv) { - if (argc < 8) { - printf("Usage: %s \n", argv[0]); - printf(" pong Whether response pong, true|false\n"); - printf(" zerocopy Whether use zerocopy to sendmsg, true|false\n"); - printf(" sendmmsg The copies of message, 1 means sendmmsg(msg+msg)\n"); - printf(" loop The number of loop to send out messages\n"); - printf(" batch Whether read reception by batch, true|false\n"); - printf("For example:\n"); - printf(" %s 127.0.0.1 8000 true true 0 1 true\n", argv[0]); - exit(-1); + option longopts[] = { + { "host", required_argument, NULL, 'o' }, + { "port", required_argument, NULL, 'p' }, + { "pong", required_argument, NULL, 'n' }, + { "zerocopy", required_argument, NULL, 'z' }, + { "copy", required_argument, NULL, 'c' }, + { "loop", required_argument, NULL, 'l' }, + { "batch", required_argument, NULL, 'b' }, + { "mix", required_argument, NULL, 'm' }, + { "size", required_argument, NULL, 's' }, + { "gso", required_argument, NULL, 'g' }, + { "iovs", required_argument, NULL, 'i' }, + { "sndbuf", required_argument, NULL, 'u' }, + { "help", no_argument, NULL, 'h' }, + { NULL, 0, NULL, 0 } + }; + + char* host = NULL; char ch; + int port = 0; int nn_copies = 0; int loop = 1; int size = 1500; int gso = 0; int nn_iovs = 0; int sndbuf = 0; + bool pong = false; bool zerocopy = false; bool batch = false; bool mix = false; + while ((ch = getopt_long(argc, argv, "o:p:n:z:c:l:b:m:s:g:u:h", longopts, NULL)) != -1) { + switch (ch) { + case 'o': host = (char*)optarg; break; + case 'p': port = atoi(optarg); break; + case 'n': pong = !strcmp(optarg,"true"); break; + case 'z': zerocopy = !strcmp(optarg,"true"); break; + case 'c': nn_copies = atoi(optarg); break; + case 'l': loop = atoi(optarg); break; + case 'b': batch = !strcmp(optarg,"true"); break; + case 'm': mix = !strcmp(optarg,"true"); break; + case 's': size = atoi(optarg); break; + case 'g': gso = atoi(optarg); break; + case 'i': nn_iovs = atoi(optarg); break; + case 'u': sndbuf = atoi(optarg); break; + case 'h': usage(argc, argv); exit(0); + default: usage(argc, argv); exit(-1); + } } - char* host = argv[1]; - int port = atoi(argv[2]); - bool pong = !strcmp(argv[3], "true"); - bool zerocopy = !strcmp(argv[4], "true"); - int nn_copies = atoi(argv[5]); - int loop = atoi(argv[6]); - bool batch = !strcmp(argv[7], "true"); - printf("Server listen %s:%d, pong %d, zerocopy %d, copies %d, loop %d, batch %d\n", - host, port, pong, zerocopy, nn_copies, loop, batch); + printf("Server listen %s:%d, pong %d, zerocopy %d, copies %d, loop %d, batch %d, mix %d, size %d, gso %d, iovs %d, sndbuf %d\n", + host, port, pong, zerocopy, nn_copies, loop, batch, mix, size, gso, nn_iovs, sndbuf); + if (!host || !port || !nn_iovs) { + usage(argc, argv); + exit(-1); + } assert(!st_set_eventsys(ST_EVENTSYS_ALT)); assert(!st_init()); @@ -142,6 +199,21 @@ int main(int argc, char** argv) printf("epoll events EPOLLERR=%#x, EPOLLHUP=%#x\n", EPOLLERR, EPOLLHUP); } + if (true) { + int dv = 0; + socklen_t len = sizeof(dv); + int r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &dv, &len); + + int r1 = 0; + if (sndbuf > 0) { + r1 = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); + } + + int nv = 0; + int r2 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nv, &len); + printf("socket SO_SNDBUF default=%d, user=%d, now=%d, r0=%d, r1=%d, r2=%d\n", dv, sndbuf, nv, r0, r1, r2); + } + st_netfd_t stfd = st_netfd_open_socket(fd); assert(stfd); printf("Client fd=%d\n", fd); @@ -158,21 +230,40 @@ int main(int argc, char** argv) peer.sin_port = htons(port); peer.sin_addr.s_addr = inet_addr(host); - char buf[1500]; - memset(buf, 0, sizeof(buf)); - memcpy(buf, "Hello", 5); + char* buf = new char[size]; + memset(buf, 0, size); + memcpy(buf, "Hello", size < 5? size : 5); iovec iov; iov.iov_base = buf; - iov.iov_len = strlen(buf); + iov.iov_len = size; + int nn_confirm = 0; for (int k = 0; k < loop; k++) { msghdr msg; memset(&msg, 0, sizeof(msghdr)); msg.msg_name = (sockaddr_in*)&peer; msg.msg_namelen = sizeof(sockaddr_in); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; + msg.msg_iov = new iovec[nn_iovs]; + msg.msg_iovlen = nn_iovs; + + for (int i = 0; i < nn_iovs; i++) { + iovec* p = msg.msg_iov + i; + memcpy(p, &iov, sizeof(iovec)); + } + + if (gso > 0) { + msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); + if (!msg.msg_control) { + msg.msg_control = new char[msg.msg_controllen]; + } + + cmsghdr* cm = CMSG_FIRSTHDR(&msg); + cm->cmsg_level = SOL_UDP; + cm->cmsg_type = UDP_SEGMENT; + cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); + *((uint16_t*)CMSG_DATA(cm)) = gso; + } int r0; if (nn_copies == 0) { @@ -194,22 +285,29 @@ int main(int argc, char** argv) r0 = st_sendmmsg(stfd, hdrs, nn_copies + 1, 0, ST_UTIME_NO_TIMEOUT); } } - assert(r0 > 0); - printf("Ping %s:%d %d bytes, copies=%d, r0=%d, %s\n", host, port, iov.iov_len, nn_copies, r0, msg.msg_iov->iov_base); + if (r0 > 0) { + printf("Ping %s:%d %d bytes, control %d, copies=%d, r0=%d, %s\n", host, port, iov.iov_len * nn_iovs, + msg.msg_controllen, nn_copies, r0, msg.msg_iov->iov_base); + } else { + printf("Ping %d bytes, error r0=%d, errno=%d\n", iov.iov_len * nn_iovs, r0, errno); exit(1); + } - if (!zerocopy) { - continue; + if (zerocopy && !batch) { + parse_reception(stfd, r0); + } else { + nn_confirm += r0; } - if (!batch) { - parse_reception(stfd); + if (mix) { + r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); + assert(r0 > 0); + printf("Mix %s:%d %d bytes, r0=%d, %s\n", host, port, iov.iov_len * nn_iovs, r0, msg.msg_iov->iov_base); } } // @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-batching if (batch) { - st_usleep(100 * 1000); - parse_reception(stfd); + parse_reception(stfd, nn_confirm); } st_sleep(-1); diff --git a/trunk/research/msg_zerocopy/server.cpp b/trunk/research/msg_zerocopy/server.cpp index 6f83fde97..1b9a5d059 100644 --- a/trunk/research/msg_zerocopy/server.cpp +++ b/trunk/research/msg_zerocopy/server.cpp @@ -6,6 +6,7 @@ #include #include #include +#include struct message { st_netfd_t stfd; @@ -47,22 +48,48 @@ void* sender(void* arg) return NULL; } +void usage(int argc, char** argv) +{ + printf("Usage: %s \n", argv[0]); + printf("Options:\n"); + printf(" --help Print this help and exit.\n"); + printf(" --host=string The host to send to.\n"); + printf(" --port=int The port to send to.\n"); + printf(" --pong=bool Whether response pong, true|false\n"); + printf(" --delay=int The delay in ms to response pong.\n"); + printf("For example:\n"); + printf(" %s --host=0.0.0.0 --port=8000 --pong --delay=100\n", argv[0]); +} + int main(int argc, char** argv) { - if (argc < 5) { - printf("Usage: %s \n", argv[0]); - printf(" pong Whether response pong, true|false\n"); - printf(" delay The delay in ms to response pong.\n"); - printf("For example:\n"); - printf(" %s 0.0.0.0 8000 true 100\n", argv[0]); - exit(-1); + option longopts[] = { + { "host", required_argument, NULL, 'o' }, + { "port", required_argument, NULL, 'p' }, + { "pong", required_argument, NULL, 'n' }, + { "delay", required_argument, NULL, 'd' }, + { "help", no_argument, NULL, 'h' }, + { NULL, 0, NULL, 0 } + }; + + char* host = NULL; char ch; + int port = 0; int delay = 0; bool pong = false; + while ((ch = getopt_long(argc, argv, "o:p:n:d:h", longopts, NULL)) != -1) { + switch (ch) { + case 'o': host = (char*)optarg; break; + case 'p': port = atoi(optarg); break; + case 'n': pong = !strcmp(optarg,"true"); break; + case 'd': delay = atoi(optarg); break; + case 'h': usage(argc, argv); exit(0); + default: usage(argc, argv); exit(-1); + } } - char* host = argv[1]; - int port = atoi(argv[2]); - bool pong = !strcmp(argv[3], "true"); - int delay = ::atoi(argv[4]); printf("Server listen %s:%d, pong %d, delay: %dms\n", host, port, pong, delay); + if (!host || !port) { + usage(argc, argv); + exit(-1); + } assert(!st_set_eventsys(ST_EVENTSYS_ALT)); assert(!st_init()); @@ -107,11 +134,12 @@ int main(int argc, char** argv) msg.msg_iov = &iov; msg.msg_iovlen = 1; + int nn_msgs = 0; while (true) { r0 = st_recvmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); assert(r0 > 0); - printf("From %s:%d %d bytes, flags %#x, %s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0, - msg.msg_flags, msg.msg_iov->iov_base); + printf("#%d, From %s:%d %d bytes, flags %#x, %s\n", nn_msgs++, inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), + r0, msg.msg_flags, msg.msg_iov->iov_base); if (pong) { message* msg = new message();