diff --git a/README.md b/README.md index d2a168482..496cbb58c 100755 --- a/README.md +++ b/README.md @@ -184,6 +184,7 @@ Please select your language: ### V3 changes +* v3.0, 2017-01-19, for [#742][bug #742] refine source, meta and origin hub. 3.0.16 * v3.0, 2017-01-17, for [#742][bug #742] refine source, timeout, live cycle. 3.0.15 * v3.0, 2017-01-11, fix [#735][bug #735] config transform refer_publish invalid. 3.0.14 * v3.0, 2017-01-06, for [#730][bug #730] support config in/out ack size. 3.0.13 @@ -872,13 +873,13 @@ Remark: | Input | SRS(Simple RTMP Server) | Output | +----------------------+-------------------------+----------------+ | Encoder(1) | +-> RTMP/HDS --------+-> Flash player | -| (FMLE,FFMPEG, -rtmp-+->-+-> HLS/HTTP ---------+-> M3u8 player | +| (FMLE,FFMPEG, -rtmp-+->-+-> HLS/HTTP ---------+-> M3U8 player | | Flash,XSPLIT, | +-> FLV/MP3/Aac/Ts ---+-> HTTP player | | ......) | +-> Fowarder ---------+-> RTMP server | | | +-> Transcoder -------+-> RTMP server | | | +-> EXEC(5) ----------+-> External app | -| | +-> DVR --------------+-> Flv file | -| | +-> BandwidthTest ----+-> flash | +| | +-> DVR --------------+-> FLV file | +| | +-> BandwidthTest ----+-> Flash | +----------------------+ | | | MediaSource(2) | | | | (RTSP,FILE, | | | diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index 6da11f675..1b0db63e1 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -970,7 +970,7 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) SrsDvr::SrsDvr() { - source = NULL; + hub = NULL; plan = NULL; req = NULL; actived = false; @@ -985,12 +985,12 @@ SrsDvr::~SrsDvr() srs_freep(plan); } -int SrsDvr::initialize(SrsSource* s, SrsRequest* r) +int SrsDvr::initialize(SrsOriginHub* h, SrsRequest* r) { int ret = ERROR_SUCCESS; req = r; - source = s; + hub = h; SrsConfDirective* conf = _srs_config->get_dvr_apply(r->vhost); actived = srs_config_apply_filter(conf, r); @@ -1018,7 +1018,7 @@ int SrsDvr::on_publish(bool fetch_sequence_header) return ret; } - if (fetch_sequence_header && (ret = source->on_dvr_request_sh()) != ERROR_SUCCESS) { + if (fetch_sequence_header && (ret = hub->on_dvr_request_sh()) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index 9b3831bf1..38f6632de 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_DVR class SrsSource; +class SrsOriginHub; class SrsRequest; class SrsBuffer; class SrsRtmpJitter; @@ -300,7 +301,7 @@ private: class SrsDvr : public ISrsReloadHandler { private: - SrsSource* source; + SrsOriginHub* hub; SrsDvrPlan* plan; SrsRequest* req; private: @@ -317,7 +318,7 @@ public: * when system initialize(encoder publish at first time, or reload), * initialize the dvr will reinitialize the plan, the whole dvr framework. */ - virtual int initialize(SrsSource* s, SrsRequest* r); + virtual int initialize(SrsOriginHub* h, SrsRequest* r); /** * publish stream event, * when encoder start to publish RTMP stream. diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index d132348c6..7350408eb 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -50,9 +50,9 @@ using namespace std; // when error, forwarder sleep for a while and retry. #define SRS_FORWARDER_CIMS (3000) -SrsForwarder::SrsForwarder(SrsSource* s) +SrsForwarder::SrsForwarder(SrsOriginHub* h) { - source = s; + hub = h; req = NULL; sh_video = sh_audio = NULL; @@ -250,7 +250,7 @@ int SrsForwarder::cycle() return ret; } - if ((ret = source->on_forwarder_start(this)) != ERROR_SUCCESS) { + if ((ret = hub->on_forwarder_start(this)) != ERROR_SUCCESS) { srs_error("callback the source to feed the sequence header failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index d99f04468..d905fd759 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -42,6 +42,7 @@ class SrsRtmpJitter; class SrsRtmpClient; class SrsRequest; class SrsSource; +class SrsOriginHub; class SrsKbps; class SrsSimpleRtmpClient; @@ -58,7 +59,7 @@ private: private: SrsReusableThread2* pthread; private: - SrsSource* source; + SrsOriginHub* hub; SrsSimpleRtmpClient* sdk; SrsRtmpJitter* jitter; SrsMessageQueue* queue; @@ -69,7 +70,7 @@ private: SrsSharedPtrMessage* sh_audio; SrsSharedPtrMessage* sh_video; public: - SrsForwarder(SrsSource* _source); + SrsForwarder(SrsOriginHub* h); virtual ~SrsForwarder(); public: virtual int initialize(SrsRequest* r, std::string ep); diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index aa8bba188..c8fc13545 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -1125,7 +1125,7 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme SrsHls::SrsHls() { req = NULL; - source = NULL; + hub = NULL; hls_enabled = false; hls_can_dispose = false; @@ -1197,11 +1197,11 @@ int SrsHls::cycle() return ret; } -int SrsHls::initialize(SrsSource* s, SrsRequest* r) +int SrsHls::initialize(SrsOriginHub* h, SrsRequest* r) { int ret = ERROR_SUCCESS; - source = s; + hub = h; req = r; if ((ret = muxer->initialize()) != ERROR_SUCCESS) { @@ -1244,7 +1244,7 @@ int SrsHls::on_publish(bool fetch_sequence_header) // notice the source to get the cached sequence header. // when reload to start hls, hls will never get the sequence header in stream, // use the SrsSource.on_hls_start to push the sequence header to HLS. - if ((ret = source->on_hls_start()) != ERROR_SUCCESS) { + if ((ret = hub->on_hls_start()) != ERROR_SUCCESS) { srs_error("callback source hls start failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index e97b34ce2..0b5535054 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -45,6 +45,7 @@ class SrsAvcAacCodec; class SrsRequest; class SrsPithyPrint; class SrsSource; +class SrsOriginHub; class SrsFileWriter; class SrsSimpleStream; class SrsTsAacJitter; @@ -362,7 +363,7 @@ private: bool hls_can_dispose; int64_t last_update_time; private: - SrsSource* source; + SrsOriginHub* hub; SrsAvcAacCodec* codec; SrsCodecSample* sample; SrsRtmpJitter* jitter; @@ -391,7 +392,7 @@ public: /** * initialize the hls by handler and source. */ - virtual int initialize(SrsSource* s, SrsRequest* r); + virtual int initialize(SrsOriginHub* h, SrsRequest* r); /** * publish stream event, continue to write the m3u8, * for the muxer object not destroyed. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index aa55396f6..24df9acb8 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -745,6 +745,797 @@ ISrsSourceHandler::~ISrsSourceHandler() { } +// TODO: FIXME: Remove it? +bool srs_hls_can_continue(int ret, SrsSharedPtrMessage* sh, SrsSharedPtrMessage* msg) +{ + // only continue for decode error. + if (ret != ERROR_HLS_DECODE_ERROR) { + return false; + } + + // when video size equals to sequence header, + // the video actually maybe a sequence header, + // continue to make ffmpeg happy. + if (sh && sh->size == msg->size) { + srs_warn("the msg is actually a sequence header, ignore this packet."); + return true; + } + + return false; +} + +SrsOriginHub::SrsOriginHub(SrsSource* s) +{ + source = s; + req = NULL; + +#ifdef SRS_AUTO_HLS + hls = new SrsHls(); +#endif +#ifdef SRS_AUTO_DVR + dvr = new SrsDvr(); +#endif +#ifdef SRS_AUTO_TRANSCODE + encoder = new SrsEncoder(); +#endif +#ifdef SRS_AUTO_HDS + hds = new SrsHds(s); +#endif + ng_exec = new SrsNgExec(); + + _srs_config->subscribe(this); +} + +SrsOriginHub::~SrsOriginHub() +{ + _srs_config->unsubscribe(this); + + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + srs_freep(forwarder); + } + forwarders.clear(); + } + srs_freep(ng_exec); + +#ifdef SRS_AUTO_HLS + srs_freep(hls); +#endif +#ifdef SRS_AUTO_DVR + srs_freep(dvr); +#endif +#ifdef SRS_AUTO_TRANSCODE + srs_freep(encoder); +#endif +#ifdef SRS_AUTO_HDS + srs_freep(hds); +#endif +} + +int SrsOriginHub::initialize(SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + + req = r; + +#ifdef SRS_AUTO_HLS + if ((ret = hls->initialize(this, req)) != ERROR_SUCCESS) { + return ret; + } +#endif + +#ifdef SRS_AUTO_DVR + if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) { + return ret; + } +#endif + + return ret; +} + +void SrsOriginHub::dispose() +{ +#ifdef SRS_AUTO_HLS + hls->dispose(); +#endif +} + +int SrsOriginHub::cycle() +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HLS + if ((ret = hls->cycle()) != ERROR_SUCCESS) { + return ret; + } +#endif + + return ret; +} + +int SrsOriginHub::on_original_metadata(SrsOnMetaDataPacket* metadata) +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HLS + if (metadata && (ret = hls->on_meta_data(metadata->metadata)) != ERROR_SUCCESS) { + srs_error("hls process onMetaData message failed. ret=%d", ret); + return ret; + } +#endif + +#ifdef SRS_AUTO_DVR + if (metadata && (ret = dvr->on_meta_data(metadata)) != ERROR_SUCCESS) { + srs_error("dvr process onMetaData message failed. ret=%d", ret); + return ret; + } +#endif + + return ret; +} + +int SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata) +{ + int ret = ERROR_SUCCESS; + + // copy to all forwarders + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_meta_data(shared_metadata)) != ERROR_SUCCESS) { + srs_error("forwarder process onMetaData message failed. ret=%d", ret); + return ret; + } + } + } + + return ret; +} + +int SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = shared_audio; + +#ifdef SRS_AUTO_HLS + if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) { + // apply the error strategy for hls. + // @see https://github.com/ossrs/srs/issues/264 + std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); + if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { + srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret); + + // unpublish, ignore ret. + hls->on_unpublish(); + + // ignore. + ret = ERROR_SUCCESS; + } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) { + if (srs_hls_can_continue(ret, source->meta->ash(), msg)) { + ret = ERROR_SUCCESS; + } else { + srs_warn("hls continue audio failed. ret=%d", ret); + return ret; + } + } else { + srs_warn("hls disconnect publisher for audio error. ret=%d", ret); + return ret; + } + } +#endif + +#ifdef SRS_AUTO_DVR + if ((ret = dvr->on_audio(msg)) != ERROR_SUCCESS) { + srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret); + + // unpublish, ignore ret. + dvr->on_unpublish(); + + // ignore. + ret = ERROR_SUCCESS; + } +#endif + +#ifdef SRS_AUTO_HDS + if ((ret = hds->on_audio(msg)) != ERROR_SUCCESS) { + srs_warn("hds process audio message failed, ignore and disable dvr. ret=%d", ret); + + // unpublish, ignore ret. + hds->on_unpublish(); + // ignore. + ret = ERROR_SUCCESS; + } +#endif + + // copy to all forwarders. + if (true) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_audio(msg)) != ERROR_SUCCESS) { + srs_error("forwarder process audio message failed. ret=%d", ret); + return ret; + } + } + } + + return ret; +} + +int SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_header) +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = shared_video; + +#ifdef SRS_AUTO_HLS + if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) { + // apply the error strategy for hls. + // @see https://github.com/ossrs/srs/issues/264 + std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); + if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { + srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret); + + // unpublish, ignore ret. + hls->on_unpublish(); + + // ignore. + ret = ERROR_SUCCESS; + } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) { + if (srs_hls_can_continue(ret, source->meta->vsh(), msg)) { + ret = ERROR_SUCCESS; + } else { + srs_warn("hls continue video failed. ret=%d", ret); + return ret; + } + } else { + srs_warn("hls disconnect publisher for video error. ret=%d", ret); + return ret; + } + } +#endif + +#ifdef SRS_AUTO_DVR + if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) { + srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret); + + // unpublish, ignore ret. + dvr->on_unpublish(); + + // ignore. + ret = ERROR_SUCCESS; + } +#endif + +#ifdef SRS_AUTO_HDS + if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) { + srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret); + + // unpublish, ignore ret. + hds->on_unpublish(); + // ignore. + ret = ERROR_SUCCESS; + } +#endif + + // copy to all forwarders. + if (!forwarders.empty()) { + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) { + srs_error("forwarder process video message failed. ret=%d", ret); + return ret; + } + } + } + + return ret; +} + +int SrsOriginHub::on_publish() +{ + int ret = ERROR_SUCCESS; + + // create forwarders + if ((ret = create_forwarders()) != ERROR_SUCCESS) { + srs_error("create forwarders failed. ret=%d", ret); + return ret; + } + + // TODO: FIXME: use initialize to set req. +#ifdef SRS_AUTO_TRANSCODE + if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { + srs_error("start encoder failed. ret=%d", ret); + return ret; + } +#endif + +#ifdef SRS_AUTO_HLS + if ((ret = hls->on_publish(false)) != ERROR_SUCCESS) { + srs_error("start hls failed. ret=%d", ret); + return ret; + } +#endif + +#ifdef SRS_AUTO_DVR + if ((ret = dvr->on_publish(false)) != ERROR_SUCCESS) { + srs_error("start dvr failed. ret=%d", ret); + return ret; + } +#endif + + // TODO: FIXME: use initialize to set req. +#ifdef SRS_AUTO_HDS + if ((ret = hds->on_publish(req)) != ERROR_SUCCESS) { + srs_error("start hds failed. ret=%d", ret); + return ret; + } +#endif + + // TODO: FIXME: use initialize to set req. + if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) { + srs_error("start exec failed. ret=%d", ret); + return ret; + } + + return ret; +} + +void SrsOriginHub::on_unpublish() +{ + // destroy all forwarders + destroy_forwarders(); + +#ifdef SRS_AUTO_TRANSCODE + encoder->on_unpublish(); +#endif + +#ifdef SRS_AUTO_HLS + hls->on_unpublish(); +#endif + +#ifdef SRS_AUTO_DVR + dvr->on_unpublish(); +#endif + +#ifdef SRS_AUTO_HDS + hds->on_unpublish(); +#endif + + ng_exec->on_unpublish(); +} + +int SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder) +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* cache_metadata = source->meta->data(); + SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); + SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); + + // feed the forwarder the metadata/sequence header, + // when reload to enable the forwarder. + if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) { + srs_error("forwarder process onMetaData message failed. ret=%d", ret); + return ret; + } + if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video)) != ERROR_SUCCESS) { + srs_error("forwarder process video sequence header message failed. ret=%d", ret); + return ret; + } + if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { + srs_error("forwarder process audio sequence header message failed. ret=%d", ret); + return ret; + } + + return ret; +} + +int SrsOriginHub::on_hls_start() +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); + SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); + +#ifdef SRS_AUTO_HLS + // feed the hls the metadata/sequence header, + // when reload to start hls, hls will never get the sequence header in stream, + // use the SrsSource.on_hls_start to push the sequence header to HLS. + // TODO: maybe need to decode the metadata? + if (cache_sh_video && (ret = hls->on_video(cache_sh_video, true)) != ERROR_SUCCESS) { + srs_error("hls process video sequence header message failed. ret=%d", ret); + return ret; + } + if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { + srs_error("hls process audio sequence header message failed. ret=%d", ret); + return ret; + } +#endif + + return ret; +} + +int SrsOriginHub::on_dvr_request_sh() +{ + int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* cache_metadata = source->meta->data(); + SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); + SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); + +#ifdef SRS_AUTO_DVR + // feed the dvr the metadata/sequence header, + // when reload to start dvr, dvr will never get the sequence header in stream, + // use the SrsSource.on_dvr_request_sh to push the sequence header to DVR. + if (cache_metadata) { + char* payload = cache_metadata->payload; + int size = cache_metadata->size; + + SrsBuffer stream; + if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) { + srs_error("dvr decode metadata stream failed. ret=%d", ret); + return ret; + } + + SrsOnMetaDataPacket pkt; + if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) { + srs_error("dvr decode metadata packet failed."); + return ret; + } + + if ((ret = dvr->on_meta_data(&pkt)) != ERROR_SUCCESS) { + srs_error("dvr process onMetaData message failed. ret=%d", ret); + return ret; + } + } + + if (cache_sh_video && (ret = dvr->on_video(cache_sh_video)) != ERROR_SUCCESS) { + srs_error("dvr process video sequence header message failed. ret=%d", ret); + return ret; + } + if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { + srs_error("dvr process audio sequence header message failed. ret=%d", ret); + return ret; + } +#endif + + return ret; +} + +int SrsOriginHub::on_reload_vhost_forward(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + // TODO: FIXME: maybe should ignore when publish already stopped? + + // forwarders + destroy_forwarders(); + if ((ret = create_forwarders()) != ERROR_SUCCESS) { + srs_error("create forwarders failed. ret=%d", ret); + return ret; + } + + srs_trace("vhost %s forwarders reload success", vhost.c_str()); + + return ret; +} + +int SrsOriginHub::on_reload_vhost_hls(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + // TODO: FIXME: maybe should ignore when publish already stopped? + +#ifdef SRS_AUTO_HLS + hls->on_unpublish(); + if ((ret = hls->on_publish(true)) != ERROR_SUCCESS) { + srs_error("hls publish failed. ret=%d", ret); + return ret; + } + srs_trace("vhost %s hls reload success", vhost.c_str()); +#endif + + return ret; +} + +int SrsOriginHub::on_reload_vhost_hds(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + // TODO: FIXME: maybe should ignore when publish already stopped? + +#ifdef SRS_AUTO_HDS + hds->on_unpublish(); + if ((ret = hds->on_publish(req)) != ERROR_SUCCESS) { + srs_error("hds publish failed. ret=%d", ret); + return ret; + } + srs_trace("vhost %s hds reload success", vhost.c_str()); +#endif + + return ret; +} + +int SrsOriginHub::on_reload_vhost_dvr(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + // TODO: FIXME: maybe should ignore when publish already stopped? + +#ifdef SRS_AUTO_DVR + // cleanup dvr + dvr->on_unpublish(); + + // reinitialize the dvr, update plan. + if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) { + return ret; + } + + // start to publish by new plan. + if ((ret = dvr->on_publish(true)) != ERROR_SUCCESS) { + srs_error("dvr publish failed. ret=%d", ret); + return ret; + } + + srs_trace("vhost %s dvr reload success", vhost.c_str()); +#endif + + return ret; +} + +int SrsOriginHub::on_reload_vhost_transcode(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + // TODO: FIXME: maybe should ignore when publish already stopped? + +#ifdef SRS_AUTO_TRANSCODE + encoder->on_unpublish(); + if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { + srs_error("start encoder failed. ret=%d", ret); + return ret; + } + srs_trace("vhost %s transcode reload success", vhost.c_str()); +#endif + + return ret; +} + +int SrsOriginHub::on_reload_vhost_exec(string vhost) +{ + int ret = ERROR_SUCCESS; + + if (req->vhost != vhost) { + return ret; + } + + // TODO: FIXME: maybe should ignore when publish already stopped? + + ng_exec->on_unpublish(); + if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) { + srs_error("start exec failed. ret=%d", ret); + return ret; + } + srs_trace("vhost %s exec reload success", vhost.c_str()); + + return ret; +} + +int SrsOriginHub::create_forwarders() +{ + int ret = ERROR_SUCCESS; + + if (!_srs_config->get_forward_enabled(req->vhost)) { + return ret; + } + + SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); + for (int i = 0; conf && i < (int)conf->args.size(); i++) { + std::string forward_server = conf->args.at(i); + + SrsForwarder* forwarder = new SrsForwarder(this); + forwarders.push_back(forwarder); + + // initialize the forwarder with request. + if ((ret = forwarder->initialize(req, forward_server)) != ERROR_SUCCESS) { + return ret; + } + + // TODO: FIXME: support queue size. + //double queue_size = _srs_config->get_queue_length(req->vhost); + //forwarder->set_queue_size(queue_size); + + if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) { + srs_error("start forwarder failed. " + "vhost=%s, app=%s, stream=%s, forward-to=%s", + req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), + forward_server.c_str()); + return ret; + } + } + + return ret; +} + +void SrsOriginHub::destroy_forwarders() +{ + std::vector::iterator it; + for (it = forwarders.begin(); it != forwarders.end(); ++it) { + SrsForwarder* forwarder = *it; + forwarder->on_unpublish(); + srs_freep(forwarder); + } + forwarders.clear(); +} + +SrsMetaCache::SrsMetaCache() +{ + cache_metadata = cache_sh_video = cache_sh_audio = NULL; +} + +SrsMetaCache::~SrsMetaCache() +{ + dispose(); +} + +void SrsMetaCache::dispose() +{ + srs_freep(cache_metadata); + srs_freep(cache_sh_video); + srs_freep(cache_sh_audio); +} + +SrsSharedPtrMessage* SrsMetaCache::data() +{ + return cache_metadata; +} + +SrsSharedPtrMessage* SrsMetaCache::vsh() +{ + return cache_sh_video; +} + +SrsSharedPtrMessage* SrsMetaCache::ash() +{ + return cache_sh_audio; +} + +int SrsMetaCache::dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds) +{ + int ret = ERROR_SUCCESS; + + // copy metadata. + if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, ag)) != ERROR_SUCCESS) { + srs_error("dispatch metadata failed. ret=%d", ret); + return ret; + } + srs_info("dispatch metadata success"); + + // copy sequence header + // copy audio sequence first, for hls to fast parse the "right" audio codec. + // @see https://github.com/ossrs/srs/issues/301 + if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, ag)) != ERROR_SUCCESS) { + srs_error("dispatch audio sequence header failed. ret=%d", ret); + return ret; + } + srs_info("dispatch audio sequence header success"); + + if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, ag)) != ERROR_SUCCESS) { + srs_error("dispatch video sequence header failed. ret=%d", ret); + return ret; + } + srs_info("dispatch video sequence header success"); + + return ret; +} + +int SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated) +{ + updated = false; + + int ret = ERROR_SUCCESS; + + SrsAmf0Any* prop = NULL; + + // when exists the duration, remove it to make ExoPlayer happy. + if (metadata->metadata->get_property("duration") != NULL) { + metadata->metadata->remove("duration"); + } + + // generate metadata info to print + std::stringstream ss; + if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) { + ss << ", width=" << (int)prop->to_number(); + } + if ((prop = metadata->metadata->ensure_property_number("height")) != NULL) { + ss << ", height=" << (int)prop->to_number(); + } + if ((prop = metadata->metadata->ensure_property_number("videocodecid")) != NULL) { + ss << ", vcodec=" << (int)prop->to_number(); + } + if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) { + ss << ", acodec=" << (int)prop->to_number(); + } + srs_trace("got metadata%s", ss.str().c_str()); + + // add server info to metadata + metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); + metadata->metadata->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); + metadata->metadata->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); + + // version, for example, 1.0.0 + // add version to metadata, please donot remove it, for debug. + metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); + + // encode the metadata to payload + int size = 0; + char* payload = NULL; + if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) { + srs_error("encode metadata error. ret=%d", ret); + srs_freep(payload); + return ret; + } + srs_verbose("encode metadata success."); + + if (size <= 0) { + srs_warn("ignore the invalid metadata. size=%d", size); + return ret; + } + + // create a shared ptr message. + srs_freep(cache_metadata); + cache_metadata = new SrsSharedPtrMessage(); + updated = true; + + // dump message to shared ptr message. + // the payload/size managed by cache_metadata, user should not free it. + if ((ret = cache_metadata->create(header, payload, size)) != ERROR_SUCCESS) { + srs_error("initialize the cache metadata failed. ret=%d", ret); + return ret; + } + srs_verbose("initialize shared ptr metadata success."); + + return ret; +} + +void SrsMetaCache::update_ash(SrsSharedPtrMessage* msg) +{ + srs_freep(cache_sh_audio); + cache_sh_audio = msg->copy(); +} + +void SrsMetaCache::update_vsh(SrsSharedPtrMessage* msg) +{ + srs_freep(cache_sh_video); + cache_sh_video = msg->copy(); +} + std::map SrsSource::pool; int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) @@ -947,21 +1738,6 @@ SrsSource::SrsSource() mix_correct = false; mix_queue = new SrsMixQueue(); -#ifdef SRS_AUTO_HLS - hls = new SrsHls(); -#endif -#ifdef SRS_AUTO_DVR - dvr = new SrsDvr(); -#endif -#ifdef SRS_AUTO_TRANSCODE - encoder = new SrsEncoder(); -#endif -#ifdef SRS_AUTO_HDS - hds = new SrsHds(this); -#endif - - cache_metadata = cache_sh_video = cache_sh_audio = NULL; - _can_publish = true; _pre_source_id = _source_id = -1; die_at = -1; @@ -970,7 +1746,8 @@ SrsSource::SrsSource() publish_edge = new SrsPublishEdge(); gop_cache = new SrsGopCache(); aggregate_stream = new SrsBuffer(); - ng_exec = new SrsNgExec(); + hub = new SrsOriginHub(this); + meta = new SrsMetaCache(); is_monotonically_increase = false; last_packet_time = 0; @@ -986,69 +1763,29 @@ SrsSource::~SrsSource() // never free the consumers, // for all consumers are auto free. consumers.clear(); - - if (true) { - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - srs_freep(forwarder); - } - forwarders.clear(); - } + srs_freep(hub); + srs_freep(meta); srs_freep(mix_queue); - srs_freep(cache_metadata); - srs_freep(cache_sh_video); - srs_freep(cache_sh_audio); srs_freep(play_edge); srs_freep(publish_edge); srs_freep(gop_cache); srs_freep(aggregate_stream); - srs_freep(ng_exec); - -#ifdef SRS_AUTO_HLS - srs_freep(hls); -#endif -#ifdef SRS_AUTO_DVR - srs_freep(dvr); -#endif -#ifdef SRS_AUTO_TRANSCODE - srs_freep(encoder); -#endif -#ifdef SRS_AUTO_HDS - srs_freep(hds); -#endif srs_freep(req); } void SrsSource::dispose() { -#ifdef SRS_AUTO_HLS - hls->dispose(); -#endif - - // cleaup the cached packets. - srs_freep(cache_metadata); - srs_freep(cache_sh_video); - srs_freep(cache_sh_audio); - - // cleanup the gop cache. + hub->dispose(); + meta->dispose(); gop_cache->dispose(); } int SrsSource::cycle() { - int ret = ERROR_SUCCESS; - -#ifdef SRS_AUTO_HLS - if ((ret = hls->cycle()) != ERROR_SUCCESS) { - return ret; - } -#endif - - return ret; + return hub->cycle(); } bool SrsSource::expired() @@ -1086,18 +1823,10 @@ int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) handler = h; req = r->copy(); atc = _srs_config->get_atc(req->vhost); - -#ifdef SRS_AUTO_HLS - if ((ret = hls->initialize(this, req)) != ERROR_SUCCESS) { - return ret; - } -#endif -#ifdef SRS_AUTO_DVR - if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) { + if ((ret = hub->initialize(req)) != ERROR_SUCCESS) { return ret; } -#endif if ((ret = play_edge->initialize(this, req)) != ERROR_SUCCESS) { return ret; @@ -1174,6 +1903,9 @@ int SrsSource::on_reload_vhost_play(string vhost) srs_trace("consumers reload queue size success."); } + // TODO: FIXME: https://github.com/ossrs/srs/issues/742#issuecomment-273656897 + // TODO: FIXME: support queue size. +#if 0 if (true) { std::vector::iterator it; @@ -1189,234 +1921,9 @@ int SrsSource::on_reload_vhost_play(string vhost) publish_edge->set_queue_size(v); srs_trace("publish_edge reload queue size success."); } - } - - return ret; -} - -int SrsSource::on_reload_vhost_forward(string vhost) -{ - int ret = ERROR_SUCCESS; - - if (req->vhost != vhost) { - return ret; - } - - // TODO: FIXME: maybe should ignore when publish already stopped? - - // forwarders - destroy_forwarders(); - if ((ret = create_forwarders()) != ERROR_SUCCESS) { - srs_error("create forwarders failed. ret=%d", ret); - return ret; - } - - srs_trace("vhost %s forwarders reload success", vhost.c_str()); - - return ret; -} - -int SrsSource::on_reload_vhost_hls(string vhost) -{ - int ret = ERROR_SUCCESS; - - if (req->vhost != vhost) { - return ret; - } - - // TODO: FIXME: maybe should ignore when publish already stopped? - -#ifdef SRS_AUTO_HLS - hls->on_unpublish(); - if ((ret = hls->on_publish(true)) != ERROR_SUCCESS) { - srs_error("hls publish failed. ret=%d", ret); - return ret; - } - srs_trace("vhost %s hls reload success", vhost.c_str()); #endif - - return ret; -} - -int SrsSource::on_reload_vhost_hds(string vhost) -{ - int ret = ERROR_SUCCESS; - - if (req->vhost != vhost) { - return ret; } - // TODO: FIXME: maybe should ignore when publish already stopped? - -#ifdef SRS_AUTO_HDS - hds->on_unpublish(); - if ((ret = hds->on_publish(req)) != ERROR_SUCCESS) { - srs_error("hds publish failed. ret=%d", ret); - return ret; - } - srs_trace("vhost %s hds reload success", vhost.c_str()); -#endif - - return ret; -} - -int SrsSource::on_reload_vhost_dvr(string vhost) -{ - int ret = ERROR_SUCCESS; - - if (req->vhost != vhost) { - return ret; - } - - // TODO: FIXME: maybe should ignore when publish already stopped? - -#ifdef SRS_AUTO_DVR - // cleanup dvr - dvr->on_unpublish(); - - // reinitialize the dvr, update plan. - if ((ret = dvr->initialize(this, req)) != ERROR_SUCCESS) { - return ret; - } - - // start to publish by new plan. - if ((ret = dvr->on_publish(true)) != ERROR_SUCCESS) { - srs_error("dvr publish failed. ret=%d", ret); - return ret; - } - - srs_trace("vhost %s dvr reload success", vhost.c_str()); -#endif - - return ret; -} - -int SrsSource::on_reload_vhost_transcode(string vhost) -{ - int ret = ERROR_SUCCESS; - - if (req->vhost != vhost) { - return ret; - } - - // TODO: FIXME: maybe should ignore when publish already stopped? - -#ifdef SRS_AUTO_TRANSCODE - encoder->on_unpublish(); - if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { - srs_error("start encoder failed. ret=%d", ret); - return ret; - } - srs_trace("vhost %s transcode reload success", vhost.c_str()); -#endif - - return ret; -} - -int SrsSource::on_reload_vhost_exec(string vhost) -{ - int ret = ERROR_SUCCESS; - - if (req->vhost != vhost) { - return ret; - } - - // TODO: FIXME: maybe should ignore when publish already stopped? - - ng_exec->on_unpublish(); - if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) { - srs_error("start exec failed. ret=%d", ret); - return ret; - } - srs_trace("vhost %s exec reload success", vhost.c_str()); - - return ret; -} - -int SrsSource::on_forwarder_start(SrsForwarder* forwarder) -{ - int ret = ERROR_SUCCESS; - - // feed the forwarder the metadata/sequence header, - // when reload to enable the forwarder. - if (cache_metadata && (ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) { - srs_error("forwarder process onMetaData message failed. ret=%d", ret); - return ret; - } - if (cache_sh_video && (ret = forwarder->on_video(cache_sh_video)) != ERROR_SUCCESS) { - srs_error("forwarder process video sequence header message failed. ret=%d", ret); - return ret; - } - if (cache_sh_audio && (ret = forwarder->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { - srs_error("forwarder process audio sequence header message failed. ret=%d", ret); - return ret; - } - - return ret; -} - -int SrsSource::on_hls_start() -{ - int ret = ERROR_SUCCESS; - -#ifdef SRS_AUTO_HLS - // feed the hls the metadata/sequence header, - // when reload to start hls, hls will never get the sequence header in stream, - // use the SrsSource.on_hls_start to push the sequence header to HLS. - // TODO: maybe need to decode the metadata? - if (cache_sh_video && (ret = hls->on_video(cache_sh_video, true)) != ERROR_SUCCESS) { - srs_error("hls process video sequence header message failed. ret=%d", ret); - return ret; - } - if (cache_sh_audio && (ret = hls->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { - srs_error("hls process audio sequence header message failed. ret=%d", ret); - return ret; - } -#endif - - return ret; -} - -int SrsSource::on_dvr_request_sh() -{ - int ret = ERROR_SUCCESS; - -#ifdef SRS_AUTO_DVR - // feed the dvr the metadata/sequence header, - // when reload to start dvr, dvr will never get the sequence header in stream, - // use the SrsSource.on_dvr_request_sh to push the sequence header to DVR. - if (cache_metadata) { - char* payload = cache_metadata->payload; - int size = cache_metadata->size; - - SrsBuffer stream; - if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) { - srs_error("dvr decode metadata stream failed. ret=%d", ret); - return ret; - } - - SrsOnMetaDataPacket pkt; - if ((ret = pkt.decode(&stream)) != ERROR_SUCCESS) { - srs_error("dvr decode metadata packet failed."); - return ret; - } - - if ((ret = dvr->on_meta_data(&pkt)) != ERROR_SUCCESS) { - srs_error("dvr process onMetaData message failed. ret=%d", ret); - return ret; - } - } - - if (cache_sh_video && (ret = dvr->on_video(cache_sh_video)) != ERROR_SUCCESS) { - srs_error("dvr process video sequence header message failed. ret=%d", ret); - return ret; - } - if (cache_sh_audio && (ret = dvr->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { - srs_error("dvr process audio sequence header message failed. ret=%d", ret); - return ret; - } -#endif - return ret; } @@ -1469,53 +1976,13 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata { int ret = ERROR_SUCCESS; -#ifdef SRS_AUTO_HLS - if (metadata && (ret = hls->on_meta_data(metadata->metadata)) != ERROR_SUCCESS) { - srs_error("hls process onMetaData message failed. ret=%d", ret); + // Notify hub about the original metadata. + if ((ret = hub->on_original_metadata(metadata)) != ERROR_SUCCESS) { return ret; } -#endif - -#ifdef SRS_AUTO_DVR - if (metadata && (ret = dvr->on_meta_data(metadata)) != ERROR_SUCCESS) { - srs_error("dvr process onMetaData message failed. ret=%d", ret); - return ret; - } -#endif - - SrsAmf0Any* prop = NULL; - - // when exists the duration, remove it to make ExoPlayer happy. - if (metadata->metadata->get_property("duration") != NULL) { - metadata->metadata->remove("duration"); - } - - // generate metadata info to print - std::stringstream ss; - if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) { - ss << ", width=" << (int)prop->to_number(); - } - if ((prop = metadata->metadata->ensure_property_number("height")) != NULL) { - ss << ", height=" << (int)prop->to_number(); - } - if ((prop = metadata->metadata->ensure_property_number("videocodecid")) != NULL) { - ss << ", vcodec=" << (int)prop->to_number(); - } - if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) { - ss << ", acodec=" << (int)prop->to_number(); - } - srs_trace("got metadata%s", ss.str().c_str()); - - // add server info to metadata - metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); - metadata->metadata->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY)); - metadata->metadata->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS)); - - // version, for example, 1.0.0 - // add version to metadata, please donot remove it, for debug. - metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); // if allow atc_auto and bravo-atc detected, open atc for vhost. + SrsAmf0Any* prop = NULL; atc = _srs_config->get_atc(req->vhost); if (_srs_config->get_atc_auto(req->vhost)) { if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) { @@ -1525,65 +1992,36 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata } } - // encode the metadata to payload - int size = 0; - char* payload = NULL; - if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) { - srs_error("encode metadata error. ret=%d", ret); - srs_freep(payload); + // Update the meta cache. + bool updated = false; + if ((ret = meta->update_data(&msg->header, metadata, updated)) != ERROR_SUCCESS) { return ret; } - srs_verbose("encode metadata success."); - - if (size <= 0) { - srs_warn("ignore the invalid metadata. size=%d", size); + if (!updated) { return ret; } // when already got metadata, drop when reduce sequence header. bool drop_for_reduce = false; - if (cache_metadata && _srs_config->get_reduce_sequence_header(req->vhost)) { + if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) { drop_for_reduce = true; srs_warn("drop for reduce sh metadata, size=%d", msg->size); } - // create a shared ptr message. - srs_freep(cache_metadata); - cache_metadata = new SrsSharedPtrMessage(); - - // dump message to shared ptr message. - // the payload/size managed by cache_metadata, user should not free it. - if ((ret = cache_metadata->create(&msg->header, payload, size)) != ERROR_SUCCESS) { - srs_error("initialize the cache metadata failed. ret=%d", ret); - return ret; - } - srs_verbose("initialize shared ptr metadata success."); - // copy to all consumer if (!drop_for_reduce) { std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsConsumer* consumer = *it; - if ((ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) { + if ((ret = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the metadata failed. ret=%d", ret); return ret; } } } - // copy to all forwarders - if (true) { - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) { - srs_error("forwarder process onMetaData message failed. ret=%d", ret); - return ret; - } - } - } - - return ret; + // Copy to hub to all utilities. + return hub->on_meta_data(meta->data()); } int SrsSource::on_audio(SrsCommonMessage* shared_audio) @@ -1633,24 +2071,6 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) return ret; } -bool srs_hls_can_continue(int ret, SrsSharedPtrMessage* sh, SrsSharedPtrMessage* msg) -{ - // only continue for decode error. - if (ret != ERROR_HLS_DECODE_ERROR) { - return false; - } - - // when video size equals to sequence header, - // the video actually maybe a sequence header, - // continue to make ffmpeg happy. - if (sh && sh->size == msg->size) { - srs_warn("the msg is actually a sequence header, ignore this packet."); - return true; - } - - return false; -} - int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) { int ret = ERROR_SUCCESS; @@ -1661,9 +2081,9 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; - if (is_sequence_header && cache_sh_audio && _srs_config->get_reduce_sequence_header(req->vhost)) { - if (cache_sh_audio->size == msg->size) { - drop_for_reduce = srs_bytes_equals(cache_sh_audio->payload, msg->payload, msg->size); + if (is_sequence_header && meta->ash() && _srs_config->get_reduce_sequence_header(req->vhost)) { + if (meta->ash()->size == msg->size) { + drop_for_reduce = srs_bytes_equals(meta->ash()->payload, msg->payload, msg->size); srs_warn("drop for reduce sh audio, size=%d", msg->size); } } @@ -1697,56 +2117,6 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) flv_sample_rates[sample.sound_rate]); } -#ifdef SRS_AUTO_HLS - if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) { - // apply the error strategy for hls. - // @see https://github.com/ossrs/srs/issues/264 - std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); - if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { - srs_warn("hls process audio message failed, ignore and disable hls. ret=%d", ret); - - // unpublish, ignore ret. - hls->on_unpublish(); - - // ignore. - ret = ERROR_SUCCESS; - } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) { - if (srs_hls_can_continue(ret, cache_sh_audio, msg)) { - ret = ERROR_SUCCESS; - } else { - srs_warn("hls continue audio failed. ret=%d", ret); - return ret; - } - } else { - srs_warn("hls disconnect publisher for audio error. ret=%d", ret); - return ret; - } - } -#endif - -#ifdef SRS_AUTO_DVR - if ((ret = dvr->on_audio(msg)) != ERROR_SUCCESS) { - srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret); - - // unpublish, ignore ret. - dvr->on_unpublish(); - - // ignore. - ret = ERROR_SUCCESS; - } -#endif - -#ifdef SRS_AUTO_HDS - if ((ret = hds->on_audio(msg)) != ERROR_SUCCESS) { - srs_warn("hds process audio message failed, ignore and disable dvr. ret=%d", ret); - - // unpublish, ignore ret. - hds->on_unpublish(); - // ignore. - ret = ERROR_SUCCESS; - } -#endif - // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { @@ -1759,24 +2129,16 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) srs_info("dispatch audio success."); } - // copy to all forwarders. - if (true) { - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_audio(msg)) != ERROR_SUCCESS) { - srs_error("forwarder process audio message failed. ret=%d", ret); - return ret; - } - } + // Copy to hub to all utilities. + if ((ret = hub->on_audio(msg)) != ERROR_SUCCESS) { + return ret; } // cache the sequence header of aac, or first packet of mp3. // for example, the mp3 is used for hls to write the "right" audio codec. // TODO: FIXME: to refine the stream info system. - if (is_aac_sequence_header || !cache_sh_audio) { - srs_freep(cache_sh_audio); - cache_sh_audio = msg->copy(); + if (is_aac_sequence_header || !meta->ash()) { + meta->update_ash(msg); } // when sequence header, donot push to gop cache and adjust the timestamp. @@ -1793,11 +2155,11 @@ int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) // if atc, update the sequence header to abs time. if (atc) { - if (cache_sh_audio) { - cache_sh_audio->timestamp = msg->timestamp; + if (meta->ash()) { + meta->ash()->timestamp = msg->timestamp; } - if (cache_metadata) { - cache_metadata->timestamp = msg->timestamp; + if (meta->data()) { + meta->data()->timestamp = msg->timestamp; } } @@ -1874,9 +2236,9 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; - if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(req->vhost)) { - if (cache_sh_video->size == msg->size) { - drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size); + if (is_sequence_header && meta->vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) { + if (meta->vsh()->size == msg->size) { + drop_for_reduce = srs_bytes_equals(meta->vsh()->payload, msg->payload, msg->size); srs_warn("drop for reduce sh video, size=%d", msg->size); } } @@ -1884,8 +2246,7 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) // cache the sequence header if h264 // donot cache the sequence header to gop_cache, return here. if (is_sequence_header) { - srs_freep(cache_sh_video); - cache_sh_video = msg->copy(); + meta->update_vsh(msg); // parse detail audio codec SrsAvcAacCodec codec; @@ -1913,55 +2274,10 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) codec.video_data_rate / 1000, codec.frame_rate, codec.duration); } -#ifdef SRS_AUTO_HLS - if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) { - // apply the error strategy for hls. - // @see https://github.com/ossrs/srs/issues/264 - std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); - if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) { - srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret); - - // unpublish, ignore ret. - hls->on_unpublish(); - - // ignore. - ret = ERROR_SUCCESS; - } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) { - if (srs_hls_can_continue(ret, cache_sh_video, msg)) { - ret = ERROR_SUCCESS; - } else { - srs_warn("hls continue video failed. ret=%d", ret); - return ret; - } - } else { - srs_warn("hls disconnect publisher for video error. ret=%d", ret); - return ret; - } + // Copy to hub to all utilities. + if ((ret = hub->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) { + return ret; } -#endif - -#ifdef SRS_AUTO_DVR - if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) { - srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret); - - // unpublish, ignore ret. - dvr->on_unpublish(); - - // ignore. - ret = ERROR_SUCCESS; - } -#endif - -#ifdef SRS_AUTO_HDS - if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) { - srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret); - - // unpublish, ignore ret. - hds->on_unpublish(); - // ignore. - ret = ERROR_SUCCESS; - } -#endif // copy to all consumer if (!drop_for_reduce) { @@ -1974,18 +2290,6 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) } srs_info("dispatch video success."); } - - // copy to all forwarders. - if (!forwarders.empty()) { - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) { - srs_error("forwarder process video message failed. ret=%d", ret); - return ret; - } - } - } // when sequence header, donot push to gop cache and adjust the timestamp. if (is_sequence_header) { @@ -2001,11 +2305,11 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) // if atc, update the sequence header to abs time. if (atc) { - if (cache_sh_video) { - cache_sh_video->timestamp = msg->timestamp; + if (meta->vsh()) { + meta->vsh()->timestamp = msg->timestamp; } - if (cache_metadata) { - cache_metadata->timestamp = msg->timestamp; + if (meta->data()) { + meta->data()->timestamp = msg->timestamp; } } @@ -2142,45 +2446,8 @@ int SrsSource::on_publish() is_monotonically_increase = true; last_packet_time = 0; - // create forwarders - if ((ret = create_forwarders()) != ERROR_SUCCESS) { - srs_error("create forwarders failed. ret=%d", ret); - return ret; - } - - // TODO: FIXME: use initialize to set req. -#ifdef SRS_AUTO_TRANSCODE - if ((ret = encoder->on_publish(req)) != ERROR_SUCCESS) { - srs_error("start encoder failed. ret=%d", ret); - return ret; - } -#endif - -#ifdef SRS_AUTO_HLS - if ((ret = hls->on_publish(false)) != ERROR_SUCCESS) { - srs_error("start hls failed. ret=%d", ret); - return ret; - } -#endif - -#ifdef SRS_AUTO_DVR - if ((ret = dvr->on_publish(false)) != ERROR_SUCCESS) { - srs_error("start dvr failed. ret=%d", ret); - return ret; - } -#endif - - // TODO: FIXME: use initialize to set req. -#ifdef SRS_AUTO_HDS - if ((ret = hds->on_publish(req)) != ERROR_SUCCESS) { - srs_error("start hds failed. ret=%d", ret); - return ret; - } -#endif - - // TODO: FIXME: use initialize to set req. - if ((ret = ng_exec->on_publish(req)) != ERROR_SUCCESS) { - srs_error("start exec failed. ret=%d", ret); + // Notify the hub about the publish event. + if ((ret = hub->on_publish()) != ERROR_SUCCESS) { return ret; } @@ -2203,26 +2470,8 @@ void SrsSource::on_unpublish() return; } - // destroy all forwarders - destroy_forwarders(); - -#ifdef SRS_AUTO_TRANSCODE - encoder->on_unpublish(); -#endif - -#ifdef SRS_AUTO_HLS - hls->on_unpublish(); -#endif - -#ifdef SRS_AUTO_DVR - dvr->on_unpublish(); -#endif - -#ifdef SRS_AUTO_HDS - hds->on_unpublish(); -#endif - - ng_exec->on_unpublish(); + // Notify the hub about the unpublish event. + hub->on_unpublish(); // only clear the gop cache, // donot clear the sequence header, for it maybe not changed, @@ -2259,38 +2508,21 @@ int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool // if atc, update the sequence header to gop cache time. if (atc && !gop_cache->empty()) { - if (cache_metadata) { - cache_metadata->timestamp = gop_cache->start_time(); + if (meta->data()) { + meta->data()->timestamp = gop_cache->start_time(); } - if (cache_sh_video) { - cache_sh_video->timestamp = gop_cache->start_time(); + if (meta->vsh()) { + meta->vsh()->timestamp = gop_cache->start_time(); } - if (cache_sh_audio) { - cache_sh_audio->timestamp = gop_cache->start_time(); + if (meta->ash()) { + meta->ash()->timestamp = gop_cache->start_time(); } } - // copy metadata. - if (dm && cache_metadata && (ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) { - srs_error("dispatch metadata failed. ret=%d", ret); + // Copy metadata and sequence header to consumer. + if ((ret = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != ERROR_SUCCESS) { return ret; } - srs_info("dispatch metadata success"); - - // copy sequence header - // copy audio sequence first, for hls to fast parse the "right" audio codec. - // @see https://github.com/ossrs/srs/issues/301 - if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) { - srs_error("dispatch audio sequence header failed. ret=%d", ret); - return ret; - } - srs_info("dispatch audio sequence header success"); - - if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algorithm)) != ERROR_SUCCESS) { - srs_error("dispatch video sequence header failed. ret=%d", ret); - return ret; - } - srs_info("dispatch video sequence header success"); // copy gop cache to client. if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) { @@ -2356,52 +2588,6 @@ void SrsSource::on_edge_proxy_unpublish() publish_edge->on_proxy_unpublish(); } -int SrsSource::create_forwarders() -{ - int ret = ERROR_SUCCESS; - - if (!_srs_config->get_forward_enabled(req->vhost)) { - return ret; - } - - SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); - for (int i = 0; conf && i < (int)conf->args.size(); i++) { - std::string forward_server = conf->args.at(i); - - SrsForwarder* forwarder = new SrsForwarder(this); - forwarders.push_back(forwarder); - - // initialize the forwarder with request. - if ((ret = forwarder->initialize(req, forward_server)) != ERROR_SUCCESS) { - return ret; - } - - double queue_size = _srs_config->get_queue_length(req->vhost); - forwarder->set_queue_size(queue_size); - - if ((ret = forwarder->on_publish()) != ERROR_SUCCESS) { - srs_error("start forwarder failed. " - "vhost=%s, app=%s, stream=%s, forward-to=%s", - req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), - forward_server.c_str()); - return ret; - } - } - - return ret; -} - -void SrsSource::destroy_forwarders() -{ - std::vector::iterator it; - for (it = forwarders.begin(); it != forwarders.end(); ++it) { - SrsForwarder* forwarder = *it; - forwarder->on_unpublish(); - srs_freep(forwarder); - } - forwarders.clear(); -} - string SrsSource::get_curr_origin() { return play_edge->get_curr_origin(); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 153cd1e3d..bf9af897d 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -53,6 +53,7 @@ class SrsEdgeProxyContext; class SrsMessageArray; class SrsNgExec; class SrsConnection; +class SrsMessageHeader; #ifdef SRS_AUTO_HLS class SrsHls; #endif @@ -410,11 +411,129 @@ public: virtual SrsSharedPtrMessage* pop(); }; +/** + * The hub for origin is a collection of utilities for origin only, + * for example, DVR, HLS, Forward and Transcode are only available for origin, + * they are meanless for edge server. + */ +class SrsOriginHub : public ISrsReloadHandler +{ +private: + SrsSource* source; + SrsRequest* req; +private: + // hls handler. +#ifdef SRS_AUTO_HLS + SrsHls* hls; +#endif + // dvr handler. +#ifdef SRS_AUTO_DVR + SrsDvr* dvr; +#endif + // transcoding handler. +#ifdef SRS_AUTO_TRANSCODE + SrsEncoder* encoder; +#endif +#ifdef SRS_AUTO_HDS + // adobe hds(http dynamic streaming). + SrsHds *hds; +#endif + // nginx-rtmp exec feature. + SrsNgExec* ng_exec; + // to forward stream to other servers + std::vector forwarders; +public: + SrsOriginHub(SrsSource* s); + virtual ~SrsOriginHub(); +public: + // Initialize the hub with source and request. + // @param r The request object, managed by source. + virtual int initialize(SrsRequest* r); + // Dispose the hub, release utilities resource, + // for example, delete all HLS pieces. + virtual void dispose(); + // Cycle the hub, process some regular events, + // for example, dispose hls in cycle. + virtual int cycle(); +public: + // When got a original metadata. + virtual int on_original_metadata(SrsOnMetaDataPacket* metadata); + // When got a parsed metadata. + virtual int on_meta_data(SrsSharedPtrMessage* shared_metadata); + // When got a parsed audio packet. + virtual int on_audio(SrsSharedPtrMessage* shared_audio); + // When got a parsed video packet. + virtual int on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_header); +public: + // When start publish stream. + virtual int on_publish(); + // When stop publish stream. + virtual void on_unpublish(); +// for the tools callback +public: + // for the SrsForwarder to callback to request the sequence headers. + virtual int on_forwarder_start(SrsForwarder* forwarder); + // for the SrsHls to callback to request the sequence headers. + virtual int on_hls_start(); + // for the SrsDvr to callback to request the sequence headers. + virtual int on_dvr_request_sh(); +// interface ISrsReloadHandler +public: + virtual int on_reload_vhost_forward(std::string vhost); + virtual int on_reload_vhost_hls(std::string vhost); + virtual int on_reload_vhost_hds(std::string vhost); + virtual int on_reload_vhost_dvr(std::string vhost); + virtual int on_reload_vhost_transcode(std::string vhost); + virtual int on_reload_vhost_exec(std::string vhost); +private: + virtual int create_forwarders(); + virtual void destroy_forwarders(); +}; + +/** + * Each stream have optional meta(sps/pps in sequence header and metadata). + * This class cache and update the meta. + */ +class SrsMetaCache +{ +private: + SrsSharedPtrMessage* cache_metadata; + // the cached video sequence header. + SrsSharedPtrMessage* cache_sh_video; + // the cached audio sequence header. + SrsSharedPtrMessage* cache_sh_audio; +public: + SrsMetaCache(); + virtual ~SrsMetaCache(); +public: + // Dispose the metadata cache. + virtual void dispose(); +public: + // Get the cached metadata. + virtual SrsSharedPtrMessage* data(); + // Get the cached vsh(video sequence header). + virtual SrsSharedPtrMessage* vsh(); + // Get the cached ash(audio sequence header). + virtual SrsSharedPtrMessage* ash(); + // Dumps cached metadata to consumer. + // @param dm Whether dumps the metadata. + // @param ds Whether dumps the sequence header. + virtual int dumps(SrsConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds); +public: + // Update the cached metadata by packet. + virtual int update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated); + // Update the cached audio sequence header. + virtual void update_ash(SrsSharedPtrMessage* msg); + // Update the cached video sequence header. + virtual void update_vsh(SrsSharedPtrMessage* msg); +}; + /** * live streaming source. */ class SrsSource : public ISrsReloadHandler { + friend class SrsOriginHub; private: static std::map pool; public: @@ -475,35 +594,19 @@ private: bool is_monotonically_increase; // the time of the packet we just got. int64_t last_packet_time; - // hls handler. -#ifdef SRS_AUTO_HLS - SrsHls* hls; -#endif - // dvr handler. -#ifdef SRS_AUTO_DVR - SrsDvr* dvr; -#endif - // transcoding handler. -#ifdef SRS_AUTO_TRANSCODE - SrsEncoder* encoder; -#endif -#ifdef SRS_AUTO_HDS - // adobe hds(http dynamic streaming). - SrsHds *hds; -#endif - // nginx-rtmp exec feature. - SrsNgExec* ng_exec; + // for aggregate message + SrsBuffer* aggregate_stream; + // the event handler. + ISrsSourceHandler* handler; // edge control service SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; // gop cache for client fast startup. SrsGopCache* gop_cache; - // to forward stream to other servers - std::vector forwarders; - // for aggregate message - SrsBuffer* aggregate_stream; - // the event handler. - ISrsSourceHandler* handler; + // The hub for origin server. + SrsOriginHub* hub; + // The metadata cache. + SrsMetaCache* meta; private: /** * can publish, true when is not streaming @@ -512,12 +615,6 @@ private: // last die time, when all consumers quit and no publisher, // we will remove the source when source die. int64_t die_at; -private: - SrsSharedPtrMessage* cache_metadata; - // the cached video sequence header. - SrsSharedPtrMessage* cache_sh_video; - // the cached audio sequence header. - SrsSharedPtrMessage* cache_sh_audio; public: SrsSource(); virtual ~SrsSource(); @@ -535,20 +632,8 @@ public: // interface ISrsReloadHandler public: virtual int on_reload_vhost_play(std::string vhost); - virtual int on_reload_vhost_forward(std::string vhost); - virtual int on_reload_vhost_hls(std::string vhost); - virtual int on_reload_vhost_hds(std::string vhost); - virtual int on_reload_vhost_dvr(std::string vhost); - virtual int on_reload_vhost_transcode(std::string vhost); - virtual int on_reload_vhost_exec(std::string vhost); // for the tools callback public: - // for the SrsForwarder to callback to request the sequence headers. - virtual int on_forwarder_start(SrsForwarder* forwarder); - // for the SrsHls to callback to request the sequence headers. - virtual int on_hls_start(); - // for the SrsDvr to callback to request the sequence headers. - virtual int on_dvr_request_sh(); // source id changed. virtual int on_source_id_changed(int id); // get current source id. @@ -599,9 +684,6 @@ public: virtual int on_edge_proxy_publish(SrsCommonMessage* msg); // for edge, proxy stop publish virtual void on_edge_proxy_unpublish(); -private: - virtual int create_forwarders(); - virtual void destroy_forwarders(); public: virtual std::string get_curr_origin(); }; diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 968516b65..efbc09fc3 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 3 #define VERSION_MINOR 0 -#define VERSION_REVISION 15 +#define VERSION_REVISION 16 // generated by configure, only macros. #include