From 72322836c6368c42c4f285dd23e010993076106d Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 20 Apr 2020 20:25:23 +0800 Subject: [PATCH] Update demo for zerocopy --- trunk/auto/depends.sh | 6 +- trunk/research/msg_zerocopy/Makefile | 4 +- trunk/research/msg_zerocopy/client.cpp | 125 +++++++++++++++++++++++-- trunk/research/msg_zerocopy/server.cpp | 59 +++++++++--- trunk/src/app/srs_app_rtc_conn.cpp | 2 + 5 files changed, 171 insertions(+), 25 deletions(-) diff --git a/trunk/auto/depends.sh b/trunk/auto/depends.sh index 557de4bb3..932c55ade 100755 --- a/trunk/auto/depends.sh +++ b/trunk/auto/depends.sh @@ -435,6 +435,10 @@ fi # cherrypy for http hooks callback, CherryPy-3.2.4 ##################################################################################### if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then + # Detect python or python2 + python --version >/dev/null 2>&1 && SYS_PYTHON=python; + python2 --version >/dev/null 2>&1 && SYS_PYTHON=python2; + # Install cherrypy for api server. if [[ -f ${SRS_OBJS}/${SRS_PLATFORM}/CherryPy-3.2.4/setup.py ]]; then echo "CherryPy-3.2.4 is ok."; else @@ -442,7 +446,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then ( rm -rf ${SRS_OBJS}/CherryPy-3.2.4 && cd ${SRS_OBJS}/${SRS_PLATFORM} && unzip -q ../../3rdparty/CherryPy-3.2.4.zip && cd CherryPy-3.2.4 && - python setup.py install --user --prefix='' + $SYS_PYTHON setup.py install --user --prefix='' ) fi # check status diff --git a/trunk/research/msg_zerocopy/Makefile b/trunk/research/msg_zerocopy/Makefile index cd67c1885..9cadc3e96 100644 --- a/trunk/research/msg_zerocopy/Makefile +++ b/trunk/research/msg_zerocopy/Makefile @@ -4,10 +4,10 @@ default: server client server: server.cpp ../../objs/st/libst.a - gcc -g -O0 -I../../objs/st/ $^ -o $@ + g++ -g -O0 -I../../objs/st/ $^ -o $@ client: client.cpp ../../objs/st/libst.a - gcc -g -O0 -I../../objs/st/ $^ -o $@ + g++ -g -O0 -I../../objs/st/ $^ -o $@ ../../objs/st/libst.a: ../../Makefile (cd ../../ && $(MAKE) st) diff --git a/trunk/research/msg_zerocopy/client.cpp b/trunk/research/msg_zerocopy/client.cpp index acae19459..3d706bbc1 100644 --- a/trunk/research/msg_zerocopy/client.cpp +++ b/trunk/research/msg_zerocopy/client.cpp @@ -7,19 +7,72 @@ #include #include +#include + +// @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception +#include + +// @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c +#include +#ifndef SO_EE_ORIGIN_ZEROCOPY +#define SO_EE_ORIGIN_ZEROCOPY 5 +#endif + +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 60 +#endif + +#ifndef SO_EE_CODE_ZEROCOPY_COPIED +#define SO_EE_CODE_ZEROCOPY_COPIED 1 +#endif + +#ifndef MSG_ZEROCOPY +#define MSG_ZEROCOPY 0x4000000 +#endif + +void* receiver(void* arg) +{ + st_netfd_t stfd = (st_netfd_t)arg; + + sockaddr_in peer; + memset(&peer, 0, sizeof(sockaddr_in)); + + char buf[1500]; + memset(buf, 0, sizeof(buf)); + + iovec iov; + iov.iov_base = buf; + iov.iov_len = sizeof(buf); + + msghdr msg; + memset(&msg, 0, sizeof(msghdr)); + msg.msg_name = (sockaddr_in*)&peer; + msg.msg_namelen = sizeof(sockaddr_in); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + int r0 = st_recvmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); + assert(r0 > 0); + printf("Pong %s:%d %d bytes, flags %#x, %s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0, + msg.msg_flags, msg.msg_iov->iov_base); + + return NULL; +} + int main(int argc, char** argv) { - if (argc < 4) { - printf("Usage: %s \n", argv[0]); + if (argc < 5) { + printf("Usage: %s \n", argv[0]); printf("For example:\n"); - printf(" %s 127.0.0.1 8000 true\n", argv[0]); + printf(" %s 127.0.0.1 8000 true true\n", argv[0]); exit(-1); } char* host = argv[1]; int port = atoi(argv[2]); bool pong = !strcmp(argv[3], "true"); - printf("Server listen %s:%d, pong %d\n", host, port, pong); + bool zerocopy = !strcmp(argv[4], "true"); + printf("Server listen %s:%d, pong %d, zerocopy %d\n", host, port, pong, zerocopy); assert(!st_set_eventsys(ST_EVENTSYS_ALT)); assert(!st_init()); @@ -27,8 +80,31 @@ int main(int argc, char** argv) int fd = socket(PF_INET, SOCK_DGRAM, 0); assert(fd > 0); + // @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c + if (zerocopy) { + int one = 1; + int r0 = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)); +// MSG_ZEROCOPY for UDP was added in commit b5947e5d1e71 ("udp: msg_zerocopy") in Linux 5.0. +// @see https://lore.kernel.org/netdev/CA+FuTSfBFqRViKfG5crEv8xLMgAkp3cZ+yeuELK5TVv61xT=Yw@mail.gmail.com/ +#if LINUX_VERSION_CODE < KERNEL_VERSION(5,0,0) + if (r0 == -1) { + printf("MSG_ZEROCOPY should be kernel 5.0+, kernel %#x, errno=%d\n", LINUX_VERSION_CODE, 524); + exit(-1); + } +#endif + assert(!r0); + + printf("epoll events EPOLLERR=%#x, EPOLLHUP=%#x\n", EPOLLERR, EPOLLHUP); + } + st_netfd_t stfd = st_netfd_open_socket(fd); assert(stfd); + printf("Client fd=%d\n", fd); + + if (pong) { + st_thread_t r0 = st_thread_create(receiver, stfd, 0, 0); + assert(r0); + } sockaddr_in peer; memset(&peer, 0, sizeof(sockaddr_in)); @@ -52,15 +128,44 @@ int main(int argc, char** argv) msg.msg_iov = &iov; msg.msg_iovlen = 1; - int r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); + int r0; + if (zerocopy) { + r0 = st_sendmsg(stfd, &msg, MSG_ZEROCOPY, ST_UTIME_NO_TIMEOUT); + } else { + r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); + } + assert(r0 > 0); printf("Ping %s:%d %d bytes, r0=%d, %s\n", host, port, iov.iov_len, r0, msg.msg_iov->iov_base); - if (pong) { - r0 = st_recvmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); - assert(r0 > 0); - printf("Pong %s:%d %d bytes, flags %#x, %s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0, - msg.msg_flags, msg.msg_iov->iov_base); + // Reception from kernel, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-reception + // See do_recv_completion at https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/msg_zerocopy.c#L393 + char control[100]; + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + // Note that the r0 is 0, the reception is in the control. + r0 = st_recvmsg(stfd, &msg, MSG_ERRQUEUE, ST_UTIME_NO_TIMEOUT); + assert(r0 >= 0); + assert(msg.msg_flags == MSG_ERRQUEUE); + + // Notification parsing, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#notification-parsing + cmsghdr* cm = CMSG_FIRSTHDR(&msg); + assert(cm->cmsg_level == SOL_IP || cm->cmsg_type == IP_RECVERR); + + sock_extended_err* serr = (sock_extended_err*)(void*)CMSG_DATA(cm); + assert(serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY); + + uint32_t hi = serr->ee_data; + uint32_t lo = serr->ee_info; + uint32_t range = hi - lo + 1; + printf("Reception %d bytes, flags %#x, cmsg(level %#x, type %#x), serr(errno %#x, origin %#x, code %#x), %d [%d, %d]\n", + msg.msg_controllen, msg.msg_flags, cm->cmsg_level, cm->cmsg_type, serr->ee_errno, serr->ee_origin, serr->ee_code, range, lo, hi); + + // Defered Copies, @see https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#deferred-copies + if (serr->ee_code == SO_EE_CODE_ZEROCOPY_COPIED) { + printf("Warning: Defered copies, should stop zerocopy\n"); } + st_sleep(-1); + return 0; } diff --git a/trunk/research/msg_zerocopy/server.cpp b/trunk/research/msg_zerocopy/server.cpp index 417e8c529..6f83fde97 100644 --- a/trunk/research/msg_zerocopy/server.cpp +++ b/trunk/research/msg_zerocopy/server.cpp @@ -7,6 +7,46 @@ #include #include +struct message { + st_netfd_t stfd; + sockaddr_in peer; + int delay; +}; + +void* sender(void* arg) +{ + message* p = (message*)arg; + + int delay = p->delay; + if (delay > 0) { + st_usleep(delay * 1000); + } + + msghdr msg; + memset(&msg, 0, sizeof(msghdr)); + + sockaddr_in peer = p->peer; + msg.msg_name = (sockaddr_in*)&peer; + msg.msg_namelen = sizeof(sockaddr_in); + + char buf[] = "World"; + + iovec iov; + memset(&iov, 0, sizeof(iovec)); + iov.iov_base = buf; + iov.iov_len = sizeof(buf); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + st_netfd_t stfd = p->stfd; + int r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); + assert(r0 > 0); + printf("Pong %s:%d %d bytes, flags %#x, %s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0, + msg.msg_flags, msg.msg_iov->iov_base); + + return NULL; +} + int main(int argc, char** argv) { if (argc < 5) { @@ -47,7 +87,7 @@ int main(int argc, char** argv) st_netfd_t stfd = st_netfd_open_socket(fd); assert(stfd); - printf("Listen at udp://%s:%d\n", host, port); + printf("Listen at udp://%s:%d, fd=%d\n", host, port, fd); msghdr msg; memset(&msg, 0, sizeof(msghdr)); @@ -73,18 +113,13 @@ int main(int argc, char** argv) printf("From %s:%d %d bytes, flags %#x, %s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0, msg.msg_flags, msg.msg_iov->iov_base); - memcpy(msg.msg_iov->iov_base, "World", 5); - msg.msg_iov->iov_len = 5; - if (pong) { - if (delay > 0) { - st_usleep(delay * 1000); - } - - r0 = st_sendmsg(stfd, &msg, 0, ST_UTIME_NO_TIMEOUT); - assert(r0 > 0); - printf("Pong %s:%d %d bytes, flags %#x, %s\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port), r0, - msg.msg_flags, msg.msg_iov->iov_base); + message* msg = new message(); + msg->stfd = stfd; + msg->peer = peer; + msg->delay = delay; + st_thread_t r0 = st_thread_create(sender, msg, 0, 0); + assert(r0); } } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index dd9a791b2..e1a95765a 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -34,6 +34,8 @@ using namespace std; #include #include +// Define macro for UDP GSO. +// @see https://github.com/torvalds/linux/blob/master/tools/testing/selftests/net/udpgso.c #ifndef UDP_SEGMENT #define UDP_SEGMENT 103 #endif