diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index 092a4f7f3..1f2bdca1e 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -386,7 +386,7 @@ srs_error_t SrsGbListener::listen_api() { srs_error_t err = srs_success; - ISrsHttpServeMux *mux = api_server_owner_->api_server(); + ISrsCommonHttpHandler *mux = api_server_owner_->api_server(); if (!mux) { return err; } diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp index c6d20acca..580461374 100644 --- a/trunk/src/app/srs_app_gb28181.hpp +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -35,7 +35,7 @@ class SrsRawHEVCStream; class SrsMediaPacket; class SrsPithyPrint; class SrsRawAacStream; -class ISrsHttpServeMux; +class ISrsCommonHttpHandler; class ISrsGbMuxer; class ISrsGbSession; class ISrsPackContext; diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 69c881744..84cbc67b1 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -57,7 +57,7 @@ ISrsHttpConn::~ISrsHttpConn() { } -SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner *handler, ISrsProtocolReadWriter *fd, ISrsHttpServeMux *m, string cip, int cport) +SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner *handler, ISrsProtocolReadWriter *fd, ISrsCommonHttpHandler *m, string cip, int cport) { parser_ = new SrsHttpParser(); auth_ = new SrsHttpAuthMux(m); @@ -318,7 +318,7 @@ ISrsHttpxConn::~ISrsHttpxConn() { } -SrsHttpxConn::SrsHttpxConn(ISrsResourceManager *cm, ISrsProtocolReadWriter *io, ISrsHttpServeMux *m, string cip, int port, string key, string cert) : manager_(cm), io_(io), enable_stat_(false), ssl_key_file_(key), ssl_cert_file_(cert) +SrsHttpxConn::SrsHttpxConn(ISrsResourceManager *cm, ISrsProtocolReadWriter *io, ISrsCommonHttpHandler *m, string cip, int port, string key, string cert) : manager_(cm), io_(io), enable_stat_(false), ssl_key_file_(key), ssl_cert_file_(cert) { // Create a identify for this client. _srs_context->set_id(_srs_context->generate_id()); @@ -523,7 +523,7 @@ srs_error_t SrsHttpServer::initialize() srs_error_t err = srs_success; // for SRS go-sharp to detect the status of HTTP server of SRS HTTP FLV Cluster. - if ((err = http_static_->mux_.handle("/api/v1/versions", new SrsGoApiVersion())) != srs_success) { + if ((err = http_static_->mux()->handle("/api/v1/versions", new SrsGoApiVersion())) != srs_success) { return srs_error_wrap(err, "handle versions"); } @@ -540,7 +540,7 @@ srs_error_t SrsHttpServer::initialize() srs_error_t SrsHttpServer::handle(std::string pattern, ISrsHttpHandler *handler) { - return http_static_->mux_.handle(pattern, handler); + return http_static_->mux()->handle(pattern, handler); } srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r) @@ -555,21 +555,21 @@ srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage bool is_api = memcmp(p, "/api/", 5) == 0; bool is_console = path.length() > 8 && memcmp(p, "/console/", 9) == 0; if (is_api || is_console) { - return http_static_->mux_.serve_http(w, r); + return http_static_->mux()->serve_http(w, r); } } // Try http stream first, then http static if not found. ISrsHttpHandler *h = NULL; - if ((err = http_stream_->mux_.find_handler(r, &h)) != srs_success) { + if ((err = http_stream_->mux()->find_handler(r, &h)) != srs_success) { return srs_error_wrap(err, "find handler"); } if (!h->is_not_found()) { - return http_stream_->mux_.serve_http(w, r); + return http_stream_->mux()->serve_http(w, r); } // Use http static as default server. - return http_static_->mux_.serve_http(w, r); + return http_static_->mux()->serve_http(w, r); } srs_error_t SrsHttpServer::http_mount(ISrsRequest *r) diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index 2358231c4..6fa6d1d56 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -100,7 +100,7 @@ private: protected: ISrsHttpParser *parser_; - ISrsHttpServeMux *http_mux_; + ISrsCommonHttpHandler *http_mux_; ISrsHttpCorsMux *cors_; ISrsHttpAuthMux *auth_; ISrsHttpConnOwner *handler_; @@ -122,7 +122,7 @@ private: srs_utime_t create_time_; public: - SrsHttpConn(ISrsHttpConnOwner *handler, ISrsProtocolReadWriter *fd, ISrsHttpServeMux *m, std::string cip, int port); + SrsHttpConn(ISrsHttpConnOwner *handler, ISrsProtocolReadWriter *fd, ISrsCommonHttpHandler *m, std::string cip, int port); virtual ~SrsHttpConn(); // Interface ISrsResource. public: @@ -196,7 +196,7 @@ private: const std::string ssl_cert_file_; public: - SrsHttpxConn(ISrsResourceManager *cm, ISrsProtocolReadWriter *io, ISrsHttpServeMux *m, std::string cip, int port, std::string key, std::string cert); + SrsHttpxConn(ISrsResourceManager *cm, ISrsProtocolReadWriter *io, ISrsCommonHttpHandler *m, std::string cip, int port, std::string key, std::string cert); virtual ~SrsHttpxConn(); public: @@ -230,7 +230,7 @@ public: }; // The http server, use http stream or static server to serve requests. -class ISrsHttpServer : public ISrsHttpServeMux +class ISrsHttpServer : public ISrsCommonHttpHandler { public: ISrsHttpServer(); @@ -252,7 +252,7 @@ public: public: virtual srs_error_t initialize(); - // Interface ISrsHttpServeMux + // Interface ISrsCommonHttpHandler public: virtual srs_error_t handle(std::string pattern, ISrsHttpHandler *handler); // Interface ISrsHttpHandler diff --git a/trunk/src/app/srs_app_http_static.cpp b/trunk/src/app/srs_app_http_static.cpp index b083ff984..dfed7d961 100644 --- a/trunk/src/app/srs_app_http_static.cpp +++ b/trunk/src/app/srs_app_http_static.cpp @@ -609,17 +609,17 @@ ISrsHttpStaticServer::~ISrsHttpStaticServer() SrsHttpStaticServer::SrsHttpStaticServer() { - _srs_config->subscribe(this); + mux_ = new SrsHttpServeMux(); } SrsHttpStaticServer::~SrsHttpStaticServer() { - _srs_config->unsubscribe(this); + srs_freep(mux_); } srs_error_t SrsHttpStaticServer::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r) { - return mux_.serve_http(w, r); + return mux_->serve_http(w, r); } srs_error_t SrsHttpStaticServer::initialize() @@ -653,7 +653,7 @@ srs_error_t SrsHttpStaticServer::initialize() if (!default_root_exists) { // add root std::string dir = _srs_config->get_http_stream_dir(); - if ((err = mux_.handle("/", new SrsVodStream(dir))) != srs_success) { + if ((err = mux_->handle("/", new SrsVodStream(dir))) != srs_success) { return srs_error_wrap(err, "mount root dir=%s", dir.c_str()); } srs_trace("http: root mount to %s", dir.c_str()); @@ -662,6 +662,11 @@ srs_error_t SrsHttpStaticServer::initialize() return err; } +ISrsHttpServeMux *SrsHttpStaticServer::mux() +{ + return mux_; +} + srs_error_t SrsHttpStaticServer::mount_vhost(string vhost, string &pmount) { srs_error_t err = srs_success; @@ -692,7 +697,7 @@ srs_error_t SrsHttpStaticServer::mount_vhost(string vhost, string &pmount) } // mount the http of vhost. - if ((err = mux_.handle(mount, new SrsVodStream(dir))) != srs_success) { + if ((err = mux_->handle(mount, new SrsVodStream(dir))) != srs_success) { return srs_error_wrap(err, "mux handle"); } srs_trace("http: vhost=%s mount to %s at %s", vhost.c_str(), mount.c_str(), dir.c_str()); diff --git a/trunk/src/app/srs_app_http_static.hpp b/trunk/src/app/srs_app_http_static.hpp index 7c2452c2b..b0254b766 100644 --- a/trunk/src/app/srs_app_http_static.hpp +++ b/trunk/src/app/srs_app_http_static.hpp @@ -12,6 +12,8 @@ #include class ISrsFileReaderFactory; +class ISrsCommonHttpHandler; +class ISrsHttpServeMux; // HLS virtual connection, build on query string ctx of hls stream. class SrsHlsVirtualConn : public ISrsExpire @@ -94,17 +96,15 @@ public: public: virtual srs_error_t initialize() = 0; - -public: - SrsHttpServeMux mux_; + virtual ISrsHttpServeMux *mux() = 0; }; // The http static server instance, // serve http static file and flv/mp4 vod stream. -class SrsHttpStaticServer : public ISrsHttpStaticServer, public ISrsReloadHandler +class SrsHttpStaticServer : public ISrsHttpStaticServer { -public: - SrsHttpServeMux mux_; +private: + ISrsHttpServeMux *mux_; public: SrsHttpStaticServer(); @@ -112,6 +112,8 @@ public: public: virtual srs_error_t initialize(); + virtual ISrsHttpServeMux *mux(); + // Interface ISrsHttpHandler public: virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r); diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 3e902c502..0f5aee1cb 100644 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -1057,21 +1057,25 @@ ISrsHttpStreamServer::~ISrsHttpStreamServer() SrsHttpStreamServer::SrsHttpStreamServer() { async_ = new SrsAsyncCallWorker(); - - mux_.add_dynamic_matcher(this); + mux_ = new SrsHttpServeMux(); config_ = _srs_config; } void SrsHttpStreamServer::assemble() { - config_->subscribe(this); + SrsHttpServeMux *mux = dynamic_cast(mux_); + if (!mux) { + mux->add_dynamic_matcher(this); + } } SrsHttpStreamServer::~SrsHttpStreamServer() { - mux_.remove_dynamic_matcher(this); - config_->unsubscribe(this); + SrsHttpServeMux *mux = dynamic_cast(mux_); + if (mux) { + mux->remove_dynamic_matcher(this); + } async_->stop(); srs_freep(async_); @@ -1112,6 +1116,11 @@ srs_error_t SrsHttpStreamServer::initialize() return err; } +ISrsHttpServeMux *SrsHttpStreamServer::mux() +{ + return mux_; +} + // TODO: FIXME: rename for HTTP FLV mount. srs_error_t SrsHttpStreamServer::http_mount(ISrsRequest *r) { @@ -1160,7 +1169,7 @@ srs_error_t SrsHttpStreamServer::http_mount(ISrsRequest *r) // mount the http flv stream. // we must register the handler, then start the thread, // for the thread will cause thread switch context. - if ((err = mux_.handle(mount, entry->stream_)) != srs_success) { + if ((err = mux_->handle(mount, entry->stream_)) != srs_success) { return srs_error_wrap(err, "http: mount flv stream for vhost=%s failed", sid.c_str()); } @@ -1208,7 +1217,7 @@ void SrsHttpStreamServer::http_unmount(ISrsRequest *r) // Use async worker to execute the task, which will destroy the stream. srs_error_t err = srs_success; - if ((err = async_->execute(new SrsHttpStreamDestroy(&mux_, &streamHandlers_, sid))) != srs_success) { + if ((err = async_->execute(new SrsHttpStreamDestroy(mux_, &streamHandlers_, sid))) != srs_success) { srs_warn("http: ignore unmount stream failed, sid=%s, err=%s", sid.c_str(), srs_error_desc(err).c_str()); srs_freep(err); } @@ -1361,7 +1370,7 @@ srs_error_t SrsHttpStreamServer::initialize_flv_entry(std::string vhost) return err; } -SrsHttpStreamDestroy::SrsHttpStreamDestroy(SrsHttpServeMux *mux, map *handlers, string sid) +SrsHttpStreamDestroy::SrsHttpStreamDestroy(ISrsHttpServeMux *mux, map *handlers, string sid) { mux_ = mux; sid_ = sid; diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index a4ce2d408..463de132b 100644 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -29,6 +29,7 @@ class ISrsTsTransmuxer; class ISrsAacTransmuxer; class ISrsBufferCache; class ISrsMp3Transmuxer; +class ISrsCommonHttpHandler; // The cache for HTTP Live Streaming encoder. class ISrsBufferCache @@ -322,7 +323,7 @@ public: }; // The HTTP Live Streaming Server, to serve FLV/TS/MP3/AAC stream. -class ISrsHttpStreamServer : public ISrsReloadHandler, public ISrsHttpDynamicMatcher +class ISrsHttpStreamServer : public ISrsHttpDynamicMatcher { public: ISrsHttpStreamServer(); @@ -334,9 +335,7 @@ public: // HTTP flv/ts/mp3/aac stream virtual srs_error_t http_mount(ISrsRequest *r) = 0; virtual void http_unmount(ISrsRequest *r) = 0; - -public: - SrsHttpServeMux mux_; + virtual ISrsHttpServeMux *mux() = 0; }; // The HTTP Live Streaming Server, to serve FLV/TS/MP3/AAC stream. @@ -345,12 +344,12 @@ class SrsHttpStreamServer : public ISrsHttpStreamServer { private: ISrsAppConfig *config_; + ISrsHttpServeMux *mux_; private: ISrsAsyncCallWorker *async_; public: - SrsHttpServeMux mux_; // The http live streaming template, to create streams. std::map templateHandlers_; // The http live streaming streams, created by template. @@ -363,6 +362,7 @@ public: public: virtual srs_error_t initialize(); + virtual ISrsHttpServeMux *mux(); public: // HTTP flv/ts/mp3/aac stream @@ -383,10 +383,10 @@ class SrsHttpStreamDestroy : public ISrsAsyncCallTask private: std::string sid_; std::map *streamHandlers_; - SrsHttpServeMux *mux_; + ISrsHttpServeMux *mux_; public: - SrsHttpStreamDestroy(SrsHttpServeMux *mux, std::map *handlers, std::string sid); + SrsHttpStreamDestroy(ISrsHttpServeMux *mux, std::map *handlers, std::string sid); virtual ~SrsHttpStreamDestroy(); public: diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index d41852823..697bed4b2 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -39,6 +39,14 @@ ISrsMessagePumper::~ISrsMessagePumper() { } +ISrsRecvThread::ISrsRecvThread() +{ +} + +ISrsRecvThread::~ISrsRecvThread() +{ +} + SrsRecvThread::SrsRecvThread(ISrsMessagePumper *p, ISrsRtmpServer *r, srs_utime_t tm, SrsContextId parent_cid) { rtmp_ = r; @@ -144,12 +152,20 @@ srs_error_t SrsRecvThread::do_cycle() return err; } +ISrsQueueRecvThread::ISrsQueueRecvThread() +{ +} + +ISrsQueueRecvThread::~ISrsQueueRecvThread() +{ +} + SrsQueueRecvThread::SrsQueueRecvThread(SrsLiveConsumer *consumer, ISrsRtmpServer *rtmp_sdk, srs_utime_t tm, SrsContextId parent_cid) - : trd_(this, rtmp_sdk, tm, parent_cid) { _consumer = consumer; rtmp_ = rtmp_sdk; recv_error_ = srs_success; + trd_ = new SrsRecvThread(this, rtmp_sdk, tm, parent_cid); } SrsQueueRecvThread::~SrsQueueRecvThread() @@ -165,13 +181,14 @@ SrsQueueRecvThread::~SrsQueueRecvThread() queue_.clear(); srs_freep(recv_error_); + srs_freep(trd_); } srs_error_t SrsQueueRecvThread::start() { srs_error_t err = srs_success; - if ((err = trd_.start()) != srs_success) { + if ((err = trd_->start()) != srs_success) { return srs_error_wrap(err, "queue recv thread"); } @@ -180,7 +197,7 @@ srs_error_t SrsQueueRecvThread::start() void SrsQueueRecvThread::stop() { - trd_.stop(); + trd_->stop(); } bool SrsQueueRecvThread::empty() @@ -257,9 +274,16 @@ void SrsQueueRecvThread::on_stop() rtmp_->set_auto_response(true); } +ISrsPublishRecvThread::ISrsPublishRecvThread() +{ +} + +ISrsPublishRecvThread::~ISrsPublishRecvThread() +{ +} + SrsPublishRecvThread::SrsPublishRecvThread(ISrsRtmpServer *rtmp_sdk, ISrsRequest *_req, int mr_sock_fd, srs_utime_t tm, SrsRtmpConn *conn, SrsSharedPtr source, SrsContextId parent_cid) - : trd_(this, rtmp_sdk, tm, parent_cid) { rtmp_ = rtmp_sdk; @@ -275,22 +299,27 @@ SrsPublishRecvThread::SrsPublishRecvThread(ISrsRtmpServer *rtmp_sdk, ISrsRequest req_ = _req; mr_fd_ = mr_sock_fd; - // the mr settings, - mr_ = _srs_config->get_mr_enabled(req_->vhost_); - mr_sleep_ = _srs_config->get_mr_sleep(req_->vhost_); + trd_ = new SrsRecvThread(this, rtmp_sdk, tm, parent_cid); - realtime_ = _srs_config->get_realtime_enabled(req_->vhost_, false); + config_ = _srs_config; +} - _srs_config->subscribe(this); +void SrsPublishRecvThread::assemble() +{ + mr_ = config_->get_mr_enabled(req_->vhost_); + mr_sleep_ = config_->get_mr_sleep(req_->vhost_); + realtime_ = config_->get_realtime_enabled(req_->vhost_, false); } SrsPublishRecvThread::~SrsPublishRecvThread() { - _srs_config->unsubscribe(this); + trd_->stop(); - trd_.stop(); srs_cond_destroy(error_); srs_freep(recv_error_); + srs_freep(trd_); + + config_ = NULL; } srs_error_t SrsPublishRecvThread::wait(srs_utime_t tm) @@ -334,18 +363,18 @@ srs_error_t SrsPublishRecvThread::start() { srs_error_t err = srs_success; - if ((err = trd_.start()) != srs_success) { + if ((err = trd_->start()) != srs_success) { err = srs_error_wrap(err, "publish recv thread"); } - ncid_ = cid_ = trd_.cid(); + ncid_ = cid_ = trd_->cid(); return err; } void SrsPublishRecvThread::stop() { - trd_.stop(); + trd_->stop(); } srs_error_t SrsPublishRecvThread::consume(SrsRtmpCommonMessage *msg) @@ -492,6 +521,14 @@ void SrsPublishRecvThread::set_socket_buffer(srs_utime_t sleep_v) rtmp_->set_recv_buffer(nb_rbuf); } +ISrsHttpRecvThread::ISrsHttpRecvThread() +{ +} + +ISrsHttpRecvThread::~ISrsHttpRecvThread() +{ +} + SrsHttpRecvThread::SrsHttpRecvThread(SrsHttpxConn *c) { conn_ = c; diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 6fb1d4bd4..f70574b74 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -27,6 +27,9 @@ class SrsLiveConsumer; class SrsHttpConn; class SrsHttpxConn; class ISrsRtmpServer; +class SrsRecvThread; +class ISrsRecvThread; +class ISrsAppConfig; // The message consumer which consume a message. class ISrsMessageConsumer @@ -61,8 +64,21 @@ public: virtual void on_stop() = 0; }; +// The recv thread interface. +class ISrsRecvThread : public ISrsCoroutineHandler +{ +public: + ISrsRecvThread(); + virtual ~ISrsRecvThread(); + +public: + virtual SrsContextId cid() = 0; + virtual srs_error_t start() = 0; + virtual void stop() = 0; +}; + // The recv thread, use message handler to handle each received message. -class SrsRecvThread : public ISrsCoroutineHandler +class SrsRecvThread : public ISrsRecvThread { protected: ISrsCoroutine *trd_; @@ -93,15 +109,25 @@ private: virtual srs_error_t do_cycle(); }; +// The queue recv thread interface. +class ISrsQueueRecvThread : public ISrsMessagePumper +{ +public: + ISrsQueueRecvThread(); + virtual ~ISrsQueueRecvThread(); + +public: +}; + // The recv thread used to replace the timeout recv, // which hurt performance for the epoll_ctrl is frequently used. // @see: SrsRtmpConn::playing // @see: https://github.com/ossrs/srs/issues/217 -class SrsQueueRecvThread : public ISrsMessagePumper +class SrsQueueRecvThread : public ISrsQueueRecvThread { private: std::vector queue_; - SrsRecvThread trd_; + ISrsRecvThread *trd_; ISrsRtmpServer *rtmp_; // The recv thread error code. srs_error_t recv_error_; @@ -130,17 +156,29 @@ public: virtual void on_stop(); }; -// The publish recv thread got message and callback the source method to process message. -// @see: https://github.com/ossrs/srs/issues/237 -class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler +// The publish recv thread interface. +class ISrsPublishRecvThread : public ISrsMessagePumper, #ifdef SRS_PERF_MERGED_READ - , - public IMergeReadHandler + public IMergeReadHandler #endif { +public: + ISrsPublishRecvThread(); + virtual ~ISrsPublishRecvThread(); + +public: +}; + +// The publish recv thread got message and callback the source method to process message. +// @see: https://github.com/ossrs/srs/issues/237 +class SrsPublishRecvThread : public ISrsPublishRecvThread +{ +private: + ISrsAppConfig *config_; + private: uint32_t nn_msgs_for_yield_; - SrsRecvThread trd_; + ISrsRecvThread *trd_; ISrsRtmpServer *rtmp_; ISrsRequest *req_; // The msgs already got. @@ -168,6 +206,7 @@ private: public: SrsPublishRecvThread(ISrsRtmpServer *rtmp_sdk, ISrsRequest *_req, int mr_sock_fd, srs_utime_t tm, SrsRtmpConn *conn, SrsSharedPtr source, SrsContextId parent_cid); + void assemble(); virtual ~SrsPublishRecvThread(); public: @@ -199,11 +238,21 @@ private: virtual void set_socket_buffer(srs_utime_t sleep_v); }; +// The HTTP receive thread interface. +class ISrsHttpRecvThread : public ISrsCoroutineHandler +{ +public: + ISrsHttpRecvThread(); + virtual ~ISrsHttpRecvThread(); + +public: +}; + // The HTTP receive thread, try to read messages util EOF. // For example, the HTTP FLV serving thread will use the receive thread to break // when client closed the request, to avoid FD leak. // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 -class SrsHttpRecvThread : public ISrsCoroutineHandler +class SrsHttpRecvThread : public ISrsHttpRecvThread { private: SrsHttpxConn *conn_; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 2ef6c52fe..6cc55a2fb 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -893,6 +893,8 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr source) // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 SrsPublishRecvThread rtrd(rtmp_, req, srs_netfd_fileno(transport_->fd()), 0, this, source, _srs_context->get_id()); + rtrd.assemble(); + err = do_publishing(source, &rtrd); rtrd.stop(); } diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 300fba7ba..0bbd675ab 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -382,7 +382,7 @@ void SrsServer::gracefully_dispose() srs_trace("final wait for %dms", srsu2msi(config_->get_grace_final_wait())); } -ISrsHttpServeMux *SrsServer::api_server() +ISrsCommonHttpHandler *SrsServer::api_server() { return http_api_mux_; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 3bb8dc234..7c76829fc 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -33,7 +33,7 @@ class SrsRtcConnection; class ISrsAsyncCallTask; class SrsSignalManager; class SrsServer; -class ISrsHttpServeMux; +class ISrsCommonHttpHandler; class SrsHttpServer; class SrsIngester; class SrsHttpHeartbeat; @@ -94,7 +94,7 @@ public: virtual ~ISrsApiServerOwner(); public: - virtual ISrsHttpServeMux *api_server() = 0; + virtual ISrsCommonHttpHandler *api_server() = 0; }; // The RTC API server owner interface. @@ -142,7 +142,7 @@ private: ISrsAppFactory *app_factory_; private: - ISrsHttpServeMux *http_api_mux_; + ISrsCommonHttpHandler *http_api_mux_; SrsHttpServer *http_server_; private: @@ -229,7 +229,7 @@ private: public: // Get the HTTP API server mux. - ISrsHttpServeMux *api_server(); + ISrsCommonHttpHandler *api_server(); // server startup workflow, @see run_master() public: diff --git a/trunk/src/protocol/srs_protocol_http_conn.cpp b/trunk/src/protocol/srs_protocol_http_conn.cpp index 77d3f5585..4ee883229 100644 --- a/trunk/src/protocol/srs_protocol_http_conn.cpp +++ b/trunk/src/protocol/srs_protocol_http_conn.cpp @@ -301,7 +301,7 @@ int SrsHttpParser::on_body(llhttp_t *parser, const char *at, size_t length) return 0; } -SrsHttpMessage::SrsHttpMessage(ISrsReader *reader, SrsFastStream *buffer) : ISrsHttpMessage() +SrsHttpMessage::SrsHttpMessage(ISrsReader *reader, SrsFastStream *buffer) { owner_conn_ = NULL; chunked_ = false; diff --git a/trunk/src/protocol/srs_protocol_http_stack.cpp b/trunk/src/protocol/srs_protocol_http_stack.cpp index 1ee84fe79..6afcb3398 100644 --- a/trunk/src/protocol/srs_protocol_http_stack.cpp +++ b/trunk/src/protocol/srs_protocol_http_stack.cpp @@ -797,7 +797,15 @@ ISrsHttpDynamicMatcher::~ISrsHttpDynamicMatcher() { } -ISrsHttpServeMux::ISrsHttpServeMux() : ISrsHttpHandler() +ISrsCommonHttpHandler::ISrsCommonHttpHandler() +{ +} + +ISrsCommonHttpHandler::~ISrsCommonHttpHandler() +{ +} + +ISrsHttpServeMux::ISrsHttpServeMux() { } diff --git a/trunk/src/protocol/srs_protocol_http_stack.hpp b/trunk/src/protocol/srs_protocol_http_stack.hpp index 50410ce51..ec1cd1ee4 100644 --- a/trunk/src/protocol/srs_protocol_http_stack.hpp +++ b/trunk/src/protocol/srs_protocol_http_stack.hpp @@ -422,16 +422,29 @@ public: virtual srs_error_t dynamic_match(ISrsHttpMessage *request, ISrsHttpHandler **ph) = 0; }; -// The server mux, all http server should implements it. -class ISrsHttpServeMux : public ISrsHttpHandler +// The common http handler, for example, the http serve mux. +class ISrsCommonHttpHandler : public ISrsHttpHandler +{ +public: + ISrsCommonHttpHandler(); + virtual ~ISrsCommonHttpHandler(); + +public: + // Register HTTP handler to mux. + virtual srs_error_t handle(std::string pattern, ISrsHttpHandler *handler) = 0; +}; + +// The http serve mux interface. +class ISrsHttpServeMux : public ISrsCommonHttpHandler { public: ISrsHttpServeMux(); virtual ~ISrsHttpServeMux(); public: - // Register HTTP handler to mux. - virtual srs_error_t handle(std::string pattern, ISrsHttpHandler *handler) = 0; + // Find the handler for request. + virtual srs_error_t find_handler(ISrsHttpMessage *r, ISrsHttpHandler **ph) = 0; + virtual void unhandle(std::string pattern, ISrsHttpHandler *handler) = 0; }; // ServeMux is an HTTP request multiplexer. @@ -498,7 +511,7 @@ public: virtual srs_error_t handle(std::string pattern, ISrsHttpHandler *handler); // Remove the handler for pattern. Note that this will not free the handler. void unhandle(std::string pattern, ISrsHttpHandler *handler); - // Interface ISrsHttpServeMux + // Interface ISrsCommonHttpHandler public: virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r); @@ -535,7 +548,7 @@ public: public: virtual srs_error_t initialize(bool cros_enabled); - // Interface ISrsHttpServeMux + // Interface ISrsCommonHttpHandler public: virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r); }; @@ -569,7 +582,7 @@ public: public: virtual srs_error_t initialize(bool enabled, std::string username, std::string password); - // Interface ISrsHttpServeMux + // Interface ISrsCommonHttpHandler public: virtual srs_error_t serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r); diff --git a/trunk/src/utest/srs_utest_app10.cpp b/trunk/src/utest/srs_utest_app10.cpp index 2d899d8fd..512d880f2 100644 --- a/trunk/src/utest/srs_utest_app10.cpp +++ b/trunk/src/utest/srs_utest_app10.cpp @@ -269,7 +269,7 @@ VOID TEST(SrsServerTest, HttpHandleSuccess) EXPECT_TRUE(server.get() != NULL); // Inject mock HTTP API mux - ISrsHttpServeMux *original_mux = server->http_api_mux_; + ISrsCommonHttpHandler *original_mux = server->http_api_mux_; server->http_api_mux_ = mock_mux; // Set reuse_api_over_server_ to false to test all handler registrations diff --git a/trunk/src/utest/srs_utest_app10.hpp b/trunk/src/utest/srs_utest_app10.hpp index 8d9f33d07..acfdacb44 100644 --- a/trunk/src/utest/srs_utest_app10.hpp +++ b/trunk/src/utest/srs_utest_app10.hpp @@ -69,8 +69,8 @@ public: virtual std::string get_exporter_listen(); }; -// Mock ISrsHttpServeMux for testing SrsServer::http_handle() -class MockHttpServeMux : public ISrsHttpServeMux +// Mock ISrsCommonHttpHandler for testing SrsServer::http_handle() +class MockHttpServeMux : public ISrsCommonHttpHandler { public: int handle_count_; diff --git a/trunk/src/utest/srs_utest_app14.cpp b/trunk/src/utest/srs_utest_app14.cpp index 9df1deb54..02515094c 100644 --- a/trunk/src/utest/srs_utest_app14.cpp +++ b/trunk/src/utest/srs_utest_app14.cpp @@ -535,7 +535,7 @@ VOID TEST(GB28181Test, ListenerInitialize) srs_freep(mock_listener); } -// Mock ISrsHttpServeMux implementation +// Mock ISrsCommonHttpHandler implementation MockHttpServeMuxForGbListener::MockHttpServeMuxForGbListener() { handle_called_ = false; @@ -578,7 +578,7 @@ MockApiServerOwnerForGbListener::~MockApiServerOwnerForGbListener() mux_ = NULL; } -ISrsHttpServeMux *MockApiServerOwnerForGbListener::api_server() +ISrsCommonHttpHandler *MockApiServerOwnerForGbListener::api_server() { return mux_; } diff --git a/trunk/src/utest/srs_utest_app14.hpp b/trunk/src/utest/srs_utest_app14.hpp index 59eed8f76..a656b3302 100644 --- a/trunk/src/utest/srs_utest_app14.hpp +++ b/trunk/src/utest/srs_utest_app14.hpp @@ -230,8 +230,8 @@ public: virtual int get_stream_caster_listen(SrsConfDirective *conf); }; -// Mock ISrsHttpServeMux for testing SrsGbListener::listen_api -class MockHttpServeMuxForGbListener : public ISrsHttpServeMux +// Mock ISrsCommonHttpHandler for testing SrsGbListener::listen_api +class MockHttpServeMuxForGbListener : public ISrsCommonHttpHandler { public: bool handle_called_; @@ -252,14 +252,14 @@ public: class MockApiServerOwnerForGbListener : public ISrsApiServerOwner { public: - ISrsHttpServeMux *mux_; + ISrsCommonHttpHandler *mux_; public: MockApiServerOwnerForGbListener(); virtual ~MockApiServerOwnerForGbListener(); public: - virtual ISrsHttpServeMux *api_server(); + virtual ISrsCommonHttpHandler *api_server(); }; // Mock ISrsIpListener for testing SrsGbListener::listen