From ab1e62a8860cc2891a39587e32494821dae87d2c Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 22 Sep 2015 14:33:17 +0800 Subject: [PATCH] create the metadata request message --- trunk/src/protocol/srs_kafka_stack.cpp | 49 ++++++++++++++++++++++++++ trunk/src/protocol/srs_kafka_stack.hpp | 43 ++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 342475e4a..e6fdc9b97 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -105,6 +105,46 @@ int SrsKafkaRequestHeader::total_size() return 4 + size; } +bool SrsKafkaRequestHeader::is_producer_request() +{ + return api_key == SrsKafkaApiKeyProduceRequest; +} + +bool SrsKafkaRequestHeader::is_fetch_request() +{ + return api_key == SrsKafkaApiKeyFetchRequest; +} + +bool SrsKafkaRequestHeader::is_offset_request() +{ + return api_key == SrsKafkaApiKeyOffsetRequest; +} + +bool SrsKafkaRequestHeader::is_metadata_request() +{ + return api_key == SrsKafkaApiKeyMetadataRequest; +} + +bool SrsKafkaRequestHeader::is_offset_commit_request() +{ + return api_key == SrsKafkaApiKeyOffsetCommitRequest; +} + +bool SrsKafkaRequestHeader::is_offset_fetch_request() +{ + return api_key == SrsKafkaApiKeyOffsetFetchRequest; +} + +bool SrsKafkaRequestHeader::is_consumer_metadata_request() +{ + return api_key == SrsKafkaApiKeyConsumerMetadataRequest; +} + +void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key) +{ + api_key = (int16_t)key; +} + SrsKafkaResponse::SrsKafkaResponse() { correlation_id = 0; @@ -145,3 +185,12 @@ SrsKafkaMessageSet::~SrsKafkaMessageSet() messages.clear(); } +SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest() +{ + header.set_api_key(SrsKafkaApiKeyMetadataRequest); +} + +SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest() +{ +} + diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 5f52dda55..e8574161f 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -31,6 +31,19 @@ #include +// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys +enum SrsKafkaApiKey +{ + SrsKafkaApiKeyProduceRequest = 0, + SrsKafkaApiKeyFetchRequest = 1, + SrsKafkaApiKeyOffsetRequest = 2, + SrsKafkaApiKeyMetadataRequest = 3, + /* Non-user facing control APIs 4-7 */ + SrsKafkaApiKeyOffsetCommitRequest = 8, + SrsKafkaApiKeyOffsetFetchRequest = 9, + SrsKafkaApiKeyConsumerMetadataRequest = 10, +}; + /** * These types consist of a signed integer giving a length N followed by N bytes of content. * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32. @@ -74,6 +87,11 @@ public: * int32 size containing the length N followed by N repetitions of the structure which can * itself be made up of other primitive types. In the BNF grammars below we will show an * array of a structure foo as [foo]. + * + * Usage: + * SrsKafkaArray body; + * body.append(new SrsKafkaBytes()); + * * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests */ template @@ -96,6 +114,12 @@ public: } elems.clear(); } +public: + virtual void append(T* elem) + { + length++; + elems.push_back(elem); + } }; /** @@ -161,6 +185,20 @@ public: * @remark total_size = 4 + header_size + message_size. */ virtual int total_size(); +public: + /** + * the api key enumeration. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys + */ + virtual bool is_producer_request(); + virtual bool is_fetch_request(); + virtual bool is_offset_request(); + virtual bool is_metadata_request(); + virtual bool is_offset_commit_request(); + virtual bool is_offset_fetch_request(); + virtual bool is_consumer_metadata_request(); + // set the api key. + virtual void set_api_key(SrsKafkaApiKey key); }; /** @@ -262,7 +300,12 @@ public: */ class SrsKafkaTopicMetadataRequest { +private: + SrsKafkaRequestHeader header; + SrsKafkaArray request; public: + SrsKafkaTopicMetadataRequest(); + virtual ~SrsKafkaTopicMetadataRequest(); }; #endif