AI: Support anonymous coroutine with code block. v7.0.80 (#4475)

This PR introduces anonymous coroutine macros for easier coroutine
creation and improves the State Threads (ST) mutex and condition
variable handling in SRS.

- **Added coroutine macros**: `SRS_COROUTINE_GO`, `SRS_COROUTINE_GO2`,
`SRS_COROUTINE_GO_CTX`, `SRS_COROUTINE_GO_CTX2`
- **Added `SrsCoroutineChan`**: Channel for sharing data between
coroutines with coroutine-safe operations
- **Simplified coroutine creation**: Go-like syntax for creating
anonymous coroutines with code blocks

---------

Co-authored-by: Jacob Su <suzp1984@gmail.com>
Co-authored-by: OSSRS-AI <winlinam@gmail.com>
This commit is contained in:
Winlin 2025-09-06 08:10:49 -04:00 committed by GitHub
parent 8f09c4186e
commit 8976ce4c8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 593 additions and 22 deletions

2
trunk/configure vendored
View File

@ -378,7 +378,7 @@ if [[ $SRS_UTEST == YES ]]; then
"srs_utest_mp4" "srs_utest_service" "srs_utest_app" "srs_utest_rtc" "srs_utest_config2"
"srs_utest_protocol" "srs_utest_protocol2" "srs_utest_kernel2" "srs_utest_protocol3"
"srs_utest_st" "srs_utest_rtc2" "srs_utest_rtc3" "srs_utest_fmp4" "srs_utest_source_lock"
"srs_utest_stream_token" "srs_utest_rtc_recv_track")
"srs_utest_stream_token" "srs_utest_rtc_recv_track" "srs_utest_st2")
# Always include SRT utest
MODULE_FILES+=("srs_utest_srt")
if [[ $SRS_GB28181 == YES ]]; then

View File

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a>
## SRS 7.0 Changelog
* v7.0, 2025-09-06, Merge [#4475](https://github.com/ossrs/srs/pull/4475): AI: Support anonymous coroutine with code block. v7.0.80 (#4475)
* v7.0, 2025-09-05, Merge [#4474](https://github.com/ossrs/srs/pull/4474): WebRTC: Fix race condition in RTC nack timer callbacks. v7.0.79 (#4474)
* v7.0, 2025-09-04, Merge [#4467](https://github.com/ossrs/srs/pull/4467): WebRTC: Fix NACK recovered packets not being added to receive queue. v7.0.78 (#4467)
* v7.0, 2025-09-03, Merge [#4469](https://github.com/ossrs/srs/pull/4469): Upgrade HTTP parser from http-parser to llhttp. v7.0.77 (#4469)

View File

@ -103,7 +103,7 @@ void SrsAsyncCallWorker::flush_tasks()
// Avoid the async call blocking other coroutines.
std::vector<ISrsAsyncCallTask *> copy;
if (true) {
SrsLocker(lock);
SrsLocker(&lock);
if (tasks.empty()) {
return;

View File

@ -935,7 +935,7 @@ SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream *p) : p_(p)
SrsRtcPublishRtcpTimer::~SrsRtcPublishRtcpTimer()
{
if (true) {
SrsLocker(lock_);
SrsLocker(&lock_);
_srs_shared_timer->timer1s()->unsubscribe(this);
}
srs_mutex_destroy(lock_);
@ -949,7 +949,7 @@ srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval)
// Therefore, during this function, the 'this' pointer might become invalid because
// the object could be freed by another thread. As a result, we must lock the object
// to prevent it from being freed.
SrsLocker(lock_);
SrsLocker(&lock_);
++_srs_pps_pub->sugar;
@ -982,7 +982,7 @@ SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(SrsRtcPublishStream *p) : p_(p)
SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer()
{
if (true) {
SrsLocker(lock_);
SrsLocker(&lock_);
_srs_shared_timer->timer100ms()->unsubscribe(this);
}
srs_mutex_destroy(lock_);
@ -996,7 +996,7 @@ srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
// Therefore, during this function, the 'this' pointer might become invalid because
// the object could be freed by another thread. As a result, we must lock the object
// to prevent it from being freed.
SrsLocker(lock_);
SrsLocker(&lock_);
++_srs_pps_pub->sugar;
@ -1752,7 +1752,7 @@ SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection *p) : p_(p
SrsRtcConnectionNackTimer::~SrsRtcConnectionNackTimer()
{
if (true) {
SrsLocker(lock_);
SrsLocker(&lock_);
_srs_shared_timer->timer20ms()->unsubscribe(this);
}
srs_mutex_destroy(lock_);
@ -1766,7 +1766,7 @@ srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
// Therefore, during this function, the 'this' pointer might become invalid because
// the object could be freed by another thread. As a result, we must lock the object
// to prevent it from being freed.
SrsLocker(lock_);
SrsLocker(&lock_);
if (!p_->nack_enabled_) {
return err;

View File

@ -302,7 +302,7 @@ srs_error_t SrsRtcSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<Sr
if (true) {
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);
@ -339,7 +339,7 @@ SrsSharedPtr<SrsRtcSource> SrsRtcSourceManager::fetch(ISrsRequest *r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsRtcSource> >::iterator it = pool.find(stream_url);

View File

@ -176,7 +176,7 @@ srs_error_t SrsRtspSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<S
if (true) {
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsRtspSource> >::iterator it = pool.find(stream_url);
@ -213,7 +213,7 @@ SrsSharedPtr<SrsRtspSource> SrsRtspSourceManager::fetch(ISrsRequest *r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsRtspSource> >::iterator it = pool.find(stream_url);

View File

@ -1579,7 +1579,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<S
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller scope lock.
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsLiveSource> >::iterator it = pool.find(stream_url);
@ -1620,7 +1620,7 @@ SrsSharedPtr<SrsLiveSource> SrsLiveSourceManager::fetch(ISrsRequest *r)
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
// TODO: FIXME: Use smaller scope lock.
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsLiveSource> >::iterator it = pool.find(stream_url);

View File

@ -160,7 +160,7 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(ISrsRequest *r, SrsSharedPtr<Sr
if (true) {
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);
@ -196,7 +196,7 @@ SrsSharedPtr<SrsSrtSource> SrsSrtSourceManager::fetch(ISrsRequest *r)
{
// Use lock to protect coroutine switch.
// @bug https://github.com/ossrs/srs/issues/1230
SrsLocker(lock);
SrsLocker(&lock);
string stream_url = r->get_stream_url();
std::map<std::string, SrsSharedPtr<SrsSrtSource> >::iterator it = pool.find(stream_url);

View File

@ -84,7 +84,7 @@ srs_error_t SrsStreamPublishTokenManager::acquire_token(ISrsRequest *req, SrsStr
std::string stream_url = req->get_stream_url();
SrsContextId current_cid = _srs_context->get_id();
SrsLocker(mutex_);
SrsLocker(&mutex_);
// Get or create token for this stream
SrsStreamPublishToken *stream_token = NULL;
@ -116,7 +116,7 @@ srs_error_t SrsStreamPublishTokenManager::acquire_token(ISrsRequest *req, SrsStr
void SrsStreamPublishTokenManager::release_token(const std::string &stream_url)
{
SrsLocker(mutex_);
SrsLocker(&mutex_);
// Find and erase the token from the map
std::map<std::string, SrsStreamPublishToken *>::iterator it = tokens_.find(stream_url);

View File

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7
#define VERSION_MINOR 0
#define VERSION_REVISION 79
#define VERSION_REVISION 80
#endif

View File

@ -392,7 +392,13 @@ srs_cond_t srs_cond_new()
int srs_cond_destroy(srs_cond_t cond)
{
return st_cond_destroy((st_cond_t)cond);
if (!cond) {
return 0;
}
int r0 = st_cond_destroy((st_cond_t)cond);
srs_assert(r0 == 0);
return r0;
}
int srs_cond_wait(srs_cond_t cond)
@ -425,7 +431,10 @@ int srs_mutex_destroy(srs_mutex_t mutex)
if (!mutex) {
return 0;
}
return st_mutex_destroy((st_mutex_t)mutex);
int r0 = st_mutex_destroy((st_mutex_t)mutex);
srs_assert(r0 == 0);
return r0;
}
int srs_mutex_lock(srs_mutex_t mutex)
@ -438,6 +447,61 @@ int srs_mutex_unlock(srs_mutex_t mutex)
return st_mutex_unlock((st_mutex_t)mutex);
}
SrsCond::SrsCond()
{
cond_ = srs_cond_new();
}
SrsCond::~SrsCond()
{
srs_cond_destroy(cond_);
}
int SrsCond::wait()
{
return srs_cond_wait(cond_);
}
int SrsCond::timedwait(srs_utime_t timeout)
{
return srs_cond_timedwait(cond_, timeout);
}
int SrsCond::signal()
{
return srs_cond_signal(cond_);
}
int SrsCond::broadcast()
{
return srs_cond_broadcast(cond_);
}
SrsMutex::SrsMutex()
{
mutex_ = srs_mutex_new();
}
SrsMutex::~SrsMutex()
{
srs_mutex_destroy(mutex_);
}
int SrsMutex::lock()
{
return srs_mutex_lock(mutex_);
}
int SrsMutex::unlock()
{
return srs_mutex_unlock(mutex_);
}
srs_mutex_t *SrsMutex::get()
{
return &mutex_;
}
int srs_key_create(int *keyp, void (*destructor)(void *))
{
return st_key_create(keyp, destructor);

View File

@ -84,6 +84,45 @@ extern int srs_mutex_destroy(srs_mutex_t mutex);
extern int srs_mutex_lock(srs_mutex_t mutex);
extern int srs_mutex_unlock(srs_mutex_t mutex);
// Wrap as ptr, so you can use SrsUniquePtr and SrsSharedPtr to manage it.
// For example:
// SrsUniquePtr<SrsCond> cond(new SrsCond());
// cond->signal();
class SrsCond
{
private:
srs_cond_t cond_;
public:
SrsCond();
virtual ~SrsCond();
public:
int wait();
int timedwait(srs_utime_t timeout);
int signal();
int broadcast();
};
// Wrap as ptr, so you can use SrsUniquePtr and SrsSharedPtr to manage it.
// For example:
// SrsUniquePtr<SrsMutex> mutex(new SrsMutex());
// SrsLocker(mutex->get());
class SrsMutex
{
private:
srs_mutex_t mutex_;
public:
SrsMutex();
virtual ~SrsMutex();
public:
int lock();
int unlock();
srs_mutex_t *get();
};
extern int srs_key_create(int *keyp, void (*destructor)(void *));
extern int srs_thread_setspecific(int key, void *value);
extern int srs_thread_setspecific2(srs_thread_t thread, int key, void *value);
@ -109,7 +148,7 @@ extern bool srs_is_never_timeout(srs_utime_t tm);
// The mutex locker.
#define SrsLocker(instance) \
impl__SrsLocker _SRS_free_##instance(&instance)
impl__SrsLocker _SRS_free_instance(instance)
class impl__SrsLocker
{

View File

@ -10,8 +10,10 @@
#include <srs_app_log.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_app_server.hpp>
#include <srs_app_st.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_protocol_st.hpp>
#include <string>
using namespace std;
@ -253,3 +255,39 @@ int MockProtectedBuffer::alloc(int size)
return 0;
}
SrsCoroutineChan::SrsCoroutineChan()
{
lock_ = srs_mutex_new();
}
SrsCoroutineChan::~SrsCoroutineChan()
{
srs_mutex_destroy(lock_);
}
SrsCoroutineChan &SrsCoroutineChan::push(void *value)
{
SrsLocker(&lock_);
args_.push_back(value);
return *this;
}
void *SrsCoroutineChan::pop()
{
SrsLocker(&lock_);
void *arg = *args_.begin();
args_.erase(args_.begin());
return arg;
}
SrsCoroutineChan *SrsCoroutineChan::copy()
{
SrsLocker(&lock_);
SrsCoroutineChan *cp = new SrsCoroutineChan();
cp->args_ = args_;
return cp;
}

View File

@ -131,4 +131,147 @@ public:
int alloc(int size);
};
// The chan for anonymous coroutine to share variables.
// The chan never free the args, you must manage the memory.
class SrsCoroutineChan
{
private:
std::vector<void *> args_;
srs_mutex_t lock_;
public:
SrsCoroutineChan();
virtual ~SrsCoroutineChan();
public:
SrsCoroutineChan &push(void *value);
void *pop();
SrsCoroutineChan *copy();
};
// A helper to create a anonymous coroutine like goroutine in Go.
// * The context is used to share variables between coroutines.
// * The id is used to identify the coroutine.
// * The code_block is the code to run in the coroutine.
//
// The correct way is to avoid the block, unless you intend to do it,
// so you should create in the same scope, and use id to distinguish them.
// For example:
// SrsCoroutineChan ctx;
//
// SRS_COROUTINE_GO_IMPL(&ctx, coroutine1, {
// srs_usleep(1000 * SRS_UTIME_MILLISECONDS);
// });
//
// SRS_COROUTINE_GO_IMPL(&ctx, coroutine2, {
// srs_usleep(1000 * SRS_UTIME_MILLISECONDS);
// });
//
// // It won't wait for the coroutine to terminate.
// // So you will expect to run to here immediately.
//
// CAUTION: Note that if use a block to run the coroutine, it will
// stop and wait for the coroutine to terminate. So it maybe crash
// for the current thread is interrupted and stopping, such as the
// ctx.pop() will crash for requiring a lock on a stopping thread.
// For example:
// SrsCoroutineChan ctx;
//
// // Generally we SHOULD NOT do this, unless you intend to.
// if (true) {
// SRS_COROUTINE_GO_IMPL(&ctx, coroutine, {
// srs_usleep(1000 * SRS_UTIME_MILLISECONDS);
// });
// }
// if (true) {
// SRS_COROUTINE_GO_IMPL(&ctx, coroutine, {
// srs_usleep(1000 * SRS_UTIME_MILLISECONDS);
// });
// }
//
// // The coroutine will be stopped and wait for it to terminate.
// // So maybe it won't execute all your code there.
//
// Enjoiy the sugar for coroutines.
#define SRS_COROUTINE_GO_IMPL(context, id, code_block) \
class AnonymousCoroutineHandler##id : public ISrsCoroutineHandler \
{ \
private: \
SrsCoroutineChan *ctx_; \
\
public: \
AnonymousCoroutineHandler##id(SrsCoroutineChan *c) \
{ \
/* Copy the context so that we can pop it in different coroutines. */ \
ctx_ = c->copy(); \
} \
~AnonymousCoroutineHandler##id() \
{ \
srs_freep(ctx_); \
} \
\
public: \
virtual srs_error_t cycle() \
{ \
SrsCoroutineChan &ctx = *ctx_; \
(void)ctx; \
code_block; \
return srs_success; \
} \
}; \
AnonymousCoroutineHandler##id handler##id(context); \
SrsSTCoroutine st##id("anonymous", &handler##id); \
srs_error_t err_coroutine##id = st##id.start(); \
srs_assert(err_coroutine##id == srs_success)
// A helper to create a anonymous coroutine like goroutine in Go.
// For example:
// SRS_COROUTINE_GO({
// srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// });
#define SRS_COROUTINE_GO(code_block) \
SrsCoroutineChan context##id; \
SRS_COROUTINE_GO_IMPL(&context##id, coroutine0, code_block)
// A helper to create a anonymous coroutine like goroutine in Go.
// With the id, it allows you to create multiple coroutines.
// For example:
// SRS_COROUTINE_GO2(coroutine1, {
// srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// });
// SRS_COROUTINE_GO2(coroutine2, {
// srs_usleep(1 * SRS_UTIME_MILLISECONDS);
// });
#define SRS_COROUTINE_GO2(id, code_block) \
SrsCoroutineChan context##id; \
SRS_COROUTINE_GO_IMPL(&context##id, id, code_block)
// A helper to create a anonymous coroutine like goroutine in Go.
// With the context, it allows you to share variables between coroutines.
// For example:
// SrsCoroutineChan ctx;
// ctx.push(1);
// SRS_COROUTINE_GO_CTX(ctx, {
// int v = (int)ctx.pop();
// srs_usleep(v * SRS_UTIME_MILLISECONDS);
// });
#define SRS_COROUTINE_GO_CTX(ctx, code_block) \
SRS_COROUTINE_GO_IMPL(ctx, coroutine0, code_block)
// A helper to create a anonymous coroutine like goroutine in Go.
// With the context and id, it allows you to create multiple coroutines.
// For example:
// SrsCoroutineChan ctx;
// ctx.push(1);
// SRS_COROUTINE_GO_CTX2(ctx, coroutine1, {
// int v = (int)ctx.pop();
// srs_usleep(v * SRS_UTIME_MILLISECONDS);
// });
// SRS_COROUTINE_GO_CTX2(ctx, coroutine2, {
// int v = (int)ctx.pop();
// srs_usleep(v * SRS_UTIME_MILLISECONDS);
// });
#define SRS_COROUTINE_GO_CTX2(ctx, id, code_block) \
SRS_COROUTINE_GO_IMPL(ctx, id, code_block)
#endif

View File

@ -14,6 +14,26 @@
using namespace std;
VOID TEST(StTest, CondPtrSugar)
{
SrsUniquePtr<SrsCond> cond(new SrsCond());
cond->signal();
}
VOID TEST(StTest, MutexPtrSugar)
{
if (true) {
SrsUniquePtr<SrsMutex> mutex(new SrsMutex());
SrsLocker(mutex->get());
}
if (true) {
SrsUniquePtr<SrsMutex> mutex(new SrsMutex());
mutex->lock();
mutex->unlock();
}
}
VOID TEST(StTest, StUtimeInMicroseconds)
{
st_utime_t st_time_1 = st_utime();

View File

@ -0,0 +1,252 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#include <srs_utest_st2.hpp>
#include <srs_app_st.hpp>
#include <srs_core_autofree.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_st.hpp>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
using namespace std;
VOID TEST(StTest, AnonymouseSingleCoroutine)
{
SRS_COROUTINE_GO({
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
});
// Wait for coroutine to terminate. Otherwise, it will be stopped
// and terminated, which cause some of the code not executed.
srs_usleep(50 * SRS_UTIME_MILLISECONDS);
}
VOID TEST(StTest, AnonymouseMultipleCoroutines)
{
SRS_COROUTINE_GO2(coroutine1, {
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
});
// If multiple coroutines in the same scope, we should use different id.
SRS_COROUTINE_GO2(coroutine2, {
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
});
// Wait for coroutine to terminate. Otherwise, it will be stopped
// and terminated, which cause some of the code not executed.
srs_usleep(50 * SRS_UTIME_MILLISECONDS);
}
VOID TEST(StTest, AnonymouseCoroutineWithContext)
{
int counter = 0;
SrsCoroutineChan ctx;
ctx.push(&counter);
SRS_COROUTINE_GO_CTX(&ctx, {
int *counter = (int *)ctx.pop();
(*counter)++;
});
// Coroutine not terminated, so the counter is not increased.
EXPECT_TRUE(counter == 0);
// Wait for coroutine to run and terminated, or it will crash
// because the ctx.pop is called after coroutine terminated.
srs_usleep(50 * SRS_UTIME_MILLISECONDS);
EXPECT_TRUE(counter == 1);
// Wait for coroutine to terminate. Otherwise, it will be stopped
// and terminated, which cause some of the code not executed.
srs_usleep(50 * SRS_UTIME_MILLISECONDS);
}
VOID TEST(StTest, AnonymouseCoroutineWithSync)
{
SrsUniquePtr<SrsCond> cond(new SrsCond());
int counter = 0;
SrsCoroutineChan ctx;
ctx.push(cond.get());
ctx.push(&counter);
SRS_COROUTINE_GO_CTX(&ctx, {
SrsCond *cond = (SrsCond *)ctx.pop();
int *counter = (int *)ctx.pop();
(*counter)++;
// Notify main thread the work is done.
cond->signal();
});
// The coroutine not terminated, so the counter is not increased.
EXPECT_TRUE(counter == 0);
// Wait for the coroutine to terminate. The counter is increased.
cond->wait();
EXPECT_TRUE(counter == 1);
}
VOID TEST(StTest, AnonymouseCoroutineWithWaitgroup)
{
SrsWaitGroup wg;
int counter = 0;
SrsCoroutineChan ctx;
ctx.push(&wg);
ctx.push(&counter);
wg.add(1);
SRS_COROUTINE_GO_CTX(&ctx, {
SrsWaitGroup *wg = (SrsWaitGroup *)ctx.pop();
int *counter = (int *)ctx.pop();
(*counter)++;
// Notify main thread the work is done.
wg->done();
});
// The coroutine not terminated, so the counter is not increased.
EXPECT_TRUE(counter == 0);
// Wait for the coroutine to terminate. The counter is increased.
wg.wait();
EXPECT_TRUE(counter == 1);
}
VOID TEST(StTest, AnonymouseCoroutineWithWaitgroups)
{
SrsWaitGroup wg;
int counter = 0;
SrsCoroutineChan ctx;
ctx.push(&wg);
ctx.push(&counter);
wg.add(1);
SRS_COROUTINE_GO_CTX2(&ctx, coroutine1, {
// The ctx is copied, so we can pop it again in different coroutines.
SrsWaitGroup *wg = (SrsWaitGroup *)ctx.pop();
int *counter = (int *)ctx.pop();
(*counter)++;
// Notify main thread the work is done.
wg->done();
});
wg.add(1);
SRS_COROUTINE_GO_CTX2(&ctx, coroutine2, {
// The ctx is copied, so we can pop it again in different coroutines.
SrsWaitGroup *wg = (SrsWaitGroup *)ctx.pop();
int *counter = (int *)ctx.pop();
(*counter)++;
// Notify main thread the work is done.
wg->done();
});
// The coroutine not terminated, so the counter is not increased.
EXPECT_TRUE(counter == 0);
// Wait for the coroutine to terminate. The counter is increased.
wg.wait();
EXPECT_TRUE(counter == 2);
}
VOID TEST(StTest, VerifyUsingRawCoroutine)
{
srs_error_t err;
class NormalThread : public ISrsCoroutineHandler
{
public:
virtual srs_error_t cycle()
{
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
return srs_success;
}
};
NormalThread trd;
SrsSTCoroutine st("test", &trd);
HELPER_ASSERT_SUCCESS(st.start());
}
VOID TEST(StTest, VerifyMultipleAnonymousClasses)
{
do {
class AnonymousCoroutineHandler
{
};
} while (0);
do {
class AnonymousCoroutineHandler
{
};
} while (0);
SrsUniquePtr<SrsCond> cond(new SrsCond());
cond->signal();
SrsUniquePtr<SrsMutex> mutex(new SrsMutex());
SrsLocker(mutex->get());
}
// CAUTION: Badcase, you should not follow this style.
VOID TEST(StTest, AnonymouseBadcase)
{
// Generally we SHOULD NOT do this, unless you intend to.
if (true) {
SRS_COROUTINE_GO({
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
});
}
// CAUTION: If multiple coroutines in the different scope, it's ok without id,
// but it's not recommended, becuase it will be stopped and your code
// maybe not executed.
// Generally we SHOULD NOT do this, unless you intend to.
if (true) {
SRS_COROUTINE_GO({
srs_usleep(1 * SRS_UTIME_MILLISECONDS);
});
}
}
// CAUTION: Badcase, you should not follow this style.
VOID TEST(StTest, AnonymouseBadcase2)
{
int counter = 0;
SrsCoroutineChan ctx;
ctx.push(&counter);
// Generally we SHOULD NOT do this, unless you intend to.
if (true) {
SRS_COROUTINE_GO_CTX(&ctx, {
int *counter = (int *)ctx.pop();
(*counter)++;
});
// Wait for coroutine to terminate. Otherwise, it will crash, for the
// coroutine is terminated while ctx.pop(), the lock is invalid.
srs_usleep(100 * SRS_UTIME_MILLISECONDS);
}
// Coroutine terminated, so the counter is increased.
EXPECT_TRUE(counter == 1);
}

View File

@ -0,0 +1,14 @@
//
// Copyright (c) 2013-2025 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_UTEST_ST2_HPP
#define SRS_UTEST_ST2_HPP
#include <srs_utest.hpp>
#include <st.h>
#endif // SRS_UTEST_ST2_HPP