TCP|UDP Listener: Fix coroutine exit when err happens (#4613) (#4615)

Try to fix #4613
This commit is contained in:
Jacob Su 2026-01-24 20:16:49 +08:00 committed by GitHub
parent 1316ceb7e9
commit 480d6d5f5a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 170 additions and 29 deletions

View File

@ -198,25 +198,11 @@ srs_error_t SrsUdpListener::cycle()
return srs_error_wrap(err, "udp listener");
}
int nread = 0;
sockaddr_storage from;
int nb_from = sizeof(from);
if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
if ((err = do_cycle()) != srs_success) {
srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
}
// Drop UDP health check packet of Aliyun SLB.
// Healthcheck udp check
// @see https://help.aliyun.com/document_detail/27595.html
if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c
&& buf[19] == 0x63 && buf[20] == 0x6b) {
continue;
}
if ((err = handler->on_udp_packet((const sockaddr*)&from, nb_from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
}
if (SrsUdpPacketRecvCycleInterval > 0) {
srs_usleep(SrsUdpPacketRecvCycleInterval);
}
@ -225,6 +211,31 @@ srs_error_t SrsUdpListener::cycle()
return err;
}
srs_error_t SrsUdpListener::do_cycle()
{
srs_error_t err = srs_success;
int nread = 0;
sockaddr_storage from;
int nb_from = sizeof(from);
if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr *)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}
// Drop UDP health check packet of Aliyun SLB.
// Healthcheck udp check
// @see https://help.aliyun.com/document_detail/27595.html
if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c && buf[19] == 0x63 && buf[20] == 0x6b) {
return err;
}
if ((err = handler->on_udp_packet((const sockaddr *)&from, nb_from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
}
return err;
}
SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h)
{
handler = h;
@ -303,24 +314,36 @@ srs_error_t SrsTcpListener::cycle()
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(fd == NULL){
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}
if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
if ((err = handler->on_tcp_client(this, fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
if ((err = do_cycle()) != srs_success) {
srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
}
}
return err;
}
srs_error_t SrsTcpListener::do_cycle()
{
srs_error_t err = srs_success;
srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if (fd == NULL) {
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}
if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
if ((err = handler->on_tcp_client(this, fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
return err;
}
SrsMultipleTcpListeners::SrsMultipleTcpListeners(ISrsTcpHandler* h)
{
handler_ = h;

View File

@ -103,6 +103,8 @@ public:
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
};
// Bind and listen tcp port, use handler to process the client.
@ -130,6 +132,8 @@ public:
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
};
// Bind and listen tcp port, use handler to process the client.

View File

@ -1574,3 +1574,117 @@ VOID TEST(ThreadCriticalTest, FailIfCloseActiveFD)
h.fd = NULL;
}
class MockFailedTcpHandler : public ISrsTcpHandler
{
public:
int connection_count;
public:
MockFailedTcpHandler() {
connection_count = 0;
}
virtual ~MockFailedTcpHandler() {
}
public:
virtual srs_error_t on_tcp_client(ISrsListener *listener, srs_netfd_t stfd)
{
srs_close_stfd(stfd);
connection_count++;
return srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "Intentional failure for testing error handling");
}
};
VOID TEST(TCPServerTest, ListenerContinuesToAcceptAfterError)
{
srs_error_t err;
MockFailedTcpHandler h;
SrsTcpListener l(&h);
l.set_endpoint(_srs_tmp_host, _srs_tmp_port);
HELPER_EXPECT_SUCCESS(l.listen());
// Simulate multiple failed connection attempts
const int num_attempts = 3;
for (int i = 0; i < num_attempts; i++)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ASSERT_TRUE(sockfd > 0);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(_srs_tmp_host.c_str());
server_addr.sin_port = htons(_srs_tmp_port);
int ret = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
ASSERT_TRUE(ret == 0);
close(sockfd);
srs_usleep(10 * SRS_UTIME_MILLISECONDS); // Wait for listener to process the connection
}
EXPECT_EQ(h.connection_count, num_attempts);
l.close();
}
class MockFailedUdpHandler : public ISrsUdpHandler
{
public:
int packet_count;
public:
MockFailedUdpHandler()
{
packet_count = 0;
}
virtual ~MockFailedUdpHandler()
{
}
public:
virtual srs_error_t on_udp_packet(const sockaddr *from, const int fromlen, char *buf, int nb_buf)
{
packet_count++;
return srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "Intentional failure for testing UDP error handling");
}
};
VOID TEST(UDPServerTest, ListenerContinuesToReceiveAfterError)
{
srs_error_t err;
MockFailedUdpHandler h;
SrsUdpListener l(&h);
l.set_endpoint(_srs_tmp_host, _srs_tmp_port);
HELPER_EXPECT_SUCCESS(l.listen());
// Create a UDP socket to send packets to the listener
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(sockfd > 0);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(_srs_tmp_host.c_str());
server_addr.sin_port = htons(_srs_tmp_port);
// Send multiple failed packet attempts
const int num_attempts = 3;
const char *test_data = "test data";
int data_len = strlen(test_data);
for (int i = 0; i < num_attempts; i++)
{
int ret = sendto(sockfd, test_data, data_len, 0, (struct sockaddr *)&server_addr, sizeof(server_addr));
ASSERT_TRUE(ret == data_len);
srs_usleep(10 * SRS_UTIME_MILLISECONDS); // Wait for listener to process the packet
}
EXPECT_EQ(h.packet_count, num_attempts);
close(sockfd);
l.close();
}