From 41155b7789c067a56cf36fb2c933be9558128896 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 24 Aug 2022 12:42:21 +0800 Subject: [PATCH 1/3] STAT: Add kbps for client. --- trunk/src/app/srs_app_statistic.cpp | 20 ++++++++++++++++++++ trunk/src/app/srs_app_statistic.hpp | 4 ++++ 2 files changed, 24 insertions(+) diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index 68c85a207..f03a41926 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -186,10 +186,16 @@ SrsStatisticClient::SrsStatisticClient() req = NULL; type = SrsRtmpConnUnknown; create = srs_get_system_time(); + + clk = new SrsWallClock(); + kbps = new SrsKbps(clk); + kbps->set_io(NULL, NULL); } SrsStatisticClient::~SrsStatisticClient() { + srs_freep(kbps); + srs_freep(clk); srs_freep(req); } @@ -208,6 +214,12 @@ srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj) obj->set("type", SrsJsonAny::str(srs_client_type_string(type).c_str())); obj->set("publish", SrsJsonAny::boolean(srs_client_type_is_publish(type))); obj->set("alive", SrsJsonAny::number(srsu2ms(srs_get_system_time() - create) / 1000.0)); + + SrsJsonObject* okbps = SrsJsonAny::object(); + obj->set("kbps", okbps); + + okbps->set("recv_30s", SrsJsonAny::integer(kbps->get_recv_kbps_30s())); + okbps->set("send_30s", SrsJsonAny::integer(kbps->get_send_kbps_30s())); return err; } @@ -446,6 +458,7 @@ void SrsStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta* delta) // add delta of connection to kbps. // for next sample() of server kbps can get the stat. kbps->add_delta(in, out); + client->kbps->add_delta(in, out); client->stream->kbps->add_delta(in, out); client->stream->vhost->kbps->add_delta(in, out); } @@ -467,6 +480,13 @@ SrsKbps* SrsStatistic::kbps_sample() stream->kbps->sample(); } } + if (true) { + std::map::iterator it; + for (it = clients.begin(); it != clients.end(); it++) { + SrsStatisticClient* client = it->second; + client->kbps->sample(); + } + } return kbps; } diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index b196a4f0a..6a25c6ec7 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -100,6 +100,10 @@ public: SrsRtmpConnType type; std::string id; srs_utime_t create; +public: + // The stream total kbps. + SrsKbps* kbps; + SrsWallClock* clk; public: SrsStatisticClient(); virtual ~SrsStatisticClient(); From 8e6d207e56efc570be238873240020f5550f2f0f Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 24 Aug 2022 12:47:16 +0800 Subject: [PATCH 2/3] For #2136: API: Cleanup no active streams for statistics. v4.0.256 --- trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_statistic.cpp | 60 ++++++++++++++++++++-------- trunk/src/app/srs_app_statistic.hpp | 4 ++ trunk/src/core/srs_core_version4.hpp | 2 +- 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 83acf7ca3..96118be7c 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -8,6 +8,7 @@ The changelog for SRS. ## SRS 4.0 Changelog +* v5.0, 2022-08-24, For [#2136](https://github.com/ossrs/srs/issues/2136): API: Cleanup no active streams for statistics. v4.0.256 * v4.0, 2022-08-17, RTMP URL supports domain in stream parameters. v4.0.255 * v4.0, 2022-08-10, Fix server id generator bug. v4.0.254 * v4.0, 2022-06-29, Update SRS image for r.ossrs.net. v4.0.253 diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index f03a41926..a8f6a8528 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -164,6 +164,11 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj) void SrsStatisticStream::publish(std::string id) { + // To prevent duplicated publish event by bridger. + if (active) { + return; + } + publisher_id = id; active = true; @@ -172,6 +177,11 @@ void SrsStatisticStream::publish(std::string id) void SrsStatisticStream::close() { + // To prevent duplicated close event. + if (!active) { + return; + } + has_video = false; has_audio = false; active = false; @@ -375,22 +385,6 @@ void SrsStatistic::on_stream_close(SrsRequest* req) SrsStatisticVhost* vhost = create_vhost(req); SrsStatisticStream* stream = create_stream(vhost, req); stream->close(); - - // TODO: FIXME: Should fix https://github.com/ossrs/srs/issues/803 - if (true) { - std::map::iterator it; - if ((it=streams.find(stream->id)) != streams.end()) { - streams.erase(it); - } - } - - // TODO: FIXME: Should fix https://github.com/ossrs/srs/issues/803 - if (true) { - std::map::iterator it; - if ((it = rstreams.find(stream->url)) != rstreams.end()) { - rstreams.erase(it); - } - } } srs_error_t SrsStatistic::on_client(std::string id, SrsRequest* req, ISrsExpire* conn, SrsRtmpConnType type) @@ -441,6 +435,40 @@ void SrsStatistic::on_disconnect(std::string id) stream->nb_clients--; vhost->nb_clients--; + + cleanup_stream(stream); +} + +void SrsStatistic::cleanup_stream(SrsStatisticStream* stream) +{ + // If stream has publisher(not active) or player(clients), never cleanup it. + if (stream->active || stream->nb_clients > 0) { + return; + } + + // There should not be any clients referring to the stream. + for (std::map::iterator it = clients.begin(); it != clients.end(); ++it) { + SrsStatisticClient* client = it->second; + srs_assert(client->stream != stream); + } + + // Do cleanup streams. + if (true) { + std::map::iterator it; + if ((it = streams.find(stream->id)) != streams.end()) { + streams.erase(it); + } + } + + if (true) { + std::map::iterator it; + if ((it = rstreams.find(stream->url)) != rstreams.end()) { + rstreams.erase(it); + } + } + + // It's safe to delete the stream now. + srs_freep(stream); } void SrsStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta* delta) diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 6a25c6ec7..4285f3ced 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -173,6 +173,10 @@ public: // only got the request object, so the client specified by id maybe not // exists in stat. virtual void on_disconnect(std::string id); +private: + // Cleanup the stream if stream is not active and for the last client. + void cleanup_stream(SrsStatisticStream* stream); +public: // Sample the kbps, add delta bytes of conn. // Use kbps_sample() to get all result of kbps stat. virtual void kbps_add_delta(std::string id, ISrsKbpsDelta* delta); diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 5443b9c32..0ecbf255e 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 255 +#define VERSION_REVISION 256 #endif From 9923c749d4d67257c43b26a0516a6830b025a4f3 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 24 Aug 2022 15:06:43 +0800 Subject: [PATCH 3/3] STAT: Support config server_id and generate one if empty. v4.0.257 --- trunk/conf/full.conf | 11 +++ trunk/doc/CHANGELOG.md | 1 + trunk/src/app/srs_app_config.cpp | 105 ++++++++++++++++++++++- trunk/src/app/srs_app_config.hpp | 3 + trunk/src/app/srs_app_latest_version.cpp | 37 +++----- trunk/src/app/srs_app_latest_version.hpp | 1 + trunk/src/app/srs_app_statistic.cpp | 23 ++++- trunk/src/app/srs_app_statistic.hpp | 8 +- trunk/src/core/srs_core_version4.hpp | 2 +- 9 files changed, 158 insertions(+), 33 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 234dc4584..e14bf339b 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -1,5 +1,10 @@ # all config for srs +# The id of server, for stat and api identification. +# Note that SRS will generate a random id if not configured. +# Overwrite by env SRS_SERVER_ID +server_id srs-ie193id; + ############################################################################################# # RTMP sections ############################################################################################# @@ -121,6 +126,12 @@ tcmalloc_release_rate 0.8; # Default: on query_latest_version on; +# First wait when qlv(query latest version), in seconds. +# Only available when qlv is enabled. +# Overwrite by env SRS_FIRST_WAIT_FOR_QLV +# Default: 300 +first_wait_for_qlv 300; + # For system circuit breaker. circuit_breaker { # Whether enable the circuit breaker. diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 96118be7c..8d3edd078 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -8,6 +8,7 @@ The changelog for SRS. ## SRS 4.0 Changelog +* v4.0, 2022-08-24, STAT: Support config server_id and generate one if empty. v4.0.257 * v5.0, 2022-08-24, For [#2136](https://github.com/ossrs/srs/issues/2136): API: Cleanup no active streams for statistics. v4.0.256 * v4.0, 2022-08-17, RTMP URL supports domain in stream parameters. v4.0.255 * v4.0, 2022-08-10, Fix server id generator bug. v4.0.254 diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index f7174dbd7..50a19cfd6 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -58,6 +58,9 @@ const char* _srs_version = "XCORE-" RTMP_SIG_SRS_SERVER; // '\r' #define SRS_CR (char)SRS_CONSTS_CR +// Overwrite the config by env. +#define SRS_OVERWRITE_BY_ENV_SECONDS(key) if (getenv(key)) return ::atoi(getenv(key)) * SRS_UTIME_SECONDS + /** * dumps the ingest/transcode-engine in @param dir to amf0 object @param engine. * @param dir the transcode or ingest config directive. @@ -2466,11 +2469,11 @@ srs_error_t SrsConfig::check_normal_config() && n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" && n != "http_server" && n != "stream_caster" && n != "rtc_server" && n != "srt_server" - && n != "utc_time" && n != "work_dir" && n != "asprocess" + && n != "utc_time" && n != "work_dir" && n != "asprocess" && n != "server_id" && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" - && n != "query_latest_version" + && n != "query_latest_version" && n != "first_wait_for_qlv" && n != "circuit_breaker" && n != "is_full" && n != "in_docker" ) { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str()); @@ -3028,6 +3031,90 @@ SrsConfDirective* SrsConfig::get_root() return root; } +string srs_server_id_path(string pid_file) +{ + string path = srs_string_replace(pid_file, ".pid", ".id"); + if (!srs_string_ends_with(path, ".id")) { + path += ".id"; + } + return path; +} + +string srs_try_read_file(string path) { + srs_error_t err = srs_success; + + SrsFileReader r; + if ((err = r.open(path)) != srs_success) { + srs_freep(err); + return ""; + } + + static char buf[1024]; + ssize_t nn = 0; + if ((err = r.read(buf, sizeof(buf), &nn)) != srs_success) { + srs_freep(err); + return ""; + } + + if (nn > 0) { + return string(buf, nn); + } + return ""; +} + +void srs_try_write_file(string path, string content) { + srs_error_t err = srs_success; + + SrsFileWriter w; + if ((err = w.open(path)) != srs_success) { + srs_freep(err); + return; + } + + if ((err = w.write((void*)content.data(), content.length(), NULL)) != srs_success) { + srs_freep(err); + return; + } +} + +string SrsConfig::get_server_id() +{ + static string DEFAULT = ""; + + // Try to read DEFAULT from server id file. + if (DEFAULT.empty()) { + DEFAULT = srs_try_read_file(srs_server_id_path(get_pid_file())); + } + + // Generate a random one if empty. + if (DEFAULT.empty()) { + DEFAULT = srs_generate_stat_vid(); + } + + // Get the server id from env, config or DEFAULT. + string server_id; + + if (getenv("SRS_SERVER_ID")) { + server_id = getenv("SRS_SERVER_ID"); + } + + SrsConfDirective* conf = root->get("server_id"); + if (conf) { + server_id = conf->arg0(); + } + + if (server_id.empty()) { + server_id = DEFAULT; + } + + // Write server id to tmp file. + if (!server_id.empty()) { + srs_try_write_file(srs_server_id_path(get_pid_file()), server_id); + } + + return server_id; +} + int SrsConfig::get_max_connections() { static int DEFAULT = 1000; @@ -3128,6 +3215,20 @@ bool SrsConfig::whether_query_latest_version() return SRS_CONF_PERFER_TRUE(conf->arg0()); } +srs_utime_t SrsConfig::first_wait_for_qlv() +{ + SRS_OVERWRITE_BY_ENV_SECONDS("SRS_FIRST_WAIT_FOR_QLV"); + + static srs_utime_t DEFAULT = 5 * 60 * SRS_UTIME_SECONDS; + + SrsConfDirective* conf = root->get("first_wait_for_qlv"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()) * SRS_UTIME_SECONDS; +} + bool SrsConfig::empty_ip_ok() { static bool DEFAULT = true; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index a37461643..9cc71b433 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -388,6 +388,8 @@ private: // Whether user use full.conf virtual bool is_full_config(); public: + // Get the server id, generated a random one if not configured. + virtual std::string get_server_id(); // Get the max connections limit of system. // If exceed the max connection, SRS will disconnect the connection. // @remark, linux will limit the connections of each process, @@ -419,6 +421,7 @@ public: virtual bool get_asprocess(); // Whether query the latest available version of SRS. virtual bool whether_query_latest_version(); + virtual srs_utime_t first_wait_for_qlv(); // Whether empty client IP is ok. virtual bool empty_ip_ok(); // Get the start wait in ms for gracefully quit. diff --git a/trunk/src/app/srs_app_latest_version.cpp b/trunk/src/app/srs_app_latest_version.cpp index aacb81e32..c73c89501 100644 --- a/trunk/src/app/srs_app_latest_version.cpp +++ b/trunk/src/app/srs_app_latest_version.cpp @@ -34,6 +34,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #include #include @@ -189,20 +190,8 @@ srs_error_t SrsLatestVersion::start() return srs_success; } - if (true) { - uuid_t uuid; - uuid_generate_time(uuid); - - // Must reserve last 1 byte for the trailing '\0', because we expect the size of uuid string is 32 bytes. - char buf[32 + 1]; - srs_assert(16 == sizeof(uuid_t)); - - for (int i = 0; i < 16; i++) { - int r0 = snprintf(buf + i * 2, sizeof(buf) - i * 2, "%02x", uuid[i]); - srs_assert(r0 > 0 && r0 < sizeof(buf) - i * 2); - } - server_id_ = buf; - } + server_id_ = SrsStatistic::instance()->server_id(); + session_id_ = srs_generate_stat_vid(); return trd_->start(); } @@ -212,15 +201,10 @@ srs_error_t SrsLatestVersion::cycle() srs_error_t err = srs_success; if (true) { - srs_utime_t first_random_wait = 0; - srs_random_generate((char *) &first_random_wait, 8); - first_random_wait = srs_utime_t(uint64_t((first_random_wait + srs_update_system_time() + getpid())) % (5 * 60)) * SRS_UTIME_SECONDS; // in s. - - // Only report after 5+ minutes. - first_random_wait += 5 * 60 * SRS_UTIME_SECONDS; - - srs_trace("Startup query id=%s, eip=%s, wait=%ds", server_id_.c_str(), srs_get_public_internet_address().c_str(), srsu2msi(first_random_wait) / 1000); - srs_usleep(first_random_wait); + srs_utime_t first_wait_for_qlv = _srs_config->first_wait_for_qlv(); + string pip = srs_get_public_internet_address(); + srs_trace("Startup query id=%s, session=%s, eip=%s, wait=%ds", server_id_.c_str(), session_id_.c_str(), pip.c_str(), srsu2msi(first_wait_for_qlv) / 1000); + srs_usleep(first_wait_for_qlv); } while (true) { @@ -235,8 +219,8 @@ srs_error_t SrsLatestVersion::cycle() srs_freep(err); // Ignore any error. } - srs_trace("Finish query id=%s, eip=%s, match=%s, stable=%s, cost=%dms, url=%s", - server_id_.c_str(), srs_get_public_internet_address().c_str(), match_version_.c_str(), + srs_trace("Finish query id=%s, session=%s, eip=%s, match=%s, stable=%s, cost=%dms, url=%s", + server_id_.c_str(), session_id_.c_str(), srs_get_public_internet_address().c_str(), match_version_.c_str(), stable_version_.c_str(), srsu2msi(srs_update_system_time() - starttime), url.c_str()); srs_usleep(3600 * SRS_UTIME_SECONDS); // Every an hour. @@ -253,11 +237,12 @@ srs_error_t SrsLatestVersion::query_latest_version(string& url) stringstream ss; ss << "http://api.ossrs.net/service/v1/releases?" << "version=v" << VERSION_MAJOR << "." << VERSION_MINOR << "." << VERSION_REVISION - << "&id=" << server_id_ << "&role=srs" + << "&id=" << server_id_ << "&session=" << session_id_ << "&role=srs" << "&eip=" << srs_get_public_internet_address() << "&ts=" << srs_get_system_time() << "&alive=" << srsu2ms(srs_get_system_time() - srs_get_system_startup_time()) / 1000; srs_build_features(ss); + SrsStatistic::instance()->dumps_hints_kv(ss); url = ss.str(); SrsHttpUri uri; diff --git a/trunk/src/app/srs_app_latest_version.hpp b/trunk/src/app/srs_app_latest_version.hpp index 8aa81db81..4f85715e1 100644 --- a/trunk/src/app/srs_app_latest_version.hpp +++ b/trunk/src/app/srs_app_latest_version.hpp @@ -39,6 +39,7 @@ class SrsLatestVersion : public ISrsCoroutineHandler private: SrsCoroutine* trd_; std::string server_id_; + std::string session_id_; private: std::string match_version_; std::string stable_version_; diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index a8f6a8528..d5068838a 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -238,8 +238,6 @@ SrsStatistic* SrsStatistic::_instance = NULL; SrsStatistic::SrsStatistic() { - _server_id = srs_generate_stat_vid(); - clk = new SrsWallClock(); kbps = new SrsKbps(clk); kbps->set_io(NULL, NULL); @@ -521,7 +519,10 @@ SrsKbps* SrsStatistic::kbps_sample() std::string SrsStatistic::server_id() { - return _server_id; + if (server_id_.empty()) { + server_id_ = _srs_config->get_server_id(); + } + return server_id_; } srs_error_t SrsStatistic::dumps_vhosts(SrsJsonArray* arr) @@ -589,6 +590,22 @@ srs_error_t SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count) return err; } +void SrsStatistic::dumps_hints_kv(std::stringstream & ss) +{ + if (!streams.empty()) { + ss << "&streams=" << streams.size(); + } + if (!clients.empty()) { + ss << "&clients=" << clients.size(); + } + if (kbps->get_recv_kbps_30s()) { + ss << "&recv=" << kbps->get_recv_kbps_30s(); + } + if (kbps->get_send_kbps_30s()) { + ss << "&send=" << kbps->get_send_kbps_30s(); + } +} + SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req) { SrsStatisticVhost* vhost = NULL; diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index 4285f3ced..2d29c5f6b 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -116,7 +117,7 @@ class SrsStatistic private: static SrsStatistic *_instance; // The id to identify the sever. - std::string _server_id; + std::string server_id_; private: // The key: vhost id, value: vhost object. std::map vhosts; @@ -197,9 +198,14 @@ public: // @param start the start index, from 0. // @param count the max count of clients to dump. virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count); + // Dumps the hints about SRS server. + void dumps_hints_kv(std::stringstream & ss); private: virtual SrsStatisticVhost* create_vhost(SrsRequest* req); virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req); }; +// Generate a random string id, with constant prefix. +extern std::string srs_generate_stat_vid(); + #endif diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 0ecbf255e..80f2661c0 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 256 +#define VERSION_REVISION 257 #endif