From 16319d73fd3a550501b5fc6e9d63f5f8fa6a9290 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 27 Apr 2014 12:28:45 +0800 Subject: [PATCH] implements basic edge(play and publish), with bug --- trunk/src/app/srs_app_edge.cpp | 113 ++++++++++++++++----- trunk/src/app/srs_app_edge.hpp | 6 +- trunk/src/app/srs_app_rtmp_conn.cpp | 24 +++-- trunk/src/kernel/srs_kernel_error.hpp | 1 + trunk/src/rtmp/srs_protocol_rtmp_stack.cpp | 5 + trunk/src/rtmp/srs_protocol_rtmp_stack.hpp | 1 + 6 files changed, 112 insertions(+), 38 deletions(-) diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index f50030294..321825966 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -166,7 +166,7 @@ int SrsEdgeIngester::ingest() // read from client. SrsCommonMessage* msg = NULL; if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { - srs_error("recv origin server message failed. ret=%d", ret); + srs_error("ingest recv origin server message failed. ret=%d", ret); return ret; } srs_verbose("edge loop recv message. ret=%d", ret); @@ -312,10 +312,13 @@ SrsEdgeProxyContext::SrsEdgeProxyContext() edge_stream_id = 0; edge_io = NULL; edge_rtmp = NULL; + edge_stfd = NULL; + edge_got_message = false; origin_stream_id = 0; origin_io = NULL; origin_rtmp = NULL; + origin_stfd = NULL; } SrsEdgeProxyContext::~SrsEdgeProxyContext() @@ -400,10 +403,21 @@ int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context) context->origin_io = io; context->origin_rtmp = client; context->origin_stream_id = stream_id; + context->origin_stfd = stfd; - client->set_recv_timeout(SRS_PULSE_TIMEOUT_US); + context->origin_rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US); + context->edge_rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US); + + context->edge_got_message = false; SrsPithyPrint pithy_print(SRS_STAGE_EDGE); + + pollfd fds[2]; + fds[0].fd = st_netfd_fileno(context->edge_stfd); + fds[0].events = POLLIN; + + fds[1].fd = st_netfd_fileno(context->origin_stfd); + fds[1].events = POLLIN; while (true) { // switch to other st-threads. @@ -417,58 +431,101 @@ int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context) pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); } - if ((ret = proxy_message(context)) != ERROR_SUCCESS) { + fds[0].revents = 0; + fds[1].revents = 0; + + // Upon successful completion, a non-negative value is returned. + // A positive value indicates the total number of OS file descriptors in pds that have events. + // A value of 0 indicates that the call timed out. + // Upon failure, a value of -1 is returned and errno is set to indicate the error + if(st_poll(fds, 2, ST_UTIME_NO_TIMEOUT) <= 0){ + ret = ERROR_RTMP_EDGE_PROXY_PULL; + srs_error("edge wait for st_poll error. ret=%d", ret); return ret; } + + // edge active + if(fds[0].revents & POLLIN){ + if((ret = proxy_edge_message(context)) != ERROR_SUCCESS){ + return ret; + } + } + + // origin active + if(fds[1].revents & POLLIN){ + if((ret = proxy_origin_message(context)) != ERROR_SUCCESS){ + return ret; + } + } } return ret; } -int SrsEdgeForwarder::proxy_message(SrsEdgeProxyContext* context) +int SrsEdgeForwarder::proxy_origin_message(SrsEdgeProxyContext* context) { int ret = ERROR_SUCCESS; SrsCommonMessage* msg = NULL; - // proxy origin message to client - msg = NULL; + // process origin message. ret = context->origin_rtmp->recv_message(&msg); if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { - srs_error("recv origin server message failed. ret=%d", ret); + srs_error("forward recv origin server message failed. ret=%d", ret); return ret; } - if (msg) { - if (msg->size <= 0) { - srs_freep(msg); - } else { - msg->header.stream_id = context->edge_stream_id; - if ((ret = context->edge_rtmp->send_message(msg)) != ERROR_SUCCESS) { - srs_error("send origin message to client failed. ret=%d", ret); - return ret; - } - } + srs_assert(msg); + + if (msg->size <= 0 + || !context->edge_got_message + || msg->header.is_set_chunk_size() + || msg->header.is_window_ackledgement_size() + || msg->header.is_ackledgement() + ) { + srs_freep(msg); + return ret; } + msg->header.stream_id = context->edge_stream_id; + if ((ret = context->edge_rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send origin message to client failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsEdgeForwarder::proxy_edge_message(SrsEdgeProxyContext* context) +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = NULL; + // proxy client message to origin - msg = NULL; ret = context->edge_rtmp->recv_message(&msg); if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { srs_error("recv client message failed. ret=%d", ret); return ret; } - if (msg) { - if (msg->size <= 0) { - srs_freep(msg); - } else { - msg->header.stream_id = context->origin_stream_id; - if ((ret = context->origin_rtmp->send_message(msg)) != ERROR_SUCCESS) { - srs_error("send client message to origin failed. ret=%d", ret); - return ret; - } - } + srs_assert(msg); + + context->edge_got_message = true; + + if (msg->size <= 0 + || msg->header.is_set_chunk_size() + || msg->header.is_window_ackledgement_size() + || msg->header.is_ackledgement() + ) { + srs_freep(msg); + return ret; + } + + msg->header.stream_id = context->origin_stream_id; + if ((ret = context->origin_rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send client message to origin failed. ret=%d", ret); + return ret; } return ret; diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 15663f15d..80e95ffec 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -105,10 +105,13 @@ class SrsEdgeProxyContext { public: int edge_stream_id; + st_netfd_t edge_stfd; ISrsProtocolReaderWriter* edge_io; SrsRtmpServer* edge_rtmp; + bool edge_got_message; public: int origin_stream_id; + st_netfd_t origin_stfd; ISrsProtocolReaderWriter* origin_io; SrsRtmpClient* origin_rtmp; public: @@ -141,7 +144,8 @@ public: public: virtual int proxy(SrsEdgeProxyContext* context); private: - virtual int proxy_message(SrsEdgeProxyContext* context); + virtual int proxy_origin_message(SrsEdgeProxyContext* context); + virtual int proxy_edge_message(SrsEdgeProxyContext* context); virtual void close_underlayer_socket(); virtual int connect_server(); }; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 98ec71d8c..45e0d088a 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -277,17 +277,21 @@ int SrsRtmpConn::stream_service_cycle() } srs_assert(source != NULL); - // check publish available. - if (type != SrsRtmpConnPlay && !source->can_publish()) { - ret = ERROR_SYSTEM_STREAM_BUSY; - srs_warn("stream %s is already publishing. ret=%d", - req->get_stream_url().c_str(), ret); - // to delay request - st_usleep(SRS_STREAM_BUSY_SLEEP_US); - return ret; + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + + // check publish available + // for edge, never check it, for edge use proxy mode. + if (!vhost_is_edge) { + if (type != SrsRtmpConnPlay && !source->can_publish()) { + ret = ERROR_SYSTEM_STREAM_BUSY; + srs_warn("stream %s is already publishing. ret=%d", + req->get_stream_url().c_str(), ret); + // to delay request + st_usleep(SRS_STREAM_BUSY_SLEEP_US); + return ret; + } } - bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source found, url=%s, enabled_cache=%d, edge=%d", req->get_stream_url().c_str(), enabled_cache, vhost_is_edge); @@ -338,6 +342,7 @@ int SrsRtmpConn::stream_service_cycle() context.edge_io = skt; context.edge_stream_id = res->stream_id; context.edge_rtmp = rtmp; + context.edge_stfd = stfd; if (vhost_is_edge) { return source->on_edge_proxy_publish(&context); } @@ -371,6 +376,7 @@ int SrsRtmpConn::stream_service_cycle() context.edge_io = skt; context.edge_stream_id = res->stream_id; context.edge_rtmp = rtmp; + context.edge_stfd = stfd; if (vhost_is_edge) { return source->on_edge_proxy_publish(&context); } diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index c233d6fd4..1e909159b 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -81,6 +81,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTMP_EDGE_PLAY_STATE 320 // invalid state for client to publish edge stream. #define ERROR_RTMP_EDGE_PUBLISH_STATE 321 +#define ERROR_RTMP_EDGE_PROXY_PULL 322 #define ERROR_SYSTEM_STREAM_INIT 400 #define ERROR_SYSTEM_PACKET_INVALID 401 diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp index ca71f6bcb..868bc2619 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.cpp @@ -1202,6 +1202,11 @@ bool SrsMessageHeader::is_window_ackledgement_size() return message_type == RTMP_MSG_WindowAcknowledgementSize; } +bool SrsMessageHeader::is_ackledgement() +{ + return message_type == RTMP_MSG_Acknowledgement; +} + bool SrsMessageHeader::is_set_chunk_size() { return message_type == RTMP_MSG_SetChunkSize; diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index 834ee1579..0b84b5a8b 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -236,6 +236,7 @@ struct SrsMessageHeader bool is_amf3_command(); bool is_amf3_data(); bool is_window_ackledgement_size(); + bool is_ackledgement(); bool is_set_chunk_size(); bool is_user_control_message();