diff --git a/README.md b/README.md index 04ca05f5f..498265693 100755 --- a/README.md +++ b/README.md @@ -348,6 +348,7 @@ Remark: ### SRS 2.0 history +* v2.0, 2015-07-16, for [#441](https://github.com/simple-rtmp-server/srs/issues/441) use 30s timeout for first msg. 2.0.178 * v2.0, 2015-07-14, refine hls disable the time jitter, support not mix monotonically increase. 2.0.177 * v2.0, 2015-07-01, fix [#433](https://github.com/simple-rtmp-server/srs/issues/433) fix the sps parse bug. 2.0.176 * v2.0, 2015-06-10, fix [#425](https://github.com/simple-rtmp-server/srs/issues/425) refine the time jitter, correct (-inf,-250)+(250,+inf) to 10ms. 2.0.175 diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 5d99d9917..cc41986cb 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#define SRS_STREAM_CACHE_CYCLE_SECONDS 30 + #if defined(SRS_AUTO_HTTP_CORE) #include @@ -63,6 +65,9 @@ SrsStreamCache::SrsStreamCache(SrsSource* s, SrsRequest* r) source = s; queue = new SrsMessageQueue(true); pthread = new SrsEndlessThread("http-stream", this); + + // TODO: FIXME: support reload. + fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); } SrsStreamCache::~SrsStreamCache() @@ -82,8 +87,6 @@ int SrsStreamCache::dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jit { int ret = ERROR_SUCCESS; - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); - if (fast_cache <= 0) { srs_info("http: ignore dump fast cache."); return ret; @@ -104,6 +107,14 @@ int SrsStreamCache::cycle() { int ret = ERROR_SUCCESS; + // TODO: FIXME: support reload. + if (fast_cache <= 0) { + st_sleep(SRS_STREAM_CACHE_CYCLE_SECONDS); + return ret; + } + + // the stream cache will create consumer to cache stream, + // which will trigger to fetch stream from origin for edge. SrsConsumer* consumer = NULL; if ((ret = source->create_consumer(consumer, false, false, true)) != ERROR_SUCCESS) { srs_error("http: create consumer failed. ret=%d", ret); @@ -116,11 +127,9 @@ int SrsStreamCache::cycle() SrsMessageArray msgs(SRS_PERF_MW_MSGS); + // set the queue size, which used for max cache. // TODO: FIXME: support reload. - double fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost); - if (fast_cache > 0) { - queue->set_queue_size(fast_cache); - } + queue->set_queue_size(fast_cache); while (true) { pprint->elapse(); @@ -150,11 +159,7 @@ int SrsStreamCache::cycle() // free the messages. for (int i = 0; i < count; i++) { SrsSharedPtrMessage* msg = msgs.msgs[i]; - if (fast_cache > 0) { - queue->enqueue(msg); - } else { - srs_freep(msg); - } + queue->enqueue(msg); } } @@ -1137,8 +1142,10 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) } // hstrs not enabled, ignore. - // for origin: generally set hstrs to 'off' and mount while stream is pushed to origin. - // for edge: must set hstrs to 'on' so that it could trigger rtmp stream before mount. + // for origin, the http stream will be mount already when publish, + // so it must never enter this line for stream already mounted. + // for edge, the http stream is trigger by hstrs and mount by it, + // so we only hijack when only edge and hstrs is on. entry = it->second; if (!entry->hstrs) { return ret; @@ -1175,12 +1182,18 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) SrsAutoFree(SrsRequest, r); std::string sid = r->get_stream_url(); - // check if the stream is enabled. + // check whether the http remux is enabled, + // for example, user disable the http flv then reload. if (sflvs.find(sid) != sflvs.end()) { SrsLiveEntry* s_entry = sflvs[sid]; if (!s_entry->stream->entry->enabled) { - srs_error("stream is disabled, hijack failed. ret=%d", ret); - return ret; + // only when the http entry is disabled, check the config whether http flv disable, + // for the http flv edge use hijack to trigger the edge ingester, we always mount it + // eventhough the origin does not exists the specified stream. + if (!_srs_config->get_vhost_http_remux_enabled(r->vhost)) { + srs_error("stream is disabled, hijack failed. ret=%d", ret); + return ret; + } } } @@ -1210,15 +1223,6 @@ int SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) srs_trace("hstrs: source url=%s, is_edge=%d, source_id=%d[%d]", r->get_stream_url().c_str(), vhost_is_edge, s->source_id(), s->source_id()); - // TODO: FIXME: disconnect when all connection closed. - if (vhost_is_edge) { - // notice edge to start for the first client. - if ((ret = s->on_edge_start_play()) != ERROR_SUCCESS) { - srs_error("notice edge start play stream failed. ret=%d", ret); - return ret; - } - } - return ret; } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 3df97f4b3..b07dd448c 100644 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -41,6 +41,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ class SrsStreamCache : public ISrsEndlessThreadHandler { +private: + double fast_cache; private: SrsMessageQueue* queue; SrsSource* source; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 9cf15db59..681cb0510 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -537,14 +537,6 @@ int SrsRtmpConn::playing(SrsSource* source) SrsAutoFree(SrsConsumer, consumer); srs_verbose("consumer created success."); - if (_srs_config->get_vhost_is_edge(req->vhost)) { - // notice edge to start for the first client. - if ((ret = source->on_edge_start_play()) != ERROR_SUCCESS) { - srs_error("notice edge start play stream failed. ret=%d", ret); - return ret; - } - } - // use isolate thread to recv, // @see: https://github.com/simple-rtmp-server/srs/issues/217 SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP); @@ -782,8 +774,14 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) while (!disposed) { pprint->elapse(); - // cond wait for error. - trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); + // cond wait for timeout. + if (nb_msgs == 0) { + // when not got msgs, wait for a larger timeout. + // @see https://github.com/simple-rtmp-server/srs/issues/441 + trd->wait(SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US / 1000); + } else { + trd->wait(SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US / 1000); + } // check the thread error code. if ((ret = trd->error_code()) != ERROR_SUCCESS) { @@ -835,7 +833,6 @@ int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge) if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) { srs_error("notice edge start publish stream failed. ret=%d", ret); } - return ret; } else { if ((ret = source->on_publish()) != ERROR_SUCCESS) { srs_error("notify publish failed. ret=%d", ret); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 5dd98c204..1c38fbaa9 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -765,7 +765,7 @@ SrsSource* SrsSource::fetch(SrsRequest* r) SrsSource* SrsSource::fetch(std::string vhost, std::string app, std::string stream) { SrsSource* source = NULL; - string stream_url = srs_generate_stream_url(vhost, app, stream); + string stream_url = srs_generate_stream_url(vhost, app, stream); if (pool.find(stream_url) == pool.end()) { return NULL; @@ -2135,6 +2135,15 @@ int SrsSource::create_consumer(SrsConsumer*& consumer, bool ds, bool dm, bool dg } else { srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm); } + + // for edge, when play edge stream, check the state + if (_srs_config->get_vhost_is_edge(_req->vhost)) { + // notice edge to start for the first client. + if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) { + srs_error("notice edge start play stream failed. ret=%d", ret); + return ret; + } + } return ret; } @@ -2163,11 +2172,6 @@ SrsRtmpJitterAlgorithm SrsSource::jitter() return jitter_algorithm; } -int SrsSource::on_edge_start_play() -{ - return play_edge->on_client_play(); -} - int SrsSource::on_edge_start_publish() { return publish_edge->on_client_publish(); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 372803dd5..523115964 100755 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -574,8 +574,6 @@ public: virtual SrsRtmpJitterAlgorithm jitter(); // internal public: - // for edge, when play edge stream, check the state - virtual int on_edge_start_play(); // for edge, when publish edge stream, check the state virtual int on_edge_start_publish(); // for edge, proxy the publish diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index 4c2428352..1cf9b5b99 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -78,6 +78,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // we must use more smaller timeout, for the recv never know the status // of underlayer socket. #define SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US (int64_t)(3*1000*1000LL) +// when no msg recevied for publisher, use larger timeout. +#define SRS_CONSTS_RTMP_PUBLISHER_NO_MSG_RECV_TIMEOUT_US 10*SRS_CONSTS_RTMP_PUBLISHER_RECV_TIMEOUT_US // the timeout to wait for client control message, // if timeout, we generally ignore and send the data to client,