From 3982ec1d87134a72d6a58769309e9ecbb7eab91f Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 6 Mar 2015 11:36:26 +0800 Subject: [PATCH] refine http request parse. 2.0.132. --- README.md | 3 +- trunk/src/app/srs_app_http.cpp | 265 ++++++++++++++++--------- trunk/src/app/srs_app_http.hpp | 48 +++-- trunk/src/app/srs_app_http_client.cpp | 8 +- trunk/src/core/srs_core.hpp | 2 +- trunk/src/kernel/srs_kernel_error.hpp | 2 + trunk/src/main/srs_main_server.cpp | 18 -- trunk/src/protocol/srs_rtmp_buffer.cpp | 10 + trunk/src/protocol/srs_rtmp_buffer.hpp | 12 ++ 9 files changed, 234 insertions(+), 134 deletions(-) diff --git a/README.md b/README.md index 493c25e3b..ca13cbe83 100755 --- a/README.md +++ b/README.md @@ -550,7 +550,8 @@ Supported operating systems and hardware: ## History ### SRS 2.0 history -. + +* v2.0, 2015-03-06, refine http request parse. 2.0.132. * v2.0, 2015-03-01, for [#179](https://github.com/winlinvip/simple-rtmp-server/issues/179), revert dvr http api. 2.0.128. * v2.0, 2015-02-24, for [#304](https://github.com/winlinvip/simple-rtmp-server/issues/304), fix hls bug, write pts/dts error. 2.0.124 * v2.0, 2015-02-19, refine dvr, append file when dvr file exists. 2.0.122. diff --git a/trunk/src/app/srs_app_http.cpp b/trunk/src/app/srs_app_http.cpp index 685ee97c8..6b7535ec9 100644 --- a/trunk/src/app/srs_app_http.cpp +++ b/trunk/src/app/srs_app_http.cpp @@ -39,6 +39,7 @@ using namespace std; #include #include #include +#include #define SRS_DEFAULT_HTTP_PORT 80 @@ -846,57 +847,152 @@ SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* i { skt = io; owner = msg; - cache = new SrsSimpleBuffer(); + is_eof = false; + nb_read = 0; + buffer = NULL; } SrsHttpResponseReader::~SrsHttpResponseReader() { - srs_freep(cache); } -bool SrsHttpResponseReader::empty() -{ - return cache->length() == 0; -} - -int SrsHttpResponseReader::append(char* data, int size) +int SrsHttpResponseReader::initialize(SrsFastBuffer* body) { int ret = ERROR_SUCCESS; - cache->append(data, size); + buffer = body; return ret; } -int SrsHttpResponseReader::read(int max, std::string& data) +bool SrsHttpResponseReader::eof() +{ + return is_eof; +} + +int SrsHttpResponseReader::read(std::string& data) { int ret = ERROR_SUCCESS; - // TODO: FIXME: decode the chunked bytes. + if (is_eof) { + ret = ERROR_HTTP_RESPONSE_EOF; + srs_error("http: response EOF. ret=%d", ret); + return ret; + } - // read from cache first. - if (cache->length() > 0) { - int nb_bytes = srs_min(cache->length(), max); - data.append(cache->bytes(), nb_bytes); - cache->erase(nb_bytes); + // chunked encoding. + if (owner->is_chunked()) { + return read_chunked(data); + } + + // read by specified content-length + int max = (int)owner->content_length() - nb_read; + if (max <= 0) { + is_eof = true; + return ret; + } + return read_specified(max, data); +} + +int SrsHttpResponseReader::read_chunked(std::string& data) +{ + int ret = ERROR_SUCCESS; + + // parse the chunk length first. + char* at = NULL; + int length = 0; + while (!at) { + // find the CRLF of chunk header end. + char* start = buffer->bytes(); + char* end = start + buffer->size(); + for (char* p = start; p < end - 1; p++) { + if (p[0] == __SRS_HTTP_CR && p[1] == __SRS_HTTP_LF) { + // invalid chunk, ignore. + if (p == start) { + ret = ERROR_HTTP_INVALID_CHUNK_HEADER; + srs_error("chunk header start with CRLF. ret=%d", ret); + return ret; + } + length = p - start + 2; + at = buffer->read_slice(length); + break; + } + } + // got at, ok. + if (at) { + break; + } + + // when empty, only grow 1bytes, but the buffer will cache more. + if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read body from server failed. ret=%d", ret); + } + return ret; + } + } + srs_assert(length >= 3); + + // it's ok to set the pos and pos+1 to NULL. + at[length - 1] = NULL; + at[length - 2] = NULL; + + // size is the bytes size, excludes the chunk header and end CRLF. + int ilength = ::strtol(at, NULL, 16); + if (ilength < 0) { + ret = ERROR_HTTP_INVALID_CHUNK_HEADER; + srs_error("chunk header negative, length=%d. ret=%d", ilength, ret); return ret; } - // read some from underlayer. - int left = srs_max(SRS_HTTP_BODY_BUFFER, max); - - // read from io. - char* buf = new char[left]; - SrsAutoFree(char, buf); - - ssize_t nread = 0; - if ((ret = skt->read(buf, left, &nread)) != ERROR_SUCCESS) { + // when empty, only grow 1bytes, but the buffer will cache more. + if ((ret = buffer->grow(skt, ilength + 2)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read body from server failed. ret=%d", ret); + } return ret; } + srs_trace("http: read %d chunk", ilength); - if (nread) { - data.append(buf, nread); + // read payload when length specifies some payload. + if (ilength <= 0) { + is_eof = true; + } else { + srs_assert(ilength); + data.append(buffer->read_slice(ilength), ilength); + nb_read += ilength; + } + + // the CRLF of chunk payload end. + buffer->read_slice(2); + + return ret; +} + +int SrsHttpResponseReader::read_specified(int max, std::string& data) +{ + int ret = ERROR_SUCCESS; + + if (buffer->size() <= 0) { + // when empty, only grow 1bytes, but the buffer will cache more. + if ((ret = buffer->grow(skt, 1)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read body from server failed. ret=%d", ret); + } + return ret; + } + } + + int nb_bytes = srs_min(max, buffer->size()); + + srs_assert(nb_bytes); + data.append(buffer->read_slice(nb_bytes), nb_bytes); + nb_read += nb_bytes; + + // when read completed, eof. + if (nb_read >= (int)owner->content_length()) { + is_eof = true; } return ret; @@ -917,7 +1013,7 @@ SrsHttpMessage::~SrsHttpMessage() srs_freep(_http_ts_send_buffer); } -int SrsHttpMessage::initialize(string url, http_parser* header, string body, vector& headers) +int SrsHttpMessage::update(string url, http_parser* header, SrsFastBuffer* body, vector& headers) { int ret = ERROR_SUCCESS; @@ -928,10 +1024,10 @@ int SrsHttpMessage::initialize(string url, http_parser* header, string body, vec // whether chunked. std::string transfer_encoding = get_request_header("Transfer-Encoding"); chunked = (transfer_encoding == "chunked"); - - // TODO: FIXME: remove it, use fast buffer instead. - if (!body.empty()) { - _body->append((char*)body.data(), (int)body.length()); + + // set the buffer. + if ((ret = _body->initialize(body)) != ERROR_SUCCESS) { + return ret; } // parse uri from url. @@ -946,6 +1042,18 @@ int SrsHttpMessage::initialize(string url, http_parser* header, string body, vec // parse uri to schema/server:port/path?query std::string uri = "http://" + host + _url; + + return update(uri); +} + +int SrsHttpMessage::update(string uri) +{ + int ret = ERROR_SUCCESS; + + if (uri.empty()) { + return ret; + } + if ((ret = _uri->initialize(uri)) != ERROR_SUCCESS) { return ret; } @@ -1069,47 +1177,29 @@ string SrsHttpMessage::path() return _uri->get_path(); } -int SrsHttpMessage::body_read_all(string body) +int SrsHttpMessage::body_read_all(string& body) { int ret = ERROR_SUCCESS; - int64_t content_length = (int64_t)_header.content_length; + // chunked, always read with + if (chunked) { + return _body->read(body); + } + + int content_length = (int)(int64_t)_header.content_length; // ignore if not set, should be zero length body. - if (content_length < 0) { - if (!_body->empty()) { - srs_warn("unspecified content-length with body cached."); - } else { - srs_info("unspecified content-length with body empty."); - } + if (content_length <= 0) { + srs_info("unspecified content-length with body empty."); return ret; } // when content length specified, read specified length. - if (content_length > 0) { - int left = (int)content_length; - while (left > 0) { - int nb_read = (int)body.length(); - if ((ret = _body->read(left, body)) != ERROR_SUCCESS) { - return ret; - } - - left -= (int)body.length() - nb_read; - } - return ret; - } - - // chunked encoding, read util got size=0 chunk. - for (;;) { - int nb_read = (int)body.length(); - if ((ret = _body->read(0, body)) != ERROR_SUCCESS) { + int expect = content_length + (int)body.length(); + while ((int)body.length() < expect) { + if ((ret = _body->read(body)) != ERROR_SUCCESS) { return ret; } - - // eof. - if (nb_read == (int)body.length()) { - break; - } } return ret; @@ -1173,10 +1263,12 @@ string SrsHttpMessage::get_request_header(string name) SrsHttpParser::SrsHttpParser() { + buffer = new SrsFastBuffer(); } SrsHttpParser::~SrsHttpParser() { + srs_freep(buffer); } int SrsHttpParser::initialize(enum http_parser_type type) @@ -1213,7 +1305,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) header = http_parser(); url = ""; headers.clear(); - body = ""; + body_parsed = 0; // do parse if ((ret = parse_message_imp(skt)) != ERROR_SUCCESS) { @@ -1227,7 +1319,7 @@ int SrsHttpParser::parse_message(SrsStSocket* skt, SrsHttpMessage** ppmsg) SrsHttpMessage* msg = new SrsHttpMessage(skt); // initalize http msg, parse url. - if ((ret = msg->initialize(url, &header, body, headers)) != ERROR_SUCCESS) { + if ((ret = msg->update(url, &header, buffer, headers)) != ERROR_SUCCESS) { srs_error("initialize http msg failed. ret=%d", ret); srs_freep(msg); return ret; @@ -1243,48 +1335,35 @@ int SrsHttpParser::parse_message_imp(SrsStSocket* skt) { int ret = ERROR_SUCCESS; - ssize_t nread = 0; - ssize_t nparsed = 0; - - char* buf = new char[SRS_HTTP_HEADER_BUFFER]; - SrsAutoFree(char, buf); - - // parser header. - for (;;) { - if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) { - if (!srs_is_client_gracefully_close(ret)) { - srs_error("read body from server failed. ret=%d", ret); + while (true) { + if (buffer->size() <= 0) { + // when empty, only grow 1bytes, but the buffer will cache more. + if ((ret = buffer->grow(skt, 1)) != ERROR_SUCCESS) { + if (!srs_is_client_gracefully_close(ret)) { + srs_error("read body from server failed. ret=%d", ret); + } + return ret; } - return ret; } - nparsed = http_parser_execute(&parser, &settings, buf, nread); - srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed); + int nb_header = srs_min(SRS_HTTP_HEADER_BUFFER, buffer->size()); + ssize_t nparsed = http_parser_execute(&parser, &settings, buffer->bytes(), nb_header); + srs_info("buffer=%d, nparsed=%d, body=%d", buffer->size(), (int)nparsed, body_parsed); + if (nparsed - body_parsed > 0) { + buffer->read_slice(nparsed - body_parsed); + } // ok atleast header completed, // never wait for body completed, for maybe chunked. if (state == SrsHttpParseStateHeaderComplete || state == SrsHttpParseStateMessageComplete) { break; } - - // when not complete, the parser should consume all bytes. - if (nparsed != nread) { - ret = ERROR_HTTP_PARSE_HEADER; - srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret); - return ret; - } } // parse last header. if (!filed_name.empty() && !field_value.empty()) { headers.push_back(std::make_pair(filed_name, field_value)); } - - // when parse completed, cache the left body. - if (nread && nparsed < nread) { - body.append(buf + nparsed, nread - nparsed); - srs_info("cache %d bytes read body.", nread - nparsed); - } return ret; } @@ -1385,9 +1464,7 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length) SrsHttpParser* obj = (SrsHttpParser*)parser->data; srs_assert(obj); - if (length > 0) { - obj->body.append(at, (int)length); - } + obj->body_parsed += length; srs_info("Body: %.*s", (int)length, at); diff --git a/trunk/src/app/srs_app_http.hpp b/trunk/src/app/srs_app_http.hpp index c77cdc352..d7006932b 100644 --- a/trunk/src/app/srs_app_http.hpp +++ b/trunk/src/app/srs_app_http.hpp @@ -192,10 +192,14 @@ public: virtual ~ISrsHttpResponseReader(); public: /** - * read from the response body. - * @param max the max size to read. 0 to ignore. + * whether response read EOF. */ - virtual int read(int max, std::string& data) = 0; + virtual bool eof() = 0; + /** + * read from the response body. + * @remark when eof(), return error. + */ + virtual int read(std::string& data) = 0; }; // Objects implementing the Handler interface can be @@ -394,24 +398,24 @@ class SrsHttpResponseReader : virtual public ISrsHttpResponseReader private: SrsStSocket* skt; SrsHttpMessage* owner; - SrsSimpleBuffer* cache; + SrsFastBuffer* buffer; + bool is_eof; + int64_t nb_read; public: SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io); virtual ~SrsHttpResponseReader(); public: /** - * whether the cache is empty. + * initialize the response reader with buffer. */ - virtual bool empty(); - /** - * append specified size of bytes data to reader. - * when we read http message from socket, we maybe read header+body, - * so the reader should provides stream cache feature. - */ - virtual int append(char* data, int size); + virtual int initialize(SrsFastBuffer* buffer); // interface ISrsHttpResponseReader public: - virtual int read(int max, std::string& data); + virtual bool eof(); + virtual int read(std::string& data); +private: + virtual int read_chunked(std::string& data); + virtual int read_specified(int max, std::string& data); }; // for http header. @@ -453,6 +457,7 @@ private: /** * use a buffer to read and send ts file. */ + // TODO: FIXME: remove it. char* _http_ts_send_buffer; // http headers std::vector _headers; @@ -463,11 +468,16 @@ public: virtual ~SrsHttpMessage(); public: /** - * set the original messages, then initialize the message. + * set the original messages, then update the message. */ - virtual int initialize(std::string url, http_parser* header, - std::string body, std::vector& headers + virtual int update(std::string url, http_parser* header, + SrsFastBuffer* body, std::vector& headers ); + /** + * update the request with uri. + * @remark user can invoke this multiple times. + */ + virtual int update(std::string uri); public: virtual char* http_ts_send_buffer(); public: @@ -485,7 +495,7 @@ public: virtual std::string host(); virtual std::string path(); public: - virtual int body_read_all(std::string body); + virtual int body_read_all(std::string& body); virtual ISrsHttpResponseReader* body_reader(); virtual int64_t content_length(); /** @@ -510,7 +520,7 @@ private: http_parser_settings settings; http_parser parser; // the global parse buffer. - SrsFastBuffer* fbuffer; + SrsFastBuffer* buffer; private: // http parse data, reset before parse message. bool expect_filed_name; @@ -520,7 +530,7 @@ private: http_parser header; std::string url; std::vector headers; - std::string body; + int body_parsed; public: SrsHttpParser(); virtual ~SrsHttpParser(); diff --git a/trunk/src/app/srs_app_http_client.cpp b/trunk/src/app/srs_app_http_client.cpp index 9068082a0..2139285f4 100644 --- a/trunk/src/app/srs_app_http_client.cpp +++ b/trunk/src/app/srs_app_http_client.cpp @@ -165,8 +165,14 @@ int SrsHttpClient::get(SrsHttpUri* uri, std::string req, SrsHttpMessage** ppmsg) srs_error("parse http post response failed. ret=%d", ret); return ret; } - srs_assert(msg); + + // for GET, server response no uri, we update with request uri. + if ((ret = msg->update(uri->get_url())) != ERROR_SUCCESS) { + srs_freep(msg); + return ret; + } + *ppmsg = msg; srs_info("parse http get response success."); diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 52fd20a21..945870807 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 131 +#define VERSION_REVISION 132 // server info. #define RTMP_SIG_SRS_KEY "SRS" diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index be35e673d..b9e19b9f2 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -243,6 +243,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_STREAM_CASTER_AVC_SPS 4022 #define ERROR_STREAM_CASTER_AVC_PPS 4023 #define ERROR_STREAM_CASTER_FLV_TAG 4024 +#define ERROR_HTTP_RESPONSE_EOF 4025 +#define ERROR_HTTP_INVALID_CHUNK_HEADER 4026 /** * whether the error code is an system control error. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index ebcbbc2c6..783c9a7b9 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -312,8 +312,6 @@ int run() return run_master(); } -#include -#include int run_master() { int ret = ERROR_SUCCESS; @@ -329,22 +327,6 @@ int run_master() if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) { return ret; } -/*SrsHttpClient client; -SrsHttpUri uri; -if ((ret = uri.initialize("http://ossrs.net:8081/live/livestream.flv")) != ERROR_SUCCESS) { - return ret; -} -SrsHttpMessage* msg = NULL; -if ((ret = client.get(&uri, "", &msg)) != ERROR_SUCCESS) { - return ret; -} -for (;;) { - ISrsHttpResponseReader* br = msg->body_reader(); - std::string data; - if ((ret = br->read(0, data)) != ERROR_SUCCESS) { - return ret; - } -}*/ if ((ret = _srs_server->listen()) != ERROR_SUCCESS) { return ret; diff --git a/trunk/src/protocol/srs_rtmp_buffer.cpp b/trunk/src/protocol/srs_rtmp_buffer.cpp index d72a1d857..651ff2b64 100644 --- a/trunk/src/protocol/srs_rtmp_buffer.cpp +++ b/trunk/src/protocol/srs_rtmp_buffer.cpp @@ -62,6 +62,16 @@ SrsFastBuffer::SrsFastBuffer() p = end = buffer; } +int SrsFastBuffer::size() +{ + return end - p; +} + +char* SrsFastBuffer::bytes() +{ + return p; +} + void SrsFastBuffer::set_buffer(int buffer_size) { // the user-space buffer size limit to a max value. diff --git a/trunk/src/protocol/srs_rtmp_buffer.hpp b/trunk/src/protocol/srs_rtmp_buffer.hpp index 9382cbfbc..912417d5b 100644 --- a/trunk/src/protocol/srs_rtmp_buffer.hpp +++ b/trunk/src/protocol/srs_rtmp_buffer.hpp @@ -85,6 +85,16 @@ public: SrsFastBuffer(); virtual ~SrsFastBuffer(); public: + /** + * get the size of current bytes in buffer. + */ + virtual int size(); + /** + * get the current bytes in buffer. + * @remark user should use read_slice() if possible, + * the bytes() is used to test bytes, for example, to detect the bytes schema. + */ + virtual char* bytes(); /** * create buffer with specifeid size. * @param buffer the size of buffer. @@ -110,6 +120,8 @@ public: * skip some bytes in buffer. * @param size the bytes to skip. positive to next; negative to previous. * @remark assert buffer already grow(size). + * @remark always use read_slice to consume bytes, which will reset for EOF. + * while skip never consume bytes. */ virtual void skip(int size); public: