diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index 0dbf394ce..d41852823 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -39,12 +39,12 @@ ISrsMessagePumper::~ISrsMessagePumper() { } -SrsRecvThread::SrsRecvThread(ISrsMessagePumper *p, SrsRtmpServer *r, srs_utime_t tm, SrsContextId parent_cid) +SrsRecvThread::SrsRecvThread(ISrsMessagePumper *p, ISrsRtmpServer *r, srs_utime_t tm, SrsContextId parent_cid) { rtmp_ = r; pumper_ = p; timeout_ = tm; - _parent_cid = parent_cid; + parent_cid_ = parent_cid; trd_ = new SrsDummyCoroutine(); } @@ -63,7 +63,7 @@ srs_error_t SrsRecvThread::start() srs_error_t err = srs_success; srs_freep(trd_); - trd_ = new SrsSTCoroutine("recv", this, _parent_cid); + trd_ = new SrsSTCoroutine("recv", this, parent_cid_); // change stack size to 256K, fix crash when call some 3rd-part api. ((SrsSTCoroutine *)trd_)->set_stack_size(1 << 18); @@ -144,7 +144,7 @@ srs_error_t SrsRecvThread::do_cycle() return err; } -SrsQueueRecvThread::SrsQueueRecvThread(SrsLiveConsumer *consumer, SrsRtmpServer *rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid) +SrsQueueRecvThread::SrsQueueRecvThread(SrsLiveConsumer *consumer, ISrsRtmpServer *rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid) : trd_(this, rtmp_sdk, tm, parent_cid) { _consumer = consumer; @@ -257,7 +257,7 @@ void SrsQueueRecvThread::on_stop() rtmp_->set_auto_response(true); } -SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer *rtmp_sdk, ISrsRequest *_req, +SrsPublishRecvThread::SrsPublishRecvThread(ISrsRtmpServer *rtmp_sdk, ISrsRequest *_req, int mr_sock_fd, srs_utime_t tm, SrsRtmpConn *conn, SrsSharedPtr source, SrsContextId parent_cid) : trd_(this, rtmp_sdk, tm, parent_cid) { diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 9f07cf7e6..6fb1d4bd4 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -26,6 +26,7 @@ class ISrsRequest; class SrsLiveConsumer; class SrsHttpConn; class SrsHttpxConn; +class ISrsRtmpServer; // The message consumer which consume a message. class ISrsMessageConsumer @@ -66,15 +67,15 @@ class SrsRecvThread : public ISrsCoroutineHandler protected: ISrsCoroutine *trd_; ISrsMessagePumper *pumper_; - SrsRtmpServer *rtmp_; - SrsContextId _parent_cid; + ISrsRtmpServer *rtmp_; + SrsContextId parent_cid_; // The recv timeout in srs_utime_t. srs_utime_t timeout_; public: // Constructor. // @param tm The receive timeout in srs_utime_t. - SrsRecvThread(ISrsMessagePumper *p, SrsRtmpServer *r, srs_utime_t tm, SrsContextId parent_cid); + SrsRecvThread(ISrsMessagePumper *p, ISrsRtmpServer *r, srs_utime_t tm, SrsContextId parent_cid); virtual ~SrsRecvThread(); public: @@ -101,14 +102,14 @@ class SrsQueueRecvThread : public ISrsMessagePumper private: std::vector queue_; SrsRecvThread trd_; - SrsRtmpServer *rtmp_; + ISrsRtmpServer *rtmp_; // The recv thread error code. srs_error_t recv_error_; SrsLiveConsumer *_consumer; public: // TODO: FIXME: Refine timeout in time unit. - SrsQueueRecvThread(SrsLiveConsumer *consumer, SrsRtmpServer *rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid); + SrsQueueRecvThread(SrsLiveConsumer *consumer, ISrsRtmpServer *rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid); virtual ~SrsQueueRecvThread(); public: @@ -140,7 +141,7 @@ class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler private: uint32_t nn_msgs_for_yield_; SrsRecvThread trd_; - SrsRtmpServer *rtmp_; + ISrsRtmpServer *rtmp_; ISrsRequest *req_; // The msgs already got. int64_t _nb_msgs; @@ -165,7 +166,7 @@ private: SrsContextId ncid_; public: - SrsPublishRecvThread(SrsRtmpServer *rtmp_sdk, ISrsRequest *_req, + SrsPublishRecvThread(ISrsRtmpServer *rtmp_sdk, ISrsRequest *_req, int mr_sock_fd, srs_utime_t tm, SrsRtmpConn *conn, SrsSharedPtr source, SrsContextId parent_cid); virtual ~SrsPublishRecvThread(); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index dc54a37c2..0c9dfb1c5 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -48,6 +48,14 @@ using namespace std; // the timeout in srs_utime_t to wait encoder to republish // if timeout, close the connection. #define SRS_REPUBLISH_SEND_TIMEOUT (3 * SRS_UTIME_MINUTES) + +ISrsRtmpTransport::ISrsRtmpTransport() +{ +} + +ISrsRtmpTransport::~ISrsRtmpTransport() +{ +} // if timeout, close the connection. #define SRS_REPUBLISH_RECV_TIMEOUT (3 * SRS_UTIME_MINUTES) @@ -183,13 +191,11 @@ const char *SrsRtmpsTransport::transport_type() return "ssl"; } -SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip, int cport) +SrsRtmpConn::SrsRtmpConn(ISrsRtmpTransport *transport, string cip, int cport) { // Create a identify for this client. _srs_context->set_id(_srs_context->generate_id()); - server_ = svr; - transport_ = transport; ip_ = cip; port_ = cport; @@ -226,7 +232,9 @@ SrsRtmpConn::SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, string cip hooks_ = _srs_hooks; rtc_sources_ = _srs_rtc_sources; srt_sources_ = _srs_srt_sources; +#ifdef SRS_RTSP rtsp_sources_ = _srs_rtsp_sources; +#endif } void SrsRtmpConn::assemble() @@ -261,7 +269,9 @@ SrsRtmpConn::~SrsRtmpConn() hooks_ = NULL; rtc_sources_ = NULL; srt_sources_ = NULL; +#ifdef SRS_RTSP rtsp_sources_ = NULL; +#endif } std::string SrsRtmpConn::desc() @@ -635,6 +645,11 @@ srs_error_t SrsRtmpConn::playing(SrsSharedPtr source) { srs_error_t err = srs_success; + // Check whether thread is quiting. + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "thread"); + } + // Check page referer of player. ISrsRequest *req = info_->req_; if (config_->get_refer_enabled(req->vhost_)) { @@ -846,6 +861,11 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr source) { srs_error_t err = srs_success; + // Check whether thread is quiting. + if ((err = trd_->pull()) != srs_success) { + return srs_error_wrap(err, "thread"); + } + ISrsRequest *req = info_->req_; if (config_->get_refer_enabled(req->vhost_)) { diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 0a3ec4a65..7008aa9cd 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -49,6 +49,9 @@ class ISrsHttpHooks; class ISrsRtcSourceManager; class ISrsSrtSourceManager; class ISrsRtspSourceManager; +class ISrsRtmpServer; +class ISrsRtmpTransport; +class ISrsSecurity; // The simple rtmp client for SRS. class SrsSimpleRtmpClient : public SrsBasicRtmpClient @@ -82,8 +85,26 @@ public: virtual ~SrsClientInfo(); }; +// The transport layer for RTMP connections. +class ISrsRtmpTransport +{ +public: + ISrsRtmpTransport(); + virtual ~ISrsRtmpTransport(); + +public: + virtual srs_netfd_t fd() = 0; + virtual ISrsProtocolReadWriter *io() = 0; + virtual srs_error_t handshake() = 0; + virtual const char *transport_type() = 0; + virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v) = 0; + virtual srs_error_t set_tcp_nodelay(bool v) = 0; + virtual int64_t get_recv_bytes() = 0; + virtual int64_t get_send_bytes() = 0; +}; + // The base transport layer for RTMP connections over plain TCP. -class SrsRtmpTransport +class SrsRtmpTransport : public ISrsRtmpTransport { protected: srs_netfd_t stfd_; @@ -147,14 +168,15 @@ private: ISrsHttpHooks *hooks_; ISrsRtcSourceManager *rtc_sources_; ISrsSrtSourceManager *srt_sources_; +#ifdef SRS_RTSP ISrsRtspSourceManager *rtsp_sources_; +#endif private: - SrsServer *server_; - SrsRtmpServer *rtmp_; + ISrsRtmpServer *rtmp_; SrsRefer *refer_; SrsBandwidth *bandwidth_; - SrsSecurity *security_; + ISrsSecurity *security_; // The wakable handler, maybe NULL. // TODO: FIXME: Should refine the state for receiving thread. ISrsWakable *wakable_; @@ -179,7 +201,7 @@ private: SrsClientInfo *info_; private: - SrsRtmpTransport *transport_; + ISrsRtmpTransport *transport_; // Each connection start a green thread, // when thread stop, the connection will be delete by server. ISrsCoroutine *trd_; @@ -194,7 +216,7 @@ private: int64_t create_time_; public: - SrsRtmpConn(SrsServer *svr, SrsRtmpTransport *transport, std::string cip, int port); + SrsRtmpConn(ISrsRtmpTransport *transport, std::string cip, int port); void assemble(); virtual ~SrsRtmpConn(); // Interface ISrsResource. diff --git a/trunk/src/app/srs_app_security.cpp b/trunk/src/app/srs_app_security.cpp index efcd41859..bf04e66ac 100644 --- a/trunk/src/app/srs_app_security.cpp +++ b/trunk/src/app/srs_app_security.cpp @@ -11,6 +11,14 @@ using namespace std; +ISrsSecurity::ISrsSecurity() +{ +} + +ISrsSecurity::~ISrsSecurity() +{ +} + SrsSecurity::SrsSecurity() { } diff --git a/trunk/src/app/srs_app_security.hpp b/trunk/src/app/srs_app_security.hpp index 010f39ea7..d8ff2487b 100644 --- a/trunk/src/app/srs_app_security.hpp +++ b/trunk/src/app/srs_app_security.hpp @@ -16,8 +16,19 @@ class SrsConfDirective; +// The security interface. +class ISrsSecurity +{ +public: + ISrsSecurity(); + virtual ~ISrsSecurity(); + +public: + virtual srs_error_t check(SrsRtmpConnType type, std::string ip, ISrsRequest *req) = 0; +}; + // The security apply on vhost. -class SrsSecurity +class SrsSecurity : public ISrsSecurity { public: SrsSecurity(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 63503fc3f..af664be6f 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -1507,12 +1507,12 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf if (!resource) { if (listener == rtmp_listener_) { SrsRtmpTransport *transport = new SrsRtmpTransport(stfd2); - SrsRtmpConn *conn = new SrsRtmpConn(this, transport, ip, port); + SrsRtmpConn *conn = new SrsRtmpConn(transport, ip, port); conn->assemble(); resource = conn; } else if (listener == rtmps_listener_) { SrsRtmpTransport *transport = new SrsRtmpsTransport(stfd2); - SrsRtmpConn *conn = new SrsRtmpConn(this, transport, ip, port); + SrsRtmpConn *conn = new SrsRtmpConn(transport, ip, port); conn->assemble(); resource = conn; } else if (listener == api_listener_ || listener == apis_listener_) { diff --git a/trunk/src/app/srs_app_utility.hpp b/trunk/src/app/srs_app_utility.hpp index f59e7abac..03d201053 100644 --- a/trunk/src/app/srs_app_utility.hpp +++ b/trunk/src/app/srs_app_utility.hpp @@ -336,6 +336,9 @@ extern SrsProcSelfStat *srs_get_self_proc_stat(); extern SrsProcSystemStat *srs_get_system_proc_stat(); // The daemon st-thread will update it. extern void srs_update_proc_stat(); +// Read process self stat from /proc/self/stat (Linux only). +// @return true on success, false on failure. +extern bool get_proc_self_stat(SrsProcSelfStat &r); // Stat disk iops // @see: http://stackoverflow.com/questions/4458183/how-the-util-of-iostat-is-computed @@ -443,6 +446,10 @@ public: extern SrsDiskStat *srs_get_disk_stat(); // The daemon st-thread will update it. extern void srs_update_disk_stat(); +// Internal helper function to read disk stats from /proc/diskstats +// @param r the disk stat object to fill +// @return true on success, false on failure +extern bool srs_get_disk_diskstats_stat(SrsDiskStat &r); // Stat system memory info // @see: cat /proc/meminfo diff --git a/trunk/src/protocol/srs_protocol_rtmp_stack.cpp b/trunk/src/protocol/srs_protocol_rtmp_stack.cpp index 3e5e7adcc..b2cf30f2c 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_protocol_rtmp_stack.cpp @@ -2150,6 +2150,14 @@ srs_error_t SrsRtmpClient::fmle_publish(string stream, int &stream_id) return err; } +ISrsRtmpServer::ISrsRtmpServer() +{ +} + +ISrsRtmpServer::~ISrsRtmpServer() +{ +} + SrsRtmpServer::SrsRtmpServer(ISrsProtocolReadWriter *skt) { io_ = skt; diff --git a/trunk/src/protocol/srs_protocol_rtmp_stack.hpp b/trunk/src/protocol/srs_protocol_rtmp_stack.hpp index 4c8142f8e..88f70c5c8 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_protocol_rtmp_stack.hpp @@ -660,10 +660,47 @@ public: } }; +// The rtmp server interface. +class ISrsRtmpServer +{ +public: + ISrsRtmpServer(); + virtual ~ISrsRtmpServer(); + +public: + virtual void set_recv_timeout(srs_utime_t tm) = 0; + virtual void set_send_timeout(srs_utime_t tm) = 0; + virtual srs_error_t handshake() = 0; + virtual srs_error_t connect_app(ISrsRequest *req) = 0; + virtual uint32_t proxy_real_ip() = 0; + virtual srs_error_t set_window_ack_size(int ack_size) = 0; + virtual srs_error_t set_peer_bandwidth(int bandwidth, int type) = 0; + virtual srs_error_t set_chunk_size(int chunk_size) = 0; + virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip = NULL) = 0; + virtual srs_error_t on_bw_done() = 0; + virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration) = 0; + virtual srs_error_t start_play(int stream_id) = 0; + virtual srs_error_t start_fmle_publish(int stream_id) = 0; + virtual srs_error_t start_haivision_publish(int stream_id) = 0; + virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid) = 0; + virtual srs_error_t start_flash_publish(int stream_id) = 0; + virtual srs_error_t start_publishing(int stream_id) = 0; + virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted) = 0; + virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id) = 0; + virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) = 0; + virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id) = 0; + virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause) = 0; + virtual srs_error_t set_in_window_ack_size(int ack_size) = 0; + virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg) = 0; + virtual void set_auto_response(bool v) = 0; + virtual void set_merge_read(bool v, IMergeReadHandler *handler) = 0; + virtual void set_recv_buffer(int buffer_size) = 0; +}; + // The rtmp provices rtmp-command-protocol services, // a high level protocol, media stream oriented services, // such as connect to vhost/app, play stream, get audio/video data. -class SrsRtmpServer +class SrsRtmpServer : public ISrsRtmpServer { private: SrsHandshakeBytes *hs_bytes_; diff --git a/trunk/src/utest/srs_utest_app10.cpp b/trunk/src/utest/srs_utest_app10.cpp index 8075964ac..3bf17380c 100644 --- a/trunk/src/utest/srs_utest_app10.cpp +++ b/trunk/src/utest/srs_utest_app10.cpp @@ -9,12 +9,23 @@ using namespace std; #include #include +#include +#include +#include #include #include #include #include #include +#include +#include +#include +#include +#include #include +#include +#include +#include // Mock config implementation for SrsServer::listen() testing MockAppConfigForServerListen::MockAppConfigForServerListen() @@ -914,6 +925,364 @@ void MockConnectionManagerForConnectionLimit::unsubscribe(ISrsDisposingHandler * { } +MockAppConfigForRtmpConn::MockAppConfigForRtmpConn() +{ + subscribe_count_ = 0; + unsubscribe_count_ = 0; + last_subscribed_handler_ = NULL; + vhost_directive_ = new SrsConfDirective(); + vhost_directive_->name_ = "vhost"; + vhost_directive_->args_.push_back("__defaultVhost__"); +} + +MockAppConfigForRtmpConn::~MockAppConfigForRtmpConn() +{ + srs_freep(vhost_directive_); +} + +void MockAppConfigForRtmpConn::subscribe(ISrsReloadHandler *handler) +{ + subscribe_count_++; + last_subscribed_handler_ = handler; +} + +void MockAppConfigForRtmpConn::unsubscribe(ISrsReloadHandler *handler) +{ + unsubscribe_count_++; +} + +SrsConfDirective *MockAppConfigForRtmpConn::get_vhost(std::string vhost, bool try_default_vhost) +{ + return vhost_directive_; +} + +bool MockAppConfigForRtmpConn::get_vhost_is_edge(std::string vhost) +{ + return false; +} + +void MockAppConfigForRtmpConn::reset() +{ + subscribe_count_ = 0; + unsubscribe_count_ = 0; + last_subscribed_handler_ = NULL; +} + +MockRtmpServerForStreamService::MockRtmpServerForStreamService() +{ + identify_type_ = SrsRtmpConnPlay; + identify_stream_ = ""; + identify_duration_ = 0; + start_play_count_ = 0; + start_fmle_publish_count_ = 0; + start_flash_publish_count_ = 0; + start_haivision_publish_count_ = 0; +} + +MockRtmpServerForStreamService::~MockRtmpServerForStreamService() +{ +} + +void MockRtmpServerForStreamService::set_recv_timeout(srs_utime_t tm) +{ +} + +void MockRtmpServerForStreamService::set_send_timeout(srs_utime_t tm) +{ +} + +srs_error_t MockRtmpServerForStreamService::handshake() +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::connect_app(ISrsRequest *req) +{ + return srs_success; +} + +uint32_t MockRtmpServerForStreamService::proxy_real_ip() +{ + return 0; +} + +srs_error_t MockRtmpServerForStreamService::set_window_ack_size(int ack_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::set_peer_bandwidth(int bandwidth, int type) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::set_chunk_size(int chunk_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::response_connect_app(ISrsRequest *req, const char *server_ip) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::on_bw_done() +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration) +{ + type = identify_type_; + stream_name = identify_stream_; + duration = identify_duration_; + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::start_play(int stream_id) +{ + start_play_count_++; + // Return an error to exit the test cleanly after verifying selection worked + return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "mock start play"); +} + +srs_error_t MockRtmpServerForStreamService::start_fmle_publish(int stream_id) +{ + start_fmle_publish_count_++; + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::start_haivision_publish(int stream_id) +{ + start_haivision_publish_count_++; + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::start_flash_publish(int stream_id) +{ + start_flash_publish_count_++; + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::fmle_unpublish(int stream_id, double unpublish_tid) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::start_publishing(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::redirect(ISrsRequest *r, std::string url, bool &accepted) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::send_and_free_packet(SrsRtmpCommand *packet, int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::on_play_client_pause(int stream_id, bool is_pause) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::set_in_window_ack_size(int ack_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForStreamService::recv_message(SrsRtmpCommonMessage **pmsg) +{ + return srs_success; +} + +void MockRtmpServerForStreamService::set_auto_response(bool v) +{ +} + +void MockRtmpServerForStreamService::set_merge_read(bool v, IMergeReadHandler *handler) +{ +} + +void MockRtmpServerForStreamService::set_recv_buffer(int buffer_size) +{ +} + +MockCoroutineForRtmpConn::MockCoroutineForRtmpConn() +{ + pull_error_ = srs_success; + pull_count_ = 0; +} + +MockCoroutineForRtmpConn::~MockCoroutineForRtmpConn() +{ + srs_freep(pull_error_); +} + +srs_error_t MockCoroutineForRtmpConn::start() +{ + return srs_success; +} + +void MockCoroutineForRtmpConn::stop() +{ +} + +void MockCoroutineForRtmpConn::interrupt() +{ +} + +srs_error_t MockCoroutineForRtmpConn::pull() +{ + pull_count_++; + + // Return success on first call to allow stream_service_cycle() to be called + // Return error on subsequent calls to exit the loop + if (pull_count_ == 1) { + return srs_success; + } + + // Return the configured error (or success) + // Note: We don't free pull_error_ here because it might be called multiple times + if (pull_error_ != srs_success) { + return srs_error_copy(pull_error_); + } + return srs_success; +} + +const SrsContextId &MockCoroutineForRtmpConn::cid() +{ + static SrsContextId dummy_cid; + return dummy_cid; +} + +void MockCoroutineForRtmpConn::set_cid(const SrsContextId &cid) +{ +} + +MockRtmpTransportForDoCycle::MockRtmpTransportForDoCycle() +{ +} + +MockRtmpTransportForDoCycle::~MockRtmpTransportForDoCycle() +{ +} + +srs_netfd_t MockRtmpTransportForDoCycle::fd() +{ + // Create a dummy st_netfd_t that won't crash when accessed + // We use a socket pair to get a valid file descriptor + static int fds[2] = {-1, -1}; + static srs_netfd_t dummy_stfd = NULL; + + if (fds[0] == -1) { + socketpair(AF_UNIX, SOCK_STREAM, 0, fds); + dummy_stfd = srs_netfd_open_socket(fds[0]); + } + + return dummy_stfd; +} + +ISrsProtocolReadWriter *MockRtmpTransportForDoCycle::io() +{ + return NULL; +} + +srs_error_t MockRtmpTransportForDoCycle::handshake() +{ + return srs_success; +} + +const char *MockRtmpTransportForDoCycle::transport_type() +{ + return "mock"; +} + +srs_error_t MockRtmpTransportForDoCycle::set_socket_buffer(srs_utime_t buffer_v) +{ + return srs_success; +} + +srs_error_t MockRtmpTransportForDoCycle::set_tcp_nodelay(bool v) +{ + return srs_success; +} + +int64_t MockRtmpTransportForDoCycle::get_recv_bytes() +{ + return 0; +} + +int64_t MockRtmpTransportForDoCycle::get_send_bytes() +{ + return 0; +} + +VOID TEST(SrsRtmpConnTest, ConstructorAndAssemble) +{ + // Create a dummy file descriptor for transport + srs_netfd_t dummy_fd = (srs_netfd_t)((void*)0x1234); + SrsRtmpTransport *transport = new SrsRtmpTransport(dummy_fd); + // Prevent destructor from closing dummy fd + transport->skt_->stfd_ = NULL; + + // Create mock config + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + // Test the major use scenario: constructor + assemble() pattern + // This is how SrsRtmpConn is used in production code (srs_app_server.cpp) + if (true) { + // Create connection - constructor initializes from global config + // Note: SrsRtmpConn takes ownership of transport and will delete it + SrsRtmpConn *conn = new SrsRtmpConn(transport, "192.168.1.100", 1935); + + // Inject mock config before calling assemble() + // This allows us to test the assemble() pattern without modifying global state + conn->config_ = mock_config; + + // Initially, no subscription should have occurred + EXPECT_EQ(0, mock_config->subscribe_count_); + EXPECT_TRUE(mock_config->last_subscribed_handler_ == NULL); + + // Call assemble() - should subscribe to config + conn->assemble(); + EXPECT_EQ(1, mock_config->subscribe_count_); + EXPECT_TRUE(mock_config->last_subscribed_handler_ == conn); + + // Verify constructor initialized all fields correctly + EXPECT_STREQ("192.168.1.100", conn->ip_.c_str()); + EXPECT_EQ(1935, conn->port_); + EXPECT_TRUE(conn->config_ == mock_config); + EXPECT_TRUE(conn->rtmp_ != NULL); + EXPECT_TRUE(conn->refer_ != NULL); + EXPECT_TRUE(conn->security_ != NULL); + EXPECT_TRUE(conn->info_ != NULL); + EXPECT_TRUE(conn->kbps_ != NULL); + EXPECT_TRUE(conn->delta_ != NULL); + EXPECT_TRUE(conn->trd_ != NULL); + + // Cleanup - destructor should unsubscribe and delete transport + srs_freep(conn); + EXPECT_EQ(1, mock_config->unsubscribe_count_); + } + + // Cleanup mock config + srs_freep(mock_config); +} + VOID TEST(SrsServerTest, OnBeforeConnectionExceedLimit) { srs_error_t err = srs_success; @@ -1002,3 +1371,2103 @@ VOID TEST(SrsRtmpTransportTest, BasicOperations) // by setting the internal stfd_ to NULL before destruction transport->skt_->stfd_ = NULL; } + +// Test srs_update_proc_stat() function to verify proper CPU statistics collection +// and calculation. This test covers the major use scenario of updating system and +// self process CPU statistics, which is called periodically by the circuit breaker. +VOID TEST(SrsUtilityTest, UpdateProcStat) +{ + // Call srs_update_proc_stat() - this is the main test + // This will: + // 1. Read /proc/stat for system CPU statistics (Linux only) + // 2. Read /proc/self/stat for self process CPU statistics (Linux only) + // 3. Calculate CPU usage percentages based on deltas + // 4. Update the global stat objects + // On macOS, the function returns early but doesn't crash + srs_update_proc_stat(); + + // Get system and self CPU stats after first update + SrsProcSystemStat *system_stat = srs_get_system_proc_stat(); + SrsProcSelfStat *self_stat = srs_get_self_proc_stat(); + + // Verify that the stat objects are not NULL + EXPECT_TRUE(system_stat != NULL); + EXPECT_TRUE(self_stat != NULL); + +#if !defined(SRS_OSX) + // After first call, ok_ should be true if /proc/stat was read successfully + EXPECT_TRUE(system_stat->ok_); + // After first call, ok_ should be true if /proc/self/stat was read successfully + EXPECT_TRUE(self_stat->ok_); +#endif + + // Call srs_update_proc_stat() again to test delta calculation + // The second call should calculate CPU usage percentage based on + // the difference between current and previous values + srs_update_proc_stat(); + +#if !defined(SRS_OSX) + // Verify that percent values are in valid range [0, 1] + // Note: percent_ might be 0 if very little time passed between calls + EXPECT_TRUE(system_stat->percent_ >= 0.0f); + EXPECT_TRUE(system_stat->percent_ <= 1.0f); + EXPECT_TRUE(self_stat->percent_ >= 0.0f); + // Self process percent can exceed 1.0 on multi-core systems, but should be reasonable + EXPECT_TRUE(self_stat->percent_ < 100.0f); +#endif +} + +MockSecurityForStreamService::MockSecurityForStreamService() +{ +} + +MockSecurityForStreamService::~MockSecurityForStreamService() +{ +} + +srs_error_t MockSecurityForStreamService::check(SrsRtmpConnType type, std::string ip, ISrsRequest *req) +{ + return srs_success; +} + +MockRtmpServerForHandlePublishMessage::MockRtmpServerForHandlePublishMessage() +{ + decode_message_error_ = srs_success; + decode_message_packet_ = NULL; + decode_message_count_ = 0; + fmle_unpublish_error_ = srs_success; + fmle_unpublish_count_ = 0; +} + +MockRtmpServerForHandlePublishMessage::~MockRtmpServerForHandlePublishMessage() +{ + srs_freep(decode_message_error_); + srs_freep(decode_message_packet_); + srs_freep(fmle_unpublish_error_); +} + +void MockRtmpServerForHandlePublishMessage::set_recv_timeout(srs_utime_t tm) +{ +} + +void MockRtmpServerForHandlePublishMessage::set_send_timeout(srs_utime_t tm) +{ +} + +srs_error_t MockRtmpServerForHandlePublishMessage::handshake() +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::connect_app(ISrsRequest *req) +{ + return srs_success; +} + +uint32_t MockRtmpServerForHandlePublishMessage::proxy_real_ip() +{ + return 0; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::set_window_ack_size(int ack_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::set_peer_bandwidth(int bandwidth, int type) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::set_chunk_size(int chunk_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::response_connect_app(ISrsRequest *req, const char *server_ip) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::on_bw_done() +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::start_play(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::start_fmle_publish(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::start_haivision_publish(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::fmle_unpublish(int stream_id, double unpublish_tid) +{ + fmle_unpublish_count_++; + return srs_error_copy(fmle_unpublish_error_); +} + +srs_error_t MockRtmpServerForHandlePublishMessage::start_flash_publish(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::start_publishing(int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::redirect(ISrsRequest *r, std::string url, bool &accepted) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) +{ + decode_message_count_++; + if (decode_message_error_ != srs_success) { + return srs_error_copy(decode_message_error_); + } + // Return the configured packet (can be NULL or a specific packet type) + *ppacket = decode_message_packet_; + decode_message_packet_ = NULL; // Transfer ownership + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::send_and_free_packet(SrsRtmpCommand *packet, int stream_id) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::on_play_client_pause(int stream_id, bool is_pause) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::set_in_window_ack_size(int ack_size) +{ + return srs_success; +} + +srs_error_t MockRtmpServerForHandlePublishMessage::recv_message(SrsRtmpCommonMessage **pmsg) +{ + return srs_success; +} + +void MockRtmpServerForHandlePublishMessage::set_auto_response(bool v) +{ +} + +void MockRtmpServerForHandlePublishMessage::set_merge_read(bool v, IMergeReadHandler *handler) +{ +} + +void MockRtmpServerForHandlePublishMessage::set_recv_buffer(int buffer_size) +{ +} + +void MockRtmpServerForHandlePublishMessage::reset() +{ + srs_freep(decode_message_error_); + srs_freep(decode_message_packet_); + srs_freep(fmle_unpublish_error_); + decode_message_error_ = srs_success; + decode_message_packet_ = NULL; + decode_message_count_ = 0; + fmle_unpublish_error_ = srs_success; + fmle_unpublish_count_ = 0; +} + +VOID TEST(SrsRtmpConnTest, StreamServiceCycleSelection) +{ + srs_error_t err = srs_success; + + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsRtmpConn *conn = new SrsRtmpConn(mock_transport, "192.168.1.100", 1935); + + // Create mock rtmp server + MockRtmpServerForStreamService *mock_rtmp = new MockRtmpServerForStreamService(); + + // Create mock coroutine that always returns error in pull() + MockCoroutineForRtmpConn *mock_trd = new MockCoroutineForRtmpConn(); + mock_trd->pull_error_ = srs_error_new(ERROR_THREAD_INTERRUPED, "mock thread interrupted"); + + // Inject mocks into connection + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + srs_freep(conn->trd_); + conn->trd_ = mock_trd; + + // Test the major use scenario: RTMP connection cycle + // This tests do_cycle() -> service_cycle() path + // Note: stream_service_cycle() is NOT executed because trd_->pull() returns error + // in service_cycle() before the while loop calls stream_service_cycle() + if (true) { + mock_rtmp->identify_type_ = SrsRtmpConnPlay; + mock_rtmp->identify_stream_ = "livestream"; + mock_rtmp->start_play_count_ = 0; + mock_rtmp->start_fmle_publish_count_ = 0; + mock_rtmp->start_flash_publish_count_ = 0; + mock_rtmp->start_haivision_publish_count_ = 0; + + // Call do_cycle - executes do_cycle() and service_cycle() + // Fails at trd_->pull() in service_cycle() before reaching stream_service_cycle() + err = conn->do_cycle(); + HELPER_EXPECT_FAILED(err); + + // Verify that no play or publish methods were called + // This is expected because trd_->pull() returns error before stream_service_cycle() + // is executed, so the selection code (switch statement) is never reached + EXPECT_EQ(0, mock_rtmp->start_play_count_); + EXPECT_EQ(0, mock_rtmp->start_fmle_publish_count_); + EXPECT_EQ(0, mock_rtmp->start_flash_publish_count_); + EXPECT_EQ(0, mock_rtmp->start_haivision_publish_count_); + } + + // Cleanup - note: conn owns mock_rtmp, mock_trd, and mock_transport now, so they will be deleted + srs_freep(conn); +} + +// Test srs_get_disk_diskstats_stat() function to verify proper disk statistics +// collection from /proc/diskstats. This test covers the major use scenario of +// reading disk I/O statistics for configured disk devices. The function uses +// the global _srs_config to get disk device configuration. +VOID TEST(SrsUtilityTest, GetDiskDiskstatsStat) +{ + // Test case 1: Call with default config - should return true with ok_ = true + // This is the major use scenario - the function should work with the global config + // and return success even if no disk devices are configured or /proc/diskstats + // is not available (on macOS). + if (true) { + SrsDiskStat stat; + bool result = srs_get_disk_diskstats_stat(stat); + + // The function should always return true and set ok_ = true + EXPECT_TRUE(result); + EXPECT_TRUE(stat.ok_); + // sample_time_ should be set to current time + EXPECT_TRUE(stat.sample_time_ > 0); + } + +#if !defined(SRS_OSX) + // Test case 2: Verify that disk stat fields are initialized + // On Linux, if /proc/diskstats exists and disk devices are configured, + // the function should read and accumulate disk statistics + if (true) { + SrsDiskStat stat; + bool result = srs_get_disk_diskstats_stat(stat); + + EXPECT_TRUE(result); + EXPECT_TRUE(stat.ok_); + // All disk stat fields should be initialized (may be 0 if no devices configured) + // Just verify they are accessible and don't crash + EXPECT_TRUE(stat.rd_ios_ >= 0); + EXPECT_TRUE(stat.wr_ios_ >= 0); + EXPECT_TRUE(stat.rd_sectors_ >= 0); + EXPECT_TRUE(stat.wr_sectors_ >= 0); + } +#endif +} + +// Test srs_get_cpuinfo() function to verify proper CPU information retrieval +// using sysconf(). This test covers the major use scenario of getting system +// CPU configuration including total processors and online processors. The +// function uses a singleton pattern with static caching for performance. +VOID TEST(SrsUtilityTest, GetCpuInfo) +{ + // Call srs_get_cpuinfo() - this is the main test + // This will: + // 1. Create singleton SrsCpuInfo instance on first call + // 2. Initialize CPU info using sysconf(_SC_NPROCESSORS_CONF) and sysconf(_SC_NPROCESSORS_ONLN) + // 3. Return cached instance on subsequent calls + SrsCpuInfo *cpu_info = srs_get_cpuinfo(); + + // Verify that the cpu_info object is not NULL + EXPECT_TRUE(cpu_info != NULL); + + // Verify that ok_ flag is set to true + EXPECT_TRUE(cpu_info->ok_); + + // Verify that nb_processors_ is positive (at least 1 CPU) + EXPECT_TRUE(cpu_info->nb_processors_ > 0); + + // Verify that nb_processors_online_ is positive (at least 1 CPU online) + EXPECT_TRUE(cpu_info->nb_processors_online_ > 0); + + // Verify that online processors <= total processors + EXPECT_TRUE(cpu_info->nb_processors_online_ <= cpu_info->nb_processors_); + + // Call srs_get_cpuinfo() again to verify singleton pattern + // Should return the same cached instance + SrsCpuInfo *cpu_info2 = srs_get_cpuinfo(); + EXPECT_TRUE(cpu_info2 == cpu_info); +} + +// Test srs_update_disk_stat() function to verify proper disk statistics calculation +// including vmstat KBps calculation and diskstats busy percentage calculation. +// This test covers the major use scenario of updating disk statistics with delta +// calculations between two samples. +VOID TEST(SrsUtilityTest, UpdateDiskStat) +{ + // Get the initial disk stat to save the original state + SrsDiskStat* original_stat = srs_get_disk_stat(); + SrsDiskStat saved_stat = *original_stat; + + // Test case 1: First call to srs_update_disk_stat() - should initialize the stat + // This is the major use scenario - the first call should set ok_ = true and + // initialize all fields without calculating deltas (since there's no previous sample) + if (true) { + srs_update_disk_stat(); + + SrsDiskStat* stat = srs_get_disk_stat(); + EXPECT_TRUE(stat->ok_); + EXPECT_TRUE(stat->sample_time_ > 0); + // After first call, KBps values should be 0 (no delta to calculate) + EXPECT_EQ(0, stat->in_KBps_); + // busy_ should be 0 (no delta to calculate) + EXPECT_EQ(0.0f, stat->busy_); + } + +#if !defined(SRS_OSX) + // Test case 2: Second call to srs_update_disk_stat() - should calculate deltas + // This tests the vmstat KBps calculation and diskstats busy percentage calculation + // On Linux, if /proc/vmstat and /proc/diskstats exist, the function should + // calculate the delta between current and previous samples + if (true) { + // Save the first sample + SrsDiskStat first_sample = *srs_get_disk_stat(); + + // Wait a bit to ensure time difference + srs_usleep(100 * SRS_UTIME_MILLISECONDS); + + // Call srs_update_disk_stat() again to calculate deltas + srs_update_disk_stat(); + + SrsDiskStat* stat = srs_get_disk_stat(); + EXPECT_TRUE(stat->ok_); + EXPECT_TRUE(stat->sample_time_ > first_sample.sample_time_); + + // Verify vmstat calculation logic: + // If pgpgin/pgpgout increased and duration_ms > 0, then KBps should be calculated + // KBps = (current - previous) * 1000 / duration_ms + // Note: KBps values may still be 0 if no disk I/O occurred between samples + EXPECT_TRUE(stat->in_KBps_ >= 0); + + // Verify diskstats calculation logic: + // If cpu_.ok_ and cpu_.total_delta_ > 0 and cpuinfo->ok_ and nb_processors_ > 0 + // and ticks increased, then busy_ should be calculated + // busy_ = ticks / delta_ms, where delta_ms = cpu_.total_delta_ * 10 / nb_processors_ + // Note: busy_ may still be 0 if no disk I/O occurred or if conditions not met + EXPECT_TRUE(stat->busy_ >= 0.0f); + EXPECT_TRUE(stat->busy_ <= 1.0f); // busy_ should be in [0, 1] range + } + + // Test case 3: Verify the calculation formulas with known values + // This tests the specific calculation logic for vmstat and diskstats + if (true) { + // Get current stat + SrsDiskStat* current = srs_get_disk_stat(); + + // Manually create a previous stat with known values to test calculation + SrsDiskStat prev = *current; + prev.sample_time_ = current->sample_time_ - 1000; // 1 second ago + prev.pgpgin_ = 1000; // 1000 KB read + prev.pgpgout_ = 2000; // 2000 KB written + prev.ticks_ = 100; // 100 ticks + prev.cpu_.ok_ = true; + prev.cpu_.user_ = 1000; + prev.cpu_.sys_ = 500; + prev.cpu_.idle_ = 8500; + prev.cpu_.total_delta_ = 0; + + // Create a new stat with increased values + SrsDiskStat next = *current; + next.sample_time_ = current->sample_time_; + next.pgpgin_ = 2000; // 1000 KB more read + next.pgpgout_ = 4000; // 2000 KB more written + next.ticks_ = 200; // 100 ticks more + next.cpu_.ok_ = true; + next.cpu_.user_ = 1100; + next.cpu_.sys_ = 600; + next.cpu_.idle_ = 8800; + next.cpu_.total_delta_ = next.cpu_.total() - prev.cpu_.total(); + + // Calculate expected values + int64_t duration_ms = next.sample_time_ - prev.sample_time_; + if (duration_ms > 0 && prev.pgpgin_ > 0 && next.pgpgin_ > prev.pgpgin_) { + int expected_in_KBps = (int)((next.pgpgin_ - prev.pgpgin_) * 1000 / duration_ms); + // Verify the formula: KBps = KB * 1000 / ms = KB/s + EXPECT_EQ(expected_in_KBps, (int)((next.pgpgin_ - prev.pgpgin_) * 1000 / duration_ms)); + } + + if (duration_ms > 0 && prev.pgpgout_ > 0 && next.pgpgout_ > prev.pgpgout_) { + int expected_out_KBps = (int)((next.pgpgout_ - prev.pgpgout_) * 1000 / duration_ms); + // Verify the formula: KBps = KB * 1000 / ms = KB/s + EXPECT_EQ(expected_out_KBps, (int)((next.pgpgout_ - prev.pgpgout_) * 1000 / duration_ms)); + } + + // Verify diskstats busy calculation formula + if (next.cpu_.ok_ && prev.cpu_.ok_ && next.cpu_.total_delta_ > 0) { + SrsCpuInfo* cpuinfo = srs_get_cpuinfo(); + if (cpuinfo->ok_ && cpuinfo->nb_processors_ > 0 && prev.ticks_ < next.ticks_) { + // delta_ms = cpu_.total_delta_ * 10 / nb_processors_ + double delta_ms = next.cpu_.total_delta_ * 10 / cpuinfo->nb_processors_; + unsigned int ticks = next.ticks_ - prev.ticks_; + // busy_ = ticks / delta_ms + float expected_busy = (float)(ticks / delta_ms); + // Verify the formula is correct + EXPECT_TRUE(expected_busy >= 0.0f); + } + } + } +#endif + + // Restore the original disk stat to avoid side effects + *original_stat = saved_stat; +} + +VOID TEST(SrsRtmpConnTest, StreamServiceCycleThreadQuitCheck) +{ + srs_error_t err = srs_success; + + // This test covers the thread quit check in playing() and publishing() methods + // that are called from stream_service_cycle(). The major use scenario is testing + // that when stream_service_cycle() calls playing() or publishing(), those methods + // check for thread quit (trd_->pull()) and return immediately if the thread is quitting. + + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + MockRtmpServerForStreamService *mock_rtmp = new MockRtmpServerForStreamService(); + MockSecurityForStreamService *mock_security = new MockSecurityForStreamService(); + MockCoroutineForRtmpConn *mock_trd = new MockCoroutineForRtmpConn(); + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + SrsRtmpConn *conn = new SrsRtmpConn(mock_transport, "127.0.0.1", 1935); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Inject mock objects + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + srs_freep(conn->security_); + conn->security_ = mock_security; + srs_freep(conn->trd_); + conn->trd_ = mock_trd; + + // Configure mock to simulate thread quitting + // Set pull_count_ to 1 so the next call will return error (mock returns success on first call) + mock_trd->pull_count_ = 1; + mock_trd->pull_error_ = srs_error_new(ERROR_THREAD_INTERRUPED, "thread interrupted"); + + // Set connection type to Play + mock_rtmp->identify_type_ = SrsRtmpConnPlay; + mock_rtmp->identify_stream_ = "livestream"; + conn->info_->type_ = SrsRtmpConnPlay; + + // Set up request with valid tcUrl to pass tcUrl parsing + conn->info_->req_->tcUrl_ = "rtmp://127.0.0.1/live"; + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + + // Call stream_service_cycle - it should reach the switch statement, + // call start_play, then call playing() which should immediately return + // error from trd_->pull() check + err = conn->stream_service_cycle(); + EXPECT_TRUE(err != srs_success); + srs_freep(err); + + // Verify that start_play was called (selection code was reached) + EXPECT_EQ(1, mock_rtmp->start_play_count_); + // Verify that pull() was called in playing() + EXPECT_EQ(1, mock_trd->pull_count_); + + srs_freep(conn); + srs_freep(mock_config); +} + +// Mock config implementation for SrsRtmpConn::acquire_publish() testing +MockAppConfigForAcquirePublish::MockAppConfigForAcquirePublish() +{ + rtc_server_enabled_ = true; + rtc_enabled_ = true; + srt_enabled_ = true; + rtsp_server_enabled_ = false; + rtsp_enabled_ = false; +} + +MockAppConfigForAcquirePublish::~MockAppConfigForAcquirePublish() +{ +} + +bool MockAppConfigForAcquirePublish::get_rtc_server_enabled() +{ + return rtc_server_enabled_; +} + +bool MockAppConfigForAcquirePublish::get_rtc_enabled(std::string vhost) +{ + return rtc_enabled_; +} + +bool MockAppConfigForAcquirePublish::get_srt_enabled() +{ + return srt_enabled_; +} + +bool MockAppConfigForAcquirePublish::get_srt_enabled(std::string vhost) +{ + return srt_enabled_; +} + +bool MockAppConfigForAcquirePublish::get_rtsp_server_enabled() +{ + return rtsp_server_enabled_; +} + +bool MockAppConfigForAcquirePublish::get_rtsp_enabled(std::string vhost) +{ + return rtsp_enabled_; +} + +// Test SrsRtmpConn::acquire_publish() method to verify proper cross-protocol stream +// busy checking for RTC, SRT, and RTSP sources. This test covers the major use scenario +// of acquiring publish permission when multiple protocols are enabled and checking that +// RTC and SRT sources are properly checked for busy state. +VOID TEST(SrsRtmpConnTest, AcquirePublishCrossProtocolCheck) +{ + srs_error_t err = srs_success; + + // Create mock config (must outlive connection) + MockAppConfigForAcquirePublish *mock_config = new MockAppConfigForAcquirePublish(); + + // Create mock source managers (from srs_utest_app6.hpp) + MockRtcSourceManager *mock_rtc_sources = new MockRtcSourceManager(); + MockSrtSourceManager *mock_srt_sources = new MockSrtSourceManager(); + + // Use scope block to ensure conn is destroyed before mock objects + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Inject mock source managers + conn->rtc_sources_ = mock_rtc_sources; + conn->srt_sources_ = mock_srt_sources; + + // Setup request + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + conn->info_->edge_ = false; + + // Create a mock live source (from srs_utest_app6.hpp) + SrsSharedPtr live_source(new MockLiveSource()); + + // Test major use scenario: RTC and SRT enabled, both sources can publish + // This covers the code path where rtc_server_enabled && rtc_enabled && !info_->edge_ + // and srt_server_enabled && srt_enabled && !info_->edge_ + if (true) { + // Configure SRT source to allow publishing + mock_srt_sources->set_can_publish(true); + + // Note: MockRtcSourceManager creates a real SrsRtcSource which by default + // allows publishing (can_publish() returns true when no publisher exists) + + // Call acquire_publish - should succeed + HELPER_EXPECT_SUCCESS(conn->acquire_publish(live_source)); + + // Verify that fetch_or_create was called for both RTC and SRT + EXPECT_EQ(1, mock_rtc_sources->fetch_or_create_count_); + EXPECT_EQ(1, mock_srt_sources->fetch_or_create_count_); + } + + // Test scenario: SRT stream is busy (cannot publish) + // This covers the code path where !srt->can_publish() returns error + if (true) { + // Reset counters + mock_rtc_sources->fetch_or_create_count_ = 0; + mock_srt_sources->fetch_or_create_count_ = 0; + + // Configure SRT source to reject publishing + mock_srt_sources->set_can_publish(false); + + // Call acquire_publish - should fail with ERROR_SYSTEM_STREAM_BUSY + err = conn->acquire_publish(live_source); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_SYSTEM_STREAM_BUSY, srs_error_code(err)); + srs_freep(err); + + // Verify that fetch_or_create was called for both RTC and SRT + EXPECT_EQ(1, mock_rtc_sources->fetch_or_create_count_); + EXPECT_EQ(1, mock_srt_sources->fetch_or_create_count_); + } + + // Test scenario: Edge server - RTC and SRT should be skipped + // This covers the code path where info_->edge_ is true + if (true) { + // Reset counters + mock_rtc_sources->fetch_or_create_count_ = 0; + mock_srt_sources->fetch_or_create_count_ = 0; + + // Set connection as edge + conn->info_->edge_ = true; + + // Configure SRT source to allow publishing + mock_srt_sources->set_can_publish(true); + + // Call acquire_publish - should succeed + HELPER_EXPECT_SUCCESS(conn->acquire_publish(live_source)); + + // Verify that fetch_or_create was NOT called for RTC and SRT (edge server) + EXPECT_EQ(0, mock_rtc_sources->fetch_or_create_count_); + EXPECT_EQ(0, mock_srt_sources->fetch_or_create_count_); + } + } + + // Cleanup mock objects + srs_freep(mock_config); + srs_freep(mock_rtc_sources); + srs_freep(mock_srt_sources); +} + +VOID TEST(SrsRtmpConnTest, HandlePublishMessageFlashRepublish) +{ + srs_error_t err = srs_success; + + // Create mock config (must outlive connection) + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + // Use scope block to ensure conn is destroyed before mock_config + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Create mock rtmp server + MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage(); + + // Inject mock into connection + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + + // Test major use scenario: Flash publish - any AMF command should return ERROR_CONTROL_REPUBLISH + // This covers the code path where info_->type_ == SrsRtmpConnFlashPublish + if (true) { + // Set connection type to Flash publish + conn->info_->type_ = SrsRtmpConnFlashPublish; + + // Create a dummy AMF0 command message + SrsUniquePtr msg(new SrsRtmpCommonMessage()); + msg->header_.message_type_ = RTMP_MSG_AMF0CommandMessage; + + // Configure mock to return a generic command packet + mock_rtmp->decode_message_packet_ = new SrsCallPacket(); + mock_rtmp->decode_message_error_ = srs_success; + + // Create a dummy live source (not used in this test path) + SrsSharedPtr source; + + // Call handle_publish_message - should return ERROR_CONTROL_REPUBLISH + err = conn->handle_publish_message(source, msg.get()); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_CONTROL_REPUBLISH, srs_error_code(err)); + srs_freep(err); + + // Verify decode_message was called + EXPECT_EQ(1, mock_rtmp->decode_message_count_); + } + } + + // Cleanup mock config after connection is destroyed + srs_freep(mock_config); +} + +VOID TEST(SrsRtmpConnTest, HandlePublishMessageFMLERepublish) +{ + srs_error_t err = srs_success; + + // Create mock config (must outlive connection) + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + // Use scope block to ensure conn is destroyed before mock_config + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Create mock rtmp server + MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage(); + + // Inject mock into connection + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + + // Test major use scenario: FMLE publish with SrsFMLEStartPacket - should call fmle_unpublish + // This covers the code path where dynamic_cast(pkt.get()) succeeds + if (true) { + // Set connection type to FMLE publish + conn->info_->type_ = SrsRtmpConnFMLEPublish; + conn->info_->res_->stream_id_ = 1; + + // Create a dummy AMF0 command message + SrsUniquePtr msg(new SrsRtmpCommonMessage()); + msg->header_.message_type_ = RTMP_MSG_AMF0CommandMessage; + + // Configure mock to return SrsFMLEStartPacket + SrsFMLEStartPacket *fmle_pkt = new SrsFMLEStartPacket(); + fmle_pkt->transaction_id_ = 2.0; + mock_rtmp->decode_message_packet_ = fmle_pkt; + mock_rtmp->decode_message_error_ = srs_success; + mock_rtmp->fmle_unpublish_error_ = srs_success; + + // Create a dummy live source (not used in this test path) + SrsSharedPtr source; + + // Call handle_publish_message - should return ERROR_CONTROL_REPUBLISH + err = conn->handle_publish_message(source, msg.get()); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_CONTROL_REPUBLISH, srs_error_code(err)); + srs_freep(err); + + // Verify decode_message and fmle_unpublish were called + EXPECT_EQ(1, mock_rtmp->decode_message_count_); + EXPECT_EQ(1, mock_rtmp->fmle_unpublish_count_); + } + } + + // Cleanup mock config after connection is destroyed + srs_freep(mock_config); +} + +VOID TEST(SrsRtmpConnTest, HandlePublishMessageFMLEIgnoreCommand) +{ + srs_error_t err = srs_success; + + // Create mock config (must outlive connection) + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + // Use scope block to ensure conn is destroyed before mock_config + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Create mock rtmp server + MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage(); + + // Inject mock into connection + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + + // Test major use scenario: FMLE publish with non-FMLE command - should trace and return success + // This covers the code path where the packet is not SrsFMLEStartPacket + if (true) { + // Set connection type to FMLE publish + conn->info_->type_ = SrsRtmpConnFMLEPublish; + + // Create a dummy AMF0 command message + SrsUniquePtr msg(new SrsRtmpCommonMessage()); + msg->header_.message_type_ = RTMP_MSG_AMF0CommandMessage; + + // Configure mock to return a generic command packet (not SrsFMLEStartPacket) + mock_rtmp->decode_message_packet_ = new SrsCallPacket(); + mock_rtmp->decode_message_error_ = srs_success; + + // Create a dummy live source (not used in this test path) + SrsSharedPtr source; + + // Call handle_publish_message - should return success (trace and ignore) + HELPER_EXPECT_SUCCESS(conn->handle_publish_message(source, msg.get())); + + // Verify decode_message was called but fmle_unpublish was not + EXPECT_EQ(1, mock_rtmp->decode_message_count_); + EXPECT_EQ(0, mock_rtmp->fmle_unpublish_count_); + } + } + + // Cleanup mock config after connection is destroyed + srs_freep(mock_config); +} + +VOID TEST(SrsRtmpConnTest, AcquirePublishStreamBusyCheck) +{ + srs_error_t err = srs_success; + + // Create mock config that disables all protocols except RTMP + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + // Use scope block to ensure conn is destroyed before mock_config + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Create mock rtmp server + MockRtmpServerForStreamService *mock_rtmp = new MockRtmpServerForStreamService(); + + // Create mock security + MockSecurityForStreamService *mock_security = new MockSecurityForStreamService(); + + // Inject mocks into connection + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + srs_freep(conn->security_); + conn->security_ = mock_security; + + // Set up request with valid stream info + conn->info_->req_->tcUrl_ = "rtmp://127.0.0.1/live"; + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + conn->info_->edge_ = false; + + // Test the major use scenario: acquire_publish fails when RTMP stream is busy + // Create mock live source that does NOT allow publishing (stream is busy) + SrsSharedPtr source(new MockLiveSource()); + MockLiveSource *mock_source = dynamic_cast(source.get()); + mock_source->set_can_publish(false); // Stream is busy + + // Call acquire_publish - should fail with ERROR_SYSTEM_STREAM_BUSY + err = conn->acquire_publish(source); + EXPECT_TRUE(err != srs_success); + EXPECT_EQ(ERROR_SYSTEM_STREAM_BUSY, srs_error_code(err)); + srs_freep(err); + + // Note: conn owns mock_rtmp and mock_security, they will be deleted by conn destructor + } // conn is destroyed here + + // Now safe to delete mock_config + srs_freep(mock_config); +} + +// Test SrsRtmpConn::handle_publish_message() to verify proper handling of video/audio messages +// during publishing. This test covers the major use scenario: processing a video message through +// process_publish_message() which calls source->on_video(). +VOID TEST(SrsRtmpConnTest, HandlePublishMessageVideoSuccess) +{ + srs_error_t err = srs_success; + + // Create mock config + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + // Use scope block to ensure conn is destroyed before mock_config + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Create mock rtmp server + MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage(); + + // Inject mock into connection + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + + // Set connection type to FMLE publish (not Flash, so AMF commands are ignored) + conn->info_->type_ = SrsRtmpConnFMLEPublish; + conn->info_->edge_ = false; + + // Create mock live source + SrsSharedPtr source(new MockLiveSource()); + + // Create a video message (RTMP_MSG_VideoMessage = 9) + SrsUniquePtr msg(new SrsRtmpCommonMessage()); + msg->header_.message_type_ = RTMP_MSG_VideoMessage; + msg->header_.payload_length_ = 10; + msg->header_.timestamp_ = 1000; + msg->header_.stream_id_ = 1; + msg->create_payload(10); + + // Test the major use scenario: handle_publish_message processes video message + // This should: + // 1. Check if message is AMF command (it's not - it's video) + // 2. Call process_publish_message() which calls source->on_video() + HELPER_EXPECT_SUCCESS(conn->handle_publish_message(source, msg.get())); + + // Verify that decode_message was NOT called (because it's not an AMF command) + EXPECT_EQ(0, mock_rtmp->decode_message_count_); + + // Note: conn owns mock_rtmp, it will be deleted by conn destructor + } // conn is destroyed here + + // Now safe to delete mock_config + srs_freep(mock_config); +} + +MockRtmpServerForPlayControl::MockRtmpServerForPlayControl() +{ + decode_message_packet_ = NULL; + decode_message_count_ = 0; + send_and_free_packet_count_ = 0; + on_play_client_pause_count_ = 0; + last_pause_state_ = false; +} + +MockRtmpServerForPlayControl::~MockRtmpServerForPlayControl() +{ + srs_freep(decode_message_packet_); +} + +void MockRtmpServerForPlayControl::set_recv_timeout(srs_utime_t tm) {} +void MockRtmpServerForPlayControl::set_send_timeout(srs_utime_t tm) {} +srs_error_t MockRtmpServerForPlayControl::handshake() { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::connect_app(ISrsRequest *req) { return srs_success; } +uint32_t MockRtmpServerForPlayControl::proxy_real_ip() { return 0; } +srs_error_t MockRtmpServerForPlayControl::set_window_ack_size(int ack_size) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::set_peer_bandwidth(int bandwidth, int type) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::set_chunk_size(int chunk_size) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::response_connect_app(ISrsRequest *req, const char *server_ip) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::on_bw_done() { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::start_play(int stream_id) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::start_fmle_publish(int stream_id) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::start_haivision_publish(int stream_id) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::fmle_unpublish(int stream_id, double unpublish_tid) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::start_flash_publish(int stream_id) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::start_publishing(int stream_id) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::redirect(ISrsRequest *r, std::string url, bool &accepted) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id) { return srs_success; } + +srs_error_t MockRtmpServerForPlayControl::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) +{ + decode_message_count_++; + *ppacket = decode_message_packet_; + decode_message_packet_ = NULL; + return srs_success; +} + +srs_error_t MockRtmpServerForPlayControl::send_and_free_packet(SrsRtmpCommand *packet, int stream_id) +{ + send_and_free_packet_count_++; + srs_freep(packet); + return srs_success; +} + +srs_error_t MockRtmpServerForPlayControl::on_play_client_pause(int stream_id, bool is_pause) +{ + on_play_client_pause_count_++; + last_pause_state_ = is_pause; + return srs_success; +} + +srs_error_t MockRtmpServerForPlayControl::set_in_window_ack_size(int ack_size) { return srs_success; } +srs_error_t MockRtmpServerForPlayControl::recv_message(SrsRtmpCommonMessage **pmsg) { return srs_success; } +void MockRtmpServerForPlayControl::set_auto_response(bool v) {} +void MockRtmpServerForPlayControl::set_merge_read(bool v, IMergeReadHandler *handler) {} +void MockRtmpServerForPlayControl::set_recv_buffer(int buffer_size) {} + +void MockRtmpServerForPlayControl::reset() +{ + srs_freep(decode_message_packet_); + decode_message_count_ = 0; + send_and_free_packet_count_ = 0; + on_play_client_pause_count_ = 0; + last_pause_state_ = false; +} + +MockLiveConsumerForPlayControl::MockLiveConsumerForPlayControl(ISrsLiveSource *source) + : SrsLiveConsumer(source) +{ + on_play_client_pause_count_ = 0; + last_pause_state_ = false; +} + +MockLiveConsumerForPlayControl::~MockLiveConsumerForPlayControl() +{ +} + +srs_error_t MockLiveConsumerForPlayControl::on_play_client_pause(bool is_pause) +{ + on_play_client_pause_count_++; + last_pause_state_ = is_pause; + return srs_success; +} + +MockAppConfigForHttpHooksOnConnect::MockAppConfigForHttpHooksOnConnect() +{ + http_hooks_enabled_ = false; + on_connect_directive_ = NULL; +} + +MockAppConfigForHttpHooksOnConnect::~MockAppConfigForHttpHooksOnConnect() +{ + srs_freep(on_connect_directive_); +} + +bool MockAppConfigForHttpHooksOnConnect::get_vhost_http_hooks_enabled(std::string vhost) +{ + return http_hooks_enabled_; +} + +SrsConfDirective *MockAppConfigForHttpHooksOnConnect::get_vhost_on_connect(std::string vhost) +{ + return on_connect_directive_; +} + +MockAppConfigForHttpHooksOnClose::MockAppConfigForHttpHooksOnClose() +{ + http_hooks_enabled_ = false; + on_close_directive_ = NULL; +} + +MockAppConfigForHttpHooksOnClose::~MockAppConfigForHttpHooksOnClose() +{ + srs_freep(on_close_directive_); +} + +bool MockAppConfigForHttpHooksOnClose::get_vhost_http_hooks_enabled(std::string vhost) +{ + return http_hooks_enabled_; +} + +SrsConfDirective *MockAppConfigForHttpHooksOnClose::get_vhost_on_close(std::string vhost) +{ + return on_close_directive_; +} + +MockHttpHooksForOnConnect::MockHttpHooksForOnConnect() +{ + on_connect_count_ = 0; + on_connect_error_ = srs_success; + on_close_count_ = 0; + on_unpublish_count_ = 0; + on_stop_count_ = 0; +} + +MockHttpHooksForOnConnect::~MockHttpHooksForOnConnect() +{ + srs_freep(on_connect_error_); + on_connect_calls_.clear(); + on_close_calls_.clear(); + on_unpublish_calls_.clear(); + on_stop_calls_.clear(); +} + +srs_error_t MockHttpHooksForOnConnect::on_connect(std::string url, ISrsRequest *req) +{ + on_connect_count_++; + on_connect_calls_.push_back(std::make_pair(url, req)); + return srs_error_copy(on_connect_error_); +} + +void MockHttpHooksForOnConnect::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes) +{ + on_close_count_++; + OnCloseCall call; + call.url_ = url; + call.req_ = req; + call.send_bytes_ = send_bytes; + call.recv_bytes_ = recv_bytes; + on_close_calls_.push_back(call); +} + +srs_error_t MockHttpHooksForOnConnect::on_publish(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooksForOnConnect::on_unpublish(std::string url, ISrsRequest *req) +{ + on_unpublish_count_++; + on_unpublish_calls_.push_back(std::make_pair(url, req)); +} + +srs_error_t MockHttpHooksForOnConnect::on_play(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooksForOnConnect::on_stop(std::string url, ISrsRequest *req) +{ + on_stop_count_++; + on_stop_calls_.push_back(std::make_pair(url, req)); +} + +srs_error_t MockHttpHooksForOnConnect::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnConnect::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnConnect::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnConnect::discover_co_workers(std::string url, std::string &host, int &port) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnConnect::on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls) +{ + return srs_success; +} + +void MockHttpHooksForOnConnect::reset() +{ + srs_freep(on_connect_error_); + on_connect_error_ = srs_success; + on_connect_count_ = 0; + on_connect_calls_.clear(); + on_close_count_ = 0; + on_close_calls_.clear(); + on_unpublish_count_ = 0; + on_unpublish_calls_.clear(); + on_stop_count_ = 0; + on_stop_calls_.clear(); +} + +VOID TEST(SrsRtmpConnTest, HttpHooksOnConnect) +{ + srs_error_t err = srs_success; + + // Create mock config + MockAppConfigForHttpHooksOnConnect *mock_config = new MockAppConfigForHttpHooksOnConnect(); + + // Create mock hooks + MockHttpHooksForOnConnect *mock_hooks = new MockHttpHooksForOnConnect(); + + // Use scope block to ensure conn is destroyed before mock_config + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Inject mock hooks + ISrsHttpHooks *original_hooks = conn->hooks_; + conn->hooks_ = mock_hooks; + + // Set up request with vhost + conn->info_->req_->vhost_ = "__defaultVhost__"; + + // Test case 1: HTTP hooks disabled - should return success without calling hooks + if (true) { + mock_config->http_hooks_enabled_ = false; + mock_hooks->reset(); + + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_connect()); + EXPECT_EQ(0, mock_hooks->on_connect_count_); + } + + // Test case 2: HTTP hooks enabled but no on_connect directive - should return success + if (true) { + mock_config->http_hooks_enabled_ = true; + mock_config->on_connect_directive_ = NULL; + mock_hooks->reset(); + + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_connect()); + EXPECT_EQ(0, mock_hooks->on_connect_count_); + } + + // Test case 3: HTTP hooks enabled with single URL - should call hooks_->on_connect once + if (true) { + mock_config->http_hooks_enabled_ = true; + mock_config->on_connect_directive_ = new SrsConfDirective(); + mock_config->on_connect_directive_->name_ = "on_connect"; + mock_config->on_connect_directive_->args_.push_back("http://localhost:8080/api/on_connect"); + mock_hooks->reset(); + + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_connect()); + EXPECT_EQ(1, mock_hooks->on_connect_count_); + EXPECT_EQ(1, (int)mock_hooks->on_connect_calls_.size()); + EXPECT_STREQ("http://localhost:8080/api/on_connect", mock_hooks->on_connect_calls_[0].first.c_str()); + EXPECT_TRUE(mock_hooks->on_connect_calls_[0].second == conn->info_->req_); + } + + // Test case 4: HTTP hooks enabled with multiple URLs - should call hooks_->on_connect for each URL + if (true) { + srs_freep(mock_config->on_connect_directive_); + mock_config->http_hooks_enabled_ = true; + mock_config->on_connect_directive_ = new SrsConfDirective(); + mock_config->on_connect_directive_->name_ = "on_connect"; + mock_config->on_connect_directive_->args_.push_back("http://localhost:8080/api/on_connect1"); + mock_config->on_connect_directive_->args_.push_back("http://localhost:8080/api/on_connect2"); + mock_config->on_connect_directive_->args_.push_back("http://localhost:8080/api/on_connect3"); + mock_hooks->reset(); + + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_connect()); + EXPECT_EQ(3, mock_hooks->on_connect_count_); + EXPECT_EQ(3, (int)mock_hooks->on_connect_calls_.size()); + EXPECT_STREQ("http://localhost:8080/api/on_connect1", mock_hooks->on_connect_calls_[0].first.c_str()); + EXPECT_STREQ("http://localhost:8080/api/on_connect2", mock_hooks->on_connect_calls_[1].first.c_str()); + EXPECT_STREQ("http://localhost:8080/api/on_connect3", mock_hooks->on_connect_calls_[2].first.c_str()); + } + + // Test case 5: HTTP hooks enabled but on_connect returns error - should wrap and return error + if (true) { + srs_freep(mock_config->on_connect_directive_); + mock_config->http_hooks_enabled_ = true; + mock_config->on_connect_directive_ = new SrsConfDirective(); + mock_config->on_connect_directive_->name_ = "on_connect"; + mock_config->on_connect_directive_->args_.push_back("http://localhost:8080/api/on_connect"); + mock_hooks->reset(); + mock_hooks->on_connect_error_ = srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "mock hook error"); + + err = conn->http_hooks_on_connect(); + EXPECT_TRUE(err != srs_success); + srs_freep(err); + EXPECT_EQ(1, mock_hooks->on_connect_count_); + } + + // Restore original hooks + conn->hooks_ = original_hooks; + } // conn is destroyed here + + // Now safe to delete mock objects + srs_freep(mock_hooks); + srs_freep(mock_config); +} + +// Test SrsRtmpConn::process_play_control_msg() to verify proper handling of pause control messages +// during playback. This test covers the major use scenario: processing a pause packet which calls +// both rtmp_->on_play_client_pause() and consumer->on_play_client_pause(). +VOID TEST(SrsRtmpConnTest, ProcessPlayControlMsgPauseSuccess) +{ + srs_error_t err = srs_success; + + // Create mock config + MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn(); + + // Use scope block to ensure conn is destroyed before mock_config + { + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Inject mock config before assemble() + conn->config_ = mock_config; + conn->assemble(); + + // Create mock rtmp server + MockRtmpServerForPlayControl *mock_rtmp = new MockRtmpServerForPlayControl(); + + // Create pause packet to be returned by decode_message + SrsPausePacket *pause_pkt = new SrsPausePacket(); + pause_pkt->is_pause_ = true; + pause_pkt->time_ms_ = 1000.0; + mock_rtmp->decode_message_packet_ = pause_pkt; + + // Inject mock rtmp into connection + srs_freep(conn->rtmp_); + conn->rtmp_ = mock_rtmp; + + // Initialize stream_id in response object + conn->info_->res_->stream_id_ = 1; + + // Create mock live source and consumer (use MockLiveSourceForQueue which properly handles on_consumer_destroy) + SrsSharedPtr source(new MockLiveSourceForQueue()); + SrsUniquePtr consumer(new MockLiveConsumerForPlayControl(source.get())); + + // Create an AMF0 command message + SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage(); + msg->header_.message_type_ = RTMP_MSG_AMF0CommandMessage; + msg->header_.payload_length_ = 10; + msg->header_.timestamp_ = 1000; + msg->header_.stream_id_ = 1; + msg->create_payload(10); + + // Test the major use scenario: process_play_control_msg handles pause packet + // This should: + // 1. Check if message is AMF command (it is) + // 2. Decode the message to get pause packet + // 3. Call rtmp_->on_play_client_pause() + // 4. Call consumer->on_play_client_pause() + HELPER_EXPECT_SUCCESS(conn->process_play_control_msg(consumer.get(), msg)); + + // Verify that decode_message was called + EXPECT_EQ(1, mock_rtmp->decode_message_count_); + + // Verify that on_play_client_pause was called on both rtmp and consumer + EXPECT_EQ(1, mock_rtmp->on_play_client_pause_count_); + EXPECT_EQ(true, mock_rtmp->last_pause_state_); + EXPECT_EQ(1, consumer->on_play_client_pause_count_); + EXPECT_EQ(true, consumer->last_pause_state_); + + // Note: conn owns mock_rtmp, it will be deleted by conn destructor + } // conn is destroyed here + + // Now safe to delete mock_config + srs_freep(mock_config); +} + +// Test SrsRtmpConn::http_hooks_on_close() method to verify proper HTTP hook invocation +// when a client disconnects. This test covers the major use scenario where on_close hooks +// are configured and should be called with the correct URL, request, and byte counts. +VOID TEST(SrsRtmpConnTest, HttpHooksOnClose) +{ + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Create mock config with on_close hooks enabled + MockAppConfigForHttpHooksOnClose *mock_config = new MockAppConfigForHttpHooksOnClose(); + mock_config->http_hooks_enabled_ = true; + + // Create on_close directive with two hook URLs + mock_config->on_close_directive_ = new SrsConfDirective(); + mock_config->on_close_directive_->name_ = "on_close"; + mock_config->on_close_directive_->args_.push_back("http://127.0.0.1:8085/api/v1/close"); + mock_config->on_close_directive_->args_.push_back("http://localhost:8085/api/v1/close"); + + // Create mock hooks + MockHttpHooksForOnConnect *mock_hooks = new MockHttpHooksForOnConnect(); + + // Inject mocks into connection + conn->config_ = mock_config; + conn->hooks_ = mock_hooks; + + // Set up request with valid vhost + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + + // Test the major use scenario: http_hooks_on_close() with hooks enabled + // This should: + // 1. Check if HTTP hooks are enabled (they are) + // 2. Get the on_close directive from config + // 3. Copy the hook URLs from the directive + // 4. Call hooks_->on_close() for each URL with transport byte counts + conn->http_hooks_on_close(); + + // Verify that on_close was called twice (once for each URL) + EXPECT_EQ(2, mock_hooks->on_close_count_); + EXPECT_EQ(2, (int)mock_hooks->on_close_calls_.size()); + + // Verify the first call + EXPECT_STREQ("http://127.0.0.1:8085/api/v1/close", mock_hooks->on_close_calls_[0].url_.c_str()); + EXPECT_TRUE(mock_hooks->on_close_calls_[0].req_ == conn->info_->req_); + EXPECT_EQ(0, mock_hooks->on_close_calls_[0].send_bytes_); // Mock transport returns 0 + EXPECT_EQ(0, mock_hooks->on_close_calls_[0].recv_bytes_); // Mock transport returns 0 + + // Verify the second call + EXPECT_STREQ("http://localhost:8085/api/v1/close", mock_hooks->on_close_calls_[1].url_.c_str()); + EXPECT_TRUE(mock_hooks->on_close_calls_[1].req_ == conn->info_->req_); + EXPECT_EQ(0, mock_hooks->on_close_calls_[1].send_bytes_); + EXPECT_EQ(0, mock_hooks->on_close_calls_[1].recv_bytes_); + + // Cleanup: restore original config and hooks to avoid side effects + conn->config_ = _srs_config; + conn->hooks_ = _srs_hooks; + srs_freep(mock_config); + srs_freep(mock_hooks); +} + +MockAppConfigForHttpHooksOnPublish::MockAppConfigForHttpHooksOnPublish() +{ + http_hooks_enabled_ = false; + on_publish_directive_ = NULL; +} + +MockAppConfigForHttpHooksOnPublish::~MockAppConfigForHttpHooksOnPublish() +{ + srs_freep(on_publish_directive_); +} + +bool MockAppConfigForHttpHooksOnPublish::get_vhost_http_hooks_enabled(std::string vhost) +{ + return http_hooks_enabled_; +} + +SrsConfDirective *MockAppConfigForHttpHooksOnPublish::get_vhost_on_publish(std::string vhost) +{ + return on_publish_directive_; +} + +MockHttpHooksForOnPublish::MockHttpHooksForOnPublish() +{ + on_publish_count_ = 0; + on_publish_error_ = srs_success; +} + +MockHttpHooksForOnPublish::~MockHttpHooksForOnPublish() +{ + srs_freep(on_publish_error_); +} + +srs_error_t MockHttpHooksForOnPublish::on_connect(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooksForOnPublish::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes) +{ +} + +srs_error_t MockHttpHooksForOnPublish::on_publish(std::string url, ISrsRequest *req) +{ + on_publish_count_++; + on_publish_calls_.push_back(std::make_pair(url, req)); + return srs_error_copy(on_publish_error_); +} + +void MockHttpHooksForOnPublish::on_unpublish(std::string url, ISrsRequest *req) +{ +} + +srs_error_t MockHttpHooksForOnPublish::on_play(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooksForOnPublish::on_stop(std::string url, ISrsRequest *req) +{ +} + +srs_error_t MockHttpHooksForOnPublish::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPublish::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPublish::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPublish::discover_co_workers(std::string url, std::string &host, int &port) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPublish::on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls) +{ + return srs_success; +} + +void MockHttpHooksForOnPublish::reset() +{ + on_publish_calls_.clear(); + on_publish_count_ = 0; + srs_freep(on_publish_error_); + on_publish_error_ = srs_success; +} + +VOID TEST(SrsRtmpConnTest, HttpHooksOnPublishSuccess) +{ + srs_error_t err = srs_success; + + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Create mock config with HTTP hooks enabled + MockAppConfigForHttpHooksOnPublish *mock_config = new MockAppConfigForHttpHooksOnPublish(); + mock_config->http_hooks_enabled_ = true; + + // Create on_publish directive with two hook URLs + mock_config->on_publish_directive_ = new SrsConfDirective(); + mock_config->on_publish_directive_->name_ = "on_publish"; + mock_config->on_publish_directive_->args_.push_back("http://127.0.0.1:8085/api/v1/publish"); + mock_config->on_publish_directive_->args_.push_back("http://localhost:8085/api/v1/publish"); + + // Create mock hooks + MockHttpHooksForOnPublish *mock_hooks = new MockHttpHooksForOnPublish(); + + // Inject mocks into connection + conn->config_ = mock_config; + conn->hooks_ = mock_hooks; + + // Set up request with valid vhost + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + + // Test the major use scenario: http_hooks_on_publish() with hooks enabled + // This should: + // 1. Check if HTTP hooks are enabled (they are) + // 2. Get the on_publish directive from config + // 3. Copy the hook URLs from the directive + // 4. Call hooks_->on_publish() for each URL + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_publish()); + + // Verify that on_publish was called twice (once for each URL) + EXPECT_EQ(2, mock_hooks->on_publish_count_); + EXPECT_EQ(2, (int)mock_hooks->on_publish_calls_.size()); + + // Verify the first call + EXPECT_STREQ("http://127.0.0.1:8085/api/v1/publish", mock_hooks->on_publish_calls_[0].first.c_str()); + EXPECT_TRUE(mock_hooks->on_publish_calls_[0].second == conn->info_->req_); + + // Verify the second call + EXPECT_STREQ("http://localhost:8085/api/v1/publish", mock_hooks->on_publish_calls_[1].first.c_str()); + EXPECT_TRUE(mock_hooks->on_publish_calls_[1].second == conn->info_->req_); + + // Cleanup: restore original config and hooks to avoid side effects + conn->config_ = _srs_config; + conn->hooks_ = _srs_hooks; + srs_freep(mock_config); + srs_freep(mock_hooks); +} + +MockAppConfigForHttpHooksOnUnpublish::MockAppConfigForHttpHooksOnUnpublish() +{ + http_hooks_enabled_ = false; + on_unpublish_directive_ = NULL; +} + +MockAppConfigForHttpHooksOnUnpublish::~MockAppConfigForHttpHooksOnUnpublish() +{ + srs_freep(on_unpublish_directive_); +} + +bool MockAppConfigForHttpHooksOnUnpublish::get_vhost_http_hooks_enabled(std::string vhost) +{ + return http_hooks_enabled_; +} + +SrsConfDirective *MockAppConfigForHttpHooksOnUnpublish::get_vhost_on_unpublish(std::string vhost) +{ + return on_unpublish_directive_; +} + +// Test SrsRtmpConn::http_hooks_on_unpublish() method to verify proper HTTP hook invocation +// when a publisher stops publishing. This test covers the major use scenario where on_unpublish +// hooks are configured and should be called with the correct URL and request. +VOID TEST(SrsRtmpConnTest, HttpHooksOnUnpublishSuccess) +{ + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Create mock config with HTTP hooks enabled + MockAppConfigForHttpHooksOnUnpublish *mock_config = new MockAppConfigForHttpHooksOnUnpublish(); + mock_config->http_hooks_enabled_ = true; + + // Create on_unpublish directive with two hook URLs + mock_config->on_unpublish_directive_ = new SrsConfDirective(); + mock_config->on_unpublish_directive_->name_ = "on_unpublish"; + mock_config->on_unpublish_directive_->args_.push_back("http://127.0.0.1:8085/api/v1/unpublish"); + mock_config->on_unpublish_directive_->args_.push_back("http://localhost:8085/api/v1/unpublish"); + + // Create mock hooks + MockHttpHooksForOnConnect *mock_hooks = new MockHttpHooksForOnConnect(); + + // Inject mocks into connection + conn->config_ = mock_config; + conn->hooks_ = mock_hooks; + + // Set up request with valid vhost + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + + // Test the major use scenario: http_hooks_on_unpublish() with hooks enabled + // This should: + // 1. Check if HTTP hooks are enabled (they are) + // 2. Get the on_unpublish directive from config + // 3. Copy the hook URLs from the directive + // 4. Call hooks_->on_unpublish() for each URL + conn->http_hooks_on_unpublish(); + + // Verify that on_unpublish was called twice (once for each URL) + EXPECT_EQ(2, mock_hooks->on_unpublish_count_); + EXPECT_EQ(2, (int)mock_hooks->on_unpublish_calls_.size()); + + // Verify the first call + EXPECT_STREQ("http://127.0.0.1:8085/api/v1/unpublish", mock_hooks->on_unpublish_calls_[0].first.c_str()); + EXPECT_TRUE(mock_hooks->on_unpublish_calls_[0].second == conn->info_->req_); + + // Verify the second call + EXPECT_STREQ("http://localhost:8085/api/v1/unpublish", mock_hooks->on_unpublish_calls_[1].first.c_str()); + EXPECT_TRUE(mock_hooks->on_unpublish_calls_[1].second == conn->info_->req_); + + // Cleanup: restore original config and hooks to avoid side effects + conn->config_ = _srs_config; + conn->hooks_ = _srs_hooks; + srs_freep(mock_config); + srs_freep(mock_hooks); +} + +MockAppConfigForHttpHooksOnStop::MockAppConfigForHttpHooksOnStop() +{ + http_hooks_enabled_ = false; + on_stop_directive_ = NULL; +} + +MockAppConfigForHttpHooksOnStop::~MockAppConfigForHttpHooksOnStop() +{ + srs_freep(on_stop_directive_); +} + +bool MockAppConfigForHttpHooksOnStop::get_vhost_http_hooks_enabled(std::string vhost) +{ + return http_hooks_enabled_; +} + +SrsConfDirective *MockAppConfigForHttpHooksOnStop::get_vhost_on_stop(std::string vhost) +{ + return on_stop_directive_; +} + +// Test SrsRtmpConn::http_hooks_on_stop() method to verify proper HTTP hook invocation +// when a player stops playing. This test covers the major use scenario where on_stop +// hooks are configured and should be called with the correct URL and request. +VOID TEST(SrsRtmpConnTest, HttpHooksOnStopSuccess) +{ + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Create mock config with HTTP hooks enabled + MockAppConfigForHttpHooksOnStop *mock_config = new MockAppConfigForHttpHooksOnStop(); + mock_config->http_hooks_enabled_ = true; + + // Create on_stop directive with two hook URLs + mock_config->on_stop_directive_ = new SrsConfDirective(); + mock_config->on_stop_directive_->name_ = "on_stop"; + mock_config->on_stop_directive_->args_.push_back("http://127.0.0.1:8085/api/v1/stop"); + mock_config->on_stop_directive_->args_.push_back("http://localhost:8085/api/v1/stop"); + + // Create mock hooks (reuse MockHttpHooksForOnConnect which has on_stop tracking) + MockHttpHooksForOnConnect *mock_hooks = new MockHttpHooksForOnConnect(); + + // Inject mocks into connection + conn->config_ = mock_config; + conn->hooks_ = mock_hooks; + + // Set up request with valid vhost + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + + // Test the major use scenario: http_hooks_on_stop() with hooks enabled + // This should: + // 1. Check if HTTP hooks are enabled (they are) + // 2. Get the on_stop directive from config + // 3. Copy the hook URLs from the directive + // 4. Call hooks_->on_stop() for each URL + conn->http_hooks_on_stop(); + + // Verify that on_stop was called twice (once for each URL) + EXPECT_EQ(2, mock_hooks->on_stop_count_); + EXPECT_EQ(2, (int)mock_hooks->on_stop_calls_.size()); + + // Verify the first call + EXPECT_STREQ("http://127.0.0.1:8085/api/v1/stop", mock_hooks->on_stop_calls_[0].first.c_str()); + EXPECT_TRUE(mock_hooks->on_stop_calls_[0].second == conn->info_->req_); + + // Verify the second call + EXPECT_STREQ("http://localhost:8085/api/v1/stop", mock_hooks->on_stop_calls_[1].first.c_str()); + EXPECT_TRUE(mock_hooks->on_stop_calls_[1].second == conn->info_->req_); + + // Cleanup: restore original config and hooks to avoid side effects + conn->config_ = _srs_config; + conn->hooks_ = _srs_hooks; + srs_freep(mock_config); + srs_freep(mock_hooks); +} + +MockAppConfigForHttpHooksOnPlay::MockAppConfigForHttpHooksOnPlay() +{ + http_hooks_enabled_ = false; + on_play_directive_ = NULL; +} + +MockAppConfigForHttpHooksOnPlay::~MockAppConfigForHttpHooksOnPlay() +{ + srs_freep(on_play_directive_); +} + +bool MockAppConfigForHttpHooksOnPlay::get_vhost_http_hooks_enabled(std::string vhost) +{ + return http_hooks_enabled_; +} + +SrsConfDirective *MockAppConfigForHttpHooksOnPlay::get_vhost_on_play(std::string vhost) +{ + return on_play_directive_; +} + +MockHttpHooksForOnPlay::MockHttpHooksForOnPlay() +{ + on_play_count_ = 0; + on_play_error_ = srs_success; +} + +MockHttpHooksForOnPlay::~MockHttpHooksForOnPlay() +{ + srs_freep(on_play_error_); +} + +srs_error_t MockHttpHooksForOnPlay::on_connect(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooksForOnPlay::on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes) +{ +} + +srs_error_t MockHttpHooksForOnPlay::on_publish(std::string url, ISrsRequest *req) +{ + return srs_success; +} + +void MockHttpHooksForOnPlay::on_unpublish(std::string url, ISrsRequest *req) +{ +} + +srs_error_t MockHttpHooksForOnPlay::on_play(std::string url, ISrsRequest *req) +{ + on_play_count_++; + on_play_calls_.push_back(std::make_pair(url, req)); + return srs_error_copy(on_play_error_); +} + +void MockHttpHooksForOnPlay::on_stop(std::string url, ISrsRequest *req) +{ +} + +srs_error_t MockHttpHooksForOnPlay::on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPlay::on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPlay::on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPlay::discover_co_workers(std::string url, std::string &host, int &port) +{ + return srs_success; +} + +srs_error_t MockHttpHooksForOnPlay::on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls) +{ + return srs_success; +} + +void MockHttpHooksForOnPlay::reset() +{ + on_play_calls_.clear(); + on_play_count_ = 0; + srs_freep(on_play_error_); + on_play_error_ = srs_success; +} + +VOID TEST(SrsRtmpConnTest, HttpHooksOnPlaySuccess) +{ + srs_error_t err = srs_success; + + // Create mock transport + MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle(); + + // Create connection + SrsUniquePtr conn(new SrsRtmpConn(mock_transport, "192.168.1.100", 1935)); + + // Create mock config with HTTP hooks enabled + MockAppConfigForHttpHooksOnPlay *mock_config = new MockAppConfigForHttpHooksOnPlay(); + mock_config->http_hooks_enabled_ = true; + + // Create on_play directive with two hook URLs + mock_config->on_play_directive_ = new SrsConfDirective(); + mock_config->on_play_directive_->name_ = "on_play"; + mock_config->on_play_directive_->args_.push_back("http://127.0.0.1:8085/api/v1/play"); + mock_config->on_play_directive_->args_.push_back("http://localhost:8085/api/v1/play"); + + // Create mock hooks + MockHttpHooksForOnPlay *mock_hooks = new MockHttpHooksForOnPlay(); + + // Inject mocks into connection + conn->config_ = mock_config; + conn->hooks_ = mock_hooks; + + // Set up request with valid vhost + conn->info_->req_->vhost_ = "__defaultVhost__"; + conn->info_->req_->app_ = "live"; + conn->info_->req_->stream_ = "livestream"; + + // Test the major use scenario: http_hooks_on_play() with hooks enabled + // This should: + // 1. Check if HTTP hooks are enabled (they are) + // 2. Get the on_play directive from config + // 3. Copy the hook URLs from the directive + // 4. Call hooks_->on_play() for each URL + HELPER_EXPECT_SUCCESS(conn->http_hooks_on_play()); + + // Verify that on_play was called twice (once for each URL) + EXPECT_EQ(2, mock_hooks->on_play_count_); + EXPECT_EQ(2, (int)mock_hooks->on_play_calls_.size()); + + // Verify the first call + EXPECT_STREQ("http://127.0.0.1:8085/api/v1/play", mock_hooks->on_play_calls_[0].first.c_str()); + EXPECT_TRUE(mock_hooks->on_play_calls_[0].second == conn->info_->req_); + + // Verify the second call + EXPECT_STREQ("http://localhost:8085/api/v1/play", mock_hooks->on_play_calls_[1].first.c_str()); + EXPECT_TRUE(mock_hooks->on_play_calls_[1].second == conn->info_->req_); + + // Cleanup: restore original config and hooks to avoid side effects + conn->config_ = _srs_config; + conn->hooks_ = _srs_hooks; + srs_freep(mock_config); + srs_freep(mock_hooks); +} + +// Test get_proc_self_stat() function to verify proper reading of /proc/self/stat +// on Linux systems. This test covers the major use scenario of reading process +// statistics from the /proc filesystem. +VOID TEST(UtilityTest, GetProcSelfStatSuccess) +{ + // Create SrsProcSelfStat instance + SrsProcSelfStat stat; + + // Call get_proc_self_stat() - this is the main test + // On Linux: reads /proc/self/stat and parses all fields + // On macOS: skips reading but still sets ok_ to true + bool result = get_proc_self_stat(stat); + + // Verify that the function succeeded + EXPECT_TRUE(result); + EXPECT_TRUE(stat.ok_); + +#if !defined(SRS_OSX) + // On Linux, verify that key fields were populated correctly + // pid should be positive + EXPECT_TRUE(stat.pid_ > 0); + + // comm should not be empty (process name in parentheses) + EXPECT_TRUE(stat.comm_[0] != '\0'); + + // state should be one of the valid process states: R, S, D, Z, T, W + EXPECT_TRUE(stat.state_ == 'R' || stat.state_ == 'S' || stat.state_ == 'D' || + stat.state_ == 'Z' || stat.state_ == 'T' || stat.state_ == 'W'); + + // ppid should be positive (parent process ID) + EXPECT_TRUE(stat.ppid_ > 0); + + // num_threads should be at least 1 (current thread) + EXPECT_TRUE(stat.num_threads_ >= 1); + + // vsize should be positive (virtual memory size) + EXPECT_TRUE(stat.vsize_ > 0); +#endif +} + +// Test srs_get_local_port() function to verify proper retrieval of local port +// from a socket file descriptor. This test covers both IPv4 and IPv6 scenarios. +VOID TEST(UtilityTest, GetLocalPortSuccess) +{ + srs_error_t err; + + // Test with IPv4 TCP socket - listen on random port in [30000, 60000] + if (true) { + int port = 30000 + (rand() % 30001); + srs_netfd_t fd = NULL; + HELPER_EXPECT_SUCCESS(srs_tcp_listen("127.0.0.1", port, &fd)); + EXPECT_TRUE(fd != NULL); + + // Get the actual port using srs_get_local_port + int actual_fd = srs_netfd_fileno(fd); + EXPECT_GT(actual_fd, 0); + + int local_port = srs_get_local_port(actual_fd); + EXPECT_EQ(local_port, port); + EXPECT_GE(local_port, 30000); + EXPECT_LE(local_port, 60000); + + srs_close_stfd(fd); + } + + // Test with IPv6 TCP socket - listen on random port in [30000, 60000] + if (true) { + int port = 30000 + (rand() % 30001); + srs_netfd_t fd = NULL; + HELPER_EXPECT_SUCCESS(srs_tcp_listen("::1", port, &fd)); + EXPECT_TRUE(fd != NULL); + + // Get the actual port using srs_get_local_port + int actual_fd = srs_netfd_fileno(fd); + EXPECT_GT(actual_fd, 0); + + int local_port = srs_get_local_port(actual_fd); + EXPECT_EQ(local_port, port); + EXPECT_GE(local_port, 30000); + EXPECT_LE(local_port, 60000); + + srs_close_stfd(fd); + } + + // Test with invalid file descriptor - should return 0 + if (true) { + int invalid_fd = -1; + int local_port = srs_get_local_port(invalid_fd); + EXPECT_EQ(local_port, 0); + } +} + +VOID TEST(AppUtilityTest, ApiDumpSummaries) +{ + // Test srs_api_dump_summaries function + SrsUniquePtr obj(SrsJsonAny::object()); + + // Call the function to dump summaries + srs_api_dump_summaries(obj.get()); + + // Verify the JSON structure + // Check that "data" object exists + SrsJsonAny *data_any = obj->get_property("data"); + ASSERT_TRUE(data_any != NULL); + ASSERT_TRUE(data_any->is_object()); + SrsJsonObject *data = (SrsJsonObject *)data_any; + + // Check "ok" field + SrsJsonAny *ok_any = data->get_property("ok"); + ASSERT_TRUE(ok_any != NULL); + ASSERT_TRUE(ok_any->is_boolean()); + + // Check "now_ms" field + SrsJsonAny *now_ms_any = data->get_property("now_ms"); + ASSERT_TRUE(now_ms_any != NULL); + ASSERT_TRUE(now_ms_any->is_integer()); + + // Check "self" object + SrsJsonAny *self_any = data->get_property("self"); + ASSERT_TRUE(self_any != NULL); + ASSERT_TRUE(self_any->is_object()); + SrsJsonObject *self = (SrsJsonObject *)self_any; + + // Verify self fields + ASSERT_TRUE(self->get_property("version") != NULL); + ASSERT_TRUE(self->get_property("pid") != NULL); + ASSERT_TRUE(self->get_property("ppid") != NULL); + ASSERT_TRUE(self->get_property("argv") != NULL); + ASSERT_TRUE(self->get_property("cwd") != NULL); + ASSERT_TRUE(self->get_property("mem_kbyte") != NULL); + ASSERT_TRUE(self->get_property("mem_percent") != NULL); + ASSERT_TRUE(self->get_property("cpu_percent") != NULL); + ASSERT_TRUE(self->get_property("srs_uptime") != NULL); + + // Check "system" object + SrsJsonAny *sys_any = data->get_property("system"); + ASSERT_TRUE(sys_any != NULL); + ASSERT_TRUE(sys_any->is_object()); + SrsJsonObject *sys = (SrsJsonObject *)sys_any; + + // Verify system fields + ASSERT_TRUE(sys->get_property("cpu_percent") != NULL); + ASSERT_TRUE(sys->get_property("disk_read_KBps") != NULL); + ASSERT_TRUE(sys->get_property("disk_write_KBps") != NULL); + ASSERT_TRUE(sys->get_property("disk_busy_percent") != NULL); + ASSERT_TRUE(sys->get_property("mem_ram_kbyte") != NULL); + ASSERT_TRUE(sys->get_property("mem_ram_percent") != NULL); + ASSERT_TRUE(sys->get_property("mem_swap_kbyte") != NULL); + ASSERT_TRUE(sys->get_property("mem_swap_percent") != NULL); + ASSERT_TRUE(sys->get_property("cpus") != NULL); + ASSERT_TRUE(sys->get_property("cpus_online") != NULL); + ASSERT_TRUE(sys->get_property("uptime") != NULL); + ASSERT_TRUE(sys->get_property("ilde_time") != NULL); + ASSERT_TRUE(sys->get_property("load_1m") != NULL); + ASSERT_TRUE(sys->get_property("load_5m") != NULL); + ASSERT_TRUE(sys->get_property("load_15m") != NULL); + + // Verify network fields + ASSERT_TRUE(sys->get_property("net_sample_time") != NULL); + ASSERT_TRUE(sys->get_property("net_recv_bytes") != NULL); + ASSERT_TRUE(sys->get_property("net_send_bytes") != NULL); + ASSERT_TRUE(sys->get_property("net_recvi_bytes") != NULL); + ASSERT_TRUE(sys->get_property("net_sendi_bytes") != NULL); + + // Verify SRS network fields + ASSERT_TRUE(sys->get_property("srs_sample_time") != NULL); + ASSERT_TRUE(sys->get_property("srs_recv_bytes") != NULL); + ASSERT_TRUE(sys->get_property("srs_send_bytes") != NULL); + ASSERT_TRUE(sys->get_property("conn_sys") != NULL); + ASSERT_TRUE(sys->get_property("conn_sys_et") != NULL); + ASSERT_TRUE(sys->get_property("conn_sys_tw") != NULL); + ASSERT_TRUE(sys->get_property("conn_sys_udp") != NULL); + ASSERT_TRUE(sys->get_property("conn_srs") != NULL); +} diff --git a/trunk/src/utest/srs_utest_app10.hpp b/trunk/src/utest/srs_utest_app10.hpp index a7a10dff3..46df146ce 100644 --- a/trunk/src/utest/srs_utest_app10.hpp +++ b/trunk/src/utest/srs_utest_app10.hpp @@ -17,6 +17,8 @@ #include #include #include +#include +#include // Mock config for testing SrsServer::listen() class MockAppConfigForServerListen : public MockAppConfig @@ -293,4 +295,451 @@ public: virtual void unsubscribe(ISrsDisposingHandler *h); }; +// Mock config for testing SrsRtmpConn constructor and assemble() +class MockAppConfigForRtmpConn : public MockAppConfig +{ +public: + int subscribe_count_; + int unsubscribe_count_; + ISrsReloadHandler *last_subscribed_handler_; + SrsConfDirective *vhost_directive_; + +public: + MockAppConfigForRtmpConn(); + virtual ~MockAppConfigForRtmpConn(); + +public: + virtual void subscribe(ISrsReloadHandler *handler); + virtual void unsubscribe(ISrsReloadHandler *handler); + virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true); + virtual bool get_vhost_is_edge(std::string vhost); + void reset(); +}; + +// Mock ISrsRtmpServer for testing SrsRtmpConn::stream_service_cycle() +class MockRtmpServerForStreamService : public ISrsRtmpServer +{ +public: + SrsRtmpConnType identify_type_; + std::string identify_stream_; + srs_utime_t identify_duration_; + int start_play_count_; + int start_fmle_publish_count_; + int start_flash_publish_count_; + int start_haivision_publish_count_; + +public: + MockRtmpServerForStreamService(); + virtual ~MockRtmpServerForStreamService(); + +public: + virtual void set_recv_timeout(srs_utime_t tm); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_error_t handshake(); + virtual srs_error_t connect_app(ISrsRequest *req); + virtual uint32_t proxy_real_ip(); + virtual srs_error_t set_window_ack_size(int ack_size); + virtual srs_error_t set_peer_bandwidth(int bandwidth, int type); + virtual srs_error_t set_chunk_size(int chunk_size); + virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip); + virtual srs_error_t on_bw_done(); + virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration); + virtual srs_error_t start_play(int stream_id); + virtual srs_error_t start_fmle_publish(int stream_id); + virtual srs_error_t start_haivision_publish(int stream_id); + virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid); + virtual srs_error_t start_flash_publish(int stream_id); + virtual srs_error_t start_publishing(int stream_id); + virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted); + virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id); + virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); + virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id); + virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause); + virtual srs_error_t set_in_window_ack_size(int ack_size); + virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); + virtual void set_auto_response(bool v); + virtual void set_merge_read(bool v, IMergeReadHandler *handler); + virtual void set_recv_buffer(int buffer_size); +}; + +// Mock ISrsCoroutine for testing SrsRtmpConn::service_cycle() +class MockCoroutineForRtmpConn : public ISrsCoroutine +{ +public: + srs_error_t pull_error_; + int pull_count_; + +public: + MockCoroutineForRtmpConn(); + virtual ~MockCoroutineForRtmpConn(); + +public: + virtual srs_error_t start(); + virtual void stop(); + virtual void interrupt(); + virtual srs_error_t pull(); + virtual const SrsContextId &cid(); + virtual void set_cid(const SrsContextId &cid); +}; + +// Mock ISrsRtmpTransport for testing SrsRtmpConn::do_cycle() +class MockRtmpTransportForDoCycle : public ISrsRtmpTransport +{ +public: + MockRtmpTransportForDoCycle(); + virtual ~MockRtmpTransportForDoCycle(); + +public: + virtual srs_netfd_t fd(); + virtual ISrsProtocolReadWriter *io(); + virtual srs_error_t handshake(); + virtual const char *transport_type(); + virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); + virtual srs_error_t set_tcp_nodelay(bool v); + virtual int64_t get_recv_bytes(); + virtual int64_t get_send_bytes(); +}; + +// Mock ISrsSecurity for testing SrsRtmpConn::stream_service_cycle() +class MockSecurityForStreamService : public ISrsSecurity +{ +public: + MockSecurityForStreamService(); + virtual ~MockSecurityForStreamService(); + +public: + virtual srs_error_t check(SrsRtmpConnType type, std::string ip, ISrsRequest *req); +}; + +// Mock ISrsRtmpServer for testing SrsRtmpConn::handle_publish_message() +class MockRtmpServerForHandlePublishMessage : public ISrsRtmpServer +{ +public: + srs_error_t decode_message_error_; + SrsRtmpCommand* decode_message_packet_; + int decode_message_count_; + srs_error_t fmle_unpublish_error_; + int fmle_unpublish_count_; + +public: + MockRtmpServerForHandlePublishMessage(); + virtual ~MockRtmpServerForHandlePublishMessage(); + +public: + virtual void set_recv_timeout(srs_utime_t tm); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_error_t handshake(); + virtual srs_error_t connect_app(ISrsRequest *req); + virtual uint32_t proxy_real_ip(); + virtual srs_error_t set_window_ack_size(int ack_size); + virtual srs_error_t set_peer_bandwidth(int bandwidth, int type); + virtual srs_error_t set_chunk_size(int chunk_size); + virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip); + virtual srs_error_t on_bw_done(); + virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration); + virtual srs_error_t start_play(int stream_id); + virtual srs_error_t start_fmle_publish(int stream_id); + virtual srs_error_t start_haivision_publish(int stream_id); + virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid); + virtual srs_error_t start_flash_publish(int stream_id); + virtual srs_error_t start_publishing(int stream_id); + virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted); + virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id); + virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); + virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id); + virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause); + virtual srs_error_t set_in_window_ack_size(int ack_size); + virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); + virtual void set_auto_response(bool v); + virtual void set_merge_read(bool v, IMergeReadHandler *handler); + virtual void set_recv_buffer(int buffer_size); + + void reset(); +}; + +// Mock ISrsRtmpServer for testing SrsRtmpConn::process_play_control_msg() +class MockRtmpServerForPlayControl : public ISrsRtmpServer +{ +public: + SrsRtmpCommand* decode_message_packet_; + int decode_message_count_; + int send_and_free_packet_count_; + int on_play_client_pause_count_; + bool last_pause_state_; + +public: + MockRtmpServerForPlayControl(); + virtual ~MockRtmpServerForPlayControl(); + +public: + virtual void set_recv_timeout(srs_utime_t tm); + virtual void set_send_timeout(srs_utime_t tm); + virtual srs_error_t handshake(); + virtual srs_error_t connect_app(ISrsRequest *req); + virtual uint32_t proxy_real_ip(); + virtual srs_error_t set_window_ack_size(int ack_size); + virtual srs_error_t set_peer_bandwidth(int bandwidth, int type); + virtual srs_error_t set_chunk_size(int chunk_size); + virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip); + virtual srs_error_t on_bw_done(); + virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration); + virtual srs_error_t start_play(int stream_id); + virtual srs_error_t start_fmle_publish(int stream_id); + virtual srs_error_t start_haivision_publish(int stream_id); + virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid); + virtual srs_error_t start_flash_publish(int stream_id); + virtual srs_error_t start_publishing(int stream_id); + virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted); + virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id); + virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket); + virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id); + virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause); + virtual srs_error_t set_in_window_ack_size(int ack_size); + virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg); + virtual void set_auto_response(bool v); + virtual void set_merge_read(bool v, IMergeReadHandler *handler); + virtual void set_recv_buffer(int buffer_size); + + void reset(); +}; + +// Mock SrsLiveConsumer for testing SrsRtmpConn::process_play_control_msg() +class MockLiveConsumerForPlayControl : public SrsLiveConsumer +{ +public: + int on_play_client_pause_count_; + bool last_pause_state_; + +public: + MockLiveConsumerForPlayControl(ISrsLiveSource *source); + virtual ~MockLiveConsumerForPlayControl(); + +public: + virtual srs_error_t on_play_client_pause(bool is_pause); +}; + +// Mock ISrsAppConfig for testing SrsRtmpConn::http_hooks_on_connect() +class MockAppConfigForHttpHooksOnConnect : public MockAppConfig +{ +public: + bool http_hooks_enabled_; + SrsConfDirective *on_connect_directive_; + +public: + MockAppConfigForHttpHooksOnConnect(); + virtual ~MockAppConfigForHttpHooksOnConnect(); + +public: + virtual bool get_vhost_http_hooks_enabled(std::string vhost); + virtual SrsConfDirective *get_vhost_on_connect(std::string vhost); +}; + +// Mock ISrsHttpHooks for testing SrsRtmpConn::http_hooks_on_connect() and http_hooks_on_close() +class MockHttpHooksForOnConnect : public ISrsHttpHooks +{ +public: + std::vector > on_connect_calls_; + int on_connect_count_; + srs_error_t on_connect_error_; + + // For on_close tracking + struct OnCloseCall { + std::string url_; + ISrsRequest *req_; + int64_t send_bytes_; + int64_t recv_bytes_; + }; + std::vector on_close_calls_; + int on_close_count_; + + // For on_unpublish tracking + std::vector > on_unpublish_calls_; + int on_unpublish_count_; + + // For on_stop tracking + std::vector > on_stop_calls_; + int on_stop_count_; + +public: + MockHttpHooksForOnConnect(); + virtual ~MockHttpHooksForOnConnect(); + +public: + virtual srs_error_t on_connect(std::string url, ISrsRequest *req); + virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes); + virtual srs_error_t on_publish(std::string url, ISrsRequest *req); + virtual void on_unpublish(std::string url, ISrsRequest *req); + virtual srs_error_t on_play(std::string url, ISrsRequest *req); + virtual void on_stop(std::string url, ISrsRequest *req); + virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file); + virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); + virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify); + virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port); + virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls); + + void reset(); +}; + +// Mock ISrsAppConfig for testing SrsRtmpConn::http_hooks_on_close() +class MockAppConfigForHttpHooksOnClose : public MockAppConfig +{ +public: + bool http_hooks_enabled_; + SrsConfDirective *on_close_directive_; + +public: + MockAppConfigForHttpHooksOnClose(); + virtual ~MockAppConfigForHttpHooksOnClose(); + +public: + virtual bool get_vhost_http_hooks_enabled(std::string vhost); + virtual SrsConfDirective *get_vhost_on_close(std::string vhost); +}; + +// Mock ISrsAppConfig for testing SrsRtmpConn::http_hooks_on_publish() +class MockAppConfigForHttpHooksOnPublish : public MockAppConfig +{ +public: + bool http_hooks_enabled_; + SrsConfDirective *on_publish_directive_; + +public: + MockAppConfigForHttpHooksOnPublish(); + virtual ~MockAppConfigForHttpHooksOnPublish(); + +public: + virtual bool get_vhost_http_hooks_enabled(std::string vhost); + virtual SrsConfDirective *get_vhost_on_publish(std::string vhost); +}; + +// Mock ISrsHttpHooks for testing SrsRtmpConn::http_hooks_on_publish() +class MockHttpHooksForOnPublish : public ISrsHttpHooks +{ +public: + std::vector > on_publish_calls_; + int on_publish_count_; + srs_error_t on_publish_error_; + +public: + MockHttpHooksForOnPublish(); + virtual ~MockHttpHooksForOnPublish(); + +public: + virtual srs_error_t on_connect(std::string url, ISrsRequest *req); + virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes); + virtual srs_error_t on_publish(std::string url, ISrsRequest *req); + virtual void on_unpublish(std::string url, ISrsRequest *req); + virtual srs_error_t on_play(std::string url, ISrsRequest *req); + virtual void on_stop(std::string url, ISrsRequest *req); + virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file); + virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); + virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify); + virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port); + virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls); + + void reset(); +}; + +// Mock ISrsAppConfig for testing SrsRtmpConn::http_hooks_on_unpublish() +class MockAppConfigForHttpHooksOnUnpublish : public MockAppConfig +{ +public: + bool http_hooks_enabled_; + SrsConfDirective *on_unpublish_directive_; + +public: + MockAppConfigForHttpHooksOnUnpublish(); + virtual ~MockAppConfigForHttpHooksOnUnpublish(); + +public: + virtual bool get_vhost_http_hooks_enabled(std::string vhost); + virtual SrsConfDirective *get_vhost_on_unpublish(std::string vhost); +}; + +// Mock ISrsAppConfig for testing SrsRtmpConn::http_hooks_on_stop() +class MockAppConfigForHttpHooksOnStop : public MockAppConfig +{ +public: + bool http_hooks_enabled_; + SrsConfDirective *on_stop_directive_; + +public: + MockAppConfigForHttpHooksOnStop(); + virtual ~MockAppConfigForHttpHooksOnStop(); + +public: + virtual bool get_vhost_http_hooks_enabled(std::string vhost); + virtual SrsConfDirective *get_vhost_on_stop(std::string vhost); +}; + +// Mock ISrsAppConfig for testing SrsRtmpConn::http_hooks_on_play() +class MockAppConfigForHttpHooksOnPlay : public MockAppConfig +{ +public: + bool http_hooks_enabled_; + SrsConfDirective *on_play_directive_; + +public: + MockAppConfigForHttpHooksOnPlay(); + virtual ~MockAppConfigForHttpHooksOnPlay(); + +public: + virtual bool get_vhost_http_hooks_enabled(std::string vhost); + virtual SrsConfDirective *get_vhost_on_play(std::string vhost); +}; + +// Mock ISrsHttpHooks for testing SrsRtmpConn::http_hooks_on_play() +class MockHttpHooksForOnPlay : public ISrsHttpHooks +{ +public: + std::vector > on_play_calls_; + int on_play_count_; + srs_error_t on_play_error_; + +public: + MockHttpHooksForOnPlay(); + virtual ~MockHttpHooksForOnPlay(); + +public: + virtual srs_error_t on_connect(std::string url, ISrsRequest *req); + virtual void on_close(std::string url, ISrsRequest *req, int64_t send_bytes, int64_t recv_bytes); + virtual srs_error_t on_publish(std::string url, ISrsRequest *req); + virtual void on_unpublish(std::string url, ISrsRequest *req); + virtual srs_error_t on_play(std::string url, ISrsRequest *req); + virtual void on_stop(std::string url, ISrsRequest *req); + virtual srs_error_t on_dvr(SrsContextId cid, std::string url, ISrsRequest *req, std::string file); + virtual srs_error_t on_hls(SrsContextId cid, std::string url, ISrsRequest *req, std::string file, std::string ts_url, + std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); + virtual srs_error_t on_hls_notify(SrsContextId cid, std::string url, ISrsRequest *req, std::string ts_url, int nb_notify); + virtual srs_error_t discover_co_workers(std::string url, std::string &host, int &port); + virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector &rtmp_urls); + + void reset(); +}; + +// Mock ISrsAppConfig for testing SrsRtmpConn::acquire_publish() +class MockAppConfigForAcquirePublish : public MockAppConfigForRtmpConn +{ +public: + bool rtc_server_enabled_; + bool rtc_enabled_; + bool srt_enabled_; + bool rtsp_server_enabled_; + bool rtsp_enabled_; + +public: + MockAppConfigForAcquirePublish(); + virtual ~MockAppConfigForAcquirePublish(); + +public: + virtual bool get_rtc_server_enabled(); + virtual bool get_rtc_enabled(std::string vhost); + virtual bool get_srt_enabled(); + virtual bool get_srt_enabled(std::string vhost); + virtual bool get_rtsp_server_enabled(); + virtual bool get_rtsp_enabled(std::string vhost); +}; + #endif diff --git a/trunk/src/utest/srs_utest_app6.cpp b/trunk/src/utest/srs_utest_app6.cpp index 0ece1c19d..ffd9ea138 100644 --- a/trunk/src/utest/srs_utest_app6.cpp +++ b/trunk/src/utest/srs_utest_app6.cpp @@ -4374,6 +4374,18 @@ void MockLiveSource::set_can_publish(bool can_publish) can_publish_result_ = can_publish; } +srs_error_t MockLiveSource::on_publish() +{ + // Mock implementation - just return success + return srs_success; +} + +srs_error_t MockLiveSource::on_edge_start_publish() +{ + // Mock implementation - just return success + return srs_success; +} + // Mock SRT source implementation MockSrtSource::MockSrtSource() { diff --git a/trunk/src/utest/srs_utest_app6.hpp b/trunk/src/utest/srs_utest_app6.hpp index 9977cdbb3..c2071c04d 100644 --- a/trunk/src/utest/srs_utest_app6.hpp +++ b/trunk/src/utest/srs_utest_app6.hpp @@ -709,6 +709,8 @@ public: virtual ~MockLiveSource(); virtual bool can_publish(bool is_edge); void set_can_publish(bool can_publish); + virtual srs_error_t on_publish(); + virtual srs_error_t on_edge_start_publish(); }; // Mock SRT source for testing SrsRtcPublishStream