From 7eccc9da269e2367d7da92feabdc06e5fcf00232 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 24 Dec 2019 14:58:35 +0800 Subject: [PATCH] For #1508, support chunk length and content in multiple parts. --- README.md | 1 + trunk/src/app/srs_app_caster_flv.cpp | 4 +- trunk/src/app/srs_app_http_hooks.cpp | 4 +- trunk/src/protocol/srs_http_stack.hpp | 16 +- trunk/src/service/srs_service_http_conn.cpp | 30 ++- trunk/src/service/srs_service_http_conn.hpp | 10 +- trunk/src/utest/srs_utest_http.cpp | 263 ++++++++++++++++---- 7 files changed, 243 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index cadc13d8b..05022960e 100755 --- a/README.md +++ b/README.md @@ -146,6 +146,7 @@ For previous versions, please read: ## V3 changes +* v3.0, 2019-12-24, For [#1508][bug #1508], support chunk length and content in multiple parts. * v3.0, 2019-12-23, Merge SRS2 for running srs-librtmp on Windows. 3.0.80 * v3.0, 2019-12-23, For [#1535][bug #1535], deprecate Adobe FMS/AMS edge token traversing([CN][v3_CN_DRM2], [EN][v3_EN_DRM2]) authentication. 3.0.79 * v3.0, 2019-12-23, For [#1535][bug #1535], deprecate BWT(bandwidth testing)([CN][v1_CN_BandwidthTestTool], [EN][v1_EN_BandwidthTestTool]). 3.0.78 diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index 87f7a893e..d3b96de46 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -295,7 +295,7 @@ srs_error_t SrsHttpFileReader::read(void* buf, size_t count, ssize_t* pnread) int total_read = 0; while (total_read < (int)count) { - int nread = 0; + ssize_t nread = 0; if ((err = http->read((char*)buf + total_read, (int)(count - total_read), &nread)) != srs_success) { return srs_error_wrap(err, "read"); } @@ -306,7 +306,7 @@ srs_error_t SrsHttpFileReader::read(void* buf, size_t count, ssize_t* pnread) } srs_assert(nread); - total_read += nread; + total_read += (int)nread; } if (pnread) { diff --git a/trunk/src/app/srs_app_http_hooks.cpp b/trunk/src/app/srs_app_http_hooks.cpp index 45a396ae5..241eb8976 100644 --- a/trunk/src/app/srs_app_http_hooks.cpp +++ b/trunk/src/app/srs_app_http_hooks.cpp @@ -398,11 +398,11 @@ srs_error_t SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* re int nb_read = 0; ISrsHttpResponseReader* br = msg->body_reader(); while (nb_read < nb_notify && !br->eof()) { - int nb_bytes = 0; + ssize_t nb_bytes = 0; if ((err = br->read(buf, nb_buf, &nb_bytes)) != srs_success) { break; } - nb_read += nb_bytes; + nb_read += (int)nb_bytes; } int spenttime = (int)(srsu2ms(srs_update_system_time()) - starttime); diff --git a/trunk/src/protocol/srs_http_stack.hpp b/trunk/src/protocol/srs_http_stack.hpp index b880e60ba..34eb7ae45 100644 --- a/trunk/src/protocol/srs_http_stack.hpp +++ b/trunk/src/protocol/srs_http_stack.hpp @@ -26,6 +26,8 @@ #include +#include + // Default http listen port. #define SRS_DEFAULT_HTTP_PORT 80 @@ -215,7 +217,7 @@ public: }; // The reader interface for http response. -class ISrsHttpResponseReader +class ISrsHttpResponseReader : public ISrsReader { public: ISrsHttpResponseReader(); @@ -223,18 +225,6 @@ public: public: // Whether response read EOF. virtual bool eof() = 0; - // Read from the response body. - // @param data, the buffer to read data buffer to. - // @param nb_data, the max size of data buffer. - // @param nb_read, the actual read size of bytes. NULL to ignore. - // @remark when eof(), return error. - // @remark for some server, the content-length not specified and not chunked, - // which is actually the infinite chunked encoding, which after http header - // is http response data, it's ok for browser. that is, - // when user call this read, please ensure there is data to read(by content-length - // or by chunked), because the sdk never know whether there is no data or - // infinite chunked. - virtual srs_error_t read(char* data, int nb_data, int* nb_read) = 0; }; // Objects implementing the Handler interface can be diff --git a/trunk/src/service/srs_service_http_conn.cpp b/trunk/src/service/srs_service_http_conn.cpp index 3d97faa98..8a6a1b876 100644 --- a/trunk/src/service/srs_service_http_conn.cpp +++ b/trunk/src/service/srs_service_http_conn.cpp @@ -185,6 +185,10 @@ int SrsHttpParser::on_headers_complete(http_parser* parser) // save the parser when header parse completed. obj->state = SrsHttpParseStateHeaderComplete; + // We must update the body start when header complete, because sometimes we only got header. + // When we got the body start event, we will update it to much precious position. + obj->p_body_start = obj->buffer->bytes() + obj->buffer->size(); + srs_info("***HEADERS COMPLETE***"); // see http_parser.c:1570, return 1 to skip body. @@ -551,7 +555,7 @@ srs_error_t SrsHttpMessage::body_read_all(string& body) // whatever, read util EOF. while (!_body->eof()) { - int nb_read = 0; + ssize_t nb_read = 0; if ((err = _body->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != srs_success) { return srs_error_wrap(err, "read body"); } @@ -914,7 +918,7 @@ bool SrsHttpResponseReader::eof() return is_eof; } -srs_error_t SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read) +srs_error_t SrsHttpResponseReader::read(void* data, size_t nb_data, ssize_t* nb_read) { srs_error_t err = srs_success; @@ -929,7 +933,7 @@ srs_error_t SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read) // read by specified content-length if (owner->content_length() != -1) { - int max = (int)owner->content_length() - (int)nb_total_read; + size_t max = (size_t)owner->content_length() - (size_t)nb_total_read; if (max <= 0) { is_eof = true; return err; @@ -953,7 +957,7 @@ srs_error_t SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read) return err; } -srs_error_t SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb_read) +srs_error_t SrsHttpResponseReader::read_chunked(void* data, size_t nb_data, ssize_t* nb_read) { srs_error_t err = srs_success; @@ -1005,32 +1009,34 @@ srs_error_t SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb } // all bytes in chunk is left now. - nb_chunk = nb_left_chunk = ilength; + nb_chunk = nb_left_chunk = (size_t)ilength; } if (nb_chunk <= 0) { // for the last chunk, eof. is_eof = true; + *nb_read = 0; } else { // for not the last chunk, there must always exists bytes. // left bytes in chunk, read some. srs_assert(nb_left_chunk); - int nb_bytes = srs_min(nb_left_chunk, nb_data); - err = read_specified(data, nb_bytes, &nb_bytes); + size_t nb_bytes = srs_min(nb_left_chunk, nb_data); + err = read_specified(data, nb_bytes, (ssize_t*)&nb_bytes); // the nb_bytes used for output already read size of bytes. if (nb_read) { *nb_read = nb_bytes; } nb_left_chunk -= nb_bytes; - - // error or still left bytes in chunk, ignore and read in future. + if (err != srs_success) { return srs_error_wrap(err, "read specified"); } + + // If still left bytes in chunk, ignore and read in future. if (nb_left_chunk > 0) { - return srs_error_new(ERROR_HTTP_INVALID_CHUNK_HEADER, "read specified left=%d", nb_left_chunk); + return err; } } @@ -1043,7 +1049,7 @@ srs_error_t SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb return err; } -srs_error_t SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read) +srs_error_t SrsHttpResponseReader::read_specified(void* data, size_t nb_data, ssize_t* nb_read) { srs_error_t err = srs_success; @@ -1054,7 +1060,7 @@ srs_error_t SrsHttpResponseReader::read_specified(char* data, int nb_data, int* } } - int nb_bytes = srs_min(nb_data, buffer->size()); + size_t nb_bytes = srs_min(nb_data, (size_t)buffer->size()); // read data to buffer. srs_assert(nb_bytes); diff --git a/trunk/src/service/srs_service_http_conn.hpp b/trunk/src/service/srs_service_http_conn.hpp index 59fb50fb7..f8eaaa23f 100644 --- a/trunk/src/service/srs_service_http_conn.hpp +++ b/trunk/src/service/srs_service_http_conn.hpp @@ -261,9 +261,9 @@ private: SrsFastStream* buffer; bool is_eof; // The left bytes in chunk. - int nb_left_chunk; + size_t nb_left_chunk; // The number of bytes of current chunk. - int nb_chunk; + size_t nb_chunk; // Already read total bytes. int64_t nb_total_read; public: @@ -274,10 +274,10 @@ public: // Interface ISrsHttpResponseReader public: virtual bool eof(); - virtual srs_error_t read(char* data, int nb_data, int* nb_read); + virtual srs_error_t read(void* buf, size_t size, ssize_t* nread); private: - virtual srs_error_t read_chunked(char* data, int nb_data, int* nb_read); - virtual srs_error_t read_specified(char* data, int nb_data, int* nb_read); + virtual srs_error_t read_chunked(void* buf, size_t size, ssize_t* nread); + virtual srs_error_t read_specified(void* buf, size_t size, ssize_t* nread); }; #endif diff --git a/trunk/src/utest/srs_utest_http.cpp b/trunk/src/utest/srs_utest_http.cpp index 425020adf..4ad2cbe68 100644 --- a/trunk/src/utest/srs_utest_http.cpp +++ b/trunk/src/utest/srs_utest_http.cpp @@ -34,6 +34,60 @@ using namespace std; #include #include +class MockMSegmentsReader : public ISrsReader +{ +public: + vector in_bytes; +public: + MockMSegmentsReader(); + virtual ~MockMSegmentsReader(); +public: + virtual void append(string b) { + in_bytes.push_back(b); + } + virtual srs_error_t read(void* buf, size_t size, ssize_t* nread); +}; + +MockMSegmentsReader::MockMSegmentsReader() +{ +} + +MockMSegmentsReader::~MockMSegmentsReader() +{ +} + +srs_error_t MockMSegmentsReader::read(void* buf, size_t size, ssize_t* nread) +{ + srs_error_t err = srs_success; + + for (;;) { + if (in_bytes.empty() || size <= 0) { + return srs_error_new(-1, "EOF"); + } + + string v = in_bytes[0]; + if (v.empty()) { + in_bytes.erase(in_bytes.begin()); + continue; + } + + int nn = srs_min(size, v.length()); + memcpy(buf, v.data(), nn); + if (nread) { + *nread = nn; + } + + if (nn < (int)v.length()) { + in_bytes[0] = string(v.data() + nn, v.length() - nn); + } else { + in_bytes.erase(in_bytes.begin()); + } + break; + } + + return err; +} + class MockResponseWriter : virtual public ISrsHttpResponseWriter, virtual public ISrsHttpHeaderFilter { public: @@ -316,6 +370,164 @@ VOID TEST(ProtocolHTTPTest, ResponseWriter) } } +VOID TEST(ProtocolHTTPTest, ChunkSmallBuffer) +{ + srs_error_t err; + + // No chunk end flag, error. + if (true) { + MockMSegmentsReader io; + io.append(mock_http_response2(200, "0d\r\n")); + io.append("Hello, world!\r\n"); + + SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false)); + ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg)); + + char buf[32]; ssize_t nread = 0; + ISrsHttpResponseReader* r = msg->body_reader(); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 32, &nread)); + EXPECT_EQ(13, nread); + EXPECT_STREQ("Hello, world!", buf); + + err = r->read(buf, 32, &nread); + EXPECT_EQ(-1, srs_error_code(err)); + srs_freep(err); + + srs_freep(msg); + } + + // Read util EOF(nread=0) or err(ERROR_HTTP_RESPONSE_EOF). + if (true) { + MockMSegmentsReader io; + io.append(mock_http_response2(200, "0d\r\n")); + io.append("Hello, world!\r\n0\r\n\r\n"); + + SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false)); + ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg)); + + char buf[32]; ssize_t nread = 0; + ISrsHttpResponseReader* r = msg->body_reader(); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 32, &nread)); + EXPECT_EQ(13, nread); + EXPECT_STREQ("Hello, world!", buf); + + HELPER_ASSERT_SUCCESS(r->read(buf, 32, &nread)); + EXPECT_EQ(0, nread); + + err = r->read(buf, 32, &nread); + EXPECT_EQ(ERROR_HTTP_RESPONSE_EOF, srs_error_code(err)); + srs_freep(err); + + srs_freep(msg); + } + + // In this case, we only got header complete, no body start event. + if (true) { + MockMSegmentsReader io; + io.append(mock_http_response2(200, "0d\r\n")); + io.append("Hello, world!\r\n0\r\n\r\n"); + + SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false)); + ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg)); + + char buf[32]; ssize_t nread = 0; + ISrsHttpResponseReader* r = msg->body_reader(); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 32, &nread)); + EXPECT_EQ(13, nread); + EXPECT_STREQ("Hello, world!", buf); + + srs_freep(msg); + } +} + +VOID TEST(ProtocolHTTPTest, ClientSmallBuffer) +{ + srs_error_t err; + + // The chunk content is sent in multiple parts. + if (true) { + MockMSegmentsReader io; + io.append(mock_http_response2(200, "0d\r\n")); + io.append("Hello,"); + io.append(" world!"); + io.append("\r\n0\r\n\r\n"); + + SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false)); + ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg)); + + char buf[32]; ssize_t nread = 0; + ISrsHttpResponseReader* r = msg->body_reader(); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 32, &nread)); + EXPECT_EQ(6, nread); + EXPECT_STREQ("Hello,", buf); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 32, &nread)); + EXPECT_EQ(7, nread); + EXPECT_STREQ(" world!", buf); + + srs_freep(msg); + } + + // The chunk size is sent separately before chunk content. + if (true) { + MockMSegmentsReader io; + io.append(mock_http_response2(200, "0d\r\n")); + io.append("Hello, world!\r\n0\r\n\r\n"); + + SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false)); + ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg)); + + char buf[32]; ssize_t nread = 0; + ISrsHttpResponseReader* r = msg->body_reader(); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 32, &nread)); + EXPECT_EQ(13, nread); + EXPECT_STREQ("Hello, world!", buf); + + srs_freep(msg); + } + + // If buffer is smaller than chunk, we could read N times to get the whole chunk. + if (true) { + MockBufferIO io; io.append(mock_http_response2(200, "0d\r\nHello, world!\r\n0\r\n\r\n")); + SrsHttpParser hp; HELPER_ASSERT_SUCCESS(hp.initialize(HTTP_RESPONSE, false)); + ISrsHttpMessage* msg = NULL; HELPER_ASSERT_SUCCESS(hp.parse_message(&io, &msg)); + + char buf[32]; ssize_t nread = 0; + ISrsHttpResponseReader* r = msg->body_reader(); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 5, &nread)); + EXPECT_EQ(5, nread); + EXPECT_STREQ("Hello", buf); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 7, &nread)); + EXPECT_EQ(7, nread); + EXPECT_STREQ(", world", buf); + + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(r->read(buf, 7, &nread)); + EXPECT_EQ(1, nread); + EXPECT_STREQ("!", buf); + + HELPER_ASSERT_SUCCESS(r->read(buf, 7, &nread)); + EXPECT_EQ(0, nread); + + srs_freep(msg); + } +} + VOID TEST(ProtocolHTTPTest, ClientRequest) { srs_error_t err; @@ -1186,57 +1398,6 @@ VOID TEST(ProtocolHTTPTest, BasicHandlers) } } -class MockMSegmentsReader : public ISrsReader -{ -public: - vector in_bytes; -public: - MockMSegmentsReader(); - virtual ~MockMSegmentsReader(); -public: - virtual srs_error_t read(void* buf, size_t size, ssize_t* nread); -}; - -MockMSegmentsReader::MockMSegmentsReader() -{ -} - -MockMSegmentsReader::~MockMSegmentsReader() -{ -} - -srs_error_t MockMSegmentsReader::read(void* buf, size_t size, ssize_t* nread) -{ - srs_error_t err = srs_success; - - for (;;) { - if (in_bytes.empty() || size <= 0) { - return srs_error_new(-1, "EOF"); - } - - string v = in_bytes[0]; - if (v.empty()) { - in_bytes.erase(in_bytes.begin()); - continue; - } - - int nn = srs_min(size, v.length()); - memcpy(buf, v.data(), nn); - if (nread) { - *nread = nn; - } - - if (nn < (int)v.length()) { - in_bytes[0] = string(v.data() + nn, v.length() - nn); - } else { - in_bytes.erase(in_bytes.begin()); - } - break; - } - - return err; -} - VOID TEST(ProtocolHTTPTest, MSegmentsReader) { srs_error_t err;