diff --git a/README.md b/README.md index b68b191d7..9c5c9593f 100755 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ url: rtmp://127.0.0.1:1935/live/livestream * nginx v1.5.0: 139524 lines
### History +* v0.4, 2013-11-09, support pause for live stream. * v0.3, 2013-11-04, v0.3 released. 11773 lines. * v0.3, 2013-11-04, support refer/play-refer/publish-refer. * v0.3, 2013-11-04, support vhosts specified config. diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index e0ecf021d..3ee52085e 100755 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -63,7 +63,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. (void)0 // current release version -#define RTMP_SIG_SRS_VERSION "0.2.0" +#define RTMP_SIG_SRS_VERSION "0.3.0" // server info. #define RTMP_SIG_SRS_KEY "srs" #define RTMP_SIG_SRS_ROLE "origin server" diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 1e79c27a0..cf64c7299 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -270,10 +270,9 @@ int SrsClient::playing(SrsSource* source) srs_error("recv client control message failed. ret=%d", ret); return ret; } - if (ret == ERROR_SUCCESS && !msg) { - srs_info("play loop got a message."); - SrsAutoFree(SrsCommonMessage, msg, false); - // TODO: process it. + if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { + srs_error("process play control message failed. ret=%d", ret); + return ret; } } @@ -442,3 +441,44 @@ int SrsClient::get_peer_ip() return ret; } +int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg) +{ + int ret = ERROR_SUCCESS; + + if (!msg) { + srs_verbose("ignore all empty message."); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { + srs_info("ignore all message except amf0/amf3 command."); + return ret; + } + + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret); + return ret; + } + srs_info("decode the amf0/amf3 command packet success."); + + SrsPausePacket* pause = dynamic_cast(msg->get_packet()); + if (!pause) { + srs_info("ignore all amf0/amf3 command except pause."); + return ret; + } + + if ((ret = rtmp->on_play_client_pause(res->stream_id, pause->is_pause)) != ERROR_SUCCESS) { + srs_error("rtmp process play client pause failed. ret=%d", ret); + return ret; + } + + if ((ret = consumer->on_play_client_pause(pause->is_pause)) != ERROR_SUCCESS) { + srs_error("consumer process play client pause failed. ret=%d", ret); + return ret; + } + srs_info("process pause success, is_pause=%d, time=%d.", pause->is_pause, pause->time_ms); + + return ret; +} + diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index dab54e170..b8e433da4 100755 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -37,6 +37,8 @@ class SrsRequest; class SrsResponse; class SrsSource; class SrsRefer; +class SrsConsumer; +class SrsCommonMessage; /** * the client provides the main logic control for RTMP clients. @@ -59,6 +61,7 @@ private: virtual int playing(SrsSource* source); virtual int publish(SrsSource* source, bool is_fmle); virtual int get_peer_ip(); + virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); }; #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 3d50b248f..45f84df4d 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -195,6 +195,7 @@ messages. #define RTMP_AMF0_COMMAND_CONNECT "connect" #define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream" #define RTMP_AMF0_COMMAND_PLAY "play" +#define RTMP_AMF0_COMMAND_PAUSE "pause" #define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone" #define RTMP_AMF0_COMMAND_ON_STATUS "onStatus" #define RTMP_AMF0_COMMAND_RESULT "_result" @@ -1219,6 +1220,10 @@ int SrsCommonMessage::decode_packet() srs_info("decode the AMF0/AMF3 command(paly message)."); packet = new SrsPlayPacket(); return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_PAUSE) { + srs_info("decode the AMF0/AMF3 command(pause message)."); + packet = new SrsPausePacket(); + return packet->decode(stream); } else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) { srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message)."); packet = new SrsFMLEStartPacket(); @@ -1896,6 +1901,61 @@ int SrsPublishPacket::decode(SrsStream* stream) return ret; } +SrsPausePacket::SrsPausePacket() +{ + command_name = RTMP_AMF0_COMMAND_PAUSE; + transaction_id = 0; + command_object = new SrsAmf0Null(); + + time_ms = 0; + is_pause = true; +} + +SrsPausePacket::~SrsPausePacket() +{ + srs_freep(command_object); +} + +int SrsPausePacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode pause command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PAUSE) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode pause command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode pause transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode pause command_object failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_boolean(stream, is_pause)) != ERROR_SUCCESS) { + srs_error("amf0 decode pause is_pause failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, time_ms)) != ERROR_SUCCESS) { + srs_error("amf0 decode pause time_ms failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode pause packet success"); + + return ret; +} + SrsPlayPacket::SrsPlayPacket() { command_name = RTMP_AMF0_COMMAND_PLAY; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index db31b4b3b..958647836 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -636,6 +636,33 @@ public: virtual int decode(SrsStream* stream); }; +/** +* 4.2.8. pause +* The client sends the pause command to tell the server to pause or +* start playing. +*/ +class SrsPausePacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsPausePacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; + bool is_pause; + double time_ms; +public: + SrsPausePacket(); + virtual ~SrsPausePacket(); +public: + virtual int decode(SrsStream* stream); +}; + /** * 4.2.1. play * The client sends this command to the server to play a stream. @@ -960,12 +987,12 @@ protected: enum SrcPCUCEventType { // generally, 4bytes event-data - SrcPCUCStreamBegin = 0x00, + SrcPCUCStreamBegin = 0x00, SrcPCUCStreamEOF = 0x01, SrcPCUCStreamDry = 0x02, - SrcPCUCSetBufferLength = 0x03, // 8bytes event-data + SrcPCUCSetBufferLength = 0x03, // 8bytes event-data SrcPCUCStreamIsRecorded = 0x04, - SrcPCUCPingRequest = 0x06, + SrcPCUCPingRequest = 0x06, SrcPCUCPingResponse = 0x07, }; diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index db777f149..eda853f0b 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -34,34 +34,36 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. /** * the signature for packets to client. */ -#define RTMP_SIG_FMS_VER "3,5,3,888" -#define RTMP_SIG_AMF0_VER 0 -#define RTMP_SIG_CLIENT_ID "ASAICiss" +#define RTMP_SIG_FMS_VER "3,5,3,888" +#define RTMP_SIG_AMF0_VER 0 +#define RTMP_SIG_CLIENT_ID "ASAICiss" /** * onStatus consts. */ -#define StatusLevel "level" -#define StatusCode "code" -#define StatusDescription "description" -#define StatusDetails "details" -#define StatusClientId "clientid" +#define StatusLevel "level" +#define StatusCode "code" +#define StatusDescription "description" +#define StatusDetails "details" +#define StatusClientId "clientid" // status value -#define StatusLevelStatus "status" +#define StatusLevelStatus "status" // code value -#define StatusCodeConnectSuccess "NetConnection.Connect.Success" -#define StatusCodeStreamReset "NetStream.Play.Reset" -#define StatusCodeStreamStart "NetStream.Play.Start" -#define StatusCodePublishStart "NetStream.Publish.Start" -#define StatusCodeDataStart "NetStream.Data.Start" -#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success" +#define StatusCodeConnectSuccess "NetConnection.Connect.Success" +#define StatusCodeStreamReset "NetStream.Play.Reset" +#define StatusCodeStreamStart "NetStream.Play.Start" +#define StatusCodeStreamPause "NetStream.Pause.Notify" +#define StatusCodeStreamUnpause "NetStream.Unpause.Notify" +#define StatusCodePublishStart "NetStream.Publish.Start" +#define StatusCodeDataStart "NetStream.Data.Start" +#define StatusCodeUnpublishSuccess "NetStream.Unpublish.Success" // FMLE #define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish" #define RTMP_AMF0_COMMAND_ON_FC_UNPUBLISH "onFCUnpublish" // default stream id for response the createStream request. -#define SRS_DEFAULT_SID 1 +#define SRS_DEFAULT_SID 1 SrsRequest::SrsRequest() { @@ -570,6 +572,81 @@ int SrsRtmp::start_play(int stream_id) return ret; } +int SrsRtmp::on_play_client_pause(int stream_id, bool is_pause) +{ + int ret = ERROR_SUCCESS; + + if (is_pause) { + // onStatus(NetStream.Pause.Notify) + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); + + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamPause)); + pkt->data->set(StatusDescription, new SrsAmf0String("Paused stream.")); + + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send onStatus(NetStream.Pause.Notify) message failed. ret=%d", ret); + return ret; + } + srs_info("send onStatus(NetStream.Pause.Notify) message success."); + } + // StreamEOF + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsUserControlPacket* pkt = new SrsUserControlPacket(); + + pkt->event_type = SrcPCUCStreamEOF; + pkt->event_data = stream_id; + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send PCUC(StreamEOF) message failed. ret=%d", ret); + return ret; + } + srs_info("send PCUC(StreamEOF) message success."); + } + } else { + // onStatus(NetStream.Unpause.Notify) + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); + + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamUnpause)); + pkt->data->set(StatusDescription, new SrsAmf0String("Unpaused stream.")); + + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send onStatus(NetStream.Unpause.Notify) message failed. ret=%d", ret); + return ret; + } + srs_info("send onStatus(NetStream.Unpause.Notify) message success."); + } + // StreanBegin + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsUserControlPacket* pkt = new SrsUserControlPacket(); + + pkt->event_type = SrcPCUCStreamBegin; + pkt->event_data = stream_id; + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send PCUC(StreanBegin) message failed. ret=%d", ret); + return ret; + } + srs_info("send PCUC(StreanBegin) message success."); + } + } + + return ret; +} + int SrsRtmp::start_fmle_publish(int stream_id) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 547082379..707ebc0f0 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -148,6 +148,16 @@ public: */ virtual int start_play(int stream_id); /** + * when client(type is play) send pause message, + * if is_pause, response the following packets: + * onStatus(NetStream.Pause.Notify) + * StreamEOF + * if not is_pause, response the following packets: + * onStatus(NetStream.Unpause.Notify) + * StreamBegin + */ + virtual int on_play_client_pause(int stream_id, bool is_pause); + /** * when client type is publish, response with packets: * releaseStream response * FCPublish diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 53c288e66..eb40f62b9 100755 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -31,8 +31,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#define CONST_MAX_JITTER_MS 500 -#define DEFAULT_FRAME_TIME_MS 10 +#define CONST_MAX_JITTER_MS 500 +#define DEFAULT_FRAME_TIME_MS 10 +#define PAUSED_SHRINK_SIZE 250 std::map SrsSource::pool; @@ -50,16 +51,14 @@ SrsConsumer::SrsConsumer(SrsSource* _source) { source = _source; last_pkt_correct_time = last_pkt_time = 0; + paused = false; + codec = new SrsCodec(); } SrsConsumer::~SrsConsumer() { - std::vector::iterator it; - for (it = msgs.begin(); it != msgs.end(); ++it) { - SrsSharedPtrMessage* msg = *it; - srs_freep(msg); - } - msgs.clear(); + clear(); + srs_freep(codec); source->on_consumer_destroy(this); } @@ -89,6 +88,13 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c if (msgs.empty()) { return ret; } + + if (paused) { + if ((int)msgs.size() >= PAUSED_SHRINK_SIZE) { + shrink(); + } + return ret; + } if (max_count == 0) { count = (int)msgs.size(); @@ -111,6 +117,68 @@ int SrsConsumer::get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& c return ret; } +int SrsConsumer::on_play_client_pause(bool is_pause) +{ + int ret = ERROR_SUCCESS; + + srs_trace("stream consumer change pause state %d=>%d", paused, is_pause); + paused = is_pause; + + return ret; +} + +void SrsConsumer::shrink() +{ + int i = 0; + std::vector::iterator it; + + // issue the last video iframe. + bool has_video = false; + int frame_to_remove = 0; + std::vector::iterator iframe = msgs.end(); + for (i = 0, it = msgs.begin(); it != msgs.end(); ++it, i++) { + SrsSharedPtrMessage* msg = *it; + if (msg->header.is_video()) { + has_video = true; + if (codec->video_is_keyframe(msg->payload, msg->size)) { + iframe = it; + frame_to_remove = i + 1; + } + } + } + + // last iframe is the first elem, ignore it. + if (iframe == msgs.begin()) { + return; + } + + // recalc the frame to remove + if (iframe == msgs.end()) { + frame_to_remove = 0; + } + if (!has_video) { + frame_to_remove = (int)msgs.size(); + } + + srs_trace("shrink the cache queue, has_video=%d, has_iframe=%d, size=%d, removed=%d", + has_video, iframe != msgs.end(), (int)msgs.size(), frame_to_remove); + + // if no video, remove all audio. + if (!has_video) { + clear(); + return; + } + + // if exists video Iframe, remove the frames before it. + if (iframe != msgs.end()) { + for (it = msgs.begin(); it != iframe; ++it) { + SrsSharedPtrMessage* msg = *it; + srs_freep(msg); + } + msgs.erase(msgs.begin(), iframe); + } +} + int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate) { int ret = ERROR_SUCCESS; @@ -156,6 +224,16 @@ int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate) return ret; } +void SrsConsumer::clear() +{ + std::vector::iterator it; + for (it = msgs.begin(); it != msgs.end(); ++it) { + SrsSharedPtrMessage* msg = *it; + srs_freep(msg); + } + msgs.clear(); +} + SrsSource::SrsSource(std::string _stream_url) { stream_url = _stream_url; diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index 107ce5a21..503cb3f7f 100755 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -50,6 +50,8 @@ private: int32_t last_pkt_correct_time; SrsSource* source; std::vector msgs; + bool paused; + SrsCodec* codec; public: SrsConsumer(SrsSource* _source); virtual ~SrsConsumer(); @@ -69,11 +71,21 @@ public: * @max_count the max count to dequeue, 0 to dequeue all. */ virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count); + /** + * when client send the pause message. + */ + virtual int on_play_client_pause(bool is_pause); private: + /** + * when paused, shrink the cache queue, + * remove to cache only one gop. + */ + virtual void shrink(); /** * detect the time jitter and correct it. */ virtual int jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate); + virtual void clear(); }; /**