diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index 44e2faed7..622507d51 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -221,8 +221,8 @@ int SrsKafkaBytes::decode(SrsBuffer* buf) SrsKafkaRequestHeader::SrsKafkaRequestHeader() { _size = 0; - api_key = api_version = 0; - correlation_id = 0; + _api_key = api_version = 0; + _correlation_id = 0; client_id = new SrsKafkaString(); } @@ -246,49 +246,69 @@ int SrsKafkaRequestHeader::total_size() return 4 + _size; } -bool SrsKafkaRequestHeader::is_producer_request() +void SrsKafkaRequestHeader::set_total_size(int s) { - return api_key == SrsKafkaApiKeyProduceRequest; + _size = s - 4; } -bool SrsKafkaRequestHeader::is_fetch_request() +int32_t SrsKafkaRequestHeader::correlation_id() { - return api_key == SrsKafkaApiKeyFetchRequest; + return _correlation_id; } -bool SrsKafkaRequestHeader::is_offset_request() +void SrsKafkaRequestHeader::set_correlation_id(int32_t cid) { - return api_key == SrsKafkaApiKeyOffsetRequest; + _correlation_id = cid; } -bool SrsKafkaRequestHeader::is_metadata_request() +SrsKafkaApiKey SrsKafkaRequestHeader::api_key() { - return api_key == SrsKafkaApiKeyMetadataRequest; -} - -bool SrsKafkaRequestHeader::is_offset_commit_request() -{ - return api_key == SrsKafkaApiKeyOffsetCommitRequest; -} - -bool SrsKafkaRequestHeader::is_offset_fetch_request() -{ - return api_key == SrsKafkaApiKeyOffsetFetchRequest; -} - -bool SrsKafkaRequestHeader::is_consumer_metadata_request() -{ - return api_key == SrsKafkaApiKeyConsumerMetadataRequest; + return (SrsKafkaApiKey)_api_key; } void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key) { - api_key = (int16_t)key; + _api_key = (int16_t)key; +} + +bool SrsKafkaRequestHeader::is_producer_request() +{ + return _api_key == SrsKafkaApiKeyProduceRequest; +} + +bool SrsKafkaRequestHeader::is_fetch_request() +{ + return _api_key == SrsKafkaApiKeyFetchRequest; +} + +bool SrsKafkaRequestHeader::is_offset_request() +{ + return _api_key == SrsKafkaApiKeyOffsetRequest; +} + +bool SrsKafkaRequestHeader::is_metadata_request() +{ + return _api_key == SrsKafkaApiKeyMetadataRequest; +} + +bool SrsKafkaRequestHeader::is_offset_commit_request() +{ + return _api_key == SrsKafkaApiKeyOffsetCommitRequest; +} + +bool SrsKafkaRequestHeader::is_offset_fetch_request() +{ + return _api_key == SrsKafkaApiKeyOffsetFetchRequest; +} + +bool SrsKafkaRequestHeader::is_consumer_metadata_request() +{ + return _api_key == SrsKafkaApiKeyConsumerMetadataRequest; } int SrsKafkaRequestHeader::size() { - return 4 + _size; + return 4 + header_size(); } int SrsKafkaRequestHeader::encode(SrsBuffer* buf) @@ -302,9 +322,9 @@ int SrsKafkaRequestHeader::encode(SrsBuffer* buf) } buf->write_4bytes(_size); - buf->write_2bytes(api_key); + buf->write_2bytes(_api_key); buf->write_2bytes(api_version); - buf->write_4bytes(correlation_id); + buf->write_4bytes(_correlation_id); if ((ret = client_id->encode(buf)) != ERROR_SUCCESS) { srs_error("kafka encode request client_id failed. ret=%d", ret); @@ -335,9 +355,9 @@ int SrsKafkaRequestHeader::decode(SrsBuffer* buf) srs_error("kafka decode request message failed. ret=%d", ret); return ret; } - api_key = buf->read_2bytes(); + _api_key = buf->read_2bytes(); api_version = buf->read_2bytes(); - correlation_id = buf->read_4bytes(); + _correlation_id = buf->read_4bytes(); if ((ret = client_id->decode(buf)) != ERROR_SUCCESS) { srs_error("kafka decode request client_id failed. ret=%d", ret); @@ -372,9 +392,14 @@ int SrsKafkaResponseHeader::total_size() return 4 + _size; } +void SrsKafkaResponseHeader::set_total_size(int s) +{ + _size = s - 4; +} + int SrsKafkaResponseHeader::size() { - return 4 + _size; + return 4 + header_size(); } int SrsKafkaResponseHeader::encode(SrsBuffer* buf) @@ -452,12 +477,28 @@ SrsKafkaMessageSet::~SrsKafkaMessageSet() SrsKafkaRequest::SrsKafkaRequest() { + header.set_correlation_id(SrsKafkaCorrelationPool::instance()->generate_correlation_id()); } SrsKafkaRequest::~SrsKafkaRequest() { } +void SrsKafkaRequest::update_header(int s) +{ + header.set_total_size(s); +} + +int32_t SrsKafkaRequest::correlation_id() +{ + return header.correlation_id(); +} + +SrsKafkaApiKey SrsKafkaRequest::api_key() +{ + return header.api_key(); +} + int SrsKafkaRequest::size() { return header.size(); @@ -481,6 +522,11 @@ SrsKafkaResponse::~SrsKafkaResponse() { } +void SrsKafkaResponse::update_header(int s) +{ + header.set_total_size(s); +} + int SrsKafkaResponse::size() { return header.size(); @@ -589,6 +635,50 @@ int SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf) return ret; } +SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::_instance = new SrsKafkaCorrelationPool(); + +SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::instance() +{ + return _instance; +} + +SrsKafkaCorrelationPool::SrsKafkaCorrelationPool() +{ +} + +SrsKafkaCorrelationPool::~SrsKafkaCorrelationPool() +{ + correlation_ids.clear(); +} + +int32_t SrsKafkaCorrelationPool::generate_correlation_id() +{ + static int32_t cid = 1; + return cid++; +} + +void SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request) +{ + correlation_ids[correlation_id] = request; +} + +void SrsKafkaCorrelationPool::unset(int32_t correlation_id) +{ + std::map::iterator it = correlation_ids.find(correlation_id); + if (it != correlation_ids.end()) { + correlation_ids.erase(it); + } +} + +SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id) +{ + if (correlation_ids.find(correlation_id) == correlation_ids.end()) { + return SrsKafkaApiKeyUnknown; + } + + return correlation_ids[correlation_id]; +} + SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) { skt = io; @@ -610,6 +700,13 @@ int SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg) return ret; } + // update the header of message. + msg->update_header(size); + + // cache the request correlation id to discovery response message. + SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance(); + pool->set(msg->correlation_id(), msg->api_key()); + // TODO: FIXME: refine for performance issue. char* bytes = new char[size]; SrsAutoFree(char, bytes); diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index f1ced607b..520c9bdea 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -31,6 +31,7 @@ #include #include +#include #include #include @@ -45,6 +46,8 @@ class ISrsProtocolReaderWriter; */ enum SrsKafkaApiKey { + SrsKafkaApiKeyUnknown = -1, + SrsKafkaApiKeyProduceRequest = 0, SrsKafkaApiKeyFetchRequest = 1, SrsKafkaApiKeyOffsetRequest = 2, @@ -203,7 +206,7 @@ private: * a metadata request, a produce request, a fetch request, etc). * @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest */ - int16_t api_key; + int16_t _api_key; /** * This is a numeric version number for this api. We version each API and * this version number allows the server to properly interpret the request @@ -216,7 +219,7 @@ private: * the response by the server, unmodified. It is useful for matching * request and response between the client and server. */ - int32_t correlation_id; + int32_t _correlation_id; /** * This is a user supplied identifier for the client application. * The user can use any identifier they like and it will be used @@ -252,6 +255,28 @@ private: * the total size of the request, includes the 4B size. */ virtual int total_size(); +public: + /** + * when got the whole message size, update the header. + * @param s the whole message, including the 4 bytes size size. + */ + virtual void set_total_size(int s); + /** + * get the correlation id for message. + */ + virtual int32_t correlation_id(); + /** + * set the correlation id for message. + */ + virtual void set_correlation_id(int32_t cid); + /** + * get the api key of header for message. + */ + virtual SrsKafkaApiKey api_key(); + /** + * set the api key of header for message. + */ + virtual void set_api_key(SrsKafkaApiKey key); public: /** * the api key enumeration. @@ -264,8 +289,6 @@ public: virtual bool is_offset_commit_request(); virtual bool is_offset_fetch_request(); virtual bool is_consumer_metadata_request(); - // set the api key. - virtual void set_api_key(SrsKafkaApiKey key); // interface ISrsCodec public: virtual int size(); @@ -321,6 +344,12 @@ private: * the total size of the request, includes the 4B size. */ virtual int total_size(); +public: + /** + * when got the whole message size, update the header. + * @param s the whole message, including the 4 bytes size size. + */ + virtual void set_total_size(int s); // interface ISrsCodec public: virtual int size(); @@ -403,6 +432,20 @@ protected: public: SrsKafkaRequest(); virtual ~SrsKafkaRequest(); +public: + /** + * update the size in header. + * @param s an int value specifies the size of message in header. + */ + virtual void update_header(int s); + /** + * get the correlation id of header for message. + */ + virtual int32_t correlation_id(); + /** + * get the api key of request. + */ + virtual SrsKafkaApiKey api_key(); // interface ISrsCodec public: virtual int size(); @@ -420,6 +463,12 @@ protected: public: SrsKafkaResponse(); virtual ~SrsKafkaResponse(); +public: + /** + * update the size in header. + * @param s an int value specifies the size of message in header. + */ + virtual void update_header(int s); // interface ISrsCodec public: virtual int size(); @@ -477,6 +526,32 @@ public: virtual int decode(SrsBuffer* buf); }; +/** + * the poll to discovery reponse. + * @param CorrelationId This is a user-supplied integer. It will be passed back + * in the response by the server, unmodified. It is useful for matching + * request and response between the client and server. + * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests + */ +class SrsKafkaCorrelationPool +{ +private: + static SrsKafkaCorrelationPool* _instance; +public: + static SrsKafkaCorrelationPool* instance(); +private: + std::map correlation_ids; +private: + SrsKafkaCorrelationPool(); +public: + virtual ~SrsKafkaCorrelationPool(); +public: + virtual int32_t generate_correlation_id(); + virtual void set(int32_t correlation_id, SrsKafkaApiKey request); + virtual void unset(int32_t correlation_id); + virtual SrsKafkaApiKey get(int32_t correlation_id); +}; + /** * the kafka protocol stack, use to send and recv kakfa messages. */