AI: Add workflow utest for RTMP conn

This commit is contained in:
OSSRS-AI 2025-10-17 22:19:02 -04:00 committed by winlin
parent 054d3a3563
commit f86c1348b1
44 changed files with 1315 additions and 1346 deletions

3
trunk/configure vendored
View File

@ -385,7 +385,8 @@ if [[ $SRS_UTEST == YES ]]; then
"srs_utest_protocol3" "srs_utest_app" "srs_utest_app2" "srs_utest_app3" "srs_utest_app4"
"srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9"
"srs_utest_app10" "srs_utest_app11" "srs_utest_app15" "srs_utest_app16" "srs_utest_app17"
"srs_utest_mock" "srs_utest_rtc_playstream" "srs_utest_rtc_publishstream" "srs_utest_rtc_conn")
"srs_utest_mock" "srs_utest_rtc_playstream" "srs_utest_rtc_publishstream" "srs_utest_rtc_conn"
"srs_utest_rtmp_conn")
# Always include SRT utest
MODULE_FILES+=("srs_utest_srt")
if [[ $SRS_GB28181 == YES ]]; then

View File

@ -616,6 +616,8 @@ public:
virtual std::vector<std::string> get_engine_aparams(SrsConfDirective *conf) = 0;
virtual std::string get_engine_oformat(SrsConfDirective *conf) = 0;
virtual std::string get_engine_output(SrsConfDirective *conf) = 0;
virtual bool get_security_enabled(std::string vhost) = 0;
virtual SrsConfDirective *get_security_rules(std::string vhost) = 0;
};
// The config service provider.
@ -689,16 +691,14 @@ public:
SRS_DECLARE_PROTECTED: // clang-format on
// Reload from the config.
// @remark, use protected for the utest to override with mock.
virtual srs_error_t
reload_conf(SrsConfig *conf);
virtual srs_error_t reload_conf(SrsConfig *conf);
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Parse options and file
public:
// Parse the cli, the main(argc,argv) function.
virtual srs_error_t
parse_options(int argc, char **argv);
virtual srs_error_t parse_options(int argc, char **argv);
// initialize the cwd for server,
// because we may change the workdir.
virtual srs_error_t initialize_cwd();
@ -723,8 +723,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Parse each argv.
virtual srs_error_t
parse_argv(int &i, char **argv);
virtual srs_error_t parse_argv(int &i, char **argv);
// Print help and exit.
virtual void print_help(char **argv);
@ -735,8 +734,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Build a buffer from a src, which is string content or filename.
virtual srs_error_t
build_buffer(std::string src, srs_internal::SrsConfigBuffer **pbuffer);
virtual srs_error_t build_buffer(std::string src, srs_internal::SrsConfigBuffer **pbuffer);
public:
// Check the parsed config.
@ -752,8 +750,7 @@ SRS_DECLARE_PROTECTED: // clang-format on
// Parse config from the buffer.
// @param buffer, the config buffer, user must delete it.
// @remark, use protected for the utest to override with mock.
virtual srs_error_t
parse_buffer(srs_internal::SrsConfigBuffer *buffer);
virtual srs_error_t parse_buffer(srs_internal::SrsConfigBuffer *buffer);
// global env
public:
// Get the current work directory.

View File

@ -380,8 +380,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Create session if no one, or bind to an existed session.
srs_error_t
bind_session(uint32_t ssrc, ISrsGbSession **psession);
srs_error_t bind_session(uint32_t ssrc, ISrsGbSession **psession);
};
// The interface for mpegps queue.
@ -494,8 +493,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Connect to RTMP server.
virtual srs_error_t
connect();
virtual srs_error_t connect();
// Close the connection to RTMP server.
virtual void close();
};
@ -533,8 +531,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Decode the RTP payload as PS pack stream.
virtual srs_error_t
decode(SrsBuffer *stream, ISrsPsMessageHandler *handler);
virtual srs_error_t decode(SrsBuffer *stream, ISrsPsMessageHandler *handler);
// When got error, drop data and enter recover mode.
srs_error_t enter_recover_mode(SrsBuffer *stream, ISrsPsMessageHandler *handler, int pos, srs_error_t err);
// Quit Recover mode when got pack header.

View File

@ -592,8 +592,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// close current segment, open a new segment,
// then write the key frame to the new segment.
// so, user must reap_segment then flush_video to hls muxer.
virtual srs_error_t
reap_segment();
virtual srs_error_t reap_segment();
};
// HLS controller for fMP4 (.m4s) segments with init.mp4.

View File

@ -85,8 +85,7 @@ SRS_DECLARE_PROTECTED: // clang-format on
// For example, http://server/file.flv?start=10240
// server will write flv header and sequence header,
// then seek(10240) and response flv tag data.
virtual srs_error_t
serve_flv_stream(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath, int64_t offset);
virtual srs_error_t serve_flv_stream(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath, int64_t offset);
// Support mp4 with start and offset in query string.
virtual srs_error_t serve_mp4_stream(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath, int64_t start, int64_t end);
// Support HLS streaming with pseudo session id.

View File

@ -125,8 +125,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Notify FFMPEG to fast stop.
virtual void
fast_stop();
virtual void fast_stop();
// When SRS quit, directly kill FFMPEG after fast stop.
virtual void fast_kill();
// Interface ISrsReusableThreadHandler.

View File

@ -177,8 +177,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Connect to RTMP server.
virtual srs_error_t
connect();
virtual srs_error_t connect();
// Close the connection to RTMP server.
virtual void close();
};

View File

@ -92,8 +92,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Redirect standard I/O.
virtual srs_error_t
redirect_io();
virtual srs_error_t redirect_io();
public:
// Start the process, ignore when already started.

View File

@ -228,6 +228,11 @@ srs_error_t SrsQueueRecvThread::error_code()
srs_error_t SrsQueueRecvThread::consume(SrsRtmpCommonMessage *msg)
{
// Ignore empty message, which is generated when io closing.
if (!msg) {
return srs_success;
}
// put into queue, the send thread will get and process it,
// @see SrsRtmpConn::process_play_control_msg
queue_.push_back(msg);

View File

@ -291,8 +291,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// The stream source changed.
virtual srs_error_t
on_source_changed();
virtual srs_error_t on_source_changed();
public:
// Get current source id.
@ -387,8 +386,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Lazy initialization methods
srs_error_t
initialize_audio_track(SrsAudioCodecId codec);
srs_error_t initialize_audio_track(SrsAudioCodecId codec);
srs_error_t initialize_video_track(SrsVideoCodecId codec);
public:

View File

@ -120,6 +120,11 @@ srs_netfd_t SrsRtmpTransport::fd()
return stfd_;
}
int SrsRtmpTransport::osfd()
{
return srs_netfd_fileno(stfd_);
}
ISrsProtocolReadWriter *SrsRtmpTransport::io()
{
return skt_;
@ -292,7 +297,7 @@ srs_error_t SrsRtmpConn::do_cycle()
{
srs_error_t err = srs_success;
srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d", transport_->transport_type(), ip_.c_str(), port_, srs_netfd_fileno(transport_->fd()));
srs_trace("RTMP client transport=%s, ip=%s:%d, fd=%d", transport_->transport_type(), ip_.c_str(), port_, transport_->osfd());
if ((err = transport_->handshake()) != srs_success) {
return srs_error_wrap(err, "transport handshake");
@ -395,7 +400,7 @@ srs_error_t SrsRtmpConn::service_cycle()
}
// get the ip which client connected.
std::string local_ip = srs_get_local_ip(srs_netfd_fileno(transport_->fd()));
std::string local_ip = srs_get_local_ip(transport_->osfd());
// set chunk size to larger.
// set the chunk size before any larger response greater than 128,
@ -782,17 +787,18 @@ srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveC
}
}
// quit when recv thread error.
if ((err = rtrd->error_code()) != srs_success) {
return srs_error_wrap(err, "rtmp: recv thread");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// wait for message to incoming.
// @see https://github.com/ossrs/srs/issues/257
consumer->wait(mw_msgs_, mw_sleep_);
#endif
// Quit when recv thread error. Check recv thread error when wakeup, in order
// to detect the client disconnecting event.
if ((err = rtrd->error_code()) != srs_success) {
return srs_error_wrap(err, "rtmp: recv thread");
}
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
// @remark when enable send_min_interval, only fetch one message a time.
@ -892,7 +898,7 @@ srs_error_t SrsRtmpConn::publishing(SrsSharedPtr<SrsLiveSource> source)
if ((err = acquire_err) == srs_success) {
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread rtrd(rtmp_, req, srs_netfd_fileno(transport_->fd()), 0, this, source, _srs_context->get_id());
SrsPublishRecvThread rtrd(rtmp_, req, transport_->osfd(), 0, this, source, _srs_context->get_id());
rtrd.assemble();
err = do_publishing(source, &rtrd);
@ -963,7 +969,8 @@ srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPu
rtrd->wait(publish_normal_timeout_);
}
// check the thread error code.
// Quit when recv thread error. Check recv thread error when wakeup, in order
// to detect the client disconnecting event.
if ((err = rtrd->error_code()) != srs_success) {
return srs_error_wrap(err, "rtmp: receive thread");
}
@ -1553,6 +1560,12 @@ srs_error_t SrsRtmpConn::start()
return err;
}
void SrsRtmpConn::stop()
{
trd_->interrupt();
trd_->stop();
}
srs_error_t SrsRtmpConn::cycle()
{
srs_error_t err = srs_success;

View File

@ -39,6 +39,7 @@ class ISrsWakable;
class SrsRtmpCommonMessage;
class SrsRtmpCommand;
class SrsNetworkDelta;
class ISrsNetworkDelta;
class ISrsAppConfig;
class SrsSslConnection;
class ISrsResourceManager;
@ -96,6 +97,7 @@ public:
public:
virtual srs_netfd_t fd() = 0;
virtual int osfd() = 0;
virtual ISrsProtocolReadWriter *io() = 0;
virtual srs_error_t handshake() = 0;
virtual const char *transport_type() = 0;
@ -120,6 +122,7 @@ public:
public:
// Get the file descriptor for logging and identification
virtual srs_netfd_t fd();
virtual int osfd();
// Get the appropriate I/O interface (TCP)
virtual ISrsProtocolReadWriter *io();
// Perform handshake (no-op for plain RTMP)
@ -222,7 +225,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
std::string ip_;
int port_;
// The delta for statistic.
SrsNetworkDelta *delta_;
ISrsNetworkDelta *delta_;
SrsNetworkKbps *kbps_;
// The create time in milliseconds.
// for current connection to log self create time and calculate the living time.
@ -246,8 +249,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// When valid and connected to vhost/app, service the client.
virtual srs_error_t
service_cycle();
virtual srs_error_t service_cycle();
// The stream(play/publish) service cycle, identify client first.
virtual srs_error_t stream_service_cycle();
virtual srs_error_t check_vhost(bool try_default_vhost);
@ -271,8 +273,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
SRS_DECLARE_PRIVATE: // clang-format on
// When the connection disconnect, call this method.
// e.g. log msg of connection and report to other system.
virtual srs_error_t
on_disconnect();
virtual srs_error_t on_disconnect();
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
@ -293,6 +294,7 @@ public:
// when client cycle thread stop, invoke the on_thread_stop(), which will use server
// To remove the client by server->remove(this).
virtual srs_error_t start();
virtual void stop();
// Interface ISrsCoroutineHandler
public:
// The thread cycle function,

View File

@ -268,7 +268,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsMediaPacket *msg, bool *is_overflow)
// If jitter is off, the timestamp of first sequence header is zero, which wll cause SRS to shrink and drop the
// keyframes even if there is not overflow packets in queue, so we must ignore the zero timestamps, please
// @see https://github.com/ossrs/srs/pull/2186#issuecomment-953383063
if (msg->is_av() && msg->timestamp_ != 0) {
if (msg->is_av() && (msg->timestamp_ != 0 || av_end_time_ == -1)) {
if (av_start_time_ == -1) {
av_start_time_ = srs_utime_t(msg->timestamp_ * SRS_UTIME_MILLISECONDS);
}
@ -495,7 +495,7 @@ srs_error_t SrsLiveConsumer::enqueue(SrsMediaPacket *shared_msg, bool atc, SrsRt
}
// when duration ok, signal to flush.
if (match_min_msgs && duration > mw_duration_) {
if (match_min_msgs && duration >= mw_duration_) {
srs_cond_signal(mw_wait_);
mw_waiting_ = false;
return err;
@ -2060,7 +2060,6 @@ srs_error_t SrsLiveSource::on_meta_data(SrsRtmpCommonMessage *msg, SrsOnMetaData
srs_error_t SrsLiveSource::on_audio(SrsRtmpCommonMessage *shared_audio)
{
// Detect where stream is monotonically increasing.
if (!mix_correct_ && is_monotonically_increase_) {
if (last_packet_time_ > 0 && shared_audio->header_.timestamp_ < last_packet_time_) {

View File

@ -190,8 +190,7 @@ public:
SRS_DECLARE_PRIVATE: // clang-format on
// Remove a gop from the front.
// if no iframe found, clear it.
virtual void
shrink();
virtual void shrink();
public:
// clear all messages in queue.

View File

@ -179,8 +179,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// The stream source changed.
virtual srs_error_t
on_source_changed();
virtual srs_error_t on_source_changed();
public:
// Get current source id.
@ -257,8 +256,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Lazy initialization methods
srs_error_t
initialize_audio_track(SrsAudioCodecId codec);
srs_error_t initialize_audio_track(SrsAudioCodecId codec);
srs_error_t initialize_video_track(SrsVideoCodecId codec);
public:

View File

@ -21,10 +21,12 @@ ISrsSecurity::~ISrsSecurity()
SrsSecurity::SrsSecurity()
{
config_ = _srs_config;
}
SrsSecurity::~SrsSecurity()
{
config_ = NULL;
}
srs_error_t SrsSecurity::check(SrsRtmpConnType type, string ip, ISrsRequest *req)
@ -32,12 +34,12 @@ srs_error_t SrsSecurity::check(SrsRtmpConnType type, string ip, ISrsRequest *req
srs_error_t err = srs_success;
// allow all if security disabled.
if (!_srs_config->get_security_enabled(req->vhost_)) {
if (!config_->get_security_enabled(req->vhost_)) {
return err; // OK
}
// rules to apply
SrsConfDirective *rules = _srs_config->get_security_rules(req->vhost_);
SrsConfDirective *rules = config_->get_security_rules(req->vhost_);
return do_check(rules, type, ip, req);
}

View File

@ -15,6 +15,7 @@
#include <srs_protocol_utility.hpp>
class SrsConfDirective;
class ISrsAppConfig;
// The security interface.
class ISrsSecurity
@ -30,6 +31,10 @@ public:
// The security apply on vhost.
class SrsSecurity : public ISrsSecurity
{
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
ISrsAppConfig *config_;
public:
SrsSecurity();
virtual ~SrsSecurity();

View File

@ -233,8 +233,7 @@ public:
SRS_DECLARE_PRIVATE: // clang-format on
// When SIGTERM, SRS should do cleanup, for example,
// to stop all ingesters, cleanup HLS and dvr.
virtual void
dispose();
virtual void dispose();
// Close listener to stop accepting new connections,
// then wait and quit when all connections finished.
virtual void gracefully_dispose();
@ -291,8 +290,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// The server thread main cycle,
// update the global static data, for instance, the current time,
// the cpu/mem/network statistic.
virtual srs_error_t
do_cycle();
virtual srs_error_t do_cycle();
virtual srs_error_t do2_cycle();
// interface ISrsHourGlassHandler
@ -304,8 +302,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Resample the server kbs.
virtual void
resample_kbps();
virtual void resample_kbps();
// SRT-related methods
virtual srs_error_t listen_srt_mpegts();
@ -316,8 +313,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// WebRTC-related methods
virtual srs_error_t
listen_rtc_udp();
virtual srs_error_t listen_rtc_udp();
// Interface ISrsUdpMuxHandler
public:
@ -437,8 +433,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Close the PID file descriptor.
virtual void
close();
virtual void close();
};
#endif

View File

@ -263,8 +263,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Cleanup the stream if stream is not active and for the last client.
void
cleanup_stream(SrsStatisticStream *stream);
void cleanup_stream(SrsStatisticStream *stream);
public:
// Sample the kbps, add delta bytes of conn.

View File

@ -195,8 +195,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Reset the shared ptr.
void
reset()
void reset()
{
if (!ref_count_)
return;

View File

@ -162,8 +162,7 @@ public:
SRS_DECLARE_PRIVATE: // clang-format on
// Cycle the hourglass, which will sleep resolution every time.
// and call handler when ticked.
virtual srs_error_t
cycle();
virtual srs_error_t cycle();
};
// To monitor the system wall clock timer deviation.

View File

@ -2526,8 +2526,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Reset the jitter state (useful for new recording sessions)
virtual void
reset();
virtual void reset();
// Check if both audio and video start times have been captured
virtual bool is_initialized();
};
@ -2581,8 +2580,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// @param tses The temporary samples, key is offset, value is sample.
// @param tt The type of sample, convert to flv tag type.
// TODO: Support co64 for stco.
virtual srs_error_t
load_trak(std::map<uint64_t, SrsMp4Sample *> &tses, SrsFrameType tt,
virtual srs_error_t load_trak(std::map<uint64_t, SrsMp4Sample *> &tses, SrsFrameType tt,
SrsMp4MediaHeaderBox *mdhd, SrsMp4ChunkOffsetBox *stco, SrsMp4SampleSizeBox *stsz, SrsMp4Sample2ChunkBox *stsc,
SrsMp4DecodingTime2SampleBox *stts, SrsMp4CompositionTime2SampleBox *ctts, SrsMp4SyncSampleBox *stss);
};
@ -2703,8 +2701,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
SRS_DECLARE_PRIVATE: // clang-format on
// Load the next box from reader.
// @param required_box_type The box type required, 0 for any box.
virtual srs_error_t
load_next_box(SrsMp4Box **ppbox, uint32_t required_box_type);
virtual srs_error_t load_next_box(SrsMp4Box **ppbox, uint32_t required_box_type);
// @remark Never load the mdat box content, for it's too large.
virtual srs_error_t do_load_next_box(SrsMp4Box **ppbox, uint32_t required_box_type);
};
@ -2886,8 +2883,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
* | | |schi|
* | | | |tenc|
*/
virtual srs_error_t
config_sample_description_encryption(SrsMp4SampleEntry *box);
virtual srs_error_t config_sample_description_encryption(SrsMp4SampleEntry *box);
};
// The fMP4 segment encoder interface.

View File

@ -258,8 +258,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// The packet is muxed in FLV format, defined in flv specification.
// Demux the sps/pps from sequence header.
// Demux the samples from NALUs.
virtual srs_error_t
video_avc_demux(SrsBuffer *stream, int64_t timestamp);
virtual srs_error_t video_avc_demux(SrsBuffer *stream, int64_t timestamp);
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
@ -281,16 +280,14 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Parse the H.264 SPS/PPS.
virtual srs_error_t
avc_demux_sps_pps(SrsBuffer *stream);
virtual srs_error_t avc_demux_sps_pps(SrsBuffer *stream);
virtual srs_error_t avc_demux_sps();
virtual srs_error_t avc_demux_sps_rbsp(char *rbsp, int nb_rbsp);
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Parse the H.264 or H.265 NALUs.
virtual srs_error_t
video_nalu_demux(SrsBuffer *stream);
virtual srs_error_t video_nalu_demux(SrsBuffer *stream);
// Demux the avc NALU in "AnnexB" from ISO_IEC_14496-10-AVC-2003.pdf, page 211.
virtual srs_error_t avc_demux_annexb_format(SrsBuffer *stream);
virtual srs_error_t do_avc_demux_annexb_format(SrsBuffer *stream);
@ -303,8 +300,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// Demux the audio packet in AAC codec.
// Demux the asc from sequence header.
// Demux the sampels from RAW data.
virtual srs_error_t
audio_aac_demux(SrsBuffer *stream, int64_t timestamp);
virtual srs_error_t audio_aac_demux(SrsBuffer *stream, int64_t timestamp);
virtual srs_error_t audio_mp3_demux(SrsBuffer *stream, int64_t timestamp, bool fresh);
public:

View File

@ -87,8 +87,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// parse the HTTP message to member field: msg.
virtual srs_error_t
parse_message_imp(ISrsReader *reader);
virtual srs_error_t parse_message_imp(ISrsReader *reader);
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on

View File

@ -360,8 +360,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// For utest to mock the fs.
virtual void
set_fs_factory(ISrsFileReaderFactory *v);
virtual void set_fs_factory(ISrsFileReaderFactory *v);
// For utest to mock the path utility.
virtual void set_path(SrsPath *v);
@ -371,16 +370,14 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Serve the file by specified path
virtual srs_error_t
serve_file(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath);
virtual srs_error_t serve_file(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath);
virtual srs_error_t serve_flv_file(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath);
virtual srs_error_t serve_mp4_file(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath);
// clang-format off
SRS_DECLARE_PROTECTED: // clang-format on
// When access flv file with x.flv?start=xxx
virtual srs_error_t
serve_flv_stream(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath, int64_t offset);
virtual srs_error_t serve_flv_stream(ISrsHttpResponseWriter *w, ISrsHttpMessage *r, std::string fullpath, int64_t offset);
// When access mp4 file with x.mp4?range=start-end
// @param start the start offset in bytes.
// @param end the end offset in bytes. -1 to end of file.
@ -402,8 +399,7 @@ SRS_DECLARE_PROTECTED: // clang-format on
// clang-format off
SRS_DECLARE_PROTECTED: // clang-format on
// Copy the fs to response writer in size bytes.
virtual srs_error_t
copy(ISrsHttpResponseWriter *w, SrsFileReader *fs, ISrsHttpMessage *r, int64_t size);
virtual srs_error_t copy(ISrsHttpResponseWriter *w, SrsFileReader *fs, ISrsHttpMessage *r, int64_t size);
};
// The mux entry for server mux.
@ -727,8 +723,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Simple URL parser to replace http-parser URL parsing
virtual srs_error_t
parse_url_simple(const std::string &url, std::string &schema, std::string &host, int &port,
virtual srs_error_t parse_url_simple(const std::string &url, std::string &schema, std::string &host, int &port,
std::string &path, std::string &query, std::string &fragment,
std::string &username, std::string &password);
srs_error_t parse_query();

View File

@ -40,8 +40,7 @@ public:
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Zero initialize the message array.
virtual void
zero(int count);
virtual void zero(int count);
};
#endif

View File

@ -349,8 +349,7 @@ public:
SRS_DECLARE_PRIVATE: // clang-format on
// Send out the messages, donot free it,
// The caller must free the param msgs.
virtual srs_error_t
do_send_messages(SrsMediaPacket **msgs, int nb_msgs);
virtual srs_error_t do_send_messages(SrsMediaPacket **msgs, int nb_msgs);
// Send iovs. send multiple times if exceed limits.
virtual srs_error_t do_iovs_send(iovec *iovs, int size);
// The underlayer api for send and free packet.
@ -379,8 +378,7 @@ SRS_DECLARE_PRIVATE: // clang-format on
// clang-format off
SRS_DECLARE_PRIVATE: // clang-format on
// Auto response the ack message.
virtual srs_error_t
response_acknowledgement_message();
virtual srs_error_t response_acknowledgement_message();
// Auto response the ping message.
virtual srs_error_t response_ping_message(int32_t timestamp);

View File

@ -267,8 +267,7 @@ public:
// clang-format off
SRS_DECLARE_PROTECTED: // clang-format on
// Sub classes override this to encode the headers.
virtual srs_error_t
encode_header(std::stringstream &ss);
virtual srs_error_t encode_header(std::stringstream &ss);
};
// 10.1 OPTIONS, @see rfc2326-1998-rtsp.pdf, page 59

View File

@ -1085,159 +1085,6 @@ void MockAppConfigForRtmpConn::reset()
last_subscribed_handler_ = NULL;
}
MockRtmpServerForStreamService::MockRtmpServerForStreamService()
{
identify_type_ = SrsRtmpConnPlay;
identify_stream_ = "";
identify_duration_ = 0;
start_play_count_ = 0;
start_fmle_publish_count_ = 0;
start_flash_publish_count_ = 0;
start_haivision_publish_count_ = 0;
}
MockRtmpServerForStreamService::~MockRtmpServerForStreamService()
{
}
void MockRtmpServerForStreamService::set_recv_timeout(srs_utime_t tm)
{
}
void MockRtmpServerForStreamService::set_send_timeout(srs_utime_t tm)
{
}
srs_error_t MockRtmpServerForStreamService::handshake()
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::connect_app(ISrsRequest *req)
{
return srs_success;
}
uint32_t MockRtmpServerForStreamService::proxy_real_ip()
{
return 0;
}
srs_error_t MockRtmpServerForStreamService::set_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::set_peer_bandwidth(int bandwidth, int type)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::set_chunk_size(int chunk_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::response_connect_app(ISrsRequest *req, const char *server_ip)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::on_bw_done()
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration)
{
type = identify_type_;
stream_name = identify_stream_;
duration = identify_duration_;
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::start_play(int stream_id)
{
start_play_count_++;
// Return an error to exit the test cleanly after verifying selection worked
return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "mock start play");
}
srs_error_t MockRtmpServerForStreamService::start_fmle_publish(int stream_id)
{
start_fmle_publish_count_++;
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::start_haivision_publish(int stream_id)
{
start_haivision_publish_count_++;
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::start_flash_publish(int stream_id)
{
start_flash_publish_count_++;
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::fmle_unpublish(int stream_id, double unpublish_tid)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::start_publishing(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::redirect(ISrsRequest *r, std::string url, bool &accepted)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::send_and_free_packet(SrsRtmpCommand *packet, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::on_play_client_pause(int stream_id, bool is_pause)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::set_in_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForStreamService::recv_message(SrsRtmpCommonMessage **pmsg)
{
return srs_success;
}
void MockRtmpServerForStreamService::set_auto_response(bool v)
{
}
void MockRtmpServerForStreamService::set_merge_read(bool v, IMergeReadHandler *handler)
{
}
void MockRtmpServerForStreamService::set_recv_buffer(int buffer_size)
{
}
MockCoroutineForRtmpConn::MockCoroutineForRtmpConn()
{
pull_error_ = srs_success;
@ -1313,6 +1160,11 @@ srs_netfd_t MockRtmpTransportForDoCycle::fd()
return dummy_stfd;
}
int MockRtmpTransportForDoCycle::osfd()
{
return 0;
}
ISrsProtocolReadWriter *MockRtmpTransportForDoCycle::io()
{
return NULL;
@ -1534,185 +1386,6 @@ VOID TEST(SrsUtilityTest, UpdateProcStat)
#endif
}
MockSecurityForStreamService::MockSecurityForStreamService()
{
}
MockSecurityForStreamService::~MockSecurityForStreamService()
{
}
srs_error_t MockSecurityForStreamService::check(SrsRtmpConnType type, std::string ip, ISrsRequest *req)
{
return srs_success;
}
MockRtmpServerForHandlePublishMessage::MockRtmpServerForHandlePublishMessage()
{
decode_message_error_ = srs_success;
decode_message_packet_ = NULL;
decode_message_count_ = 0;
fmle_unpublish_error_ = srs_success;
fmle_unpublish_count_ = 0;
}
MockRtmpServerForHandlePublishMessage::~MockRtmpServerForHandlePublishMessage()
{
srs_freep(decode_message_error_);
srs_freep(decode_message_packet_);
srs_freep(fmle_unpublish_error_);
}
void MockRtmpServerForHandlePublishMessage::set_recv_timeout(srs_utime_t tm)
{
}
void MockRtmpServerForHandlePublishMessage::set_send_timeout(srs_utime_t tm)
{
}
srs_error_t MockRtmpServerForHandlePublishMessage::handshake()
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::connect_app(ISrsRequest *req)
{
return srs_success;
}
uint32_t MockRtmpServerForHandlePublishMessage::proxy_real_ip()
{
return 0;
}
srs_error_t MockRtmpServerForHandlePublishMessage::set_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::set_peer_bandwidth(int bandwidth, int type)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::set_chunk_size(int chunk_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::response_connect_app(ISrsRequest *req, const char *server_ip)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::on_bw_done()
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::start_play(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::start_fmle_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::start_haivision_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::fmle_unpublish(int stream_id, double unpublish_tid)
{
fmle_unpublish_count_++;
return srs_error_copy(fmle_unpublish_error_);
}
srs_error_t MockRtmpServerForHandlePublishMessage::start_flash_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::start_publishing(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::redirect(ISrsRequest *r, std::string url, bool &accepted)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket)
{
decode_message_count_++;
if (decode_message_error_ != srs_success) {
return srs_error_copy(decode_message_error_);
}
// Return the configured packet (can be NULL or a specific packet type)
*ppacket = decode_message_packet_;
decode_message_packet_ = NULL; // Transfer ownership
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::send_and_free_packet(SrsRtmpCommand *packet, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::on_play_client_pause(int stream_id, bool is_pause)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::set_in_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForHandlePublishMessage::recv_message(SrsRtmpCommonMessage **pmsg)
{
return srs_success;
}
void MockRtmpServerForHandlePublishMessage::set_auto_response(bool v)
{
}
void MockRtmpServerForHandlePublishMessage::set_merge_read(bool v, IMergeReadHandler *handler)
{
}
void MockRtmpServerForHandlePublishMessage::set_recv_buffer(int buffer_size)
{
}
void MockRtmpServerForHandlePublishMessage::reset()
{
srs_freep(decode_message_error_);
srs_freep(decode_message_packet_);
srs_freep(fmle_unpublish_error_);
decode_message_error_ = srs_success;
decode_message_packet_ = NULL;
decode_message_count_ = 0;
fmle_unpublish_error_ = srs_success;
fmle_unpublish_count_ = 0;
}
VOID TEST(SrsRtmpConnTest, StreamServiceCycleSelection)
{
srs_error_t err = srs_success;
@ -1724,7 +1397,9 @@ VOID TEST(SrsRtmpConnTest, StreamServiceCycleSelection)
SrsRtmpConn *conn = new SrsRtmpConn(mock_transport, "192.168.1.100", 1935);
// Create mock rtmp server
MockRtmpServerForStreamService *mock_rtmp = new MockRtmpServerForStreamService();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
// Return an error to exit the test cleanly after verifying selection worked
mock_rtmp->start_play_error_ = srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "mock start play");
// Create mock coroutine that always returns error in pull()
MockCoroutineForRtmpConn *mock_trd = new MockCoroutineForRtmpConn();
@ -1741,8 +1416,8 @@ VOID TEST(SrsRtmpConnTest, StreamServiceCycleSelection)
// Note: stream_service_cycle() is NOT executed because trd_->pull() returns error
// in service_cycle() before the while loop calls stream_service_cycle()
if (true) {
mock_rtmp->identify_type_ = SrsRtmpConnPlay;
mock_rtmp->identify_stream_ = "livestream";
mock_rtmp->type_ = SrsRtmpConnPlay;
mock_rtmp->stream_ = "livestream";
mock_rtmp->start_play_count_ = 0;
mock_rtmp->start_fmle_publish_count_ = 0;
mock_rtmp->start_flash_publish_count_ = 0;
@ -1972,8 +1647,9 @@ VOID TEST(SrsRtmpConnTest, StreamServiceCycleThreadQuitCheck)
// check for thread quit (trd_->pull()) and return immediately if the thread is quitting.
MockRtmpTransportForDoCycle *mock_transport = new MockRtmpTransportForDoCycle();
MockRtmpServerForStreamService *mock_rtmp = new MockRtmpServerForStreamService();
MockSecurityForStreamService *mock_security = new MockSecurityForStreamService();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
mock_rtmp->start_play_error_ = srs_error_new(ERROR_THREAD_INTERRUPED, "thread interrupted");
MockSecurity *mock_security = new MockSecurity();
MockCoroutineForRtmpConn *mock_trd = new MockCoroutineForRtmpConn();
MockAppConfigForRtmpConn *mock_config = new MockAppConfigForRtmpConn();
@ -1997,8 +1673,8 @@ VOID TEST(SrsRtmpConnTest, StreamServiceCycleThreadQuitCheck)
mock_trd->pull_error_ = srs_error_new(ERROR_THREAD_INTERRUPED, "thread interrupted");
// Set connection type to Play
mock_rtmp->identify_type_ = SrsRtmpConnPlay;
mock_rtmp->identify_stream_ = "livestream";
mock_rtmp->type_ = SrsRtmpConnPlay;
mock_rtmp->stream_ = "livestream";
conn->info_->type_ = SrsRtmpConnPlay;
// Set up request with valid tcUrl to pass tcUrl parsing
@ -2194,7 +1870,7 @@ VOID TEST(SrsRtmpConnTest, HandlePublishMessageFlashRepublish)
conn->assemble();
// Create mock rtmp server
MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
// Inject mock into connection
srs_freep(conn->rtmp_);
@ -2252,7 +1928,7 @@ VOID TEST(SrsRtmpConnTest, HandlePublishMessageFMLERepublish)
conn->assemble();
// Create mock rtmp server
MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
// Inject mock into connection
srs_freep(conn->rtmp_);
@ -2315,7 +1991,7 @@ VOID TEST(SrsRtmpConnTest, HandlePublishMessageFMLEIgnoreCommand)
conn->assemble();
// Create mock rtmp server
MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
// Inject mock into connection
srs_freep(conn->rtmp_);
@ -2371,10 +2047,11 @@ VOID TEST(SrsRtmpConnTest, AcquirePublishStreamBusyCheck)
conn->assemble();
// Create mock rtmp server
MockRtmpServerForStreamService *mock_rtmp = new MockRtmpServerForStreamService();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
mock_rtmp->start_play_error_ = srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "mock start play");
// Create mock security
MockSecurityForStreamService *mock_security = new MockSecurityForStreamService();
MockSecurity *mock_security = new MockSecurity();
// Inject mocks into connection
srs_freep(conn->rtmp_);
@ -2431,7 +2108,7 @@ VOID TEST(SrsRtmpConnTest, HandlePublishMessageVideoSuccess)
conn->assemble();
// Create mock rtmp server
MockRtmpServerForHandlePublishMessage *mock_rtmp = new MockRtmpServerForHandlePublishMessage();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
// Inject mock into connection
srs_freep(conn->rtmp_);
@ -2468,77 +2145,6 @@ VOID TEST(SrsRtmpConnTest, HandlePublishMessageVideoSuccess)
srs_freep(mock_config);
}
MockRtmpServerForPlayControl::MockRtmpServerForPlayControl()
{
decode_message_packet_ = NULL;
decode_message_count_ = 0;
send_and_free_packet_count_ = 0;
on_play_client_pause_count_ = 0;
last_pause_state_ = false;
}
MockRtmpServerForPlayControl::~MockRtmpServerForPlayControl()
{
srs_freep(decode_message_packet_);
}
void MockRtmpServerForPlayControl::set_recv_timeout(srs_utime_t tm) {}
void MockRtmpServerForPlayControl::set_send_timeout(srs_utime_t tm) {}
srs_error_t MockRtmpServerForPlayControl::handshake() { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::connect_app(ISrsRequest *req) { return srs_success; }
uint32_t MockRtmpServerForPlayControl::proxy_real_ip() { return 0; }
srs_error_t MockRtmpServerForPlayControl::set_window_ack_size(int ack_size) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::set_peer_bandwidth(int bandwidth, int type) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::set_chunk_size(int chunk_size) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::response_connect_app(ISrsRequest *req, const char *server_ip) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::on_bw_done() { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::start_play(int stream_id) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::start_fmle_publish(int stream_id) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::start_haivision_publish(int stream_id) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::fmle_unpublish(int stream_id, double unpublish_tid) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::start_flash_publish(int stream_id) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::start_publishing(int stream_id) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::redirect(ISrsRequest *r, std::string url, bool &accepted) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket)
{
decode_message_count_++;
*ppacket = decode_message_packet_;
decode_message_packet_ = NULL;
return srs_success;
}
srs_error_t MockRtmpServerForPlayControl::send_and_free_packet(SrsRtmpCommand *packet, int stream_id)
{
send_and_free_packet_count_++;
srs_freep(packet);
return srs_success;
}
srs_error_t MockRtmpServerForPlayControl::on_play_client_pause(int stream_id, bool is_pause)
{
on_play_client_pause_count_++;
last_pause_state_ = is_pause;
return srs_success;
}
srs_error_t MockRtmpServerForPlayControl::set_in_window_ack_size(int ack_size) { return srs_success; }
srs_error_t MockRtmpServerForPlayControl::recv_message(SrsRtmpCommonMessage **pmsg) { return srs_success; }
void MockRtmpServerForPlayControl::set_auto_response(bool v) {}
void MockRtmpServerForPlayControl::set_merge_read(bool v, IMergeReadHandler *handler) {}
void MockRtmpServerForPlayControl::set_recv_buffer(int buffer_size) {}
void MockRtmpServerForPlayControl::reset()
{
srs_freep(decode_message_packet_);
decode_message_count_ = 0;
send_and_free_packet_count_ = 0;
on_play_client_pause_count_ = 0;
last_pause_state_ = false;
}
MockLiveConsumerForPlayControl::MockLiveConsumerForPlayControl(ISrsLiveSource *source)
: SrsLiveConsumer(source)
{
@ -2827,7 +2433,7 @@ VOID TEST(SrsRtmpConnTest, ProcessPlayControlMsgPauseSuccess)
conn->assemble();
// Create mock rtmp server
MockRtmpServerForPlayControl *mock_rtmp = new MockRtmpServerForPlayControl();
MockRtmpServer *mock_rtmp = new MockRtmpServer();
// Create pause packet to be returned by decode_message
SrsPausePacket *pause_pkt = new SrsPausePacket();

View File

@ -340,52 +340,6 @@ public:
void reset();
};
// Mock ISrsRtmpServer for testing SrsRtmpConn::stream_service_cycle()
class MockRtmpServerForStreamService : public ISrsRtmpServer
{
public:
SrsRtmpConnType identify_type_;
std::string identify_stream_;
srs_utime_t identify_duration_;
int start_play_count_;
int start_fmle_publish_count_;
int start_flash_publish_count_;
int start_haivision_publish_count_;
public:
MockRtmpServerForStreamService();
virtual ~MockRtmpServerForStreamService();
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_error_t handshake();
virtual srs_error_t connect_app(ISrsRequest *req);
virtual uint32_t proxy_real_ip();
virtual srs_error_t set_window_ack_size(int ack_size);
virtual srs_error_t set_peer_bandwidth(int bandwidth, int type);
virtual srs_error_t set_chunk_size(int chunk_size);
virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip);
virtual srs_error_t on_bw_done();
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration);
virtual srs_error_t start_play(int stream_id);
virtual srs_error_t start_fmle_publish(int stream_id);
virtual srs_error_t start_haivision_publish(int stream_id);
virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid);
virtual srs_error_t start_flash_publish(int stream_id);
virtual srs_error_t start_publishing(int stream_id);
virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted);
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id);
virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause);
virtual srs_error_t set_in_window_ack_size(int ack_size);
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual void set_auto_response(bool v);
virtual void set_merge_read(bool v, IMergeReadHandler *handler);
virtual void set_recv_buffer(int buffer_size);
};
// Mock ISrsCoroutine for testing SrsRtmpConn::service_cycle()
class MockCoroutineForRtmpConn : public ISrsCoroutine
{
@ -415,6 +369,7 @@ public:
public:
virtual srs_netfd_t fd();
virtual int osfd();
virtual ISrsProtocolReadWriter *io();
virtual srs_error_t handshake();
virtual const char *transport_type();
@ -424,109 +379,6 @@ public:
virtual int64_t get_send_bytes();
};
// Mock ISrsSecurity for testing SrsRtmpConn::stream_service_cycle()
class MockSecurityForStreamService : public ISrsSecurity
{
public:
MockSecurityForStreamService();
virtual ~MockSecurityForStreamService();
public:
virtual srs_error_t check(SrsRtmpConnType type, std::string ip, ISrsRequest *req);
};
// Mock ISrsRtmpServer for testing SrsRtmpConn::handle_publish_message()
class MockRtmpServerForHandlePublishMessage : public ISrsRtmpServer
{
public:
srs_error_t decode_message_error_;
SrsRtmpCommand *decode_message_packet_;
int decode_message_count_;
srs_error_t fmle_unpublish_error_;
int fmle_unpublish_count_;
public:
MockRtmpServerForHandlePublishMessage();
virtual ~MockRtmpServerForHandlePublishMessage();
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_error_t handshake();
virtual srs_error_t connect_app(ISrsRequest *req);
virtual uint32_t proxy_real_ip();
virtual srs_error_t set_window_ack_size(int ack_size);
virtual srs_error_t set_peer_bandwidth(int bandwidth, int type);
virtual srs_error_t set_chunk_size(int chunk_size);
virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip);
virtual srs_error_t on_bw_done();
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration);
virtual srs_error_t start_play(int stream_id);
virtual srs_error_t start_fmle_publish(int stream_id);
virtual srs_error_t start_haivision_publish(int stream_id);
virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid);
virtual srs_error_t start_flash_publish(int stream_id);
virtual srs_error_t start_publishing(int stream_id);
virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted);
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id);
virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause);
virtual srs_error_t set_in_window_ack_size(int ack_size);
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual void set_auto_response(bool v);
virtual void set_merge_read(bool v, IMergeReadHandler *handler);
virtual void set_recv_buffer(int buffer_size);
void reset();
};
// Mock ISrsRtmpServer for testing SrsRtmpConn::process_play_control_msg()
class MockRtmpServerForPlayControl : public ISrsRtmpServer
{
public:
SrsRtmpCommand *decode_message_packet_;
int decode_message_count_;
int send_and_free_packet_count_;
int on_play_client_pause_count_;
bool last_pause_state_;
public:
MockRtmpServerForPlayControl();
virtual ~MockRtmpServerForPlayControl();
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_error_t handshake();
virtual srs_error_t connect_app(ISrsRequest *req);
virtual uint32_t proxy_real_ip();
virtual srs_error_t set_window_ack_size(int ack_size);
virtual srs_error_t set_peer_bandwidth(int bandwidth, int type);
virtual srs_error_t set_chunk_size(int chunk_size);
virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip);
virtual srs_error_t on_bw_done();
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration);
virtual srs_error_t start_play(int stream_id);
virtual srs_error_t start_fmle_publish(int stream_id);
virtual srs_error_t start_haivision_publish(int stream_id);
virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid);
virtual srs_error_t start_flash_publish(int stream_id);
virtual srs_error_t start_publishing(int stream_id);
virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted);
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id);
virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause);
virtual srs_error_t set_in_window_ack_size(int ack_size);
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual void set_auto_response(bool v);
virtual void set_merge_read(bool v, IMergeReadHandler *handler);
virtual void set_recv_buffer(int buffer_size);
void reset();
};
// Mock SrsLiveConsumer for testing SrsRtmpConn::process_play_control_msg()
class MockLiveConsumerForPlayControl : public SrsLiveConsumer
{

View File

@ -933,7 +933,7 @@ VOID TEST(SrsLiveStreamTest, ServeHttpWithDisabledEntry)
// Create and set mock dependencies
MockStatisticForLiveStream mock_stat;
MockSecurityForLiveStream mock_security;
MockSecurity mock_security;
// Replace dependencies with mocks
live_stream->stat_ = &mock_stat;
@ -1086,23 +1086,6 @@ srs_error_t MockStatisticForLiveStream::dumps_metrics(int64_t &send_bytes, int64
return srs_success;
}
// Mock ISrsSecurity implementation for SrsLiveStream testing
MockSecurityForLiveStream::MockSecurityForLiveStream()
{
check_error_ = srs_success;
check_count_ = 0;
}
MockSecurityForLiveStream::~MockSecurityForLiveStream()
{
}
srs_error_t MockSecurityForLiveStream::check(SrsRtmpConnType type, std::string ip, ISrsRequest *req)
{
check_count_++;
return srs_error_copy(check_error_);
}
// Mock config implementation for SrsLiveStream hooks testing
MockAppConfigForLiveStreamHooks::MockAppConfigForLiveStreamHooks()
{

View File

@ -263,21 +263,6 @@ public:
virtual srs_error_t dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs);
};
// Mock ISrsSecurity for testing SrsLiveStream::serve_http_impl
class MockSecurityForLiveStream : public ISrsSecurity
{
public:
srs_error_t check_error_;
int check_count_;
public:
MockSecurityForLiveStream();
virtual ~MockSecurityForLiveStream();
public:
virtual srs_error_t check(SrsRtmpConnType type, std::string ip, ISrsRequest *req);
};
// Mock ISrsBufferCache for testing SrsHttpStreamDestroy
class MockBufferCacheForDestroy : public ISrsBufferCache
{

View File

@ -2559,7 +2559,7 @@ VOID TEST(RtspConnectionTest, DoDescribeWithAudioAndVideo)
// Create mock objects
MockEdgeConfig mock_config;
MockSecurityForLiveStream mock_security;
MockSecurity mock_security;
MockHttpHooks mock_hooks;
MockRtspSourceManager mock_rtsp_sources;

View File

@ -1522,73 +1522,6 @@ VOID TEST(DashTest, PublishLifecycleWithAudioVideo)
srs_freep(mock_hub);
}
// Test SrsMpdWriter::dispose() - major use scenario for cleaning up MPD file
// This test covers the major use scenario for SrsMpdWriter disposal and file cleanup
VOID TEST(MpdWriterTest, DisposeRemovesMpdFile)
{
srs_error_t err;
// Create SrsMpdWriter object
SrsUniquePtr<SrsMpdWriter> mpd_writer(new SrsMpdWriter());
// Create mock config with DASH settings
SrsUniquePtr<MockAppConfig> mock_config(new MockAppConfig());
// Create mock request
SrsUniquePtr<MockSrsRequest> mock_req(new MockSrsRequest("test.vhost", "live", "livestream"));
// Inject mock config into SrsMpdWriter
mpd_writer->config_ = mock_config.get();
// Initialize the MPD writer with the request
HELPER_EXPECT_SUCCESS(mpd_writer->initialize(mock_req.get()));
// Call on_publish() to set up home directory and mpd_file
HELPER_EXPECT_SUCCESS(mpd_writer->on_publish());
// Set up the MPD file path for testing
// home_ = "./[vhost]/[app]/[stream]/"
// mpd_file_ = "[stream].mpd"
// After srs_path_build_stream: mpd_path = "livestream.mpd"
// full_path = "./[vhost]/[app]/[stream]/" + "/" + "livestream.mpd"
// = "./test.vhost/live/livestream/livestream.mpd"
// Create the directory structure and MPD file for testing
SrsPath path;
string mpd_path = srs_path_build_stream(mpd_writer->mpd_file_, mock_req->vhost_, mock_req->app_, mock_req->stream_);
string full_path = mpd_writer->home_ + "/" + mpd_path;
string full_home = path.filepath_dir(full_path);
// Create the directory
HELPER_EXPECT_SUCCESS(path.mkdir_all(full_home));
// Create a test MPD file using real file writer
SrsUniquePtr<SrsFileWriter> real_fw(new SrsFileWriter());
HELPER_EXPECT_SUCCESS(real_fw->open(full_path));
const char *test_content = "<?xml version=\"1.0\"?><MPD></MPD>";
HELPER_EXPECT_SUCCESS(real_fw->write((void *)test_content, strlen(test_content), NULL));
real_fw->close();
// Verify the file exists before dispose
EXPECT_TRUE(path.exists(full_path));
// Test dispose() - major use scenario: remove MPD file on cleanup
mpd_writer->dispose();
// Verify the file was deleted
EXPECT_FALSE(path.exists(full_path));
// Test dispose() when file doesn't exist - should not crash
mpd_writer->dispose();
// Test dispose() when req_ is NULL - should not crash
mpd_writer->req_ = NULL;
mpd_writer->dispose();
// Clean up - set to NULL to avoid double-free
mpd_writer->config_ = NULL;
}
// Test SrsFragmentedMp4 delegation to fragment_ member
// This test covers the major use scenario for SrsFragmentedMp4 ISrsFragment interface delegation
VOID TEST(FragmentedMp4Test, FragmentDelegation)
@ -1663,85 +1596,6 @@ VOID TEST(FragmentedMp4Test, FragmentDelegation)
srs_freep(mock_fragment);
}
VOID TEST(MpdWriterTest, WriteTypicalScenario)
{
srs_error_t err;
// Create SrsMpdWriter object
SrsUniquePtr<SrsMpdWriter> mpd_writer(new SrsMpdWriter());
// Create mock dependencies
SrsUniquePtr<MockAppConfig> mock_config(new MockAppConfig());
// Use real app factory to create real file writers (mock file writers don't support rename)
SrsUniquePtr<SrsAppFactory> app_factory(new SrsAppFactory());
// Create mock request
SrsUniquePtr<MockSrsRequest> mock_req(new MockSrsRequest("test.vhost", "live", "livestream"));
// Inject mock dependencies
mpd_writer->config_ = mock_config.get();
mpd_writer->app_factory_ = app_factory.get();
// Initialize the MPD writer
HELPER_EXPECT_SUCCESS(mpd_writer->initialize(mock_req.get()));
// Override the home path to use a simple test directory
// (The default mock config returns "./[vhost]/[app]/[stream]/" which contains template variables)
mpd_writer->home_ = "./dash_test";
// Call on_publish to initialize other DASH settings
HELPER_EXPECT_SUCCESS(mpd_writer->on_publish());
// Restore the home path after on_publish (which would overwrite it)
mpd_writer->home_ = "./dash_test";
// Create the directory structure that will be used by the MPD writer
SrsPath path;
HELPER_EXPECT_SUCCESS(path.mkdir_all("./dash_test"));
// Create mock format with audio and video codecs
SrsUniquePtr<MockSrsFormat> format(new MockSrsFormat());
// Create mock fragment windows with sample fragments
SrsUniquePtr<SrsFragmentWindow> afragments(new SrsFragmentWindow());
SrsUniquePtr<SrsFragmentWindow> vfragments(new SrsFragmentWindow());
// Create and add audio fragments (3 fragments, each 2 seconds)
for (int i = 0; i < 3; i++) {
SrsFragment *afrag = new SrsFragment();
afrag->set_number(i + 1);
afrag->append(i * 2000); // Start DTS in ms
afrag->append((i + 1) * 2000); // End DTS in ms
afragments->append(afrag);
}
// Create and add video fragments (3 fragments, each 2 seconds)
for (int i = 0; i < 3; i++) {
SrsFragment *vfrag = new SrsFragment();
vfrag->set_number(i + 1);
vfrag->append(i * 2000); // Start DTS in ms
vfrag->append((i + 1) * 2000); // End DTS in ms
vfragments->append(vfrag);
}
// Test write() - should generate MPD file successfully
// This tests the major use scenario: writing MPD with both audio and video fragments
HELPER_EXPECT_SUCCESS(mpd_writer->write(format.get(), afragments.get(), vfragments.get()));
// The successful return from write() indicates:
// 1. MPD XML was generated with proper structure
// 2. Audio and video adaptation sets were created
// 3. Segment timeline was populated with fragment information
// 4. File was written and renamed successfully
// Clean up test directory
system("rm -rf ./dash_test");
// Clean up - set to NULL to avoid double-free
mpd_writer->config_ = NULL;
mpd_writer->app_factory_ = NULL;
}
// Mock SrsRtcConnection implementation
MockRtcConnectionForNackApi::MockRtcConnectionForNackApi()
{
@ -2075,24 +1929,6 @@ srs_error_t MockHttpHooksForRtcPlay::on_forward_backend(std::string url, ISrsReq
return srs_success;
}
// Mock ISrsSecurity implementation for SrsGoApiRtcPlay::serve_http()
MockSecurityForRtcPlay::MockSecurityForRtcPlay()
{
check_error_ = srs_success;
check_count_ = 0;
}
MockSecurityForRtcPlay::~MockSecurityForRtcPlay()
{
srs_freep(check_error_);
}
srs_error_t MockSecurityForRtcPlay::check(SrsRtmpConnType type, std::string ip, ISrsRequest *req)
{
check_count_++;
return srs_error_copy(check_error_);
}
// Mock SrsRtcConnection implementation for SrsGoApiRtcPlay::serve_http()
MockRtcConnectionForPlay::MockRtcConnectionForPlay()
{
@ -2722,7 +2558,7 @@ VOID TEST(GoApiRtcPlayTest, ServeHttpSuccess)
SrsUniquePtr<MockHttpHooksForRtcPlay> mock_hooks(new MockHttpHooksForRtcPlay());
// Create mock security
SrsUniquePtr<MockSecurityForRtcPlay> mock_security(new MockSecurityForRtcPlay());
SrsUniquePtr<MockSecurity> mock_security(new MockSecurity());
// Inject mocks into api
api->config_ = mock_config.get();
@ -2829,7 +2665,7 @@ VOID TEST(GoApiRtcPublishTest, ServeHttpSuccess)
SrsUniquePtr<MockHttpHooksForRtcPlay> mock_hooks(new MockHttpHooksForRtcPlay());
// Create mock security
SrsUniquePtr<MockSecurityForRtcPlay> mock_security(new MockSecurityForRtcPlay());
SrsUniquePtr<MockSecurity> mock_security(new MockSecurity());
// Create mock statistic
SrsUniquePtr<MockStatisticForRtcApi> mock_stat(new MockStatisticForRtcApi());

View File

@ -404,21 +404,6 @@ public:
virtual srs_error_t on_forward_backend(std::string url, ISrsRequest *req, std::vector<std::string> &rtmp_urls);
};
// Mock ISrsSecurity for testing SrsGoApiRtcPlay::serve_http()
class MockSecurityForRtcPlay : public ISrsSecurity
{
public:
srs_error_t check_error_;
int check_count_;
public:
MockSecurityForRtcPlay();
virtual ~MockSecurityForRtcPlay();
public:
virtual srs_error_t check(SrsRtmpConnType type, std::string ip, ISrsRequest *req);
};
// Mock SrsRtcConnection for testing SrsGoApiRtcPlay::serve_http()
class MockRtcConnectionForPlay
{

View File

@ -2868,155 +2868,6 @@ VOID TEST(HttpxConnTest, OnConnDoneWithNonTimeoutError)
srs_freep(mock_manager);
}
// Mock ISrsRtmpServer implementation for SrsQueueRecvThread
MockRtmpServerForQueueRecvThread::MockRtmpServerForQueueRecvThread()
{
set_auto_response_called_ = false;
auto_response_value_ = true;
}
MockRtmpServerForQueueRecvThread::~MockRtmpServerForQueueRecvThread()
{
}
void MockRtmpServerForQueueRecvThread::set_recv_timeout(srs_utime_t tm)
{
}
void MockRtmpServerForQueueRecvThread::set_send_timeout(srs_utime_t tm)
{
}
srs_error_t MockRtmpServerForQueueRecvThread::handshake()
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::connect_app(ISrsRequest *req)
{
return srs_success;
}
uint32_t MockRtmpServerForQueueRecvThread::proxy_real_ip()
{
return 0;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_peer_bandwidth(int bandwidth, int type)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_chunk_size(int chunk_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::response_connect_app(ISrsRequest *req, const char *server_ip)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::on_bw_done()
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_play(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_fmle_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_haivision_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::fmle_unpublish(int stream_id, double unpublish_tid)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_flash_publish(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::start_publishing(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::redirect(ISrsRequest *r, std::string url, bool &accepted)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::send_and_free_packet(SrsRtmpCommand *packet, int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::on_play_client_pause(int stream_id, bool is_pause)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::set_in_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServerForQueueRecvThread::recv_message(SrsRtmpCommonMessage **pmsg)
{
return srs_success;
}
void MockRtmpServerForQueueRecvThread::set_auto_response(bool v)
{
set_auto_response_called_ = true;
auto_response_value_ = v;
}
void MockRtmpServerForQueueRecvThread::set_merge_read(bool v, IMergeReadHandler *handler)
{
}
void MockRtmpServerForQueueRecvThread::set_recv_buffer(int buffer_size)
{
}
void MockRtmpServerForQueueRecvThread::reset()
{
set_auto_response_called_ = false;
auto_response_value_ = true;
}
// Test SrsQueueRecvThread basic queue operations
// This test covers the major use scenario: consume messages, check queue state, pump messages, and handle errors
VOID TEST(QueueRecvThreadTest, BasicQueueOperations)
@ -3024,7 +2875,7 @@ VOID TEST(QueueRecvThreadTest, BasicQueueOperations)
srs_error_t err;
// Create mock RTMP server
SrsUniquePtr<MockRtmpServerForQueueRecvThread> mock_rtmp(new MockRtmpServerForQueueRecvThread());
SrsUniquePtr<MockRtmpServer> mock_rtmp(new MockRtmpServer());
// Create SrsQueueRecvThread (without starting the actual recv thread)
SrsUniquePtr<SrsQueueRecvThread> queue_thread(new SrsQueueRecvThread(NULL, mock_rtmp.get(), 5 * SRS_UTIME_SECONDS, SrsContextId()));
@ -3112,7 +2963,7 @@ VOID TEST(PublishRecvThreadTest, BasicOperations)
srs_error_t err;
// Create mock dependencies
SrsUniquePtr<MockRtmpServerForQueueRecvThread> mock_rtmp(new MockRtmpServerForQueueRecvThread());
SrsUniquePtr<MockRtmpServer> mock_rtmp(new MockRtmpServer());
SrsUniquePtr<MockSrsRequest> mock_req(new MockSrsRequest("__defaultVhost__", "live", "test_stream"));
SrsSharedPtr<SrsLiveSource> mock_source; // NULL is fine for this test

View File

@ -668,48 +668,6 @@ public:
virtual void expire();
};
// Mock ISrsRtmpServer for testing SrsQueueRecvThread
class MockRtmpServerForQueueRecvThread : public ISrsRtmpServer
{
public:
bool set_auto_response_called_;
bool auto_response_value_;
public:
MockRtmpServerForQueueRecvThread();
virtual ~MockRtmpServerForQueueRecvThread();
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_error_t handshake();
virtual srs_error_t connect_app(ISrsRequest *req);
virtual uint32_t proxy_real_ip();
virtual srs_error_t set_window_ack_size(int ack_size);
virtual srs_error_t set_peer_bandwidth(int bandwidth, int type);
virtual srs_error_t set_chunk_size(int chunk_size);
virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip);
virtual srs_error_t on_bw_done();
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration);
virtual srs_error_t start_play(int stream_id);
virtual srs_error_t start_fmle_publish(int stream_id);
virtual srs_error_t start_haivision_publish(int stream_id);
virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid);
virtual srs_error_t start_flash_publish(int stream_id);
virtual srs_error_t start_publishing(int stream_id);
virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted);
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id);
virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause);
virtual srs_error_t set_in_window_ack_size(int ack_size);
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual void set_auto_response(bool v);
virtual void set_merge_read(bool v, IMergeReadHandler *handler);
virtual void set_recv_buffer(int buffer_size);
void reset();
};
// Mock ISrsFFMPEG for testing SrsEncoder
class MockFFMPEGForEncoder : public ISrsFFMPEG
{

View File

@ -3802,188 +3802,6 @@ VOID TEST(RtcAsyncCallOnUnpublishTest, CallWithContextSwitching)
// because it uses _srs_context directly instead of a member variable like SrsRtcAsyncCallOnStop
}
// Mock live source manager implementation
MockLiveSourceManager::MockLiveSourceManager()
{
fetch_or_create_error_ = srs_success;
fetch_or_create_count_ = 0;
can_publish_ = true;
// Create a mock live source
mock_source_ = SrsSharedPtr<SrsLiveSource>(new MockLiveSource());
}
MockLiveSourceManager::~MockLiveSourceManager()
{
srs_freep(fetch_or_create_error_);
}
srs_error_t MockLiveSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsLiveSource> &pps)
{
fetch_or_create_count_++;
if (fetch_or_create_error_ != srs_success) {
return srs_error_copy(fetch_or_create_error_);
}
pps = mock_source_;
return srs_success;
}
SrsSharedPtr<SrsLiveSource> MockLiveSourceManager::fetch(ISrsRequest *r)
{
return mock_source_;
}
void MockLiveSourceManager::dispose()
{
// Mock implementation - no-op for testing
}
srs_error_t MockLiveSourceManager::initialize()
{
// Mock implementation - always succeeds
return srs_success;
}
void MockLiveSourceManager::set_fetch_or_create_error(srs_error_t err)
{
srs_freep(fetch_or_create_error_);
fetch_or_create_error_ = srs_error_copy(err);
}
void MockLiveSourceManager::set_can_publish(bool can_publish)
{
can_publish_ = can_publish;
if (mock_source_.get()) {
MockLiveSource *mock_live_source = dynamic_cast<MockLiveSource *>(mock_source_.get());
if (mock_live_source) {
mock_live_source->set_can_publish(can_publish);
}
}
}
void MockLiveSourceManager::reset()
{
srs_freep(fetch_or_create_error_);
fetch_or_create_error_ = srs_success;
fetch_or_create_count_ = 0;
can_publish_ = true;
}
// Mock live source implementation
MockLiveSource::MockLiveSource()
{
can_publish_result_ = true;
}
MockLiveSource::~MockLiveSource()
{
}
bool MockLiveSource::can_publish(bool is_edge)
{
return can_publish_result_;
}
void MockLiveSource::set_can_publish(bool can_publish)
{
can_publish_result_ = can_publish;
}
srs_error_t MockLiveSource::on_publish()
{
// Mock implementation - just return success
return srs_success;
}
srs_error_t MockLiveSource::on_edge_start_publish()
{
// Mock implementation - just return success
return srs_success;
}
// Mock SRT source implementation
MockSrtSource::MockSrtSource()
{
can_publish_result_ = true;
}
MockSrtSource::~MockSrtSource()
{
}
bool MockSrtSource::can_publish()
{
return can_publish_result_;
}
void MockSrtSource::set_can_publish(bool can_publish)
{
can_publish_result_ = can_publish;
}
// Mock SRT source manager implementation
MockSrtSourceManager::MockSrtSourceManager()
{
initialize_error_ = srs_success;
fetch_or_create_error_ = srs_success;
initialize_count_ = 0;
fetch_or_create_count_ = 0;
can_publish_ = true;
// Create a mock SRT source
mock_source_ = SrsSharedPtr<SrsSrtSource>(new MockSrtSource());
}
MockSrtSourceManager::~MockSrtSourceManager()
{
srs_freep(initialize_error_);
srs_freep(fetch_or_create_error_);
}
srs_error_t MockSrtSourceManager::initialize()
{
initialize_count_++;
return srs_error_copy(initialize_error_);
}
srs_error_t MockSrtSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsSrtSource> &pps)
{
fetch_or_create_count_++;
if (fetch_or_create_error_ != srs_success) {
return srs_error_copy(fetch_or_create_error_);
}
pps = mock_source_;
return srs_success;
}
SrsSharedPtr<SrsSrtSource> MockSrtSourceManager::fetch(ISrsRequest *r)
{
return mock_source_;
}
void MockSrtSourceManager::set_initialize_error(srs_error_t err)
{
srs_freep(initialize_error_);
initialize_error_ = srs_error_copy(err);
}
void MockSrtSourceManager::set_fetch_or_create_error(srs_error_t err)
{
srs_freep(fetch_or_create_error_);
fetch_or_create_error_ = srs_error_copy(err);
}
void MockSrtSourceManager::set_can_publish(bool can_publish)
{
can_publish_ = can_publish;
if (mock_source_.get()) {
MockSrtSource *mock_srt_source = dynamic_cast<MockSrtSource *>(mock_source_.get());
if (mock_srt_source) {
mock_srt_source->set_can_publish(can_publish);
}
}
}
void MockSrtSourceManager::reset()
{
srs_freep(initialize_error_);

View File

@ -453,76 +453,4 @@ public:
void reset();
};
// Mock live source manager for testing SrsRtcPublishStream
class MockLiveSourceManager : public ISrsLiveSourceManager
{
public:
srs_error_t fetch_or_create_error_;
int fetch_or_create_count_;
SrsSharedPtr<SrsLiveSource> mock_source_;
bool can_publish_;
public:
MockLiveSourceManager();
virtual ~MockLiveSourceManager();
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsLiveSource> &pps);
virtual SrsSharedPtr<SrsLiveSource> fetch(ISrsRequest *r);
virtual void dispose();
virtual srs_error_t initialize();
void set_fetch_or_create_error(srs_error_t err);
void set_can_publish(bool can_publish);
void reset();
};
// Mock live source for testing SrsRtcPublishStream
class MockLiveSource : public SrsLiveSource
{
public:
bool can_publish_result_;
public:
MockLiveSource();
virtual ~MockLiveSource();
virtual bool can_publish(bool is_edge);
void set_can_publish(bool can_publish);
virtual srs_error_t on_publish();
virtual srs_error_t on_edge_start_publish();
};
// Mock SRT source for testing SrsRtcPublishStream
class MockSrtSource : public SrsSrtSource
{
public:
bool can_publish_result_;
public:
MockSrtSource();
virtual ~MockSrtSource();
virtual bool can_publish();
void set_can_publish(bool can_publish);
};
// Mock SRT source manager for testing SrsRtcPublishStream
class MockSrtSourceManager : public ISrsSrtSourceManager
{
public:
srs_error_t initialize_error_;
srs_error_t fetch_or_create_error_;
int initialize_count_;
int fetch_or_create_count_;
SrsSharedPtr<SrsSrtSource> mock_source_;
bool can_publish_;
public:
MockSrtSourceManager();
virtual ~MockSrtSourceManager();
virtual srs_error_t initialize();
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsSrtSource> &pps);
virtual SrsSharedPtr<SrsSrtSource> fetch(ISrsRequest *r);
void set_initialize_error(srs_error_t err);
void set_fetch_or_create_error(srs_error_t err);
void set_can_publish(bool can_publish);
void reset();
};
#endif

View File

@ -5989,8 +5989,6 @@ extern int64_t _srs_system_time_us_cache;
VOID TEST(KernelUtilityTest, CoverTimeUtilityAll)
{
srs_error_t err;
_srs_system_time_us_cache = 0;
_srs_system_time_startup_time = 0;
EXPECT_TRUE(srs_time_since_startup() > 0);
@ -6174,12 +6172,6 @@ VOID TEST(KernelUtilityTest, CoverTimeUtilityAll)
EXPECT_TRUE(ip == "127.0.0.1" || ip == "::1");
}
if (true) {
SrsPath path;
EXPECT_TRUE(path.exists("."));
HELPER_EXPECT_SUCCESS(path.mkdir_all("."));
}
if (true) {
char buf[16] = {0};
EXPECT_STREQ("FE", srs_hex_encode_to_string(buf, (const uint8_t *)"\xfe", 1));

View File

@ -577,13 +577,17 @@ MockAppConfig::MockAppConfig()
resolve_api_domain_ = true;
keep_api_domain_ = false;
mw_msgs_ = 8;
mw_sleep_ = 350 * SRS_UTIME_MILLISECONDS;
rtc_dtls_role_ = "passive";
default_vhost_ = NULL;
}
MockAppConfig::~MockAppConfig()
{
clear_on_stop_directive();
clear_on_unpublish_directive();
srs_freep(default_vhost_);
}
srs_utime_t MockAppConfig::get_pithy_print()
@ -991,6 +995,16 @@ void MockAppConfig::set_keep_api_domain(bool enabled)
keep_api_domain_ = enabled;
}
bool MockAppConfig::get_security_enabled(std::string vhost)
{
return false;
}
SrsConfDirective *MockAppConfig::get_security_rules(std::string vhost)
{
return NULL;
}
// Mock RTC packet receiver implementation
MockRtcPacketReceiver::MockRtcPacketReceiver()
{
@ -1072,3 +1086,548 @@ void MockRtcPacketReceiver::reset()
send_rtcp_fb_pli_count_ = 0;
check_send_nacks_count_ = 0;
}
MockSecurity::MockSecurity()
{
check_error_ = srs_success;
check_count_ = 0;
}
MockSecurity::~MockSecurity()
{
srs_freep(check_error_);
}
srs_error_t MockSecurity::check(SrsRtmpConnType type, std::string ip, ISrsRequest *req)
{
check_count_++;
return srs_error_copy(check_error_);
}
// Mock live source manager implementation
MockLiveSourceManager::MockLiveSourceManager()
{
fetch_or_create_error_ = srs_success;
fetch_or_create_count_ = 0;
can_publish_ = true;
// Create a mock live source
mock_source_ = SrsSharedPtr<SrsLiveSource>(new MockLiveSource());
}
MockLiveSourceManager::~MockLiveSourceManager()
{
srs_freep(fetch_or_create_error_);
}
srs_error_t MockLiveSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsLiveSource> &pps)
{
srs_error_t err = srs_success;
if (fetch_or_create_count_ == 0) {
err = mock_source_->initialize(mock_source_, r);
}
fetch_or_create_count_++;
if (fetch_or_create_error_ != srs_success) {
return srs_error_copy(fetch_or_create_error_);
}
pps = mock_source_;
return err;
}
SrsSharedPtr<SrsLiveSource> MockLiveSourceManager::fetch(ISrsRequest *r)
{
return mock_source_;
}
void MockLiveSourceManager::dispose()
{
// Mock implementation - no-op for testing
}
srs_error_t MockLiveSourceManager::initialize()
{
// Mock implementation - always succeeds
return srs_success;
}
void MockLiveSourceManager::set_fetch_or_create_error(srs_error_t err)
{
srs_freep(fetch_or_create_error_);
fetch_or_create_error_ = srs_error_copy(err);
}
void MockLiveSourceManager::set_can_publish(bool can_publish)
{
can_publish_ = can_publish;
if (mock_source_.get()) {
MockLiveSource *mock_live_source = dynamic_cast<MockLiveSource *>(mock_source_.get());
if (mock_live_source) {
mock_live_source->set_can_publish(can_publish);
}
}
}
void MockLiveSourceManager::reset()
{
srs_freep(fetch_or_create_error_);
fetch_or_create_error_ = srs_success;
fetch_or_create_count_ = 0;
can_publish_ = true;
}
// Mock live source implementation
MockLiveSource::MockLiveSource()
{
can_publish_result_ = true;
on_audio_count_ = 0;
on_video_count_ = 0;
}
MockLiveSource::~MockLiveSource()
{
}
bool MockLiveSource::can_publish(bool is_edge)
{
return can_publish_result_;
}
void MockLiveSource::set_can_publish(bool can_publish)
{
can_publish_result_ = can_publish;
}
srs_error_t MockLiveSource::on_publish()
{
// Mock implementation - just return success
return srs_success;
}
srs_error_t MockLiveSource::on_edge_start_publish()
{
// Mock implementation - just return success
return srs_success;
}
srs_error_t MockLiveSource::on_audio(SrsRtmpCommonMessage *audio)
{
on_audio_count_++;
return SrsLiveSource::on_audio(audio);
}
srs_error_t MockLiveSource::on_video(SrsRtmpCommonMessage *video)
{
on_video_count_++;
return SrsLiveSource::on_video(video);
}
// Mock SRT source implementation
MockSrtSource::MockSrtSource()
{
can_publish_result_ = true;
}
MockSrtSource::~MockSrtSource()
{
}
bool MockSrtSource::can_publish()
{
return can_publish_result_;
}
void MockSrtSource::set_can_publish(bool can_publish)
{
can_publish_result_ = can_publish;
}
// Mock SRT source manager implementation
MockSrtSourceManager::MockSrtSourceManager()
{
initialize_error_ = srs_success;
fetch_or_create_error_ = srs_success;
initialize_count_ = 0;
fetch_or_create_count_ = 0;
can_publish_ = true;
// Create a mock SRT source
mock_source_ = SrsSharedPtr<SrsSrtSource>(new MockSrtSource());
}
MockSrtSourceManager::~MockSrtSourceManager()
{
srs_freep(initialize_error_);
srs_freep(fetch_or_create_error_);
}
srs_error_t MockSrtSourceManager::initialize()
{
initialize_count_++;
return srs_error_copy(initialize_error_);
}
srs_error_t MockSrtSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsSrtSource> &pps)
{
fetch_or_create_count_++;
if (fetch_or_create_error_ != srs_success) {
return srs_error_copy(fetch_or_create_error_);
}
pps = mock_source_;
return srs_success;
}
SrsSharedPtr<SrsSrtSource> MockSrtSourceManager::fetch(ISrsRequest *r)
{
return mock_source_;
}
void MockSrtSourceManager::set_initialize_error(srs_error_t err)
{
srs_freep(initialize_error_);
initialize_error_ = srs_error_copy(err);
}
void MockSrtSourceManager::set_fetch_or_create_error(srs_error_t err)
{
srs_freep(fetch_or_create_error_);
fetch_or_create_error_ = srs_error_copy(err);
}
void MockSrtSourceManager::set_can_publish(bool can_publish)
{
can_publish_ = can_publish;
if (mock_source_.get()) {
MockSrtSource *mock_srt_source = dynamic_cast<MockSrtSource *>(mock_source_.get());
if (mock_srt_source) {
mock_srt_source->set_can_publish(can_publish);
}
}
}
MockRtmpServer::MockRtmpServer()
{
type_ = SrsRtmpConnFMLEPublish;
stream_ = "livestream";
duration_ = 0;
recv_err_ = srs_success;
cond_ = new SrsCond();
nb_sent_messages_ = 0;
start_play_error_ = srs_success;
start_publish_error_ = srs_success;
start_play_count_ = 0;
start_fmle_publish_count_ = 0;
start_flash_publish_count_ = 0;
start_haivision_publish_count_ = 0;
// Initialize fields for handle_publish_message testing
decode_message_error_ = srs_success;
decode_message_packet_ = NULL;
decode_message_count_ = 0;
fmle_unpublish_error_ = srs_success;
fmle_unpublish_count_ = 0;
// Initialize fields for process_play_control_msg testing
send_and_free_packet_count_ = 0;
on_play_client_pause_count_ = 0;
last_pause_state_ = false;
// Initialize fields for set_auto_response testing
set_auto_response_called_ = false;
auto_response_value_ = true;
}
MockRtmpServer::~MockRtmpServer()
{
srs_freep(start_play_error_);
srs_freep(start_publish_error_);
srs_freep(recv_err_);
srs_freep(cond_);
srs_freep(decode_message_error_);
srs_freep(decode_message_packet_);
srs_freep(fmle_unpublish_error_);
for (vector<SrsRtmpCommonMessage *>::iterator it = recv_msgs_.begin(); it != recv_msgs_.end(); ++it) {
SrsRtmpCommonMessage *msg = *it;
srs_freep(msg);
}
recv_msgs_.clear();
}
void MockRtmpServer::set_recv_timeout(srs_utime_t tm)
{
}
void MockRtmpServer::set_send_timeout(srs_utime_t tm)
{
}
srs_error_t MockRtmpServer::handshake()
{
return srs_success;
}
srs_error_t MockRtmpServer::connect_app(ISrsRequest *req)
{
req->ip_ = ip_;
req->vhost_ = vhost_;
req->app_ = app_;
req->tcUrl_ = tcUrl_;
req->schema_ = schema_;
req->host_ = host_;
req->port_ = port_;
return srs_success;
}
uint32_t MockRtmpServer::proxy_real_ip()
{
return 0;
}
srs_error_t MockRtmpServer::set_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServer::set_in_window_ack_size(int ack_size)
{
return srs_success;
}
srs_error_t MockRtmpServer::set_peer_bandwidth(int bandwidth, int type)
{
return srs_success;
}
srs_error_t MockRtmpServer::set_chunk_size(int chunk_size)
{
return srs_success;
}
srs_error_t MockRtmpServer::response_connect_app(ISrsRequest *req, const char *server_ip)
{
return srs_success;
}
srs_error_t MockRtmpServer::on_bw_done()
{
return srs_success;
}
srs_error_t MockRtmpServer::identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration)
{
type = type_;
stream_name = stream_;
duration = duration_;
return srs_success;
}
srs_error_t MockRtmpServer::start_play(int stream_id)
{
start_play_count_++;
return srs_error_copy(start_play_error_);
}
srs_error_t MockRtmpServer::start_fmle_publish(int stream_id)
{
start_fmle_publish_count_++;
return srs_error_copy(start_publish_error_);
}
srs_error_t MockRtmpServer::start_haivision_publish(int stream_id)
{
start_haivision_publish_count_++;
return srs_success;
}
srs_error_t MockRtmpServer::fmle_unpublish(int stream_id, double unpublish_tid)
{
fmle_unpublish_count_++;
return srs_error_copy(fmle_unpublish_error_);
}
srs_error_t MockRtmpServer::start_flash_publish(int stream_id)
{
start_flash_publish_count_++;
return srs_success;
}
srs_error_t MockRtmpServer::start_publishing(int stream_id)
{
return srs_success;
}
srs_error_t MockRtmpServer::redirect(ISrsRequest *r, std::string url, bool &accepted)
{
return srs_success;
}
srs_error_t MockRtmpServer::send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id)
{
nb_sent_messages_ += nb_msgs;
return srs_success;
}
srs_error_t MockRtmpServer::decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket)
{
decode_message_count_++;
if (decode_message_error_ != srs_success) {
return srs_error_copy(decode_message_error_);
}
*ppacket = decode_message_packet_;
decode_message_packet_ = NULL; // Transfer ownership
return srs_success;
}
srs_error_t MockRtmpServer::send_and_free_packet(SrsRtmpCommand *packet, int stream_id)
{
send_and_free_packet_count_++;
srs_freep(packet);
return srs_success;
}
srs_error_t MockRtmpServer::on_play_client_pause(int stream_id, bool is_pause)
{
on_play_client_pause_count_++;
last_pause_state_ = is_pause;
return srs_success;
}
srs_error_t MockRtmpServer::recv_message(SrsRtmpCommonMessage **pmsg)
{
// No message received during playing util get control event.
cond_->wait();
if (!recv_msgs_.empty()) {
*pmsg = recv_msgs_.front();
recv_msgs_.erase(recv_msgs_.begin());
}
return srs_error_copy(recv_err_);
}
void MockRtmpServer::set_merge_read(bool v, IMergeReadHandler *handler)
{
}
void MockRtmpServer::set_recv_buffer(int buffer_size)
{
}
void MockRtmpServer::set_auto_response(bool v)
{
set_auto_response_called_ = true;
auto_response_value_ = v;
}
void MockRtmpServer::reset()
{
srs_freep(decode_message_error_);
srs_freep(decode_message_packet_);
srs_freep(fmle_unpublish_error_);
decode_message_error_ = srs_success;
decode_message_packet_ = NULL;
decode_message_count_ = 0;
fmle_unpublish_error_ = srs_success;
fmle_unpublish_count_ = 0;
send_and_free_packet_count_ = 0;
on_play_client_pause_count_ = 0;
last_pause_state_ = false;
set_auto_response_called_ = false;
auto_response_value_ = true;
}
MockRtmpTransport::MockRtmpTransport()
{
}
MockRtmpTransport::~MockRtmpTransport()
{
}
srs_netfd_t MockRtmpTransport::fd()
{
return NULL;
}
int MockRtmpTransport::osfd()
{
return -1;
}
ISrsProtocolReadWriter *MockRtmpTransport::io()
{
return this;
}
srs_error_t MockRtmpTransport::handshake()
{
return srs_success;
}
const char *MockRtmpTransport::transport_type()
{
return "mock";
}
srs_error_t MockRtmpTransport::set_socket_buffer(srs_utime_t buffer_v)
{
return srs_success;
}
srs_error_t MockRtmpTransport::set_tcp_nodelay(bool v)
{
return srs_success;
}
int64_t MockRtmpTransport::get_recv_bytes()
{
return 0;
}
int64_t MockRtmpTransport::get_send_bytes()
{
return 0;
}
srs_error_t MockRtmpTransport::read(void *buf, size_t size, ssize_t *nread)
{
return srs_success;
}
srs_error_t MockRtmpTransport::read_fully(void *buf, size_t size, ssize_t *nread)
{
return srs_success;
}
void MockRtmpTransport::set_recv_timeout(srs_utime_t tm)
{
}
srs_utime_t MockRtmpTransport::get_recv_timeout()
{
return 0;
}
srs_error_t MockRtmpTransport::write(void *buf, size_t size, ssize_t *nwrite)
{
return srs_success;
}
void MockRtmpTransport::set_send_timeout(srs_utime_t tm)
{
}
srs_utime_t MockRtmpTransport::get_send_timeout()
{
return 0;
}
srs_error_t MockRtmpTransport::writev(const iovec *iov, int iov_size, ssize_t *nwrite)
{
return srs_success;
}

View File

@ -35,6 +35,11 @@
#ifdef SRS_GB28181
#include <srs_app_gb28181.hpp>
#endif
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_rtmp_source.hpp>
#include <srs_app_security.hpp>
#include <srs_app_srt_source.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_utility.hpp>
// Forward declarations
@ -47,6 +52,7 @@ class MockSrsFileReader;
class MockSrtCoroutine;
class ISrsGbSession;
class ISrsProtocolUtility;
class SrsCond;
// Mock SDP factory for creating test SDP offers/answers
class MockSdpFactory
@ -255,7 +261,9 @@ public:
bool resolve_api_domain_;
bool keep_api_domain_;
int mw_msgs_;
srs_utime_t mw_sleep_;
std::string rtc_dtls_role_;
SrsConfDirective *default_vhost_;
public:
MockAppConfig();
@ -356,7 +364,7 @@ public:
virtual int get_dying_pulse() { return 0; }
virtual std::string get_rtmps_ssl_cert() { return ""; }
virtual std::string get_rtmps_ssl_key() { return ""; }
virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) { return NULL; }
virtual SrsConfDirective *get_vhost(std::string vhost, bool try_default_vhost = true) { return default_vhost_; }
virtual bool get_vhost_enabled(std::string vhost) { return true; }
virtual bool get_debug_srs_upnode(std::string vhost) { return true; }
virtual int get_out_ack_size(std::string vhost) { return 2500000; }
@ -365,7 +373,7 @@ public:
virtual bool get_gop_cache(std::string vhost) { return true; }
virtual int get_gop_cache_max_frames(std::string vhost) { return 2500; }
virtual bool get_tcp_nodelay(std::string vhost) { return false; }
virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false) { return 350 * SRS_UTIME_MILLISECONDS; }
virtual srs_utime_t get_mw_sleep(std::string vhost, bool is_rtc = false) { return mw_sleep_; }
virtual srs_utime_t get_send_min_interval(std::string vhost) { return 0; }
virtual bool get_mr_enabled(std::string vhost) { return false; }
virtual srs_utime_t get_mr_sleep(std::string vhost) { return 350 * SRS_UTIME_MILLISECONDS; }
@ -536,6 +544,8 @@ public:
void set_api_as_candidates(bool enabled);
void set_resolve_api_domain(bool enabled);
void set_keep_api_domain(bool enabled);
virtual bool get_security_enabled(std::string vhost);
virtual SrsConfDirective *get_security_rules(std::string vhost);
};
// Mock RTC packet receiver for testing SrsRtcPublishStream
@ -571,4 +581,209 @@ public:
void reset();
};
// Mock ISrsSecurity for testing
class MockSecurity : public ISrsSecurity
{
public:
srs_error_t check_error_;
int check_count_;
public:
MockSecurity();
virtual ~MockSecurity();
public:
virtual srs_error_t check(SrsRtmpConnType type, std::string ip, ISrsRequest *req);
};
// Mock live source manager for testing SrsRtcPublishStream
class MockLiveSourceManager : public ISrsLiveSourceManager
{
public:
srs_error_t fetch_or_create_error_;
int fetch_or_create_count_;
SrsSharedPtr<SrsLiveSource> mock_source_;
bool can_publish_;
public:
MockLiveSourceManager();
virtual ~MockLiveSourceManager();
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsLiveSource> &pps);
virtual SrsSharedPtr<SrsLiveSource> fetch(ISrsRequest *r);
virtual void dispose();
virtual srs_error_t initialize();
void set_fetch_or_create_error(srs_error_t err);
void set_can_publish(bool can_publish);
void reset();
};
// Mock live source for testing SrsRtcPublishStream
class MockLiveSource : public SrsLiveSource
{
public:
bool can_publish_result_;
int on_audio_count_;
int on_video_count_;
public:
MockLiveSource();
virtual ~MockLiveSource();
virtual bool can_publish(bool is_edge);
void set_can_publish(bool can_publish);
virtual srs_error_t on_publish();
virtual srs_error_t on_edge_start_publish();
public:
virtual srs_error_t on_audio(SrsRtmpCommonMessage *audio);
virtual srs_error_t on_video(SrsRtmpCommonMessage *video);
};
// Mock SRT source for testing SrsRtcPublishStream
class MockSrtSource : public SrsSrtSource
{
public:
bool can_publish_result_;
public:
MockSrtSource();
virtual ~MockSrtSource();
virtual bool can_publish();
void set_can_publish(bool can_publish);
};
// Mock SRT source manager for testing SrsRtcPublishStream
class MockSrtSourceManager : public ISrsSrtSourceManager
{
public:
srs_error_t initialize_error_;
srs_error_t fetch_or_create_error_;
int initialize_count_;
int fetch_or_create_count_;
SrsSharedPtr<SrsSrtSource> mock_source_;
bool can_publish_;
public:
MockSrtSourceManager();
virtual ~MockSrtSourceManager();
virtual srs_error_t initialize();
virtual srs_error_t fetch_or_create(ISrsRequest *r, SrsSharedPtr<SrsSrtSource> &pps);
virtual SrsSharedPtr<SrsSrtSource> fetch(ISrsRequest *r);
void set_initialize_error(srs_error_t err);
void set_fetch_or_create_error(srs_error_t err);
void set_can_publish(bool can_publish);
void reset();
};
class MockRtmpServer : public ISrsRtmpServer
{
public:
SrsRtmpConnType type_;
std::string host_;
std::string ip_;
std::string vhost_;
std::string app_;
std::string stream_;
std::string tcUrl_;
std::string schema_;
int port_;
srs_utime_t duration_;
public:
srs_error_t recv_err_;
std::vector<SrsRtmpCommonMessage *> recv_msgs_;
SrsCond *cond_;
public:
int nb_sent_messages_;
srs_error_t start_play_error_;
int start_play_count_;
srs_error_t start_publish_error_;
int start_fmle_publish_count_;
int start_flash_publish_count_;
int start_haivision_publish_count_;
public:
// Fields for handle_publish_message testing
srs_error_t decode_message_error_;
SrsRtmpCommand *decode_message_packet_;
int decode_message_count_;
srs_error_t fmle_unpublish_error_;
int fmle_unpublish_count_;
public:
// Fields for process_play_control_msg testing
int send_and_free_packet_count_;
int on_play_client_pause_count_;
bool last_pause_state_;
public:
// Fields for set_auto_response testing
bool set_auto_response_called_;
bool auto_response_value_;
public:
MockRtmpServer();
virtual ~MockRtmpServer();
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_error_t handshake();
virtual srs_error_t connect_app(ISrsRequest *req);
virtual uint32_t proxy_real_ip();
virtual srs_error_t set_window_ack_size(int ack_size);
virtual srs_error_t set_peer_bandwidth(int bandwidth, int type);
virtual srs_error_t set_chunk_size(int chunk_size);
virtual srs_error_t response_connect_app(ISrsRequest *req, const char *server_ip = NULL);
virtual srs_error_t on_bw_done();
virtual srs_error_t identify_client(int stream_id, SrsRtmpConnType &type, std::string &stream_name, srs_utime_t &duration);
virtual srs_error_t start_play(int stream_id);
virtual srs_error_t start_fmle_publish(int stream_id);
virtual srs_error_t start_haivision_publish(int stream_id);
virtual srs_error_t fmle_unpublish(int stream_id, double unpublish_tid);
virtual srs_error_t start_flash_publish(int stream_id);
virtual srs_error_t start_publishing(int stream_id);
virtual srs_error_t redirect(ISrsRequest *r, std::string url, bool &accepted);
virtual srs_error_t send_and_free_messages(SrsMediaPacket **msgs, int nb_msgs, int stream_id);
virtual srs_error_t decode_message(SrsRtmpCommonMessage *msg, SrsRtmpCommand **ppacket);
virtual srs_error_t send_and_free_packet(SrsRtmpCommand *packet, int stream_id);
virtual srs_error_t on_play_client_pause(int stream_id, bool is_pause);
virtual srs_error_t set_in_window_ack_size(int ack_size);
virtual srs_error_t recv_message(SrsRtmpCommonMessage **pmsg);
virtual void set_auto_response(bool v);
virtual void set_merge_read(bool v, IMergeReadHandler *handler);
virtual void set_recv_buffer(int buffer_size);
public:
void reset();
};
class MockRtmpTransport : public ISrsRtmpTransport, public ISrsProtocolReadWriter
{
public:
MockRtmpTransport();
virtual ~MockRtmpTransport();
public:
virtual srs_netfd_t fd();
virtual int osfd();
virtual ISrsProtocolReadWriter *io();
virtual srs_error_t handshake();
virtual const char *transport_type();
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
virtual srs_error_t set_tcp_nodelay(bool v);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
public:
virtual srs_error_t read(void *buf, size_t size, ssize_t *nread);
virtual srs_error_t read_fully(void *buf, size_t size, ssize_t *nread);
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual srs_error_t write(void *buf, size_t size, ssize_t *nwrite);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t *nwrite);
};
#endif

View File

@ -0,0 +1,387 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2025 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_utest_rtmp_conn.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_protocol_conn.hpp>
#include <srs_protocol_io.hpp>
#include <srs_protocol_rtmp_stack.hpp>
#include <srs_protocol_st.hpp>
#include <srs_utest_app10.hpp>
#include <srs_utest_app13.hpp>
#include <srs_utest_app16.hpp>
#include <srs_utest_app6.hpp>
#include <srs_utest_mock.hpp>
#include <srs_utest_service.hpp>
#include <sys/socket.h>
#include <unistd.h>
// This test is used to verify the basic workflow of the RTMP connection.
// It's finished with the help of AI, but each step is manually designed
// and verified. So this is not dominated by AI, but by humanbeing.
VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPublisher)
{
srs_error_t err;
// Mock all interface dependencies
SrsUniquePtr<MockAppConfig> mock_config(new MockAppConfig());
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
SrsUniquePtr<MockLiveSourceManager> mock_sources(new MockLiveSourceManager());
SrsUniquePtr<MockStreamPublishTokenManager> mock_tokens(new MockStreamPublishTokenManager());
SrsUniquePtr<MockRtcStatistic> mock_stat(new MockRtcStatistic());
SrsUniquePtr<MockHttpHooks> mock_hooks(new MockHttpHooks());
SrsUniquePtr<MockRtcSourceManager> mock_rtc_sources(new MockRtcSourceManager());
SrsUniquePtr<MockSrtSourceManager> mock_srt_sources(new MockSrtSourceManager());
#ifdef SRS_RTSP
SrsUniquePtr<MockRtspSourceManager> mock_rtsp_sources(new MockRtspSourceManager());
#endif
MockRtmpServer *mock_rtmp_server = new MockRtmpServer();
MockSecurity *mock_security = new MockSecurity();
mock_config->default_vhost_ = new SrsConfDirective();
mock_config->default_vhost_->name_ = "vhost";
mock_config->default_vhost_->args_.push_back("__defaultVhost__");
mock_config->mw_msgs_ = 0; // Handle each RTMP message, no merging write.
mock_config->mw_sleep_ = 0; // Handle each RTMP message, no merging write.
mock_rtmp_server->type_ = SrsRtmpConnFMLEPublish;
mock_rtmp_server->stream_ = "livestream";
mock_rtmp_server->ip_ = "192.168.1.100";
mock_rtmp_server->vhost_ = "utest.ossrs.io";
mock_rtmp_server->app_ = "utest";
mock_rtmp_server->stream_ = "livestream";
mock_rtmp_server->tcUrl_ = "rtmp://127.0.0.1/utest";
mock_rtmp_server->schema_ = "rtmp";
mock_rtmp_server->port_ = 1935;
mock_rtmp_server->host_ = "127.0.0.1";
// Create SrsRtmpConn - it takes ownership of transport
ISrsRtmpTransport *transport = new MockRtmpTransport();
SrsUniquePtr<SrsRtmpConn> conn(new SrsRtmpConn(transport, "192.168.1.100", 1935));
conn->config_ = mock_config.get();
conn->manager_ = mock_manager.get();
conn->live_sources_ = mock_sources.get();
conn->stream_publish_tokens_ = mock_tokens.get();
conn->stat_ = mock_stat.get();
conn->hooks_ = mock_hooks.get();
conn->rtc_sources_ = mock_rtc_sources.get();
conn->srt_sources_ = mock_srt_sources.get();
#ifdef SRS_RTSP
conn->rtsp_sources_ = mock_rtsp_sources.get();
#endif
srs_freep(conn->rtmp_);
conn->rtmp_ = mock_rtmp_server;
srs_freep(conn->security_);
conn->security_ = mock_security;
// Start the RTMP connection.
if (true) {
// Mock the client type to be a player
HELPER_EXPECT_SUCCESS(conn->start());
// Wait for coroutine to start.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify the req should be parsed.
ISrsRequest *req = conn->info_->req_;
EXPECT_STREQ("192.168.1.100", req->ip_.c_str());
EXPECT_STREQ("rtmp://127.0.0.1/utest", req->tcUrl_.c_str());
EXPECT_STREQ("rtmp", req->schema_.c_str());
EXPECT_STREQ("__defaultVhost__", req->vhost_.c_str());
EXPECT_STREQ("127.0.0.1", req->host_.c_str());
EXPECT_EQ(1935, req->port_);
EXPECT_STREQ("utest", req->app_.c_str());
EXPECT_STREQ("livestream", req->stream_.c_str());
EXPECT_EQ(0, req->duration_);
EXPECT_TRUE(NULL == req->args_);
EXPECT_STREQ("rtmp", req->protocol_.c_str());
EXPECT_FALSE(conn->info_->edge_);
}
// Create an RTMP audio message to feed consumer.
MockLiveSource *mock_source = dynamic_cast<MockLiveSource *>(mock_sources->mock_source_.get());
if (true) {
// Create a real AAC audio message with proper format.
// AAC audio format in RTMP/FLV:
// Byte 0: (SoundFormat << 4) | (SoundRate << 2) | (SoundSize << 1) | SoundType
// SoundFormat=10 (AAC), SoundRate=3 (44kHz), SoundSize=1 (16-bit), SoundType=1 (stereo)
// = 0xAF
// Byte 1: AACPacketType (0=sequence header, 1=raw data)
// Remaining bytes: AAC data
int payload_size = 10;
SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage();
msg->header_.initialize_audio(payload_size, 0, 1);
msg->create_payload(payload_size);
// Fill in AAC audio data
SrsBuffer stream(msg->payload(), payload_size);
// Audio format byte: AAC(10), 44kHz(3), 16-bit(1), stereo(1) = 0xAF
stream.write_1bytes(0xAF);
// AAC packet type: 1 = AAC raw data
stream.write_1bytes(0x01);
// AAC raw data (8 bytes of dummy audio data)
for (int i = 0; i < 8; i++) {
stream.write_1bytes(0x00);
}
// Feed audio to rtmp server.
mock_rtmp_server->recv_msgs_.push_back(msg);
mock_rtmp_server->cond_->signal();
// Wait for consumer to process the message.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify that the message is sent to the client.
EXPECT_EQ(1, mock_source->on_audio_count_);
}
// Create an RTMP video message to feed consumer.
if (true) {
// Create a real H.264 video message with proper format.
// H.264 video format in RTMP/FLV:
// Byte 0: (FrameType << 4) | CodecID (CodecID=7 for H.264)
// FrameType=1 (key frame), CodecID=7 (H.264) = 0x17
// Byte 1: AVCPacketType (0=sequence header, 1=NALU, 2=end of sequence)
// Byte 2-4: CompositionTime (3bytes little-endian int24)
// Remaining bytes: H.264 data
int payload_size = 10;
SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage();
msg->header_.initialize_video(payload_size, 0, 1);
msg->create_payload(payload_size);
// Fill in H.264 video data
SrsBuffer stream(msg->payload(), payload_size);
// Frame type & Codec ID: Key frame (1) + H.264 (7) = 0x17
stream.write_1bytes(0x17);
// AVC packet type: 1 = NALU
stream.write_1bytes(0x01);
// Composition time: 0 (3bytes little-endian int24)
stream.write_3bytes(0x000000);
// H.264 raw data (5 bytes of dummy video data)
for (int i = 0; i < 5; i++) {
stream.write_1bytes(0x00);
}
// Feed audio to rtmp server.
mock_rtmp_server->recv_msgs_.push_back(msg);
mock_rtmp_server->cond_->signal();
// Wait for consumer to process the message.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify that the message is sent to the client.
EXPECT_EQ(1, mock_source->on_video_count_);
}
// Simulate client quit event, the receive thread will get this error.
if (true) {
mock_rtmp_server->recv_err_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit");
mock_rtmp_server->cond_->signal();
// Wait for coroutine to stop.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
}
// Stop the RTMP connection.
conn->stop();
}
// This test is used to verify the basic workflow of the RTMP connection.
// It's finished with the help of AI, but each step is manually designed
// and verified. So this is not dominated by AI, but by humanbeing.
VOID TEST(RtmpConnTest, ManuallyVerifyBasicWorkflowForPlayer)
{
srs_error_t err;
// Mock all interface dependencies
SrsUniquePtr<MockAppConfig> mock_config(new MockAppConfig());
SrsUniquePtr<MockConnectionManager> mock_manager(new MockConnectionManager());
SrsUniquePtr<MockLiveSourceManager> mock_sources(new MockLiveSourceManager());
SrsUniquePtr<MockStreamPublishTokenManager> mock_tokens(new MockStreamPublishTokenManager());
SrsUniquePtr<MockRtcStatistic> mock_stat(new MockRtcStatistic());
SrsUniquePtr<MockHttpHooks> mock_hooks(new MockHttpHooks());
SrsUniquePtr<MockRtcSourceManager> mock_rtc_sources(new MockRtcSourceManager());
SrsUniquePtr<MockSrtSourceManager> mock_srt_sources(new MockSrtSourceManager());
#ifdef SRS_RTSP
SrsUniquePtr<MockRtspSourceManager> mock_rtsp_sources(new MockRtspSourceManager());
#endif
MockRtmpServer *mock_rtmp_server = new MockRtmpServer();
MockSecurity *mock_security = new MockSecurity();
mock_config->default_vhost_ = new SrsConfDirective();
mock_config->default_vhost_->name_ = "vhost";
mock_config->default_vhost_->args_.push_back("__defaultVhost__");
mock_config->mw_msgs_ = 0; // Handle each RTMP message, no merging write.
mock_config->mw_sleep_ = 0; // Handle each RTMP message, no merging write.
mock_rtmp_server->type_ = SrsRtmpConnPlay;
mock_rtmp_server->ip_ = "192.168.1.100";
mock_rtmp_server->vhost_ = "utest.ossrs.io";
mock_rtmp_server->app_ = "utest";
mock_rtmp_server->stream_ = "livestream";
mock_rtmp_server->tcUrl_ = "rtmp://127.0.0.1/utest";
mock_rtmp_server->schema_ = "rtmp";
mock_rtmp_server->port_ = 1935;
mock_rtmp_server->host_ = "127.0.0.1";
// Create SrsRtmpConn - it takes ownership of transport
ISrsRtmpTransport *transport = new MockRtmpTransport();
SrsUniquePtr<SrsRtmpConn> conn(new SrsRtmpConn(transport, "192.168.1.100", 1935));
conn->config_ = mock_config.get();
conn->manager_ = mock_manager.get();
conn->live_sources_ = mock_sources.get();
conn->stream_publish_tokens_ = mock_tokens.get();
conn->stat_ = mock_stat.get();
conn->hooks_ = mock_hooks.get();
conn->rtc_sources_ = mock_rtc_sources.get();
conn->srt_sources_ = mock_srt_sources.get();
#ifdef SRS_RTSP
conn->rtsp_sources_ = mock_rtsp_sources.get();
#endif
srs_freep(conn->rtmp_);
conn->rtmp_ = mock_rtmp_server;
srs_freep(conn->security_);
conn->security_ = mock_security;
// Start the RTMP connection.
if (true) {
// Mock the client type to be a player
HELPER_EXPECT_SUCCESS(conn->start());
// Wait for coroutine to start.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify the req should be parsed.
ISrsRequest *req = conn->info_->req_;
EXPECT_STREQ("192.168.1.100", req->ip_.c_str());
EXPECT_STREQ("rtmp://127.0.0.1/utest", req->tcUrl_.c_str());
EXPECT_STREQ("rtmp", req->schema_.c_str());
EXPECT_STREQ("__defaultVhost__", req->vhost_.c_str());
EXPECT_STREQ("127.0.0.1", req->host_.c_str());
EXPECT_EQ(1935, req->port_);
EXPECT_STREQ("utest", req->app_.c_str());
EXPECT_STREQ("livestream", req->stream_.c_str());
EXPECT_EQ(0, req->duration_);
EXPECT_TRUE(NULL == req->args_);
EXPECT_STREQ("rtmp", req->protocol_.c_str());
EXPECT_FALSE(conn->info_->edge_);
}
// Create an RTMP audio message to feed consumer.
if (true) {
// Create a real AAC audio message with proper format.
// AAC audio format in RTMP/FLV:
// Byte 0: (SoundFormat << 4) | (SoundRate << 2) | (SoundSize << 1) | SoundType
// SoundFormat=10 (AAC), SoundRate=3 (44kHz), SoundSize=1 (16-bit), SoundType=1 (stereo)
// = 0xAF
// Byte 1: AACPacketType (0=sequence header, 1=raw data)
// Remaining bytes: AAC data
int payload_size = 10;
SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage();
msg->header_.initialize_audio(payload_size, 0, 1);
msg->create_payload(payload_size);
// Fill in AAC audio data
SrsBuffer stream(msg->payload(), payload_size);
// Audio format byte: AAC(10), 44kHz(3), 16-bit(1), stereo(1) = 0xAF
stream.write_1bytes(0xAF);
// AAC packet type: 1 = AAC raw data
stream.write_1bytes(0x01);
// AAC raw data (8 bytes of dummy audio data)
for (int i = 0; i < 8; i++) {
stream.write_1bytes(0x00);
}
// Feed audio to source.
SrsLiveSource *source = mock_sources->mock_source_.get();
HELPER_EXPECT_SUCCESS(source->on_audio(msg));
// Wait for consumer to process the message.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Sleep again because player coroutine yield control, so we need to wait for
// it to run again.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify that the message is sent to the client.
EXPECT_EQ(1, mock_rtmp_server->nb_sent_messages_);
}
// Create an RTMP video message to feed consumer.
if (true) {
// Create a real H.264 video message with proper format.
// H.264 video format in RTMP/FLV:
// Byte 0: (FrameType << 4) | CodecID (CodecID=7 for H.264)
// FrameType=1 (key frame), CodecID=7 (H.264) = 0x17
// Byte 1: AVCPacketType (0=sequence header, 1=NALU, 2=end of sequence)
// Byte 2-4: CompositionTime (3bytes little-endian int24)
// Remaining bytes: H.264 data
int payload_size = 10;
SrsRtmpCommonMessage *msg = new SrsRtmpCommonMessage();
msg->header_.initialize_video(payload_size, 0, 1);
msg->create_payload(payload_size);
// Fill in H.264 video data
SrsBuffer stream(msg->payload(), payload_size);
// Frame type & Codec ID: Key frame (1) + H.264 (7) = 0x17
stream.write_1bytes(0x17);
// AVC packet type: 1 = NALU
stream.write_1bytes(0x01);
// Composition time: 0 (3bytes little-endian int24)
stream.write_3bytes(0x000000);
// H.264 raw data (5 bytes of dummy video data)
for (int i = 0; i < 5; i++) {
stream.write_1bytes(0x00);
}
// Feed video to source.
SrsLiveSource *source = mock_sources->mock_source_.get();
HELPER_EXPECT_SUCCESS(source->on_video(msg));
// Wait for consumer to process the message.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Sleep again because player coroutine yield control, so we need to wait for
// it to run again.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// Verify that the message is sent to the client.
EXPECT_EQ(2, mock_rtmp_server->nb_sent_messages_);
}
// Simulate client quit event, the receive thread will get this error.
if (true) {
mock_rtmp_server->recv_err_ = srs_error_new(ERROR_SOCKET_READ, "mock client quit");
mock_rtmp_server->cond_->signal();
// Wait for coroutine to stop.
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
}
// Stop the RTMP connection.
conn->stop();
}

View File

@ -0,0 +1,29 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2025 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_UTEST_RTMP_CONN_HPP
#define SRS_UTEST_RTMP_CONN_HPP
#include <srs_utest.hpp>
#endif