AI: Add utest to cover srt module.

This commit is contained in:
OSSRS-AI 2025-10-06 21:04:29 -04:00 committed by winlin
parent 1509fde2da
commit 809d77b662
6 changed files with 1776 additions and 26 deletions

2
trunk/configure vendored
View File

@ -384,7 +384,7 @@ if [[ $SRS_UTEST == YES ]]; then
"srs_utest_coworkers" "srs_utest_pithy_print" "srs_utest_kernel3" "srs_utest_protocol4"
"srs_utest_protocol3" "srs_utest_app" "srs_utest_app2" "srs_utest_app3" "srs_utest_app4"
"srs_utest_app5" "srs_utest_app6" "srs_utest_app7" "srs_utest_app8" "srs_utest_app9"
"srs_utest_app10" "srs_utest_app11")
"srs_utest_app10" "srs_utest_app11" "srs_utest_app12")
# Always include SRT utest
MODULE_FILES+=("srs_utest_srt")
if [[ $SRS_GB28181 == YES ]]; then

View File

@ -524,13 +524,14 @@ srs_error_t SrsMpegtsSrtConn::do_playing()
{
srs_error_t err = srs_success;
SrsSrtConsumer *consumer_raw = NULL;
ISrsSrtConsumer *consumer_raw = NULL;
if ((err = srt_source_->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "create consumer, ts source=%s", req_->get_stream_url().c_str());
}
srs_assert(consumer_raw);
SrsUniquePtr<SrsSrtConsumer> consumer(consumer_raw);
SrsUniquePtr<ISrsSrtConsumer> consumer(consumer_raw);
SrsSrtConsumer *consumer_impl = dynamic_cast<SrsSrtConsumer *>(consumer_raw);
// TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
if ((err = srt_source_->consumer_dumps(consumer.get())) != srs_success) {

View File

@ -220,7 +220,15 @@ SrsSharedPtr<SrsSrtSource> SrsSrtSourceManager::fetch(ISrsRequest *r)
SrsSrtSourceManager *_srs_srt_sources = NULL;
SrsSrtConsumer::SrsSrtConsumer(SrsSrtSource *s)
ISrsSrtConsumer::ISrsSrtConsumer()
{
}
ISrsSrtConsumer::~ISrsSrtConsumer()
{
}
SrsSrtConsumer::SrsSrtConsumer(ISrsSrtSource *s)
{
source_ = s;
should_update_source_id_ = false;
@ -916,12 +924,22 @@ srs_error_t SrsSrtFrameBuilder::on_aac_frame(SrsTsMessage *msg, uint32_t pts, ch
return err;
}
ISrsSrtSource::ISrsSrtSource()
{
}
ISrsSrtSource::~ISrsSrtSource()
{
}
SrsSrtSource::SrsSrtSource()
{
req_ = NULL;
can_publish_ = true;
srt_bridge_ = NULL;
stream_die_at_ = 0;
stat_ = _srs_stat;
}
SrsSrtSource::~SrsSrtSource()
@ -937,6 +955,8 @@ SrsSrtSource::~SrsSrtSource()
if (cid.empty())
cid = _pre_source_id;
srs_trace("free srt source id=[%s]", cid.c_str());
stat_ = NULL;
}
// CRITICAL: This method is called AFTER the source has been added to the source pool
@ -995,10 +1015,13 @@ srs_error_t SrsSrtSource::on_source_id_changed(SrsContextId id)
_source_id = id;
// notice all consumer
std::vector<SrsSrtConsumer *>::iterator it;
std::vector<ISrsSrtConsumer *>::iterator it;
for (it = consumers_.begin(); it != consumers_.end(); ++it) {
SrsSrtConsumer *consumer = *it;
consumer->update_source_id();
ISrsSrtConsumer *consumer = *it;
SrsSrtConsumer *consumer_impl = dynamic_cast<SrsSrtConsumer *>(consumer);
if (consumer_impl) {
consumer_impl->update_source_id();
}
}
return err;
@ -1025,7 +1048,7 @@ void SrsSrtSource::set_bridge(ISrsSrtBridge *bridge)
srt_bridge_ = bridge;
}
srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer *&consumer)
srs_error_t SrsSrtSource::create_consumer(ISrsSrtConsumer *&consumer)
{
srs_error_t err = srs_success;
@ -1037,7 +1060,7 @@ srs_error_t SrsSrtSource::create_consumer(SrsSrtConsumer *&consumer)
return err;
}
srs_error_t SrsSrtSource::consumer_dumps(SrsSrtConsumer *consumer)
srs_error_t SrsSrtSource::consumer_dumps(ISrsSrtConsumer *consumer)
{
srs_error_t err = srs_success;
@ -1047,9 +1070,9 @@ srs_error_t SrsSrtSource::consumer_dumps(SrsSrtConsumer *consumer)
return err;
}
void SrsSrtSource::on_consumer_destroy(SrsSrtConsumer *consumer)
void SrsSrtSource::on_consumer_destroy(ISrsSrtConsumer *consumer)
{
std::vector<SrsSrtConsumer *>::iterator it;
std::vector<ISrsSrtConsumer *>::iterator it;
it = std::find(consumers_.begin(), consumers_.end(), consumer);
if (it != consumers_.end()) {
it = consumers_.erase(it);
@ -1080,8 +1103,7 @@ srs_error_t SrsSrtSource::on_publish()
return srs_error_wrap(err, "bridge on publish");
}
SrsStatistic *stat = _srs_stat;
stat->on_stream_publish(req_, _source_id.c_str());
stat_->on_stream_publish(req_, _source_id.c_str());
return err;
}
@ -1093,8 +1115,7 @@ void SrsSrtSource::on_unpublish()
return;
}
SrsStatistic *stat = _srs_stat;
stat->on_stream_close(req_);
stat_->on_stream_close(req_);
if (srt_bridge_) {
srt_bridge_->on_unpublish();
@ -1115,7 +1136,7 @@ srs_error_t SrsSrtSource::on_packet(SrsSrtPacket *packet)
srs_error_t err = srs_success;
for (int i = 0; i < (int)consumers_.size(); i++) {
SrsSrtConsumer *consumer = consumers_.at(i);
ISrsSrtConsumer *consumer = consumers_.at(i);
if ((err = consumer->enqueue(packet->copy())) != srs_success) {
return srs_error_wrap(err, "consume ts packet");
}

View File

@ -24,6 +24,9 @@ class SrsLiveSource;
class SrsSrtSource;
class SrsAlonePithyPrint;
class SrsSrtFrameBuilder;
class ISrsStatistic;
class ISrsSrtConsumer;
class ISrsSrtSource;
// The SRT packet with shared message.
class SrsSrtPacket
@ -97,15 +100,29 @@ public:
// Global singleton instance.
extern SrsSrtSourceManager *_srs_srt_sources;
class SrsSrtConsumer
// The SRT consumer interface.
class ISrsSrtConsumer
{
public:
SrsSrtConsumer(SrsSrtSource *source);
virtual ~SrsSrtConsumer();
ISrsSrtConsumer();
virtual ~ISrsSrtConsumer();
public:
virtual srs_error_t enqueue(SrsSrtPacket *packet) = 0;
virtual srs_error_t dump_packet(SrsSrtPacket **ppkt) = 0;
virtual void wait(int nb_msgs, srs_utime_t timeout) = 0;
};
// The SRT consumer, consume packets from SRT stream source.
class SrsSrtConsumer : public ISrsSrtConsumer
{
private:
// Because source references to this object, so we should directly use the source ptr.
SrsSrtSource *source_;
ISrsSrtSource *source_;
public:
SrsSrtConsumer(ISrsSrtSource *source);
virtual ~SrsSrtConsumer();
private:
std::vector<SrsSrtPacket *> queue_;
@ -185,9 +202,25 @@ private:
SrsAlonePithyPrint *pp_audio_duration_;
};
// A SRT source is a stream, to publish and to play with.
class SrsSrtSource : public ISrsSrtTarget
// The SRT source interface.
class ISrsSrtSource : public ISrsSrtTarget
{
public:
ISrsSrtSource();
virtual ~ISrsSrtSource();
public:
virtual SrsContextId source_id() = 0;
virtual SrsContextId pre_source_id() = 0;
virtual void on_consumer_destroy(ISrsSrtConsumer *consumer) = 0;
};
// A SRT source is a stream, to publish and to play with.
class SrsSrtSource : public ISrsSrtSource
{
private:
ISrsStatistic *stat_;
public:
SrsSrtSource();
virtual ~SrsSrtSource();
@ -214,10 +247,10 @@ public:
public:
// Create consumer
// @param consumer, output the create consumer.
virtual srs_error_t create_consumer(SrsSrtConsumer *&consumer);
virtual srs_error_t create_consumer(ISrsSrtConsumer *&consumer);
// Dumps packets in cache to consumer.
virtual srs_error_t consumer_dumps(SrsSrtConsumer *consumer);
virtual void on_consumer_destroy(SrsSrtConsumer *consumer);
virtual srs_error_t consumer_dumps(ISrsSrtConsumer *consumer);
virtual void on_consumer_destroy(ISrsSrtConsumer *consumer);
// Whether we can publish stream to the source, return false if it exists.
virtual bool can_publish();
// When start publish stream.
@ -235,7 +268,7 @@ private:
SrsContextId _pre_source_id;
ISrsRequest *req_;
// To delivery packets to clients.
std::vector<SrsSrtConsumer *> consumers_;
std::vector<ISrsSrtConsumer *> consumers_;
bool can_publish_;
// The last die time, while die means neither publishers nor players.
srs_utime_t stream_die_at_;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,111 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_UTEST_APP12_HPP
#define SRS_UTEST_APP12_HPP
/*
#include <srs_utest_app12.hpp>
*/
#include <srs_utest.hpp>
#include <srs_utest_app6.hpp>
#include <srs_app_stream_bridge.hpp>
#include <srs_protocol_rtmp_stack.hpp>
// Mock frame target for testing SrsSrtFrameBuilder
class MockSrtFrameTarget : public ISrsFrameTarget
{
public:
int on_frame_count_;
SrsMediaPacket *last_frame_;
srs_error_t frame_error_;
public:
MockSrtFrameTarget();
virtual ~MockSrtFrameTarget();
virtual srs_error_t on_frame(SrsMediaPacket *frame);
void reset();
void set_frame_error(srs_error_t err);
};
// Mock statistic for testing SrsSrtSource publish/unpublish
class MockSrtStatistic : public ISrsStatistic
{
public:
int on_stream_publish_count_;
int on_stream_close_count_;
std::string last_publisher_id_;
ISrsRequest *last_publish_req_;
ISrsRequest *last_close_req_;
public:
MockSrtStatistic();
virtual ~MockSrtStatistic();
virtual void on_disconnect(std::string id, srs_error_t err);
virtual srs_error_t on_client(std::string id, ISrsRequest *req, ISrsExpire *conn, SrsRtmpConnType type);
virtual srs_error_t on_video_info(ISrsRequest *req, SrsVideoCodecId vcodec, int avc_profile, int avc_level, int width, int height);
virtual srs_error_t on_audio_info(ISrsRequest *req, SrsAudioCodecId acodec, SrsAudioSampleRate asample_rate, SrsAudioChannels asound_type, SrsAacObjectType aac_object);
virtual void on_stream_publish(ISrsRequest *req, std::string publisher_id);
virtual void on_stream_close(ISrsRequest *req);
virtual void kbps_add_delta(std::string id, ISrsKbpsDelta *delta);
virtual void kbps_sample();
virtual srs_error_t on_video_frames(ISrsRequest *req, int nb_frames);
virtual std::string server_id();
virtual std::string service_id();
virtual std::string service_pid();
virtual SrsStatisticVhost *find_vhost_by_id(std::string vid);
virtual SrsStatisticStream *find_stream(std::string sid);
virtual SrsStatisticClient *find_client(std::string client_id);
virtual srs_error_t dumps_vhosts(SrsJsonArray *arr);
virtual srs_error_t dumps_streams(SrsJsonArray *arr, int start, int count);
virtual srs_error_t dumps_clients(SrsJsonArray *arr, int start, int count);
virtual srs_error_t dumps_metrics(int64_t &send_bytes, int64_t &recv_bytes, int64_t &nstreams, int64_t &nclients, int64_t &total_nclients, int64_t &nerrs);
void reset();
};
// Mock SRT bridge for testing SrsSrtSource publish/unpublish
class MockSrtBridge : public ISrsSrtBridge
{
public:
int on_publish_count_;
int on_unpublish_count_;
int on_packet_count_;
srs_error_t on_publish_error_;
srs_error_t on_packet_error_;
public:
MockSrtBridge();
virtual ~MockSrtBridge();
virtual srs_error_t initialize(ISrsRequest *r);
virtual srs_error_t on_publish();
virtual void on_unpublish();
virtual srs_error_t on_packet(SrsSrtPacket *packet);
void set_on_publish_error(srs_error_t err);
void set_on_packet_error(srs_error_t err);
void reset();
};
// Mock SRT consumer for testing SrsSrtSource on_packet
class MockSrtConsumer : public ISrsSrtConsumer
{
public:
int enqueue_count_;
srs_error_t enqueue_error_;
std::vector<SrsSrtPacket *> packets_;
public:
MockSrtConsumer();
virtual ~MockSrtConsumer();
virtual srs_error_t enqueue(SrsSrtPacket *packet);
virtual srs_error_t dump_packet(SrsSrtPacket **ppkt);
virtual void wait(int nb_msgs, srs_utime_t timeout);
void set_enqueue_error(srs_error_t err);
void reset();
};
#endif