From 8283ed465aa821fd68e9ee94d652bf66bb7e50bb Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 20 Oct 2013 17:13:01 +0800 Subject: [PATCH] support encode amf0 packet, connect app response packet --- trunk/src/core/srs_core_amf0.cpp | 305 ++++++++++++++++++++++++++- trunk/src/core/srs_core_amf0.hpp | 14 ++ trunk/src/core/srs_core_error.hpp | 1 + trunk/src/core/srs_core_protocol.cpp | 75 +++++++ trunk/src/core/srs_core_protocol.hpp | 28 +++ trunk/src/core/srs_core_stream.cpp | 47 ++++- trunk/src/core/srs_core_stream.hpp | 22 +- 7 files changed, 482 insertions(+), 10 deletions(-) diff --git a/trunk/src/core/srs_core_amf0.cpp b/trunk/src/core/srs_core_amf0.cpp index f17de3ddc..5bd40e1cf 100755 --- a/trunk/src/core/srs_core_amf0.cpp +++ b/trunk/src/core/srs_core_amf0.cpp @@ -53,6 +53,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // User defined #define RTMP_AMF0_Invalid 0x3F +int srs_amf0_get_object_eof_size(); +int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*&); +int srs_amf0_write_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*); +int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value); +int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value); + SrsAmf0Any::SrsAmf0Any() { marker = RTMP_AMF0_Invalid; @@ -167,8 +173,6 @@ SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name) return prop; } -int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*&); - int srs_amf0_read_utf8(SrsStream* stream, std::string& value) { int ret = ERROR_SUCCESS; @@ -213,6 +217,36 @@ int srs_amf0_read_utf8(SrsStream* stream, std::string& value) return ret; } +int srs_amf0_write_utf8(SrsStream* stream, std::string value) +{ + int ret = ERROR_SUCCESS; + + // len + if (!stream->require(2)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write string length failed. ret=%d", ret); + return ret; + } + stream->write_2bytes(value.length()); + srs_verbose("amf0 write string length success. len=%d", (int)value.length()); + + // empty string + if (value.length() <= 0) { + srs_verbose("amf0 write empty string. ret=%d", ret); + return ret; + } + + // data + if (!stream->require(value.length())) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write string data failed. ret=%d", ret); + return ret; + } + stream->write_string(value); + srs_verbose("amf0 write string data success. str=%s", value.c_str()); + + return ret; +} int srs_amf0_read_string(SrsStream* stream, std::string& value) { @@ -237,6 +271,23 @@ int srs_amf0_read_string(SrsStream* stream, std::string& value) return srs_amf0_read_utf8(stream, value); } +int srs_amf0_write_string(SrsStream* stream, std::string value) +{ + int ret = ERROR_SUCCESS; + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write string marker failed. ret=%d", ret); + return ret; + } + + stream->write_1bytes(RTMP_AMF0_String); + srs_verbose("amf0 write string marker success"); + + return srs_amf0_write_utf8(stream, value); +} + int srs_amf0_read_boolean(SrsStream* stream, bool& value) { int ret = ERROR_SUCCESS; @@ -274,6 +325,36 @@ int srs_amf0_read_boolean(SrsStream* stream, bool& value) return ret; } +int srs_amf0_write_boolean(SrsStream* stream, bool value) +{ + int ret = ERROR_SUCCESS; + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write bool marker failed. ret=%d", ret); + return ret; + } + stream->write_1bytes(RTMP_AMF0_Boolean); + srs_verbose("amf0 write bool marker success"); + + // value + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write bool value failed. ret=%d", ret); + return ret; + } + + if (value) { + stream->write_1bytes(0x01); + } else { + stream->write_1bytes(0x00); + } + + srs_verbose("amf0 write bool value success. value=%d", value); + + return ret; +} int srs_amf0_read_number(SrsStream* stream, double& value) { @@ -309,6 +390,35 @@ int srs_amf0_read_number(SrsStream* stream, double& value) return ret; } +int srs_amf0_write_number(SrsStream* stream, double value) +{ + int ret = ERROR_SUCCESS; + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write number marker failed. ret=%d", ret); + return ret; + } + + stream->write_1bytes(RTMP_AMF0_Number); + srs_verbose("amf0 write number marker success"); + + // value + if (!stream->require(8)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write number value failed. ret=%d", ret); + return ret; + } + + int64_t temp = 0x00; + memcpy(&temp, &value, 8); + stream->write_8bytes(temp); + + srs_verbose("amf0 write number value success. value=%.2f", value); + + return ret; +} int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) { @@ -381,11 +491,66 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) return ret; } +int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value) +{ + int ret = ERROR_SUCCESS; + + srs_assert(value != NULL); + + switch (value->marker) { + case RTMP_AMF0_String: { + std::string data = srs_amf0_convert(value)->value; + return srs_amf0_write_string(stream, data); + } + case RTMP_AMF0_Boolean: { + bool data = srs_amf0_convert(value)->value; + return srs_amf0_write_boolean(stream, data); + } + case RTMP_AMF0_Number: { + double data = srs_amf0_convert(value)->value; + return srs_amf0_write_number(stream, data); + } + case RTMP_AMF0_ObjectEnd: { + SrsAmf0ObjectEOF* p = srs_amf0_convert(value); + return srs_amf0_write_object_eof(stream, p); + } + case RTMP_AMF0_Object: { + SrsAmf0Object* p = srs_amf0_convert(value); + return srs_amf0_write_object(stream, p); + } + case RTMP_AMF0_Invalid: + default: { + ret = ERROR_RTMP_AMF0_INVALID; + srs_error("invalid amf0 message type. marker=%#x, ret=%d", value->marker, ret); + return ret; + } + } + + return ret; +} int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*& value) { int ret = ERROR_SUCCESS; + // auto skip -2 to read the object eof. + srs_assert(stream->pos() >= 2); + stream->skip(-2); + + // value + if (!stream->require(2)) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 read object eof value failed. ret=%d", ret); + return ret; + } + int16_t temp = stream->read_2bytes(); + if (temp != 0x00) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 read object eof value check failed. " + "must be 0x00, actual is %#x, ret=%d", temp, ret); + return ret; + } + // marker if (!stream->require(1)) { ret = ERROR_RTMP_AMF0_DECODE; @@ -402,9 +567,36 @@ int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*& value) } srs_verbose("amf0 read object eof marker success"); - // value value = new SrsAmf0ObjectEOF(); - srs_verbose("amf0 read object eof marker success"); + srs_verbose("amf0 read object eof success"); + + return ret; +} +int srs_amf0_write_object_eof(SrsStream* stream, SrsAmf0ObjectEOF* value) +{ + int ret = ERROR_SUCCESS; + + srs_assert(value != NULL); + + // value + if (!stream->require(2)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write object eof value failed. ret=%d", ret); + return ret; + } + stream->write_2bytes(0x00); + srs_verbose("amf0 write object eof value success"); + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write object eof marker failed. ret=%d", ret); + return ret; + } + + stream->write_1bytes(RTMP_AMF0_ObjectEnd); + + srs_verbose("amf0 read object eof success"); return ret; } @@ -462,3 +654,108 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value) return ret; } +int srs_amf0_write_object(SrsStream* stream, SrsAmf0Object* value) +{ + int ret = ERROR_SUCCESS; + + srs_assert(value != NULL); + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write object marker failed. ret=%d", ret); + return ret; + } + + stream->write_1bytes(RTMP_AMF0_Object); + srs_verbose("amf0 write object marker success"); + + // value + std::map::iterator it; + for (it = value->properties.begin(); it != value->properties.end(); ++it) { + std::string name = it->first; + SrsAmf0Any* any = it->second; + + if ((ret = srs_amf0_write_utf8(stream, name)) != ERROR_SUCCESS) { + srs_error("write object property name failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_write_any(stream, any)) != ERROR_SUCCESS) { + srs_error("write object property value failed. ret=%d", ret); + return ret; + } + + srs_verbose("write amf0 property success. name=%s", name.c_str()); + } + + if ((ret = srs_amf0_write_object_eof(stream, &value->eof)) != ERROR_SUCCESS) { + srs_error("write object eof failed. ret=%d", ret); + return ret; + } + + srs_verbose("write amf0 object success."); + + return ret; +} + +int srs_amf0_get_utf8_size(std::string value) +{ + return 2 + value.length(); +} + +int srs_amf0_get_string_size(std::string value) +{ + return 1 + srs_amf0_get_utf8_size(value); +} + +int srs_amf0_get_number_size() +{ + return 1 + 8; +} + +int srs_amf0_get_boolean_size() +{ + return 1 + 1; +} + +int srs_amf0_get_object_size(SrsAmf0Object* obj) +{ + if (!obj) { + return 0; + } + + int size = 1; + + std::map::iterator it; + for (it = obj->properties.begin(); it != obj->properties.end(); ++it) { + std::string name = it->first; + SrsAmf0Any* value = it->second; + + size += srs_amf0_get_utf8_size(name); + + if (value->is_boolean()) { + size += srs_amf0_get_boolean_size(); + } else if (value->is_number()) { + size += srs_amf0_get_number_size(); + } else if (value->is_string()) { + SrsAmf0String* p = srs_amf0_convert(value); + size += srs_amf0_get_string_size(p->value); + } else if (value->is_object()) { + SrsAmf0Object* p = srs_amf0_convert(value); + size += srs_amf0_get_object_size(p); + } else { + // TOOD: other AMF0 types. + srs_warn("ignore unkown AMF0 type size."); + } + } + + size += srs_amf0_get_object_eof_size(); + + return size; +} + +int srs_amf0_get_object_eof_size() +{ + return 2 + 1; +} diff --git a/trunk/src/core/srs_core_amf0.hpp b/trunk/src/core/srs_core_amf0.hpp index 1fa9db737..b8e82fdfa 100755 --- a/trunk/src/core/srs_core_amf0.hpp +++ b/trunk/src/core/srs_core_amf0.hpp @@ -140,6 +140,7 @@ struct SrsAmf0Object : public SrsAmf0Any * @remark only support UTF8-1 char. */ extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value); +extern int srs_amf0_write_utf8(SrsStream* stream, std::string value); /** * read amf0 string from stream. @@ -147,6 +148,7 @@ extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value); * string-type = string-marker UTF-8 */ extern int srs_amf0_read_string(SrsStream* stream, std::string& value); +extern int srs_amf0_write_string(SrsStream* stream, std::string value); /** * read amf0 boolean from stream. @@ -155,6 +157,7 @@ extern int srs_amf0_read_string(SrsStream* stream, std::string& value); * 0 is false, <> 0 is true */ extern int srs_amf0_read_boolean(SrsStream* stream, bool& value); +extern int srs_amf0_write_boolean(SrsStream* stream, bool value); /** * read amf0 number from stream. @@ -162,6 +165,7 @@ extern int srs_amf0_read_boolean(SrsStream* stream, bool& value); * number-type = number-marker DOUBLE */ extern int srs_amf0_read_number(SrsStream* stream, double& value); +extern int srs_amf0_write_number(SrsStream* stream, double value); /** * read amf0 object from stream. @@ -170,6 +174,16 @@ extern int srs_amf0_read_number(SrsStream* stream, double& value); * object-property = (UTF-8 value-type) | (UTF-8-empty object-end-marker) */ extern int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value); +extern int srs_amf0_write_object(SrsStream* stream, SrsAmf0Object* value); + +/** +* get amf0 objects size. +*/ +extern int srs_amf0_get_utf8_size(std::string value); +extern int srs_amf0_get_string_size(std::string value); +extern int srs_amf0_get_number_size(); +extern int srs_amf0_get_boolean_size(); +extern int srs_amf0_get_object_size(SrsAmf0Object* obj); /** * convert the any to specified object. diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 082ca0e1e..f43f003a2 100755 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -58,6 +58,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTMP_REQ_TCURL 306 #define ERROR_RTMP_MESSAGE_DECODE 307 #define ERROR_RTMP_MESSAGE_ENCODE 308 +#define ERROR_RTMP_AMF0_ENCODE 309 #define ERROR_SYSTEM_STREAM_INIT 400 #define ERROR_SYSTEM_PACKET_INVALID 401 diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 5b6437cd3..7c98aa797 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -1010,6 +1010,10 @@ SrsConnectAppPacket::SrsConnectAppPacket() SrsConnectAppPacket::~SrsConnectAppPacket() { + if (command_object) { + delete command_object; + command_object = NULL; + } } int SrsConnectAppPacket::decode(SrsStream* stream) @@ -1053,6 +1057,77 @@ int SrsConnectAppPacket::decode(SrsStream* stream) return ret; } +SrsConnectAppResPacket::SrsConnectAppResPacket() +{ + command_name = RTMP_AMF0_COMMAND_CONNECT; + transaction_id = 1; + props = NULL; + info = NULL; +} + +SrsConnectAppResPacket::~SrsConnectAppResPacket() +{ + if (props) { + delete props; + props = NULL; + } + + if (info) { + delete info; + info = NULL; + } +} + +int SrsConnectAppResPacket::get_perfer_cid() +{ + return RTMP_CID_OverConnection; +} + +int SrsConnectAppResPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsConnectAppResPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_object_size(props)+ srs_amf0_get_object_size(info); +} + +int SrsConnectAppResPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) { + srs_error("encode props failed. ret=%d", ret); + return ret; + } + srs_verbose("encode props success."); + + if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) { + srs_error("encode info failed. ret=%d", ret); + return ret; + } + srs_verbose("encode info success."); + + + srs_info("encode connect app response packet success."); + + return ret; +} + SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket() { ackowledgement_window_size = 0; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index db9378549..1a08644f8 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -324,6 +324,34 @@ public: public: virtual int decode(SrsStream* stream); }; +/** +* response for SrsConnectAppPacket. +*/ +class SrsConnectAppResPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsConnectAppResPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Object* props; + SrsAmf0Object* info; +public: + SrsConnectAppResPacket(); + virtual ~SrsConnectAppResPacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; /** * 5.5. Window Acknowledgement Size (5) diff --git a/trunk/src/core/srs_core_stream.cpp b/trunk/src/core/srs_core_stream.cpp index 76847f3d7..fb05b7536 100755 --- a/trunk/src/core/srs_core_stream.cpp +++ b/trunk/src/core/srs_core_stream.cpp @@ -78,6 +78,15 @@ void SrsStream::skip(int size) p += size; } +int SrsStream::pos() +{ + if (empty()) { + return 0; + } + + return p - bytes; +} + int8_t SrsStream::read_1bytes() { srs_assert(require(1)); @@ -141,6 +150,22 @@ std::string SrsStream::read_string(int len) return value; } +void SrsStream::write_1bytes(int8_t value) +{ + srs_assert(require(1)); + + *p++ = value; +} + +void SrsStream::write_2bytes(int16_t value) +{ + srs_assert(require(2)); + + pp = (char*)&value; + *p++ = pp[1]; + *p++ = pp[0]; +} + void SrsStream::write_4bytes(int32_t value) { srs_assert(require(4)); @@ -152,10 +177,26 @@ void SrsStream::write_4bytes(int32_t value) *p++ = pp[0]; } -void SrsStream::write_1bytes(int8_t value) +void SrsStream::write_8bytes(int64_t value) { - srs_assert(require(1)); + srs_assert(require(8)); - *p++ = value; + pp = (char*)&value; + *p++ = pp[7]; + *p++ = pp[6]; + *p++ = pp[5]; + *p++ = pp[4]; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; +} + +void SrsStream::write_string(std::string value) +{ + srs_assert(require(value.length())); + + memcpy(p, value.data(), value.length()); + p += value.length(); } diff --git a/trunk/src/core/srs_core_stream.hpp b/trunk/src/core/srs_core_stream.hpp index 094f6d230..f48fd08b4 100755 --- a/trunk/src/core/srs_core_stream.hpp +++ b/trunk/src/core/srs_core_stream.hpp @@ -35,7 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsStream { -protected: +private: char* p; char* pp; char* bytes; @@ -70,6 +70,10 @@ public: * @size can be any value. positive to forward; nagetive to backward. */ virtual void skip(int size); + /** + * tell the current pos. + */ + virtual int pos(); public: /** * get 1bytes char from stream. @@ -92,14 +96,26 @@ public: */ virtual std::string read_string(int len); public: + /** + * write 1bytes char to stream. + */ + virtual void write_1bytes(int8_t value); + /** + * write 2bytes int to stream. + */ + virtual void write_2bytes(int16_t value); /** * write 4bytes int to stream. */ virtual void write_4bytes(int32_t value); /** - * write 1bytes char to stream. + * write 8bytes int to stream. */ - virtual void write_1bytes(int8_t value); + virtual void write_8bytes(int64_t value); + /** + * write string to stream + */ + virtual void write_string(std::string value); }; #endif \ No newline at end of file