From 480d6d5f5ab7cb3bbe58a66c425398908c8d86d7 Mon Sep 17 00:00:00 2001 From: Jacob Su Date: Sat, 24 Jan 2026 20:16:49 +0800 Subject: [PATCH] TCP|UDP Listener: Fix coroutine exit when err happens (#4613) (#4615) Try to fix #4613 --- trunk/src/app/srs_app_listener.cpp | 81 +++++++++++------- trunk/src/app/srs_app_listener.hpp | 4 + trunk/src/utest/srs_utest_service.cpp | 114 ++++++++++++++++++++++++++ 3 files changed, 170 insertions(+), 29 deletions(-) diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index 7ffd7e19e..c99931211 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -198,25 +198,11 @@ srs_error_t SrsUdpListener::cycle() return srs_error_wrap(err, "udp listener"); } - int nread = 0; - sockaddr_storage from; - int nb_from = sizeof(from); - if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) { - return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread); + if ((err = do_cycle()) != srs_success) { + srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str()); + srs_freep(err); } - // Drop UDP health check packet of Aliyun SLB. - // Healthcheck udp check - // @see https://help.aliyun.com/document_detail/27595.html - if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c - && buf[19] == 0x63 && buf[20] == 0x6b) { - continue; - } - - if ((err = handler->on_udp_packet((const sockaddr*)&from, nb_from, buf, nread)) != srs_success) { - return srs_error_wrap(err, "handle packet %d bytes", nread); - } - if (SrsUdpPacketRecvCycleInterval > 0) { srs_usleep(SrsUdpPacketRecvCycleInterval); } @@ -225,6 +211,31 @@ srs_error_t SrsUdpListener::cycle() return err; } +srs_error_t SrsUdpListener::do_cycle() +{ + srs_error_t err = srs_success; + + int nread = 0; + sockaddr_storage from; + int nb_from = sizeof(from); + if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr *)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) { + return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread); + } + + // Drop UDP health check packet of Aliyun SLB. + // Healthcheck udp check + // @see https://help.aliyun.com/document_detail/27595.html + if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c && buf[19] == 0x63 && buf[20] == 0x6b) { + return err; + } + + if ((err = handler->on_udp_packet((const sockaddr *)&from, nb_from, buf, nread)) != srs_success) { + return srs_error_wrap(err, "handle packet %d bytes", nread); + } + + return err; +} + SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h) { handler = h; @@ -303,24 +314,36 @@ srs_error_t SrsTcpListener::cycle() if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "tcp listener"); } - - srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); - if(fd == NULL){ - return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd)); - } - - if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) { - return srs_error_wrap(err, "set closeexec"); - } - - if ((err = handler->on_tcp_client(this, fd)) != srs_success) { - return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd)); + + if ((err = do_cycle()) != srs_success) { + srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str()); + srs_freep(err); } } return err; } +srs_error_t SrsTcpListener::do_cycle() +{ + srs_error_t err = srs_success; + + srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); + if (fd == NULL) { + return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd)); + } + + if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) { + return srs_error_wrap(err, "set closeexec"); + } + + if ((err = handler->on_tcp_client(this, fd)) != srs_success) { + return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd)); + } + + return err; +} + SrsMultipleTcpListeners::SrsMultipleTcpListeners(ISrsTcpHandler* h) { handler_ = h; diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 085a15ac0..4e6f752cc 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -103,6 +103,8 @@ public: // Interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); +private: + srs_error_t do_cycle(); }; // Bind and listen tcp port, use handler to process the client. @@ -130,6 +132,8 @@ public: // Interface ISrsReusableThreadHandler. public: virtual srs_error_t cycle(); +private: + srs_error_t do_cycle(); }; // Bind and listen tcp port, use handler to process the client. diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index b46b8a1ae..4ad21a11b 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -1574,3 +1574,117 @@ VOID TEST(ThreadCriticalTest, FailIfCloseActiveFD) h.fd = NULL; } +class MockFailedTcpHandler : public ISrsTcpHandler +{ +public: + int connection_count; +public: + MockFailedTcpHandler() { + connection_count = 0; + } + virtual ~MockFailedTcpHandler() { + } +public: + virtual srs_error_t on_tcp_client(ISrsListener *listener, srs_netfd_t stfd) + { + srs_close_stfd(stfd); + connection_count++; + return srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "Intentional failure for testing error handling"); + } +}; + +VOID TEST(TCPServerTest, ListenerContinuesToAcceptAfterError) +{ + srs_error_t err; + + MockFailedTcpHandler h; + SrsTcpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); + + HELPER_EXPECT_SUCCESS(l.listen()); + + // Simulate multiple failed connection attempts + const int num_attempts = 3; + for (int i = 0; i < num_attempts; i++) + { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_TRUE(sockfd > 0); + + struct sockaddr_in server_addr; + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = inet_addr(_srs_tmp_host.c_str()); + server_addr.sin_port = htons(_srs_tmp_port); + + int ret = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)); + ASSERT_TRUE(ret == 0); + + close(sockfd); + srs_usleep(10 * SRS_UTIME_MILLISECONDS); // Wait for listener to process the connection + } + + EXPECT_EQ(h.connection_count, num_attempts); + + l.close(); + +} + +class MockFailedUdpHandler : public ISrsUdpHandler +{ +public: + int packet_count; + +public: + MockFailedUdpHandler() + { + packet_count = 0; + } + virtual ~MockFailedUdpHandler() + { + } + +public: + virtual srs_error_t on_udp_packet(const sockaddr *from, const int fromlen, char *buf, int nb_buf) + { + packet_count++; + return srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "Intentional failure for testing UDP error handling"); + } +}; + +VOID TEST(UDPServerTest, ListenerContinuesToReceiveAfterError) +{ + srs_error_t err; + + MockFailedUdpHandler h; + SrsUdpListener l(&h); + l.set_endpoint(_srs_tmp_host, _srs_tmp_port); + + HELPER_EXPECT_SUCCESS(l.listen()); + + // Create a UDP socket to send packets to the listener + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(sockfd > 0); + + struct sockaddr_in server_addr; + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = inet_addr(_srs_tmp_host.c_str()); + server_addr.sin_port = htons(_srs_tmp_port); + + // Send multiple failed packet attempts + const int num_attempts = 3; + const char *test_data = "test data"; + int data_len = strlen(test_data); + for (int i = 0; i < num_attempts; i++) + { + int ret = sendto(sockfd, test_data, data_len, 0, (struct sockaddr *)&server_addr, sizeof(server_addr)); + ASSERT_TRUE(ret == data_len); + + srs_usleep(10 * SRS_UTIME_MILLISECONDS); // Wait for listener to process the packet + } + + EXPECT_EQ(h.packet_count, num_attempts); + + close(sockfd); + l.close(); +}