diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index 49d92943b..7dfbed633 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -56,14 +56,14 @@ SrsAsyncCallWorker::~SrsAsyncCallWorker() srs_cond_destroy(wait); } -int SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t) +srs_error_t SrsAsyncCallWorker::execute(ISrsAsyncCallTask* t) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; tasks.push_back(t); srs_cond_signal(wait); - return ret; + return err; } int SrsAsyncCallWorker::count() diff --git a/trunk/src/app/srs_app_async_call.hpp b/trunk/src/app/srs_app_async_call.hpp index ce433a8ea..dd95b172f 100644 --- a/trunk/src/app/srs_app_async_call.hpp +++ b/trunk/src/app/srs_app_async_call.hpp @@ -74,7 +74,7 @@ public: SrsAsyncCallWorker(); virtual ~SrsAsyncCallWorker(); public: - virtual int execute(ISrsAsyncCallTask* t); + virtual srs_error_t execute(ISrsAsyncCallTask* t); virtual int count(); public: virtual srs_error_t start(); diff --git a/trunk/src/app/srs_app_dash.cpp b/trunk/src/app/srs_app_dash.cpp index 9376032bc..6eb56e41a 100644 --- a/trunk/src/app/srs_app_dash.cpp +++ b/trunk/src/app/srs_app_dash.cpp @@ -85,6 +85,7 @@ SrsFragmentedMp4::~SrsFragmentedMp4() int SrsFragmentedMp4::initialize(SrsRequest* r, bool video, SrsMpdWriter* mpd, uint32_t tid) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; string file_home; string file_name; @@ -97,7 +98,10 @@ int SrsFragmentedMp4::initialize(SrsRequest* r, bool video, SrsMpdWriter* mpd, u string home = _srs_config->get_dash_path(r->vhost); set_path(home + "/" + file_home + "/" + file_name); - if ((ret = create_dir()) != ERROR_SUCCESS) { + if ((err = create_dir()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } @@ -146,6 +150,7 @@ int SrsFragmentedMp4::write(SrsSharedPtrMessage* shared_msg, SrsFormat* format) int SrsFragmentedMp4::reap(uint64_t& dts) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((ret = enc->flush(dts)) != ERROR_SUCCESS) { srs_error("DASH: Flush encoder failed, ret=%d", ret); @@ -154,7 +159,10 @@ int SrsFragmentedMp4::reap(uint64_t& dts) srs_freep(fw); - if ((ret = rename()) != ERROR_SUCCESS) { + if ((err = rename()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } @@ -191,6 +199,7 @@ srs_error_t SrsMpdWriter::initialize(SrsRequest* r) int SrsMpdWriter::write(SrsFormat* format) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // MPD is not expired? if (last_update_mpd != -1 && srs_get_system_time_ms() - last_update_mpd < update_period) { @@ -204,7 +213,10 @@ int SrsMpdWriter::write(SrsFormat* format) fragment_home = srs_path_dirname(mpd_path) + "/" + req->stream; - if ((ret = srs_create_dir_recursively(full_home)) != ERROR_SUCCESS) { + if ((err = srs_create_dir_recursively(full_home)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); srs_error("DASH: Create MPD home failed, home=%s, ret=%d", full_home.c_str(), ret); return ret; } @@ -427,6 +439,7 @@ int SrsDashController::refresh_mpd(SrsFormat* format) int SrsDashController::refresh_init_mp4(SrsSharedPtrMessage* msg, SrsFormat* format) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (msg->size <= 0 || (msg->is_video() && !format->vcodec->is_avc_codec_ok()) || (msg->is_audio() && !format->acodec->is_aac_codec_ok())) { @@ -435,7 +448,10 @@ int SrsDashController::refresh_init_mp4(SrsSharedPtrMessage* msg, SrsFormat* for } string full_home = home + "/" + req->app + "/" + req->stream; - if ((ret = srs_create_dir_recursively(full_home)) != ERROR_SUCCESS) { + if ((err = srs_create_dir_recursively(full_home)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); srs_error("DASH: Create media home failed, home=%s, ret=%d", full_home.c_str(), ret); return ret; } @@ -457,7 +473,10 @@ int SrsDashController::refresh_init_mp4(SrsSharedPtrMessage* msg, SrsFormat* for return ret; } - if ((ret = init_mp4->rename()) != ERROR_SUCCESS) { + if ((err = init_mp4->rename()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index f2c0f03d4..86211008e 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -84,6 +84,7 @@ SrsFragment* SrsDvrSegmenter::current() int SrsDvrSegmenter::open() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ignore when already open. if (fs->is_open()) { @@ -99,7 +100,10 @@ int SrsDvrSegmenter::open() fragment->set_path(path); // create dir first. - if ((ret = fragment->create_dir()) != ERROR_SUCCESS) { + if ((err = fragment->create_dir()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } @@ -177,6 +181,7 @@ int SrsDvrSegmenter::write_video(SrsSharedPtrMessage* shared_video, SrsFormat* f int SrsDvrSegmenter::close() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // ignore when already closed. if (!fs->is_open()) { @@ -191,13 +196,19 @@ int SrsDvrSegmenter::close() fs->close(); // when tmp flv file exists, reap it. - if ((ret = fragment->rename()) != ERROR_SUCCESS) { + if ((err = fragment->rename()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } // TODO: FIXME: the http callback is async, which will trigger thread switch, // so the on_video maybe invoked during the http callback, and error. - if ((ret = plan->on_reap_segment()) != ERROR_SUCCESS) { + if ((err = plan->on_reap_segment()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); srs_error("dvr: notify plan to reap segment failed. ret=%d", ret); return ret; } @@ -669,20 +680,20 @@ int SrsDvrPlan::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format) return ret; } -int SrsDvrPlan::on_reap_segment() +srs_error_t SrsDvrPlan::on_reap_segment() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; int cid = _srs_context->get_id(); SrsFragment* fragment = segment->current(); string fullpath = fragment->fullpath(); - if ((ret = async->execute(new SrsDvrAsyncCallOnDvr(cid, req, fullpath))) != ERROR_SUCCESS) { - return ret; + if ((err = async->execute(new SrsDvrAsyncCallOnDvr(cid, req, fullpath))) != srs_success) { + return srs_error_wrap(err, "reap segment"); } - return ret; + return err; } srs_error_t SrsDvrPlan::create_plan(string vhost, SrsDvrPlan** pplan) diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index efae24802..953a2e23a 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -203,7 +203,7 @@ public: // Internal interface for segmenter. public: // When segmenter close a segment. - virtual int on_reap_segment(); + virtual srs_error_t on_reap_segment(); public: static srs_error_t create_plan(std::string vhost, SrsDvrPlan** pplan); }; diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index f418e8b34..5c32de7e5 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -158,12 +158,16 @@ void SrsForwarder::on_unpublish() int SrsForwarder::on_meta_data(SrsSharedPtrMessage* shared_metadata) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsSharedPtrMessage* metadata = shared_metadata->copy(); // TODO: FIXME: config the jitter of Forwarder. - if ((ret = jitter->correct(metadata, SrsRtmpJitterAlgorithmOFF)) != ERROR_SUCCESS) { + if ((err = jitter->correct(metadata, SrsRtmpJitterAlgorithmOFF)) != srs_success) { srs_freep(metadata); + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } @@ -177,12 +181,16 @@ int SrsForwarder::on_meta_data(SrsSharedPtrMessage* shared_metadata) int SrsForwarder::on_audio(SrsSharedPtrMessage* shared_audio) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_audio->copy(); // TODO: FIXME: config the jitter of Forwarder. - if ((ret = jitter->correct(msg, SrsRtmpJitterAlgorithmOFF)) != ERROR_SUCCESS) { + if ((err = jitter->correct(msg, SrsRtmpJitterAlgorithmOFF)) != srs_success) { srs_freep(msg); + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } @@ -201,12 +209,16 @@ int SrsForwarder::on_audio(SrsSharedPtrMessage* shared_audio) int SrsForwarder::on_video(SrsSharedPtrMessage* shared_video) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_video->copy(); // TODO: FIXME: config the jitter of Forwarder. - if ((ret = jitter->correct(msg, SrsRtmpJitterAlgorithmOFF)) != ERROR_SUCCESS) { + if ((err = jitter->correct(msg, SrsRtmpJitterAlgorithmOFF)) != srs_success) { srs_freep(msg); + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } diff --git a/trunk/src/app/srs_app_fragment.cpp b/trunk/src/app/srs_app_fragment.cpp index 0da052e8e..d933104d8 100644 --- a/trunk/src/app/srs_app_fragment.cpp +++ b/trunk/src/app/srs_app_fragment.cpp @@ -77,33 +77,30 @@ void SrsFragment::set_path(string v) filepath = v; } -int SrsFragment::unlink_file() +srs_error_t SrsFragment::unlink_file() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (::unlink(filepath.c_str()) < 0) { - ret = ERROR_SYSTEM_FRAGMENT_UNLINK; - srs_error("Unlink fragment failed, file=%s, ret=%d.", filepath.c_str(), ret); - return ret; + return srs_error_new(ERROR_SYSTEM_FRAGMENT_UNLINK, "unlink %s", filepath.c_str()); } - return ret; + return err; } -int SrsFragment::create_dir() +srs_error_t SrsFragment::create_dir() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::string segment_dir = srs_path_dirname(filepath); - if ((ret = srs_create_dir_recursively(segment_dir)) != ERROR_SUCCESS) { - srs_error("Create dir %s failed. ret=%d", segment_dir.c_str(), ret); - return ret; + if ((err = srs_create_dir_recursively(segment_dir)) != srs_success) { + return srs_error_wrap(err, "create %s", segment_dir.c_str()); } srs_info("Create dir %s ok", segment_dir.c_str()); - return ret; + return err; } string SrsFragment::tmppath() @@ -111,34 +108,30 @@ string SrsFragment::tmppath() return filepath + ".tmp"; } -int SrsFragment::unlink_tmpfile() +srs_error_t SrsFragment::unlink_tmpfile() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; string filepath = tmppath(); if (::unlink(filepath.c_str()) < 0) { - ret = ERROR_SYSTEM_FRAGMENT_UNLINK; - srs_error("Unlink temporary fragment failed, file=%s, ret=%d.", filepath.c_str(), ret); - return ret; + return srs_error_new(ERROR_SYSTEM_FRAGMENT_UNLINK, "unlink tmp file %s", filepath.c_str()); } - return ret; + return err; } -int SrsFragment::rename() +srs_error_t SrsFragment::rename() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; string full_path = fullpath(); string tmp_file = tmppath(); if (::rename(tmp_file.c_str(), full_path.c_str()) < 0) { - ret = ERROR_SYSTEM_FRAGMENT_RENAME; - srs_error("rename ts file failed, %s => %s. ret=%d", tmp_file.c_str(), full_path.c_str(), ret); - return ret; + return srs_error_new(ERROR_SYSTEM_FRAGMENT_RENAME, "rename %s to %s", tmp_file.c_str(), full_path.c_str()); } - return ret; + return err; } SrsFragmentWindow::SrsFragmentWindow() @@ -164,14 +157,15 @@ SrsFragmentWindow::~SrsFragmentWindow() void SrsFragmentWindow::dispose() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::vector::iterator it; for (it = fragments.begin(); it != fragments.end(); ++it) { SrsFragment* fragment = *it; - if ((ret = fragment->unlink_file()) != ERROR_SUCCESS) { - srs_warn("Unlink ts failed, file=%s, ret=%d", fragment->fullpath().c_str(), ret); + if ((err = fragment->unlink_file()) != srs_success) { + srs_warn("Unlink ts failed %s", srs_error_desc(err).c_str()); + srs_freep(err); } srs_freep(fragment); } @@ -179,8 +173,9 @@ void SrsFragmentWindow::dispose() for (it = expired_fragments.begin(); it != expired_fragments.end(); ++it) { SrsFragment* fragment = *it; - if ((ret = fragment->unlink_file()) != ERROR_SUCCESS) { - srs_warn("Unlink ts failed, file=%s, ret=%d", fragment->fullpath().c_str(), ret); + if ((err = fragment->unlink_file()) != srs_success) { + srs_warn("Unlink ts failed %s", srs_error_desc(err).c_str()); + srs_freep(err); } srs_freep(fragment); } @@ -217,14 +212,15 @@ void SrsFragmentWindow::shrink(int64_t window) void SrsFragmentWindow::clear_expired(bool delete_files) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::vector::iterator it; for (it = expired_fragments.begin(); it != expired_fragments.end(); ++it) { SrsFragment* fragment = *it; - if (delete_files && (ret = fragment->unlink_file()) != ERROR_SUCCESS) { - srs_warn("Unlink ts failed, file=%s, ret=%d", fragment->fullpath().c_str(), ret); + if (delete_files && (err = fragment->unlink_file()) != srs_success) { + srs_warn("Unlink ts failed, %s", srs_error_desc(err).c_str()); + srs_freep(err); } srs_freep(fragment); } diff --git a/trunk/src/app/srs_app_fragment.hpp b/trunk/src/app/srs_app_fragment.hpp index 21d1a0d99..128c196ca 100644 --- a/trunk/src/app/srs_app_fragment.hpp +++ b/trunk/src/app/srs_app_fragment.hpp @@ -63,16 +63,16 @@ public: virtual void set_path(std::string v); // Unlink the fragment, to delete the file. // @remark Ignore any error. - virtual int unlink_file(); + virtual srs_error_t unlink_file(); // Create the dir for file recursively. - virtual int create_dir(); + virtual srs_error_t create_dir(); public: // Get the temporary path for file. virtual std::string tmppath(); // Unlink the temporary file. - virtual int unlink_tmpfile(); + virtual srs_error_t unlink_tmpfile(); // Rename the temp file to final file. - virtual int rename(); + virtual srs_error_t rename(); }; /** diff --git a/trunk/src/app/srs_app_hds.cpp b/trunk/src/app/srs_app_hds.cpp index 2cd3d72e2..24b1d71d1 100644 --- a/trunk/src/app/srs_app_hds.cpp +++ b/trunk/src/app/srs_app_hds.cpp @@ -433,6 +433,7 @@ int SrsHds::on_audio(SrsSharedPtrMessage* msg) int SrsHds::flush_mainfest() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; char buf[1024] = {0}; sprintf(buf, "\n" @@ -446,7 +447,10 @@ int SrsHds::flush_mainfest() , hds_req->stream.c_str(), hds_req->stream.c_str(), hds_req->stream.c_str()); string dir = _srs_config->get_hds_path(hds_req->vhost) + "/" + hds_req->app; - if ((ret = srs_create_dir_recursively(dir)) != ERROR_SUCCESS) { + if ((err = srs_create_dir_recursively(dir)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); srs_error("hds create dir failed. ret=%d", ret); return ret; } diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index babf4d4eb..df17572fc 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -210,13 +210,14 @@ SrsHlsMuxer::~SrsHlsMuxer() void SrsHlsMuxer::dispose() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; segments->dispose(); if (current) { - if ((ret = current->unlink_tmpfile()) != ERROR_SUCCESS) { - srs_warn("Unlink tmp ts failed, ret=%d", ret); + if ((err = current->unlink_tmpfile()) != srs_success) { + srs_warn("Unlink tmp ts failed %s", srs_error_desc(err).c_str()); + srs_freep(err); } srs_freep(current); } @@ -264,11 +265,11 @@ srs_error_t SrsHlsMuxer::initialize() return err; } -int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix, +srs_error_t SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix, string path, string m3u8_file, string ts_file, double fragment, double window, bool ts_floor, double aof_ratio, bool cleanup, bool wait_keyframe ) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; srs_freep(req); req = r->copy(); @@ -295,22 +296,21 @@ int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix, // create m3u8 dir once. m3u8_dir = srs_path_dirname(m3u8); - if ((ret = srs_create_dir_recursively(m3u8_dir)) != ERROR_SUCCESS) { - srs_error("create app dir %s failed. ret=%d", m3u8_dir.c_str(), ret); - return ret; + if ((err = srs_create_dir_recursively(m3u8_dir)) != srs_success) { + return srs_error_wrap(err, "create dir"); } srs_info("create m3u8 dir %s ok", m3u8_dir.c_str()); - return ret; + return err; } -int SrsHlsMuxer::segment_open() +srs_error_t SrsHlsMuxer::segment_open() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (current) { srs_warn("ignore the segment open, for segment is already open."); - return ret; + return err; } // when segment open, the current segment must be NULL. @@ -421,24 +421,23 @@ int SrsHlsMuxer::segment_open() current->uri += ts_url; // create dir recursively for hls. - if ((ret = current->create_dir()) != ERROR_SUCCESS) { - return ret; + if ((err = current->create_dir()) != srs_success) { + return srs_error_wrap(err, "create dir"); } // open temp ts file. std::string tmp_file = current->tmppath(); - if ((ret = current->tscw->open(tmp_file.c_str())) != ERROR_SUCCESS) { - srs_error("open hls muxer failed. ret=%d", ret); - return ret; + if ((err = current->tscw->open(tmp_file.c_str())) != srs_success) { + return srs_error_wrap(err, "open hls muxer"); } srs_info("open HLS muxer success. path=%s, tmp=%s", current->fullpath().c_str(), tmp_file.c_str()); - return ret; + return err; } -int SrsHlsMuxer::on_sequence_header() +srs_error_t SrsHlsMuxer::on_sequence_header() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; srs_assert(current); @@ -446,7 +445,7 @@ int SrsHlsMuxer::on_sequence_header() // when close the segement, it will write a discontinuity to m3u8 file. current->set_sequence_header(true); - return ret; + return err; } bool SrsHlsMuxer::is_segment_overflow() @@ -494,45 +493,45 @@ bool SrsHlsMuxer::pure_audio() return current && current->tscw && current->tscw->video_codec() == SrsVideoCodecIdDisabled; } -int SrsHlsMuxer::flush_audio(SrsTsMessageCache* cache) +srs_error_t SrsHlsMuxer::flush_audio(SrsTsMessageCache* cache) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // if current is NULL, segment is not open, ignore the flush event. if (!current) { srs_warn("flush audio ignored, for segment is not open."); - return ret; + return err; } if (!cache->audio || cache->audio->payload->length() <= 0) { - return ret; + return err; } // update the duration of segment. current->append(cache->audio->pts / 90); - if ((ret = current->tscw->write_audio(cache->audio)) != ERROR_SUCCESS) { - return ret; + if ((err = current->tscw->write_audio(cache->audio)) != srs_success) { + return srs_error_wrap(err, "hls: write audio"); } // write success, clear and free the msg srs_freep(cache->audio); - return ret; + return err; } -int SrsHlsMuxer::flush_video(SrsTsMessageCache* cache) +srs_error_t SrsHlsMuxer::flush_video(SrsTsMessageCache* cache) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // if current is NULL, segment is not open, ignore the flush event. if (!current) { srs_warn("flush video ignored, for segment is not open."); - return ret; + return err; } if (!cache->video || cache->video->payload->length() <= 0) { - return ret; + return err; } srs_assert(current); @@ -540,23 +539,23 @@ int SrsHlsMuxer::flush_video(SrsTsMessageCache* cache) // update the duration of segment. current->append(cache->video->dts / 90); - if ((ret = current->tscw->write_video(cache->video)) != ERROR_SUCCESS) { - return ret; + if ((err = current->tscw->write_video(cache->video)) != srs_success) { + return srs_error_wrap(err, "hls: write video"); } // write success, clear and free the msg srs_freep(cache->video); - return ret; + return err; } -int SrsHlsMuxer::segment_close() +srs_error_t SrsHlsMuxer::segment_close() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!current) { srs_warn("ignore the segment close, for segment is not open."); - return ret; + return err; } // when close current segment, the current segment must not be NULL. @@ -568,17 +567,14 @@ int SrsHlsMuxer::segment_close() // make the segment more acceptable, when in [min, max_td * 2], it's ok. if (current->duration() >= SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS && (int)current->duration() <= max_td * 2 * 1000) { // use async to call the http hooks, for it will cause thread switch. - if ((ret = async->execute(new SrsDvrAsyncCallOnHls( - _srs_context->get_id(), req, - current->fullpath(), current->uri, m3u8, m3u8_url, - current->sequence_no, current->duration() / 1000.0))) != ERROR_SUCCESS) - { - return ret; + if ((err = async->execute(new SrsDvrAsyncCallOnHls(_srs_context->get_id(), req, current->fullpath(), + current->uri, m3u8, m3u8_url, current->sequence_no, current->duration() / 1000.0))) != srs_success) { + return srs_error_wrap(err, "segment close"); } // use async to call the http hooks, for it will cause thread switch. - if ((ret = async->execute(new SrsDvrAsyncCallOnHlsNotify(_srs_context->get_id(), req, current->uri))) != ERROR_SUCCESS) { - return ret; + if ((err = async->execute(new SrsDvrAsyncCallOnHlsNotify(_srs_context->get_id(), req, current->uri))) != srs_success) { + return srs_error_wrap(err, "segment close"); } srs_info("Reap ts segment, sequence_no=%d, uri=%s, duration=%" PRId64 "ms", current->sequence_no, current->uri.c_str(), current->duration()); @@ -586,8 +582,8 @@ int SrsHlsMuxer::segment_close() srs_freep(current->tscw); // rename from tmp to real path - if ((ret = current->rename()) != ERROR_SUCCESS) { - return ret; + if ((err = current->rename()) != srs_success) { + return srs_error_wrap(err, "rename"); } segments->append(current); @@ -599,8 +595,8 @@ int SrsHlsMuxer::segment_close() srs_trace("Drop ts segment, sequence_no=%d, uri=%s, duration=%" PRId64 "ms", current->sequence_no, current->uri.c_str(), current->duration()); // rename from tmp to real path - if ((ret = current->unlink_tmpfile()) != ERROR_SUCCESS) { - return ret; + if ((err = current->unlink_tmpfile()) != srs_success) { + return srs_error_wrap(err, "rename"); } srs_freep(current); } @@ -609,34 +605,32 @@ int SrsHlsMuxer::segment_close() segments->shrink(hls_window * 1000); // refresh the m3u8, donot contains the removed ts - ret = refresh_m3u8(); + err = refresh_m3u8(); // remove the ts file. segments->clear_expired(hls_cleanup); // check ret of refresh m3u8 - if (ret != ERROR_SUCCESS) { - srs_error("refresh m3u8 failed. ret=%d", ret); - return ret; + if (err != srs_success) { + return srs_error_wrap(err, "hls: refresh m3u8"); } - return ret; + return err; } -int SrsHlsMuxer::refresh_m3u8() +srs_error_t SrsHlsMuxer::refresh_m3u8() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // no segments, also no m3u8, return. if (segments->empty()) { - return ret; + return err; } std::string temp_m3u8 = m3u8 + ".temp"; - if ((ret = _refresh_m3u8(temp_m3u8)) == ERROR_SUCCESS) { + if ((err = _refresh_m3u8(temp_m3u8)) == srs_success) { if (rename(temp_m3u8.c_str(), m3u8.c_str()) < 0) { - ret = ERROR_HLS_WRITE_FAILED; - srs_error("rename m3u8 file failed. %s => %s, ret=%d", temp_m3u8.c_str(), m3u8.c_str(), ret); + err = srs_error_new(ERROR_HLS_WRITE_FAILED, "hls: rename m3u8 file failed. %s => %s", temp_m3u8.c_str(), m3u8.c_str()); } } @@ -647,22 +641,22 @@ int SrsHlsMuxer::refresh_m3u8() } } - return ret; + return err; } -int SrsHlsMuxer::_refresh_m3u8(string m3u8_file) +srs_error_t SrsHlsMuxer::_refresh_m3u8(string m3u8_file) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // no segments, return. if (segments->empty()) { - return ret; + return err; } SrsFileWriter writer; if ((ret = writer.open(m3u8_file)) != ERROR_SUCCESS) { - srs_error("open m3u8 file %s failed. ret=%d", m3u8_file.c_str(), ret); - return ret; + return srs_error_new(ret, "hls: open m3u8 file %s", m3u8_file.c_str()); } srs_info("open m3u8 file %s success.", m3u8_file.c_str()); @@ -723,12 +717,11 @@ int SrsHlsMuxer::_refresh_m3u8(string m3u8_file) // write m3u8 to writer. std::string m3u8 = ss.str(); if ((ret = writer.write((char*)m3u8.c_str(), (int)m3u8.length(), NULL)) != ERROR_SUCCESS) { - srs_error("write m3u8 failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "hls: write m3u8"); } srs_info("write m3u8 %s success.", m3u8_file.c_str()); - return ret; + return err; } SrsHlsController::SrsHlsController() @@ -777,9 +770,9 @@ int SrsHlsController::deviation() return muxer->deviation(); } -int SrsHlsController::on_publish(SrsRequest* req) +srs_error_t SrsHlsController::on_publish(SrsRequest* req) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; std::string vhost = req->vhost; std::string stream = req->stream; @@ -807,42 +800,37 @@ int SrsHlsController::on_publish(SrsRequest* req) // for the HLS donot requires the EXT-X-MEDIA-SEQUENCE be monotonically increase. // open muxer - if ((ret = muxer->update_config(req, entry_prefix, - path, m3u8_file, ts_file, hls_fragment, hls_window, ts_floor, hls_aof_ratio, - cleanup, wait_keyframe)) != ERROR_SUCCESS - ) { - srs_error("m3u8 muxer update config failed. ret=%d", ret); - return ret; + if ((err = muxer->update_config(req, entry_prefix, path, m3u8_file, ts_file, hls_fragment, + hls_window, ts_floor, hls_aof_ratio, cleanup, wait_keyframe)) != srs_success ) { + return srs_error_wrap(err, "hls: update config"); } - if ((ret = muxer->segment_open()) != ERROR_SUCCESS) { - srs_error("m3u8 muxer open segment failed. ret=%d", ret); - return ret; + if ((err = muxer->segment_open()) != srs_success) { + return srs_error_wrap(err, "hls: segment open"); } srs_trace("hls: win=%.2f, frag=%.2f, prefix=%s, path=%s, m3u8=%s, ts=%s, aof=%.2f, floor=%d, clean=%d, waitk=%d, dispose=%d", - hls_window, hls_fragment, entry_prefix.c_str(), path.c_str(), m3u8_file.c_str(), - ts_file.c_str(), hls_aof_ratio, ts_floor, cleanup, wait_keyframe, hls_dispose); + hls_window, hls_fragment, entry_prefix.c_str(), path.c_str(), m3u8_file.c_str(), + ts_file.c_str(), hls_aof_ratio, ts_floor, cleanup, wait_keyframe, hls_dispose); - return ret; + return err; } -int SrsHlsController::on_unpublish() +srs_error_t SrsHlsController::on_unpublish() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = muxer->flush_audio(tsmc)) != ERROR_SUCCESS) { - srs_error("m3u8 muxer flush audio failed. ret=%d", ret); - return ret; + if ((err = muxer->flush_audio(tsmc)) != srs_success) { + return srs_error_wrap(err, "hls: flush audio"); } - if ((ret = muxer->segment_close()) != ERROR_SUCCESS) { - return ret; + if ((err = muxer->segment_close()) != srs_success) { + return srs_error_wrap(err, "hls: segment close"); } - return ret; + return err; } -int SrsHlsController::on_sequence_header() +srs_error_t SrsHlsController::on_sequence_header() { // TODO: support discontinuity for the same stream // currently we reap and insert discontinity when encoder republish, @@ -853,13 +841,13 @@ int SrsHlsController::on_sequence_header() return muxer->on_sequence_header(); } -int SrsHlsController::write_audio(SrsAudioFrame* frame, int64_t pts) +srs_error_t SrsHlsController::write_audio(SrsAudioFrame* frame, int64_t pts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // write audio to cache. - if ((ret = tsmc->cache_audio(frame, pts)) != ERROR_SUCCESS) { - return ret; + if ((err = tsmc->cache_audio(frame, pts)) != srs_success) { + return srs_error_wrap(err, "hls: cache audio"); } // reap when current source is pure audio. @@ -871,9 +859,8 @@ int SrsHlsController::write_audio(SrsAudioFrame* frame, int64_t pts) // we use absolutely overflow of segment to make jwplayer/ffplay happy // @see https://github.com/ossrs/srs/issues/151#issuecomment-71155184 if (tsmc->audio && muxer->is_segment_absolutely_overflow()) { - srs_info("hls: absolute audio reap segment."); - if ((ret = reap_segment()) != ERROR_SUCCESS) { - return ret; + if ((err = reap_segment()) != srs_success) { + return srs_error_wrap(err, "hls: reap segment"); } } @@ -881,7 +868,7 @@ int SrsHlsController::write_audio(SrsAudioFrame* frame, int64_t pts) // TODO: FIXME: Check whether it's necessary. if (muxer->pure_audio() && tsmc->audio) { if (pts - tsmc->audio->start_pts < SRS_CONSTS_HLS_PURE_AUDIO_AGGREGATE) { - return ret; + return err; } } @@ -889,20 +876,20 @@ int SrsHlsController::write_audio(SrsAudioFrame* frame, int64_t pts) // it's ok for the hls overload, or maybe cause the audio corrupt, // which introduced by aggregate the audios to a big one. // @see https://github.com/ossrs/srs/issues/512 - if ((ret = muxer->flush_audio(tsmc)) != ERROR_SUCCESS) { - return ret; + if ((err = muxer->flush_audio(tsmc)) != srs_success) { + return srs_error_wrap(err, "hls: flush audio"); } - return ret; + return err; } -int SrsHlsController::write_video(SrsVideoFrame* frame, int64_t dts) +srs_error_t SrsHlsController::write_video(SrsVideoFrame* frame, int64_t dts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // write video to cache. - if ((ret = tsmc->cache_video(frame, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = tsmc->cache_video(frame, dts)) != srs_success) { + return srs_error_wrap(err, "hls: cache video"); } // when segment overflow, reap if possible. @@ -912,55 +899,50 @@ int SrsHlsController::write_video(SrsVideoFrame* frame, int64_t dts) // b. always reap when not wait keyframe. if (!muxer->wait_keyframe() || frame->frame_type == SrsVideoAvcFrameTypeKeyFrame) { // reap the segment, which will also flush the video. - if ((ret = reap_segment()) != ERROR_SUCCESS) { - return ret; + if ((err = reap_segment()) != srs_success) { + return srs_error_wrap(err, "hls: reap segment"); } } } // flush video when got one - if ((ret = muxer->flush_video(tsmc)) != ERROR_SUCCESS) { - srs_error("m3u8 muxer flush video failed. ret=%d", ret); - return ret; + if ((err = muxer->flush_video(tsmc)) != srs_success) { + return srs_error_wrap(err, "hls: flush video"); } - return ret; + return err; } -int SrsHlsController::reap_segment() +srs_error_t SrsHlsController::reap_segment() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // TODO: flush audio before or after segment? // TODO: fresh segment begin with audio or video? // close current ts. - if ((ret = muxer->segment_close()) != ERROR_SUCCESS) { - srs_error("m3u8 muxer close segment failed. ret=%d", ret); - return ret; + if ((err = muxer->segment_close()) != srs_success) { + return srs_error_wrap(err, "hls: segment close"); } // open new ts. - if ((ret = muxer->segment_open()) != ERROR_SUCCESS) { - srs_error("m3u8 muxer open segment failed. ret=%d", ret); - return ret; + if ((err = muxer->segment_open()) != srs_success) { + return srs_error_wrap(err, "hls: segment open"); } // segment open, flush video first. - if ((ret = muxer->flush_video(tsmc)) != ERROR_SUCCESS) { - srs_error("m3u8 muxer flush video failed. ret=%d", ret); - return ret; + if ((err = muxer->flush_video(tsmc)) != srs_success) { + return srs_error_wrap(err, "hls: flush video"); } // segment open, flush the audio. // @see: ngx_rtmp_hls_open_fragment /* start fragment with audio to make iPhone happy */ - if ((ret = muxer->flush_audio(tsmc)) != ERROR_SUCCESS) { - srs_error("m3u8 muxer flush audio failed. ret=%d", ret); - return ret; + if ((err = muxer->flush_audio(tsmc)) != srs_success) { + return srs_error_wrap(err, "hls: flush audio"); } - return ret; + return err; } SrsHls::SrsHls() @@ -1050,24 +1032,24 @@ srs_error_t SrsHls::initialize(SrsOriginHub* h, SrsRequest* r) return err; } -int SrsHls::on_publish() +srs_error_t SrsHls::on_publish() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // update the hls time, for hls_dispose. last_update_time = srs_get_system_time_ms(); // support multiple publish. if (enabled) { - return ret; + return err; } if (!_srs_config->get_hls_enabled(req->vhost)) { - return ret; + return err; } - if ((ret = controller->on_publish(req)) != ERROR_SUCCESS) { - return ret; + if ((err = controller->on_publish(req)) != srs_success) { + return srs_error_wrap(err, "hls: on publish"); } // if enabled, open the muxer. @@ -1076,31 +1058,32 @@ int SrsHls::on_publish() // ok, the hls can be dispose, or need to be dispose. disposable = true; - return ret; + return err; } void SrsHls::on_unpublish() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // support multiple unpublish. if (!enabled) { return; } - if ((ret = controller->on_unpublish()) != ERROR_SUCCESS) { - srs_error("ignore m3u8 muxer flush/close audio failed. ret=%d", ret); + if ((err = controller->on_unpublish()) != srs_success) { + srs_warn("hls: ignore unpublish failed %s", srs_error_desc(err).c_str()); + srs_freep(err); } enabled = false; } -int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) +srs_error_t SrsHls::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!enabled) { - return ret; + return err; } // update the hls time, for hls_dispose. @@ -1113,7 +1096,7 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) srs_assert(format->acodec); SrsAudioCodecId acodec = format->acodec->id; if (acodec != SrsAudioCodecIdAAC && acodec != SrsAudioCodecIdMP3) { - return ret; + return err; } // ignore sequence header @@ -1123,9 +1106,8 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) } // TODO: FIXME: config the jitter of HLS. - if ((ret = jitter->correct(audio, SrsRtmpJitterAlgorithmOFF)) != ERROR_SUCCESS) { - srs_error("rtmp jitter correct audio failed. ret=%d", ret); - return ret; + if ((err = jitter->correct(audio, SrsRtmpJitterAlgorithmOFF)) != srs_success) { + return srs_error_wrap(err, "hls: jitter"); } // Reset the aac samples counter when DTS jitter. @@ -1145,20 +1127,19 @@ int SrsHls::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) int64_t dts = 90000 * aac_samples / srs_flv_srates[format->acodec->sound_rate]; aac_samples += nb_samples_per_frame; - if ((ret = controller->write_audio(format->audio, dts)) != ERROR_SUCCESS) { - srs_error("hls cache write audio failed. ret=%d", ret); - return ret; + if ((err = controller->write_audio(format->audio, dts)) != srs_success) { + return srs_error_wrap(err, "hls: write audio"); } - return ret; + return err; } -int SrsHls::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format) +srs_error_t SrsHls::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!enabled) { - return ret; + return err; } // update the hls time, for hls_dispose. @@ -1171,12 +1152,12 @@ int SrsHls::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format) // @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909 srs_assert(format->video); if (format->video->frame_type == SrsVideoAvcFrameTypeVideoInfoFrame) { - return ret; + return err; } srs_assert(format->vcodec); if (format->vcodec->id != SrsVideoCodecIdAVC) { - return ret; + return err; } // ignore sequence header @@ -1185,21 +1166,19 @@ int SrsHls::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format) } // TODO: FIXME: config the jitter of HLS. - if ((ret = jitter->correct(video, SrsRtmpJitterAlgorithmOFF)) != ERROR_SUCCESS) { - srs_error("rtmp jitter correct video failed. ret=%d", ret); - return ret; + if ((err = jitter->correct(video, SrsRtmpJitterAlgorithmOFF)) != srs_success) { + return srs_error_wrap(err, "hls: jitter"); } int64_t dts = video->timestamp * 90; - if ((ret = controller->write_video(format->video, dts)) != ERROR_SUCCESS) { - srs_error("hls cache write video failed. ret=%d", ret); - return ret; + if ((err = controller->write_video(format->video, dts)) != srs_success) { + return srs_error_wrap(err, "hls: write video"); } // pithy print message. hls_show_mux_log(); - return ret; + return err; } void SrsHls::hls_show_mux_log() diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index ca29a544d..486401809 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -179,15 +179,15 @@ public: /** * when publish, update the config for muxer. */ - virtual int update_config(SrsRequest* r, std::string entry_prefix, + virtual srs_error_t update_config(SrsRequest* r, std::string entry_prefix, std::string path, std::string m3u8_file, std::string ts_file, double fragment, double window, bool ts_floor, double aof_ratio, bool cleanup, bool wait_keyframe); /** * open a new segment(a new ts file) */ - virtual int segment_open(); - virtual int on_sequence_header(); + virtual srs_error_t segment_open(); + virtual srs_error_t on_sequence_header(); /** * whether segment overflow, * that is whether the current segment duration>=(the segment in config) @@ -208,15 +208,15 @@ public: * whether current hls muxer is pure audio mode. */ virtual bool pure_audio(); - virtual int flush_audio(SrsTsMessageCache* cache); - virtual int flush_video(SrsTsMessageCache* cache); + virtual srs_error_t flush_audio(SrsTsMessageCache* cache); + virtual srs_error_t flush_video(SrsTsMessageCache* cache); /** * Close segment(ts). */ - virtual int segment_close(); + virtual srs_error_t segment_close(); private: - virtual int refresh_m3u8(); - virtual int _refresh_m3u8(std::string m3u8_file); + virtual srs_error_t refresh_m3u8(); + virtual srs_error_t _refresh_m3u8(std::string m3u8_file); }; /** @@ -258,23 +258,23 @@ public: /** * when publish or unpublish stream. */ - virtual int on_publish(SrsRequest* req); - virtual int on_unpublish(); + virtual srs_error_t on_publish(SrsRequest* req); + virtual srs_error_t on_unpublish(); /** * when get sequence header, * must write a #EXT-X-DISCONTINUITY to m3u8. * @see: hls-m3u8-draft-pantos-http-live-streaming-12.txt * @see: 3.4.11. EXT-X-DISCONTINUITY */ - virtual int on_sequence_header(); + virtual srs_error_t on_sequence_header(); /** * write audio to cache, if need to flush, flush to muxer. */ - virtual int write_audio(SrsAudioFrame* frame, int64_t pts); + virtual srs_error_t write_audio(SrsAudioFrame* frame, int64_t pts); /** * write video to muxer. */ - virtual int write_video(SrsVideoFrame* frame, int64_t dts); + virtual srs_error_t write_video(SrsVideoFrame* frame, int64_t dts); private: /** * reopen the muxer for a new hls segment, @@ -282,7 +282,7 @@ private: * then write the key frame to the new segment. * so, user must reap_segment then flush_video to hls muxer. */ - virtual int reap_segment(); + virtual srs_error_t reap_segment(); }; /** @@ -324,7 +324,7 @@ public: * for the muxer object not destroyed. * @param fetch_sequence_header whether fetch sequence from source. */ - virtual int on_publish(); + virtual srs_error_t on_publish(); /** * the unpublish event, only close the muxer, donot destroy the * muxer, for when we continue to publish, the m3u8 will continue. @@ -334,14 +334,14 @@ public: * mux the audio packets to ts. * @param shared_audio, directly ptr, copy it if need to save it. */ - virtual int on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format); + virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format); /** * mux the video packets to ts. * @param shared_video, directly ptr, copy it if need to save it. * @param is_sps_pps whether the video is h.264 sps/pps. */ // TODO: FIXME: Remove param is_sps_pps. - virtual int on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format); + virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format); private: virtual void hls_show_mux_log(); }; diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index f8d2671cc..d2b8d419c 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -203,8 +203,12 @@ SrsTsStreamEncoder::~SrsTsStreamEncoder() int SrsTsStreamEncoder::initialize(SrsFileWriter* w, SrsBufferCache* /*c*/) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = enc->initialize(w)) != ERROR_SUCCESS) { + if ((err = enc->initialize(w)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } @@ -213,12 +217,20 @@ int SrsTsStreamEncoder::initialize(SrsFileWriter* w, SrsBufferCache* /*c*/) int SrsTsStreamEncoder::write_audio(int64_t timestamp, char* data, int size) { - return enc->write_audio(timestamp, data, size); + srs_error_t err = enc->write_audio(timestamp, data, size); + // TODO: FIXME: Use error + int ret = srs_error_code(err); + srs_freep(err); + return ret; } int SrsTsStreamEncoder::write_video(int64_t timestamp, char* data, int size) { - return enc->write_video(timestamp, data, size); + srs_error_t err = enc->write_video(timestamp, data, size); + // TODO: FIXME: Use error + int ret = srs_error_code(err); + srs_freep(err); + return ret; } int SrsTsStreamEncoder::write_metadata(int64_t /*timestamp*/, char* /*data*/, int /*size*/) diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 7c30c6a97..bd8d67c6b 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -460,12 +460,12 @@ int SrsKafkaProducer::send(int key, SrsJsonObject* obj) return ret; } -int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) +srs_error_t SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!enabled) { - return ret; + return err; } SrsJsonObject* obj = SrsJsonAny::object(); @@ -477,12 +477,12 @@ int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) return worker->execute(new SrsKafkaMessage(this, key, obj)); } -int SrsKafkaProducer::on_close(int key) +srs_error_t SrsKafkaProducer::on_close(int key) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!enabled) { - return ret; + return err; } SrsJsonObject* obj = SrsJsonAny::object(); diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 466b348b1..20ad4b328 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -141,12 +141,12 @@ public: * @param type the type of client. * @param ip the peer ip of client. */ - virtual int on_client(int key, SrsListenerType type, std::string ip) = 0; + virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip) = 0; /** * when client close or disconnect for error. * @param key the partition map key, the client id or hash(ip). */ - virtual int on_close(int key) = 0; + virtual srs_error_t on_close(int key) = 0; }; // @global kafka event producer. @@ -192,8 +192,8 @@ public: virtual int send(int key, SrsJsonObject* obj); // interface ISrsKafkaCluster public: - virtual int on_client(int key, SrsListenerType type, std::string ip); - virtual int on_close(int key); + virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip); + virtual srs_error_t on_close(int key); // interface ISrsReusableThreadHandler public: virtual srs_error_t cycle(); diff --git a/trunk/src/app/srs_app_mpegts_udp.cpp b/trunk/src/app/srs_app_mpegts_udp.cpp index 730c28626..aa856430e 100644 --- a/trunk/src/app/srs_app_mpegts_udp.cpp +++ b/trunk/src/app/srs_app_mpegts_udp.cpp @@ -178,6 +178,7 @@ srs_error_t SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // collect nMB data to parse in a time. // TODO: FIXME: comment the following for release. @@ -237,7 +238,10 @@ int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) } // process each ts packet - if ((ret = context->decode(stream, this)) != ERROR_SUCCESS) { + if ((err = context->decode(stream, this)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret); continue; } @@ -253,9 +257,10 @@ int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf) return ret; } -int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg) +srs_error_t SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; pprint->elapse(); @@ -311,36 +316,35 @@ int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg) // when not audio/video, or not adts/annexb format, donot support. if (msg->stream_number() != 0) { - ret = ERROR_STREAM_CASTER_TS_ES; - srs_error("mpegts: unsupported stream format, sid=%#x(%s-%d). ret=%d", - msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number(), ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_ES, "ts: unsupported stream format, sid=%#x(%s-%d)", + msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number()); } // check supported codec if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) { - ret = ERROR_STREAM_CASTER_TS_CODEC; - srs_error("mpegts: unsupported stream codec=%d. ret=%d", msg->channel->stream, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_CODEC, "ts: unsupported stream codec=%d", msg->channel->stream); } // parse the stream. SrsBuffer avs; if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) { - srs_error("mpegts: initialize av stream failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ts: init av stream"); } // publish audio or video. if (msg->channel->stream == SrsTsStreamVideoH264) { - return on_ts_video(msg, &avs); + if ((ret = on_ts_video(msg, &avs)) != ERROR_SUCCESS) { + return srs_error_new(ret, "ts: consume video"); + } } if (msg->channel->stream == SrsTsStreamAudioAAC) { - return on_ts_audio(msg, &avs); + if ((ret = on_ts_audio(msg, &avs)) != ERROR_SUCCESS) { + return srs_error_new(ret, "ts: consume audio"); + } } // TODO: FIXME: implements it. - return ret; + return err; } int SrsMpegtsOverUdp::on_ts_video(SrsTsMessage* msg, SrsBuffer* avs) diff --git a/trunk/src/app/srs_app_mpegts_udp.hpp b/trunk/src/app/srs_app_mpegts_udp.hpp index a1c045558..b891c939c 100644 --- a/trunk/src/app/srs_app_mpegts_udp.hpp +++ b/trunk/src/app/srs_app_mpegts_udp.hpp @@ -106,7 +106,7 @@ private: virtual int on_udp_bytes(std::string host, int port, char* buf, int nb_buf); // interface ISrsTsHandler public: - virtual int on_ts_message(SrsTsMessage* msg); + virtual srs_error_t on_ts_message(SrsTsMessage* msg); private: virtual int on_ts_video(SrsTsMessage* msg, SrsBuffer* avs); virtual int write_h264_sps_pps(uint32_t dts, uint32_t pts); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e7d37e5cc..0e8915185 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -166,8 +166,8 @@ srs_error_t SrsRtmpConn::do_cycle() // notify kafka cluster. #ifdef SRS_AUTO_KAFKA - if ((ret = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != ERROR_SUCCESS) { - return srs_error_new(ret, "kafka on client"); + if ((err = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != srs_success) { + return srs_error_wrap(err, "kafka on client"); } #endif @@ -1411,14 +1411,13 @@ int SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client) srs_error_t SrsRtmpConn::on_disconnect() { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; http_hooks_on_close(); #ifdef SRS_AUTO_KAFKA - if ((ret = _srs_kafka->on_close(srs_id())) != ERROR_SUCCESS) { - return srs_error_new(ret, "kafka on close"); + if ((err = _srs_kafka->on_close(srs_id())) != srs_success) { + return srs_error_wrap(err, "kafka on close"); } #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d577d535f..cadfbf667 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -85,15 +85,15 @@ SrsRtmpJitter::~SrsRtmpJitter() { } -int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag) +srs_error_t SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // for performance issue if (ag != SrsRtmpJitterAlgorithmFULL) { // all jitter correct features is disabled, ignore. if (ag == SrsRtmpJitterAlgorithmOFF) { - return ret; + return err; } // start at zero, but donot ensure monotonically increasing. @@ -103,18 +103,18 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag) last_pkt_correct_time = msg->timestamp; } msg->timestamp -= last_pkt_correct_time; - return ret; + return err; } // other algorithm, ignore. - return ret; + return err; } // full jitter algorithm, do jitter correct. // set to 0 for metadata. if (!msg->is_av()) { msg->timestamp = 0; - return ret; + return err; } /** @@ -148,7 +148,7 @@ int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag) msg->timestamp = last_pkt_correct_time; last_pkt_time = time; - return ret; + return err; } int SrsRtmpJitter::get_time() @@ -470,12 +470,16 @@ int SrsConsumer::get_time() int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_msg->copy(); if (!atc) { - if ((ret = jitter->correct(msg, ag)) != ERROR_SUCCESS) { + if ((err = jitter->correct(msg, ag)) != srs_success) { srs_freep(msg); + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); return ret; } } @@ -963,6 +967,7 @@ int SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDa int SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_audio; @@ -993,7 +998,11 @@ int SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) srs_flv_srates[c->sound_rate]); } - if ((ret = hls->on_audio(msg, format)) != ERROR_SUCCESS) { + if ((err = hls->on_audio(msg, format)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + // apply the error strategy for hls. // @see https://github.com/ossrs/srs/issues/264 std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); @@ -1063,6 +1072,7 @@ int SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) int SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_header) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsSharedPtrMessage* msg = shared_video; @@ -1095,7 +1105,11 @@ int SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_h c->video_data_rate / 1000, c->frame_rate, c->duration); } - if ((ret = hls->on_video(msg, format)) != ERROR_SUCCESS) { + if ((err = hls->on_video(msg, format)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); + // apply the error strategy for hls. // @see https://github.com/ossrs/srs/issues/264 std::string hls_error_strategy = _srs_config->get_hls_on_error(req->vhost); @@ -1165,6 +1179,7 @@ int SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_h int SrsOriginHub::on_publish() { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // create forwarders if ((ret = create_forwarders()) != ERROR_SUCCESS) { @@ -1180,7 +1195,10 @@ int SrsOriginHub::on_publish() } #endif - if ((ret = hls->on_publish()) != ERROR_SUCCESS) { + if ((err = hls->on_publish()) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); srs_error("start hls failed. ret=%d", ret); return ret; } @@ -1384,8 +1402,8 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) return err; } - if ((ret = hls->on_publish()) != ERROR_SUCCESS) { - return srs_error_new(ret, "hls publish failed"); + if ((err = hls->on_publish()) != srs_success) { + return srs_error_wrap(err, "hls publish failed"); } srs_trace("vhost %s hls reload success", vhost.c_str()); @@ -1399,8 +1417,8 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) if ((ret = format->on_video(cache_sh_video)) != ERROR_SUCCESS) { return srs_error_new(ret, "format on_video"); } - if ((ret = hls->on_video(cache_sh_video, format)) != ERROR_SUCCESS) { - return srs_error_new(ret, "hls on_video"); + if ((err = hls->on_video(cache_sh_video, format)) != srs_success) { + return srs_error_wrap(err, "hls on_video"); } } @@ -1409,8 +1427,8 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) if ((ret = format->on_audio(cache_sh_audio)) != ERROR_SUCCESS) { return srs_error_new(ret, "format on_audio"); } - if ((ret = hls->on_audio(cache_sh_audio, format)) != ERROR_SUCCESS) { - return srs_error_new(ret, "hls on_audio"); + if ((err = hls->on_audio(cache_sh_audio, format)) != srs_success) { + return srs_error_wrap(err, "hls on_audio"); } } diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index b1f3a9351..63110f789 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -94,7 +94,7 @@ public: * detect the time jitter and correct it. * @param ag the algorithm to use for time jitter. */ - virtual int correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag); + virtual srs_error_t correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag); /** * get current client time, the last packet time. */ diff --git a/trunk/src/kernel/srs_kernel_ts.cpp b/trunk/src/kernel/srs_kernel_ts.cpp index 2cc4dffc5..6c76569d1 100644 --- a/trunk/src/kernel/srs_kernel_ts.cpp +++ b/trunk/src/kernel/srs_kernel_ts.cpp @@ -109,12 +109,12 @@ SrsTsMessage::~SrsTsMessage() srs_freep(payload); } -int SrsTsMessage::dump(SrsBuffer* stream, int* pnb_bytes) +srs_error_t SrsTsMessage::dump(SrsBuffer* stream, int* pnb_bytes) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (stream->empty()) { - return ret; + return err; } // xB @@ -125,9 +125,7 @@ int SrsTsMessage::dump(SrsBuffer* stream, int* pnb_bytes) if (nb_bytes > 0) { if (!stream->require(nb_bytes)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: dump PSE bytes failed, requires=%dB. ret=%d", nb_bytes, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: dump PSE bytes failed, requires=%dB", nb_bytes); } payload->append(stream->data() + stream->pos(), nb_bytes); @@ -136,7 +134,7 @@ int SrsTsMessage::dump(SrsBuffer* stream, int* pnb_bytes) *pnb_bytes = nb_bytes; - return ret; + return err; } bool SrsTsMessage::completed(int8_t payload_unit_start_indicator) @@ -266,9 +264,9 @@ void SrsTsContext::set(int pid, SrsTsPidApply apply_pid, SrsTsStream stream) channel->stream = stream; } -int SrsTsContext::decode(SrsBuffer* stream, ISrsTsHandler* handler) +srs_error_t SrsTsContext::decode(SrsBuffer* stream, ISrsTsHandler* handler) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // parse util EOF of stream. // for example, parse multiple times for the PES_packet_length(0) packet. @@ -277,9 +275,8 @@ int SrsTsContext::decode(SrsBuffer* stream, ISrsTsHandler* handler) SrsAutoFree(SrsTsPacket, packet); SrsTsMessage* msg = NULL; - if ((ret = packet->decode(stream, &msg)) != ERROR_SUCCESS) { - srs_error("mpegts: decode ts packet failed. ret=%d", ret); - return ret; + if ((err = packet->decode(stream, &msg)) != srs_success) { + return srs_error_wrap(err, "ts: ts packet decode"); } if (!msg) { @@ -287,18 +284,17 @@ int SrsTsContext::decode(SrsBuffer* stream, ISrsTsHandler* handler) } SrsAutoFree(SrsTsMessage, msg); - if ((ret = handler->on_ts_message(msg)) != ERROR_SUCCESS) { - srs_error("mpegts: handler ts message failed. ret=%d", ret); - return ret; + if ((err = handler->on_ts_message(msg)) != srs_success) { + return srs_error_wrap(err, "ts: handle ts message"); } } - return ret; + return err; } -int SrsTsContext::encode(SrsFileWriter* writer, SrsTsMessage* msg, SrsVideoCodecId vc, SrsAudioCodecId ac) +srs_error_t SrsTsContext::encode(SrsFileWriter* writer, SrsTsMessage* msg, SrsVideoCodecId vc, SrsAudioCodecId ac) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsTsStream vs, as; int16_t video_pid = 0, audio_pid = 0; @@ -351,17 +347,15 @@ int SrsTsContext::encode(SrsFileWriter* writer, SrsTsMessage* msg, SrsVideoCodec } if (as == SrsTsStreamReserved && vs == SrsTsStreamReserved) { - ret = ERROR_HLS_NO_STREAM; - srs_error("hls: no video or audio stream, vcodec=%d, acodec=%d. ret=%d", vc, ac, ret); - return ret; + return srs_error_new(ERROR_HLS_NO_STREAM, "ts: no a/v stream, vcodec=%d, acodec=%d", vc, ac); } // when any codec changed, write PAT/PMT table. if (vcodec != vc || acodec != ac) { vcodec = vc; acodec = ac; - if ((ret = encode_pat_pmt(writer, video_pid, vs, audio_pid, as)) != ERROR_SUCCESS) { - return ret; + if ((err = encode_pat_pmt(writer, video_pid, vs, audio_pid, as)) != srs_success) { + return srs_error_wrap(err, "ts: encode PAT/PMT"); } } @@ -378,14 +372,13 @@ void SrsTsContext::set_sync_byte(int8_t sb) sync_byte = sb; } -int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStream vs, int16_t apid, SrsTsStream as) +srs_error_t SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStream vs, int16_t apid, SrsTsStream as) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (vs != SrsTsStreamVideoH264 && as != SrsTsStreamAudioAAC && as != SrsTsStreamAudioMp3) { - ret = ERROR_HLS_NO_STREAM; - srs_error("hls: no pmt pcr pid, vs=%d, as=%d. ret=%d", vs, as, ret); - return ret; + return srs_error_new(ERROR_HLS_NO_STREAM, "ts: no PID, vs=%d, as=%d", vs, as); } int16_t pmt_number = TS_PMT_NUMBER; @@ -406,15 +399,13 @@ int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStrea SrsBuffer stream; if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "ts: init stream"); } - if ((ret = pkt->encode(&stream)) != ERROR_SUCCESS) { - srs_error("ts encode ts packet failed. ret=%d", ret); - return ret; + if ((err = pkt->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "ts: encode packet"); } if ((ret = writer->write(buf, SRS_TS_PACKET_SIZE, NULL)) != ERROR_SUCCESS) { - srs_error("ts write ts packet failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ts: write packet"); } } if (true) { @@ -433,42 +424,39 @@ int SrsTsContext::encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStrea SrsBuffer stream; if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "ts: init stream"); } - if ((ret = pkt->encode(&stream)) != ERROR_SUCCESS) { - srs_error("ts encode ts packet failed. ret=%d", ret); - return ret; + if ((err = pkt->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "ts: encode packet"); } if ((ret = writer->write(buf, SRS_TS_PACKET_SIZE, NULL)) != ERROR_SUCCESS) { - srs_error("ts write ts packet failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ts: write packet"); } } // When PAT and PMT are writen, the context is ready now. ready = true; - return ret; + return err; } -int SrsTsContext::encode_pes(SrsFileWriter* writer, SrsTsMessage* msg, int16_t pid, SrsTsStream sid, bool pure_audio) +srs_error_t SrsTsContext::encode_pes(SrsFileWriter* writer, SrsTsMessage* msg, int16_t pid, SrsTsStream sid, bool pure_audio) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // Sometimes, the context is not ready(PAT/PMT write failed), error in this situation. if (!ready) { - ret = ERROR_TS_CONTEXT_NOT_READY; - srs_error("TS: context not ready, ret=%d", ret); - return ret; + return srs_error_new(ERROR_TS_CONTEXT_NOT_READY, "ts: not ready"); } if (msg->payload->length() == 0) { - return ret; + return err; } if (sid != SrsTsStreamVideoH264 && sid != SrsTsStreamAudioMp3 && sid != SrsTsStreamAudioAAC) { srs_info("ts: ignore the unknown stream, sid=%d", sid); - return ret; + return err; } SrsTsChannel* channel = get(pid); @@ -543,19 +531,17 @@ int SrsTsContext::encode_pes(SrsFileWriter* writer, SrsTsMessage* msg, int16_t p SrsBuffer stream; if ((ret = stream.initialize(buf, nb_buf)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "ts: init stream"); } - if ((ret = pkt->encode(&stream)) != ERROR_SUCCESS) { - srs_error("ts encode ts packet failed. ret=%d", ret); - return ret; + if ((err = pkt->encode(&stream)) != srs_success) { + return srs_error_wrap(err, "ts: encode packet"); } if ((ret = writer->write(buf, SRS_TS_PACKET_SIZE, NULL)) != ERROR_SUCCESS) { - srs_error("ts write ts packet failed. ret=%d", ret); - return ret; + return srs_error_new(ret, "ts: write packet"); } } - return ret; + return err; } SrsTsPacket::SrsTsPacket(SrsTsContext* c) @@ -580,24 +566,20 @@ SrsTsPacket::~SrsTsPacket() srs_freep(payload); } -int SrsTsPacket::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) +srs_error_t SrsTsPacket::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; int pos = stream->pos(); // 4B ts packet header. if (!stream->require(4)) { - ret = ERROR_STREAM_CASTER_TS_HEADER; - srs_error("ts: demux header failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_HEADER, "ts: decode packet"); } sync_byte = stream->read_1bytes(); if (sync_byte != 0x47) { - ret = ERROR_STREAM_CASTER_TS_SYNC_BYTE; - srs_error("ts: sync_bytes must be 0x47, actual=%#x. ret=%d", sync_byte, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_SYNC_BYTE, "ts: sync_bytes must be 0x47, actual=%#x", sync_byte); } int16_t pidv = stream->read_2bytes(); @@ -622,9 +604,8 @@ int SrsTsPacket::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) srs_freep(adaptation_field); adaptation_field = new SrsTsAdaptationField(this); - if ((ret = adaptation_field->decode(stream)) != ERROR_SUCCESS) { - srs_error("ts: demux af faield. ret=%d", ret); - return ret; + if ((err = adaptation_field->decode(stream)) != srs_success) { + return srs_error_wrap(err, "ts: demux af field"); } srs_verbose("ts: demux af ok."); } @@ -654,13 +635,12 @@ int SrsTsPacket::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) } } - if (payload && (ret = payload->decode(stream, ppmsg)) != ERROR_SUCCESS) { - srs_error("ts: demux payload failed. ret=%d", ret); - return ret; + if (payload && (err = payload->decode(stream, ppmsg)) != srs_success) { + return srs_error_wrap(err, "ts: demux payload"); } } - return ret; + return err; } int SrsTsPacket::size() @@ -673,15 +653,13 @@ int SrsTsPacket::size() return sz; } -int SrsTsPacket::encode(SrsBuffer* stream) +srs_error_t SrsTsPacket::encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // 4B ts packet header. if (!stream->require(4)) { - ret = ERROR_STREAM_CASTER_TS_HEADER; - srs_error("ts: mux header failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_HEADER, "ts: requires 4+ bytes"); } stream->write_1bytes(sync_byte); @@ -703,23 +681,21 @@ int SrsTsPacket::encode(SrsBuffer* stream) // optional: adaptation field if (adaptation_field) { - if ((ret = adaptation_field->encode(stream)) != ERROR_SUCCESS) { - srs_error("ts: mux af faield. ret=%d", ret); - return ret; + if ((err = adaptation_field->encode(stream)) != srs_success) { + return srs_error_wrap(err, "ts: mux af field"); } srs_verbose("ts: mux af ok."); } // optional: payload. if (payload) { - if ((ret = payload->encode(stream)) != ERROR_SUCCESS) { - srs_error("ts: mux payload failed. ret=%d", ret); - return ret; + if ((err = payload->encode(stream)) != srs_success) { + return srs_error_wrap(err, "ts: mux payload"); } srs_verbose("ts: mux payload ok."); } - return ret; + return err; } void SrsTsPacket::padding(int nb_stuffings) @@ -943,36 +919,30 @@ SrsTsAdaptationField::~SrsTsAdaptationField() { } -int SrsTsAdaptationField::decode(SrsBuffer* stream) +srs_error_t SrsTsAdaptationField::decode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!stream->require(2)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: decode af"); } adaption_field_length = stream->read_1bytes(); // When the adaptation_field_control value is '11', the value of the adaptation_field_length shall // be in the range 0 to 182. if (packet->adaption_field_control == SrsTsAdaptationFieldTypeBoth && adaption_field_length > 182) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af length failed, must in [0, 182], actual=%d. ret=%d", adaption_field_length, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af length failed, must in [0, 182], actual=%d", adaption_field_length); } // When the adaptation_field_control value is '10', the value of the adaptation_field_length shall // be 183. if (packet->adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly && adaption_field_length != 183) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af length failed, must be 183, actual=%d. ret=%d", adaption_field_length, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af length failed, must be 183, actual=%d", adaption_field_length); } // no adaptation field. if (adaption_field_length == 0) { srs_info("ts: demux af empty."); - return ret; + return err; } // the adaptation field start at here. @@ -990,9 +960,7 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) if (PCR_flag) { if (!stream->require(6)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af PCR_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af PCR_flag"); } char* pp = NULL; @@ -1017,9 +985,7 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) if (OPCR_flag) { if (!stream->require(6)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af OPCR_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af OPCR_flag"); } char* pp = NULL; @@ -1044,18 +1010,14 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) if (splicing_point_flag) { if (!stream->require(1)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af splicing_point_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af splicing_point_flag"); } splice_countdown = stream->read_1bytes(); } if (transport_private_data_flag) { if (!stream->require(1)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af transport_private_data_flag"); } /** * The transport_private_data_length is an 8-bit field specifying the number of @@ -1064,11 +1026,9 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) */ uint8_t transport_private_data_length = (uint8_t)stream->read_1bytes(); - if (transport_private_data_length> 0) { + if (transport_private_data_length > 0) { if (!stream->require(transport_private_data_length)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af transport_private_data_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af transport_private_data"); } transport_private_data.resize(transport_private_data_length); stream->read_bytes(&transport_private_data[0], transport_private_data_length); @@ -1079,9 +1039,7 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) int pos_af_ext = stream->pos(); if (!stream->require(2)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af adaptation_field_extension_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af adaptation_field_extension_flag"); } adaptation_field_extension_length = (uint8_t)stream->read_1bytes(); int8_t ltwfv = stream->read_1bytes(); @@ -1093,9 +1051,7 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) if (ltw_flag) { if (!stream->require(2)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af ltw_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af ltw_flag"); } ltw_offset = stream->read_2bytes(); @@ -1105,9 +1061,7 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) if (piecewise_rate_flag) { if (!stream->require(3)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af piecewise_rate_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af piecewise_rate_flag"); } piecewise_rate = stream->read_3bytes(); @@ -1116,9 +1070,7 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) if (seamless_splice_flag) { if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af seamless_splice_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: demux af seamless_splice_flag"); } marker_bit0 = stream->read_1bytes(); DTS_next_AU1 = stream->read_2bytes(); @@ -1147,7 +1099,7 @@ int SrsTsAdaptationField::decode(SrsBuffer* stream) transport_private_data_flag, adaptation_field_extension_flag, adaptation_field_extension_length, program_clock_reference_base, program_clock_reference_extension, original_program_clock_reference_base, original_program_clock_reference_extension); - return ret; + return err; } int SrsTsAdaptationField::size() @@ -1167,36 +1119,30 @@ int SrsTsAdaptationField::size() return sz; } -int SrsTsAdaptationField::encode(SrsBuffer* stream) +srs_error_t SrsTsAdaptationField::encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!stream->require(2)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af"); } stream->write_1bytes(adaption_field_length); // When the adaptation_field_control value is '11', the value of the adaptation_field_length shall // be in the range 0 to 182. if (packet->adaption_field_control == SrsTsAdaptationFieldTypeBoth && adaption_field_length > 182) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af length failed, must in [0, 182], actual=%d. ret=%d", adaption_field_length, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af length failed, must in [0, 182], actual=%d", adaption_field_length); } // When the adaptation_field_control value is '10', the value of the adaptation_field_length shall // be 183. if (packet->adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly && adaption_field_length != 183) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af length failed, must be 183, actual=%d. ret=%d", adaption_field_length, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af length failed, must be 183, actual=%d", adaption_field_length); } // no adaptation field. if (adaption_field_length == 0) { srs_info("ts: mux af empty."); - return ret; + return err; } int8_t tmpv = adaptation_field_extension_flag & 0x01; tmpv |= (discontinuity_indicator << 7) & 0x80; @@ -1210,9 +1156,7 @@ int SrsTsAdaptationField::encode(SrsBuffer* stream) if (PCR_flag) { if (!stream->require(6)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af PCR_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af PCR_flag"); } char* pp = NULL; @@ -1236,9 +1180,7 @@ int SrsTsAdaptationField::encode(SrsBuffer* stream) if (OPCR_flag) { if (!stream->require(6)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: demux af OPCR_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af OPCR_flag"); } stream->skip(6); srs_warn("ts: mux af ignore OPCR"); @@ -1246,36 +1188,28 @@ int SrsTsAdaptationField::encode(SrsBuffer* stream) if (splicing_point_flag) { if (!stream->require(1)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af splicing_point_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af splicing_point_flag"); } stream->write_1bytes(splice_countdown); } if (transport_private_data_flag) { if (!stream->require(1)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af transport_private_data_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af transport_private_data_flag"); } stream->write_1bytes(transport_private_data.size()); if (!transport_private_data.empty()) { - if (!stream->require(transport_private_data.size())) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af transport_private_data_flag failed. ret=%d", ret); - return ret; + if (!stream->require((int)transport_private_data.size())) { + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af transport_private_data"); } - stream->write_bytes(&transport_private_data[0], transport_private_data.size()); + stream->write_bytes(&transport_private_data[0], (int)transport_private_data.size()); } } if (adaptation_field_extension_flag) { if (!stream->require(2)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af adaptation_field_extension_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af adaptation_field_extension_flag"); } stream->write_1bytes(adaptation_field_extension_length); int8_t ltwfv = const1_value1 & 0x1F; @@ -1286,9 +1220,7 @@ int SrsTsAdaptationField::encode(SrsBuffer* stream) if (ltw_flag) { if (!stream->require(2)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af ltw_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af ltw_flag"); } stream->skip(2); srs_warn("ts: mux af ignore ltw"); @@ -1296,9 +1228,7 @@ int SrsTsAdaptationField::encode(SrsBuffer* stream) if (piecewise_rate_flag) { if (!stream->require(3)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af piecewise_rate_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af piecewise_rate_flag"); } stream->skip(3); srs_warn("ts: mux af ignore piecewise_rate"); @@ -1306,9 +1236,7 @@ int SrsTsAdaptationField::encode(SrsBuffer* stream) if (seamless_splice_flag) { if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_AF; - srs_error("ts: mux af seamless_splice_flag failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_AF, "ts: mux af seamless_splice_flag"); } stream->skip(5); srs_warn("ts: mux af ignore seamless_splice"); @@ -1328,7 +1256,7 @@ int SrsTsAdaptationField::encode(SrsBuffer* stream) transport_private_data_flag, adaptation_field_extension_flag, adaptation_field_extension_length, program_clock_reference_base, program_clock_reference_extension, original_program_clock_reference_base, original_program_clock_reference_extension); - return ret; + return err; } SrsTsPayload::SrsTsPayload(SrsTsPacket* p) @@ -1353,16 +1281,14 @@ SrsTsPayloadPES::~SrsTsPayloadPES() { } -int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) +srs_error_t SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // find the channel from chunk. SrsTsChannel* channel = packet->context->get(packet->pid); if (!channel) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PES no channel for pid=%#x. ret=%d", packet->pid, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PES no channel for pid=%#x", packet->pid); } // init msg. @@ -1380,54 +1306,42 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) // check when fresh, the payload_unit_start_indicator // should be 1 for the fresh msg. if (is_fresh_msg && !packet->payload_unit_start_indicator) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: PES fresh packet length=%d, us=%d, cc=%d. ret=%d", - msg->PES_packet_length, packet->payload_unit_start_indicator, packet->continuity_counter, - ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: PES fresh packet length=%d, us=%d, cc=%d", + msg->PES_packet_length, packet->payload_unit_start_indicator, packet->continuity_counter); } // check when not fresh and PES_packet_length>0, // the payload_unit_start_indicator should never be 1 when not completed. - if (!is_fresh_msg && msg->PES_packet_length > 0 - && !msg->completed(packet->payload_unit_start_indicator) - && packet->payload_unit_start_indicator - ) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: PES packet length=%d, payload=%d, us=%d, cc=%d. ret=%d", - msg->PES_packet_length, msg->payload->length(), packet->payload_unit_start_indicator, - packet->continuity_counter, ret); + if (!is_fresh_msg && msg->PES_packet_length > 0 && !msg->completed(packet->payload_unit_start_indicator) && packet->payload_unit_start_indicator) { + srs_warn("ts: ignore PES packet length=%d, payload=%d, us=%d, cc=%d", + msg->PES_packet_length, msg->payload->length(), packet->payload_unit_start_indicator, packet->continuity_counter); // reparse current msg. stream->skip(stream->pos() * -1); srs_freep(msg); channel->msg = NULL; - return ERROR_SUCCESS; + return err; } // check the continuity counter if (!is_fresh_msg) { // late-incoming or duplicated continuity, drop message. // @remark check overflow, the counter plus 1 should greater when invalid. - if (msg->continuity_counter >= packet->continuity_counter - && ((msg->continuity_counter + 1) & 0x0f) > packet->continuity_counter - ) { + if (msg->continuity_counter >= packet->continuity_counter && ((msg->continuity_counter + 1) & 0x0f) > packet->continuity_counter) { srs_warn("ts: drop PES %dB for duplicated cc=%#x", msg->continuity_counter); stream->skip(stream->size() - stream->pos()); - return ret; + return err; } // when got partially message, the continous count must be continuous, or drop it. if (((msg->continuity_counter + 1) & 0x0f) != packet->continuity_counter) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: continuity must be continous, msg=%#x, packet=%#x. ret=%d", - msg->continuity_counter, packet->continuity_counter, ret); + srs_warn("ts: ignore continuity must be continous, msg=%#x, packet=%#x", msg->continuity_counter, packet->continuity_counter); // reparse current msg. stream->skip(stream->pos() * -1); srs_freep(msg); channel->msg = NULL; - return ERROR_SUCCESS; + return err; } } msg->continuity_counter = packet->continuity_counter; @@ -1440,13 +1354,13 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) // reparse current msg. stream->skip(stream->pos() * -1); - return ret; + return err; } // contious packet, append bytes for unit start is 0 if (!packet->payload_unit_start_indicator) { - if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) { - return ret; + if ((err = msg->dump(stream, &nb_bytes)) != srs_success) { + return srs_error_wrap(err, "ts: pes dump"); } } @@ -1454,9 +1368,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) if (packet->payload_unit_start_indicator) { // 6B fixed header. if (!stream->require(6)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE"); } // 3B packet_start_code_prefix = stream->read_3bytes(); @@ -1468,9 +1380,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) // check the packet start prefix. packet_start_code_prefix &= 0xFFFFFF; if (packet_start_code_prefix != 0x01) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PES start code failed, expect=0x01, actual=%#x. ret=%d", packet_start_code_prefix, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PES start code failed, expect=0x01, actual=%#x", packet_start_code_prefix); } int pos_packet = stream->pos(); @@ -1490,9 +1400,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) ) { // 3B flags. if (!stream->require(3)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PES flags failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE flags"); } // 1B int8_t oocv = stream->read_1bytes(); @@ -1529,15 +1437,13 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) nb_required += PES_CRC_flag? 2:0; nb_required += PES_extension_flag? 1:0; if (!stream->require(nb_required)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PES payload failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE payload"); } // 5B if (PTS_DTS_flags == 0x2) { - if ((ret = decode_33bits_dts_pts(stream, &pts)) != ERROR_SUCCESS) { - return ret; + if ((err = decode_33bits_dts_pts(stream, &pts)) != srs_success) { + return srs_error_wrap(err, "dts/pts"); } dts = pts; @@ -1548,11 +1454,11 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) // 10B if (PTS_DTS_flags == 0x3) { - if ((ret = decode_33bits_dts_pts(stream, &pts)) != ERROR_SUCCESS) { - return ret; + if ((err = decode_33bits_dts_pts(stream, &pts)) != srs_success) { + return srs_error_wrap(err, "dts/pts"); } - if ((ret = decode_33bits_dts_pts(stream, &dts)) != ERROR_SUCCESS) { - return ret; + if ((err = decode_33bits_dts_pts(stream, &dts)) != srs_success) { + return srs_error_wrap(err, "dts/pts"); } // check sync, the diff of dts and pts should never greater than 1s. @@ -1620,9 +1526,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) nb_required += P_STD_buffer_flag? 2:0; nb_required += PES_extension_flag_2? 1:0; // 1+x bytes. if (!stream->require(nb_required)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE ext payload failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE ext payload"); } // 16B @@ -1639,9 +1543,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) // the adjust required bytes. nb_required = nb_required - 16 - 1 + pack_field_length; if (!stream->require(nb_required)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE ext pack failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE ext pack"); } pack_field.resize(pack_field_length); stream->read_bytes(&pack_field[0], pack_field_length); @@ -1680,9 +1582,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) if (PES_extension_field_length > 0) { if (!stream->require(PES_extension_field_length)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE ext field failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE ext field"); } PES_extension_field.resize(PES_extension_field_length); stream->read_bytes(&PES_extension_field[0], PES_extension_field_length); @@ -1694,9 +1594,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) nb_stuffings = PES_header_data_length - (stream->pos() - pos_header); if (nb_stuffings > 0) { if (!stream->require(nb_stuffings)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE stuffings failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE stuffings"); } stream->skip(nb_stuffings); } @@ -1719,8 +1617,8 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) } // xB - if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) { - return ret; + if ((err = msg->dump(stream, &nb_bytes)) != srs_success) { + return srs_error_wrap(err, "dump pes"); } } else if (sid == SrsTsPESStreamIdProgramStreamMap || sid == SrsTsPESStreamIdPrivateStream2 @@ -1735,8 +1633,8 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) // } // xB - if ((ret = msg->dump(stream, &nb_bytes)) != ERROR_SUCCESS) { - return ret; + if ((err = msg->dump(stream, &nb_bytes)) != srs_success) { + return srs_error_wrap(err, "dump packet"); } } else if (sid == SrsTsPESStreamIdPaddingStream) { // for (i = 0; i < PES_packet_length; i++) { @@ -1756,7 +1654,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) // the payload_unit_start_indicator always be 1, // the message should never EOF for the first packet. if (is_fresh_msg && msg->PES_packet_length == 0) { - return ret; + return err; } // check msg, reap when completed. @@ -1766,7 +1664,7 @@ int SrsTsPayloadPES::decode(SrsBuffer* stream, SrsTsMessage** ppmsg) srs_info("ts: reap msg for completed."); } - return ret; + return err; } int SrsTsPayloadPES::size() @@ -1826,15 +1724,13 @@ int SrsTsPayloadPES::size() return sz; } -int SrsTsPayloadPES::encode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPES::encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // 6B fixed header. if (!stream->require(6)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: mux PSE failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: mux PSE"); } // 3B @@ -1854,16 +1750,12 @@ int SrsTsPayloadPES::encode(SrsBuffer* stream) // check the packet start prefix. packet_start_code_prefix &= 0xFFFFFF; if (packet_start_code_prefix != 0x01) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: mux PSE start code failed, expect=0x01, actual=%#x. ret=%d", packet_start_code_prefix, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: mux PSE start code failed, expect=0x01, actual=%#x", packet_start_code_prefix); } // 3B flags. if (!stream->require(3)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: mux PSE flags failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: mux PSE flags"); } // 1B int8_t oocv = original_or_copy & 0x01; @@ -1896,25 +1788,23 @@ int SrsTsPayloadPES::encode(SrsBuffer* stream) nb_required += PES_CRC_flag? 2:0; nb_required += PES_extension_flag? 1:0; if (!stream->require(nb_required)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: mux PSE payload failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: mux PSE payload"); } // 5B if (PTS_DTS_flags == 0x2) { - if ((ret = encode_33bits_dts_pts(stream, 0x02, pts)) != ERROR_SUCCESS) { - return ret; + if ((err = encode_33bits_dts_pts(stream, 0x02, pts)) != srs_success) { + return srs_error_wrap(err, "dts/pts"); } } // 10B if (PTS_DTS_flags == 0x3) { - if ((ret = encode_33bits_dts_pts(stream, 0x03, pts)) != ERROR_SUCCESS) { - return ret; + if ((err = encode_33bits_dts_pts(stream, 0x03, pts)) != srs_success) { + return srs_error_wrap(err, "dts/pts"); } - if ((ret = encode_33bits_dts_pts(stream, 0x01, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = encode_33bits_dts_pts(stream, 0x01, dts)) != srs_success) { + return srs_error_wrap(err, "dts/pts"); } // check sync, the diff of dts and pts should never greater than 1s. @@ -1970,9 +1860,7 @@ int SrsTsPayloadPES::encode(SrsBuffer* stream) nb_required += P_STD_buffer_flag? 2:0; nb_required += PES_extension_flag_2 ? 1 + PES_extension_field.size() : 0; // 1+x bytes. if (!stream->require(nb_required)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: mux PSE ext payload failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: mux PSE ext payload"); } stream->skip(nb_required); srs_warn("ts: demux PES, ignore the PES_extension."); @@ -1984,17 +1872,15 @@ int SrsTsPayloadPES::encode(SrsBuffer* stream) srs_warn("ts: demux PES, ignore the stuffings."); } - return ret; + return err; } -int SrsTsPayloadPES::decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv) +srs_error_t SrsTsPayloadPES::decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE dts/pts failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE dts/pts"); } // decode the 33bits schema. @@ -2004,16 +1890,12 @@ int SrsTsPayloadPES::decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv) // 1bit const '1' int64_t dts_pts_30_32 = stream->read_1bytes(); if ((dts_pts_30_32 & 0x01) != 0x01) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE dts/pts 30-32 failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE dts/pts 30-32"); } // @remark, we donot check the high 4bits, maybe '0001', '0010' or '0011'. // so we just ensure the high 4bits is not 0x00. if (((dts_pts_30_32 >> 4) & 0x0f) == 0x00) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE dts/pts 30-32 failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE dts/pts 30-32"); } dts_pts_30_32 = (dts_pts_30_32 >> 1) & 0x07; @@ -2022,9 +1904,7 @@ int SrsTsPayloadPES::decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv) // 1bit const '1' int64_t dts_pts_15_29 = stream->read_2bytes(); if ((dts_pts_15_29 & 0x01) != 0x01) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE dts/pts 15-29 failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE dts/pts 15-29"); } dts_pts_15_29 = (dts_pts_15_29 >> 1) & 0x7fff; @@ -2033,9 +1913,7 @@ int SrsTsPayloadPES::decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv) // 1bit const '1' int64_t dts_pts_0_14 = stream->read_2bytes(); if ((dts_pts_0_14 & 0x01) != 0x01) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: demux PSE dts/pts 0-14 failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: demux PSE dts/pts 0-14"); } dts_pts_0_14 = (dts_pts_0_14 >> 1) & 0x7fff; @@ -2045,17 +1923,15 @@ int SrsTsPayloadPES::decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv) v |= dts_pts_0_14 & 0x7fff; *pv = v; - return ret; + return err; } -int SrsTsPayloadPES::encode_33bits_dts_pts(SrsBuffer* stream, uint8_t fb, int64_t v) +srs_error_t SrsTsPayloadPES::encode_33bits_dts_pts(SrsBuffer* stream, uint8_t fb, int64_t v) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_PSE; - srs_error("ts: mux PSE dts/pts failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSE, "ts: mux PSE dts/pts"); } char* p = stream->data() + stream->pos(); @@ -2063,18 +1939,18 @@ int SrsTsPayloadPES::encode_33bits_dts_pts(SrsBuffer* stream, uint8_t fb, int64_ int32_t val = 0; - val = fb << 4 | (((v >> 30) & 0x07) << 1) | 1; + val = int32_t(fb << 4 | (((v >> 30) & 0x07) << 1) | 1); *p++ = val; - val = (((v >> 15) & 0x7fff) << 1) | 1; + val = int32_t((((v >> 15) & 0x7fff) << 1) | 1); *p++ = (val >> 8); *p++ = val; - val = (((v) & 0x7fff) << 1) | 1; + val = int32_t((((v) & 0x7fff) << 1) | 1); *p++ = (val >> 8); *p++ = val; - return ret; + return err; } SrsTsPayloadPSI::SrsTsPayloadPSI(SrsTsPacket* p) : SrsTsPayload(p) @@ -2089,9 +1965,9 @@ SrsTsPayloadPSI::~SrsTsPayloadPSI() { } -int SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/) +srs_error_t SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; /** * When the payload of the Transport Stream packet contains PSI data, the payload_unit_start_indicator has the following @@ -2103,9 +1979,7 @@ int SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/) */ if (packet->payload_unit_start_indicator) { if (!stream->require(1)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: demux PSI failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: demux PSI"); } pointer_field = stream->read_1bytes(); } @@ -2116,9 +1990,7 @@ int SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/) // atleast 3B for all psi. if (!stream->require(3)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: demux PSI failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: demux PSI"); } // 1B table_id = (SrsTsPsiId)stream->read_1bytes(); @@ -2134,34 +2006,28 @@ int SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/) // no section, ignore. if (section_length == 0) { srs_warn("ts: demux PAT ignore empty section"); - return ret; + return err; } if (!stream->require(section_length)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: demux PAT section failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: demux PSI section"); } // call the virtual method of actual PSI. - if ((ret = psi_decode(stream)) != ERROR_SUCCESS) { - return ret; + if ((err = psi_decode(stream)) != srs_success) { + return srs_error_wrap(err, "demux PSI"); } // 4B if (!stream->require(4)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: demux PSI crc32 failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: demux PSI crc32"); } CRC_32 = stream->read_4bytes(); // verify crc32. int32_t crc32 = srs_crc32_mpegts(ppat, stream->pos() - pat_pos - 4); if (crc32 != CRC_32) { - ret = ERROR_STREAM_CASTER_TS_CRC32; - srs_error("ts: verify PSI crc32 failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: verify PSI crc32"); } // consume left stuffings @@ -2181,7 +2047,7 @@ int SrsTsPayloadPSI::decode(SrsBuffer* stream, SrsTsMessage** /*ppmsg*/) stream->skip(nb_stuffings); } - return ret; + return err; } int SrsTsPayloadPSI::size() @@ -2198,15 +2064,13 @@ int SrsTsPayloadPSI::size() return sz; } -int SrsTsPayloadPSI::encode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPSI::encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if (packet->payload_unit_start_indicator) { if (!stream->require(1)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: mux PSI failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: mux PSI"); } stream->write_1bytes(pointer_field); } @@ -2217,9 +2081,7 @@ int SrsTsPayloadPSI::encode(SrsBuffer* stream) // atleast 3B for all psi. if (!stream->require(3)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: mux PSI failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: mux PSI"); } // 1B stream->write_1bytes(table_id); @@ -2234,30 +2096,26 @@ int SrsTsPayloadPSI::encode(SrsBuffer* stream) // no section, ignore. if (section_length == 0) { srs_warn("ts: mux PAT ignore empty section"); - return ret; + return err; } if (!stream->require(section_length)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: mux PAT section failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: mux PSI section"); } // call the virtual method of actual PSI. - if ((ret = psi_encode(stream)) != ERROR_SUCCESS) { - return ret; + if ((err = psi_encode(stream)) != srs_success) { + return srs_error_wrap(err, "mux PSI"); } // 4B if (!stream->require(4)) { - ret = ERROR_STREAM_CASTER_TS_PSI; - srs_error("ts: mux PSI crc32 failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PSI, "ts: mux PSI crc32"); } CRC_32 = srs_crc32_mpegts(ppat, stream->pos() - pat_pos); stream->write_4bytes(CRC_32); - return ret; + return err; } SrsTsPayloadPATProgram::SrsTsPayloadPATProgram(int16_t n, int16_t p) @@ -2271,15 +2129,13 @@ SrsTsPayloadPATProgram::~SrsTsPayloadPATProgram() { } -int SrsTsPayloadPATProgram::decode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPATProgram::decode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // atleast 4B for PAT program specified if (!stream->require(4)) { - ret = ERROR_STREAM_CASTER_TS_PAT; - srs_error("ts: demux PAT failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PAT, "ts: demux PAT"); } int tmpv = stream->read_4bytes(); @@ -2287,7 +2143,7 @@ int SrsTsPayloadPATProgram::decode(SrsBuffer* stream) const1_value = (int16_t)((tmpv >> 13) & 0x07); pid = (int16_t)(tmpv & 0x1FFF); - return ret; + return err; } int SrsTsPayloadPATProgram::size() @@ -2295,15 +2151,13 @@ int SrsTsPayloadPATProgram::size() return 4; } -int SrsTsPayloadPATProgram::encode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPATProgram::encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // atleast 4B for PAT program specified if (!stream->require(4)) { - ret = ERROR_STREAM_CASTER_TS_PAT; - srs_error("ts: mux PAT failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PAT, "ts: mux PAT"); } int tmpv = pid & 0x1FFF; @@ -2311,7 +2165,7 @@ int SrsTsPayloadPATProgram::encode(SrsBuffer* stream) tmpv |= (const1_value << 13) & 0xE000; stream->write_4bytes(tmpv); - return ret; + return err; } SrsTsPayloadPAT::SrsTsPayloadPAT(SrsTsPacket* p) : SrsTsPayloadPSI(p) @@ -2329,15 +2183,13 @@ SrsTsPayloadPAT::~SrsTsPayloadPAT() programs.clear(); } -int SrsTsPayloadPAT::psi_decode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPAT::psi_decode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // atleast 5B for PAT specified if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_PAT; - srs_error("ts: demux PAT failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PAT, "ts: demux PAT"); } int pos = stream->pos(); @@ -2364,8 +2216,8 @@ int SrsTsPayloadPAT::psi_decode(SrsBuffer* stream) for (int i = 0; i < program_bytes; i += 4) { SrsTsPayloadPATProgram* program = new SrsTsPayloadPATProgram(); - if ((ret = program->decode(stream)) != ERROR_SUCCESS) { - return ret; + if ((err = program->decode(stream)) != srs_success) { + return srs_error_wrap(err, "demux PAT program"); } // update the apply pid table. @@ -2378,7 +2230,7 @@ int SrsTsPayloadPAT::psi_decode(SrsBuffer* stream) packet->context->set(packet->pid, SrsTsPidApplyPAT); packet->context->on_pmt_parsed(); - return ret; + return err; } int SrsTsPayloadPAT::psi_size() @@ -2391,15 +2243,13 @@ int SrsTsPayloadPAT::psi_size() return sz; } -int SrsTsPayloadPAT::psi_encode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPAT::psi_encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // atleast 5B for PAT specified if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_PAT; - srs_error("ts: mux PAT failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PAT, "ts: mux PAT"); } // 2B @@ -2419,8 +2269,8 @@ int SrsTsPayloadPAT::psi_encode(SrsBuffer* stream) // multiple 4B program data. for (int i = 0; i < (int)programs.size(); i ++) { SrsTsPayloadPATProgram* program = programs.at(i); - if ((ret = program->encode(stream)) != ERROR_SUCCESS) { - return ret; + if ((err = program->encode(stream)) != srs_success) { + return srs_error_wrap(err, "mux PAT program"); } // update the apply pid table. @@ -2430,7 +2280,7 @@ int SrsTsPayloadPAT::psi_encode(SrsBuffer* stream) // update the apply pid table. packet->context->set(packet->pid, SrsTsPidApplyPAT); - return ret; + return err; } SrsTsPayloadPMTESInfo::SrsTsPayloadPMTESInfo(SrsTsStream st, int16_t epid) @@ -2446,15 +2296,13 @@ SrsTsPayloadPMTESInfo::~SrsTsPayloadPMTESInfo() { } -int SrsTsPayloadPMTESInfo::decode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPMTESInfo::decode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // 5B if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: demux PMT es info failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: demux PMT"); } stream_type = (SrsTsStream)stream->read_1bytes(); @@ -2473,15 +2321,13 @@ int SrsTsPayloadPMTESInfo::decode(SrsBuffer* stream) if (ES_info_length > 0) { if (!stream->require(ES_info_length)) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: demux PMT es info data failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: demux PMT ES_info"); } ES_info.resize(ES_info_length); stream->read_bytes(&ES_info[0], ES_info_length); } - return ret; + return err; } int SrsTsPayloadPMTESInfo::size() @@ -2489,15 +2335,13 @@ int SrsTsPayloadPMTESInfo::size() return 5 + (int)ES_info.size(); } -int SrsTsPayloadPMTESInfo::encode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPMTESInfo::encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // 5B if (!stream->require(5)) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: mux PMT es info failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: mux PMT"); } stream->write_1bytes(stream_type); @@ -2511,15 +2355,13 @@ int SrsTsPayloadPMTESInfo::encode(SrsBuffer* stream) stream->write_2bytes(eilv); if (!ES_info.empty()) { - if (!stream->require(ES_info.size())) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: mux PMT es info data failed. ret=%d", ret); - return ret; + if (!stream->require((int)ES_info.size())) { + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: mux PMT ES_info"); } - stream->write_bytes(&ES_info[0], ES_info.size()); + stream->write_bytes(&ES_info[0], (int)ES_info.size()); } - return ret; + return err; } SrsTsPayloadPMT::SrsTsPayloadPMT(SrsTsPacket* p) : SrsTsPayloadPSI(p) @@ -2539,15 +2381,13 @@ SrsTsPayloadPMT::~SrsTsPayloadPMT() infos.clear(); } -int SrsTsPayloadPMT::psi_decode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPMT::psi_decode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // atleast 9B for PMT specified if (!stream->require(9)) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: demux PMT failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: demux PMT"); } // 2B @@ -2582,9 +2422,7 @@ int SrsTsPayloadPMT::psi_decode(SrsBuffer* stream) if (program_info_length > 0) { if (!stream->require(program_info_length)) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: demux PMT program info failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: demux PMT program info"); } program_info_desc.resize(program_info_length); @@ -2597,8 +2435,8 @@ int SrsTsPayloadPMT::psi_decode(SrsBuffer* stream) SrsTsPayloadPMTESInfo* info = new SrsTsPayloadPMTESInfo(); infos.push_back(info); - if ((ret = info->decode(stream)) != ERROR_SUCCESS) { - return ret; + if ((err = info->decode(stream)) != srs_success) { + return srs_error_wrap(err, "demux PMT program info"); } // update the apply pid table @@ -2622,7 +2460,7 @@ int SrsTsPayloadPMT::psi_decode(SrsBuffer* stream) // update the apply pid table. packet->context->set(packet->pid, SrsTsPidApplyPMT); - return ret; + return err; } int SrsTsPayloadPMT::psi_size() @@ -2636,15 +2474,13 @@ int SrsTsPayloadPMT::psi_size() return sz; } -int SrsTsPayloadPMT::psi_encode(SrsBuffer* stream) +srs_error_t SrsTsPayloadPMT::psi_encode(SrsBuffer* stream) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // atleast 9B for PMT specified if (!stream->require(9)) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: mux PMT failed. ret=%d", ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: mux PMT"); } // 2B @@ -2673,19 +2509,17 @@ int SrsTsPayloadPMT::psi_encode(SrsBuffer* stream) stream->write_2bytes(pilv); if (!program_info_desc.empty()) { - if (!stream->require(program_info_desc.size())) { - ret = ERROR_STREAM_CASTER_TS_PMT; - srs_error("ts: mux PMT program info failed. ret=%d", ret); - return ret; + if (!stream->require((int)program_info_desc.size())) { + return srs_error_new(ERROR_STREAM_CASTER_TS_PMT, "ts: mux PMT program info"); } - stream->write_bytes(&program_info_desc[0], program_info_desc.size()); + stream->write_bytes(&program_info_desc[0], (int)program_info_desc.size()); } for (int i = 0; i < (int)infos.size(); i ++) { SrsTsPayloadPMTESInfo* info = infos.at(i); - if ((ret = info->encode(stream)) != ERROR_SUCCESS) { - return ret; + if ((err = info->encode(stream)) != srs_success) { + return srs_error_wrap(err, "mux PMT program info"); } // update the apply pid table @@ -2709,7 +2543,7 @@ int SrsTsPayloadPMT::psi_encode(SrsBuffer* stream) // update the apply pid table. packet->context->set(packet->pid, SrsTsPidApplyPMT); - return ret; + return err; } SrsTsContextWriter::SrsTsContextWriter(SrsFileWriter* w, SrsTsContext* c, SrsAudioCodecId ac, SrsVideoCodecId vc) @@ -2726,9 +2560,10 @@ SrsTsContextWriter::~SrsTsContextWriter() close(); } -int SrsTsContextWriter::open(string p) +srs_error_t SrsTsContextWriter::open(string p) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; path = p; @@ -2738,42 +2573,40 @@ int SrsTsContextWriter::open(string p) context->reset(); if ((ret = writer->open(path)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "ts: open writer"); } - return ret; + return err; } -int SrsTsContextWriter::write_audio(SrsTsMessage* audio) +srs_error_t SrsTsContextWriter::write_audio(SrsTsMessage* audio) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; srs_info("hls: write audio pts=%" PRId64 ", dts=%" PRId64 ", size=%d", - audio->pts, audio->dts, audio->PES_packet_length); + audio->pts, audio->dts, audio->PES_packet_length); - if ((ret = context->encode(writer, audio, vcodec, acodec)) != ERROR_SUCCESS) { - srs_error("hls encode audio failed. ret=%d", ret); - return ret; + if ((err = context->encode(writer, audio, vcodec, acodec)) != srs_success) { + return srs_error_wrap(err, "ts: write audio"); } srs_info("hls encode audio ok"); - return ret; + return err; } -int SrsTsContextWriter::write_video(SrsTsMessage* video) +srs_error_t SrsTsContextWriter::write_video(SrsTsMessage* video) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; srs_info("hls: write video pts=%" PRId64 ", dts=%" PRId64 ", size=%d", - video->pts, video->dts, video->PES_packet_length); + video->pts, video->dts, video->PES_packet_length); - if ((ret = context->encode(writer, video, vcodec, acodec)) != ERROR_SUCCESS) { - srs_error("hls encode video failed. ret=%d", ret); - return ret; + if ((err = context->encode(writer, video, vcodec, acodec)) != srs_success) { + return srs_error_wrap(err, "ts: write video"); } srs_info("hls encode video ok"); - return ret; + return err; } void SrsTsContextWriter::close() @@ -2798,9 +2631,9 @@ SrsTsMessageCache::~SrsTsMessageCache() srs_freep(video); } -int SrsTsMessageCache::cache_audio(SrsAudioFrame* frame, int64_t dts) +srs_error_t SrsTsMessageCache::cache_audio(SrsAudioFrame* frame, int64_t dts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // create the ts audio message. if (!audio) { @@ -2820,21 +2653,21 @@ int SrsTsMessageCache::cache_audio(SrsAudioFrame* frame, int64_t dts) // write video to cache. if (acodec->id == SrsAudioCodecIdAAC) { - if ((ret = do_cache_aac(frame)) != ERROR_SUCCESS) { - return ret; + if ((err = do_cache_aac(frame)) != srs_success) { + return srs_error_wrap(err, "ts: cache aac"); } } else { - if ((ret = do_cache_mp3(frame)) != ERROR_SUCCESS) { - return ret; + if ((err = do_cache_mp3(frame)) != srs_success) { + return srs_error_wrap(err, "ts: cache mp3"); } } - return ret; + return err; } -int SrsTsMessageCache::cache_video(SrsVideoFrame* frame, int64_t dts) +srs_error_t SrsTsMessageCache::cache_video(SrsVideoFrame* frame, int64_t dts) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // create the ts video message. if (!video) { @@ -2848,16 +2681,16 @@ int SrsTsMessageCache::cache_video(SrsVideoFrame* frame, int64_t dts) video->sid = SrsTsPESStreamIdVideoCommon; // write video to cache. - if ((ret = do_cache_avc(frame)) != ERROR_SUCCESS) { - return ret; + if ((err = do_cache_avc(frame)) != srs_success) { + return srs_error_wrap(err, "ts: cache avc"); } - return ret; + return err; } -int SrsTsMessageCache::do_cache_mp3(SrsAudioFrame* frame) +srs_error_t SrsTsMessageCache::do_cache_mp3(SrsAudioFrame* frame) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // for mp3, directly write to cache. // TODO: FIXME: implements the ts jitter. @@ -2866,12 +2699,12 @@ int SrsTsMessageCache::do_cache_mp3(SrsAudioFrame* frame) audio->payload->append(sample->bytes, sample->size); } - return ret; + return err; } -int SrsTsMessageCache::do_cache_aac(SrsAudioFrame* frame) +srs_error_t SrsTsMessageCache::do_cache_aac(SrsAudioFrame* frame) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; SrsAudioCodecConfig* codec = frame->acodec(); srs_assert(codec); @@ -2881,9 +2714,7 @@ int SrsTsMessageCache::do_cache_aac(SrsAudioFrame* frame) int32_t size = sample->size; if (!sample->bytes || size <= 0 || size > 0x1fff) { - ret = ERROR_HLS_AAC_FRAME_LENGTH; - srs_error("invalid aac frame length=%d, ret=%d", size, ret); - return ret; + return srs_error_new(ERROR_HLS_AAC_FRAME_LENGTH, "ts: invalid aac frame length=%d", size); } // the frame length is the AAC raw data plus the adts header size. @@ -2937,7 +2768,7 @@ int SrsTsMessageCache::do_cache_aac(SrsAudioFrame* frame) audio->payload->append(sample->bytes, sample->size); } - return ret; + return err; } void srs_avc_insert_aud(SrsSimpleStream* payload, bool& aud_inserted) @@ -2996,9 +2827,9 @@ void srs_avc_insert_aud(SrsSimpleStream* payload, bool& aud_inserted) } } -int SrsTsMessageCache::do_cache_avc(SrsVideoFrame* frame) +srs_error_t SrsTsMessageCache::do_cache_avc(SrsVideoFrame* frame) { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // Whether aud inserted. bool aud_inserted = false; @@ -3051,9 +2882,7 @@ int SrsTsMessageCache::do_cache_avc(SrsVideoFrame* frame) int32_t size = sample->size; if (!sample->bytes || size <= 0) { - ret = ERROR_HLS_AVC_SAMPLE_SIZE; - srs_error("invalid avc sample length=%d, ret=%d", size, ret); - return ret; + return srs_error_new(ERROR_HLS_AVC_SAMPLE_SIZE, "ts: invalid avc sample length=%d", size); } // 5bits, 7.3.1 NAL unit syntax, @@ -3065,11 +2894,11 @@ int SrsTsMessageCache::do_cache_avc(SrsVideoFrame* frame) if (nal_unit_type == SrsAvcNaluTypeIDR && !frame->has_sps_pps && !is_sps_pps_appended) { if (!codec->sequenceParameterSetNALUnit.empty()) { srs_avc_insert_aud(video->payload, aud_inserted); - video->payload->append(&codec->sequenceParameterSetNALUnit[0], codec->sequenceParameterSetNALUnit.size()); + video->payload->append(&codec->sequenceParameterSetNALUnit[0], (int)codec->sequenceParameterSetNALUnit.size()); } if (!codec->pictureParameterSetNALUnit.empty()) { srs_avc_insert_aud(video->payload, aud_inserted); - video->payload->append(&codec->pictureParameterSetNALUnit[0], codec->pictureParameterSetNALUnit.size()); + video->payload->append(&codec->pictureParameterSetNALUnit[0], (int)codec->pictureParameterSetNALUnit.size()); } is_sps_pps_appended = true; } @@ -3079,7 +2908,7 @@ int SrsTsMessageCache::do_cache_avc(SrsVideoFrame* frame) video->payload->append(sample->bytes, sample->size); } - return ret; + return err; } SrsTsTransmuxer::SrsTsTransmuxer() @@ -3099,25 +2928,18 @@ SrsTsTransmuxer::~SrsTsTransmuxer() srs_freep(context); } -int SrsTsTransmuxer::initialize(SrsFileWriter* fw) +srs_error_t SrsTsTransmuxer::initialize(SrsFileWriter* fw) { - int ret = ERROR_SUCCESS; srs_error_t err = srs_success; if ((err = format->initialize()) != srs_success) { - // TODO: FIXME: Use error. - ret = srs_error_code(err); - srs_freep(err); - - return ret; + return srs_error_wrap(err, "ts: init format"); } srs_assert(fw); if (!fw->is_open()) { - ret = ERROR_KERNEL_FLV_STREAM_CLOSED; - srs_warn("stream is not open for encoder. ret=%d", ret); - return ret; + return srs_error_new(ERROR_KERNEL_FLV_STREAM_CLOSED, "ts: stream is not open"); } writer = fw; @@ -3126,30 +2948,31 @@ int SrsTsTransmuxer::initialize(SrsFileWriter* fw) // TODO: FIXME: Support config the codec. tscw = new SrsTsContextWriter(fw, context, SrsAudioCodecIdAAC, SrsVideoCodecIdAVC); - if ((ret = tscw->open("")) != ERROR_SUCCESS) { - return ret; + if ((err = tscw->open("")) != srs_success) { + return srs_error_wrap(err, "ts: open writer"); } - return ret; + return err; } -int SrsTsTransmuxer::write_audio(int64_t timestamp, char* data, int size) +srs_error_t SrsTsTransmuxer::write_audio(int64_t timestamp, char* data, int size) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((ret = format->on_audio(timestamp, data, size)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "ts: format on audio"); } // ts support audio codec: aac/mp3 srs_assert(format->acodec && format->audio); if (format->acodec->id != SrsAudioCodecIdAAC && format->acodec->id != SrsAudioCodecIdMP3) { - return ret; + return err; } // for aac: ignore sequence header if (format->acodec->id == SrsAudioCodecIdAAC && format->audio->aac_packet_type == SrsAudioAacFrameTraitSequenceHeader) { - return ret; + return err; } // the dts calc from rtmp/flv header. @@ -3158,8 +2981,8 @@ int SrsTsTransmuxer::write_audio(int64_t timestamp, char* data, int size) int64_t dts = timestamp * 90; // write audio to cache. - if ((ret = tsmc->cache_audio(format->audio, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = tsmc->cache_audio(format->audio, dts)) != srs_success) { + return srs_error_wrap(err, "ts: cache audio"); } // TODO: FIXME: for pure audio, aggregate some frame to one. @@ -3169,67 +2992,67 @@ int SrsTsTransmuxer::write_audio(int64_t timestamp, char* data, int size) return flush_audio(); } -int SrsTsTransmuxer::write_video(int64_t timestamp, char* data, int size) +srs_error_t SrsTsTransmuxer::write_video(int64_t timestamp, char* data, int size) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; if ((ret = format->on_video(timestamp, data, size)) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "ts: on video"); } // ignore info frame, // @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909 srs_assert(format->video && format->vcodec); if (format->video->frame_type == SrsVideoAvcFrameTypeVideoInfoFrame) { - return ret; + return err; } if (format->vcodec->id != SrsVideoCodecIdAVC) { - return ret; + return err; } // ignore sequence header - if (format->video->frame_type == SrsVideoAvcFrameTypeKeyFrame - && format->video->avc_packet_type == SrsVideoAvcFrameTraitSequenceHeader) { - return ret; + if (format->video->frame_type == SrsVideoAvcFrameTypeKeyFrame && format->video->avc_packet_type == SrsVideoAvcFrameTraitSequenceHeader) { + return err; } int64_t dts = timestamp * 90; // write video to cache. - if ((ret = tsmc->cache_video(format->video, dts)) != ERROR_SUCCESS) { - return ret; + if ((err = tsmc->cache_video(format->video, dts)) != srs_success) { + return srs_error_wrap(err, "ts: cache video"); } return flush_video(); } -int SrsTsTransmuxer::flush_audio() +srs_error_t SrsTsTransmuxer::flush_audio() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = tscw->write_audio(tsmc->audio)) != ERROR_SUCCESS) { - return ret; + if ((err = tscw->write_audio(tsmc->audio)) != srs_success) { + return srs_error_wrap(err, "ts: write audio"); } // write success, clear and free the ts message. srs_freep(tsmc->audio); - return ret; + return err; } -int SrsTsTransmuxer::flush_video() +srs_error_t SrsTsTransmuxer::flush_video() { - int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; - if ((ret = tscw->write_video(tsmc->video)) != ERROR_SUCCESS) { - return ret; + if ((err = tscw->write_video(tsmc->video)) != srs_success) { + return srs_error_wrap(err, "ts: write video"); } // write success, clear and free the ts message. srs_freep(tsmc->video); - return ret; + return err; } #endif diff --git a/trunk/src/kernel/srs_kernel_ts.hpp b/trunk/src/kernel/srs_kernel_ts.hpp index 52da01049..5e4ad36af 100644 --- a/trunk/src/kernel/srs_kernel_ts.hpp +++ b/trunk/src/kernel/srs_kernel_ts.hpp @@ -283,7 +283,7 @@ public: /** * dumps all bytes in stream to ts message. */ - virtual int dump(SrsBuffer* stream, int* pnb_bytes); + virtual srs_error_t dump(SrsBuffer* stream, int* pnb_bytes); /** * whether ts message is completed to reap. * @param payload_unit_start_indicator whether new ts message start. @@ -334,7 +334,7 @@ public: * @param msg the ts msg, user should never free it. * @return an int error code. */ - virtual int on_ts_message(SrsTsMessage* msg) = 0; + virtual srs_error_t on_ts_message(SrsTsMessage* msg) = 0; }; /** @@ -392,7 +392,7 @@ public: * @param handler the ts message handler to process the msg. * @remark we will consume all bytes in stream. */ - virtual int decode(SrsBuffer* stream, ISrsTsHandler* handler); + virtual srs_error_t decode(SrsBuffer* stream, ISrsTsHandler* handler); // encode methods public: /** @@ -401,8 +401,8 @@ public: * @param vc the video codec, write the PAT/PMT table when changed. * @param ac the audio codec, write the PAT/PMT table when changed. */ - virtual int encode(SrsFileWriter* writer, SrsTsMessage* msg, SrsVideoCodecId vc, SrsAudioCodecId ac); - // drm methods + virtual srs_error_t encode(SrsFileWriter* writer, SrsTsMessage* msg, SrsVideoCodecId vc, SrsAudioCodecId ac); +// drm methods public: /** * set sync byte of ts segment. @@ -410,8 +410,8 @@ public: */ virtual void set_sync_byte(int8_t sb); private: - virtual int encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStream vs, int16_t apid, SrsTsStream as); - virtual int encode_pes(SrsFileWriter* writer, SrsTsMessage* msg, int16_t pid, SrsTsStream sid, bool pure_audio); + virtual srs_error_t encode_pat_pmt(SrsFileWriter* writer, int16_t vpid, SrsTsStream vs, int16_t apid, SrsTsStream as); + virtual srs_error_t encode_pes(SrsFileWriter* writer, SrsTsMessage* msg, int16_t pid, SrsTsStream sid, bool pure_audio); }; /** @@ -516,10 +516,10 @@ public: SrsTsPacket(SrsTsContext* c); virtual ~SrsTsPacket(); public: - virtual int decode(SrsBuffer* stream, SrsTsMessage** ppmsg); + virtual srs_error_t decode(SrsBuffer* stream, SrsTsMessage** ppmsg); public: virtual int size(); - virtual int encode(SrsBuffer* stream); + virtual srs_error_t encode(SrsBuffer* stream); virtual void padding(int nb_stuffings); public: static SrsTsPacket* create_pat(SrsTsContext* context, int16_t pmt_number, int16_t pmt_pid); @@ -832,10 +832,10 @@ public: SrsTsAdaptationField(SrsTsPacket* pkt); virtual ~SrsTsAdaptationField(); public: - virtual int decode(SrsBuffer* stream); + virtual srs_error_t decode(SrsBuffer* stream); public: virtual int size(); - virtual int encode(SrsBuffer* stream); + virtual srs_error_t encode(SrsBuffer* stream); }; /** @@ -880,10 +880,10 @@ public: SrsTsPayload(SrsTsPacket* p); virtual ~SrsTsPayload(); public: - virtual int decode(SrsBuffer* stream, SrsTsMessage** ppmsg) = 0; + virtual srs_error_t decode(SrsBuffer* stream, SrsTsMessage** ppmsg) = 0; public: virtual int size() = 0; - virtual int encode(SrsBuffer* stream) = 0; + virtual srs_error_t encode(SrsBuffer* stream) = 0; }; /** @@ -1222,13 +1222,13 @@ public: SrsTsPayloadPES(SrsTsPacket* p); virtual ~SrsTsPayloadPES(); public: - virtual int decode(SrsBuffer* stream, SrsTsMessage** ppmsg); + virtual srs_error_t decode(SrsBuffer* stream, SrsTsMessage** ppmsg); public: virtual int size(); - virtual int encode(SrsBuffer* stream); + virtual srs_error_t encode(SrsBuffer* stream); private: - virtual int decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv); - virtual int encode_33bits_dts_pts(SrsBuffer* stream, uint8_t fb, int64_t v); + virtual srs_error_t decode_33bits_dts_pts(SrsBuffer* stream, int64_t* pv); + virtual srs_error_t encode_33bits_dts_pts(SrsBuffer* stream, uint8_t fb, int64_t v); }; /** @@ -1289,14 +1289,14 @@ public: SrsTsPayloadPSI(SrsTsPacket* p); virtual ~SrsTsPayloadPSI(); public: - virtual int decode(SrsBuffer* stream, SrsTsMessage** ppmsg); + virtual srs_error_t decode(SrsBuffer* stream, SrsTsMessage** ppmsg); public: virtual int size(); - virtual int encode(SrsBuffer* stream); + virtual srs_error_t encode(SrsBuffer* stream); protected: virtual int psi_size() = 0; - virtual int psi_encode(SrsBuffer* stream) = 0; - virtual int psi_decode(SrsBuffer* stream) = 0; + virtual srs_error_t psi_encode(SrsBuffer* stream) = 0; + virtual srs_error_t psi_decode(SrsBuffer* stream) = 0; }; /** @@ -1329,10 +1329,10 @@ public: SrsTsPayloadPATProgram(int16_t n = 0, int16_t p = 0); virtual ~SrsTsPayloadPATProgram(); public: - virtual int decode(SrsBuffer* stream); + virtual srs_error_t decode(SrsBuffer* stream); public: virtual int size(); - virtual int encode(SrsBuffer* stream); + virtual srs_error_t encode(SrsBuffer* stream); }; /** @@ -1393,10 +1393,10 @@ public: SrsTsPayloadPAT(SrsTsPacket* p); virtual ~SrsTsPayloadPAT(); protected: - virtual int psi_decode(SrsBuffer* stream); + virtual srs_error_t psi_decode(SrsBuffer* stream); protected: virtual int psi_size(); - virtual int psi_encode(SrsBuffer* stream); + virtual srs_error_t psi_encode(SrsBuffer* stream); }; /** @@ -1433,10 +1433,10 @@ public: SrsTsPayloadPMTESInfo(SrsTsStream st = SrsTsStreamReserved, int16_t epid = 0); virtual ~SrsTsPayloadPMTESInfo(); public: - virtual int decode(SrsBuffer* stream); + virtual srs_error_t decode(SrsBuffer* stream); public: virtual int size(); - virtual int encode(SrsBuffer* stream); + virtual srs_error_t encode(SrsBuffer* stream); }; /** @@ -1523,10 +1523,10 @@ public: SrsTsPayloadPMT(SrsTsPacket* p); virtual ~SrsTsPayloadPMT(); protected: - virtual int psi_decode(SrsBuffer* stream); + virtual srs_error_t psi_decode(SrsBuffer* stream); protected: virtual int psi_size(); - virtual int psi_encode(SrsBuffer* stream); + virtual srs_error_t psi_encode(SrsBuffer* stream); }; /** @@ -1551,15 +1551,15 @@ public: * open the writer, donot write the PSI of ts. * @param p a string indicates the path of ts file to mux to. */ - virtual int open(std::string p); + virtual srs_error_t open(std::string p); /** * write an audio frame to ts, */ - virtual int write_audio(SrsTsMessage* audio); + virtual srs_error_t write_audio(SrsTsMessage* audio); /** * write a video frame to ts, */ - virtual int write_video(SrsTsMessage* video); + virtual srs_error_t write_video(SrsTsMessage* video); /** * close the writer. */ @@ -1588,15 +1588,15 @@ public: /** * write audio to cache */ - virtual int cache_audio(SrsAudioFrame* frame, int64_t dts); + virtual srs_error_t cache_audio(SrsAudioFrame* frame, int64_t dts); /** * write video to muxer. */ - virtual int cache_video(SrsVideoFrame* frame, int64_t dts); + virtual srs_error_t cache_video(SrsVideoFrame* frame, int64_t dts); private: - virtual int do_cache_mp3(SrsAudioFrame* frame); - virtual int do_cache_aac(SrsAudioFrame* frame); - virtual int do_cache_avc(SrsVideoFrame* frame); + virtual srs_error_t do_cache_mp3(SrsAudioFrame* frame); + virtual srs_error_t do_cache_aac(SrsAudioFrame* frame); + virtual srs_error_t do_cache_avc(SrsVideoFrame* frame); }; /** @@ -1619,17 +1619,17 @@ public: * initialize the underlayer file stream. * @param fw the writer to use for ts encoder, user must free it. */ - virtual int initialize(SrsFileWriter* fw); + virtual srs_error_t initialize(SrsFileWriter* fw); public: /** * write audio/video packet. * @remark assert data is not NULL. */ - virtual int write_audio(int64_t timestamp, char* data, int size); - virtual int write_video(int64_t timestamp, char* data, int size); + virtual srs_error_t write_audio(int64_t timestamp, char* data, int size); + virtual srs_error_t write_video(int64_t timestamp, char* data, int size); private: - virtual int flush_audio(); - virtual int flush_video(); + virtual srs_error_t flush_audio(); + virtual srs_error_t flush_video(); }; #endif diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index 05a8ce82e..0ac2233cf 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -509,17 +509,15 @@ int srs_do_create_dir_recursively(string dir) return true; } - int srs_create_dir_recursively(string dir) + srs_error_t srs_create_dir_recursively(string dir) { - int ret = ERROR_SUCCESS; - - ret = srs_do_create_dir_recursively(dir); + int ret = srs_do_create_dir_recursively(dir); if (ret == ERROR_SYSTEM_DIR_EXISTS) { - return ERROR_SUCCESS; + return srs_success; } - return ret; + return srs_error_new(ret, "create dir %s", dir.c_str()); } bool srs_path_exists(std::string path) diff --git a/trunk/src/kernel/srs_kernel_utility.hpp b/trunk/src/kernel/srs_kernel_utility.hpp index b07092d19..130e4e3a3 100644 --- a/trunk/src/kernel/srs_kernel_utility.hpp +++ b/trunk/src/kernel/srs_kernel_utility.hpp @@ -102,7 +102,7 @@ extern std::vector srs_string_split(std::string str, std::vectordecode(stream, handler)) != ERROR_SUCCESS) { + if ((err = context->decode(stream, handler)) != srs_success) { + // TODO: FIXME: Use error + ret = srs_error_code(err); + srs_freep(err); srs_error("mpegts: ignore parse ts packet failed. ret=%d", ret); return ret; } @@ -654,7 +658,7 @@ public: virtual ~SrsIngestHlsOutput(); // interface ISrsTsHandler public: - virtual int on_ts_message(SrsTsMessage* msg); + virtual srs_error_t on_ts_message(SrsTsMessage* msg); // interface IAacHandler public: virtual int on_aac_frame(char* frame, int frame_size, double duration); @@ -713,9 +717,10 @@ SrsIngestHlsOutput::~SrsIngestHlsOutput() queue.clear(); } -int SrsIngestHlsOutput::on_ts_message(SrsTsMessage* msg) +srs_error_t SrsIngestHlsOutput::on_ts_message(SrsTsMessage* msg) { int ret = ERROR_SUCCESS; + srs_error_t err = srs_success; // about the bytes of msg, specified by elementary stream which indicates by PES_packet_data_byte and stream_id // for example, when SrsTsStream of SrsTsChannel indicates stream_type is SrsTsStreamVideoMpeg4 and SrsTsStreamAudioMpeg4, @@ -767,26 +772,22 @@ int SrsIngestHlsOutput::on_ts_message(SrsTsMessage* msg) // when not audio/video, or not adts/annexb format, donot support. if (msg->stream_number() != 0) { - ret = ERROR_STREAM_CASTER_TS_ES; - srs_error("mpegts: unsupported stream format, sid=%#x(%s-%d). ret=%d", - msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number(), ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_ES, "ts: unsupported stream format, sid=%#x(%s-%d)", + msg->sid, msg->is_audio()? "A":msg->is_video()? "V":"N", msg->stream_number()); } // check supported codec if (msg->channel->stream != SrsTsStreamVideoH264 && msg->channel->stream != SrsTsStreamAudioAAC) { - ret = ERROR_STREAM_CASTER_TS_CODEC; - srs_error("mpegts: unsupported stream codec=%d. ret=%d", msg->channel->stream, ret); - return ret; + return srs_error_new(ERROR_STREAM_CASTER_TS_CODEC, "ts: unsupported stream codec=%d", msg->channel->stream); } // we must use queue to cache the msg, then parse it if possible. queue.insert(std::make_pair(msg->dts, msg->detach())); if ((ret = parse_message_queue()) != ERROR_SUCCESS) { - return ret; + return srs_error_new(ret, "ts: parse message"); } - return ret; + return err; } int SrsIngestHlsOutput::on_aac_frame(char* frame, int frame_size, double duration)