srs/trunk/src/app/srs_app_http_stream.hpp
Winlin 96f89c58ee FLV: Refine source and http handler. v6.0.155 (#4165)
1. Do not create a source when mounting FLV, because freeing the source
may not unmount the FLV handler. If you access the FLV stream without
any publisher, wait for the source to be cleaned up, and then request
the FLV stream again, the request fails with an annoying warning message.

```bash
HTTP #0 127.0.0.1:58026 GET http://localhost:8080/live/livestream.flv, content-length=-1
new live source, stream_url=/live/livestream
http: mount flv stream for sid=/live/livestream, mount=/live/livestream.flv

client disconnect peer. ret=1007
Live: cleanup die source, id=[], total=1

HTTP #0 127.0.0.1:58040 GET http://localhost:8080/live/livestream.flv, content-length=-1
serve error code=1097(NoSource)(No source found) : process request=0 : cors serve : serve http : no source for /live/livestream
serve_http() [srs_app_http_stream.cpp:641]
```

> Note: There is an inconsistency. The first time, you can access the
FLV stream and wait for the publisher, but the next time, you cannot.

2. Create a source when starting to serve the FLV client. We do not need
to create the source when creating the HTTP handler. Instead, we should
try to create the source in the cache or stream. Because the source
cleanup does not unmount the HTTP handler, the handler remains after the
source is destroyed. The next time you access the FLV stream, the source
is not found.

```cpp
srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) {
    SrsSharedPtr<SrsLiveSource> live_source;
    if ((err = _srs_sources->fetch_or_create(r.get(), server, live_source)) != srs_success) { }
    if ((err = http_mount(r.get())) != srs_success) { }

srs_error_t SrsBufferCache::cycle() {
    SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
    if (!live_source.get()) {
        return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
    }

srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) {
    SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
    if (!live_source.get()) {
        return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
    }
```

> Note: We should not create the source in hijack. Instead, we create it
in the cache or stream:

```cpp
srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph) {
    if ((err = http_mount(r.get())) != srs_success) { }

srs_error_t SrsBufferCache::cycle() {
    SrsSharedPtr<SrsLiveSource> live_source;
    if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { }

srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) {
    SrsSharedPtr<SrsLiveSource> live_source;
    if ((err = _srs_sources->fetch_or_create(req, server_, live_source)) != srs_success) { }
```

> Note: This fixes the failure and annoying warning message, and
maintains consistency by always waiting for the stream to be ready if
there is no publisher.
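
The difference between the two lookups can be illustrated with a minimal, self-contained sketch. It is not SRS code: `ToySource`, `ToySourcePool`, `cleanup`, and the two `serve_with_*` helpers are hypothetical names invented for illustration, and only the `fetch` versus `fetch_or_create` distinction is taken from the change above.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for SrsLiveSource.
struct ToySource {
    std::string url;
};

// Hypothetical stand-in for the global source pool (_srs_sources).
class ToySourcePool {
    std::map<std::string, std::shared_ptr<ToySource>> pool_;
public:
    // Return the existing source, or an empty pointer when none exists.
    std::shared_ptr<ToySource> fetch(const std::string& url) const {
        auto it = pool_.find(url);
        if (it == pool_.end()) {
            return std::shared_ptr<ToySource>();
        }
        return it->second;
    }
    // Return the existing source, creating it first when it is missing.
    std::shared_ptr<ToySource> fetch_or_create(const std::string& url) {
        std::shared_ptr<ToySource>& slot = pool_[url];
        if (!slot) {
            slot = std::make_shared<ToySource>(ToySource{url});
        }
        return slot;
    }
    // Simulate the cleanup that frees a source nobody uses anymore.
    void cleanup(const std::string& url) { pool_.erase(url); }
};

// Old behavior: serving fails once the source has been cleaned up,
// even though the HTTP handler is still mounted.
bool serve_with_fetch(ToySourcePool& pool, const std::string& url) {
    return pool.fetch(url) != nullptr;
}

// New behavior: serving always obtains a source, so the client can
// wait for a publisher.
bool serve_with_fetch_or_create(ToySourcePool& pool, const std::string& url) {
    return pool.fetch_or_create(url) != nullptr;
}
```

In this sketch, once `cleanup` has removed the source, `serve_with_fetch` reports failure while `serve_with_fetch_or_create` succeeds again, which mirrors the inconsistency described in item 1 and its fix.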

3. Fail the HTTP request if the HTTP handler is disposing, and also keep
the handler entry while disposing the stream, because we should dispose
the handler entry and stream at the same time.

```cpp
srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r) {
        entry = streamHandlers[sid];
        if (entry->disposing) {
            return srs_error_new(ERROR_STREAM_DISPOSING, "stream is disposing");
        }

void SrsHttpStreamServer::http_unmount(SrsRequest* r) {
    std::map<std::string, SrsLiveEntry*>::iterator it = streamHandlers.find(sid);
    SrsUniquePtr<SrsLiveEntry> entry(it->second);
    entry->disposing = true;
```

> Note: If the disposal process takes a long time, this will prevent
unexpected behavior or access to the resource that is being disposed of.
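
As a rough illustration of the disposing flag, the sketch below keeps an entry in the map while it is being disposed and rejects mounts during that window. `ToyEntry`, `ToyStreamServer`, `MountResult`, `begin_unmount`, and `finish_unmount` are hypothetical names, not the SRS API; only the `disposing` flag itself comes from the change above.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for SrsLiveEntry; only the flag matters here.
struct ToyEntry {
    bool disposing = false;
};

enum class MountResult { Mounted, Reused, Disposing };

// Hypothetical stand-in for SrsHttpStreamServer.
class ToyStreamServer {
    std::map<std::string, ToyEntry> handlers_;
public:
    MountResult mount(const std::string& sid) {
        auto it = handlers_.find(sid);
        if (it == handlers_.end()) {
            handlers_[sid] = ToyEntry();
            return MountResult::Mounted;
        }
        // The entry still exists but is being torn down: fail the request.
        if (it->second.disposing) {
            return MountResult::Disposing;
        }
        return MountResult::Reused;
    }
    // Start disposal: keep the entry in the map, but mark it.
    void begin_unmount(const std::string& sid) {
        auto it = handlers_.find(sid);
        if (it != handlers_.end()) {
            it->second.disposing = true;
        }
    }
    // Finish disposal: the entry and its stream go away together.
    void finish_unmount(const std::string& sid) { handlers_.erase(sid); }
};
```

Between `begin_unmount` and `finish_unmount`, a call to `mount` returns `MountResult::Disposing` instead of handing out an entry that is about to be freed.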

4. In edge mode, the edge ingester unpublishes the source when the last
consumer quits, which is actually triggered by the HTTP stream. Because
the HTTP unmount also waits for the stream to quit, there is a
self-destruction risk: the HTTP live stream object destroys itself.

```cpp
srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) {
    SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw); // Trigger destroy.

void SrsHttpStreamServer::http_unmount(SrsRequest* r) {
    for (;;) { if (!cache->alive() && !stream->alive()) { break; } // A circular reference.
    mux.unhandle(entry->mount, stream.get()); // Free the SrsLiveStream itself.
```

> Note: It also introduces a circular reference in the object
relationships, because the stream refers to itself during unmount:

```text
SrsLiveStream::serve_http
    -> SrsLiveConsumer::~SrsLiveConsumer -> SrsEdgeIngester::stop
    -> SrsLiveSource::on_unpublish -> SrsHttpStreamServer::http_unmount
        -> SrsLiveStream::alive
```

> Note: We should use an asynchronous worker to perform the cleanup to
avoid the stream destroying itself and to prevent self-referencing.

```cpp
void SrsHttpStreamServer::http_unmount(SrsRequest* r) {
    entry->disposing = true;
    if ((err = async_->execute(new SrsHttpStreamDestroy(&mux, &streamHandlers, sid))) != srs_success) { }
```

> Note: This also ensures there are no circular references and no
self-destruction.
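
The idea of deferring the cleanup can be sketched as follows. This is a simplified, single-threaded model under stated assumptions: `ToyAsyncWorker`, `ToyStream`, `ToyServer`, and `run_pending` are hypothetical names, and the real worker runs tasks in its own coroutine rather than on an explicit call.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the async worker: tasks are queued and run later.
class ToyAsyncWorker {
    std::queue<std::function<void()>> tasks_;
public:
    void execute(std::function<void()> task) { tasks_.push(std::move(task)); }
    // Run all queued tasks, outside the call stack that queued them.
    void run_pending() {
        while (!tasks_.empty()) {
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop();
            task();
        }
    }
};

// Hypothetical stand-in for SrsLiveStream; it records its own destruction.
struct ToyStream {
    std::vector<std::string>* log;
    explicit ToyStream(std::vector<std::string>* l) : log(l) {}
    ~ToyStream() { log->push_back("stream destroyed"); }
};

// Hypothetical stand-in for SrsHttpStreamServer.
struct ToyServer {
    // Declared first so that it outlives the stream, whose destructor logs.
    std::vector<std::string> log;
    ToyAsyncWorker worker;
    std::unique_ptr<ToyStream> stream;

    // Reached indirectly from inside the stream's own serve call.
    void unmount() {
        log.push_back("unmount requested");
        // Do not free the stream here; only schedule the cleanup.
        worker.execute([this] { stream.reset(); });
    }
    void serve() {
        log.push_back("serve begin");
        unmount(); // Triggered when the last consumer quits.
        log.push_back("serve end"); // The stream is still alive here.
    }
};
```

In this model the stream is destroyed only when `run_pending` executes, after `serve` has returned, so the serving code never frees the object it is running in and never waits on itself.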

---------

Co-authored-by: Jacob Su <suzp1984@gmail.com>
2024-09-01 13:03:50 +08:00

//
// Copyright (c) 2013-2024 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_APP_HTTP_STREAM_HPP
#define SRS_APP_HTTP_STREAM_HPP
#include <srs_core.hpp>
#include <srs_app_security.hpp>
#include <srs_app_http_conn.hpp>
#include <srs_app_async_call.hpp>
#include <vector>
class SrsAacTransmuxer;
class SrsMp3Transmuxer;
class SrsFlvTransmuxer;
class SrsTsTransmuxer;
class SrsAsyncCallWorker;
// A cache for HTTP Live Streaming encoder, to make android(weixin) happy.
class SrsBufferCache : public ISrsCoroutineHandler
{
private:
srs_utime_t fast_cache;
SrsServer* server_;
private:
SrsMessageQueue* queue;
SrsRequest* req;
SrsCoroutine* trd;
public:
SrsBufferCache(SrsServer* s, SrsRequest* r);
virtual ~SrsBufferCache();
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t start();
virtual void stop();
virtual bool alive();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
// Interface ISrsCoroutineHandler.
public:
virtual srs_error_t cycle();
};
// The encoder to transmux RTMP stream.
class ISrsBufferEncoder
{
public:
ISrsBufferEncoder();
virtual ~ISrsBufferEncoder();
public:
// Initialize the encoder with file writer(to http response) and stream cache.
// @param w the writer to write to http response.
// @param c the stream cache for audio stream fast startup.
virtual srs_error_t initialize(SrsFileWriter* w, SrsBufferCache* c) = 0;
// Write rtmp video/audio/metadata.
virtual srs_error_t write_audio(int64_t timestamp, char* data, int size) = 0;
virtual srs_error_t write_video(int64_t timestamp, char* data, int size) = 0;
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size) = 0;
public:
// For some streams, for example, pure audio streams such as mp3 and aac,
// we use a large gop cache in the encoder, because the gop cache of SrsLiveSource ignores audio.
// @return true to use the gop cache of the encoder; otherwise, use the one of SrsLiveSource.
virtual bool has_cache() = 0;
// Dumps the cache of encoder to consumer.
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0;
};
// Transmux RTMP to HTTP Live Streaming.
class SrsFlvStreamEncoder : public ISrsBufferEncoder
{
private:
SrsFlvTransmuxer* enc;
bool header_written;
bool has_audio_;
bool has_video_;
bool guess_has_av_;
public:
SrsFlvStreamEncoder();
virtual ~SrsFlvStreamEncoder();
public:
virtual srs_error_t initialize(SrsFileWriter* w, SrsBufferCache* c);
virtual srs_error_t write_audio(int64_t timestamp, char* data, int size);
virtual srs_error_t write_video(int64_t timestamp, char* data, int size);
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
void set_drop_if_not_match(bool v);
void set_has_audio(bool v);
void set_has_video(bool v);
void set_guess_has_av(bool v);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
public:
// Write the tags at one time.
virtual srs_error_t write_tags(SrsSharedPtrMessage** msgs, int count);
private:
virtual srs_error_t write_header(bool has_video, bool has_audio);
};
// Transmux RTMP to HTTP TS Streaming.
class SrsTsStreamEncoder : public ISrsBufferEncoder
{
private:
SrsTsTransmuxer* enc;
public:
SrsTsStreamEncoder();
virtual ~SrsTsStreamEncoder();
public:
virtual srs_error_t initialize(SrsFileWriter* w, SrsBufferCache* c);
virtual srs_error_t write_audio(int64_t timestamp, char* data, int size);
virtual srs_error_t write_video(int64_t timestamp, char* data, int size);
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
public:
void set_has_audio(bool v);
void set_has_video(bool v);
void set_guess_has_av(bool v);
};
// Transmux RTMP with AAC stream to HTTP AAC Streaming.
class SrsAacStreamEncoder : public ISrsBufferEncoder
{
private:
SrsAacTransmuxer* enc;
SrsBufferCache* cache;
public:
SrsAacStreamEncoder();
virtual ~SrsAacStreamEncoder();
public:
virtual srs_error_t initialize(SrsFileWriter* w, SrsBufferCache* c);
virtual srs_error_t write_audio(int64_t timestamp, char* data, int size);
virtual srs_error_t write_video(int64_t timestamp, char* data, int size);
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
};
// Transmux RTMP with MP3 stream to HTTP MP3 Streaming.
class SrsMp3StreamEncoder : public ISrsBufferEncoder
{
private:
SrsMp3Transmuxer* enc;
SrsBufferCache* cache;
public:
SrsMp3StreamEncoder();
virtual ~SrsMp3StreamEncoder();
public:
virtual srs_error_t initialize(SrsFileWriter* w, SrsBufferCache* c);
virtual srs_error_t write_audio(int64_t timestamp, char* data, int size);
virtual srs_error_t write_video(int64_t timestamp, char* data, int size);
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size);
public:
virtual bool has_cache();
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
};
// Write stream to http response directly.
class SrsBufferWriter : public SrsFileWriter
{
private:
ISrsHttpResponseWriter* writer;
public:
SrsBufferWriter(ISrsHttpResponseWriter* w);
virtual ~SrsBufferWriter();
public:
virtual srs_error_t open(std::string file);
virtual void close();
public:
virtual bool is_open();
virtual int64_t tellg();
public:
virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite);
virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
};
// HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format.
// TODO: FIXME: Rename to SrsHttpLive
class SrsLiveStream : public ISrsHttpHandler, public ISrsExpire
{
private:
SrsRequest* req;
SrsBufferCache* cache;
SrsSecurity* security_;
SrsServer* server_;
// For multiple viewers, which means there may be more than one alive viewer for a live stream, so we must
// keep a list of viewers to know whether any viewer is still alive. We should never do cleanup unless all
// viewers have closed the connection.
std::vector<ISrsExpire*> viewers_;
public:
SrsLiveStream(SrsServer* s, SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream();
virtual srs_error_t update_auth(SrsRequest* r);
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual bool alive();
// Interface ISrsExpire
public:
virtual void expire();
private:
virtual srs_error_t do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r);
virtual void http_hooks_on_stop(ISrsHttpMessage* r);
virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);
};
// The Live Entry, to handle HTTP Live Streaming.
struct SrsLiveEntry
{
private:
bool _is_flv;
bool _is_ts;
bool _is_aac;
bool _is_mp3;
public:
// We will free the request.
SrsRequest* req;
public:
// For template, the mount contains variables.
// For concrete stream, the mount is url to access.
std::string mount;
SrsLiveStream* stream;
SrsBufferCache* cache;
// Whether the entry is being disposed.
bool disposing;
SrsLiveEntry(std::string m);
virtual ~SrsLiveEntry();
bool is_flv();
bool is_ts();
bool is_mp3();
bool is_aac();
};
// The HTTP Live Streaming Server, to serve FLV/TS/MP3/AAC stream.
// TODO: Support multiple streams.
class SrsHttpStreamServer : public ISrsReloadHandler
, public ISrsHttpMatchHijacker
{
private:
SrsServer* server;
SrsAsyncCallWorker* async_;
public:
SrsHttpServeMux mux;
// The http live streaming template, to create streams.
std::map<std::string, SrsLiveEntry*> templateHandlers;
// The http live streaming streams, created by template.
std::map<std::string, SrsLiveEntry*> streamHandlers;
public:
SrsHttpStreamServer(SrsServer* svr);
virtual ~SrsHttpStreamServer();
public:
virtual srs_error_t initialize();
public:
// HTTP flv/ts/mp3/aac stream
virtual srs_error_t http_mount(SrsRequest* r);
virtual void http_unmount(SrsRequest* r);
// Interface ISrsHttpMatchHijacker
public:
virtual srs_error_t hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph);
private:
virtual srs_error_t initialize_flv_streaming();
virtual srs_error_t initialize_flv_entry(std::string vhost);
};
class SrsHttpStreamDestroy : public ISrsAsyncCallTask
{
private:
std::string sid_;
std::map<std::string, SrsLiveEntry*>* streamHandlers_;
SrsHttpServeMux* mux_;
public:
SrsHttpStreamDestroy(SrsHttpServeMux* mux, std::map<std::string, SrsLiveEntry*>* handlers, std::string sid);
virtual ~SrsHttpStreamDestroy();
public:
virtual srs_error_t call();
virtual std::string to_string();
};
#endif