From 492665e166fb0ee533f6af684f0b75f44ef5c080 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 29 Jun 2014 14:39:56 +0800 Subject: [PATCH] refine tcp client connect, extract to utility srs_socket_connect. 0.9.141 --- trunk/src/app/srs_app_edge.cpp | 93 +++++---------------------- trunk/src/app/srs_app_forward.cpp | 43 +++---------- trunk/src/app/srs_app_http_client.cpp | 40 ++++-------- trunk/src/app/srs_app_rtmp_conn.cpp | 39 ++--------- trunk/src/app/srs_app_utility.cpp | 52 +++++++++++++++ trunk/src/app/srs_app_utility.hpp | 5 ++ trunk/src/core/srs_core.hpp | 2 +- 7 files changed, 100 insertions(+), 174 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index d934019dc..292eb0065 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -45,6 +45,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // when error, edge ingester sleep for a while and retry. #define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL) @@ -292,51 +293,24 @@ int SrsEdgeIngester::connect_server() server = server.substr(0, pos); port = ::atoi(s_port.c_str()); } - - // open socket. - // TODO: FIXME: extract utility method - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } - srs_assert(!stfd); - stfd = st_netfd_open_socket(sock); - if(stfd == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); + // open socket. + int64_t timeout = SRS_EDGE_INGESTER_TIMEOUT_US; + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) { + srs_warn("edge ingester failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret); return ret; } srs_freep(client); srs_freep(io); + srs_assert(stfd); io = new SrsSocket(stfd); client = new SrsRtmpClient(io); + kbps->set_io(io, io); - // connect to server. - std::string ip = srs_dns_resolve(server); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - return ret; - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_INGESTER_TIMEOUT_US) == -1){ - ret = ERROR_ST_CONNECT; - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); - return ret; - } - srs_info("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); - srs_trace("edge connected, can_publish=%d, url=%s/%s, server=%s:%d", _source->can_publish(), _req->tcUrl.c_str(), _req->stream.c_str(), server.c_str(), port); @@ -575,52 +549,17 @@ int SrsEdgeForwarder::connect_server() port = ::atoi(s_port.c_str()); } + // open socket. + int64_t timeout = SRS_EDGE_FORWARDER_TIMEOUT_US; + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) { + srs_warn("edge forwarder failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", + _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port, timeout, ret); + return ret; + } + // open socket. srs_trace("connect edge stream=%s, tcUrl=%s to server=%s, port=%d", _req->stream.c_str(), _req->tcUrl.c_str(), server.c_str(), port); - - // TODO: FIXME: extract utility method - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } - - srs_assert(!stfd); - stfd = st_netfd_open_socket(sock); - if(stfd == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); - return ret; - } - - srs_freep(client); - srs_freep(io); - - io = new SrsSocket(stfd); - client = new SrsRtmpClient(io); - kbps->set_io(io, io); - - // connect to server. - std::string ip = srs_dns_resolve(server); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - return ret; - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_FORWARDER_TIMEOUT_US) == -1){ - ret = ERROR_ST_CONNECT; - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); - return ret; - } - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); return ret; } diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 14c3686af..c6605c46f 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -42,6 +42,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // when error, forwarder sleep for a while and retry. #define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL) @@ -255,50 +256,24 @@ int SrsForwarder::connect_server() close_underlayer_socket(); // open socket. - srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d", - stream_name.c_str(), tc_url.c_str(), server.c_str(), port); - - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } - - srs_assert(!stfd); - stfd = st_netfd_open_socket(sock); - if(stfd == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); + int64_t timeout = SRS_FORWARDER_SLEEP_US; + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) { + srs_warn("forward failed, stream=%s, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", + stream_name.c_str(), tc_url.c_str(), server.c_str(), port, timeout, ret); return ret; } srs_freep(client); srs_freep(io); + srs_assert(stfd); io = new SrsSocket(stfd); client = new SrsRtmpClient(io); + kbps->set_io(io, io); - // connect to server. - std::string ip = srs_dns_resolve(server); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - return ret; - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_FORWARDER_SLEEP_US) == -1){ - ret = ERROR_ST_CONNECT; - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); - return ret; - } - srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); + srs_trace("forward connected, stream=%s, tcUrl=%s to server=%s, port=%d", + stream_name.c_str(), tc_url.c_str(), server.c_str(), port); return ret; } diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index f4e6e067a..42a895b9c 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -34,6 +34,10 @@ using namespace std; #include #include #include +#include + +// when error, http client sleep for a while and retry. +#define SRS_HTTP_CLIENT_SLEEP_US (int64_t)(3*1000*1000LL) SrsHttpClient::SrsHttpClient() { @@ -127,36 +131,14 @@ int SrsHttpClient::connect(SrsHttpUri* uri) disconnect(); - std::string ip = srs_dns_resolve(uri->get_host()); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - return ret; - } - - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } + std::string server = uri->get_host(); + int port = uri->get_port(); - stfd = st_netfd_open_socket(sock); - if(stfd == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); - return ret; - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(uri->get_port()); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ - ret = ERROR_ST_CONNECT; - srs_error("connect to server error. " - "ip=%s, port=%d, ret=%d", ip.c_str(), uri->get_port(), ret); + // open socket. + int64_t timeout = SRS_HTTP_CLIENT_SLEEP_US; + if ((ret = srs_socket_connect(server, port, timeout, &stfd)) != ERROR_SUCCESS) { + srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", + server.c_str(), port, timeout, ret); return ret; } srs_info("connect to server success. " diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 25feb98bc..80b9d1371 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -52,6 +52,7 @@ using namespace std; #include #include #include +#include // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, @@ -921,7 +922,6 @@ int SrsRtmpConn::check_edge_token_traverse_auth() return ret; } -// TODO: FIXME: refine the connect server serials functions. int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock) { int ret = ERROR_SUCCESS; @@ -942,39 +942,12 @@ int SrsRtmpConn::connect_server(int origin_index, st_netfd_t* pstsock) port = ::atoi(s_port.c_str()); } - // connect to server. - std::string ip = srs_dns_resolve(server); - if (ip.empty()) { - ret = ERROR_SYSTEM_IP_INVALID; - srs_error("dns resolve server error, ip empty. ret=%d", ret); - return ret; - } - // open socket. - // TODO: FIXME: extract utility method - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1){ - ret = ERROR_SOCKET_CREATE; - srs_error("create socket error. ret=%d", ret); - return ret; - } - - st_netfd_t stsock = st_netfd_open_socket(sock); - if(stsock == NULL){ - ret = ERROR_ST_OPEN_SOCKET; - srs_error("st_netfd_open_socket failed. ret=%d", ret); - return ret; - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - if (st_connect(stsock, (const struct sockaddr*)&addr, sizeof(sockaddr_in), SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US) == -1){ - ret = ERROR_ST_CONNECT; - srs_close_stfd(stsock); - srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + st_netfd_t stsock = NULL; + int64_t timeout = SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT_US; + if ((ret = srs_socket_connect(server, port, timeout, &stsock)) != ERROR_SUCCESS) { + srs_warn("edge token traverse failed, tcUrl=%s to server=%s, port=%d, timeout=%"PRId64", ret=%d", + req->tcUrl.c_str(), server.c_str(), port, timeout, ret); return ret; } srs_info("edge token auth connected, url=%s/%s, server=%s:%d", req->tcUrl.c_str(), req->stream.c_str(), server.c_str(), port); diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index d99683706..9ad8e0106 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -39,6 +39,58 @@ using namespace std; #define SRS_LOCAL_LOOP_IP "127.0.0.1" +int srs_socket_connect(std::string server, int port, int64_t timeout, st_netfd_t* pstfd) +{ + int ret = ERROR_SUCCESS; + + *pstfd = NULL; + st_netfd_t stfd = NULL; + sockaddr_in addr; + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + srs_assert(!stfd); + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + // connect to server. + std::string ip = srs_dns_resolve(server); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + goto failed; + } + + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), timeout) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + goto failed; + } + srs_info("connect ok. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); + + *pstfd = stfd; + return ret; + +failed: + if (stfd) { + srs_close_stfd(stfd); + } + return ret; +} + int srs_get_log_level(std::string level) { if ("verbose" == _srs_config->get_log_level()) { diff --git a/trunk/src/app/srs_app_utility.hpp b/trunk/src/app/srs_app_utility.hpp index a92b23d44..3eca75207 100644 --- a/trunk/src/app/srs_app_utility.hpp +++ b/trunk/src/app/srs_app_utility.hpp @@ -36,8 +36,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include + class SrsKbps; +// client open socket and connect to server. +extern int srs_socket_connect(std::string server, int port, int64_t timeout, st_netfd_t* pstfd); + /** * convert level in string to log level in int. * @return the log level defined in SrsLogLevel. diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 47425014c..d3a58f220 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR "0" #define VERSION_MINOR "9" -#define VERSION_REVISION "140" +#define VERSION_REVISION "141" #define RTMP_SIG_SRS_VERSION VERSION_MAJOR"."VERSION_MINOR"."VERSION_REVISION // server info. #define RTMP_SIG_SRS_KEY "SRS"