From 3d57c1c9bc1fbf1aef655ce2871fc8ff9236d91a Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 5 May 2019 08:45:11 +0800 Subject: [PATCH] Refine ST service. --- trunk/src/app/srs_app_listener.cpp | 129 +++++---------------------- trunk/src/app/srs_app_listener.hpp | 6 +- trunk/src/service/srs_service_st.cpp | 117 +++++++++++++++++++++++- trunk/src/service/srs_service_st.hpp | 10 ++- 4 files changed, 145 insertions(+), 117 deletions(-) diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index e8a520164..0c6699117 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -46,9 +46,6 @@ using namespace std; // sleep in srs_utime_t for udp recv packet. #define SrsUdpPacketRecvCycleInterval 0 -// nginx also set to 512 -#define SERVER_LISTEN_BACKLOG 512 - ISrsUdpHandler::ISrsUdpHandler() { } @@ -75,9 +72,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) handler = h; ip = i; port = p; - - _fd = -1; - _stfd = NULL; + lfd = NULL; nb_buf = SRS_UDP_MAX_PACKET_SIZE; buf = new char[nb_buf]; @@ -87,65 +82,27 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p) SrsUdpListener::~SrsUdpListener() { - // close the stfd to trigger thread to interrupted. - srs_close_stfd(_stfd); - srs_freep(trd); - - // st does not close it sometimes, - // close it manually. - close(_fd); - + srs_close_stfd(lfd); srs_freepa(buf); } int SrsUdpListener::fd() { - return _fd; + return srs_netfd_fileno(lfd); } srs_netfd_t SrsUdpListener::stfd() { - return _stfd; + return lfd; } srs_error_t SrsUdpListener::listen() { srs_error_t err = srs_success; - - char sport[8]; - snprintf(sport, sizeof(sport), "%d", port); - - addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_flags = AI_NUMERICHOST; - - addrinfo* r = NULL; - SrsAutoFree(addrinfo, r); - if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) { - return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info"); - } - - if ((_fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) { - return srs_error_new(ERROR_SOCKET_CREATE, "create socket. ip=%s, port=%d", ip.c_str(), port); - } - if ((err = srs_fd_closeexec(_fd)) != srs_success) { - return srs_error_wrap(err, "set closeexec"); - } - - if ((err = srs_fd_reuseaddr(_fd)) != srs_success) { - return srs_error_wrap(err, "set reuseaddr"); - } - - if (bind(_fd, r->ai_addr, r->ai_addrlen) == -1) { - return srs_error_new(ERROR_SOCKET_BIND, "bind socket. ep=%s:%d", ip.c_str(), port);; - } - - if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){ - return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket"); + if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) { + return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } srs_freep(trd); @@ -165,12 +122,11 @@ srs_error_t SrsUdpListener::cycle() if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "udp listener"); } - + + int nread = 0; sockaddr_storage from; int nb_from = sizeof(from); - int nread = 0; - - if ((nread = srs_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) { + if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) { return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread); } @@ -191,9 +147,8 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) handler = h; ip = i; port = p; - - _fd = -1; - _stfd = NULL; + + lfd = NULL; trd = new SrsDummyCoroutine(); } @@ -201,62 +156,20 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p) SrsTcpListener::~SrsTcpListener() { srs_freep(trd); - - srs_close_stfd(_stfd); + srs_close_stfd(lfd); } int SrsTcpListener::fd() { - return _fd; + return srs_netfd_fileno(lfd);; } srs_error_t SrsTcpListener::listen() { srs_error_t err = srs_success; - - char sport[8]; - snprintf(sport, sizeof(sport), "%d", port); - - addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_NUMERICHOST; - - addrinfo* r = NULL; - SrsAutoFree(addrinfo, r); - if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) { - return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info"); - } - - if ((_fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) { - return srs_error_new(ERROR_SOCKET_CREATE, "create socket. ip=%s, port=%d", ip.c_str(), port); - } - - // Detect alive for TCP connection. - // @see https://github.com/ossrs/srs/issues/1044 - if ((err = srs_fd_keepalive(_fd)) != srs_success) { - return srs_error_wrap(err, "set keepalive"); - } - if ((err = srs_fd_closeexec(_fd)) != srs_success) { - return srs_error_wrap(err, "set closeexec"); - } - - if ((err = srs_fd_reuseaddr(_fd)) != srs_success) { - return srs_error_wrap(err, "set reuseaddr"); - } - - if (bind(_fd, r->ai_addr, r->ai_addrlen) == -1) { - return srs_error_new(ERROR_SOCKET_BIND, "bind socket. ep=%s:%d", ip.c_str(), port);; - } - - if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) { - return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket"); - } - - if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){ - return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket"); + if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) { + return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port); } srs_freep(trd); @@ -277,19 +190,17 @@ srs_error_t SrsTcpListener::cycle() return srs_error_wrap(err, "tcp listener"); } - srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); - if(cstfd == NULL){ + srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); + if(fd == NULL){ return srs_error_new(ERROR_SOCKET_CREATE, "accept failed"); } - int cfd = srs_netfd_fileno(cstfd); - - if ((err = srs_fd_closeexec(cfd)) != srs_success) { + if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) { return srs_error_wrap(err, "set closeexec"); } - if ((err = handler->on_tcp_client(cstfd)) != srs_success) { - return srs_error_wrap(err, "handle fd=%d", cfd); + if ((err = handler->on_tcp_client(fd)) != srs_success) { + return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd)); } } diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index 538ecf1b9..99da48bda 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -69,8 +69,7 @@ public: class SrsUdpListener : public ISrsCoroutineHandler { private: - int _fd; - srs_netfd_t _stfd; + srs_netfd_t lfd; SrsCoroutine* trd; private: char* buf; @@ -96,8 +95,7 @@ public: class SrsTcpListener : public ISrsCoroutineHandler { private: - int _fd; - srs_netfd_t _stfd; + srs_netfd_t lfd; SrsCoroutine* trd; private: ISrsTcpHandler* handler; diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index 1e9644f4b..f25cf20d8 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -35,6 +35,9 @@ using namespace std; #include #include +// nginx also set to 512 +#define SERVER_LISTEN_BACKLOG 512 + #ifdef __linux__ #include @@ -124,7 +127,7 @@ srs_thread_t srs_thread_self() return (srs_thread_t)st_thread_self(); } -srs_error_t srs_socket_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd) +srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd) { st_utime_t timeout = ST_UTIME_NO_TIMEOUT; if (tm != SRS_UTIME_NO_TIMEOUT) { @@ -169,6 +172,116 @@ srs_error_t srs_socket_connect(string server, int port, srs_utime_t tm, srs_netf return srs_success; } +srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) +{ + srs_error_t err = srs_success; + + char sport[8]; + snprintf(sport, sizeof(sport), "%d", port); + + addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_NUMERICHOST; + + addrinfo* r = NULL; + SrsAutoFree(addrinfo, r); + if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) { + return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)", + hints.ai_family, hints.ai_socktype, hints.ai_flags); + } + + int fd = 0; + if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) { + return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d", + r->ai_family, r->ai_socktype, r->ai_protocol); + } + + // Detect alive for TCP connection. + // @see https://github.com/ossrs/srs/issues/1044 + if ((err = srs_fd_keepalive(fd)) != srs_success) { + ::close(fd); + return srs_error_wrap(err, "set keepalive fd=%d", fd); + } + + if ((err = srs_fd_closeexec(fd)) != srs_success) { + ::close(fd); + return srs_error_wrap(err, "set closeexec fd=%d", fd); + } + + if ((err = srs_fd_reuseaddr(fd)) != srs_success) { + ::close(fd); + return srs_error_wrap(err, "set reuseaddr fd=%d", fd); + } + + if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) { + ::close(fd); + return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd); + } + + if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) { + ::close(fd); + return srs_error_new(ERROR_SOCKET_LISTEN, "listen fd=%d", fd); + } + + if ((*pfd = srs_netfd_open_socket(fd)) == NULL){ + ::close(fd); + return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd); + } + + return err; +} + +srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd) +{ + srs_error_t err = srs_success; + + char sport[8]; + snprintf(sport, sizeof(sport), "%d", port); + + addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_NUMERICHOST; + + addrinfo* r = NULL; + SrsAutoFree(addrinfo, r); + if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) { + return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)", + hints.ai_family, hints.ai_socktype, hints.ai_flags); + } + + int fd = 0; + if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) { + return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d", + r->ai_family, r->ai_socktype, r->ai_protocol); + } + + if ((err = srs_fd_closeexec(fd)) != srs_success) { + ::close(fd); + return srs_error_wrap(err, "set closeexec fd=%d", fd); + } + + if ((err = srs_fd_reuseaddr(fd)) != srs_success) { + ::close(fd); + return srs_error_wrap(err, "set reuseaddr fd=%d", fd); + } + + if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) { + ::close(fd); + return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd); + } + + if ((*pfd = srs_netfd_open_socket(fd)) == NULL){ + ::close(fd); + return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd); + } + + return err; +} + srs_cond_t srs_cond_new() { return (srs_cond_t)st_cond_new(); @@ -459,7 +572,7 @@ srs_error_t SrsTcpClient::connect() close(); srs_assert(stfd == NULL); - if ((err = srs_socket_connect(host, port, timeout, &stfd)) != srs_success) { + if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) { return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); } diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index eaca10955..18b0837fe 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -55,9 +55,15 @@ extern srs_error_t srs_fd_keepalive(int fd); // Get current coroutine/thread. extern srs_thread_t srs_thread_self(); -// client open socket and connect to server. +// For client, to open socket and connect to server. // @param tm The timeout in srs_utime_t. -extern srs_error_t srs_socket_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd); +extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd); + +// For server, listen at TCP endpoint. +extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd); + +// For server, listen at UDP endpoint. +extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd); // Wrap for coroutine. extern srs_cond_t srs_cond_new();