AI: Add utest to cover edge module.

This commit is contained in:
OSSRS-AI 2025-10-07 10:19:27 -04:00 committed by winlin
parent 94dde8e370
commit 8ed07e37b4
40 changed files with 4207 additions and 222 deletions

View File

@ -579,6 +579,66 @@ testing:
- Verify edge cases like sequence number wrap-around, cache overflow, and null inputs
- Use existing mock helper functions for consistency and maintainability
mock_interface_fields:
principle: "MANDATORY - Always mock all private/protected interface member fields (ISrs* types) in the class under test"
description: |
When writing unit tests, ALWAYS identify and mock ALL interface member fields in the class under test.
Interface fields are those with ISrs* prefix (e.g., ISrsAppConfig*, ISrsBasicRtmpClient*, ISrsRequest*).
This enables proper dependency injection and isolation of the unit under test.
process:
- "Step 1: View the class header file to identify all private/protected member fields"
- "Step 2: Identify which fields are interfaces (start with ISrs prefix)"
- "Step 3: Create or reuse mock classes for each interface type"
- "Step 4: In the test, inject mock instances into the class under test by setting the private fields directly"
- "Step 5: After test completes, set injected fields to NULL before object destruction to avoid double-free"
example: |
// Class under test has these private fields:
class SrsEdgeRtmpUpstream {
private:
ISrsAppConfig *config_; // Interface - MUST mock
ISrsBasicRtmpClient *sdk_; // Interface - MUST mock
std::string redirect_; // Not interface - no need to mock
int selected_port_; // Not interface - no need to mock
};
// In unit test:
VOID TEST(EdgeRtmpUpstreamTest, ConnectToOrigin) {
// Create mocks for ALL interface fields
SrsUniquePtr<MockEdgeConfig> mock_config(new MockEdgeConfig());
MockEdgeRtmpClient *mock_sdk = new MockEdgeRtmpClient();
// Create object under test
SrsUniquePtr<SrsEdgeRtmpUpstream> upstream(new SrsEdgeRtmpUpstream(""));
// Inject mocks into private interface fields
upstream->config_ = mock_config.get();
upstream->sdk_ = mock_sdk;
// Run test
err = upstream->connect(req.get(), lb.get());
HELPER_EXPECT_SUCCESS(err);
// Verify mock interactions
EXPECT_TRUE(mock_sdk->connect_called_);
// Clean up - set to NULL to avoid double-free
upstream->sdk_ = NULL;
srs_freep(mock_sdk);
}
rules:
- "ALWAYS view the class header file first to identify all member fields"
- "ALWAYS mock ALL interface fields (ISrs* prefix) - no exceptions"
- "Create mock classes that implement the interface if they don't exist"
- "Reuse existing mock classes from srs_utest_app*.hpp when available"
- "Inject mocks by directly setting private member fields (accessible in utests)"
- "Set injected fields to NULL before object destruction to prevent double-free"
- "Non-interface fields (std::string, int, etc.) do not need mocking"
rationale: "Mocking all interface dependencies ensures tests are isolated, fast, and don't require real network/file/database resources. It also makes tests deterministic and easier to debug."
test_object_declaration:
- pattern: "Use unique pointers for object instantiation"
description: "MANDATORY - Always use SrsUniquePtr for object declaration in unit tests instead of stack allocation"

View File

@ -138,14 +138,23 @@ echo "" >> ${FILE}; echo "" >> ${FILE}
# Depends, the depends objects
echo "# Depends, the depends objects" >> ${FILE}
#
# current module header files
echo -n "SRS_UTEST_DEPS = " >> ${FILE}
# current SRS object files
echo -n "SRS_SOURCE_OBJS = " >> ${FILE}
for item in ${MODULE_OBJS[*]}; do
FILE_NAME=${item%.*}
echo -n "${SRS_TRUNK_PREFIX}/${SRS_OBJS}/${FILE_NAME}.o " >> ${FILE}
done
echo "" >> ${FILE}; echo "" >> ${FILE}
#
# current SRS header files
echo -n "SRS_SOURCE_HEADERS = " >> ${FILE}
for item in ${MODULE_OBJS[*]}; do
FILE_NAME=${item%.*}
echo -n "${SRS_TRUNK_PREFIX}/${FILE_NAME}.hpp " >> ${FILE}
done
echo "" >> ${FILE}; echo "" >> ${FILE}
#
# current utest header files
echo "# Depends, utest header files" >> ${FILE}
DEPS_NAME="UTEST_DEPS"
echo -n "${DEPS_NAME} = " >> ${FILE}
@ -163,7 +172,7 @@ MODULE_OBJS=()
for item in ${MODULE_FILES[*]}; do
MODULE_OBJS="${MODULE_OBJS[@]} ${item}.o"
cat << END >> ${FILE}
${item}.o : \$(${DEPS_NAME}) ${SRS_TRUNK_PREFIX}/${MODULE_DIR}/${item}.cpp \$(SRS_UTEST_DEPS)
${item}.o : \$(${DEPS_NAME}) ${SRS_TRUNK_PREFIX}/${MODULE_DIR}/${item}.cpp \$(SRS_SOURCE_HEADERS)
\$(CXX) \$(CPPFLAGS) \$(CXXFLAGS) \$(SRS_UTEST_INC) -c ${SRS_TRUNK_PREFIX}/${MODULE_DIR}/${item}.cpp -o \$@
END
done
@ -186,7 +195,7 @@ echo "" >> ${FILE}; echo "" >> ${FILE}
#
echo "# generate the utest binary" >> ${FILE}
cat << END >> ${FILE}
${SRS_TRUNK_PREFIX}/${SRS_OBJS}/${APP_NAME} : \$(SRS_UTEST_DEPS) ${MODULE_OBJS} gtest.a
${SRS_TRUNK_PREFIX}/${SRS_OBJS}/${APP_NAME} : \$(SRS_SOURCE_OBJS) ${MODULE_OBJS} gtest.a
\$(CXX) -o \$@ \$(CPPFLAGS) \$(CXXFLAGS) \$^ \$(DEPS_LIBRARIES_FILES) ${LINK_OPTIONS}
END

2
trunk/configure vendored
View File

@ -384,7 +384,7 @@ if [[ $SRS_UTEST == YES ]]; then
"srs_utest_coworkers" "srs_utest_pithy_print" "srs_utest_kernel3" "srs_utest_protocol4"
"srs_utest_protocol3" "srs_utest_app" "srs_utest_app2" "srs_utest_app3" "srs_utest_app4"
"srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9"
"srs_utest_app10" "srs_utest_app11" "srs_utest_app12")
"srs_utest_app10" "srs_utest_app11" "srs_utest_app12" "srs_utest_app13")
# Always include SRT utest
MODULE_FILES+=("srs_utest_srt")
if [[ $SRS_GB28181 == YES ]]; then

View File

@ -504,6 +504,11 @@ public:
virtual bool get_vhost_http_remux_has_video(std::string vhost) = 0;
virtual bool get_vhost_http_remux_guess_has_av(std::string vhost) = 0;
virtual std::string get_vhost_http_remux_mount(std::string vhost) = 0;
public:
virtual std::string get_vhost_edge_protocol(std::string vhost) = 0;
virtual bool get_vhost_edge_follow_client(std::string vhost) = 0;
virtual std::string get_vhost_edge_transform_vhost(std::string vhost) = 0;
};
// The config service provider.

View File

@ -21,6 +21,7 @@ using namespace std;
#include <srs_app_st.hpp>
#include <srs_kernel_pithy_print.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_balance.hpp>
@ -42,11 +43,11 @@ using namespace std;
// when edge error, wait for quit
#define SRS_EDGE_FORWARDER_TIMEOUT (150 * SRS_UTIME_MILLISECONDS)
SrsEdgeUpstream::SrsEdgeUpstream()
ISrsEdgeUpstream::ISrsEdgeUpstream()
{
}
SrsEdgeUpstream::~SrsEdgeUpstream()
ISrsEdgeUpstream::~ISrsEdgeUpstream()
{
}
@ -55,11 +56,17 @@ SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r)
redirect_ = r;
sdk_ = NULL;
selected_port_ = 0;
config_ = _srs_config;
app_factory_ = _srs_app_factory;
}
SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream()
{
close();
config_ = NULL;
app_factory_ = NULL;
}
srs_error_t SrsEdgeRtmpUpstream::connect(ISrsRequest *r, ISrsLbRoundRobin *lb)
@ -70,7 +77,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(ISrsRequest *r, ISrsLbRoundRobin *lb)
std::string url;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_edge_origin(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_edge_origin(req->vhost_);
// when origin is error, for instance, server is shutdown,
// then user remove the vhost then reload, the conf is empty.
@ -95,7 +102,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(ISrsRequest *r, ISrsLbRoundRobin *lb)
selected_port_ = port;
// support vhost tranform for edge,
std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost_);
std::string vhost = config_->get_vhost_edge_transform_vhost(req->vhost_);
vhost = srs_strings_replace(vhost, "[vhost]", req->vhost_);
url = srs_net_url_encode_rtmp_url(server, port, req->host_, vhost, req->app_, req->stream_, req->param_);
@ -104,7 +111,8 @@ srs_error_t SrsEdgeRtmpUpstream::connect(ISrsRequest *r, ISrsLbRoundRobin *lb)
srs_freep(sdk_);
srs_utime_t cto = SRS_EDGE_INGESTER_TIMEOUT;
srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
sdk_ = new SrsSimpleRtmpClient(url, cto, sto);
// Use factory to create client.
sdk_ = app_factory_->create_rtmp_client(url, cto, sto);
if ((err = sdk_->connect()) != srs_success) {
return srs_error_wrap(err, "edge pull %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
@ -113,7 +121,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(ISrsRequest *r, ISrsLbRoundRobin *lb)
// For RTMP client, we pass the vhost in tcUrl when connecting,
// so we publish without vhost in stream.
string stream;
if ((err = sdk_->play(_srs_config->get_chunk_size(req->vhost_), false, &stream)) != srs_success) {
if ((err = sdk_->play(config_->get_chunk_size(req->vhost_), false, &stream)) != srs_success) {
return srs_error_wrap(err, "edge pull %s stream failed", url.c_str());
}
@ -163,11 +171,17 @@ SrsEdgeFlvUpstream::SrsEdgeFlvUpstream(std::string schema)
reader_ = NULL;
decoder_ = NULL;
req_ = NULL;
config_ = _srs_config;
app_factory_ = _srs_app_factory;
}
SrsEdgeFlvUpstream::~SrsEdgeFlvUpstream()
{
close();
config_ = NULL;
app_factory_ = NULL;
}
srs_error_t SrsEdgeFlvUpstream::connect(ISrsRequest *r, ISrsLbRoundRobin *lb)
@ -189,7 +203,7 @@ srs_error_t SrsEdgeFlvUpstream::do_connect(ISrsRequest *r, ISrsLbRoundRobin *lb,
ISrsRequest *req = r;
if (redirect_depth == 0) {
SrsConfDirective *conf = _srs_config->get_vhost_edge_origin(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_edge_origin(req->vhost_);
// when origin is error, for instance, server is shutdown,
// then user remove the vhost then reload, the conf is empty.
@ -216,7 +230,7 @@ srs_error_t SrsEdgeFlvUpstream::do_connect(ISrsRequest *r, ISrsLbRoundRobin *lb,
}
srs_freep(sdk_);
sdk_ = new SrsHttpClient();
sdk_ = app_factory_->create_http_client();
string path = "/" + req->app_ + "/" + req->stream_;
if (!srs_strings_ends_with(req->stream_, ".flv")) {
@ -275,10 +289,10 @@ srs_error_t SrsEdgeFlvUpstream::do_connect(ISrsRequest *r, ISrsLbRoundRobin *lb,
}
srs_freep(reader_);
reader_ = new SrsHttpFileReader(hr_->body_reader());
reader_ = app_factory_->create_http_file_reader(hr_->body_reader());
srs_freep(decoder_);
decoder_ = new SrsFlvDecoder();
decoder_ = app_factory_->create_flv_decoder();
if ((err = decoder_->initialize(reader_)) != srs_success) {
return srs_error_wrap(err, "init decoder");
@ -383,6 +397,14 @@ void SrsEdgeFlvUpstream::kbps_sample(const char *label, srs_utime_t age)
sdk_->kbps_sample(label, age);
}
ISrsEdgeIngester::ISrsEdgeIngester()
{
}
ISrsEdgeIngester::~ISrsEdgeIngester()
{
}
SrsEdgeIngester::SrsEdgeIngester()
{
source_ = NULL;
@ -392,6 +414,8 @@ SrsEdgeIngester::SrsEdgeIngester()
upstream_ = new SrsEdgeRtmpUpstream("");
lb_ = new SrsLbRoundRobin();
trd_ = new SrsDummyCoroutine();
config_ = _srs_config;
}
SrsEdgeIngester::~SrsEdgeIngester()
@ -401,6 +425,8 @@ SrsEdgeIngester::~SrsEdgeIngester()
srs_freep(upstream_);
srs_freep(lb_);
srs_freep(trd_);
config_ = NULL;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
@ -409,7 +435,7 @@ SrsEdgeIngester::~SrsEdgeIngester()
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge *e, ISrsRequest *r)
srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPlayEdge *e, ISrsRequest *r)
{
// Because source references to this object, so we should directly use the source ptr.
source_ = s.get();
@ -490,10 +516,10 @@ srs_error_t SrsEdgeIngester::do_cycle()
}
// Use protocol in config.
string edge_protocol = _srs_config->get_vhost_edge_protocol(req_->vhost_);
string edge_protocol = config_->get_vhost_edge_protocol(req_->vhost_);
// If follow client protocol, change to protocol of client.
bool follow_client = _srs_config->get_vhost_edge_follow_client(req_->vhost_);
bool follow_client = config_->get_vhost_edge_follow_client(req_->vhost_);
if (follow_client && !req_->protocol_.empty()) {
edge_protocol = req_->protocol_;
}
@ -676,6 +702,14 @@ srs_error_t SrsEdgeIngester::process_publish_message(SrsRtmpCommonMessage *msg,
return err;
}
ISrsEdgeForwarder::ISrsEdgeForwarder()
{
}
ISrsEdgeForwarder::~ISrsEdgeForwarder()
{
}
SrsEdgeForwarder::SrsEdgeForwarder()
{
edge_ = NULL;
@ -687,6 +721,8 @@ SrsEdgeForwarder::SrsEdgeForwarder()
lb_ = new SrsLbRoundRobin();
trd_ = new SrsDummyCoroutine();
queue_ = new SrsMessageQueue();
config_ = _srs_config;
}
SrsEdgeForwarder::~SrsEdgeForwarder()
@ -696,6 +732,8 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
srs_freep(lb_);
srs_freep(trd_);
srs_freep(queue_);
config_ = NULL;
}
void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
@ -709,7 +747,7 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
// IMPORTANT: All field initialization in this method MUST NOT cause coroutine context switches.
// This prevents the race condition where multiple coroutines could create duplicate sources
// for the same stream when context switches occurred during initialization.
srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge *e, ISrsRequest *r)
srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPublishEdge *e, ISrsRequest *r)
{
// Because source references to this object, so we should directly use the source ptr.
source_ = s.get();
@ -729,7 +767,7 @@ srs_error_t SrsEdgeForwarder::start()
std::string url;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_edge_origin(req_->vhost_);
SrsConfDirective *conf = config_->get_vhost_edge_origin(req_->vhost_);
srs_assert(conf);
// select the origin.
@ -738,7 +776,7 @@ srs_error_t SrsEdgeForwarder::start()
srs_net_split_hostport(server, server, port);
// support vhost tranform for edge,
std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req_->vhost_);
std::string vhost = config_->get_vhost_edge_transform_vhost(req_->vhost_);
vhost = srs_strings_replace(vhost, "[vhost]", req_->vhost_);
url = srs_net_url_encode_rtmp_url(server, port, req_->host_, vhost, req_->app_, req_->stream_, req_->param_);
@ -761,7 +799,7 @@ srs_error_t SrsEdgeForwarder::start()
// For RTMP client, we pass the vhost in tcUrl when connecting,
// so we publish without vhost in stream.
string stream;
if ((err = sdk_->publish(_srs_config->get_chunk_size(req_->vhost_), false, &stream)) != srs_success) {
if ((err = sdk_->publish(config_->get_chunk_size(req_->vhost_), false, &stream)) != srs_success) {
return srs_error_wrap(err, "sdk publish");
}
@ -905,6 +943,14 @@ srs_error_t SrsEdgeForwarder::proxy(SrsRtmpCommonMessage *msg)
return err;
}
ISrsPlayEdge::ISrsPlayEdge()
{
}
ISrsPlayEdge::~ISrsPlayEdge()
{
}
SrsPlayEdge::SrsPlayEdge()
{
state_ = SrsEdgeStateInit;
@ -983,6 +1029,14 @@ srs_error_t SrsPlayEdge::on_ingest_play()
return err;
}
ISrsPublishEdge::ISrsPublishEdge()
{
}
ISrsPublishEdge::~ISrsPublishEdge()
{
}
SrsPublishEdge::SrsPublishEdge()
{
state_ = SrsEdgeStateInit;

View File

@ -33,6 +33,15 @@ class SrsHttpClient;
class ISrsHttpMessage;
class SrsHttpFileReader;
class SrsFlvDecoder;
class ISrsAppConfig;
class ISrsBasicRtmpClient;
class ISrsHttpClient;
class ISrsFileReader;
class ISrsFlvDecoder;
class ISrsLiveSource;
class ISrsPlayEdge;
class ISrsPublishEdge;
class ISrsAppFactory;
// The state of edge, auto machine
enum SrsEdgeState {
@ -57,11 +66,11 @@ enum SrsEdgeUserState {
};
// The upstream of edge, can be rtmp or http.
class SrsEdgeUpstream
class ISrsEdgeUpstream
{
public:
SrsEdgeUpstream();
virtual ~SrsEdgeUpstream();
ISrsEdgeUpstream();
virtual ~ISrsEdgeUpstream();
public:
virtual srs_error_t connect(ISrsRequest *r, ISrsLbRoundRobin *lb) = 0;
@ -75,13 +84,18 @@ public:
virtual void kbps_sample(const char *label, srs_utime_t age) = 0;
};
class SrsEdgeRtmpUpstream : public SrsEdgeUpstream
// The RTMP upstream of edge.
class SrsEdgeRtmpUpstream : public ISrsEdgeUpstream
{
private:
ISrsAppConfig *config_;
ISrsAppFactory *app_factory_;
private:
// For RTMP 302, if not empty,
// use this <ip[:port]> as upstream.
std::string redirect_;
SrsSimpleRtmpClient *sdk_;
ISrsBasicRtmpClient *sdk_;
private:
// Current selected server, the ip:port.
@ -105,16 +119,21 @@ public:
virtual void kbps_sample(const char *label, srs_utime_t age);
};
class SrsEdgeFlvUpstream : public SrsEdgeUpstream
// The HTTP FLV upstream of edge.
class SrsEdgeFlvUpstream : public ISrsEdgeUpstream
{
private:
ISrsAppConfig *config_;
ISrsAppFactory *app_factory_;
private:
std::string schema_;
SrsHttpClient *sdk_;
ISrsHttpClient *sdk_;
ISrsHttpMessage *hr_;
private:
SrsHttpFileReader *reader_;
SrsFlvDecoder *decoder_;
ISrsFileReader *reader_;
ISrsFlvDecoder *decoder_;
private:
// We might modify the request by HTTP redirect.
@ -144,26 +163,45 @@ public:
virtual void kbps_sample(const char *label, srs_utime_t age);
};
// The interface for edge ingester.
class ISrsEdgeIngester
{
public:
ISrsEdgeIngester();
virtual ~ISrsEdgeIngester();
public:
// Initialize the ingester.
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPlayEdge *e, ISrsRequest *r) = 0;
// Start the ingester.
virtual srs_error_t start() = 0;
// Stop the ingester.
virtual void stop() = 0;
};
// The edge used to ingest stream from origin.
class SrsEdgeIngester : public ISrsCoroutineHandler
class SrsEdgeIngester : public ISrsCoroutineHandler, public ISrsEdgeIngester
{
private:
// Because source references to this object, so we should directly use the source ptr.
SrsLiveSource *source_;
ISrsAppConfig *config_;
private:
SrsPlayEdge *edge_;
// Because source references to this object, so we should directly use the source ptr.
ISrsLiveSource *source_;
private:
ISrsPlayEdge *edge_;
ISrsRequest *req_;
ISrsCoroutine *trd_;
ISrsLbRoundRobin *lb_;
SrsEdgeUpstream *upstream_;
ISrsEdgeUpstream *upstream_;
public:
SrsEdgeIngester();
virtual ~SrsEdgeIngester();
public:
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge *e, ISrsRequest *r);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPlayEdge *e, ISrsRequest *r);
virtual srs_error_t start();
virtual void stop();
@ -179,15 +217,38 @@ private:
virtual srs_error_t process_publish_message(SrsRtmpCommonMessage *msg, std::string &redirect);
};
// The interface for edge forwarder.
class ISrsEdgeForwarder
{
public:
ISrsEdgeForwarder();
virtual ~ISrsEdgeForwarder();
public:
// Set the queue size.
virtual void set_queue_size(srs_utime_t queue_size) = 0;
// Initialize the forwarder.
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPublishEdge *e, ISrsRequest *r) = 0;
// Start the forwarder.
virtual srs_error_t start() = 0;
// Stop the forwarder.
virtual void stop() = 0;
// Proxy publish stream to edge.
virtual srs_error_t proxy(SrsRtmpCommonMessage *msg) = 0;
};
// The edge used to forward stream to origin.
class SrsEdgeForwarder : public ISrsCoroutineHandler
class SrsEdgeForwarder : public ISrsCoroutineHandler, public ISrsEdgeForwarder
{
private:
// Because source references to this object, so we should directly use the source ptr.
SrsLiveSource *source_;
ISrsAppConfig *config_;
private:
SrsPublishEdge *edge_;
// Because source references to this object, so we should directly use the source ptr.
ISrsLiveSource *source_;
private:
ISrsPublishEdge *edge_;
ISrsRequest *req_;
ISrsCoroutine *trd_;
SrsSimpleRtmpClient *sdk_;
@ -208,7 +269,7 @@ public:
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge *e, ISrsRequest *r);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPublishEdge *e, ISrsRequest *r);
virtual srs_error_t start();
virtual void stop();
// Interface ISrsReusableThread2Handler
@ -222,12 +283,24 @@ public:
virtual srs_error_t proxy(SrsRtmpCommonMessage *msg);
};
// The interface for play edge.
class ISrsPlayEdge
{
public:
ISrsPlayEdge();
virtual ~ISrsPlayEdge();
public:
// When ingester start to play stream.
virtual srs_error_t on_ingest_play() = 0;
};
// The play edge control service.
class SrsPlayEdge
class SrsPlayEdge : public ISrsPlayEdge
{
private:
SrsEdgeState state_;
SrsEdgeIngester *ingester_;
ISrsEdgeIngester *ingester_;
public:
SrsPlayEdge();
@ -248,12 +321,22 @@ public:
virtual srs_error_t on_ingest_play();
};
// The interface for publish edge.
class ISrsPublishEdge
{
public:
ISrsPublishEdge();
virtual ~ISrsPublishEdge();
public:
};
// The publish edge control service.
class SrsPublishEdge
class SrsPublishEdge : public ISrsPublishEdge
{
private:
SrsEdgeState state_;
SrsEdgeForwarder *forwarder_;
ISrsEdgeForwarder *forwarder_;
public:
SrsPublishEdge();

View File

@ -6,14 +6,27 @@
#include <srs_app_factory.hpp>
#include <srs_app_caster_flv.hpp>
#include <srs_app_config.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_rtmp_source.hpp>
#include <srs_app_st.hpp>
#include <srs_kernel_file.hpp>
#include <srs_kernel_flv.hpp>
#include <srs_kernel_hourglass.hpp>
#include <srs_kernel_ts.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_http_client.hpp>
#include <srs_protocol_st.hpp>
#include <srs_app_rtsp_source.hpp>
ISrsAppFactory::ISrsAppFactory()
{
}
ISrsAppFactory::~ISrsAppFactory()
{
}
SrsAppFactory::SrsAppFactory()
{
@ -60,6 +73,36 @@ ISrsHourGlass *SrsAppFactory::create_hourglass(const std::string &name, ISrsHour
return new SrsHourGlass(name, handler, interval);
}
ISrsBasicRtmpClient *SrsAppFactory::create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto)
{
return new SrsSimpleRtmpClient(url, cto, sto);
}
ISrsHttpClient *SrsAppFactory::create_http_client()
{
return new SrsHttpClient();
}
ISrsFileReader *SrsAppFactory::create_http_file_reader(ISrsHttpResponseReader *r)
{
return new SrsHttpFileReader(r);
}
ISrsFlvDecoder *SrsAppFactory::create_flv_decoder()
{
return new SrsFlvDecoder();
}
ISrsRtspSendTrack *SrsAppFactory::create_rtsp_audio_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc)
{
return new SrsRtspAudioSendTrack(session, track_desc);
}
ISrsRtspSendTrack *SrsAppFactory::create_rtsp_video_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc)
{
return new SrsRtspVideoSendTrack(session, track_desc);
}
SrsFinalFactory::SrsFinalFactory()
{
}

View File

@ -18,9 +18,40 @@ class SrsLiveSource;
class ISrsOriginHub;
class ISrsHourGlass;
class ISrsHourGlassHandler;
class ISrsBasicRtmpClient;
class ISrsHttpClient;
class ISrsFileReader;
class ISrsFlvDecoder;
class ISrsHttpResponseReader;
class ISrsRtspSendTrack;
class ISrsRtspConnection;
class SrsRtcTrackDescription;
// The factory to create app objects.
class SrsAppFactory
class ISrsAppFactory
{
public:
ISrsAppFactory();
virtual ~ISrsAppFactory();
public:
virtual ISrsFileWriter *create_file_writer() = 0;
virtual ISrsFileWriter *create_enc_file_writer() = 0;
virtual ISrsFileReader *create_file_reader() = 0;
virtual SrsPath *create_path() = 0;
virtual SrsLiveSource *create_live_source() = 0;
virtual ISrsOriginHub *create_origin_hub() = 0;
virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval) = 0;
virtual ISrsBasicRtmpClient *create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto) = 0;
virtual ISrsHttpClient *create_http_client() = 0;
virtual ISrsFileReader *create_http_file_reader(ISrsHttpResponseReader *r) = 0;
virtual ISrsFlvDecoder *create_flv_decoder() = 0;
virtual ISrsRtspSendTrack *create_rtsp_audio_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc) = 0;
virtual ISrsRtspSendTrack *create_rtsp_video_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc) = 0;
};
// The factory to create app objects.
class SrsAppFactory : public ISrsAppFactory
{
public:
SrsAppFactory();
@ -34,9 +65,15 @@ public:
virtual SrsLiveSource *create_live_source();
virtual ISrsOriginHub *create_origin_hub();
virtual ISrsHourGlass *create_hourglass(const std::string &name, ISrsHourGlassHandler *handler, srs_utime_t interval);
virtual ISrsBasicRtmpClient *create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto);
virtual ISrsHttpClient *create_http_client();
virtual ISrsFileReader *create_http_file_reader(ISrsHttpResponseReader *r);
virtual ISrsFlvDecoder *create_flv_decoder();
virtual ISrsRtspSendTrack *create_rtsp_audio_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc);
virtual ISrsRtspSendTrack *create_rtsp_video_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc);
};
extern SrsAppFactory *_srs_app_factory;
extern ISrsAppFactory *_srs_app_factory;
// The factory to create kernel objects.
class SrsFinalFactory : public ISrsKernelFactory

View File

@ -38,7 +38,7 @@ class SrsTsContext;
class SrsFmp4SegmentEncoder;
class ISrsHttpHooks;
class ISrsAppConfig;
class SrsAppFactory;
class ISrsAppFactory;
// The wrapper of m3u8 segment from specification:
//
@ -186,7 +186,7 @@ class SrsHlsMuxer
{
private:
ISrsAppConfig *config_;
SrsAppFactory *app_factory_;
ISrsAppFactory *app_factory_;
private:
ISrsRequest *req_;
@ -329,7 +329,7 @@ class SrsHlsFmp4Muxer
{
private:
ISrsAppConfig *config_;
SrsAppFactory *app_factory_;
ISrsAppFactory *app_factory_;
private:
ISrsRequest *req_;

View File

@ -556,7 +556,12 @@ public:
//
// For performance, we use non-public from resource,
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, public ISrsExpire, public ISrsRtcPacketSender, public ISrsRtcPacketReceiver, public ISrsRtcConnectionNackTimerHandler
class SrsRtcConnection : public ISrsResource, // It's a resource.
public ISrsDisposingHandler,
public ISrsExpire,
public ISrsRtcPacketSender,
public ISrsRtcPacketReceiver,
public ISrsRtcConnectionNackTimerHandler
{
friend class SrsSecurityTransport;

View File

@ -154,7 +154,12 @@ public:
virtual const char *transport_type();
};
class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsReloadHandler, public ISrsCoroutineHandler, public ISrsExpire
// The RTMP connection, for client to publish or play stream.
class SrsRtmpConn : public ISrsConnection, // It's a resource.
public ISrsStartable,
public ISrsReloadHandler,
public ISrsCoroutineHandler,
public ISrsExpire
{
// For the thread to directly access any field of connection.
friend class SrsPublishRecvThread;

View File

@ -60,7 +60,7 @@ class ISrsHds;
#endif
class ISrsNgExec;
class ISrsForwarder;
class SrsAppFactory;
class ISrsAppFactory;
class ISrsLiveConsumer;
// The time jitter algorithm:
@ -571,7 +571,7 @@ public:
class SrsLiveSourceManager : public ISrsHourGlassHandler, public ISrsLiveSourceManager
{
private:
SrsAppFactory *app_factory_;
ISrsAppFactory *app_factory_;
private:
srs_mutex_t lock_;
@ -624,6 +624,16 @@ public:
virtual SrsContextId pre_source_id() = 0;
virtual SrsMetaCache *meta() = 0;
virtual SrsRtmpFormat *format() = 0;
// The source id changed.
virtual srs_error_t on_source_id_changed(SrsContextId id) = 0;
// Publish stream event notify.
virtual srs_error_t on_publish() = 0;
virtual void on_unpublish() = 0;
// Handle media messages.
virtual srs_error_t on_audio(SrsRtmpCommonMessage *audio) = 0;
virtual srs_error_t on_video(SrsRtmpCommonMessage *video) = 0;
virtual srs_error_t on_aggregate(SrsRtmpCommonMessage *msg) = 0;
virtual srs_error_t on_meta_data(SrsRtmpCommonMessage *msg, SrsOnMetaDataPacket *metadata) = 0;
};
// The live streaming source.
@ -633,7 +643,7 @@ private:
ISrsAppConfig *config_;
ISrsStatistic *stat_;
ISrsLiveSourceHandler *handler_;
SrsAppFactory *app_factory_;
ISrsAppFactory *app_factory_;
private:
// For publish, it's the publish client id.

View File

@ -29,6 +29,7 @@ using namespace std;
#include <srs_protocol_rtsp_stack.hpp>
#include <srs_protocol_st.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_app_factory.hpp>
extern SrsPps *_srs_pps_snack;
extern SrsPps *_srs_pps_snack2;
@ -41,7 +42,15 @@ extern SrsPps *_srs_pps_rnack2;
extern SrsPps *_srs_pps_pub;
extern SrsPps *_srs_pps_conn;
SrsRtspPlayStream::SrsRtspPlayStream(SrsRtspConnection *s, const SrsContextId &cid) : source_(new SrsRtspSource())
ISrsRtspPlayStream::ISrsRtspPlayStream()
{
}
ISrsRtspPlayStream::~ISrsRtspPlayStream()
{
}
SrsRtspPlayStream::SrsRtspPlayStream(ISrsRtspConnection *s, const SrsContextId &cid) : source_(new SrsRtspSource())
{
cid_ = cid;
trd_ = NULL;
@ -53,6 +62,10 @@ SrsRtspPlayStream::SrsRtspPlayStream(SrsRtspConnection *s, const SrsContextId &c
cache_ssrc0_ = cache_ssrc1_ = cache_ssrc2_ = 0;
cache_track0_ = cache_track1_ = cache_track2_ = NULL;
app_factory_ = _srs_app_factory;
stat_ = _srs_stat;
rtsp_sources_ = _srs_rtsp_sources;
}
SrsRtspPlayStream::~SrsRtspPlayStream()
@ -61,23 +74,26 @@ SrsRtspPlayStream::~SrsRtspPlayStream()
srs_freep(req_);
if (true) {
std::map<uint32_t, SrsRtspAudioSendTrack *>::iterator it;
std::map<uint32_t, ISrsRtspSendTrack *>::iterator it;
for (it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) {
srs_freep(it->second);
}
}
if (true) {
std::map<uint32_t, SrsRtspVideoSendTrack *>::iterator it;
std::map<uint32_t, ISrsRtspSendTrack *>::iterator it;
for (it = video_tracks_.begin(); it != video_tracks_.end(); ++it) {
srs_freep(it->second);
}
}
// update the statistic when client coveried.
SrsStatistic *stat = _srs_stat;
// TODO: FIXME: Should finger out the err.
stat->on_disconnect(cid_.c_str(), srs_success);
stat_->on_disconnect(cid_.c_str(), srs_success);
app_factory_ = NULL;
stat_ = NULL;
rtsp_sources_ = NULL;
}
srs_error_t SrsRtspPlayStream::initialize(ISrsRequest *req, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations)
@ -87,12 +103,11 @@ srs_error_t SrsRtspPlayStream::initialize(ISrsRequest *req, std::map<uint32_t, S
req_ = req->copy();
// We must do stat the client before hooks, because hooks depends on it.
SrsStatistic *stat = _srs_stat;
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPlay)) != srs_success) {
if ((err = stat_->on_client(cid_.c_str(), req_, session_, SrsRtcConnPlay)) != srs_success) {
return srs_error_wrap(err, "RTSP: stat client");
}
if ((err = _srs_rtsp_sources->fetch_or_create(req_, source_)) != srs_success) {
if ((err = rtsp_sources_->fetch_or_create(req_, source_)) != srs_success) {
return srs_error_wrap(err, "RTSP: fetch source failed");
}
@ -101,12 +116,12 @@ srs_error_t SrsRtspPlayStream::initialize(ISrsRequest *req, std::map<uint32_t, S
SrsRtcTrackDescription *desc = it->second;
if (desc->type_ == "audio") {
SrsRtspAudioSendTrack *track = new SrsRtspAudioSendTrack(session_, desc);
ISrsRtspSendTrack *track = app_factory_->create_rtsp_audio_send_track(session_, desc);
audio_tracks_.insert(make_pair(ssrc, track));
}
if (desc->type_ == "video") {
SrsRtspVideoSendTrack *track = new SrsRtspVideoSendTrack(session_, desc);
ISrsRtspSendTrack *track = app_factory_->create_rtsp_video_send_track(session_, desc);
video_tracks_.insert(make_pair(ssrc, track));
}
}
@ -125,15 +140,15 @@ void SrsRtspPlayStream::on_stream_change(SrsRtcSourceDescription *desc)
if (desc && desc->audio_track_desc_ && audio_tracks_.size() == 1) {
if (!audio_tracks_.empty()) {
uint32_t ssrc = desc->audio_track_desc_->ssrc_;
SrsRtspAudioSendTrack *track = audio_tracks_.begin()->second;
ISrsRtspSendTrack *track = audio_tracks_.begin()->second;
if (track->track_desc_->media_->pt_of_publisher_ != desc->audio_track_desc_->media_->pt_) {
track->track_desc_->media_->pt_of_publisher_ = desc->audio_track_desc_->media_->pt_;
if (track->track_desc()->media_->pt_of_publisher_ != desc->audio_track_desc_->media_->pt_) {
track->track_desc()->media_->pt_of_publisher_ = desc->audio_track_desc_->media_->pt_;
}
if (desc->audio_track_desc_->red_ && track->track_desc_->red_ &&
track->track_desc_->red_->pt_of_publisher_ != desc->audio_track_desc_->red_->pt_) {
track->track_desc_->red_->pt_of_publisher_ = desc->audio_track_desc_->red_->pt_;
if (desc->audio_track_desc_->red_ && track->track_desc()->red_ &&
track->track_desc()->red_->pt_of_publisher_ != desc->audio_track_desc_->red_->pt_) {
track->track_desc()->red_->pt_of_publisher_ = desc->audio_track_desc_->red_->pt_;
}
audio_tracks_.clear();
@ -147,15 +162,15 @@ void SrsRtspPlayStream::on_stream_change(SrsRtcSourceDescription *desc)
if (!video_tracks_.empty()) {
SrsRtcTrackDescription *vdesc = desc->video_track_descs_.at(0);
uint32_t ssrc = vdesc->ssrc_;
SrsRtspVideoSendTrack *track = video_tracks_.begin()->second;
ISrsRtspSendTrack *track = video_tracks_.begin()->second;
if (track->track_desc_->media_->pt_of_publisher_ != vdesc->media_->pt_) {
track->track_desc_->media_->pt_of_publisher_ = vdesc->media_->pt_;
if (track->track_desc()->media_->pt_of_publisher_ != vdesc->media_->pt_) {
track->track_desc()->media_->pt_of_publisher_ = vdesc->media_->pt_;
}
if (vdesc->red_ && track->track_desc_->red_ &&
track->track_desc_->red_->pt_of_publisher_ != vdesc->red_->pt_) {
track->track_desc_->red_->pt_of_publisher_ = vdesc->red_->pt_;
if (vdesc->red_ && track->track_desc()->red_ &&
track->track_desc()->red_->pt_of_publisher_ != vdesc->red_->pt_) {
track->track_desc()->red_->pt_of_publisher_ = vdesc->red_->pt_;
}
video_tracks_.clear();
@ -267,7 +282,7 @@ srs_error_t SrsRtspPlayStream::send_packet(SrsRtpPacket *&pkt)
uint32_t ssrc = pkt->header_.get_ssrc();
// Try to find track from cache.
SrsRtspSendTrack *track = NULL;
ISrsRtspSendTrack *track = NULL;
if (cache_ssrc0_ == ssrc) {
track = cache_track0_;
} else if (cache_ssrc1_ == ssrc) {
@ -279,12 +294,12 @@ srs_error_t SrsRtspPlayStream::send_packet(SrsRtpPacket *&pkt)
// Find by original tracks and build fast cache.
if (!track) {
if (pkt->is_audio()) {
map<uint32_t, SrsRtspAudioSendTrack *>::iterator it = audio_tracks_.find(ssrc);
map<uint32_t, ISrsRtspSendTrack *>::iterator it = audio_tracks_.find(ssrc);
if (it != audio_tracks_.end()) {
track = it->second;
}
} else {
map<uint32_t, SrsRtspVideoSendTrack *>::iterator it = video_tracks_.find(ssrc);
map<uint32_t, ISrsRtspSendTrack *>::iterator it = video_tracks_.find(ssrc);
if (it != video_tracks_.end()) {
track = it->second;
}
@ -324,9 +339,9 @@ void SrsRtspPlayStream::set_all_tracks_status(bool status)
// set video track status
if (true) {
std::map<uint32_t, SrsRtspVideoSendTrack *>::iterator it;
std::map<uint32_t, ISrsRtspSendTrack *>::iterator it;
for (it = video_tracks_.begin(); it != video_tracks_.end(); ++it) {
SrsRtspVideoSendTrack *track = it->second;
ISrsRtspSendTrack *track = it->second;
bool previous = track->set_track_status(status);
merged_log << "{track: " << track->get_track_id() << ", is_active: " << previous << "=>" << status << "},";
@ -335,9 +350,9 @@ void SrsRtspPlayStream::set_all_tracks_status(bool status)
// set audio track status
if (true) {
std::map<uint32_t, SrsRtspAudioSendTrack *>::iterator it;
std::map<uint32_t, ISrsRtspSendTrack *>::iterator it;
for (it = audio_tracks_.begin(); it != audio_tracks_.end(); ++it) {
SrsRtspAudioSendTrack *track = it->second;
ISrsRtspSendTrack *track = it->second;
bool previous = track->set_track_status(status);
merged_log << "{track: " << track->get_track_id() << ", is_active: " << previous << "=>" << status << "},";
@ -386,12 +401,21 @@ SrsRtspConnection::SrsRtspConnection(ISrsResourceManager *cm, ISrsProtocolReadWr
delta_ = new SrsEphemeralDelta();
security_ = new SrsSecurity();
_srs_rtsp_manager->subscribe(this);
rtsp_manager_ = _srs_rtsp_manager;
stat_ = _srs_stat;
config_ = _srs_config;
rtsp_sources_ = _srs_rtsp_sources;
hooks_ = _srs_hooks;
}
void SrsRtspConnection::assemble()
{
rtsp_manager_->subscribe(this);
}
SrsRtspConnection::~SrsRtspConnection()
{
_srs_rtsp_manager->unsubscribe(this);
rtsp_manager_->unsubscribe(this);
srs_freep(request_);
srs_freep(rtsp_);
@ -418,6 +442,12 @@ SrsRtspConnection::~SrsRtspConnection()
srs_freep(cache_iov_);
}
srs_freep(cache_buffer_);
rtsp_manager_ = NULL;
stat_ = NULL;
config_ = NULL;
rtsp_sources_ = NULL;
hooks_ = NULL;
}
srs_error_t SrsRtspConnection::do_send_packet(SrsRtpPacket *pkt)
@ -495,8 +525,7 @@ srs_error_t SrsRtspConnection::cycle()
err = do_cycle();
// Update statistic when done.
SrsStatistic *stat = _srs_stat;
stat->kbps_add_delta(get_id().c_str(), delta());
stat_->kbps_add_delta(get_id().c_str(), delta());
do_teardown();
@ -546,104 +575,116 @@ srs_error_t SrsRtspConnection::do_cycle()
if ((err = rtsp_->recv_message(&req_raw)) != srs_success) {
return srs_error_wrap(err, "recv message");
}
SrsUniquePtr<SrsRtspRequest> req(req_raw);
if (req->is_options()) {
srs_trace("RTSP: OPTIONS cseq=%ld, url=%s, client=%s:%d", req->seq_, req->uri_.c_str(), ip_.c_str(), port_);
SrsUniquePtr<SrsRtspOptionsResponse> res(new SrsRtspOptionsResponse((int)req->seq_));
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response option");
}
} else if (req->is_describe()) {
// create session.
if (session_id_.empty()) {
SrsRand rand;
session_id_ = rand.gen_str(8);
}
SrsUniquePtr<SrsRtspDescribeResponse> res(new SrsRtspDescribeResponse((int)req->seq_));
res->session_ = session_id_;
std::string sdp;
if ((err = do_describe(req.get(), sdp)) != srs_success) {
res->status_ = SRS_CONSTS_RTSP_InternalServerError;
if (srs_error_code(err) == ERROR_RTSP_NO_TRACK) {
res->status_ = SRS_CONSTS_RTSP_NotFound;
} else if (srs_error_code(err) == ERROR_SYSTEM_SECURITY_DENY) {
res->status_ = SRS_CONSTS_RTSP_Forbidden;
}
srs_warn("RTSP: DESCRIBE failed: %s", srs_error_desc(err).c_str());
srs_freep(err);
}
res->sdp_ = sdp;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response describe");
}
// Filter the \r\n to \\r\\n for JSON.
std::string local_sdp_escaped = srs_strings_replace(sdp.c_str(), "\r\n", "\\r\\n");
srs_trace("RTSP: DESCRIBE cseq=%ld, session=%s, sdp: %s", req->seq_, session_id_.c_str(), local_sdp_escaped.c_str());
} else if (req->is_setup()) {
srs_assert(req->transport_);
SrsUniquePtr<SrsRtspSetupResponse> res(new SrsRtspSetupResponse((int)req->seq_));
res->session_ = session_id_;
uint32_t ssrc = 0;
if ((err = do_setup(req.get(), &ssrc)) != srs_success) {
if (srs_error_code(err) == ERROR_RTSP_TRANSPORT_NOT_SUPPORTED) {
res->status_ = SRS_CONSTS_RTSP_UnsupportedTransport;
srs_warn("RTSP: SETUP failed: %s", srs_error_summary(err).c_str());
} else {
res->status_ = SRS_CONSTS_RTSP_InternalServerError;
srs_warn("RTSP: SETUP failed: %s", srs_error_desc(err).c_str());
}
srs_freep(err);
}
res->transport_->copy(req->transport_);
res->session_ = session_id_;
res->ssrc_ = srs_strconv_format_int(ssrc);
res->client_port_min_ = req->transport_->client_port_min_;
res->client_port_max_ = req->transport_->client_port_max_;
// TODO: FIXME: listen local port
res->local_port_min_ = 0;
res->local_port_max_ = 0;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response setup");
}
srs_trace("RTSP: SETUP cseq=%ld, session=%s, transport=%s/%s/%s, ssrc=%u, client_port=%d-%d",
req->seq_, session_id_.c_str(), req->transport_->transport_.c_str(), req->transport_->profile_.c_str(),
req->transport_->lower_transport_.c_str(), ssrc, req->transport_->client_port_min_, req->transport_->client_port_max_);
} else if (req->is_play()) {
SrsUniquePtr<SrsRtspResponse> res(new SrsRtspResponse((int)req->seq_));
res->session_ = session_id_;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response record");
}
if ((err = do_play(req.get(), this)) != srs_success) {
return srs_error_wrap(err, "prepare play");
}
srs_trace("RTSP: PLAY cseq=%ld, session=%s, streaming started", req->seq_, session_id_.c_str());
} else if (req->is_teardown()) {
SrsUniquePtr<SrsRtspResponse> res(new SrsRtspResponse((int)req->seq_));
res->session_ = session_id_;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response teardown");
}
if ((err = do_teardown()) != srs_success) {
return srs_error_wrap(err, "teardown");
}
srs_trace("RTSP: TEARDOWN cseq=%ld, session=%s, streaming stopped", req->seq_, session_id_.c_str());
if ((err = on_rtsp_request(req_raw)) != srs_success) {
return srs_error_wrap(err, "on rtsp request");
}
}
return err;
}
srs_error_t SrsRtspConnection::on_rtsp_request(SrsRtspRequest *req_raw)
{
srs_error_t err = srs_success;
SrsUniquePtr<SrsRtspRequest> req(req_raw);
if (req->is_options()) {
srs_trace("RTSP: OPTIONS cseq=%ld, url=%s, client=%s:%d", req->seq_, req->uri_.c_str(), ip_.c_str(), port_);
SrsUniquePtr<SrsRtspOptionsResponse> res(new SrsRtspOptionsResponse((int)req->seq_));
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response option");
}
} else if (req->is_describe()) {
// create session.
if (session_id_.empty()) {
SrsRand rand;
session_id_ = rand.gen_str(8);
}
SrsUniquePtr<SrsRtspDescribeResponse> res(new SrsRtspDescribeResponse((int)req->seq_));
res->session_ = session_id_;
std::string sdp;
if ((err = do_describe(req.get(), sdp)) != srs_success) {
res->status_ = SRS_CONSTS_RTSP_InternalServerError;
if (srs_error_code(err) == ERROR_RTSP_NO_TRACK) {
res->status_ = SRS_CONSTS_RTSP_NotFound;
} else if (srs_error_code(err) == ERROR_SYSTEM_SECURITY_DENY) {
res->status_ = SRS_CONSTS_RTSP_Forbidden;
}
srs_warn("RTSP: DESCRIBE failed: %s", srs_error_desc(err).c_str());
srs_freep(err);
}
res->sdp_ = sdp;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response describe");
}
// Filter the \r\n to \\r\\n for JSON.
std::string local_sdp_escaped = srs_strings_replace(sdp.c_str(), "\r\n", "\\r\\n");
srs_trace("RTSP: DESCRIBE cseq=%ld, session=%s, sdp: %s", req->seq_, session_id_.c_str(), local_sdp_escaped.c_str());
} else if (req->is_setup()) {
srs_assert(req->transport_);
SrsUniquePtr<SrsRtspSetupResponse> res(new SrsRtspSetupResponse((int)req->seq_));
res->session_ = session_id_;
uint32_t ssrc = 0;
if ((err = do_setup(req.get(), &ssrc)) != srs_success) {
if (srs_error_code(err) == ERROR_RTSP_TRANSPORT_NOT_SUPPORTED) {
res->status_ = SRS_CONSTS_RTSP_UnsupportedTransport;
srs_warn("RTSP: SETUP failed: %s", srs_error_summary(err).c_str());
} else {
res->status_ = SRS_CONSTS_RTSP_InternalServerError;
srs_warn("RTSP: SETUP failed: %s", srs_error_desc(err).c_str());
}
srs_freep(err);
}
res->transport_->copy(req->transport_);
res->session_ = session_id_;
res->ssrc_ = srs_strconv_format_int(ssrc);
res->client_port_min_ = req->transport_->client_port_min_;
res->client_port_max_ = req->transport_->client_port_max_;
// TODO: FIXME: listen local port
res->local_port_min_ = 0;
res->local_port_max_ = 0;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response setup");
}
srs_trace("RTSP: SETUP cseq=%ld, session=%s, transport=%s/%s/%s, ssrc=%u, client_port=%d-%d",
req->seq_, session_id_.c_str(), req->transport_->transport_.c_str(), req->transport_->profile_.c_str(),
req->transport_->lower_transport_.c_str(), ssrc, req->transport_->client_port_min_, req->transport_->client_port_max_);
} else if (req->is_play()) {
SrsUniquePtr<SrsRtspResponse> res(new SrsRtspResponse((int)req->seq_));
res->session_ = session_id_;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response record");
}
if ((err = do_play(req.get(), this)) != srs_success) {
return srs_error_wrap(err, "prepare play");
}
srs_trace("RTSP: PLAY cseq=%ld, session=%s, streaming started", req->seq_, session_id_.c_str());
} else if (req->is_teardown()) {
SrsUniquePtr<SrsRtspResponse> res(new SrsRtspResponse((int)req->seq_));
res->session_ = session_id_;
if ((err = rtsp_->send_message(res.get())) != srs_success) {
return srs_error_wrap(err, "response teardown");
}
if ((err = do_teardown()) != srs_success) {
return srs_error_wrap(err, "teardown");
}
srs_trace("RTSP: TEARDOWN cseq=%ld, session=%s, streaming stopped", req->seq_, session_id_.c_str());
}
return err;
}
void SrsRtspConnection::on_before_dispose(ISrsResource *c)
{
if (disposing_) {
@ -698,7 +739,7 @@ srs_error_t SrsRtspConnection::do_describe(SrsRtspRequest *req, std::string &sdp
request_->app_, request_->stream_, request_->port_, request_->param_);
// discovery vhost, resolve the vhost from config
SrsConfDirective *parsed_vhost = _srs_config->get_vhost(request_->vhost_);
SrsConfDirective *parsed_vhost = config_->get_vhost(request_->vhost_);
if (parsed_vhost) {
request_->vhost_ = parsed_vhost->arg0();
}
@ -711,7 +752,7 @@ srs_error_t SrsRtspConnection::do_describe(SrsRtspRequest *req, std::string &sdp
return srs_error_wrap(err, "RTSP: http_hooks_on_play");
}
if ((err = _srs_rtsp_sources->fetch_or_create(request_, source_)) != srs_success) {
if ((err = rtsp_sources_->fetch_or_create(request_, source_)) != srs_success) {
return srs_error_wrap(err, "create source");
}
@ -863,7 +904,7 @@ srs_error_t SrsRtspConnection::http_hooks_on_play(ISrsRequest *req)
{
srs_error_t err = srs_success;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost_)) {
if (!config_->get_vhost_http_hooks_enabled(req->vhost_)) {
return err;
}
@ -873,7 +914,7 @@ srs_error_t SrsRtspConnection::http_hooks_on_play(ISrsRequest *req)
std::vector<std::string> hooks;
if (true) {
SrsConfDirective *conf = _srs_config->get_vhost_on_play(req->vhost_);
SrsConfDirective *conf = config_->get_vhost_on_play(req->vhost_);
if (!conf) {
return err;
@ -884,7 +925,7 @@ srs_error_t SrsRtspConnection::http_hooks_on_play(ISrsRequest *req)
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = _srs_hooks->on_play(url, req)) != srs_success) {
if ((err = hooks_->on_play(url, req)) != srs_success) {
return srs_error_wrap(err, "on_play %s", url.c_str());
}
}

View File

@ -31,37 +31,67 @@ class SrsRtspConnection;
class SrsSecurity;
class SrsRtspRequest;
class SrsRtspStack;
class ISrsRtspConnection;
class ISrsRtspSendTrack;
class ISrsRtspStack;
class ISrsAppFactory;
class ISrsEphemeralDelta;
class ISrsSecurity;
class ISrsStatistic;
class ISrsRtspSourceManager;
class ISrsHttpHooks;
class ISrsAppConfig;
// The handler for RTSP play stream.
class ISrsRtspPlayStream
{
public:
ISrsRtspPlayStream();
virtual ~ISrsRtspPlayStream();
public:
virtual srs_error_t initialize(ISrsRequest *request, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations) = 0;
virtual srs_error_t start() = 0;
virtual void stop() = 0;
// Directly set the status of track, generally for init to set the default value.
virtual void set_all_tracks_status(bool status) = 0;
};
// A RTSP play stream, client pull and play stream from SRS.
class SrsRtspPlayStream : public ISrsCoroutineHandler, public ISrsRtcSourceChangeCallback
class SrsRtspPlayStream : public ISrsRtspPlayStream, public ISrsCoroutineHandler, public ISrsRtcSourceChangeCallback
{
private:
ISrsAppFactory *app_factory_;
ISrsStatistic *stat_;
ISrsRtspSourceManager *rtsp_sources_;
private:
SrsContextId cid_;
ISrsCoroutine *trd_;
SrsRtspConnection *session_;
ISrsRtspConnection *session_;
private:
ISrsRequest *req_;
SrsSharedPtr<SrsRtspSource> source_;
// key: publish_ssrc, value: send track to process rtp/rtcp
std::map<uint32_t, SrsRtspAudioSendTrack *> audio_tracks_;
std::map<uint32_t, SrsRtspVideoSendTrack *> video_tracks_;
std::map<uint32_t, ISrsRtspSendTrack *> audio_tracks_;
std::map<uint32_t, ISrsRtspSendTrack *> video_tracks_;
private:
// Fast cache for tracks.
uint32_t cache_ssrc0_;
uint32_t cache_ssrc1_;
uint32_t cache_ssrc2_;
SrsRtspSendTrack *cache_track0_;
SrsRtspSendTrack *cache_track1_;
SrsRtspSendTrack *cache_track2_;
ISrsRtspSendTrack *cache_track0_;
ISrsRtspSendTrack *cache_track1_;
ISrsRtspSendTrack *cache_track2_;
private:
// Whether player started.
bool is_started;
public:
SrsRtspPlayStream(SrsRtspConnection *s, const SrsContextId &cid);
SrsRtspPlayStream(ISrsRtspConnection *s, const SrsContextId &cid);
virtual ~SrsRtspPlayStream();
public:
@ -89,7 +119,7 @@ public:
};
// The handler for RTSP connection send packet.
class ISrsRtspConnection
class ISrsRtspConnection : public ISrsExpire
{
public:
ISrsRtspConnection();
@ -102,11 +132,17 @@ public:
// A RTSP session, client request and response with RTSP.
class SrsRtspConnection : public ISrsResource, // It's a resource.
public ISrsDisposingHandler,
public ISrsExpire,
public ISrsCoroutineHandler,
public ISrsStartable,
public ISrsRtspConnection
{
private:
ISrsRtspSourceManager *rtsp_sources_;
ISrsResourceManager *rtsp_manager_;
ISrsStatistic *stat_;
ISrsAppConfig *config_;
ISrsHttpHooks *hooks_;
private:
bool disposing_;
@ -126,22 +162,23 @@ private:
// The ip and port of client.
std::string ip_;
int port_;
SrsRtspStack *rtsp_;
ISrsRtspStack *rtsp_;
std::string session_id_;
SrsSharedPtr<SrsRtspSource> source_;
SrsEphemeralDelta *delta_;
ISrsEphemeralDelta *delta_;
ISrsProtocolReadWriter *skt_;
SrsSecurity *security_;
ISrsSecurity *security_;
iovec *cache_iov_;
SrsBuffer *cache_buffer_;
// key: ssrc
std::map<uint32_t, SrsRtcTrackDescription *> tracks_;
// key: ssrc
std::map<uint32_t, ISrsStreamWriter *> networks_;
SrsRtspPlayStream *player_;
ISrsRtspPlayStream *player_;
public:
SrsRtspConnection(ISrsResourceManager *cm, ISrsProtocolReadWriter *skt, std::string cip, int port);
void assemble(); // Construct object, to avoid call function in constructor.
virtual ~SrsRtspConnection();
// interface ISrsDisposingHandler
public:
@ -179,6 +216,11 @@ public:
// Interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
srs_error_t on_rtsp_request(SrsRtspRequest *req_raw);
// Interface ISrsExpire.
public:
virtual void expire();
@ -191,9 +233,6 @@ public:
bool is_alive();
void alive();
private:
srs_error_t do_cycle();
private:
srs_error_t http_hooks_on_play(ISrsRequest *req);
srs_error_t get_ssrc_by_stream_id(uint32_t stream_id, uint32_t *ssrc);

View File

@ -1005,6 +1005,14 @@ srs_error_t SrsRtspRtpBuilder::consume_packets(vector<SrsRtpPacket *> &pkts)
return err;
}
ISrsRtspSendTrack::ISrsRtspSendTrack()
{
}
ISrsRtspSendTrack::~ISrsRtspSendTrack()
{
}
SrsRtspSendTrack::SrsRtspSendTrack(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc, bool is_audio)
{
session_ = session;
@ -1039,6 +1047,11 @@ std::string SrsRtspSendTrack::get_track_id()
return track_desc_->id_;
}
SrsRtcTrackDescription *SrsRtspSendTrack::track_desc()
{
return track_desc_;
}
SrsRtspAudioSendTrack::SrsRtspAudioSendTrack(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc)
: SrsRtspSendTrack(session, track_desc, true)
{

View File

@ -267,7 +267,20 @@ private:
srs_error_t consume_packets(std::vector<SrsRtpPacket *> &pkts);
};
class SrsRtspSendTrack
class ISrsRtspSendTrack
{
public:
ISrsRtspSendTrack();
virtual ~ISrsRtspSendTrack();
public:
virtual bool set_track_status(bool active) = 0;
virtual std::string get_track_id() = 0;
virtual SrsRtcTrackDescription *track_desc() = 0;
virtual srs_error_t on_rtp(SrsRtpPacket *pkt) = 0;
};
class SrsRtspSendTrack : public ISrsRtspSendTrack
{
public:
// send track description
@ -287,9 +300,7 @@ public:
bool set_track_status(bool active);
bool get_track_status();
std::string get_track_id();
public:
virtual srs_error_t on_rtp(SrsRtpPacket *pkt) = 0;
virtual SrsRtcTrackDescription *track_desc();
};
class SrsRtspAudioSendTrack : public SrsRtspSendTrack

View File

@ -1541,7 +1541,9 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener *listener, srs_netfd_t &stf
resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port);
#ifdef SRS_RTSP
} else if (listener == rtsp_listener_) {
resource = new SrsRtspConnection(conn_manager_, new SrsTcpConnection(stfd2), ip, port);
SrsRtspConnection *conn = new SrsRtspConnection(conn_manager_, new SrsTcpConnection(stfd2), ip, port);
conn->assemble();
resource = conn;
#endif
} else if (listener == exporter_listener_) {
// TODO: FIXME: Maybe should support https metrics.

View File

@ -68,7 +68,7 @@ class ISrsRtspSourceManager;
class ISrsLog;
class ISrsStatistic;
class ISrsHourGlass;
class SrsAppFactory;
class ISrsAppFactory;
// Initialize global shared variables cross all threads.
extern srs_error_t srs_global_initialize();
@ -110,7 +110,7 @@ private:
#endif
ISrsLog *log_;
ISrsStatistic *stat_;
SrsAppFactory *app_factory_;
ISrsAppFactory *app_factory_;
private:
ISrsHttpServeMux *http_api_mux_;

View File

@ -77,7 +77,11 @@ private:
srs_error_t recv_err_;
};
class SrsMpegtsSrtConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler, public ISrsExpire
// The SRT connection, for client to publish or play stream.
class SrsMpegtsSrtConn : public ISrsConnection, // It's a resource.
public ISrsStartable,
public ISrsCoroutineHandler,
public ISrsExpire
{
public:
SrsMpegtsSrtConn(ISrsResourceManager *resource_manager, srs_srt_t srt_fd, std::string ip, int port);

View File

@ -667,6 +667,14 @@ srs_error_t SrsFlvTransmuxer::write_tag(char *header, int header_size, char *tag
return err;
}
ISrsFlvDecoder::ISrsFlvDecoder()
{
}
ISrsFlvDecoder::~ISrsFlvDecoder()
{
}
SrsFlvDecoder::SrsFlvDecoder()
{
reader_ = NULL;

View File

@ -340,8 +340,28 @@ private:
virtual srs_error_t write_tag(char *header, int header_size, char *tag, int tag_size);
};
// The interface for FLV decoder.
class ISrsFlvDecoder
{
public:
ISrsFlvDecoder();
virtual ~ISrsFlvDecoder();
public:
// Initialize the underlayer file stream.
virtual srs_error_t initialize(ISrsReader *fr) = 0;
// Read the flv header.
virtual srs_error_t read_header(char header[9]) = 0;
// Read the tag header infos.
virtual srs_error_t read_tag_header(char *ptype, int32_t *pdata_size, uint32_t *ptime) = 0;
// Read the tag data.
virtual srs_error_t read_tag_data(char *data, int32_t size) = 0;
// Read the 4bytes previous tag size.
virtual srs_error_t read_previous_tag_size(char previous_tag_size[4]) = 0;
};
// Decode flv file.
class SrsFlvDecoder
class SrsFlvDecoder : public ISrsFlvDecoder
{
private:
ISrsReader *reader_;

View File

@ -612,6 +612,14 @@ ISrsKbpsDelta::~ISrsKbpsDelta()
{
}
ISrsEphemeralDelta::ISrsEphemeralDelta()
{
}
ISrsEphemeralDelta::~ISrsEphemeralDelta()
{
}
SrsEphemeralDelta::SrsEphemeralDelta()
{
in_ = out_ = 0;
@ -636,6 +644,14 @@ void SrsEphemeralDelta::remark(int64_t *in, int64_t *out)
in_ = out_ = 0;
}
ISrsNetworkDelta::ISrsNetworkDelta()
{
}
ISrsNetworkDelta::~ISrsNetworkDelta()
{
}
SrsNetworkDelta::SrsNetworkDelta()
{
in_ = out_ = NULL;

View File

@ -300,9 +300,20 @@ public:
virtual void remark(int64_t *in, int64_t *out) = 0;
};
// The interface which provices delta of bytes. For example, we got a delta from a UDP client:
class ISrsEphemeralDelta : public ISrsKbpsDelta
{
public:
ISrsEphemeralDelta();
virtual ~ISrsEphemeralDelta();
public:
virtual void add_delta(int64_t in, int64_t out) = 0;
};
// A delta data source for SrsKbps, used in ephemeral case, for example, UDP server to increase stat when received or
// sent out each UDP packet.
class SrsEphemeralDelta : public ISrsKbpsDelta
class SrsEphemeralDelta : public ISrsEphemeralDelta
{
private:
uint64_t in_;
@ -319,8 +330,19 @@ public:
virtual void remark(int64_t *in, int64_t *out);
};
// The interface which provices delta of bytes. For example, we got a delta from a TCP client:
class ISrsNetworkDelta : public ISrsKbpsDelta
{
public:
ISrsNetworkDelta();
virtual ~ISrsNetworkDelta();
public:
virtual void set_io(ISrsProtocolStatistic *in, ISrsProtocolStatistic *out) = 0;
};
// A network delta data source for SrsKbps.
class SrsNetworkDelta : public ISrsKbpsDelta
class SrsNetworkDelta : public ISrsNetworkDelta
{
private:
ISrsProtocolStatistic *in_;

View File

@ -61,7 +61,7 @@ SrsConfig *_srs_config = NULL;
// @global kernel factory.
ISrsKernelFactory *_srs_kernel_factory = new SrsFinalFactory();
SrsAppFactory *_srs_app_factory = new SrsAppFactory();
ISrsAppFactory *_srs_app_factory = new SrsAppFactory();
// @global version of srs, which can grep keyword "XCORE"
extern const char *_srs_version;

View File

@ -267,6 +267,14 @@ srs_error_t SrsSslClient::write(void *plaintext, size_t nn_plaintext, ssize_t *n
return err;
}
ISrsHttpClient::ISrsHttpClient()
{
}
ISrsHttpClient::~ISrsHttpClient()
{
}
SrsHttpClient::SrsHttpClient()
{
transport_ = NULL;

View File

@ -53,6 +53,26 @@ public:
virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite);
};
// The interface for http client.
class ISrsHttpClient
{
public:
ISrsHttpClient();
virtual ~ISrsHttpClient();
public:
// Initialize the client.
virtual srs_error_t initialize(std::string schema, std::string h, int p, srs_utime_t tm = SRS_HTTP_CLIENT_TIMEOUT) = 0;
// Get data from the uri.
virtual srs_error_t get(std::string path, std::string req, ISrsHttpMessage **ppmsg) = 0;
// Post data to the uri.
virtual srs_error_t post(std::string path, std::string req, ISrsHttpMessage **ppmsg) = 0;
// Set receive timeout.
virtual void set_recv_timeout(srs_utime_t tm) = 0;
// Sample kbps for statistics.
virtual void kbps_sample(const char *label, srs_utime_t age) = 0;
};
// The client to GET/POST/PUT/DELETE over HTTP.
// @remark We will reuse the TCP transport until initialize or channel error,
// such as send/recv failed.
@ -60,7 +80,7 @@ public:
// SrsHttpClient hc;
// hc.initialize("127.0.0.1", 80, 9000);
// hc.post("/api/v1/version", "Hello world!", NULL);
class SrsHttpClient
class SrsHttpClient : public ISrsHttpClient
{
private:
// The underlayer TCP transport, set to NULL when disconnect, or never not NULL when connected.

View File

@ -15,6 +15,14 @@ using namespace std;
#include <srs_protocol_st.hpp>
#include <srs_protocol_utility.hpp>
ISrsBasicRtmpClient::ISrsBasicRtmpClient()
{
}
ISrsBasicRtmpClient::~ISrsBasicRtmpClient()
{
}
SrsBasicRtmpClient::SrsBasicRtmpClient(string r, srs_utime_t ctm, srs_utime_t stm)
{
kbps_ = new SrsNetworkKbps();

View File

@ -21,6 +21,44 @@ class SrsNetworkKbps;
class SrsWallClock;
class SrsAmf0Object;
// The basic RTMP client interface.
class ISrsBasicRtmpClient
{
public:
ISrsBasicRtmpClient();
virtual ~ISrsBasicRtmpClient();
public:
// Connect, handshake and connect app to RTMP server.
virtual srs_error_t connect() = 0;
// Close the connection.
virtual void close() = 0;
public:
// Publish stream to RTMP server.
virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string *pstream = NULL) = 0;
// Play stream from RTMP server.
virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL) = 0;
// Sample kbps for statistics.
virtual void kbps_sample(const char *label, srs_utime_t age) = 0;
// Get stream ID.
virtual int sid() = 0;
public:
// Receive RTMP message from server.
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg) = 0;
// Decode RTMP message to packet.
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket) = 0;
// Send media messages to server.
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs) = 0;
// Send media message to server.
virtual srs_error_t send_and_free_message(SrsMediaPacket *msg) = 0;
public:
// Set receive timeout.
virtual void set_recv_timeout(srs_utime_t timeout) = 0;
};
// The simple RTMP client, provides friendly APIs.
// @remark Should never use client when closed.
// Usage:
@ -28,7 +66,7 @@ class SrsAmf0Object;
// client.connect();
// client.play();
// client.close();
class SrsBasicRtmpClient
class SrsBasicRtmpClient : public ISrsBasicRtmpClient
{
private:
std::string url_;

View File

@ -399,6 +399,14 @@ srs_error_t SrsRtspPlayResponse::encode_header(stringstream &ss)
return srs_success;
}
ISrsRtspStack::ISrsRtspStack()
{
}
ISrsRtspStack::~ISrsRtspStack()
{
}
SrsRtspStack::SrsRtspStack(ISrsProtocolReadWriter *s)
{
buf_ = new SrsSimpleStream();

View File

@ -345,8 +345,27 @@ protected:
virtual srs_error_t encode_header(std::stringstream &ss);
};
// The interface for rtsp stack.
class ISrsRtspStack
{
public:
ISrsRtspStack();
virtual ~ISrsRtspStack();
public:
// Recv rtsp message from underlayer io.
// @param preq the output rtsp request message, which user must free it.
// @return an int error code.
// ERROR_RTSP_REQUEST_HEADER_EOF indicates request header EOF.
virtual srs_error_t recv_message(SrsRtspRequest **preq) = 0;
// Send rtsp message over underlayer io.
// @param res the rtsp response message, which user should never free it.
// @return an int error code.
virtual srs_error_t send_message(SrsRtspResponse *res) = 0;
};
// The rtsp protocol stack to parse the rtsp packets.
class SrsRtspStack
class SrsRtspStack : public ISrsRtspStack
{
private:
// The cached bytes buffer.

View File

@ -51,7 +51,7 @@ bool _srs_config_by_env = false;
// @global kernel factory.
ISrsKernelFactory *_srs_kernel_factory = new SrsFinalFactory();
SrsAppFactory *_srs_app_factory = new SrsAppFactory();
ISrsAppFactory *_srs_app_factory = new SrsAppFactory();
// The binary name of SRS.
const char *_srs_binary = NULL;

View File

@ -606,7 +606,7 @@ VOID TEST(ServerTest, SetupTicksWithStatsAndHeartbeat)
// Create and inject mock app factory
MockAppFactoryForSetupTicks *mock_factory = new MockAppFactoryForSetupTicks();
SrsAppFactory *original_factory = server->app_factory_;
ISrsAppFactory *original_factory = server->app_factory_;
server->app_factory_ = mock_factory;
// Test major use scenario: setup_ticks with stats and heartbeat enabled

View File

@ -156,7 +156,7 @@ public:
virtual void untick(int event);
};
// Mock SrsAppFactory for testing SrsServer::setup_ticks()
// Mock ISrsAppFactory for testing SrsServer::setup_ticks()
class MockAppFactoryForSetupTicks : public SrsAppFactory
{
public:

View File

@ -3372,6 +3372,11 @@ srs_error_t MockRtspConnection::do_send_packet(SrsRtpPacket *pkt)
return srs_error_copy(send_error_);
}
void MockRtspConnection::expire()
{
// Mock implementation - does nothing for testing purposes
}
void MockRtspConnection::set_send_error(srs_error_t err)
{
srs_freep(send_error_);

View File

@ -152,6 +152,7 @@ public:
MockRtspConnection();
virtual ~MockRtspConnection();
virtual srs_error_t do_send_packet(SrsRtpPacket *pkt);
virtual void expire();
void set_send_error(srs_error_t err);
void reset();
};

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,481 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_UTEST_APP13_HPP
#define SRS_UTEST_APP13_HPP
/*
#include <srs_utest_app13.hpp>
*/
#include <srs_utest.hpp>
#include <srs_app_config.hpp>
#include <srs_app_edge.hpp>
#include <srs_app_factory.hpp>
#include <srs_app_rtc_source.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_rtsp_conn.hpp>
#include <srs_app_rtsp_source.hpp>
#include <srs_app_statistic.hpp>
#include <srs_kernel_balance.hpp>
#include <srs_kernel_flv.hpp>
#include <srs_protocol_http_client.hpp>
#include <srs_protocol_http_stack.hpp>
#include <srs_protocol_json.hpp>
#include <srs_protocol_rtmp_conn.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_rtsp_stack.hpp>
#include <srs_utest_app6.hpp>
#include <srs_utest_app12.hpp>
// Mock request class for testing edge upstream
class MockEdgeRequest : public ISrsRequest
{
public:
MockEdgeRequest(std::string vhost = "__defaultVhost__", std::string app = "live", std::string stream = "test");
virtual ~MockEdgeRequest();
virtual ISrsRequest *copy();
virtual std::string get_stream_url();
virtual void update_auth(ISrsRequest *req);
virtual void strip();
virtual ISrsRequest *as_http();
};
// Mock config class for testing edge upstream
class MockEdgeConfig : public MockAppConfig
{
public:
SrsConfDirective *edge_origin_directive_;
std::string edge_transform_vhost_;
int chunk_size_;
public:
MockEdgeConfig();
virtual ~MockEdgeConfig();
void reset();
public:
// Override methods needed for edge upstream testing
virtual SrsConfDirective *get_vhost_edge_origin(std::string vhost);
virtual std::string get_vhost_edge_transform_vhost(std::string vhost);
virtual int get_chunk_size(std::string vhost);
virtual srs_utime_t get_vhost_edge_origin_connect_timeout(std::string vhost);
virtual srs_utime_t get_vhost_edge_origin_stream_timeout(std::string vhost);
};
// Mock RTMP client for testing edge upstream
class MockEdgeRtmpClient : public ISrsBasicRtmpClient
{
public:
bool connect_called_;
bool play_called_;
bool close_called_;
bool recv_message_called_;
bool decode_message_called_;
bool set_recv_timeout_called_;
bool kbps_sample_called_;
srs_error_t connect_error_;
srs_error_t play_error_;
std::string play_stream_;
srs_utime_t recv_timeout_;
std::string kbps_label_;
srs_utime_t kbps_age_;
public:
MockEdgeRtmpClient();
virtual ~MockEdgeRtmpClient();
public:
virtual srs_error_t connect();
virtual void close();
virtual srs_error_t publish(int chunk_size, bool with_vhost = true, std::string *pstream = NULL);
virtual srs_error_t play(int chunk_size, bool with_vhost = true, std::string *pstream = NULL);
virtual void kbps_sample(const char *label, srs_utime_t age);
virtual int sid();
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs);
virtual srs_error_t send_and_free_message(SrsMediaPacket *msg);
virtual void set_recv_timeout(srs_utime_t timeout);
};
// Mock app factory for testing edge upstream
class MockEdgeAppFactory : public SrsAppFactory
{
public:
MockEdgeRtmpClient *mock_client_;
public:
MockEdgeAppFactory();
virtual ~MockEdgeAppFactory();
public:
virtual ISrsBasicRtmpClient *create_rtmp_client(std::string url, srs_utime_t cto, srs_utime_t sto);
};
// Mock HTTP client for testing edge FLV upstream
class MockEdgeHttpClient : public ISrsHttpClient
{
public:
bool initialize_called_;
bool get_called_;
bool set_recv_timeout_called_;
bool kbps_sample_called_;
srs_error_t initialize_error_;
srs_error_t get_error_;
ISrsHttpMessage *mock_response_;
std::string schema_;
std::string host_;
int port_;
std::string path_;
std::string kbps_label_;
srs_utime_t kbps_age_;
public:
MockEdgeHttpClient();
virtual ~MockEdgeHttpClient();
public:
virtual srs_error_t initialize(std::string schema, std::string h, int p, srs_utime_t tm);
virtual srs_error_t get(std::string path, std::string req, ISrsHttpMessage **ppmsg);
virtual srs_error_t post(std::string path, std::string req, ISrsHttpMessage **ppmsg);
virtual void set_recv_timeout(srs_utime_t tm);
virtual void kbps_sample(const char *label, srs_utime_t age);
};
// Mock HTTP message for testing edge FLV upstream
class MockEdgeHttpMessage : public ISrsHttpMessage
{
public:
int status_code_;
SrsHttpHeader *header_;
ISrsHttpResponseReader *body_reader_;
public:
MockEdgeHttpMessage();
virtual ~MockEdgeHttpMessage();
public:
virtual uint8_t message_type();
virtual uint8_t method();
virtual uint16_t status_code();
virtual std::string method_str();
virtual bool is_http_get();
virtual bool is_http_put();
virtual bool is_http_post();
virtual bool is_http_delete();
virtual bool is_http_options();
virtual std::string uri();
virtual std::string url();
virtual std::string host();
virtual std::string path();
virtual std::string query();
virtual std::string ext();
virtual srs_error_t body_read_all(std::string &body);
virtual ISrsHttpResponseReader *body_reader();
virtual int64_t content_length();
virtual std::string query_get(std::string key);
virtual SrsHttpHeader *header();
virtual bool is_jsonp();
virtual bool is_keep_alive();
virtual std::string parse_rest_id(std::string pattern);
};
// Mock file reader for testing edge FLV upstream
class MockEdgeFileReader : public ISrsFileReader
{
public:
char *data_;
int size_;
int pos_;
public:
MockEdgeFileReader(const char *data, int size);
virtual ~MockEdgeFileReader();
public:
virtual srs_error_t open(std::string p);
virtual void close();
virtual bool is_open();
virtual int64_t tellg();
virtual void skip(int64_t size);
virtual int64_t seek2(int64_t offset);
virtual int64_t filesize();
virtual srs_error_t read(void *buf, size_t count, ssize_t *pnread);
virtual srs_error_t lseek(off_t offset, int whence, off_t *seeked);
};
// Mock FLV decoder for testing edge FLV upstream
class MockEdgeFlvDecoder : public ISrsFlvDecoder
{
public:
bool initialize_called_;
bool read_header_called_;
bool read_previous_tag_size_called_;
public:
MockEdgeFlvDecoder();
virtual ~MockEdgeFlvDecoder();
public:
virtual srs_error_t initialize(ISrsReader *fr);
virtual srs_error_t read_header(char header[9]);
virtual srs_error_t read_tag_header(char *ptype, int32_t *pdata_size, uint32_t *ptime);
virtual srs_error_t read_tag_data(char *data, int32_t size);
virtual srs_error_t read_previous_tag_size(char previous_tag_size[4]);
};
// Mock app factory for testing edge FLV upstream
class MockEdgeFlvAppFactory : public SrsAppFactory
{
public:
MockEdgeHttpClient *mock_http_client_;
MockEdgeFileReader *mock_file_reader_;
MockEdgeFlvDecoder *mock_flv_decoder_;
public:
MockEdgeFlvAppFactory();
virtual ~MockEdgeFlvAppFactory();
public:
virtual ISrsHttpClient *create_http_client();
virtual ISrsFileReader *create_http_file_reader(ISrsHttpResponseReader *r);
virtual ISrsFlvDecoder *create_flv_decoder();
};
// Mock play edge for testing SrsEdgeIngester
class MockPlayEdge : public ISrsPlayEdge
{
public:
int on_ingest_play_count_;
srs_error_t on_ingest_play_error_;
public:
MockPlayEdge();
virtual ~MockPlayEdge();
public:
virtual srs_error_t on_ingest_play();
void reset();
};
// Mock edge upstream for testing SrsEdgeIngester::process_publish_message
class MockEdgeUpstreamForIngester : public ISrsEdgeUpstream
{
public:
bool decode_message_called_;
SrsRtmpCommand *decode_message_packet_;
srs_error_t decode_message_error_;
public:
MockEdgeUpstreamForIngester();
virtual ~MockEdgeUpstreamForIngester();
public:
virtual srs_error_t connect(ISrsRequest *r, ISrsLbRoundRobin *lb);
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual void close();
virtual void selected(std::string &server, int &port);
virtual void set_recv_timeout(srs_utime_t tm);
virtual void kbps_sample(const char *label, srs_utime_t age);
void reset();
};
// Mock publish edge for testing SrsEdgeForwarder
class MockPublishEdge : public ISrsPublishEdge
{
public:
MockPublishEdge();
virtual ~MockPublishEdge();
};
// Mock edge ingester for testing SrsPlayEdge
class MockEdgeIngester : public ISrsEdgeIngester
{
public:
bool initialize_called_;
bool start_called_;
bool stop_called_;
srs_error_t initialize_error_;
srs_error_t start_error_;
public:
MockEdgeIngester();
virtual ~MockEdgeIngester();
public:
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPlayEdge *e, ISrsRequest *r);
virtual srs_error_t start();
virtual void stop();
void reset();
};
// Mock edge forwarder for testing SrsPublishEdge
class MockEdgeForwarder : public ISrsEdgeForwarder
{
public:
bool initialize_called_;
bool start_called_;
bool stop_called_;
bool set_queue_size_called_;
bool proxy_called_;
srs_utime_t queue_size_;
srs_error_t initialize_error_;
srs_error_t start_error_;
srs_error_t proxy_error_;
public:
MockEdgeForwarder();
virtual ~MockEdgeForwarder();
public:
virtual void set_queue_size(srs_utime_t queue_size);
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, ISrsPublishEdge *e, ISrsRequest *r);
virtual srs_error_t start();
virtual void stop();
virtual srs_error_t proxy(SrsRtmpCommonMessage *msg);
void reset();
};
// Mock ISrsStatistic for testing SrsRtspPlayStream
class MockStatisticForRtspPlayStream : public ISrsStatistic
{
public:
int on_client_count_;
srs_error_t on_client_error_;
public:
MockStatisticForRtspPlayStream();
virtual ~MockStatisticForRtspPlayStream();
public:
virtual void on_disconnect(std::string id, srs_error_t err);
virtual srs_error_t on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type);
virtual srs_error_t on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height);
virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object);
virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id);
virtual void on_stream_close(ISrsRequest *req);
virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta);
virtual void kbps_sample();
virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames);
virtual std::string server_id();
virtual std::string service_id();
virtual std::string service_pid();
virtual SrsStatisticVhost *find_vhost_by_id(std::string vid);
virtual SrsStatisticStream *find_stream(std::string sid);
virtual SrsStatisticClient *find_client(std::string client_id);
virtual srs_error_t dumps_vhosts(SrsJsonArray *arr);
virtual srs_error_t dumps_streams(SrsJsonArray *arr, int start, int count);
virtual srs_error_t dumps_clients(SrsJsonArray *arr, int start, int count);
virtual srs_error_t dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs);
void reset();
};
// Mock ISrsRtspSourceManager for testing SrsRtspPlayStream
class MockRtspSourceManager : public ISrsRtspSourceManager
{
public:
int fetch_or_create_count_;
srs_error_t fetch_or_create_error_;
SrsSharedPtr<SrsRtspSource> mock_source_;
public:
MockRtspSourceManager();
virtual ~MockRtspSourceManager();
public:
virtual srs_error_t initialize();
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsRtspSource> &pps);
virtual SrsSharedPtr<SrsRtspSource> fetch(ISrsRequest *r);
void reset();
};
// Mock ISrsRtspSendTrack for testing SrsRtspPlayStream
class MockRtspSendTrack : public ISrsRtspSendTrack
{
public:
std::string track_id_;
SrsRtcTrackDescription *track_desc_;
bool track_status_;
int on_rtp_count_;
uint32_t last_ssrc_;
uint16_t last_sequence_;
public:
MockRtspSendTrack(std::string track_id, SrsRtcTrackDescription *desc);
virtual ~MockRtspSendTrack();
public:
virtual bool set_track_status(bool active);
virtual std::string get_track_id();
virtual SrsRtcTrackDescription *track_desc();
virtual srs_error_t on_rtp(SrsRtpPacket *pkt);
void reset();
};
// Mock ISrsAppFactory for testing SrsRtspPlayStream
class MockAppFactoryForRtspPlayStream : public SrsAppFactory
{
public:
int create_rtsp_audio_send_track_count_;
int create_rtsp_video_send_track_count_;
public:
MockAppFactoryForRtspPlayStream();
virtual ~MockAppFactoryForRtspPlayStream();
public:
virtual ISrsRtspSendTrack *create_rtsp_audio_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc);
virtual ISrsRtspSendTrack *create_rtsp_video_send_track(ISrsRtspConnection *session, SrsRtcTrackDescription *track_desc);
void reset();
};
// Mock ISrsRtspStack for testing SrsRtspConnection
class MockRtspStack : public ISrsRtspStack
{
public:
bool send_message_called_;
int last_response_seq_;
std::string last_response_session_;
std::string last_response_type_; // "OPTIONS", "DESCRIBE", "SETUP", "PLAY", "TEARDOWN"
srs_error_t send_message_error_;
public:
MockRtspStack();
virtual ~MockRtspStack();
public:
virtual srs_error_t recv_message(SrsRtspRequest **preq);
virtual srs_error_t send_message(SrsRtspResponse *res);
void reset();
};
// Mock ISrsRtspPlayStream for testing SrsRtspConnection::do_play and do_teardown
class MockRtspPlayStream : public ISrsRtspPlayStream
{
public:
bool initialize_called_;
bool start_called_;
bool stop_called_;
bool set_all_tracks_status_called_;
bool set_all_tracks_status_value_;
srs_error_t initialize_error_;
srs_error_t start_error_;
public:
MockRtspPlayStream();
virtual ~MockRtspPlayStream();
public:
virtual srs_error_t initialize(ISrsRequest *request, std::map<uint32_t, SrsRtcTrackDescription *> sub_relations);
virtual srs_error_t start();
virtual void stop();
virtual void set_all_tracks_status(bool status);
void reset();
};
#endif

View File

@ -400,6 +400,9 @@ public:
virtual bool get_vhost_http_remux_has_video(std::string vhost) { return true; }
virtual bool get_vhost_http_remux_guess_has_av(std::string vhost) { return true; }
virtual std::string get_vhost_http_remux_mount(std::string vhost) { return ""; }
virtual std::string get_vhost_edge_protocol(std::string vhost) { return "rtmp"; }
virtual bool get_vhost_edge_follow_client(std::string vhost) { return false; }
virtual std::string get_vhost_edge_transform_vhost(std::string vhost) { return ""; }
void set_http_hooks_enabled(bool enabled);
void set_on_stop_urls(const std::vector<std::string> &urls);
void clear_on_stop_directive();

View File

@ -1580,6 +1580,40 @@ SrsRtmpFormat *MockLiveSourceForOriginHub::format()
return format_;
}
srs_error_t MockLiveSourceForOriginHub::on_source_id_changed(SrsContextId id)
{
return srs_success;
}
srs_error_t MockLiveSourceForOriginHub::on_publish()
{
return srs_success;
}
void MockLiveSourceForOriginHub::on_unpublish()
{
}
srs_error_t MockLiveSourceForOriginHub::on_audio(SrsRtmpCommonMessage *audio)
{
return srs_success;
}
srs_error_t MockLiveSourceForOriginHub::on_video(SrsRtmpCommonMessage *video)
{
return srs_success;
}
srs_error_t MockLiveSourceForOriginHub::on_aggregate(SrsRtmpCommonMessage *msg)
{
return srs_success;
}
srs_error_t MockLiveSourceForOriginHub::on_meta_data(SrsRtmpCommonMessage *msg, SrsOnMetaDataPacket *metadata)
{
return srs_success;
}
// Unit test for SrsOriginHub::initialize typical scenario
VOID TEST(AppOriginHubTest, InitializeTypicalScenario)
{

View File

@ -181,6 +181,13 @@ public:
virtual SrsContextId pre_source_id();
virtual SrsMetaCache *meta();
virtual SrsRtmpFormat *format();
virtual srs_error_t on_source_id_changed(SrsContextId id);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_audio(SrsRtmpCommonMessage *audio);
virtual srs_error_t on_video(SrsRtmpCommonMessage *video);
virtual srs_error_t on_aggregate(SrsRtmpCommonMessage *msg);
virtual srs_error_t on_meta_data(SrsRtmpCommonMessage *msg, SrsOnMetaDataPacket *metadata);
};
// Mock ISrsStatistic for testing SrsOriginHub::on_video
@ -297,7 +304,7 @@ public:
virtual void untick(int event);
};
// Mock SrsAppFactory for testing SrsLiveSourceManager::fetch_or_create
// Mock ISrsAppFactory for testing SrsLiveSourceManager::fetch_or_create
class MockAppFactoryForSourceManager : public SrsAppFactory
{
public:
@ -331,7 +338,7 @@ public:
virtual void on_unpublish();
};
// Mock SrsAppFactory for testing SrsLiveSource::initialize
// Mock ISrsAppFactory for testing SrsLiveSource::initialize
class MockAppFactoryForLiveSource : public SrsAppFactory
{
public: