From 850946bb13ce68a1183b549bbce6a0324253c656 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 3 Dec 2014 22:39:25 +0800 Subject: [PATCH] for bug #241, calc the small and sleep for merged read. --- trunk/src/app/srs_app_recv_thread.cpp | 64 ++++++++++++++++++++++---- trunk/src/app/srs_app_recv_thread.hpp | 10 +++- trunk/src/app/srs_app_rtmp_conn.cpp | 4 +- trunk/src/rtmp/srs_protocol_buffer.cpp | 23 ++++++--- trunk/src/rtmp/srs_protocol_buffer.hpp | 13 ++++-- trunk/src/rtmp/srs_protocol_rtmp.cpp | 4 +- trunk/src/rtmp/srs_protocol_rtmp.hpp | 3 +- trunk/src/rtmp/srs_protocol_stack.cpp | 4 +- trunk/src/rtmp/srs_protocol_stack.hpp | 3 +- 9 files changed, 99 insertions(+), 29 deletions(-) diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index dc1a26ea6..6550c7def 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -27,13 +27,21 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include // when we read from socket less than this value, // sleep a while to merge read. // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 -#define SRS_MERGED_READ_SIZE(buffer) (buffer / 10) -// the time to sleep to merge read, to read more bytes. -#define SRS_MERGED_READ_US (300 * 1000) +// use the bitrate in kbps to calc the max sleep time. +#define SRS_MR_MAX_BITRATE_KBPS 10000 +#define SRS_MR_AVERAGE_BITRATE_KBPS 1000 +#define SRS_MR_MIN_BITRATE_KBPS 64 +// the max sleep time in ms +#define SRS_MR_MAX_SLEEP_MS 3000 +// the max small bytes to group +#define SRS_MR_SMALL_BYTES 64 +// the percent of buffer to set as small bytes +#define SRS_MR_SMALL_PERCENT 100 ISrsMessageHandler::ISrsMessageHandler() { @@ -226,7 +234,7 @@ void SrsQueueRecvThread::on_thread_stop() } SrsPublishRecvThread::SrsPublishRecvThread( - SrsRtmpServer* rtmp_sdk, int timeout_ms, + SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge ): trd(this, rtmp_sdk, timeout_ms) { @@ -239,6 +247,10 @@ SrsPublishRecvThread::SrsPublishRecvThread( recv_error_code = ERROR_SUCCESS; _nb_msgs = 0; error = st_cond_new(); + + mr_fd = fd; + mr_small_bytes = 0; + mr_sleep_ms = 0; } SrsPublishRecvThread::~SrsPublishRecvThread() @@ -284,9 +296,17 @@ void SrsPublishRecvThread::on_thread_start() // we donot set the auto response to false, // for the main thread never send message. + // 128KB recv buffer. + int nb_rbuf = 128 * 1024; + socklen_t sock_buf_size = sizeof(int); + if (setsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, sock_buf_size) < 0) { + srs_warn("set sock SO_RCVBUF=%d failed.", nb_rbuf); + } + getsockopt(mr_fd, SOL_SOCKET, SO_RCVBUF, &nb_rbuf, &sock_buf_size); + // enable the merge read // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 - rtmp->set_merge_read(true, this); + rtmp->set_merge_read(true, nb_rbuf, this); } void SrsPublishRecvThread::on_thread_stop() @@ -300,7 +320,7 @@ void SrsPublishRecvThread::on_thread_stop() // disable the merge read // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 - rtmp->set_merge_read(false, NULL); + rtmp->set_merge_read(false, 0, NULL); } bool SrsPublishRecvThread::can_handle() @@ -334,9 +354,9 @@ void SrsPublishRecvThread::on_recv_error(int ret) st_cond_signal(error); } -void SrsPublishRecvThread::on_read(int nb_buffer, ssize_t nread) +void SrsPublishRecvThread::on_read(ssize_t nread) { - if (nread < 0) { + if (nread < 0 || mr_sleep_ms <= 0) { return; } @@ -346,7 +366,31 @@ void SrsPublishRecvThread::on_read(int nb_buffer, ssize_t nread) * that is, we merge some data to read together. * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 */ - if (nread < SRS_MERGED_READ_SIZE(nb_buffer)) { - st_usleep(SRS_MERGED_READ_US); + if (nread < mr_small_bytes) { + st_usleep(mr_sleep_ms * 1000); } } + +void SrsPublishRecvThread::on_buffer_change(int nb_buffer) +{ + // set percent. + mr_small_bytes = (int)(nb_buffer / SRS_MR_SMALL_PERCENT); + // select the smaller + mr_small_bytes = srs_min(mr_small_bytes, SRS_MR_SMALL_BYTES); + + // the recv sleep is [buffer / max_kbps, buffer / min_kbps] + // for example, buffer is 256KB, max kbps is 10Mbps, min kbps is 10Kbps, + // the buffer is 256KB*8=2048Kb, which can provides sleep time in + // min: 2038Kb/10Mbps=2038Kb/10Kbpms=203.8ms + // max: 2038Kb/10Kbps=203.8s + // sleep = Xb * 8 / (N * 1000 b / 1000 ms) = (X * 8 / N) ms + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + int min_sleep = (int)(nb_buffer * 8.0 / SRS_MR_MAX_BITRATE_KBPS); + int average_sleep = (int)(nb_buffer * 8.0 / SRS_MR_AVERAGE_BITRATE_KBPS); + int max_sleep = (int)(nb_buffer * 8.0 / SRS_MR_MIN_BITRATE_KBPS); + // 80% min, 16% average, 4% max. + mr_sleep_ms = (int)(min_sleep * 0.8 + average_sleep * 0.16 + max_sleep * 0.04); + mr_sleep_ms = srs_min(mr_sleep_ms, SRS_MR_MAX_SLEEP_MS); + + srs_trace("merged read, buffer=%d, small=%d, sleep=%d", nb_buffer, mr_small_bytes, mr_sleep_ms); +} diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 534a051cc..7b947562c 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -140,6 +140,11 @@ private: SrsRtmpServer* rtmp; // the msgs already got. int64_t _nb_msgs; + // for mr(merged read), + // @see https://github.com/winlinvip/simple-rtmp-server/issues/241 + int mr_fd; + int mr_small_bytes; + int mr_sleep_ms; // the recv thread error code. int recv_error_code; SrsRtmpConn* _conn; @@ -151,7 +156,7 @@ private: // @see https://github.com/winlinvip/simple-rtmp-server/issues/244 st_cond_t error; public: - SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int timeout_ms, + SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, int fd, int timeout_ms, SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge); virtual ~SrsPublishRecvThread(); public: @@ -173,7 +178,8 @@ public: virtual void on_recv_error(int ret); // interface IMergeReadHandler public: - virtual void on_read(int nb_buffer, ssize_t nread); + virtual void on_read(ssize_t nread); + virtual void on_buffer_change(int nb_buffer); }; #endif diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 33e0bb518..3ba916879 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -659,7 +659,7 @@ int SrsRtmpConn::fmle_publishing(SrsSource* source) // use isolate thread to recv, // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 - SrsPublishRecvThread trd(rtmp, + SrsPublishRecvThread trd(rtmp, st_netfd_fileno(stfd), SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000, this, source, true, vhost_is_edge); @@ -695,7 +695,7 @@ int SrsRtmpConn::flash_publishing(SrsSource* source) // use isolate thread to recv, // @see: https://github.com/winlinvip/simple-rtmp-server/issues/237 - SrsPublishRecvThread trd(rtmp, + SrsPublishRecvThread trd(rtmp, st_netfd_fileno(stfd), SRS_CONSTS_RTMP_RECV_TIMEOUT_US / 1000, this, source, false, vhost_is_edge); diff --git a/trunk/src/rtmp/srs_protocol_buffer.cpp b/trunk/src/rtmp/srs_protocol_buffer.cpp index f4d949cec..72873ac4b 100644 --- a/trunk/src/rtmp/srs_protocol_buffer.cpp +++ b/trunk/src/rtmp/srs_protocol_buffer.cpp @@ -112,7 +112,7 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 */ if (merged_read && _handler) { - _handler->on_read(nb_buffer, nread); + _handler->on_read(nread); } srs_assert((int)nread > 0); @@ -122,10 +122,14 @@ int SrsBuffer::grow(ISrsBufferReader* reader, int required_size) return ret; } -void SrsBuffer::set_merge_read(bool v, IMergeReadHandler* handler) +void SrsBuffer::set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler) { merged_read = v; _handler = handler; + + if (v && max_buffer != nb_buffer) { + reset_buffer(max_buffer); + } } void SrsBuffer::on_chunk_size(int32_t chunk_size) @@ -134,10 +138,7 @@ void SrsBuffer::on_chunk_size(int32_t chunk_size) return; } - srs_freep(buffer); - - nb_buffer = chunk_size; - buffer = new char[nb_buffer]; + reset_buffer(chunk_size); } int SrsBuffer::buffer_size() @@ -145,4 +146,14 @@ int SrsBuffer::buffer_size() return nb_buffer; } +void SrsBuffer::reset_buffer(int size) +{ + srs_freep(buffer); + nb_buffer = size; + buffer = new char[nb_buffer]; + + if (_handler) { + _handler->on_buffer_change(nb_buffer); + } +} diff --git a/trunk/src/rtmp/srs_protocol_buffer.hpp b/trunk/src/rtmp/srs_protocol_buffer.hpp index 35f9c4615..be087f3da 100644 --- a/trunk/src/rtmp/srs_protocol_buffer.hpp +++ b/trunk/src/rtmp/srs_protocol_buffer.hpp @@ -51,7 +51,12 @@ public: * some small bytes. * @remark, it only for server-side, client srs-librtmp just ignore. */ - virtual void on_read(int nb_buffer, ssize_t nread) = 0; + virtual void on_read(ssize_t nread) = 0; + /** + * when buffer size changed. + * @param nb_buffer the new buffer size. + */ + virtual void on_buffer_change(int nb_buffer) = 0; }; /** @@ -110,11 +115,11 @@ public: * when it on and read small bytes, we sleep to wait more data., * that is, we merge some data to read together. * @param v true to ename merged read. + * @param max_buffer the max buffer size, the socket buffer. * @param handler the handler when merge read is enabled. * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 */ - virtual void set_merge_read(bool v, IMergeReadHandler* handler); -public: + virtual void set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler); /** * when chunk size changed, the buffer should change the buffer also. * to keep the socket buffer size always greater than chunk size. @@ -125,6 +130,8 @@ public: * get the size of socket buffer to read. */ virtual int buffer_size(); +private: + virtual void reset_buffer(int size); }; #endif diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp index 80183dbbd..c6ccc1dee 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.cpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp @@ -745,9 +745,9 @@ void SrsRtmpServer::set_auto_response(bool v) protocol->set_auto_response(v); } -void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler) +void SrsRtmpServer::set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler) { - protocol->set_merge_read(v, handler); + protocol->set_merge_read(v, max_buffer, handler); } void SrsRtmpServer::set_recv_timeout(int64_t timeout_us) diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp index 46eb92275..de2e4c47e 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp @@ -348,10 +348,11 @@ public: * when it on and read small bytes, we sleep to wait more data., * that is, we merge some data to read together. * @param v true to ename merged read. + * @param max_buffer the max buffer size, the socket buffer. * @param handler the handler when merge read is enabled. * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 */ - virtual void set_merge_read(bool v, IMergeReadHandler* handler); + virtual void set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler); /** * set/get the recv timeout in us. * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp index a58bc7c6e..e86e36b3a 100644 --- a/trunk/src/rtmp/srs_protocol_stack.cpp +++ b/trunk/src/rtmp/srs_protocol_stack.cpp @@ -479,9 +479,9 @@ int SrsProtocol::manual_response_flush() return ret; } -void SrsProtocol::set_merge_read(bool v, IMergeReadHandler* handler) +void SrsProtocol::set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler) { - in_buffer->set_merge_read(v, handler); + in_buffer->set_merge_read(v, max_buffer, handler); } void SrsProtocol::set_recv_timeout(int64_t timeout_us) diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp index 0f7dd69d0..461e14cee 100644 --- a/trunk/src/rtmp/srs_protocol_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_stack.hpp @@ -276,10 +276,11 @@ public: * when it on and read small bytes, we sleep to wait more data., * that is, we merge some data to read together. * @param v true to ename merged read. + * @param max_buffer the max buffer size, the socket buffer. * @param handler the handler when merge read is enabled. * @see https://github.com/winlinvip/simple-rtmp-server/issues/241 */ - virtual void set_merge_read(bool v, IMergeReadHandler* handler); + virtual void set_merge_read(bool v, int max_buffer, IMergeReadHandler* handler); public: /** * set/get the recv timeout in us.