From 6dc86b8a2ee9b942699c5460926254a5ba63b5ca Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 27 Aug 2022 12:28:33 +0800 Subject: [PATCH] CLS: Refine logging to global object. --- trunk/src/app/srs_app_hybrid.cpp | 2 +- trunk/src/app/srs_app_tencentcloud.cpp | 129 +++++++++++++------------ trunk/src/app/srs_app_tencentcloud.hpp | 14 ++- 3 files changed, 75 insertions(+), 70 deletions(-) diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index 05a63f084..0f285b487 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -391,7 +391,7 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval) ); // Report logs to CLS if enabled. - if ((err = srs_cls_report()) != srs_success) { + if ((err = _srs_cls->report()) != srs_success) { srs_warn("ignore cls err %s", srs_error_desc(err).c_str()); srs_freep(err); } diff --git a/trunk/src/app/srs_app_tencentcloud.cpp b/trunk/src/app/srs_app_tencentcloud.cpp index f493d9694..82ae790c8 100644 --- a/trunk/src/app/srs_app_tencentcloud.cpp +++ b/trunk/src/app/srs_app_tencentcloud.cpp @@ -220,6 +220,7 @@ public: SrsClsLogGroupList(); virtual ~SrsClsLogGroupList(); public: + bool empty(); SrsClsLogGroup* add_log_group(); public: virtual uint64_t nb_bytes(); @@ -424,6 +425,11 @@ SrsClsLogGroupList::~SrsClsLogGroupList() } } +bool SrsClsLogGroupList::empty() +{ + return groups_.empty(); +} + SrsClsLogGroup* SrsClsLogGroupList::add_log_group() { SrsClsLogGroup* group = new SrsClsLogGroup(); @@ -469,7 +475,7 @@ SrsClsSugar::SrsClsSugar() log_ = log_group_->add_log(); log_group_->set_source(srs_get_public_internet_address(true)); - log_->set_time(srs_get_system_time() / SRS_UTIME_SECONDS); + log_->set_time(srs_get_system_time() / SRS_UTIME_MILLISECONDS); kv("agent", RTMP_SIG_SRS_SERVER); string label = _srs_cls->label(); @@ -493,6 +499,21 @@ SrsClsSugar::~SrsClsSugar() srs_freep(log_groups_); } +uint64_t SrsClsSugar::nb_bytes() +{ + return log_groups_->nb_bytes(); +} + +srs_error_t SrsClsSugar::encode(SrsBuffer* b) +{ + return log_groups_->encode(b); +} + +bool SrsClsSugar::empty() +{ + return log_groups_->empty(); +} + SrsClsSugar* SrsClsSugar::kv(std::string k, std::string v) { log_->add_content()->set_key(k)->set_value(v); @@ -520,16 +541,6 @@ SrsClsSugar* SrsClsSugar::kvf(std::string k, const char* fmt, ...) return kv(k, v); } -uint64_t SrsClsSugar::nb_bytes() -{ - return log_groups_->nb_bytes(); -} - -srs_error_t SrsClsSugar::encode(SrsBuffer* b) -{ - return log_groups_->encode(b); -} - SrsClsSugars::SrsClsSugars() { } @@ -581,8 +592,16 @@ SrsClsSugars* SrsClsSugars::slice(int max_size) for (vector::iterator it = sugars.begin(); it != sugars.end();) { SrsClsSugar* sugar = *it; - // Always push at least one elem. + // Always consume it. it = sugars.erase(it); + + // If empty, ignore it. + if (sugar->empty()) { + srs_freep(sugar); + continue; + } + + // Not empty, append it, to make sure at least one elem. v->sugars.push_back(sugar); // Util exceed the max size. @@ -616,10 +635,12 @@ SrsClsClient::SrsClsClient() heartbeat_ratio_ = 0; streams_ratio_ = 0; nn_logs_ = 0; + sugars_ = new SrsClsSugars(); } SrsClsClient::~SrsClsClient() { + srs_freep(sugars_); } bool SrsClsClient::enabled() @@ -627,26 +648,6 @@ bool SrsClsClient::enabled() return enabled_; } -bool SrsClsClient::stat_heartbeat() -{ - return stat_heartbeat_; -} - -bool SrsClsClient::stat_streams() -{ - return stat_streams_; -} - -int SrsClsClient::heartbeat_ratio() -{ - return heartbeat_ratio_; -} - -int SrsClsClient::streams_ratio() -{ - return streams_ratio_; -} - string SrsClsClient::label() { return label_; @@ -706,7 +707,34 @@ srs_error_t SrsClsClient::initialize() return err; } -srs_error_t SrsClsClient::send_log(ISrsEncoder* sugar, int count, int total) +srs_error_t SrsClsClient::report() +{ + srs_error_t err = srs_success; + + if ((err = dump_summaries(sugars_)) != srs_success) { + return srs_error_wrap(err, "dump summary"); + } + + if ((err = dump_streams(sugars_)) != srs_success) { + return srs_error_wrap(err, "dump streams"); + } + + if (sugars_->empty()) { + return err; + } + + SrsClsSugars* sugars = sugars_; + SrsAutoFree(SrsClsSugars, sugars); + sugars_ = new SrsClsSugars(); + + if ((err = send_logs(sugars)) != srs_success) { + return srs_error_wrap(err, "cls"); + } + + return err; +} + +srs_error_t SrsClsClient::do_send_logs(ISrsEncoder* sugar, int count, int total) { srs_error_t err = srs_success; @@ -811,7 +839,7 @@ srs_error_t SrsClsClient::send_logs(SrsClsSugars* sugars) SrsClsSugars* v = sugars->slice(SRS_CLS_BATCH_MAX_LOG_SIZE); SrsAutoFree(SrsClsSugars, v); - if ((err = send_log((ISrsEncoder*)v, v->size(), total)) != srs_success) { + if ((err = do_send_logs((ISrsEncoder*)v, v->size(), total)) != srs_success) { return srs_error_wrap(err, "send %d/%d/%d logs", v->size(), i, total); } } @@ -819,18 +847,18 @@ srs_error_t SrsClsClient::send_logs(SrsClsSugars* sugars) return err; } -srs_error_t srs_cls_dump_summaries(SrsClsSugars* sugars) +srs_error_t SrsClsClient::dump_summaries(SrsClsSugars* sugars) { srs_error_t err = srs_success; // Ignore if disabled. - if (!_srs_cls->enabled() || !_srs_cls->stat_heartbeat()) { + if (!enabled_ || !stat_heartbeat_) { return err; } // Whether it's time to report heartbeat. static int nn_heartbeat = -1; - bool interval_ok = nn_heartbeat == -1 || ++nn_heartbeat >= _srs_cls->heartbeat_ratio(); + bool interval_ok = nn_heartbeat == -1 || ++nn_heartbeat >= heartbeat_ratio_; if (interval_ok) { nn_heartbeat = 0; } @@ -911,18 +939,18 @@ srs_error_t srs_cls_dump_summaries(SrsClsSugars* sugars) return err; } -srs_error_t srs_cls_dump_streams(SrsClsSugars* sugars) +srs_error_t SrsClsClient::dump_streams(SrsClsSugars* sugars) { srs_error_t err = srs_success; // Ignore if disabled. - if (!_srs_cls->enabled() || !_srs_cls->stat_streams()) { + if (!enabled_ || !stat_streams_) { return err; } // Whether it's time to report streams. static int nn_streams = -1; - bool interval_ok = nn_streams == -1 || ++nn_streams >= _srs_cls->streams_ratio(); + bool interval_ok = nn_streams == -1 || ++nn_streams >= streams_ratio_; if (interval_ok) { nn_streams = 0; } @@ -936,25 +964,4 @@ srs_error_t srs_cls_dump_streams(SrsClsSugars* sugars) return err; } -srs_error_t srs_cls_report() -{ - srs_error_t err = srs_success; - - SrsClsSugars sugars; - - if ((err = srs_cls_dump_summaries(&sugars)) != srs_success) { - return srs_error_wrap(err, "dump summary"); - } - - if ((err = srs_cls_dump_streams(&sugars)) != srs_success) { - return srs_error_wrap(err, "dump streams"); - } - - if ((err = _srs_cls->send_logs(&sugars)) != srs_success) { - return srs_error_wrap(err, "cls"); - } - - return err; -} - diff --git a/trunk/src/app/srs_app_tencentcloud.hpp b/trunk/src/app/srs_app_tencentcloud.hpp index e8d12f6dd..f68058c30 100644 --- a/trunk/src/app/srs_app_tencentcloud.hpp +++ b/trunk/src/app/srs_app_tencentcloud.hpp @@ -32,6 +32,7 @@ public: virtual uint64_t nb_bytes(); srs_error_t encode(SrsBuffer* b); public: + bool empty(); SrsClsSugar* kv(std::string k, std::string v); SrsClsSugar* kvf(std::string k, const char* fmt, ...); }; @@ -69,30 +70,27 @@ private: std::string endpoint_; std::string topic_; private: + SrsClsSugars* sugars_; uint64_t nn_logs_; public: SrsClsClient(); virtual ~SrsClsClient(); public: bool enabled(); - bool stat_heartbeat(); - bool stat_streams(); - int heartbeat_ratio(); - int streams_ratio(); std::string label(); std::string tag(); uint64_t nn_logs(); public: srs_error_t initialize(); + srs_error_t report(); private: - srs_error_t send_log(ISrsEncoder* sugar, int count, int total); -public: + srs_error_t do_send_logs(ISrsEncoder* sugar, int count, int total); srs_error_t send_logs(SrsClsSugars* sugars); + srs_error_t dump_summaries(SrsClsSugars* sugars); + srs_error_t dump_streams(SrsClsSugars* sugars); }; extern SrsClsClient* _srs_cls; -srs_error_t srs_cls_report(); - #endif