From e6d6bdfe57d2af6db420be7b72fc2858dc35d2fe Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 12:27:30 +0800 Subject: [PATCH 01/11] refine the hls_on_notify, only read a chunk. --- trunk/src/app/srs_app_hls.cpp | 2 +- trunk/src/app/srs_app_http_hooks.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index 240ada91c..bbc62cd95 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -243,7 +243,7 @@ int SrsDvrAsyncCallOnHlsNotify::call() for (int i = 0; i < (int)on_hls->args.size(); i++) { std::string url = on_hls->args.at(i); if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url)) != ERROR_SUCCESS) { - srs_error("hook client on_hls_notify failed. url=%s, ret=%d", url.c_str(), ret); + srs_error("hook client on_hls_notify failed. url=%s, ts=%s, ret=%d", url.c_str(), ts_url.c_str(), ret); return ret; } } diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index e4bb70461..cc814d4b7 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -361,9 +361,9 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts ISrsHttpResponseReader* br = msg->body_reader(); while (!br->eof()) { std::string data; - if ((ret = br->read(data)) != ERROR_SUCCESS) { - break; - } + // for notify, only read some data. + ret = br->read(data); + break; } srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, ret=%d", From 310157ab77f64e4874b4712ae768890c029bb646 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 12:30:22 +0800 Subject: [PATCH 02/11] refine the hls_on_notify, calc the spent time in ms. --- trunk/src/app/srs_app_http_hooks.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index cc814d4b7..59a83ccc6 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -341,6 +341,8 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts url = srs_string_replace(url, "[stream]", req->stream); url = srs_string_replace(url, "[ts_url]", ts_url); + int64_t starttime = srs_update_system_time_ms(); + SrsHttpUri uri; if ((ret = uri.initialize(url)) != ERROR_SUCCESS) { srs_error("http: post failed. url=%s, ret=%d", url.c_str(), ret); @@ -366,8 +368,9 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts break; } - srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, ret=%d", - client_id, url.c_str(), msg->status_code(), ret); + int spenttime = (int)(srs_update_system_time_ms() - starttime); + srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, ret=%d", + client_id, url.c_str(), msg->status_code(), spenttime, ret); // ignore any error for on_hls_notify. ret = ERROR_SUCCESS; From d8988da0eaf1c2bde24462327460ed62afd2d03b Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 12:32:34 +0800 Subject: [PATCH 03/11] refine the hls_on_notify, calc the spent time in ms. --- trunk/src/kernel/srs_kernel_utility.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index 32598a13f..af5d5d18c 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -134,7 +134,7 @@ int64_t srs_update_system_time_ms() if (_srs_system_time_us_cache <= 0) { _srs_system_time_us_cache = now_us; _srs_system_time_startup_time = now_us; - return _srs_system_time_us_cache; + return _srs_system_time_us_cache / 1000; } // use relative time. @@ -151,7 +151,7 @@ int64_t srs_update_system_time_ms() srs_info("system time updated, startup=%"PRId64"us, now=%"PRId64"us", _srs_system_time_startup_time, _srs_system_time_us_cache); - return _srs_system_time_us_cache; + return _srs_system_time_us_cache / 1000; } string srs_dns_resolve(string host) From 1f93fb3399d92a9ea0cb2eae0e90a3843170da6d Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 13:45:21 +0800 Subject: [PATCH 04/11] refine hls notify, support timeout. --- trunk/src/app/srs_app_http_client.cpp | 16 +++++++++------- trunk/src/app/srs_app_http_client.hpp | 6 +++++- trunk/src/app/srs_app_http_hooks.cpp | 5 ++++- trunk/src/kernel/srs_kernel_utility.cpp | 2 +- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index 6eb0f663b..299ea180f 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -37,15 +37,13 @@ using namespace std; #include #include -// when error, http client sleep for a while and retry. -#define SRS_HTTP_CLIENT_SLEEP_US (int64_t)(3*1000*1000LL) - SrsHttpClient::SrsHttpClient() { connected = false; stfd = NULL; skt = NULL; parser = NULL; + timeout_us = 0; } SrsHttpClient::~SrsHttpClient() @@ -54,7 +52,7 @@ SrsHttpClient::~SrsHttpClient() srs_freep(parser); } -int SrsHttpClient::initialize(string h, int p) +int SrsHttpClient::initialize(string h, int p, int64_t t_us) { int ret = ERROR_SUCCESS; @@ -68,6 +66,7 @@ int SrsHttpClient::initialize(string h, int p) host = h; port = p; + timeout_us = t_us; return ret; } @@ -183,10 +182,9 @@ int SrsHttpClient::connect() disconnect(); // open socket. - int64_t timeout = SRS_HTTP_CLIENT_SLEEP_US; - if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) { + if ((ret = srs_socket_connect(host, port, timeout_us, &stfd)) != ERROR_SUCCESS) { srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", - host.c_str(), port, timeout, ret); + host.c_str(), port, timeout_us, ret); return ret; } srs_info("connect to server success. server=%s, port=%d", host, port); @@ -195,6 +193,10 @@ int SrsHttpClient::connect() skt = new SrsStSocket(stfd); connected = true; + // set the recv/send timeout in us. + skt->set_recv_timeout(timeout_us); + skt->set_send_timeout(timeout_us); + return ret; } diff --git a/trunk/src/app/srs_app_http_client.hpp b/trunk/src/app/srs_app_http_client.hpp index 451f336bf..67a3314c2 100644 --- a/trunk/src/app/srs_app_http_client.hpp +++ b/trunk/src/app/srs_app_http_client.hpp @@ -40,6 +40,9 @@ class SrsHttpParser; class SrsHttpMessage; class SrsStSocket; +// the default timeout for http client. +#define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL) + /** * http client to GET/POST/PUT/DELETE uri */ @@ -51,6 +54,7 @@ private: SrsStSocket* skt; SrsHttpParser* parser; private: + int64_t timeout_us; // host name or ip. std::string host; int port; @@ -61,7 +65,7 @@ public: /** * initialize the client, connect to host and port. */ - virtual int initialize(std::string h, int p); + virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US); public: /** * to post data to the uri. diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 59a83ccc6..7824a6978 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -44,6 +44,9 @@ using namespace std; #define SRS_HTTP_HEADER_BUFFER 1024 #define SRS_HTTP_BODY_BUFFER 32 * 1024 +// the timeout for hls notify, in us. +#define SRS_HLS_NOTIFY_TIMEOUT_US (int64_t)(10*1000*1000LL) + SrsHttpHooks::SrsHttpHooks() { } @@ -350,7 +353,7 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts } SrsHttpClient http; - if ((ret = http.initialize(uri.get_host(), uri.get_port())) != ERROR_SUCCESS) { + if ((ret = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT_US)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index af5d5d18c..d0daaef50 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -390,7 +390,7 @@ bool srs_aac_startswith_adts(SrsStream* stream) char* bytes = stream->data() + stream->pos(); char* p = bytes; - if (!stream->require(p - bytes + 2)) { + if (!stream->require((int)(p - bytes) + 2)) { return false; } From 5c00ce8a96d6bac86fa8ef9ebb3c2758c79e1e15 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 14:17:49 +0800 Subject: [PATCH 05/11] refine hls on_hls_notify, read a little of ts. --- trunk/src/app/srs_app_http.cpp | 163 +++++++++++++++++---------- trunk/src/app/srs_app_http.hpp | 22 ++-- trunk/src/app/srs_app_http_hooks.cpp | 13 +-- 3 files changed, 122 insertions(+), 76 deletions(-) diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index bb858032e..2be383c0d 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -53,6 +53,9 @@ using namespace std; #define SRS_CONSTS_HTTP_PUT HTTP_PUT #define SRS_CONSTS_HTTP_DELETE HTTP_DELETE +// for ead all of http body, read each time. +#define SRS_HTTP_READ_CACHE_BYTES 4096 + #define SRS_HTTP_DEFAULT_PAGE "index.html" int srs_go_http_response_json(ISrsHttpResponseWriter* w, string data) @@ -889,7 +892,8 @@ SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* i skt = io; owner = msg; is_eof = false; - nb_read = 0; + nb_total_read = 0; + nb_left_chunk = 0; buffer = NULL; } @@ -901,6 +905,8 @@ int SrsHttpResponseReader::initialize(SrsFastBuffer* body) { int ret = ERROR_SUCCESS; + nb_left_chunk = 0; + nb_total_read = 0; buffer = body; return ret; @@ -911,7 +917,7 @@ bool SrsHttpResponseReader::eof() return is_eof; } -int SrsHttpResponseReader::read(std::string& data) +int SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read) { int ret = ERROR_SUCCESS; @@ -923,95 +929,115 @@ int SrsHttpResponseReader::read(std::string& data) // chunked encoding. if (owner->is_chunked()) { - return read_chunked(data); + return read_chunked(data, nb_data, nb_read); } // read by specified content-length - int max = (int)owner->content_length() - (int)nb_read; + int max = (int)owner->content_length() - (int)nb_total_read; if (max <= 0) { is_eof = true; return ret; } - return read_specified(max, data); + + // change the max to read. + nb_data = srs_min(nb_data, max); + return read_specified(data, nb_data, nb_read); } -int SrsHttpResponseReader::read_chunked(std::string& data) +int SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb_read) { int ret = ERROR_SUCCESS; + // when no bytes left in chunk, // parse the chunk length first. - char* at = NULL; - int length = 0; - while (!at) { - // find the CRLF of chunk header end. - char* start = buffer->bytes(); - char* end = start + buffer->size(); - for (char* p = start; p < end - 1; p++) { - if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) { - // invalid chunk, ignore. - if (p == start) { - ret = ERROR_HTTP_INVALID_CHUNK_HEADER; - srs_error("chunk header start with CRLF. ret=%d", ret); - return ret; + if (nb_left_chunk <= 0) { + char* at = NULL; + int length = 0; + while (!at) { + // find the CRLF of chunk header end. + char* start = buffer->bytes(); + char* end = start + buffer->size(); + for (char* p = start; p < end - 1; p++) { + if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) { + // invalid chunk, ignore. + if (p == start) { + ret = ERROR_HTTP_INVALID_CHUNK_HEADER; + srs_error("chunk header start with CRLF. ret=%d", ret); + return ret; + } + length = (int)(p - start + 2); + at = buffer->read_slice(length); + break; } - length = (int)(p - start + 2); - at = buffer->read_slice(length); + } + + // got at, ok. + if (at) { break; } - } - - // got at, ok. - if (at) { - break; - } - - // when empty, only grow 1bytes, but the buffer will cache more. - if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("read body from server failed. ret=%d", ret); + + // when empty, only grow 1bytes, but the buffer will cache more. + if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read body from server failed. ret=%d", ret); + } + return ret; } + } + srs_assert(length >= 3); + + // it's ok to set the pos and pos+1 to NULL. + at[length - 1] = 0; + at[length - 2] = 0; + + // size is the bytes size, excludes the chunk header and end CRLF. + int ilength = (int)::strtol(at, NULL, 16); + if (ilength < 0) { + ret = ERROR_HTTP_INVALID_CHUNK_HEADER; + srs_error("chunk header negative, length=%d. ret=%d", ilength, ret); return ret; } + + // all bytes in chunk is left now. + nb_left_chunk = ilength; } - srs_assert(length >= 3); - // it's ok to set the pos and pos+1 to NULL. - at[length - 1] = 0; - at[length - 2] = 0; + // left bytes in chunk, read some. + srs_assert(nb_left_chunk); - // size is the bytes size, excludes the chunk header and end CRLF. - int ilength = (int)::strtol(at, NULL, 16); - if (ilength < 0) { - ret = ERROR_HTTP_INVALID_CHUNK_HEADER; - srs_error("chunk header negative, length=%d. ret=%d", ilength, ret); + int nb_bytes = srs_min(nb_left_chunk, nb_data); + ret = read_specified(data, nb_bytes, &nb_bytes); + + // the nb_bytes used for output already read size of bytes. + if (nb_read) { + *nb_read = nb_bytes; + } + nb_left_chunk -= nb_bytes; + + // error or still left bytes in chunk, ignore and read in future. + if (nb_left_chunk > 0 || (ret != ERROR_SUCCESS)) { return ret; } - - // when empty, only grow 1bytes, but the buffer will cache more. - if ((ret = buffer->grow(skt, ilength + 2)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("read body from server failed. ret=%d", ret); - } - return ret; - } - srs_info("http: read %d chunk", ilength); + srs_info("http: read %d bytes of chunk", nb_bytes); // read payload when length specifies some payload. - if (ilength <= 0) { + if (nb_left_chunk <= 0) { is_eof = true; - } else { - srs_assert(ilength); - data.append(buffer->read_slice(ilength), ilength); - nb_read += ilength; } // the CRLF of chunk payload end. + if ((ret = buffer->grow(skt, 2)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read EOF of chunk from server failed. ret=%d", ret); + } + return ret; + } buffer->read_slice(2); return ret; } -int SrsHttpResponseReader::read_specified(int max, std::string& data) +int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) { int ret = ERROR_SUCCESS; @@ -1025,14 +1051,21 @@ int SrsHttpResponseReader::read_specified(int max, std::string& data) } } - int nb_bytes = srs_min(max, buffer->size()); + int nb_bytes = srs_min(nb_data, buffer->size()); + // read data to buffer. srs_assert(nb_bytes); - data.append(buffer->read_slice(nb_bytes), nb_bytes); - nb_read += nb_bytes; + char* p = buffer->read_slice(nb_bytes); + memcpy(data, p, nb_bytes); + if (nb_read) { + *nb_read = nb_bytes; + } + + // increase the total read to determine whether EOF. + nb_total_read += nb_bytes; // when read completed, eof. - if (nb_read >= (int)owner->content_length()) { + if (nb_total_read >= (int)owner->content_length()) { is_eof = true; } @@ -1223,11 +1256,19 @@ int SrsHttpMessage::body_read_all(string& body) { int ret = ERROR_SUCCESS; + // cache to read. + char* buf = new char[SRS_HTTP_READ_CACHE_BYTES]; + SrsAutoFree(char, buf); + // whatever, read util EOF. while (!_body->eof()) { - if ((ret = _body->read(body)) != ERROR_SUCCESS) { + int nb_read = 0; + if ((ret = _body->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != ERROR_SUCCESS) { return ret; } + + srs_assert (nb_read > 0); + body.append(buf, nb_read); } return ret; diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index bc524666e..76cf602da 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -196,10 +196,13 @@ public: */ virtual bool eof() = 0; /** - * read from the response body. - * @remark when eof(), return error. - */ - virtual int read(std::string& data) = 0; + * read from the response body. + * @param data, the buffer to read data buffer to. + * @param nb_data, the max size of data buffer. + * @param nb_read, the actual read size of bytes. NULL to ignore. + * @remark when eof(), return error. + */ + virtual int read(char* data, int nb_data, int* nb_read) = 0; }; // Objects implementing the Handler interface can be @@ -431,7 +434,10 @@ private: SrsHttpMessage* owner; SrsFastBuffer* buffer; bool is_eof; - int64_t nb_read; + // the left bytes in chunk. + int nb_left_chunk; + // already read total bytes. + int64_t nb_total_read; public: SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io); virtual ~SrsHttpResponseReader(); @@ -443,10 +449,10 @@ public: // interface ISrsHttpResponseReader public: virtual bool eof(); - virtual int read(std::string& data); + virtual int read(char* data, int nb_data, int* nb_read); private: - virtual int read_chunked(std::string& data); - virtual int read_specified(int max, std::string& data); + virtual int read_chunked(char* data, int nb_data, int* nb_read); + virtual int read_specified(char* data, int nb_data, int* nb_read); }; // for http header. diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 7824a6978..c262ba07f 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -363,17 +363,16 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts } SrsAutoFree(SrsHttpMessage, msg); + int nb_read = 0; ISrsHttpResponseReader* br = msg->body_reader(); - while (!br->eof()) { - std::string data; - // for notify, only read some data. - ret = br->read(data); - break; + if (!br->eof()) { + char buf[64]; // only read a little of bytes of ts. + ret = br->read(buf, 64, &nb_read); } int spenttime = (int)(srs_update_system_time_ms() - starttime); - srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, ret=%d", - client_id, url.c_str(), msg->status_code(), spenttime, ret); + srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, read=%dB, ret=%d", + client_id, url.c_str(), msg->status_code(), spenttime, nb_read, ret); // ignore any error for on_hls_notify. ret = ERROR_SUCCESS; From 0a7cea063c0b9b460587e3ad55508a10f38a948d Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 14:25:14 +0800 Subject: [PATCH 06/11] refine http read, support c style api --- trunk/src/app/srs_app_http.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index 2be383c0d..fa4fe6168 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -1267,8 +1267,9 @@ int SrsHttpMessage::body_read_all(string& body) return ret; } - srs_assert (nb_read > 0); - body.append(buf, nb_read); + if (nb_read > 0) { + body.append(buf, nb_read); + } } return ret; From 5d37e47783abcdd3d7aa6d2fb12fd9f059ee03bf Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 14:44:18 +0800 Subject: [PATCH 07/11] support config the hls_nb_notify. --- trunk/conf/full.conf | 5 +++++ trunk/src/app/srs_app_config.cpp | 18 +++++++++++++++++- trunk/src/app/srs_app_config.hpp | 6 ++++++ trunk/src/app/srs_app_hls.cpp | 3 ++- trunk/src/app/srs_app_http_hooks.cpp | 10 +++++++--- trunk/src/app/srs_app_http_hooks.hpp | 3 ++- 6 files changed, 39 insertions(+), 6 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 217a7f95c..7bcae649d 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -613,6 +613,11 @@ vhost with-hls.srs.com { # whether cleanup the old ts files. # default: on hls_cleanup on; + # the max size to notify hls, + # to read max bytes from ts of specified cdn network, + # @remark only used when on_hls_notify is config. + # default: 64 + hls_nb_notify 64; # on_hls, never config in here, should config in http_hooks. # for the hls http callback, @see http_hooks.on_hls of vhost hooks.callback.srs.com diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 5b9563fb8..7236bf162 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1487,7 +1487,7 @@ int SrsConfig::check_config() string m = conf->at(j)->name.c_str(); if (m != "enabled" && m != "hls_entry_prefix" && m != "hls_path" && m != "hls_fragment" && m != "hls_window" && m != "hls_on_error" && m != "hls_storage" && m != "hls_mount" && m != "hls_td_ratio" && m != "hls_aof_ratio" && m != "hls_acodec" && m != "hls_vcodec" - && m != "hls_m3u8_file" && m != "hls_ts_file" && m != "hls_ts_floor" && m != "hls_cleanup" + && m != "hls_m3u8_file" && m != "hls_ts_file" && m != "hls_ts_floor" && m != "hls_cleanup" && m != "hls_nb_notify" ) { ret = ERROR_SYSTEM_CONFIG_INVALID; srs_error("unsupported vhost hls directive %s, ret=%d", m.c_str(), ret); @@ -2440,6 +2440,22 @@ SrsConfDirective* SrsConfig::get_vhost_on_hls_notify(string vhost) return conf->get("on_hls_notify"); } +int SrsConfig::get_vhost_hls_nb_notify(string vhost) +{ + SrsConfDirective* conf = get_vhost_http_hooks(vhost); + + if (!conf) { + return SRS_CONF_DEFAULT_HLS_NB_NOTIFY; + } + + conf = conf->get("hls_nb_notify"); + if (!conf || conf->arg0().empty()) { + return SRS_CONF_DEFAULT_HLS_NB_NOTIFY; + } + + return ::atoi(conf->arg0().c_str()); +} + bool SrsConfig::get_bw_check_enabled(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index d31889bba..02d81f917 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -63,6 +63,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_HLS_ACODEC "aac" #define SRS_CONF_DEFAULT_HLS_VCODEC "h264" #define SRS_CONF_DEFAULT_HLS_CLEANUP true +#define SRS_CONF_DEFAULT_HLS_NB_NOTIFY 64 #define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html/[app]/[stream].[timestamp].flv" #define SRS_CONF_DEFAULT_DVR_PLAN_SESSION "session" #define SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT "segment" @@ -651,6 +652,11 @@ public: * @return the on_hls_notify callback directive, the args is the url to callback. */ virtual SrsConfDirective* get_vhost_on_hls_notify(std::string vhost); + /** + * get the size of bytes to read from cdn network, for the on_hls_notify callback, + * that is, to read max bytes of the bytes from the callback, or timeout or error. + */ + virtual int get_vhost_hls_nb_notify(std::string vhost); // bwct(bandwidth check tool) section public: /** diff --git a/trunk/src/app/srs_app_hls.cpp b/trunk/src/app/srs_app_hls.cpp index bbc62cd95..d63d5f408 100644 --- a/trunk/src/app/srs_app_hls.cpp +++ b/trunk/src/app/srs_app_hls.cpp @@ -240,9 +240,10 @@ int SrsDvrAsyncCallOnHlsNotify::call() return ret; } + int nb_notify = _srs_config->get_vhost_hls_nb_notify(req->vhost); for (int i = 0; i < (int)on_hls->args.size(); i++) { std::string url = on_hls->args.at(i); - if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url)) != ERROR_SUCCESS) { + if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url, nb_notify)) != ERROR_SUCCESS) { srs_error("hook client on_hls_notify failed. url=%s, ts=%s, ret=%d", url.c_str(), ts_url.c_str(), ret); return ret; } diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index c262ba07f..5e79e7c74 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -329,7 +329,7 @@ int SrsHttpHooks::on_hls(string url, SrsRequest* req, string file, int sn, doubl return ret; } -int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts_url) +int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts_url, int nb_notify) { int ret = ERROR_SUCCESS; @@ -365,9 +365,13 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts int nb_read = 0; ISrsHttpResponseReader* br = msg->body_reader(); - if (!br->eof()) { + while (nb_read < nb_notify && !br->eof()) { char buf[64]; // only read a little of bytes of ts. - ret = br->read(buf, 64, &nb_read); + int nb_buf = 64; + if ((ret = br->read(buf, nb_buf, &nb_buf)) != ERROR_SUCCESS) { + break; + } + nb_read += nb_buf; } int spenttime = (int)(srs_update_system_time_ms() - starttime); diff --git a/trunk/src/app/srs_app_http_hooks.hpp b/trunk/src/app/srs_app_http_hooks.hpp index 1a63bdce0..d63eca3af 100644 --- a/trunk/src/app/srs_app_http_hooks.hpp +++ b/trunk/src/app/srs_app_http_hooks.hpp @@ -110,8 +110,9 @@ public: * @param url the api server url, to process the event. * ignore if empty. * @param ts_url the ts uri, used to replace the variable [ts_url] in url. + * @param nb_notify the max bytes to read from notify server. */ - static int on_hls_notify(std::string url, SrsRequest* req, std::string ts_url); + static int on_hls_notify(std::string url, SrsRequest* req, std::string ts_url, int nb_notify); private: static int do_post(std::string url, std::string req, int& code, std::string& res); }; From e5b8e0da01eee95eb06fa5b37459af274feded0f Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 15:00:07 +0800 Subject: [PATCH 08/11] fix async call and hls_nb_notify bug. --- trunk/src/app/srs_app_async_call.cpp | 2 +- trunk/src/app/srs_app_config.cpp | 32 ++++++++++++++-------------- trunk/src/app/srs_app_config.hpp | 13 ++++++----- trunk/src/app/srs_app_http_hooks.cpp | 12 +++++++---- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/trunk/src/app/srs_app_async_call.cpp b/trunk/src/app/srs_app_async_call.cpp index 3bfac6391..471564343 100644 --- a/trunk/src/app/srs_app_async_call.cpp +++ b/trunk/src/app/srs_app_async_call.cpp @@ -87,7 +87,7 @@ int SrsDvrAsyncCallThread::cycle() for (it = copies.begin(); it != copies.end(); ++it) { ISrsDvrAsyncCall* call = *it; if ((ret = call->call()) != ERROR_SUCCESS) { - srs_warn("dvr: ignore callback %s, ret=%d", call->to_string().c_str(), ret); + srs_warn("ignore async callback %s, ret=%d", call->to_string().c_str(), ret); } srs_freep(call); } diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 7236bf162..527084b66 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2440,22 +2440,6 @@ SrsConfDirective* SrsConfig::get_vhost_on_hls_notify(string vhost) return conf->get("on_hls_notify"); } -int SrsConfig::get_vhost_hls_nb_notify(string vhost) -{ - SrsConfDirective* conf = get_vhost_http_hooks(vhost); - - if (!conf) { - return SRS_CONF_DEFAULT_HLS_NB_NOTIFY; - } - - conf = conf->get("hls_nb_notify"); - if (!conf || conf->arg0().empty()) { - return SRS_CONF_DEFAULT_HLS_NB_NOTIFY; - } - - return ::atoi(conf->arg0().c_str()); -} - bool SrsConfig::get_bw_check_enabled(string vhost) { SrsConfDirective* conf = get_vhost(vhost); @@ -3434,6 +3418,22 @@ string SrsConfig::get_hls_vcodec(string vhost) return conf->arg0(); } +int SrsConfig::get_vhost_hls_nb_notify(string vhost) +{ + SrsConfDirective* conf = get_hls(vhost); + + if (!conf) { + return SRS_CONF_DEFAULT_HLS_NB_NOTIFY; + } + + conf = conf->get("hls_nb_notify"); + if (!conf || conf->arg0().empty()) { + return SRS_CONF_DEFAULT_HLS_NB_NOTIFY; + } + + return ::atoi(conf->arg0().c_str()); +} + bool SrsConfig::get_hls_cleanup(string vhost) { SrsConfDirective* hls = get_hls(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 02d81f917..3c829ab72 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -652,11 +652,6 @@ public: * @return the on_hls_notify callback directive, the args is the url to callback. */ virtual SrsConfDirective* get_vhost_on_hls_notify(std::string vhost); - /** - * get the size of bytes to read from cdn network, for the on_hls_notify callback, - * that is, to read max bytes of the bytes from the callback, or timeout or error. - */ - virtual int get_vhost_hls_nb_notify(std::string vhost); // bwct(bandwidth check tool) section public: /** @@ -965,8 +960,12 @@ public: * whether cleanup the old ts files. */ virtual bool get_hls_cleanup(std::string vhost); - - // hds section + /** + * get the size of bytes to read from cdn network, for the on_hls_notify callback, + * that is, to read max bytes of the bytes from the callback, or timeout or error. + */ + virtual int get_vhost_hls_nb_notify(std::string vhost); +// hds section private: /** * get the hds directive of vhost. diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 5e79e7c74..3055da53f 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -42,6 +42,7 @@ using namespace std; #define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS) #define SRS_HTTP_HEADER_BUFFER 1024 +#define SRS_HTTP_READ_BUFFER 4096 #define SRS_HTTP_BODY_BUFFER 32 * 1024 // the timeout for hls notify, in us. @@ -363,15 +364,18 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts } SrsAutoFree(SrsHttpMessage, msg); + int nb_buf = srs_min(nb_notify, SRS_HTTP_READ_BUFFER); + char* buf = new char[nb_buf]; + SrsAutoFree(char, buf); + int nb_read = 0; ISrsHttpResponseReader* br = msg->body_reader(); while (nb_read < nb_notify && !br->eof()) { - char buf[64]; // only read a little of bytes of ts. - int nb_buf = 64; - if ((ret = br->read(buf, nb_buf, &nb_buf)) != ERROR_SUCCESS) { + int nb_bytes = 0; + if ((ret = br->read(buf, nb_buf, &nb_bytes)) != ERROR_SUCCESS) { break; } - nb_read += nb_buf; + nb_read += nb_bytes; } int spenttime = (int)(srs_update_system_time_ms() - starttime); From fe1886aa0857be59fd81ac0a4fe1b9901bbc837c Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 15:50:12 +0800 Subject: [PATCH 09/11] add proxy for hls --- trunk/research/api-server/server.py | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index 57baeff4b..ce9880f83 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -308,6 +308,36 @@ class RESTDvrs(object): return code + +''' +handle the hls proxy requests: hls stream. +''' +class RESTProxy(object): + exposed = True + + ''' + for SRS hook: on_hls_notify + on_hls_notify: + when srs reap a ts file of hls, call this hook, + used to push file to cdn network, by get the ts file from cdn network. + so we use HTTP GET and use the variable following: + [app], replace with the app. + [stream], replace with the stream. + [ts_url], replace with the ts url. + ignore any return data of server. + ''' + def GET(self, *args, **kwargs): + enable_crossdomain() + + hls = { + "args": args, + "kwargs": kwargs + } + + ret = json.dumps(hls) + print ret + return ret + ''' handle the hls requests: hls stream. ''' @@ -1195,6 +1225,7 @@ class V1(object): self.sessions = RESTSessions() self.dvrs = RESTDvrs() self.hls = RESTHls() + self.proxy = RESTProxy() self.chats = RESTChats() self.servers = RESTServers() self.nodes = RESTNodes() From 0000c945ea2c7911af123c1fdf80d0c9ecbe0c6c Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 16:07:45 +0800 Subject: [PATCH 10/11] refine the proxy api server. --- trunk/research/api-server/server.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index ce9880f83..40451b5cd 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -36,7 +36,7 @@ reload(sys) exec("sys.setdefaultencoding('utf-8')") assert sys.getdefaultencoding().lower() == "utf-8" -import os, json, time, datetime, cherrypy, threading +import os, json, time, datetime, cherrypy, threading, urllib2 # simple log functions. def trace(msg): @@ -328,15 +328,20 @@ class RESTProxy(object): ''' def GET(self, *args, **kwargs): enable_crossdomain() - - hls = { - "args": args, - "kwargs": kwargs - } - ret = json.dumps(hls) - print ret - return ret + url = "http://" + "/".join(args); + print "start to proxy url: %s"%url + + f = None + try: + f = urllib2.urlopen(url) + f.read() + except: + print "error proxy url: %s"%url + finally: + if f: f.close() + print "completed proxy url: %s"%url + return url ''' handle the hls requests: hls stream. From bedc4f006c7ffc61159f5ecda0859696dd40df81 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 10 Apr 2015 21:06:09 +0800 Subject: [PATCH 11/11] update donation. --- DONATIONS.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/DONATIONS.txt b/DONATIONS.txt index d4fbc796d..36a80325b 100644 --- a/DONATIONS.txt +++ b/DONATIONS.txt @@ -13,6 +13,7 @@ RMB 500-999 * [2015-xx-xx xx:xx] xxx RMB 100-499 +* [2015-04-10 19:52] 阳成飞 * [2015-03-30 13:34] 扶凯 * [2015-03-29 11-07] 姚伟斌 * [2015-03-14 20:21] 万伟