diff --git a/README.md b/README.md
index 67e7b3c9b..a62d389e3 100755
--- a/README.md
+++ b/README.md
@@ -212,6 +212,7 @@ usr sys idl wai hiq siq| read writ| recv send| in out | int csw
* nginx v1.5.0: 139524 lines
### History
+* v0.9, 2013-12-15, drop the old whole gop when live message queue full.
* v0.9, 2013-12-15, fix the forwarder reconnect bug, feed it the sequence header.
* v0.9, 2013-12-15, support reload the hls/forwarder/transcoder.
* v0.9, 2013-12-14, refine the thread model for the retry threads.
diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf
index dc94b75ef..2e1826be2 100755
--- a/trunk/conf/srs.conf
+++ b/trunk/conf/srs.conf
@@ -21,6 +21,7 @@ max_connections 2000;
vhost __defaultVhost__ {
enabled on;
gop_cache on;
+ queue_length 30;
forward 127.0.0.1:19350;
hls {
enabled on;
@@ -92,7 +93,8 @@ vhost __defaultVhost__ {
vhost dev {
enabled on;
gop_cache on;
- forward 127.0.0.1:19350;
+ queue_length 30;
+ #forward 127.0.0.1:19350;
hls {
enabled off;
hls_path ./objs/nginx/html;
@@ -685,6 +687,11 @@ vhost min.delay.com {
# set to on if requires client fast startup.
# default: on
gop_cache off;
+ # the max live queue length in seconds.
+ # if the messages in the queue exceed the max length,
+ # drop the old whole gop.
+ # default: 30
+ queue_length 10;
}
# the vhost for antisuck.
vhost refer.anti_suck.com {
diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp
index 904dc0fed..fea6d9dd2 100644
--- a/trunk/src/core/srs_core_config.cpp
+++ b/trunk/src/core/srs_core_config.cpp
@@ -548,6 +548,17 @@ int SrsConfig::reload()
}
srs_trace("vhost %s reload gop_cache success.", vhost.c_str());
}
+ // queue_length
+ if (!srs_directive_equals(new_vhost->get("queue_length"), old_vhost->get("queue_length"))) {
+ for (it = subscribes.begin(); it != subscribes.end(); ++it) {
+ ISrsReloadHandler* subscribe = *it;
+ if ((ret = subscribe->on_reload_queue_length(vhost)) != ERROR_SUCCESS) {
+ srs_error("vhost %s notify subscribes queue_length failed. ret=%d", vhost.c_str(), ret);
+ return ret;
+ }
+ }
+ srs_trace("vhost %s reload queue_length success.", vhost.c_str());
+ }
// forward
if (!srs_directive_equals(new_vhost->get("forward"), old_vhost->get("forward"))) {
for (it = subscribes.begin(); it != subscribes.end(); ++it) {
@@ -1275,6 +1286,7 @@ bool SrsConfig::get_gop_cache(string vhost)
return true;
}
+ conf = conf->get("gop_cache");
if (conf && conf->arg0() == "off") {
return false;
}
@@ -1282,6 +1294,22 @@ bool SrsConfig::get_gop_cache(string vhost)
return true;
}
+double SrsConfig::get_queue_length(string vhost)
+{
+ SrsConfDirective* conf = get_vhost(vhost);
+
+ if (!conf) {
+ return SRS_CONF_DEFAULT_QUEUE_LENGTH;
+ }
+
+ conf = conf->get("queue_length");
+ if (conf || conf->arg0().empty()) {
+ return SRS_CONF_DEFAULT_QUEUE_LENGTH;
+ }
+
+ return ::atoi(conf->arg0().c_str());
+}
+
SrsConfDirective* SrsConfig::get_forward(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp
index d173d6ec2..b7ccf8d89 100644
--- a/trunk/src/core/srs_core_config.hpp
+++ b/trunk/src/core/srs_core_config.hpp
@@ -48,6 +48,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_AAC_SYNC 100
// in ms, for HLS aac flush the audio
#define SRS_CONF_DEFAULT_AAC_DELAY 300
+// in seconds, the live queue length.
+#define SRS_CONF_DEFAULT_QUEUE_LENGTH 30
+// in seconds, the paused queue length.
+#define SRS_CONF_DEFAULT_PAUSED_LENGTH 10
#define SRS_CONF_DEFAULT_CHUNK_SIZE 4096
@@ -145,6 +149,7 @@ public:
virtual std::string get_log_dir();
virtual int get_max_connections();
virtual bool get_gop_cache(std::string vhost);
+ virtual double get_queue_length(std::string vhost);
virtual SrsConfDirective* get_forward(std::string vhost);
private:
virtual SrsConfDirective* get_hls(std::string vhost);
diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp
index c6bca4fbb..bb2de325e 100644
--- a/trunk/src/core/srs_core_forward.cpp
+++ b/trunk/src/core/srs_core_forward.cpp
@@ -51,12 +51,14 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
stream_id = 0;
pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_MS);
+ queue = new SrsMessageQueue();
}
SrsForwarder::~SrsForwarder()
{
on_unpublish();
+ // TODO: FIXME: remove it.
std::vector::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
@@ -65,6 +67,12 @@ SrsForwarder::~SrsForwarder()
msgs.clear();
srs_freep(pthread);
+ srs_freep(queue);
+}
+
+void SrsForwarder::set_queue_size(double queue_size)
+{
+ queue->set_queue_size(queue_size);
}
int SrsForwarder::on_publish(SrsRequest* req, std::string forward_server)
diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp
index d2d38242e..37cb71269 100644
--- a/trunk/src/core/srs_core_forward.hpp
+++ b/trunk/src/core/srs_core_forward.hpp
@@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
class SrsSharedPtrMessage;
class SrsOnMetaDataPacket;
+class SrsMessageQueue;
class SrsRtmpClient;
class SrsRequest;
class SrsSource;
@@ -58,10 +59,13 @@ private:
private:
SrsSource* source;
SrsRtmpClient* client;
+ SrsMessageQueue* queue;
std::vector msgs;
public:
SrsForwarder(SrsSource* _source);
virtual ~SrsForwarder();
+public:
+ virtual void set_queue_size(double queue_size);
public:
virtual int on_publish(SrsRequest* req, std::string forward_server);
virtual void on_unpublish();
diff --git a/trunk/src/core/srs_core_reload.cpp b/trunk/src/core/srs_core_reload.cpp
index 71cd5a3c1..f6feee15e 100644
--- a/trunk/src/core/srs_core_reload.cpp
+++ b/trunk/src/core/srs_core_reload.cpp
@@ -55,6 +55,11 @@ int ISrsReloadHandler::on_reload_gop_cache(string /*vhost*/)
return ERROR_SUCCESS;
}
+int ISrsReloadHandler::on_reload_queue_length(string /*vhost*/)
+{
+ return ERROR_SUCCESS;
+}
+
int ISrsReloadHandler::on_reload_forward(string /*vhost*/)
{
return ERROR_SUCCESS;
diff --git a/trunk/src/core/srs_core_reload.hpp b/trunk/src/core/srs_core_reload.hpp
index a819bd7dd..3d8f3e8b9 100644
--- a/trunk/src/core/srs_core_reload.hpp
+++ b/trunk/src/core/srs_core_reload.hpp
@@ -44,6 +44,7 @@ public:
virtual int on_reload_pithy_print();
virtual int on_reload_vhost_removed(std::string vhost);
virtual int on_reload_gop_cache(std::string vhost);
+ virtual int on_reload_queue_length(std::string vhost);
virtual int on_reload_forward(std::string vhost);
virtual int on_reload_hls(std::string vhost);
virtual int on_reload_transcode(std::string vhost);
diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp
index fab9361bc..332f164f0 100644
--- a/trunk/src/core/srs_core_source.cpp
+++ b/trunk/src/core/srs_core_source.cpp
@@ -39,7 +39,6 @@ using namespace std;
#define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 10
-#define PAUSED_SHRINK_SIZE 250
SrsRtmpJitter::SrsRtmpJitter()
{
@@ -50,9 +49,21 @@ SrsRtmpJitter::~SrsRtmpJitter()
{
}
+// TODO: FIXME: remove the 64bits time, change the timestamp in heaer to 64bits.
int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int tba, int tbv, int64_t* corrected_time)
{
int ret = ERROR_SUCCESS;
+
+ // set to 0 for metadata.
+ if (!msg->header.is_video() && !msg->header.is_audio()) {
+ if (corrected_time) {
+ *corrected_time = 0;
+ }
+
+ msg->header.timestamp = 0;
+
+ return ret;
+ }
int sample_rate = tba;
int frame_rate = tbv;
@@ -110,55 +121,50 @@ int SrsRtmpJitter::get_time()
return (int)last_pkt_correct_time;
}
-SrsConsumer::SrsConsumer(SrsSource* _source)
+SrsMessageQueue::SrsMessageQueue()
{
- source = _source;
- paused = false;
- jitter = new SrsRtmpJitter();
+ queue_size_ms = 0;
+ av_start_time = av_end_time = -1;
}
-SrsConsumer::~SrsConsumer()
+SrsMessageQueue::~SrsMessageQueue()
{
clear();
-
- source->on_consumer_destroy(this);
- srs_freep(jitter);
}
-int SrsConsumer::get_time()
+void SrsMessageQueue::set_queue_size(double queue_size)
{
- return jitter->get_time();
+ queue_size_ms = (int)(queue_size * 1000);
}
-int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
+int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
- if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
- srs_freep(msg);
- return ret;
+ if (msg->header.is_video() || msg->header.is_audio()) {
+ if (av_start_time == -1) {
+ av_start_time = msg->header.timestamp;
+ }
+
+ av_end_time = msg->header.timestamp;
}
- // TODO: check the queue size and drop packets if overflow.
msgs.push_back(msg);
+
+ while (av_end_time - av_start_time > queue_size_ms) {
+ shrink();
+ }
return ret;
}
-int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
+int SrsMessageQueue::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
{
int ret = ERROR_SUCCESS;
if (msgs.empty()) {
return ret;
}
-
- if (paused) {
- if ((int)msgs.size() >= PAUSED_SHRINK_SIZE) {
- shrink();
- }
- return ret;
- }
if (max_count == 0) {
count = (int)msgs.size();
@@ -181,6 +187,112 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c
return ret;
}
+void SrsMessageQueue::shrink()
+{
+ int iframe_index = -1;
+
+ // issue the first iframe.
+ // skip the first frame, whatever the type of it,
+ // for when we shrinked, the first is the iframe,
+ // we will directly remove the gop next time.
+ for (int i = 1; i < (int)msgs.size(); i++) {
+ SrsSharedPtrMessage* msg = msgs[i];
+
+ if (msg->header.is_video()) {
+ if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
+ // the max frame index to remove.
+ iframe_index = i;
+
+ // set the start time, we will remove until this frame.
+ av_start_time = msg->header.timestamp;
+
+ break;
+ }
+ }
+ }
+
+ // no iframe, clear the queue.
+ if (iframe_index < 0) {
+ clear();
+ return;
+ }
+
+ // remove the first gop from the front
+ for (int i = 0; i < iframe_index; i++) {
+ SrsSharedPtrMessage* msg = msgs[i];
+ srs_freep(msg);
+ }
+ msgs.erase(msgs.begin(), msgs.begin() + iframe_index);
+
+ srs_trace("shrink the cache queue, "
+ "size=%d, removed=%d", (int)msgs.size(), iframe_index);
+}
+
+void SrsMessageQueue::clear()
+{
+ std::vector::iterator it;
+
+ for (it = msgs.begin(); it != msgs.end(); ++it) {
+ SrsSharedPtrMessage* msg = *it;
+ srs_freep(msg);
+ }
+
+ msgs.clear();
+
+ av_start_time = av_end_time = -1;
+}
+
+SrsConsumer::SrsConsumer(SrsSource* _source)
+{
+ source = _source;
+ paused = false;
+ jitter = new SrsRtmpJitter();
+ queue = new SrsMessageQueue();
+}
+
+SrsConsumer::~SrsConsumer()
+{
+ source->on_consumer_destroy(this);
+ srs_freep(jitter);
+ srs_freep(queue);
+}
+
+void SrsConsumer::set_queue_size(double queue_size)
+{
+ queue->set_queue_size(queue_size);
+}
+
+int SrsConsumer::get_time()
+{
+ return jitter->get_time();
+}
+
+int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int tba, int tbv)
+{
+ int ret = ERROR_SUCCESS;
+
+ if ((ret = jitter->correct(msg, tba, tbv)) != ERROR_SUCCESS) {
+ srs_freep(msg);
+ return ret;
+ }
+
+ if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ return ret;
+}
+
+int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count)
+{
+ // paused, return nothing.
+ if (paused) {
+ return ERROR_SUCCESS;
+ }
+
+ return queue->get_packets(max_count, pmsgs, count);
+}
+
int SrsConsumer::on_play_client_pause(bool is_pause)
{
int ret = ERROR_SUCCESS;
@@ -191,68 +303,6 @@ int SrsConsumer::on_play_client_pause(bool is_pause)
return ret;
}
-void SrsConsumer::shrink()
-{
- int i = 0;
- std::vector::iterator it;
-
- // issue the last video iframe.
- bool has_video = false;
- int frame_to_remove = 0;
- std::vector::iterator iframe = msgs.end();
- for (i = 0, it = msgs.begin(); it != msgs.end(); ++it, i++) {
- SrsSharedPtrMessage* msg = *it;
- if (msg->header.is_video()) {
- has_video = true;
- if (SrsCodec::video_is_keyframe(msg->payload, msg->size)) {
- iframe = it;
- frame_to_remove = i + 1;
- }
- }
- }
-
- // last iframe is the first elem, ignore it.
- if (iframe == msgs.begin()) {
- return;
- }
-
- // recalc the frame to remove
- if (iframe == msgs.end()) {
- frame_to_remove = 0;
- }
- if (!has_video) {
- frame_to_remove = (int)msgs.size();
- }
-
- srs_trace("shrink the cache queue, has_video=%d, has_iframe=%d, size=%d, removed=%d",
- has_video, iframe != msgs.end(), (int)msgs.size(), frame_to_remove);
-
- // if no video, remove all audio.
- if (!has_video) {
- clear();
- return;
- }
-
- // if exists video Iframe, remove the frames before it.
- if (iframe != msgs.end()) {
- for (it = msgs.begin(); it != iframe; ++it) {
- SrsSharedPtrMessage* msg = *it;
- srs_freep(msg);
- }
- msgs.erase(msgs.begin(), iframe);
- }
-}
-
-void SrsConsumer::clear()
-{
- std::vector::iterator it;
- for (it = msgs.begin(); it != msgs.end(); ++it) {
- SrsSharedPtrMessage* msg = *it;
- srs_freep(msg);
- }
- msgs.clear();
-}
-
SrsGopCache::SrsGopCache()
{
cached_video_count = 0;
@@ -436,6 +486,41 @@ int SrsSource::on_reload_gop_cache(string vhost)
return ret;
}
+int SrsSource::on_reload_queue_length(string vhost)
+{
+ int ret = ERROR_SUCCESS;
+
+ if (req->vhost != vhost) {
+ return ret;
+ }
+
+ double queue_size = config->get_queue_length(req->vhost);
+
+ if (true) {
+ std::vector::iterator it;
+
+ for (it = consumers.begin(); it != consumers.end(); ++it) {
+ SrsConsumer* consumer = *it;
+ consumer->set_queue_size(queue_size);
+ }
+
+ srs_trace("consumers reload queue size success.");
+ }
+
+ if (true) {
+ std::vector::iterator it;
+
+ for (it = forwarders.begin(); it != forwarders.end(); ++it) {
+ SrsForwarder* forwarder = *it;
+ forwarder->set_queue_size(queue_size);
+ }
+
+ srs_trace("forwarders reload queue size success.");
+ }
+
+ return ret;
+}
+
int SrsSource::on_reload_forward(string vhost)
{
int ret = ERROR_SUCCESS;
@@ -735,7 +820,7 @@ int SrsSource::on_video(SrsCommonMessage* video)
// cache the last gop packets
if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
- srs_error("shrink gop cache failed. ret=%d", ret);
+ srs_error("gop cache msg failed. ret=%d", ret);
return ret;
}
srs_verbose("cache gop success.");
@@ -809,6 +894,7 @@ void SrsSource::on_unpublish()
int ret = ERROR_SUCCESS;
consumer = new SrsConsumer(this);
+ consumer->set_queue_size(config->get_queue_length(req->vhost));
consumers.push_back(consumer);
if (cache_metadata && (ret = consumer->enqueue(cache_metadata->copy(), sample_rate, frame_rate)) != ERROR_SUCCESS) {
diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp
index 897918c7e..a4567b2b9 100644
--- a/trunk/src/core/srs_core_source.hpp
+++ b/trunk/src/core/srs_core_source.hpp
@@ -74,6 +74,45 @@ public:
virtual int get_time();
};
+/**
+* the message queue for the consumer(client), forwarder.
+* we limit the size in seconds, drop old messages(the whole gop) if full.
+*/
+class SrsMessageQueue
+{
+private:
+ int64_t av_start_time;
+ int64_t av_end_time;
+ int queue_size_ms;
+ std::vector msgs;
+public:
+ SrsMessageQueue();
+ virtual ~SrsMessageQueue();
+public:
+ /**
+ * set the queue size
+ * @param queue_size the queue size in seconds.
+ */
+ virtual void set_queue_size(double queue_size);
+public:
+ /**
+ * enqueue the message, the timestamp always monotonically.
+ * @param msg, the msg to enqueue, user never free it whatever the return code.
+ */
+ virtual int enqueue(SrsSharedPtrMessage* msg);
+ /**
+ * get messages from the queue.
+ */
+ virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
+private:
+ /**
+ * remove a gop from the front.
+ * if no iframe found, clear it.
+ */
+ virtual void shrink();
+ virtual void clear();
+};
+
/**
* the consumer for SrsSource, that is a play client.
*/
@@ -82,11 +121,14 @@ class SrsConsumer
private:
SrsRtmpJitter* jitter;
SrsSource* source;
+ SrsMessageQueue* queue;
std::vector msgs;
bool paused;
public:
SrsConsumer(SrsSource* _source);
virtual ~SrsConsumer();
+public:
+ virtual void set_queue_size(double queue_size);
public:
/**
* get current client time, the last packet time.
@@ -111,13 +153,6 @@ public:
* when client send the pause message.
*/
virtual int on_play_client_pause(bool is_pause);
-private:
- /**
- * when paused, shrink the cache queue,
- * remove to cache only one gop.
- */
- virtual void shrink();
- virtual void clear();
};
/**
@@ -218,6 +253,7 @@ public:
// interface ISrsReloadHandler
public:
virtual int on_reload_gop_cache(std::string vhost);
+ virtual int on_reload_queue_length(std::string vhost);
virtual int on_reload_forward(std::string vhost);
virtual int on_reload_hls(std::string vhost);
virtual int on_reload_transcode(std::string vhost);