diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 9f06b6b03..aade09c8a 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -455,24 +455,37 @@ vhost with-hls.srs.com { # default: 60 hls_window 60; # the error strategy. canbe: - # ignore, when error ignore and disable hls. - # disconnect, when error disconnect the publish connection. - # continue, when error ignore and continue output hls. + # ignore, when error ignore and disable hls. + # disconnect, when error disconnect the publish connection. + # continue, when error ignore and continue output hls. # @see https://github.com/winlinvip/simple-rtmp-server/issues/264 # default: ignore hls_on_error ignore; # the hls output path. # the app dir is auto created under the hls_path. # for example, for rtmp stream: - # rtmp://127.0.0.1/live/livestream - # http://127.0.0.1/live/livestream.m3u8 + # rtmp://127.0.0.1/live/livestream + # http://127.0.0.1/live/livestream.m3u8 # where hls_path is /hls, srs will create the following files: - # /hls/live the app dir for all streams. - # /hls/live/livestream.m3u8 the HLS m3u8 file. - # /hls/live/livestream-1.ts the HLS media/ts file. + # /hls/live the app dir for all streams. + # /hls/live/livestream.m3u8 the HLS m3u8 file. + # /hls/live/livestream-1.ts the HLS media/ts file. # in a word, the hls_path is for vhost. # default: ./objs/nginx/html hls_path ./objs/nginx/html; + # the hls storage: disk, ram or both. + # disk, to write hls m3u8/ts to disk. + # ram, serve m3u8/ts in memory, which use embeded http server to delivery. + # both, disk and ram. + # default: disk + hls_storage disk; + # the hls mount for hls_storage ram, + # which use srs embeded http server to delivery HLS, + # where the mount specifies the HTTP url to mount. + # @see the mount of http_remux. + # @remark the hls_mount must endswith .m3u8. + # default: [vhost]/[app]/[stream].m3u8 + hls_mount [vhost]/[app]/[stream].m3u8; } } # the vhost with hls disabled. diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index f15cd4ea3..38c20cedc 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1479,7 +1479,9 @@ int SrsConfig::check_config() } else if (n == "hls") { for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name.c_str(); - if (m != "enabled" && m != "hls_path" && m != "hls_fragment" && m != "hls_window" && m != "hls_on_error") { + if (m != "enabled" && m != "hls_path" && m != "hls_fragment" && m != "hls_window" && m != "hls_on_error" + && m != "hls_storage" && m != "hls_mount" + ) { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported vhost hls directive %s, ret=%d", m.c_str(), ret); return ret; @@ -1799,6 +1801,7 @@ int SrsConfig::check_config() } } #endif + // TODO: FIXME: required http server when hls storage is ram or both. } return ret; @@ -3278,6 +3281,40 @@ string SrsConfig::get_hls_on_error(string vhost) return conf->arg0(); } +string SrsConfig::get_hls_storage(string vhost) +{ + SrsConfDirective* hls = get_hls(vhost); + + if (!hls) { + return SRS_CONF_DEFAULT_HLS_STORAGE; + } + + SrsConfDirective* conf = hls->get("hls_storage"); + + if (!conf) { + return SRS_CONF_DEFAULT_HLS_STORAGE; + } + + return conf->arg0(); +} + +string SrsConfig::get_hls_mount(string vhost) +{ + SrsConfDirective* hls = get_hls(vhost); + + if (!hls) { + return SRS_CONF_DEFAULT_HLS_MOUNT; + } + + SrsConfDirective* conf = hls->get("hls_mount"); + + if (!conf) { + return SRS_CONF_DEFAULT_HLS_MOUNT; + } + + return conf->arg0(); +} + SrsConfDirective* SrsConfig::get_dvr(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index eb3a29707..b988f98d0 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -52,6 +52,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_HLS_ON_ERROR_DISCONNECT "disconnect" #define SRS_CONF_DEFAULT_HLS_ON_ERROR_CONTINUE "continue" #define SRS_CONF_DEFAULT_HLS_ON_ERROR SRS_CONF_DEFAULT_HLS_ON_ERROR_IGNORE +#define SRS_CONF_DEFAULT_HLS_STORAGE "disk" +#define SRS_CONF_DEFAULT_HLS_MOUNT "[vhost]/[app]/[stream].m3u8" #define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html" #define SRS_CONF_DEFAULT_DVR_PLAN_SESSION "session" #define SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT "segment" @@ -906,6 +908,14 @@ public: * @see https://github.com/winlinvip/simple-rtmp-server/issues/264 */ virtual std::string get_hls_on_error(std::string vhost); + /** + * get the HLS storage type. + */ + virtual std::string get_hls_storage(std::string vhost); + /** + * get the HLS mount url for HTTP server. + */ + virtual std::string get_hls_mount(std::string vhost); // dvr section private: /** diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 8e749142f..a7e0e0836 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -55,13 +55,87 @@ using namespace std; // drop the segment when duration of ts too small. #define SRS_AUTO_HLS_SEGMENT_MIN_DURATION_MS 100 -SrsHlsSegment::SrsHlsSegment() +ISrsHlsHandler::ISrsHlsHandler() +{ +} + +ISrsHlsHandler::~ISrsHlsHandler() +{ +} + +SrsHlsCacheWriter::SrsHlsCacheWriter(bool write_cache, bool write_file) +{ + should_write_cache = write_cache; + should_write_file = write_file; +} + +SrsHlsCacheWriter::~SrsHlsCacheWriter() +{ +} + +int SrsHlsCacheWriter::open(string file) +{ + if (!should_write_file) { + return ERROR_SUCCESS; + } + + return impl.open(file); +} + +void SrsHlsCacheWriter::close() +{ + if (!should_write_file) { + return; + } + + impl.close(); +} + +bool SrsHlsCacheWriter::is_open() +{ + if (!should_write_file) { + return true; + } + + return impl.is_open(); +} + +int64_t SrsHlsCacheWriter::tellg() +{ + if (!should_write_file) { + return 0; + } + + return impl.tellg(); +} + +int SrsHlsCacheWriter::write(void* buf, size_t count, ssize_t* pnwrite) +{ + if (should_write_cache) { + if (count > 0) { + data.append((char*)buf, count); + } + } + + if (should_write_file) { + return impl.write(buf, count, pnwrite); + } + + return ERROR_SUCCESS; +} + +string SrsHlsCacheWriter::cache() +{ + return data; +} + +SrsHlsSegment::SrsHlsSegment(bool write_cache, bool write_file) { duration = 0; sequence_no = 0; segment_start_dts = 0; is_sequence_header = false; - writer = new SrsFileWriter(); + writer = new SrsHlsCacheWriter(write_cache, write_file); muxer = new SrsTSMuxer(writer); } @@ -87,12 +161,16 @@ void SrsHlsSegment::update_duration(int64_t current_frame_dts) return; } -SrsHlsMuxer::SrsHlsMuxer() +SrsHlsMuxer::SrsHlsMuxer(ISrsHlsHandler* h) { + req = NULL; + handler = h; hls_fragment = hls_window = 0; _sequence_no = 0; current = NULL; acodec = SrsCodecAudioReserved1; + should_write_cache = false; + should_write_file = true; } SrsHlsMuxer::~SrsHlsMuxer() @@ -105,6 +183,7 @@ SrsHlsMuxer::~SrsHlsMuxer() segments.clear(); srs_freep(current); + srs_freep(req); } int SrsHlsMuxer::sequence_no() @@ -112,16 +191,29 @@ int SrsHlsMuxer::sequence_no() return _sequence_no; } -int SrsHlsMuxer::update_config( - string _app, string _stream, string path, int fragment, int window -) { +int SrsHlsMuxer::update_config(SrsRequest* r, string path, int fragment, int window) +{ int ret = ERROR_SUCCESS; - app = _app; - stream = _stream; + srs_freep(req); + req = r->copy(); + hls_path = path; hls_fragment = fragment; hls_window = window; + + std::string storage = _srs_config->get_hls_storage(r->vhost); + if (storage == "ram") { + should_write_cache = true; + should_write_file = false; + } else if (storage == "disk") { + should_write_cache = false; + should_write_file = true; + } else { + srs_assert(storage == "both"); + should_write_cache = true; + should_write_file = true; + } return ret; } @@ -137,7 +229,7 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts) // TODO: create all parents dirs. // create dir for app. - if ((ret = create_dir()) != ERROR_SUCCESS) { + if (should_write_file && (ret = create_dir()) != ERROR_SUCCESS) { return ret; } @@ -145,19 +237,19 @@ int SrsHlsMuxer::segment_open(int64_t segment_start_dts) srs_assert(!current); // new segment. - current = new SrsHlsSegment(); + current = new SrsHlsSegment(should_write_cache, should_write_file); current->sequence_no = _sequence_no++; current->segment_start_dts = segment_start_dts; // generate filename. char filename[128]; snprintf(filename, sizeof(filename), - "%s-%d.ts", stream.c_str(), current->sequence_no); + "%s-%d.ts", req->stream.c_str(), current->sequence_no); // TODO: use temp file and rename it. current->full_path = hls_path; current->full_path += "/"; - current->full_path += app; + current->full_path += req->app; current->full_path += "/"; current->full_path += filename; @@ -289,6 +381,13 @@ int SrsHlsMuxer::segment_close(string log_desc) srs_info("%s reap ts segment, sequence_no=%d, uri=%s, duration=%.2f, start=%"PRId64"", log_desc.c_str(), current->sequence_no, current->uri.c_str(), current->duration, current->segment_start_dts); + + // notify handler for update ts. + srs_assert(current->writer); + if (handler && (ret = handler->on_update_ts(req, current->uri, current->writer->cache())) != ERROR_SUCCESS) { + srs_error("notify handler for update ts failed. ret=%d", ret); + return ret; + } // close the muxer of finished segment. srs_freep(current->muxer); @@ -297,7 +396,7 @@ int SrsHlsMuxer::segment_close(string log_desc) // rename from tmp to real path std::string tmp_file = full_path + ".tmp"; - if (rename(tmp_file.c_str(), full_path.c_str()) < 0) { + if (should_write_file && rename(tmp_file.c_str(), full_path.c_str()) < 0) { ret = ERROR_HLS_WRITE_FAILED; srs_error("rename ts file failed, %s => %s. ret=%d", tmp_file.c_str(), full_path.c_str(), ret); @@ -313,7 +412,9 @@ int SrsHlsMuxer::segment_close(string log_desc) // rename from tmp to real path std::string tmp_file = current->full_path + ".tmp"; - unlink(tmp_file.c_str()); + if (should_write_file) { + unlink(tmp_file.c_str()); + } srs_freep(current); } @@ -365,22 +466,18 @@ int SrsHlsMuxer::refresh_m3u8() std::string m3u8_file = hls_path; m3u8_file += "/"; - m3u8_file += app; + m3u8_file += req->app; m3u8_file += "/"; - m3u8_file += stream; + m3u8_file += req->stream; m3u8_file += ".m3u8"; m3u8 = m3u8_file; m3u8_file += ".temp"; - int fd = -1; - ret = _refresh_m3u8(fd, m3u8_file); - if (fd >= 0) { - close(fd); - if (rename(m3u8_file.c_str(), m3u8.c_str()) < 0) { + if ((ret = _refresh_m3u8(m3u8_file)) == ERROR_SUCCESS) { + if (should_write_file && rename(m3u8_file.c_str(), m3u8.c_str()) < 0) { ret = ERROR_HLS_WRITE_FAILED; - srs_error("rename m3u8 file failed. " - "%s => %s, ret=%d", m3u8_file.c_str(), m3u8.c_str(), ret); + srs_error("rename m3u8 file failed. %s => %s, ret=%d", m3u8_file.c_str(), m3u8.c_str(), ret); } } @@ -390,7 +487,7 @@ int SrsHlsMuxer::refresh_m3u8() return ret; } -int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) +int SrsHlsMuxer::_refresh_m3u8(string m3u8_file) { int ret = ERROR_SUCCESS; @@ -398,11 +495,9 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) if (segments.size() == 0) { return ret; } - - int flags = O_CREAT|O_WRONLY|O_TRUNC; - mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH; - if ((fd = ::open(m3u8_file.c_str(), flags, mode)) < 0) { - ret = ERROR_HLS_OPEN_FAILED; + + SrsHlsCacheWriter writer(should_write_cache, should_write_file); + if ((ret = writer.open(m3u8_file)) != ERROR_SUCCESS) { srs_error("open m3u8 file %s failed. ret=%d", m3u8_file.c_str(), ret); return ret; } @@ -419,8 +514,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) 0x23, 0x45, 0x58, 0x54, 0x2d, 0x58, 0x2d, 0x41, 0x4c, 0x4c, 0x4f, 0x57, 0x2d, 0x43, 0x41, 0x43, 0x48, 0x45, 0x3a, 0x4e, 0x4f, 0x0a }; - if (::write(fd, header, sizeof(header)) != sizeof(header)) { - ret = ERROR_HLS_WRITE_FAILED; + if ((ret = writer.write(header, sizeof(header), NULL)) != ERROR_SUCCESS) { srs_error("write m3u8 header failed. ret=%d", ret); return ret; } @@ -430,8 +524,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) SrsHlsSegment* first = *segments.begin(); char sequence[34] = {}; int len = snprintf(sequence, sizeof(sequence), "#EXT-X-MEDIA-SEQUENCE:%d\n", first->sequence_no); - if (::write(fd, sequence, len) != len) { - ret = ERROR_HLS_WRITE_FAILED; + if ((ret = writer.write(sequence, len, NULL)) != ERROR_SUCCESS) { srs_error("write m3u8 sequence failed. ret=%d", ret); return ret; } @@ -448,8 +541,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) target_duration += 1; char duration[34]; // 23+10+1 len = snprintf(duration, sizeof(duration), "#EXT-X-TARGETDURATION:%d\n", target_duration); - if (::write(fd, duration, len) != len) { - ret = ERROR_HLS_WRITE_FAILED; + if ((ret = writer.write(duration, len, NULL)) != ERROR_SUCCESS) { srs_error("write m3u8 duration failed. ret=%d", ret); return ret; } @@ -463,8 +555,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) // #EXT-X-DISCONTINUITY\n char ext_discon[22]; // 21+1 len = snprintf(ext_discon, sizeof(ext_discon), "#EXT-X-DISCONTINUITY\n"); - if (::write(fd, ext_discon, len) != len) { - ret = ERROR_HLS_WRITE_FAILED; + if ((ret = writer.write(ext_discon, len, NULL)) != ERROR_SUCCESS) { srs_error("write m3u8 segment discontinuity failed. ret=%d", ret); return ret; } @@ -474,8 +565,7 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) // "#EXTINF:4294967295.208,\n" char ext_info[25]; // 14+10+1 len = snprintf(ext_info, sizeof(ext_info), "#EXTINF:%.3f\n", segment->duration); - if (::write(fd, ext_info, len) != len) { - ret = ERROR_HLS_WRITE_FAILED; + if ((ret = writer.write(ext_info, len, NULL)) != ERROR_SUCCESS) { srs_error("write m3u8 segment info failed. ret=%d", ret); return ret; } @@ -484,14 +574,19 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) // file name std::string filename = segment->uri; filename += "\n"; - if (::write(fd, filename.c_str(), filename.length()) != (int)filename.length()) { - ret = ERROR_HLS_WRITE_FAILED; + if ((ret = writer.write((char*)filename.c_str(), (int)filename.length(), NULL)) != ERROR_SUCCESS) { srs_error("write m3u8 segment uri failed. ret=%d", ret); return ret; } srs_verbose("write m3u8 segment uri success."); } srs_info("write m3u8 %s success.", m3u8_file.c_str()); + + // notify handler for update m3u8. + if (handler && (ret = handler->on_update_m3u8(req, writer.cache())) != ERROR_SUCCESS) { + srs_error("notify handler for update m3u8 failed. ret=%d", ret); + return ret; + } return ret; } @@ -499,10 +594,14 @@ int SrsHlsMuxer::_refresh_m3u8(int& fd, string m3u8_file) int SrsHlsMuxer::create_dir() { int ret = ERROR_SUCCESS; + + if (!should_write_file) { + return ret; + } std::string app_dir = hls_path; app_dir += "/"; - app_dir += app; + app_dir += req->app; // TODO: cleanup the dir when startup. @@ -543,7 +642,7 @@ int SrsHlsCache::on_publish(SrsHlsMuxer* muxer, SrsRequest* req, int64_t segment // for the HLS donot requires the EXT-X-MEDIA-SEQUENCE be monotonically increase. // open muxer - if ((ret = muxer->update_config(app, stream, hls_path, hls_fragment, hls_window)) != ERROR_SUCCESS) { + if ((ret = muxer->update_config(req, hls_path, hls_fragment, hls_window)) != ERROR_SUCCESS) { srs_error("m3u8 muxer update config failed. ret=%d", ret); return ret; } @@ -679,16 +778,18 @@ int SrsHlsCache::reap_segment(string log_desc, SrsHlsMuxer* muxer, int64_t segme return ret; } -SrsHls::SrsHls(SrsSource* _source) +SrsHls::SrsHls(SrsSource* s, ISrsHlsHandler* h) { - hls_enabled = false; + source = s; + handler = h; - source = _source; + hls_enabled = false; + codec = new SrsAvcAacCodec(); sample = new SrsCodecSample(); jitter = new SrsRtmpJitter(); - muxer = new SrsHlsMuxer(); + muxer = new SrsHlsMuxer(h); hls_cache = new SrsHlsCache(); pithy_print = new SrsPithyPrint(SRS_CONSTS_STAGE_HLS); diff --git a/trunk/src/app/srs_app_hls.hpp b/trunk/src/app/srs_app_hls.hpp index 824730e01..ae08431c8 100644 --- a/trunk/src/app/srs_app_hls.hpp +++ b/trunk/src/app/srs_app_hls.hpp @@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include class SrsSharedPtrMessage; class SrsCodecSample; @@ -53,6 +54,70 @@ class SrsFileWriter; class SrsSimpleBuffer; class SrsTsAacJitter; class SrsTsCache; +class SrsHlsSegment; + +/** +* the handler for hls event. +* for example, we use memory only hls for +*/ +class ISrsHlsHandler +{ +public: + ISrsHlsHandler(); + virtual ~ISrsHlsHandler(); +public: + /** + * when publish stream + */ + virtual int on_hls_publish(SrsRequest* req) = 0; + /** + * when update the m3u8 file. + */ + virtual int on_update_m3u8(SrsRequest* r, std::string m3u8) = 0; + /** + * when reap new ts file. + */ + virtual int on_update_ts(SrsRequest* r, std::string uri, std::string ts) = 0; + /** + * when unpublish stream + */ + virtual int on_hls_unpublish(SrsRequest* req) = 0; +}; + +/** +* write to file and cache. +*/ +class SrsHlsCacheWriter : public SrsFileWriter +{ +private: + SrsFileWriter impl; + std::string data; + bool should_write_cache; + bool should_write_file; +public: + SrsHlsCacheWriter(bool write_cache, bool write_file); + virtual ~SrsHlsCacheWriter(); +public: + /** + * open file writer, can open then close then open... + */ + virtual int open(std::string file); + virtual void close(); +public: + virtual bool is_open(); + virtual int64_t tellg(); +public: + /** + * write to file. + * @param pnwrite the output nb_write, NULL to ignore. + */ + virtual int write(void* buf, size_t count, ssize_t* pnwrite); +public: + /** + * get the string cache. + */ + virtual std::string cache(); +}; /** * the wrapper of m3u8 segment from specification: @@ -72,16 +137,16 @@ public: // ts full file to write. std::string full_path; // the muxer to write ts. - SrsFileWriter* writer; + SrsHlsCacheWriter* writer; SrsTSMuxer* muxer; // current segment start dts for m3u8 int64_t segment_start_dts; // whether current segement is sequence header. bool is_sequence_header; - - SrsHlsSegment(); +public: + SrsHlsSegment(bool write_cache, bool write_file); virtual ~SrsHlsSegment(); - +public: /** * update the segment duration. * @current_frame_dts the dts of frame, in tbn of ts. @@ -100,8 +165,7 @@ public: class SrsHlsMuxer { private: - std::string app; - std::string stream; + SrsRequest* req; private: std::string hls_path; int hls_fragment; @@ -109,6 +173,10 @@ private: private: int _sequence_no; std::string m3u8; +private: + ISrsHlsHandler* handler; + bool should_write_cache; + bool should_write_file; private: /** * m3u8 segments. @@ -125,12 +193,15 @@ private: */ SrsCodecAudio acodec; public: - SrsHlsMuxer(); + SrsHlsMuxer(ISrsHlsHandler* h); virtual ~SrsHlsMuxer(); public: virtual int sequence_no(); public: - virtual int update_config(std::string _app, std::string _stream, std::string path, int fragment, int window); + /** + * when publish, update the config for muxer. + */ + virtual int update_config(SrsRequest* r, std::string path, int fragment, int window); /** * open a new segment(a new ts file), * @param segment_start_dts use to calc the segment duration, @@ -160,7 +231,7 @@ public: virtual int segment_close(std::string log_desc); private: virtual int refresh_m3u8(); - virtual int _refresh_m3u8(int& fd, std::string m3u8_file); + virtual int _refresh_m3u8(std::string m3u8_file); virtual int create_dir(); }; @@ -229,6 +300,7 @@ class SrsHls private: SrsHlsMuxer* muxer; SrsHlsCache* hls_cache; + ISrsHlsHandler* handler; private: bool hls_enabled; SrsSource* source; @@ -251,7 +323,7 @@ private: */ int64_t stream_dts; public: - SrsHls(SrsSource* _source); + SrsHls(SrsSource* s, ISrsHlsHandler* h); virtual ~SrsHls(); public: /** diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index 6402615b8..6282fad46 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -379,7 +379,9 @@ int SrsGoHttpFileServer::serve_file(ISrsGoHttpResponseWriter* w, SrsHttpMessage* // write body. int64_t left = length; if ((ret = copy(w, &fs, r, left)) != ERROR_SUCCESS) { - srs_warn("read file=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret); + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read file=%s size=%d failed, ret=%d", fullpath.c_str(), left, ret); + } return ret; } diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 86fbdfad3..304da8cf2 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -680,18 +680,96 @@ SrsLiveEntry::SrsLiveEntry() cache = NULL; } +SrsHlsM3u8Stream::SrsHlsM3u8Stream() +{ +} + +SrsHlsM3u8Stream::~SrsHlsM3u8Stream() +{ +} + +void SrsHlsM3u8Stream::set_m3u8(std::string v) +{ + m3u8 = v; +} + +int SrsHlsM3u8Stream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) +{ + int ret = ERROR_SUCCESS; + + std::string data = m3u8; + + w->header()->set_content_length((int)data.length()); + w->header()->set_content_type("application/x-mpegURL;charset=utf-8"); + + if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send m3u8 failed. ret=%d", ret); + } + return ret; + } + + return ret; +} + +SrsHlsTsStream::SrsHlsTsStream() +{ +} + +SrsHlsTsStream::~SrsHlsTsStream() +{ +} + +void SrsHlsTsStream::set_ts(std::string v) +{ + ts = v; +} + +int SrsHlsTsStream::serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r) +{ + int ret = ERROR_SUCCESS; + + std::string data = ts; + + w->header()->set_content_length((int)data.length()); + w->header()->set_content_type("video/MP2T"); + + if ((ret = w->write((char*)data.data(), (int)data.length())) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("send ts failed. ret=%d", ret); + } + return ret; + } + + return ret; +} + +SrsHlsEntry::SrsHlsEntry() +{ +} + SrsHttpServer::SrsHttpServer() { } SrsHttpServer::~SrsHttpServer() { - std::map::iterator it; - for (it = flvs.begin(); it != flvs.end(); ++it) { - SrsLiveEntry* entry = it->second; - srs_freep(entry); + if (true) { + std::map::iterator it; + for (it = flvs.begin(); it != flvs.end(); ++it) { + SrsLiveEntry* entry = it->second; + srs_freep(entry); + } + flvs.clear(); + } + if (true) { + std::map::iterator it; + for (it = hls.begin(); it != hls.end(); ++it) { + SrsHlsEntry* entry = it->second; + srs_freep(entry); + } + hls.clear(); } - flvs.clear(); } int SrsHttpServer::initialize() @@ -700,12 +778,17 @@ int SrsHttpServer::initialize() // static file // flv vod streaming. - if ((ret = mount_static_file()) != ERROR_SUCCESS) { + if ((ret = initialize_static_file()) != ERROR_SUCCESS) { return ret; } // remux rtmp to flv live streaming - if ((ret = mount_flv_streaming()) != ERROR_SUCCESS) { + if ((ret = initialize_flv_streaming()) != ERROR_SUCCESS) { + return ret; + } + + // remux rtmp to hls live streaming + if ((ret = initialize_hls_streaming()) != ERROR_SUCCESS) { return ret; } @@ -769,6 +852,128 @@ void SrsHttpServer::unmount(SrsSource* s, SrsRequest* r) entry->stream->entry->enabled = false; } +int SrsHttpServer::mount_hls(SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + + if (hls.find(r->vhost) == hls.end()) { + srs_info("ignore mount hls stream for disabled"); + return ret; + } + + SrsHlsEntry* entry = hls[r->vhost]; + + // TODO: FIXME: supports reload. + std::map::iterator it; + for (it = entry->streams.begin(); it != entry->streams.end(); ++it) { + ISrsGoHttpHandler* stream = it->second; + stream->entry->enabled = true; + } + + return ret; +} + +int SrsHttpServer::hls_update_m3u8(SrsRequest* r, string m3u8) +{ + int ret = ERROR_SUCCESS; + + srs_assert(hls.find(r->vhost) != hls.end()); + SrsHlsEntry* entry = hls[r->vhost]; + srs_assert(entry); + + std::string mount = entry->mount; + + // replace the vhost variable + mount = srs_string_replace(mount, "[vhost]", r->vhost); + mount = srs_string_replace(mount, "[app]", r->app); + mount = srs_string_replace(mount, "[stream]", r->stream); + + // remove the default vhost mount + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); + + if (entry->streams.find(mount) == entry->streams.end()) { + ISrsGoHttpHandler* he = new SrsHlsM3u8Stream(); + entry->streams[mount] = he; + + if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) { + srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret); + return ret; + } + } + + // update the m3u8 stream. + SrsHlsM3u8Stream* hms = dynamic_cast(entry->streams[mount]); + if (hms) { + hms->set_m3u8(m3u8); + } + srs_trace("hls update m3u8 ok, mount=%s", mount.c_str()); + + return ret; +} + +int SrsHttpServer::hls_update_ts(SrsRequest* r, string uri, string ts) +{ + int ret = ERROR_SUCCESS; + + srs_assert(hls.find(r->vhost) != hls.end()); + SrsHlsEntry* entry = hls[r->vhost]; + srs_assert(entry); + + std::string mount = entry->mount; + + // the ts is relative from the m3u8, the same start dir. + size_t pos = string::npos; + if ((pos = mount.rfind("/")) != string::npos) { + mount = mount.substr(0, pos); + } + + // replace the vhost variable + mount = srs_string_replace(mount, "[vhost]", r->vhost); + mount = srs_string_replace(mount, "[app]", r->app); + + // remove the default vhost mount + mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); + + // mount with ts. + mount += "/"; + mount += uri; + + if (entry->streams.find(mount) == entry->streams.end()) { + ISrsGoHttpHandler* he = new SrsHlsTsStream(); + entry->streams[mount] = he; + + if ((ret = mux.handle(mount, he)) != ERROR_SUCCESS) { + srs_error("handle mount=%s failed. ret=%d", mount.c_str(), ret); + return ret; + } + } + + // update the ts stream. + SrsHlsTsStream* hts = dynamic_cast(entry->streams[mount]); + if (hts) { + hts->set_ts(ts); + } + srs_trace("hls update ts ok, mount=%s", mount.c_str()); + + return ret; +} + +void SrsHttpServer::unmount_hls(SrsRequest* r) +{ + if (hls.find(r->vhost) == hls.end()) { + srs_info("ignore unmount hls stream for disabled"); + return; + } + + SrsHlsEntry* entry = hls[r->vhost]; + + std::map::iterator it; + for (it = entry->streams.begin(); it != entry->streams.end(); ++it) { + ISrsGoHttpHandler* stream = it->second; + stream->entry->enabled = false; + } +} + int SrsHttpServer::on_reload_vhost_http_updated() { int ret = ERROR_SUCCESS; @@ -783,7 +988,14 @@ int SrsHttpServer::on_reload_vhost_http_remux_updated() return ret; } -int SrsHttpServer::mount_static_file() +int SrsHttpServer::on_reload_vhost_hls(string vhost) +{ + int ret = ERROR_SUCCESS; + // TODO: FIXME: implements it. + return ret; +} + +int SrsHttpServer::initialize_static_file() { int ret = ERROR_SUCCESS; @@ -843,7 +1055,7 @@ int SrsHttpServer::mount_static_file() return ret; } -int SrsHttpServer::mount_flv_streaming() +int SrsHttpServer::initialize_flv_streaming() { int ret = ERROR_SUCCESS; @@ -872,6 +1084,40 @@ int SrsHttpServer::mount_flv_streaming() return ret; } +int SrsHttpServer::initialize_hls_streaming() +{ + int ret = ERROR_SUCCESS; + + // http hls live stream mount for each vhost. + SrsConfDirective* root = _srs_config->get_root(); + for (int i = 0; i < (int)root->directives.size(); i++) { + SrsConfDirective* conf = root->at(i); + + if (!conf->is_vhost()) { + continue; + } + + std::string vhost = conf->arg0(); + if (!_srs_config->get_hls_enabled(vhost)) { + continue; + } + + std::string storage = _srs_config->get_hls_storage(vhost); + if (storage != "ram" && storage != "both") { + continue; + } + + SrsHlsEntry* entry = new SrsHlsEntry(); + entry->vhost = vhost; + entry->mount = _srs_config->get_hls_mount(vhost); + hls[vhost] = entry; + srs_trace("http hls live stream, vhost=%s, mount=%s", + vhost.c_str(), entry->mount.c_str()); + } + + return ret; +} + SrsHttpConn::SrsHttpConn(SrsServer* svr, st_netfd_t fd, SrsHttpServer* m) : SrsConnection(svr, fd) { diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index dedae7d12..5a63418d1 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -260,6 +260,53 @@ struct SrsLiveEntry SrsLiveEntry(); }; +/** +* the m3u8 stream handler. +*/ +class SrsHlsM3u8Stream : public ISrsGoHttpHandler +{ +private: + std::string m3u8; +public: + SrsHlsM3u8Stream(); + virtual ~SrsHlsM3u8Stream(); +public: + virtual void set_m3u8(std::string v); +public: + virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r); +}; + +/** +* the ts stream handler. +*/ +class SrsHlsTsStream : public ISrsGoHttpHandler +{ +private: + std::string ts; +public: + SrsHlsTsStream(); + virtual ~SrsHlsTsStream(); +public: + virtual void set_ts(std::string v); +public: + virtual int serve_http(ISrsGoHttpResponseWriter* w, SrsHttpMessage* r); +}; + +/** +* the srs hls entry. +*/ +struct SrsHlsEntry +{ + std::string vhost; + std::string mount; + + // key: the m3u8/ts file path. + // value: the http handler. + std::map streams; + + SrsHlsEntry(); +}; + /** * the http server instance, * serve http static file, flv vod stream and flv live stream. @@ -270,21 +317,32 @@ public: SrsGoHttpServeMux mux; // the flv live streaming template. std::map flvs; + // the hls live streaming template. + std::map hls; public: SrsHttpServer(); virtual ~SrsHttpServer(); public: virtual int initialize(); +// http flv/ts/mp3/aac stream public: virtual int mount(SrsSource* s, SrsRequest* r); virtual void unmount(SrsSource* s, SrsRequest* r); +// hls stream +public: + virtual int mount_hls(SrsRequest* r); + virtual int hls_update_m3u8(SrsRequest* r, std::string m3u8); + virtual int hls_update_ts(SrsRequest* r, std::string uri, std::string ts); + virtual void unmount_hls(SrsRequest* r); // interface ISrsThreadHandler. public: virtual int on_reload_vhost_http_updated(); virtual int on_reload_vhost_http_remux_updated(); + virtual int on_reload_vhost_hls(std::string vhost); private: - virtual int mount_static_file(); - virtual int mount_flv_streaming(); + virtual int initialize_static_file(); + virtual int initialize_flv_streaming(); + virtual int initialize_hls_streaming(); }; class SrsHttpConn : public SrsConnection diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 14754d6c1..070af6154 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -393,7 +393,7 @@ int SrsRtmpConn::stream_service_cycle() // find a source to serve. SrsSource* source = NULL; - if ((ret = SrsSource::find(req, server, &source)) != ERROR_SUCCESS) { + if ((ret = SrsSource::find(req, server, server, &source)) != ERROR_SUCCESS) { return ret; } srs_assert(source != NULL); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 300b8c528..d7a89a080 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -32,6 +32,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +using namespace std; #include #include @@ -1277,3 +1278,53 @@ void SrsServer::on_unpublish(SrsSource* s, SrsRequest* r) #endif } +int SrsServer::on_hls_publish(SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_SERVER + if ((ret = http_stream_mux->mount_hls(r)) != ERROR_SUCCESS) { + return ret; + } +#endif + + return ret; +} + +int SrsServer::on_update_m3u8(SrsRequest* r, string m3u8) +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_SERVER + if ((ret = http_stream_mux->hls_update_m3u8(r, m3u8)) != ERROR_SUCCESS) { + return ret; + } +#endif + + return ret; +} + +int SrsServer::on_update_ts(SrsRequest* r, string uri, string ts) +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_SERVER + if ((ret = http_stream_mux->hls_update_ts(r, uri, ts)) != ERROR_SUCCESS) { + return ret; + } +#endif + + return ret; +} + +int SrsServer::on_hls_unpublish(SrsRequest* r) +{ + int ret = ERROR_SUCCESS; + +#ifdef SRS_AUTO_HTTP_SERVER + http_stream_mux->unmount_hls(r); +#endif + + return ret; +} + diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 775722aaa..79112840f 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -31,11 +31,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #include #include #include #include +#include class SrsServer; class SrsConnection; @@ -142,7 +144,7 @@ private: * start connection service thread, destroy client. */ class SrsServer : virtual public ISrsReloadHandler - , virtual public ISrsSourceHandler + , virtual public ISrsSourceHandler, virtual public ISrsHlsHandler { private: #ifdef SRS_AUTO_HTTP_API @@ -275,6 +277,12 @@ public: public: virtual int on_publish(SrsSource* s, SrsRequest* r); virtual void on_unpublish(SrsSource* s, SrsRequest* r); +// interface ISrsHlsHandler +public: + virtual int on_hls_publish(SrsRequest* r); + virtual int on_update_m3u8(SrsRequest* r, std::string m3u8); + virtual int on_update_ts(SrsRequest* r, std::string uri, std::string ts); + virtual int on_hls_unpublish(SrsRequest* r); }; #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index fa88d5b18..874419dc3 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -713,7 +713,7 @@ ISrsSourceHandler::~ISrsSourceHandler() std::map SrsSource::pool; -int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) +int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps) { int ret = ERROR_SUCCESS; @@ -721,7 +721,7 @@ int SrsSource::find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps) string vhost = r->vhost; if (pool.find(stream_url) == pool.end()) { - SrsSource* source = new SrsSource(); + SrsSource* source = new SrsSource(hh); if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) { srs_freep(source); return ret; @@ -754,13 +754,14 @@ void SrsSource::destroy() pool.clear(); } -SrsSource::SrsSource() +SrsSource::SrsSource(ISrsHlsHandler* hh) { _req = NULL; jitter_algorithm = SrsRtmpJitterAlgorithmOFF; #ifdef SRS_AUTO_HLS - hls = new SrsHls(this); + // TODO: FIXME: refine code, use subscriber pattern. + hls = new SrsHls(this, hh); #endif #ifdef SRS_AUTO_DVR dvr = new SrsDvr(this); diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 859f395ca..eb18082f1 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -61,6 +61,7 @@ class SrsDvr; class SrsEncoder; #endif class SrsStream; +class ISrsHlsHandler; /** * the time jitter algorithm: @@ -376,9 +377,10 @@ public: * find stream by vhost/app/stream. * @param r the client request. * @param h the event handler for source. + * @param hh the event handler for hls. * @param pps the matched source, if success never be NULL. */ - static int find(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps); + static int find(SrsRequest* r, ISrsSourceHandler* h, ISrsHlsHandler* hh, SrsSource** pps); /** * when system exit, destroy the sources, * for gmc to analysis mem leaks. @@ -451,7 +453,7 @@ public: * @param _req the client request object, * this object will deep copy it for reload. */ - SrsSource(); + SrsSource(ISrsHlsHandler* hh); virtual ~SrsSource(); // initialize, get and setter. public: diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 90142fb6f..7c341e8e9 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 111 +#define VERSION_REVISION 112 // server info. #define RTMP_SIG_SRS_KEY "SRS"