diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 811928ec2..a9042b580 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -307,23 +307,6 @@ int SrsEdgeIngester::connect_server() return ret; } -SrsEdgeProxyContext::SrsEdgeProxyContext() -{ - edge_stream_id = 0; - edge_io = NULL; - edge_rtmp = NULL; - edge_stfd = NULL; - - origin_stream_id = 0; - origin_io = NULL; - origin_rtmp = NULL; - origin_stfd = NULL; -} - -SrsEdgeProxyContext::~SrsEdgeProxyContext() -{ -} - SrsEdgeForwarder::SrsEdgeForwarder() { io = NULL; @@ -395,129 +378,30 @@ void SrsEdgeForwarder::stop() srs_freep(io); } -int SrsEdgeForwarder::proxy(SrsEdgeProxyContext* context) +int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; - context->origin_io = io; - context->origin_rtmp = client; - context->origin_stream_id = stream_id; - context->origin_stfd = stfd; - - context->origin_rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US); - context->edge_rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US); - - SrsPithyPrint pithy_print(SRS_STAGE_EDGE); - - pollfd fds[2]; - fds[0].fd = st_netfd_fileno(context->edge_stfd); - fds[0].events = POLLIN; - - fds[1].fd = st_netfd_fileno(context->origin_stfd); - fds[1].events = POLLIN; - - while (true) { - // switch to other st-threads. - st_usleep(0); - - pithy_print.elapse(); - - // pithy print - if (pithy_print.can_print()) { - srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); - } - - fds[0].revents = 0; - fds[1].revents = 0; - - // Upon successful completion, a non-negative value is returned. - // A positive value indicates the total number of OS file descriptors in pds that have events. - // A value of 0 indicates that the call timed out. - // Upon failure, a value of -1 is returned and errno is set to indicate the error - if(st_poll(fds, 2, ST_UTIME_NO_TIMEOUT) <= 0){ - ret = ERROR_RTMP_EDGE_PROXY_PULL; - srs_error("edge wait for st_poll error. ret=%d", ret); - return ret; - } - - // edge active - if(fds[0].revents & POLLIN){ - if((ret = proxy_edge_message(context)) != ERROR_SUCCESS){ - return ret; - } - } - - // origin active - if(fds[1].revents & POLLIN){ - if((ret = proxy_origin_message(context)) != ERROR_SUCCESS){ - return ret; - } - } - } - - return ret; -} - -int SrsEdgeForwarder::proxy_origin_message(SrsEdgeProxyContext* context) -{ - int ret = ERROR_SUCCESS; - - SrsCommonMessage* msg = NULL; - - // process origin message. - ret = context->origin_rtmp->recv_message(&msg); - if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { - srs_error("forward recv origin server message failed. ret=%d", ret); - return ret; - } - - srs_assert(msg); - - if (msg->size <= 0 - || msg->header.is_set_chunk_size() - || msg->header.is_window_ackledgement_size() - || msg->header.is_ackledgement() - ) { - srs_freep(msg); - return ret; - } - - msg->header.stream_id = context->edge_stream_id; - if ((ret = context->edge_rtmp->send_message(msg)) != ERROR_SUCCESS) { - srs_error("send origin message to client failed. ret=%d", ret); - return ret; - } - - return ret; -} - -int SrsEdgeForwarder::proxy_edge_message(SrsEdgeProxyContext* context) -{ - int ret = ERROR_SUCCESS; - - SrsCommonMessage* msg = NULL; - - // proxy client message to origin - ret = context->edge_rtmp->recv_message(&msg); - if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { - srs_error("recv client message failed. ret=%d", ret); - return ret; - } - - srs_assert(msg); - + // the msg is auto free by source, + // so we just ignore, or copy then send it. if (msg->size <= 0 || msg->header.is_set_chunk_size() || msg->header.is_window_ackledgement_size() || msg->header.is_ackledgement() ) { - srs_freep(msg); return ret; } - msg->header.stream_id = context->origin_stream_id; - if ((ret = context->origin_rtmp->send_message(msg)) != ERROR_SUCCESS) { + SrsSharedPtrMessage* copy = new SrsSharedPtrMessage(); + SrsAutoFree(SrsSharedPtrMessage, copy, false); + if ((ret = copy->initialize(msg)) != ERROR_SUCCESS) { + srs_error("initialize the msg failed. ret=%d", ret); + return ret; + } + srs_verbose("initialize shared ptr msg success."); + + copy->header.stream_id = stream_id; + if ((ret = client->send_message(copy->copy())) != ERROR_SUCCESS) { srs_error("send client message to origin failed. ret=%d", ret); return ret; } @@ -730,13 +614,18 @@ int SrsPublishEdge::on_client_publish() return forwarder->start(); } -int SrsPublishEdge::on_proxy_publish(SrsEdgeProxyContext* context) +int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg) { - int ret = forwarder->proxy(context); + return forwarder->proxy(msg); +} + +void SrsPublishEdge::on_proxy_unpublish() +{ + if (state == SrsEdgeStatePublish) { + forwarder->stop(); + } SrsEdgeState pstate = state; state = SrsEdgeStateInit; srs_trace("edge change from %d to state %d (init).", pstate, state); - - return ret; } diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 9be32a525..76874491e 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -101,23 +101,6 @@ private: virtual int process_publish_message(SrsCommonMessage* msg); }; -class SrsEdgeProxyContext -{ -public: - int edge_stream_id; - st_netfd_t edge_stfd; - ISrsProtocolReaderWriter* edge_io; - SrsRtmpServer* edge_rtmp; -public: - int origin_stream_id; - st_netfd_t origin_stfd; - ISrsProtocolReaderWriter* origin_io; - SrsRtmpClient* origin_rtmp; -public: - SrsEdgeProxyContext(); - virtual ~SrsEdgeProxyContext(); -}; - /** * edge used to forward stream to origin. */ @@ -141,10 +124,8 @@ public: virtual int start(); virtual void stop(); public: - virtual int proxy(SrsEdgeProxyContext* context); + virtual int proxy(SrsCommonMessage* msg); private: - virtual int proxy_origin_message(SrsEdgeProxyContext* context); - virtual int proxy_edge_message(SrsEdgeProxyContext* context); virtual void close_underlayer_socket(); virtual int connect_server(); }; @@ -201,7 +182,11 @@ public: /** * proxy publish stream to edge */ - virtual int on_proxy_publish(SrsEdgeProxyContext* context); + virtual int on_proxy_publish(SrsCommonMessage* msg); + /** + * proxy unpublish stream to edge. + */ + virtual void on_proxy_unpublish(); }; #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 45e0d088a..b1b4fb1e9 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -145,7 +145,7 @@ int SrsRtmpConn::do_cycle() req->app.c_str()); ret = service_cycle(); - on_close(); + http_hooks_on_close(); return ret; } @@ -312,14 +312,14 @@ int SrsRtmpConn::stream_service_cycle() srs_error("start to play stream failed. ret=%d", ret); return ret; } - if ((ret = on_play()) != ERROR_SUCCESS) { + if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) { srs_error("http hook on_play failed. ret=%d", ret); return ret; } srs_info("start to play stream %s success", req->stream.c_str()); ret = playing(source); - on_stop(); + http_hooks_on_stop(); return ret; } @@ -338,23 +338,23 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - SrsEdgeProxyContext context; - context.edge_io = skt; - context.edge_stream_id = res->stream_id; - context.edge_rtmp = rtmp; - context.edge_stfd = stfd; - if (vhost_is_edge) { - return source->on_edge_proxy_publish(&context); - } - - if ((ret = on_publish()) != ERROR_SUCCESS) { + if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; } + srs_info("start to publish stream %s success", req->stream.c_str()); ret = fmle_publish(source); - source->on_unpublish(); - on_unpublish(); + + // when edge, notice edge to change state. + // when origin, notice all service to unpublish. + if (vhost_is_edge) { + source->on_edge_proxy_unpublish(); + } else { + source->on_unpublish(); + } + + http_hooks_on_unpublish(); return ret; } case SrsRtmpConnFlashPublish: { @@ -372,23 +372,23 @@ int SrsRtmpConn::stream_service_cycle() return ret; } - SrsEdgeProxyContext context; - context.edge_io = skt; - context.edge_stream_id = res->stream_id; - context.edge_rtmp = rtmp; - context.edge_stfd = stfd; - if (vhost_is_edge) { - return source->on_edge_proxy_publish(&context); - } - - if ((ret = on_publish()) != ERROR_SUCCESS) { + if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) { srs_error("http hook on_publish failed. ret=%d", ret); return ret; } + srs_info("flash start to publish stream %s success", req->stream.c_str()); ret = flash_publish(source); - source->on_unpublish(); - on_unpublish(); + + // when edge, notice edge to change state. + // when origin, notice all service to unpublish. + if (vhost_is_edge) { + source->on_edge_proxy_unpublish(); + } else { + source->on_unpublish(); + } + + http_hooks_on_unpublish(); return ret; } default: { @@ -431,7 +431,7 @@ int SrsRtmpConn::check_vhost() } srs_verbose("check refer success."); - if ((ret = on_connect()) != ERROR_SUCCESS) { + if ((ret = http_hooks_on_connect()) != ERROR_SUCCESS) { return ret; } @@ -562,6 +562,8 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) } srs_verbose("fmle hls on_publish success."); + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + while (true) { // switch to other st-threads. st_usleep(0); @@ -604,7 +606,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) } // video, audio, data message - if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) { + if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { srs_error("fmle process publish message failed. ret=%d", ret); return ret; } @@ -632,6 +634,8 @@ int SrsRtmpConn::flash_publish(SrsSource* source) } srs_verbose("flash hls on_publish success."); + bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); + while (true) { // switch to other st-threads. st_usleep(0); @@ -668,7 +672,7 @@ int SrsRtmpConn::flash_publish(SrsSource* source) } // video, audio, data message - if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) { + if ((ret = process_publish_message(source, msg, vhost_is_edge)) != ERROR_SUCCESS) { srs_error("flash process publish message failed. ret=%d", ret); return ret; } @@ -677,10 +681,15 @@ int SrsRtmpConn::flash_publish(SrsSource* source) return ret; } -int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg) +int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge) { int ret = ERROR_SUCCESS; + // for edge, directly proxy message to origin. + if (vhost_is_edge) { + return source->on_edge_proxy_publish(msg); + } + // process audio packet if (msg->header.is_audio()) { if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) { @@ -771,7 +780,7 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag return ret; } -int SrsRtmpConn::on_connect() +int SrsRtmpConn::http_hooks_on_connect() { int ret = ERROR_SUCCESS; @@ -795,7 +804,7 @@ int SrsRtmpConn::on_connect() return ret; } -void SrsRtmpConn::on_close() +void SrsRtmpConn::http_hooks_on_close() { #ifdef SRS_AUTO_HTTP_CALLBACK // whatever the ret code, notify the api hooks. @@ -813,7 +822,7 @@ void SrsRtmpConn::on_close() #endif } -int SrsRtmpConn::on_publish() +int SrsRtmpConn::http_hooks_on_publish() { int ret = ERROR_SUCCESS; @@ -837,7 +846,7 @@ int SrsRtmpConn::on_publish() return ret; } -void SrsRtmpConn::on_unpublish() +void SrsRtmpConn::http_hooks_on_unpublish() { #ifdef SRS_AUTO_HTTP_CALLBACK // whatever the ret code, notify the api hooks. @@ -855,7 +864,7 @@ void SrsRtmpConn::on_unpublish() #endif } -int SrsRtmpConn::on_play() +int SrsRtmpConn::http_hooks_on_play() { int ret = ERROR_SUCCESS; @@ -879,7 +888,7 @@ int SrsRtmpConn::on_play() return ret; } -void SrsRtmpConn::on_stop() +void SrsRtmpConn::http_hooks_on_stop() { #ifdef SRS_AUTO_HTTP_CALLBACK // whatever the ret code, notify the api hooks. diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index d5e334caf..802daf6fd 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -80,15 +80,15 @@ private: virtual int playing(SrsSource* source); virtual int fmle_publish(SrsSource* source); virtual int flash_publish(SrsSource* source); - virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg); + virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); private: - virtual int on_connect(); - virtual void on_close(); - virtual int on_publish(); - virtual void on_unpublish(); - virtual int on_play(); - virtual void on_stop(); + virtual int http_hooks_on_connect(); + virtual void http_hooks_on_close(); + virtual int http_hooks_on_publish(); + virtual void http_hooks_on_unpublish(); + virtual int http_hooks_on_play(); + virtual void http_hooks_on_stop(); }; #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 41a1ad4c8..50e6f1d22 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1199,9 +1199,14 @@ int SrsSource::on_edge_start_publish() return publish_edge->on_client_publish(); } -int SrsSource::on_edge_proxy_publish(SrsEdgeProxyContext* context) +int SrsSource::on_edge_proxy_publish(SrsCommonMessage* msg) { - return publish_edge->on_proxy_publish(context); + return publish_edge->on_proxy_publish(msg); +} + +void SrsSource::on_edge_proxy_unpublish() +{ + publish_edge->on_proxy_unpublish(); } int SrsSource::create_forwarders() diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 2f8a6a477..25badf2a2 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -322,7 +322,9 @@ public: // for edge, when publish edge stream, check the state virtual int on_edge_start_publish(); // for edge, proxy the publish - virtual int on_edge_proxy_publish(SrsEdgeProxyContext* context); + virtual int on_edge_proxy_publish(SrsCommonMessage* msg); + // for edge, proxy stop publish + virtual void on_edge_proxy_unpublish(); private: virtual int create_forwarders(); virtual void destroy_forwarders();