From 8974fe298b24e186df51aa31535c18b4a5389b8b Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 15 Oct 2015 17:45:58 +0800 Subject: [PATCH] connect to kafka and send metadata request. --- trunk/conf/full.conf | 5 ++- trunk/src/app/srs_app_config.cpp | 17 +++++++++ trunk/src/app/srs_app_config.hpp | 4 +++ trunk/src/app/srs_app_kafka.cpp | 34 +++++++++++++++--- trunk/src/app/srs_app_kafka.hpp | 4 +++ trunk/src/kernel/srs_kernel_consts.hpp | 8 +++++ trunk/src/protocol/srs_kafka_stack.cpp | 40 +++++++++++++++++++++ trunk/src/protocol/srs_kafka_stack.hpp | 50 ++++++++++++++++++++++---- 8 files changed, 151 insertions(+), 11 deletions(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 1ebc28790..dcffd69d9 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -242,8 +242,11 @@ kafka { enabled off; # the broker list, broker is # and use space to specify multple brokers. - # for exampl, 127.0.0.1:9092 127.0.0.1:9093 + # for example, 127.0.0.1:9092 127.0.0.1:9093 brokers 127.0.0.1:9092; + # the kafka topic to use. + # default: srs + topic srs; } ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index f0de5ac7a..12e6525ee 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4298,6 +4298,23 @@ SrsConfDirective* SrsConfig::get_kafka_brokers() return conf; } +string SrsConfig::get_kafka_topic() +{ + static string DEFAULT = "srs"; + + SrsConfDirective* conf = root->get("kafka"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("topic"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return conf->arg0(); +} + SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 48cbd59e3..662187b1d 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -638,6 +638,10 @@ public: * get the broker list, each is format in . */ virtual SrsConfDirective* get_kafka_brokers(); + /** + * get the kafka topic to use for srs. + */ + virtual std::string get_kafka_topic(); // vhost specified section public: /** diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 9eb821941..2bcc0f2c7 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -33,6 +33,7 @@ using namespace std; #include #include #include +#include #ifdef SRS_AUTO_KAFKA @@ -40,12 +41,16 @@ SrsKafkaProducer::SrsKafkaProducer() { lb = new SrsLbRoundRobin(); worker = new SrsAsyncCallWorker(); + transport = new SrsTcpClient(); + kafka = new SrsKafkaClient(transport); } SrsKafkaProducer::~SrsKafkaProducer() { srs_freep(lb); srs_freep(worker); + srs_freep(kafka); + srs_freep(transport); } int SrsKafkaProducer::initialize() @@ -86,25 +91,46 @@ int SrsKafkaProducer::request_metadata() { int ret = ERROR_SUCCESS; + // ignore when disabled. bool enabled = _srs_config->get_kafka_enabled(); if (!enabled) { return ret; } + // select one broker to connect to. SrsConfDirective* brokers = _srs_config->get_kafka_brokers(); if (!brokers) { srs_warn("ignore for empty brokers."); return ret; } - srs_assert(!brokers->args.empty()); - std::string broker = lb->select(brokers->args); + std::string server; + int port = SRS_CONSTS_KAFKA_DEFAULT_PORT; + if (true) { + srs_assert(!brokers->args.empty()); + std::string broker = lb->select(brokers->args); + srs_parse_endpoint(broker, server, port); + } + // connect to kafka server. + if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) { + srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret); + return ret; + } + + // do fetch medata from broker. + std::string topic = _srs_config->get_kafka_topic(); + if ((ret = kafka->fetch_metadata(topic)) != ERROR_SUCCESS) { + srs_error("kafka fetch metadata failed. ret=%d", ret); + return ret; + } + + // log when completed. if (true) { std::string senabled = srs_bool2switch(enabled); std::string sbrokers = srs_join_vector_string(brokers->args, ","); - srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s", - senabled.c_str(), sbrokers.c_str(), lb->current(), broker.c_str()); + srs_trace("kafka ok, enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s", + senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str()); } return ret; diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 02dc39724..81251317f 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -31,6 +31,8 @@ class SrsLbRoundRobin; class SrsAsyncCallWorker; +class SrsTcpClient; +class SrsKafkaClient; #ifdef SRS_AUTO_KAFKA @@ -42,6 +44,8 @@ class SrsKafkaProducer private: SrsLbRoundRobin* lb; SrsAsyncCallWorker* worker; + SrsTcpClient* transport; + SrsKafkaClient* kafka; public: SrsKafkaProducer(); virtual ~SrsKafkaProducer(); diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp index d209c7762..34b4a0fa4 100644 --- a/trunk/src/kernel/srs_kernel_consts.hpp +++ b/trunk/src/kernel/srs_kernel_consts.hpp @@ -398,5 +398,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONSTS_RTSP_RTSPVersionNotSupported_str "RTSP Version Not Supported" #define SRS_CONSTS_RTSP_OptionNotSupported_str "Option not support" +/////////////////////////////////////////////////////////// +// KAFKA consts values +/////////////////////////////////////////////////////////// +#define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092 + +// the common io timeout, for both recv and send. +#define SRS_CONSTS_KAFKA_TIMEOUT_US (int64_t)(30*1000*1000LL) + #endif diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp index f23d0d00a..deada64a1 100644 --- a/trunk/src/protocol/srs_kafka_stack.cpp +++ b/trunk/src/protocol/srs_kafka_stack.cpp @@ -23,8 +23,11 @@ #include +#include using namespace std; +#include + #ifdef SRS_AUTO_KAFKA SrsKafkaString::SrsKafkaString() @@ -196,5 +199,42 @@ SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest() { } +SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReaderWriter* io) +{ + skt = io; +} + +SrsKafkaProtocol::~SrsKafkaProtocol() +{ +} + +int SrsKafkaProtocol::send_and_free_message(SrsKafkaMessage* msg) +{ + int ret = ERROR_SUCCESS; + + // TODO: FIXME: implements it. + + return ret; +} + +SrsKafkaClient::SrsKafkaClient(ISrsProtocolReaderWriter* io) +{ + protocol = new SrsKafkaProtocol(io); +} + +SrsKafkaClient::~SrsKafkaClient() +{ + srs_freep(protocol); +} + +int SrsKafkaClient::fetch_metadata(string topic) +{ + int ret = ERROR_SUCCESS; + + // TODO: FIXME: implements it. + + return ret; +} + #endif diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp index 4be61c3eb..dc6a9c61d 100644 --- a/trunk/src/protocol/srs_kafka_stack.hpp +++ b/trunk/src/protocol/srs_kafka_stack.hpp @@ -30,6 +30,9 @@ #include #include +#include + +class ISrsProtocolReaderWriter; #ifdef SRS_AUTO_KAFKA @@ -94,7 +97,7 @@ public: * array of a structure foo as [foo]. * * Usage: - * SrsKafkaArray body; + * SrsKafkaArray body; * body.append(new SrsKafkaBytes()); * * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests @@ -104,8 +107,8 @@ class SrsKafkaArray { private: int length; - std::vector elems; - typedef typename std::vector::iterator SrsIterator; + std::vector elems; + typedef typename std::vector::iterator SrsIterator; public: SrsKafkaArray() { @@ -114,13 +117,13 @@ public: virtual ~SrsKafkaArray() { for (SrsIterator it = elems.begin(); it != elems.end(); ++it) { - T* elem = *it; + T elem = *it; srs_freep(elem); } elems.clear(); } public: - virtual void append(T* elem) + virtual void append(T elem) { length++; elems.push_back(elem); @@ -307,12 +310,47 @@ class SrsKafkaTopicMetadataRequest { private: SrsKafkaRequestHeader header; - SrsKafkaArray request; + SrsKafkaArray request; public: SrsKafkaTopicMetadataRequest(); virtual ~SrsKafkaTopicMetadataRequest(); }; +/** + * the kafka protocol stack, use to send and recv kakfa messages. + */ +class SrsKafkaProtocol +{ +private: + ISrsProtocolReaderWriter* skt; +public: + SrsKafkaProtocol(ISrsProtocolReaderWriter* io); + virtual ~SrsKafkaProtocol(); +public: + /** + * write the message to kafka server. + * @param msg the msg to send. user must not free it again. + */ + virtual int send_and_free_message(SrsKafkaMessage* msg); +}; + +/** + * the kafka client, for producer or consumer. + */ +class SrsKafkaClient +{ +private: + SrsKafkaProtocol* protocol; +public: + SrsKafkaClient(ISrsProtocolReaderWriter* io); + virtual ~SrsKafkaClient(); +public: + /** + * fetch the metadata from broker for topic. + */ + virtual int fetch_metadata(std::string topic); +}; + #endif #endif