Remove KAFKA. 3.0.53

pull/1568/head
winlin 5 years ago
parent bb3e8a41d1
commit 5d17bb8bb0

@@ -164,6 +164,7 @@ Please select according to languages:
### V3 changes
* v3.0, 2019-10-03, Remove KAFKA. 3.0.53
* v3.0, 2019-05-14, Cover Kernel File reader/writer. 3.0.52
* v3.0, 2019-04-30, Refine typo in files. 3.0.51
* v3.0, 2019-04-25, Upgrade http-parser from 2.1 to 2.9.2 and cover it. 3.0.50
@@ -710,7 +711,6 @@ Comparing with other media servers, SRS is much better and stronger, for details
| Reload | Stable | X | X | X | X |
| Forward | Stable | X | X | X | X |
| ATC | Stable | X | X | X | X |
| KAFKA | Experiment| X | X | X | X |
#### Stream Service

@@ -18,7 +18,6 @@ help=no
SRS_HDS=NO
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=NO
SRS_LIBRTMP=NO
SRS_RESEARCH=YES
SRS_UTEST=YES
@@ -117,7 +116,6 @@ Options:
--with-hds enable hds streaming, mux RTMP to F4M/F4V files.
--with-nginx enable delivery HTTP stream with nginx.
--with-stream-caster enable stream caster to serve other stream over other protocol.
--with-kafka enable srs kafka producer to report to kafka.
--with-ffmpeg enable transcoding tool ffmpeg.
--with-transcode enable transcoding features.
--with-ingest enable ingest features.
@@ -138,7 +136,6 @@ Options:
--without-hds disable hds, the adobe http dynamic streaming.
--without-nginx disable delivery HTTP stream with nginx.
--without-stream-caster disable stream caster, only listen and serve RTMP/HTTP.
--without-kafka disable the srs kafka producer.
--without-ffmpeg disable the ffmpeg transcode tool feature.
--without-transcode disable the transcoding feature.
--without-ingest disable the ingest feature.
@@ -230,7 +227,6 @@ function parse_user_option() {
--with-ingest) SRS_INGEST=YES ;;
--with-stat) SRS_STAT=YES ;;
--with-stream-caster) SRS_STREAM_CASTER=YES ;;
--with-kafka) SRS_KAFKA=YES ;;
--with-librtmp) SRS_LIBRTMP=YES ;;
--with-research) SRS_RESEARCH=YES ;;
--with-utest) SRS_UTEST=YES ;;
@@ -251,7 +247,6 @@ function parse_user_option() {
--without-ingest) SRS_INGEST=NO ;;
--without-stat) SRS_STAT=NO ;;
--without-stream-caster) SRS_STREAM_CASTER=NO ;;
--without-kafka) SRS_KAFKA=NO ;;
--without-librtmp) SRS_LIBRTMP=NO ;;
--without-research) SRS_RESEARCH=NO ;;
--without-utest) SRS_UTEST=NO ;;
@@ -387,7 +382,6 @@ function apply_user_presets() {
SRS_HDS=NO
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=NO
SRS_LIBRTMP=NO
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -399,7 +393,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=YES
SRS_FFMPEG_TOOL=YES
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=YES
SRS_UTEST=YES
@@ -411,7 +404,6 @@ function apply_user_presets() {
SRS_HDS=NO
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=NO
SRS_LIBRTMP=NO
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -423,7 +415,6 @@ function apply_user_presets() {
SRS_HDS=NO
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=NO
SRS_LIBRTMP=NO
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -435,7 +426,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -448,7 +438,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -460,7 +449,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=NO
SRS_UTEST=YES
@@ -472,7 +460,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=NO
SRS_UTEST=YES
@@ -487,7 +474,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=YES
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=YES
SRS_UTEST=YES
@@ -499,7 +485,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=YES
SRS_LIBRTMP=NO
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -511,7 +496,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=YES
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=NO
SRS_UTEST=YES
@@ -523,7 +507,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -535,7 +518,6 @@ function apply_user_presets() {
SRS_HDS=YES
SRS_NGINX=NO
SRS_FFMPEG_TOOL=YES
SRS_KAFKA=YES
SRS_LIBRTMP=YES
SRS_RESEARCH=NO
SRS_UTEST=NO
@@ -595,7 +577,6 @@ function apply_user_detail_options() {
SRS_INGEST=NO
SRS_STAT=NO
SRS_STREAM_CASTER=NO
SRS_KAFKA=NO
SRS_LIBRTMP=YES
SRS_RESEARCH=YES
SRS_UTEST=NO
@@ -627,7 +608,6 @@ SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}"
if [ $SRS_HTTP_CALLBACK = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-callback"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-callback"; fi
if [ $SRS_HTTP_SERVER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-server"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-server"; fi
if [ $SRS_STREAM_CASTER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-stream-caster"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-stream-caster"; fi
if [ $SRS_KAFKA = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-kafka"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-kafka"; fi
if [ $SRS_HTTP_API = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-api"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-api"; fi
if [ $SRS_LIBRTMP = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-librtmp"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-librtmp"; fi
if [ $SRS_RESEARCH = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-research"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-research"; fi
@@ -713,7 +693,6 @@ function check_option_conflicts() {
if [ $SRS_SSL = RESERVED ]; then echo "you must specifies the ssl, see: ./configure --help"; __check_ok=NO; fi
if [ $SRS_FFMPEG_TOOL = RESERVED ]; then echo "you must specifies the ffmpeg, see: ./configure --help"; __check_ok=NO; fi
if [ $SRS_STREAM_CASTER = RESERVED ]; then echo "you must specifies the stream-caster, see: ./configure --help"; __check_ok=NO; fi
if [ $SRS_KAFKA = RESERVED ]; then echo "you must specifies the kafka, see: ./configure --help"; __check_ok=NO; fi
if [ $SRS_LIBRTMP = RESERVED ]; then echo "you must specifies the librtmp, see: ./configure --help"; __check_ok=NO; fi
if [ $SRS_RESEARCH = RESERVED ]; then echo "you must specifies the research, see: ./configure --help"; __check_ok=NO; fi
if [ $SRS_UTEST = RESERVED ]; then echo "you must specifies the utest, see: ./configure --help"; __check_ok=NO; fi

trunk/configure (vendored): 9 lines changed

@@ -195,7 +195,7 @@ ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot})
MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_rtmp_stack"
"srs_rtmp_handshake" "srs_protocol_utility" "srs_rtmp_msg_array" "srs_protocol_stream"
"srs_raw_avc" "srs_rtsp_stack" "srs_http_stack" "srs_protocol_kbps" "srs_protocol_json"
"srs_kafka_stack" "srs_protocol_format")
"srs_protocol_format")
PROTOCOL_INCS="src/protocol"; MODULE_DIR=${PROTOCOL_INCS} . auto/modules.sh
PROTOCOL_OBJS="${MODULE_OBJS[@]}"
#
@@ -225,7 +225,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
"srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static"
"srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds"
"srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call"
"srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_kafka"
"srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec"
"srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
"srs_app_coworkers")
DEFINES=""
@@ -577,11 +577,6 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
else
echo -e "${GREEN}Note: StreamCaster is disabled.${BLACK}"
fi
if [ $SRS_KAFKA = YES ]; then
echo -e "${GREEN}Kafka is enabled.${BLACK}"
else
echo -e "${YELLOW}Warning: Kafka is disabled.${BLACK}"
fi
if [ $SRS_HDS = YES ]; then
echo -e "${YELLOW}Experiment: HDS is enabled.${BLACK}"
else

@@ -1555,7 +1555,6 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf)
}
// TODO: FIXME: support reload stream_caster.
// TODO: FIXME: support reload kafka.
// merge config: vhost
if ((err = reload_vhost(old_root)) != srs_success) {
@@ -2139,19 +2138,6 @@ srs_error_t SrsConfig::global_to_json(SrsJsonObject* obj)
}
}
obj->set(dir->name, sobj);
} else if (dir->name == "kafka") {
SrsJsonObject* sobj = SrsJsonAny::object();
for (int j = 0; j < (int)dir->directives.size(); j++) {
SrsConfDirective* sdir = dir->directives.at(j);
if (sdir->name == "enabled") {
sobj->set(sdir->name, sdir->dumps_arg0_to_boolean());
} else if (sdir->name == "brokers") {
sobj->set(sdir->name, sdir->dumps_args());
} else if (sdir->name == "topic") {
sobj->set(sdir->name, sdir->dumps_arg0_to_str());
}
}
obj->set(dir->name, sobj);
} else if (dir->name == "stream_caster") {
SrsJsonObject* sobj = SrsJsonAny::object();
for (int j = 0; j < (int)dir->directives.size(); j++) {
@@ -3511,7 +3497,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
&& n != "max_connections" && n != "daemon" && n != "heartbeat"
&& n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
&& n != "http_server" && n != "stream_caster" && n != "kafka"
&& n != "http_server" && n != "stream_caster"
&& n != "utc_time" && n != "work_dir" && n != "asprocess"
) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
@@ -3545,15 +3531,6 @@ srs_error_t SrsConfig::check_normal_config()
}
}
}
if (true) {
SrsConfDirective* conf = root->get("kafka");
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
string n = conf->at(i)->name;
if (n != "enabled" && n != "brokers" && n != "topic") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal kafka.%s", n.c_str());
}
}
}
if (true) {
SrsConfDirective* conf = get_heartbeart();
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
@@ -4208,55 +4185,6 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf)
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_kafka_enabled()
{
static bool DEFAULT = false;
SrsConfDirective* conf = root->get("kafka");
if (!conf) {
return DEFAULT;
}
conf = conf->get("enabled");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
SrsConfDirective* SrsConfig::get_kafka_brokers()
{
SrsConfDirective* conf = root->get("kafka");
if (!conf) {
return NULL;
}
conf = conf->get("brokers");
if (!conf || conf->args.empty()) {
return NULL;
}
return conf;
}
string SrsConfig::get_kafka_topic()
{
static string DEFAULT = "srs";
SrsConfDirective* conf = root->get("kafka");
if (!conf) {
return DEFAULT;
}
conf = conf->get("topic");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}
return conf->arg0();
}
SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
{
srs_assert(root);

@@ -455,14 +455,6 @@ public:
virtual int get_stream_caster_rtp_port_min(SrsConfDirective* conf);
// Get the max udp port for rtp of stream caster rtsp.
virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf);
// kafka section.
public:
// Whether the kafka enabled.
virtual bool get_kafka_enabled();
// Get the broker list, each is format in <ip:port>.
virtual SrsConfDirective* get_kafka_brokers();
// Get the kafka topic to use for srs.
virtual std::string get_kafka_topic();
// vhost specified section
public:
// Get the vhost directive by vhost name.

@@ -1,659 +0,0 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2019 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_app_kafka.hpp>
#include <vector>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_config.hpp>
#include <srs_app_async_call.hpp>
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_kernel_balance.hpp>
#include <srs_kafka_stack.hpp>
#include <srs_core_autofree.hpp>
#include <srs_protocol_json.hpp>
#ifdef SRS_AUTO_KAFKA
#define SRS_KAFKA_PRODUCER_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1
std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
{
vector<string> bs;
for (int i = 0; i < metadata->brokers.size(); i++) {
SrsKafkaBroker* broker = metadata->brokers.at(i);
string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str();
if (broker->port > 0) {
hostport += ":" + srs_int2str(broker->port);
}
bs.push_back(hostport);
}
vector<string> ps;
for (int i = 0; i < metadata->metadatas.size(); i++) {
SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
for (int j = 0; j < topic->metadatas.size(); j++) {
string desc = "topic=" + topic->name.to_str();
SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
desc += "?partition=" + srs_int2str(partition->partition_id);
desc += "&leader=" + srs_int2str(partition->leader);
vector<string> replicas = srs_kafka_array2vector(&partition->replicas);
desc += "&replicas=" + srs_join_vector_string(replicas, ",");
ps.push_back(desc);
}
}
std::stringstream ss;
ss << "brokers=" << srs_join_vector_string(bs, ",");
ss << ", " << srs_join_vector_string(ps, ", ");
return ss.str();
}
std::string srs_kafka_summary_partitions(const vector<SrsKafkaPartition*>& partitions)
{
vector<string> ret;
vector<SrsKafkaPartition*>::const_iterator it;
for (it = partitions.begin(); it != partitions.end(); ++it) {
SrsKafkaPartition* partition = *it;
string desc = "tcp://";
desc += partition->host + ":" + srs_int2str(partition->port);
desc += "?broker=" + srs_int2str(partition->broker);
desc += "&partition=" + srs_int2str(partition->id);
ret.push_back(desc);
}
return srs_join_vector_string(ret, ", ");
}
void srs_kafka_metadata2connector(string topic_name, SrsKafkaTopicMetadataResponse* metadata, vector<SrsKafkaPartition*>& partitions)
{
for (int i = 0; i < metadata->metadatas.size(); i++) {
SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
for (int j = 0; j < topic->metadatas.size(); j++) {
SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
SrsKafkaPartition* p = new SrsKafkaPartition();
p->topic = topic_name;
p->id = partition->partition_id;
p->broker = partition->leader;
for (int i = 0; i < metadata->brokers.size(); i++) {
SrsKafkaBroker* broker = metadata->brokers.at(i);
if (broker->node_id == p->broker) {
p->host = broker->host.to_str();
p->port = broker->port;
break;
}
}
partitions.push_back(p);
}
}
}
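Editor's note: the mapping above resolves each partition's leader id to a broker host:port. A standalone sketch of that loop with simplified stand-in types (not the SRS structs):
#include <string>
#include <vector>
// Simplified stand-ins for the metadata types, for illustration only.
struct Broker { int node_id; std::string host; int port; };
struct PartitionMeta { int partition_id; int leader; };
struct Partition { int id; int broker; std::string host; int port; };
// For each partition, find the broker whose node_id equals the partition
// leader and copy its endpoint, mirroring srs_kafka_metadata2connector.
std::vector<Partition> to_connectors(const std::vector<Broker>& brokers, const std::vector<PartitionMeta>& metas)
{
    std::vector<Partition> out;
    for (int i = 0; i < (int)metas.size(); i++) {
        Partition p;
        p.id = metas[i].partition_id;
        p.broker = metas[i].leader; // the leader is a broker id.
        p.host = ""; p.port = 9092; // the default kafka port.
        for (int j = 0; j < (int)brokers.size(); j++) {
            if (brokers[j].node_id == p.broker) {
                p.host = brokers[j].host;
                p.port = brokers[j].port;
                break;
            }
        }
        out.push_back(p);
    }
    return out;
}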
SrsKafkaPartition::SrsKafkaPartition()
{
id = broker = 0;
port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
transport = NULL;
kafka = NULL;
}
SrsKafkaPartition::~SrsKafkaPartition()
{
disconnect();
}
string SrsKafkaPartition::hostport()
{
if (ep.empty()) {
ep = host + ":" + srs_int2str(port);
}
return ep;
}
srs_error_t SrsKafkaPartition::connect()
{
srs_error_t err = srs_success;
if (transport) {
return err;
}
transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT);
kafka = new SrsKafkaClient(transport);
if ((err = transport->connect()) != srs_success) {
disconnect();
return srs_error_wrap(err, "connect to %s partition=%d failed", hostport().c_str(), id);
}
srs_trace("connect at %s, partition=%d, broker=%d", hostport().c_str(), id, broker);
return err;
}
srs_error_t SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
{
return kafka->write_messages(topic, id, *pc);
}
void SrsKafkaPartition::disconnect()
{
srs_freep(kafka);
srs_freep(transport);
}
SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j)
{
producer = p;
key = k;
obj = j;
}
SrsKafkaMessage::~SrsKafkaMessage()
{
srs_freep(obj);
}
srs_error_t SrsKafkaMessage::call()
{
srs_error_t err = producer->send(key, obj);
// the obj is managed by the producer now.
obj = NULL;
return srs_error_wrap(err, "kafka send");
}
string SrsKafkaMessage::to_string()
{
return "kafka";
}
SrsKafkaCache::SrsKafkaCache()
{
count = 0;
nb_partitions = 0;
}
SrsKafkaCache::~SrsKafkaCache()
{
map<int32_t, SrsKafkaPartitionCache*>::iterator it;
for (it = cache.begin(); it != cache.end(); ++it) {
SrsKafkaPartitionCache* pc = it->second;
for (vector<SrsJsonObject*>::iterator it2 = pc->begin(); it2 != pc->end(); ++it2) {
SrsJsonObject* obj = *it2;
srs_freep(obj);
}
pc->clear();
srs_freep(pc);
}
cache.clear();
}
void SrsKafkaCache::append(int key, SrsJsonObject* obj)
{
count++;
int partition = 0;
if (nb_partitions > 0) {
partition = key % nb_partitions;
}
SrsKafkaPartitionCache* pc = NULL;
map<int32_t, SrsKafkaPartitionCache*>::iterator it = cache.find(partition);
if (it == cache.end()) {
pc = new SrsKafkaPartitionCache();
cache[partition] = pc;
} else {
pc = it->second;
}
pc->push_back(obj);
}
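Editor's note: the append path above is the whole partitioning policy, mapping a key (for example a client id) onto one of nb_partitions per-partition queues, created lazily. A minimal standalone sketch of that mapping:
#include <cstddef>
#include <map>
#include <vector>
typedef std::vector<int> MsgQueue; // payloads elided; ints stand in for json objects.
// Map key onto a partition queue by key % nb_partitions, creating the
// queue lazily, as SrsKafkaCache::append does above.
void cache_append(std::map<int, MsgQueue*>& cache, int nb_partitions, int key, int msg)
{
    int partition = (nb_partitions > 0) ? key % nb_partitions : 0;
    MsgQueue* pc = NULL;
    std::map<int, MsgQueue*>::iterator it = cache.find(partition);
    if (it == cache.end()) {
        pc = cache[partition] = new MsgQueue();
    } else {
        pc = it->second;
    }
    pc->push_back(msg);
}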
int SrsKafkaCache::size()
{
return count;
}
bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc)
{
map<int32_t, SrsKafkaPartitionCache*>::iterator it;
for (it = cache.begin(); it != cache.end(); ++it) {
int32_t key = it->first;
SrsKafkaPartitionCache* pc = it->second;
if (!pc->empty()) {
*pkey = (int)key;
*ppc = pc;
return true;
}
}
return false;
}
srs_error_t SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
{
srs_error_t err = srs_success;
// ensure the key exists.
srs_assert (cache.find(key) != cache.end());
// the cache is a vector, which is contiguous storage.
// we remember how many messages we have written and erase them when completed.
int nb_msgs = (int)pc->size();
if (pc->empty()) {
return err;
}
// connect transport.
if ((err = partition->connect()) != srs_success) {
return srs_error_wrap(err, "connect partition");
}
// write the json objects.
if ((err = partition->flush(pc)) != srs_success) {
return srs_error_wrap(err, "flush partition");
}
// free all written messages.
for (vector<SrsJsonObject*>::iterator it = pc->begin(); it != pc->end(); ++it) {
SrsJsonObject* obj = *it;
srs_freep(obj);
}
// remove the messages from cache.
if ((int)pc->size() == nb_msgs) {
pc->clear();
} else {
pc->erase(pc->begin(), pc->begin() + nb_msgs);
}
return err;
}
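Editor's note: the snapshot-then-erase pattern above matters because append() can run (from another coroutine) while the blocking network write is in flight; only the snapshotted messages are erased afterwards. A reduced sketch of just that invariant:
#include <vector>
// pc stands in for a partition cache; the network write itself is elided.
void flush_sketch(std::vector<int>& pc)
{
    int nb_msgs = (int)pc.size(); // snapshot before the blocking write.
    if (nb_msgs == 0) {
        return;
    }
    // ... write pc[0..nb_msgs) to the broker; appends may race in here ...
    if ((int)pc.size() == nb_msgs) {
        pc.clear(); // nothing raced in: cheap path.
    } else {
        pc.erase(pc.begin(), pc.begin() + nb_msgs); // keep late arrivals queued.
    }
}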
ISrsKafkaCluster::ISrsKafkaCluster()
{
}
ISrsKafkaCluster::~ISrsKafkaCluster()
{
}
// @global kafka event producer, user must use srs_initialize_kafka to initialize it.
ISrsKafkaCluster* _srs_kafka = NULL;
srs_error_t srs_initialize_kafka()
{
srs_error_t err = srs_success;
SrsKafkaProducer* kafka = new SrsKafkaProducer();
_srs_kafka = kafka;
if ((err = kafka->initialize()) != srs_success) {
return srs_error_wrap(err, "initialize kafka producer");
}
if ((err = kafka->start()) != srs_success) {
return srs_error_wrap(err, "start kafka producer");
}
return err;
}
void srs_dispose_kafka()
{
SrsKafkaProducer* kafka = dynamic_cast<SrsKafkaProducer*>(_srs_kafka);
if (!kafka) {
return;
}
kafka->stop();
srs_freep(kafka);
_srs_kafka = NULL;
}
SrsKafkaProducer::SrsKafkaProducer()
{
metadata_ok = false;
metadata_expired = srs_cond_new();
lock = srs_mutex_new();
trd = new SrsDummyCoroutine();
worker = new SrsAsyncCallWorker();
cache = new SrsKafkaCache();
lb = new SrsLbRoundRobin();
}
SrsKafkaProducer::~SrsKafkaProducer()
{
clear_metadata();
srs_freep(lb);
srs_freep(worker);
srs_freep(trd);
srs_freep(cache);
srs_mutex_destroy(lock);
srs_cond_destroy(metadata_expired);
}
srs_error_t SrsKafkaProducer::initialize()
{
enabled = _srs_config->get_kafka_enabled();
srs_info("initialize kafka ok, enabled=%d.", enabled);
return srs_success;
}
srs_error_t SrsKafkaProducer::start()
{
srs_error_t err = srs_success;
if (!enabled) {
return err;
}
if ((err = worker->start()) != srs_success) {
return srs_error_wrap(err, "async worker");
}
srs_freep(trd);
trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id());
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
refresh_metadata();
return err;
}
void SrsKafkaProducer::stop()
{
if (!enabled) {
return;
}
trd->stop();
worker->stop();
}
srs_error_t SrsKafkaProducer::send(int key, SrsJsonObject* obj)
{
srs_error_t err = srs_success;
// cache the json object.
cache->append(key, obj);
// too few messages, ignore.
if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) {
return err;
}
// too many messages, warn user.
if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {
srs_warn("kafka cache too many messages: %d", cache->size());
}
// sync with the background metadata worker.
SrsLocker(lock);
// flush message when metadata is ok.
if (metadata_ok) {
err = flush();
}
return err;
}
srs_error_t SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
{
srs_error_t err = srs_success;
if (!enabled) {
return err;
}
SrsJsonObject* obj = SrsJsonAny::object();
obj->set("msg", SrsJsonAny::str("accept"));
obj->set("type", SrsJsonAny::integer(type));
obj->set("ip", SrsJsonAny::str(ip.c_str()));
return worker->execute(new SrsKafkaMessage(this, key, obj));
}
srs_error_t SrsKafkaProducer::on_close(int key)
{
srs_error_t err = srs_success;
if (!enabled) {
return err;
}
SrsJsonObject* obj = SrsJsonAny::object();
obj->set("msg", SrsJsonAny::str("close"));
return worker->execute(new SrsKafkaMessage(this, key, obj));
}
#define SRS_KAKFA_CIMS (3 * SRS_UTIME_SECONDS)
srs_error_t SrsKafkaProducer::cycle()
{
srs_error_t err = srs_success;
// wait for the metadata to expire.
// when metadata is ok, wait until it expires.
if (metadata_ok) {
srs_cond_wait(metadata_expired);
}
// request the lock to acquire the socket.
SrsLocker(lock);
while (true) {
if ((err = do_cycle()) != srs_success) {
srs_warn("KafkaProducer: Ignore error, %s", srs_error_desc(err).c_str());
srs_freep(err);
}
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "kafka cycle");
}
srs_usleep(SRS_KAKFA_CIMS);
}
return err;
}
void SrsKafkaProducer::clear_metadata()
{
vector<SrsKafkaPartition*>::iterator it;
for (it = partitions.begin(); it != partitions.end(); ++it) {
SrsKafkaPartition* partition = *it;
srs_freep(partition);
}
partitions.clear();
}
srs_error_t SrsKafkaProducer::do_cycle()
{
srs_error_t err = srs_success;
// ignore when disabled.
if (!enabled) {
return err;
}
// when kafka is enabled, request metadata at startup.
if ((err = request_metadata()) != srs_success) {
return srs_error_wrap(err, "request metadata");
}
return err;
}
srs_error_t SrsKafkaProducer::request_metadata()
{
srs_error_t err = srs_success;
// ignore when disabled.
if (!enabled) {
return err;
}
// select one broker to connect to.
SrsConfDirective* brokers = _srs_config->get_kafka_brokers();
if (!brokers) {
srs_warn("ignore for empty brokers.");
return err;
}
std::string server;
int port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
if (true) {
srs_assert(!brokers->args.empty());
std::string broker = lb->select(brokers->args);
srs_parse_endpoint(broker, server, port);
}
std::string topic = _srs_config->get_kafka_topic();
if (true) {
std::string senabled = srs_bool2switch(enabled);
std::string sbrokers = srs_join_vector_string(brokers->args, ",");
srs_trace("kafka request enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s",
senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
}
SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT);
SrsAutoFree(SrsTcpClient, transport);
SrsKafkaClient* kafka = new SrsKafkaClient(transport);
SrsAutoFree(SrsKafkaClient, kafka);
// reconnect to kafka server.
if ((err = transport->connect()) != srs_success) {
return srs_error_wrap(err, "connect %s:%d failed", server.c_str(), port);
}
// fetch metadata from the broker.
SrsKafkaTopicMetadataResponse* metadata = NULL;
if ((err = kafka->fetch_metadata(topic, &metadata)) != srs_success) {
return srs_error_wrap(err, "fetch metadata");
}
SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
// we may need to request multiple times.
// for example, the first request may auto-create a missing topic; a later request then fetches its metadata.
if (!metadata->metadatas.empty()) {
SrsKafkaTopicMetadata* topic = metadata->metadatas.at(0);
if (topic->metadatas.empty()) {
srs_warn("topic %s metadata empty, retry.", topic->name.to_str().c_str());
return err;
}
}
// show kafka metadata.
string summary = srs_kafka_metadata_summary(metadata);
srs_trace("kafka metadata: %s", summary.c_str());
// generate the partition info.
srs_kafka_metadata2connector(topic, metadata, partitions);
srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str());
// update the total partition for cache.
cache->nb_partitions = (int)partitions.size();
metadata_ok = true;
return err;
}
void SrsKafkaProducer::refresh_metadata()
{
clear_metadata();
metadata_ok = false;
srs_cond_signal(metadata_expired);
srs_trace("kafka async refresh metadata in background");
}
srs_error_t SrsKafkaProducer::flush()
{
srs_error_t err = srs_success;
// flush all available partition caches.
while (true) {
int key = -1;
SrsKafkaPartitionCache* pc = NULL;
// all flushed, or no kafka partition to write to.
if (!cache->fetch(&key, &pc) || partitions.empty()) {
break;
}
// flush specified partition.
srs_assert(key >= 0 && pc);
SrsKafkaPartition* partition = partitions.at(key % partitions.size());
if ((err = cache->flush(partition, key, pc)) != srs_success) {
return srs_error_wrap(err, "flush partition");
}
}
return err;
}
#endif
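Editor's note: taken together with the server and connection hunks below, the removed wiring had a small surface. A sketch of the call order, which only builds against a pre-3.0.53 tree with SRS_AUTO_KAFKA defined; the key 100 and ip are placeholder values:
#include <srs_app_kafka.hpp> // removed by this commit.
#include <srs_kernel_error.hpp>
#ifdef SRS_AUTO_KAFKA
srs_error_t kafka_lifecycle_sketch()
{
    srs_error_t err = srs_success;
    // Server startup (SrsServer::initialize_st): create and start the producer.
    if ((err = srs_initialize_kafka()) != srs_success) {
        return srs_error_wrap(err, "initialize kafka");
    }
    // Per connection (SrsRtmpConn): report accept and close events.
    if ((err = _srs_kafka->on_client(100, SrsListenerRtmpStream, "127.0.0.1")) != srs_success) {
        return srs_error_wrap(err, "kafka on client");
    }
    if ((err = _srs_kafka->on_close(100)) != srs_success) {
        return srs_error_wrap(err, "kafka on close");
    }
    // Server dispose (SrsServer::dispose): stop the coroutine and free the producer.
    srs_dispose_kafka();
    return err;
}
#endif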

@@ -1,190 +0,0 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2019 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_APP_KAFKA_HPP
#define SRS_APP_KAFKA_HPP
#include <srs_core.hpp>
#include <map>
#include <vector>
class SrsLbRoundRobin;
class SrsAsyncCallWorker;
class SrsTcpClient;
class SrsKafkaClient;
class SrsJsonObject;
class SrsKafkaProducer;
#include <srs_app_thread.hpp>
#include <srs_app_server.hpp>
#include <srs_app_async_call.hpp>
#ifdef SRS_AUTO_KAFKA
// The partition messages cache.
typedef std::vector<SrsJsonObject*> SrsKafkaPartitionCache;
// The kafka partition info.
struct SrsKafkaPartition
{
private:
std::string ep;
// Not NULL when connected.
SrsTcpClient* transport;
SrsKafkaClient* kafka;
public:
int id;
std::string topic;
// leader.
int broker;
std::string host;
int port;
public:
SrsKafkaPartition();
virtual ~SrsKafkaPartition();
public:
virtual std::string hostport();
virtual srs_error_t connect();
virtual srs_error_t flush(SrsKafkaPartitionCache* pc);
private:
virtual void disconnect();
};
// The following is all types of kafka messages.
class SrsKafkaMessage : public ISrsAsyncCallTask
{
private:
SrsKafkaProducer* producer;
int key;
SrsJsonObject* obj;
public:
SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j);
virtual ~SrsKafkaMessage();
// Interface ISrsAsyncCallTask
public:
virtual srs_error_t call();
virtual std::string to_string();
};
// A message cache for kafka.
class SrsKafkaCache
{
public:
// The total partitions,
// for the key to map to the partition by key%nb_partitions.
int nb_partitions;
private:
// Total messages for all partitions.
int count;
// The key is the partition id, value is the message set to write to this partition.
// @remark, when metadata is refreshed, the partition count may increase,
// so some messages may be dispatched to new partitions.
std::map< int32_t, SrsKafkaPartitionCache*> cache;
public:
SrsKafkaCache();
virtual ~SrsKafkaCache();
public:
virtual void append(int key, SrsJsonObject* obj);
virtual int size();
// Fetch an available partition cache.
// @return true when got a key and pc; otherwise, false.
virtual bool fetch(int* pkey, SrsKafkaPartitionCache** ppc);
// Flush the specified partition cache.
virtual srs_error_t flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc);
};
// The kafka cluster interface.
class ISrsKafkaCluster
{
public:
ISrsKafkaCluster();
virtual ~ISrsKafkaCluster();
public:
// When got any client connect to SRS, notify kafka.
// @param key the partition map key, the client id or hash(ip).
// @param type the type of client.
// @param ip the peer ip of client.
virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip) = 0;
// When client close or disconnect for error.
// @param key the partition map key, the client id or hash(ip).
virtual srs_error_t on_close(int key) = 0;
};
// @global kafka event producer.
extern ISrsKafkaCluster* _srs_kafka;
// kafka initialize and disposer for global object.
extern srs_error_t srs_initialize_kafka();
extern void srs_dispose_kafka();
// The kafka producer used to save log to kafka cluster.
class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISrsKafkaCluster
{
private:
// TODO: FIXME: support reload.
bool enabled;
srs_mutex_t lock;
SrsCoroutine* trd;
private:
bool metadata_ok;
srs_cond_t metadata_expired;
public:
std::vector<SrsKafkaPartition*> partitions;
SrsKafkaCache* cache;
private:
SrsLbRoundRobin* lb;
SrsAsyncCallWorker* worker;
public:
SrsKafkaProducer();
virtual ~SrsKafkaProducer();
public:
virtual srs_error_t initialize();
virtual srs_error_t start();
virtual void stop();
// internal: for worker to call task to send object.
public:
// Send json object to kafka cluster.
// The producer will aggregate message and send in kafka message set.
// @param key the key to map to the partition, user can use cid or hash.
// @param obj the json object; user must never free it again.
virtual srs_error_t send(int key, SrsJsonObject* obj);
// Interface ISrsKafkaCluster
public:
virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip);
virtual srs_error_t on_close(int key);
// Interface ISrsReusableThreadHandler
public:
virtual srs_error_t cycle();
private:
virtual void clear_metadata();
virtual srs_error_t do_cycle();
virtual srs_error_t request_metadata();
// Set the metadata to invalid and refresh it.
virtual void refresh_metadata();
virtual srs_error_t flush();
};
#endif
#endif

@@ -55,7 +55,6 @@ using namespace std;
#include <srs_app_statistic.hpp>
#include <srs_protocol_utility.hpp>
#include <srs_protocol_json.hpp>
#include <srs_app_kafka.hpp>
// the timeout in srs_utime_t to wait encoder to republish
// if timeout, close the connection.
@@ -154,13 +153,6 @@ srs_error_t SrsRtmpConn::do_cycle()
srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
// notify kafka cluster.
#ifdef SRS_AUTO_KAFKA
if ((err = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != srs_success) {
return srs_error_wrap(err, "kafka on client");
}
#endif
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
@@ -1194,12 +1186,6 @@ srs_error_t SrsRtmpConn::on_disconnect()
http_hooks_on_close();
#ifdef SRS_AUTO_KAFKA
if ((err = _srs_kafka->on_close(srs_id())) != srs_success) {
return srs_error_wrap(err, "kafka on close");
}
#endif
// TODO: FIXME: Implements it.
return err;

@@ -54,9 +54,6 @@ class SrsSecurity;
class ISrsWakable;
class SrsCommonMessage;
class SrsPacket;
#ifdef SRS_AUTO_KAFKA
class ISrsKafkaCluster;
#endif
// The simple rtmp client for SRS.
class SrsSimpleRtmpClient : public SrsBasicRtmpClient

@@ -49,7 +49,6 @@ using namespace std;
#include <srs_app_caster_flv.hpp>
#include <srs_core_mem_watch.hpp>
#include <srs_kernel_consts.hpp>
#include <srs_app_kafka.hpp>
#include <srs_app_thread.hpp>
#include <srs_app_coworkers.hpp>
@@ -523,10 +522,6 @@ void SrsServer::dispose()
// @remark don't dispose ingesters, for too slow.
#ifdef SRS_AUTO_KAFKA
srs_dispose_kafka();
#endif
// dispose the source for hls and dvr.
SrsSource::dispose_all();
@@ -590,13 +585,6 @@ srs_error_t SrsServer::initialize_st()
// set current log id.
_srs_context->generate_id();
// initialize the conponents that depends on st.
#ifdef SRS_AUTO_KAFKA
if ((err = srs_initialize_kafka()) != srs_success) {
return srs_error_wrap(err, "initialize kafka");
}
#endif
// check asprocess.
bool asprocess = _srs_config->get_asprocess();
if (asprocess && ppid == 1) {

@@ -50,9 +50,6 @@ class ISrsUdpHandler;
class SrsUdpListener;
class SrsTcpListener;
class SrsAppCasterFlv;
#ifdef SRS_AUTO_KAFKA
class SrsKafkaProducer;
#endif
class SrsCoroutineManager;
// The listener type for server to identify the connection,

@@ -27,7 +27,7 @@
// The version config.
#define VERSION_MAJOR 3
#define VERSION_MINOR 0
#define VERSION_REVISION 52
#define VERSION_REVISION 53
// The macros generated by configure script.
#include <srs_auto_headers.hpp>

@@ -31,7 +31,7 @@
/**
* the round-robin load balance algorithm,
* used for edge pull, kafka and other multiple server feature.
* used for edge pull and other multiple server feature.
*/
class SrsLbRoundRobin
{

@@ -390,13 +390,5 @@
#define SRS_CONSTS_RTSP_RTSPVersionNotSupported_str "RTSP Version Not Supported"
#define SRS_CONSTS_RTSP_OptionNotSupported_str "Option not support"
///////////////////////////////////////////////////////////
// KAFKA consts values
///////////////////////////////////////////////////////////
#define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092
// The common io timeout, for both recv and send.
#define SRS_CONSTS_KAFKA_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
#endif

@@ -284,7 +284,7 @@
#define ERROR_OCLUSTER_REDIRECT 3091
///////////////////////////////////////////////////////
// HTTP/StreamCaster/KAFKA protocol error.
// HTTP/StreamCaster protocol error.
///////////////////////////////////////////////////////
#define ERROR_HTTP_PATTERN_EMPTY 4000
#define ERROR_HTTP_PATTERN_DUPLICATED 4001
@@ -316,14 +316,6 @@
#define ERROR_AVC_NALU_UEV 4027
#define ERROR_AAC_BYTES_INVALID 4028
#define ERROR_HTTP_REQUEST_EOF 4029
#define ERROR_KAFKA_CODEC_STRING 4030
#define ERROR_KAFKA_CODEC_BYTES 4031
#define ERROR_KAFKA_CODEC_REQUEST 4032
#define ERROR_KAFKA_CODEC_RESPONSE 4033
#define ERROR_KAFKA_CODEC_ARRAY 4034
#define ERROR_KAFKA_CODEC_METADATA 4035
#define ERROR_KAFKA_CODEC_MESSAGE 4036
#define ERROR_KAFKA_CODEC_PRODUCER 4037
#define ERROR_HTTP_302_INVALID 4038
#define ERROR_BASE64_DECODE 4039

@@ -228,7 +228,6 @@ void show_macro_features()
ss << ", trans:" << srs_bool2switch(true);
// inge(ingest)
ss << ", inge:" << srs_bool2switch(true);
ss << ", kafka:" << srs_bool2switch(SRS_AUTO_KAFKA_BOOL);
ss << ", stat:" << srs_bool2switch(true);
ss << ", nginx:" << srs_bool2switch(SRS_AUTO_NGINX_BOOL);
// ff(ffmpeg)

File diff suppressed because it is too large.

@@ -1,932 +0,0 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2013-2019 Winlin
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_PROTOCOL_KAFKA_HPP
#define SRS_PROTOCOL_KAFKA_HPP
#include <srs_core.hpp>
#include <vector>
#include <string>
#include <map>
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
class SrsFastStream;
class ISrsProtocolReadWriter;
class SrsJsonObject;
#ifdef SRS_AUTO_KAFKA
/**
* the api key used to identify the request type.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
*/
enum SrsKafkaApiKey
{
SrsKafkaApiKeyUnknown = -1,
SrsKafkaApiKeyProduceRequest = 0,
SrsKafkaApiKeyFetchRequest = 1,
SrsKafkaApiKeyOffsetRequest = 2,
SrsKafkaApiKeyMetadataRequest = 3,
/* Non-user facing control APIs 4-7 */
SrsKafkaApiKeyOffsetCommitRequest = 8,
SrsKafkaApiKeyOffsetFetchRequest = 9,
SrsKafkaApiKeyConsumerMetadataRequest = 10,
};
/**
* These types consist of a signed integer giving a length N followed by N bytes of content.
* A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
*/
class SrsKafkaString : public ISrsCodec
{
private:
int16_t _size;
char* data;
public:
SrsKafkaString();
SrsKafkaString(std::string v);
virtual ~SrsKafkaString();
public:
virtual bool null();
virtual bool empty();
virtual std::string to_str();
virtual void set_value(std::string v);
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
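Editor's note: a standalone sketch (not the SrsBuffer codec) of the primitive documented above: a big-endian int16 length N, then N bytes, with -1 meaning null:
#include <string>
#include <vector>
// Encode a kafka protocol string: 2-byte big-endian size, then the bytes.
// "srs" encodes to 00 03 73 72 73; a null string encodes to FF FF.
std::vector<char> encode_kafka_string(const std::string* v)
{
    std::vector<char> out;
    short size = v ? (short)v->size() : -1;
    out.push_back((char)((size >> 8) & 0xff)); // high byte first (network order).
    out.push_back((char)(size & 0xff));
    if (v) {
        out.insert(out.end(), v->begin(), v->end());
    }
    return out;
}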
/**
* These types consist of a signed integer giving a length N followed by N bytes of content.
* A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
*/
class SrsKafkaBytes : public ISrsCodec
{
private:
int32_t _size;
char* _data;
public:
SrsKafkaBytes();
SrsKafkaBytes(const char* v, int nb_v);
virtual ~SrsKafkaBytes();
public:
virtual char* data();
virtual int size();
virtual bool null();
virtual bool empty();
virtual void set_value(std::string v);
virtual void set_value(const char* v, int nb_v);
virtual uint32_t crc32(uint32_t previous);
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* This is a notation for handling repeated structures. These will always be encoded as an
* int32 size containing the length N followed by N repetitions of the structure which can
* itself be made up of other primitive types. In the BNF grammars below we will show an
* array of a structure foo as [foo].
*
* Usage:
* SrsKafkaArray<SrsKafkaBytes> body;
* body.append(new SrsKafkaBytes());
* @remark array elem is the T*, which must be ISrsCodec*
*
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
*/
template<typename T>
class SrsKafkaArray : public ISrsCodec
{
private:
int32_t length;
std::vector<T*> elems;
typedef typename std::vector<T*>::iterator SrsIterator;
public:
SrsKafkaArray()
{
length = 0;
}
virtual ~SrsKafkaArray()
{
for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
T* elem = *it;
srs_freep(elem);
}
elems.clear();
}
public:
virtual void append(T* elem)
{
length++;
elems.push_back(elem);
}
virtual int size()
{
return length;
}
virtual bool empty()
{
return elems.empty();
}
virtual T* at(int index)
{
return elems.at(index);
}
// Interface ISrsCodec
public:
virtual int nb_bytes()
{
int s = 4;
for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
T* elem = *it;
s += elem->nb_bytes();
}
return s;
}
virtual srs_error_t encode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
if (!buf->require(4)) {
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
}
buf->write_4bytes(length);
for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
T* elem = *it;
if ((err = elem->encode(buf)) != srs_success) {
return srs_error_wrap(err, "encode elem");
}
}
return err;
}
virtual srs_error_t decode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
if (!buf->require(4)) {
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
}
length = buf->read_4bytes();
for (int i = 0; i < length; i++) {
T* elem = new T();
if ((err = elem->decode(buf)) != srs_success) {
srs_freep(elem);
return srs_error_wrap(err, "decode elem");
}
elems.push_back(elem);
}
return err;
}
};
template<>
class SrsKafkaArray<int32_t> : public ISrsCodec
{
private:
int32_t length;
std::vector<int32_t> elems;
typedef std::vector<int32_t>::iterator SrsIterator;
public:
SrsKafkaArray()
{
length = 0;
}
virtual ~SrsKafkaArray()
{
elems.clear();
}
public:
virtual void append(int32_t elem)
{
length++;
elems.push_back(elem);
}
virtual int size()
{
return length;
}
virtual bool empty()
{
return elems.empty();
}
virtual int32_t at(int index)
{
return elems.at(index);
}
// Interface ISrsCodec
public:
virtual int nb_bytes()
{
return 4 + 4 * (int)elems.size();
}
virtual srs_error_t encode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
int nb_required = 4 + sizeof(int32_t) * (int)elems.size();
if (!buf->require(nb_required)) {
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", nb_required, buf->left());
}
buf->write_4bytes(length);
for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
int32_t elem = *it;
buf->write_4bytes(elem);
}
return err;
}
virtual srs_error_t decode(SrsBuffer* buf)
{
srs_error_t err = srs_success;
if (!buf->require(4)) {
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
}
length = buf->read_4bytes();
for (int i = 0; i < length; i++) {
if (!buf->require(sizeof(int32_t))) {
return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", sizeof(int32_t), buf->left());
}
int32_t elem = buf->read_4bytes();
elems.push_back(elem);
}
return err;
}
};
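Editor's note: the array layout is the same idea one level up: a big-endian int32 count N, then N encoded elements. A reduced sketch for the [int32] case, matching nb_bytes() = 4 + 4 * N above:
#include <vector>
static void put_i32(std::vector<unsigned char>& buf, int v)
{
    for (int s = 24; s >= 0; s -= 8) {
        buf.push_back((unsigned char)((v >> s) & 0xff)); // big-endian bytes.
    }
}
// Encode [int32]: the count N, then N repetitions of the element.
std::vector<unsigned char> encode_i32_array(const std::vector<int>& elems)
{
    std::vector<unsigned char> buf;
    put_i32(buf, (int)elems.size());
    for (int i = 0; i < (int)elems.size(); i++) {
        put_i32(buf, elems[i]);
    }
    return buf;
}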
/**
* the header of request, includes the size of request.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
*/
class SrsKafkaRequestHeader : public ISrsCodec
{
private:
/**
* The MessageSize field gives the size of the subsequent request or response
* message in bytes. The client can read requests by first reading this 4 byte
* size as an integer N, and then reading and parsing the subsequent N bytes
* of the request.
*/
int32_t _size;
private:
/**
* This is a numeric id for the API being invoked (i.e. is it
* a metadata request, a produce request, a fetch request, etc).
* @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
*/
int16_t _api_key;
/**
* This is a numeric version number for this api. We version each API and
* this version number allows the server to properly interpret the request
* as the protocol evolves. Responses will always be in the format corresponding
* to the request version. Currently the supported version for all APIs is 0.
*/
int16_t api_version;
/**
* This is a user-supplied integer. It will be passed back in
* the response by the server, unmodified. It is useful for matching
* request and response between the client and server.
*/
int32_t _correlation_id;
/**
* This is a user supplied identifier for the client application.
* The user can use any identifier they like and it will be used
* when logging errors, monitoring aggregates, etc. For example,
* one might want to monitor not just the requests per second overall,
* but the number coming from each client application (each of
* which could reside on multiple servers). This id acts as a
* logical grouping across all requests from a particular client.
*/
SrsKafkaString* client_id;
public:
SrsKafkaRequestHeader();
virtual ~SrsKafkaRequestHeader();
private:
/**
* the layout of request:
* +-----------+----------------------------------+
* | 4B _size | [_size] bytes |
* +-----------+------------+---------------------+
* | 4B _size | header | message |
* +-----------+------------+---------------------+
* | total size = 4 + header + message |
* +----------------------------------------------+
* where the header specifies this request header, excluding the leading 4B size.
* @remark size = 4 + header + message.
*/
virtual int header_size();
/**
* the size of message, the bytes left after the header.
*/
virtual int message_size();
/**
* the total size of the request, includes the 4B size.
*/
virtual int total_size();
public:
/**
* when got the whole message size, update the header.
* @param s the whole message, including the 4-byte size field.
*/
virtual void set_total_size(int s);
/**
* get the correlation id for message.
*/
virtual int32_t correlation_id();
/**
* set the correlation id for message.
*/
virtual void set_correlation_id(int32_t cid);
/**
* get the api key of header for message.
*/
virtual SrsKafkaApiKey api_key();
/**
* set the api key of header for message.
*/
virtual void set_api_key(SrsKafkaApiKey key);
public:
/**
* the api key enumeration.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
*/
virtual bool is_producer_request();
virtual bool is_fetch_request();
virtual bool is_offset_request();
virtual bool is_metadata_request();
virtual bool is_offset_commit_request();
virtual bool is_offset_fetch_request();
virtual bool is_consumer_metadata_request();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
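Editor's note: the size bookkeeping in the layout comment works out as below; a sketch of the arithmetic, assuming the field widths declared in this header (2B api_key, 2B api_version, 4B correlation_id, length-prefixed client_id):
// header = api_key(2) + api_version(2) + correlation_id(4) + client_id(2 + N).
int request_header_size(int client_id_len)
{
    return 2 + 2 + 4 + (2 + client_id_len);
}
// _size counts everything after the 4-byte size field itself,
// so the bytes on the wire total 4 + header + message.
int request_total_size(int client_id_len, int message_size)
{
    return 4 + request_header_size(client_id_len) + message_size;
}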
/**
* the header of response, include the size of response.
* The response will always match the paired request (e.g. we will
* send a MetadataResponse in return to a MetadataRequest).
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses
*/
class SrsKafkaResponseHeader : public ISrsCodec
{
private:
/**
* The MessageSize field gives the size of the subsequent request or response
* message in bytes. The client can read requests by first reading this 4 byte
* size as an integer N, and then reading and parsing the subsequent N bytes
* of the request.
*/
int32_t _size;
private:
/**
* This is a user-supplied integer. It will be passed back in
* the response by the server, unmodified. It is useful for matching
* request and response between the client and server.
*/
int32_t _correlation_id;
public:
SrsKafkaResponseHeader();
virtual ~SrsKafkaResponseHeader();
private:
/**
* the layout of response:
* +-----------+----------------------------------+
* | 4B _size | [_size] bytes |
* +-----------+------------+---------------------+
* | 4B _size | 4B header | message |
* +-----------+------------+---------------------+
* | total size = 4 + 4 + message |
* +----------------------------------------------+
* where the header specifies this request header, excluding the leading 4B size.
* @remark size = 4 + 4 + message.
*/
virtual int header_size();
/**
* the size of message, the bytes left after the header.
*/
virtual int message_size();
public:
/**
* the total size of the request, includes the 4B size and message body.
*/
virtual int total_size();
public:
/**
* when got the whole message size, update the header.
* @param s the whole message, including the 4-byte size field.
*/
virtual void set_total_size(int s);
/**
* get the correlation id of response message.
*/
virtual int32_t correlation_id();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* the kafka message in message set.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
*/
struct SrsKafkaRawMessage : public ISrsCodec
{
// metadata.
public:
/**
* This is the offset used in kafka as the log sequence number. When the
* producer is sending messages it doesn't actually know the offset and
* can fill in any value here it likes.
*/
int64_t offset;
/**
* the size of this message.
*/
int32_t message_size;
// message.
public:
/**
* The CRC is the CRC32 of the remainder of the message bytes.
* This is used to check the integrity of the message on the broker and consumer.
*/
int32_t crc;
/**
* This is a version id used to allow backwards compatible evolution
* of the message binary format. The current value is 0.
*/
int8_t magic_byte;
/**
* This byte holds metadata attributes about the message.
* The lowest 2 bits contain the compression codec used
* for the message. The other bits should be set to 0.
*/
int8_t attributes;
/**
* The key is an optional message key that was used for
* partition assignment. The key can be null.
*/
SrsKafkaBytes* key;
/**
* The value is the actual message contents as an opaque byte array.
* Kafka supports recursive messages in which case this may itself
* contain a message set. The message can be null.
*/
SrsKafkaBytes* value;
public:
SrsKafkaRawMessage();
virtual ~SrsKafkaRawMessage();
public:
/**
* create message from json object.
*/
virtual srs_error_t create(SrsJsonObject* obj);
private:
/**
* get the raw message, bytes after the message_size.
*/
virtual int raw_message_size();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* a set of kafka message.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
* @remark because the message set is not preceded by an int32 count, we decode the buffer until it is empty.
*/
class SrsKafkaRawMessageSet : public ISrsCodec
{
private:
std::vector<SrsKafkaRawMessage*> messages;
public:
SrsKafkaRawMessageSet();
virtual ~SrsKafkaRawMessageSet();
public:
virtual void append(SrsKafkaRawMessage* msg);
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* the kafka request message, for protocol to send.
*/
class SrsKafkaRequest : public ISrsCodec
{
protected:
SrsKafkaRequestHeader header;
public:
SrsKafkaRequest();
virtual ~SrsKafkaRequest();
public:
/**
* update the size in header.
* @param s an int value specifies the size of message in header.
*/
virtual void update_header(int s);
/**
* get the correlation id of header for message.
*/
virtual int32_t correlation_id();
/**
* get the api key of request.
*/
virtual SrsKafkaApiKey api_key();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* the kafka response message, for protocol to recv.
*/
class SrsKafkaResponse : public ISrsCodec
{
protected:
SrsKafkaResponseHeader header;
public:
SrsKafkaResponse();
virtual ~SrsKafkaResponse();
public:
/**
* update the size in header.
* @param s an int value specifies the size of message in header.
*/
virtual void update_header(int s);
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* request the metadata from broker.
* This API answers the following questions:
* What topics exist?
* How many partitions does each topic have?
* Which broker is currently the leader for each partition?
* What is the host and port for each of these brokers?
* This is the only request that can be addressed to any broker in the cluster.
*
* Since there may be many topics the client can give an optional list of topic
* names in order to only return metadata for a subset of topics.
*
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
*/
class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest
{
private:
SrsKafkaArray<SrsKafkaString> topics;
public:
SrsKafkaTopicMetadataRequest();
virtual ~SrsKafkaTopicMetadataRequest();
public:
virtual void add_topic(std::string topic);
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* the metadata response data.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
*/
struct SrsKafkaBroker : public ISrsCodec
{
public:
int32_t node_id;
SrsKafkaString host;
int32_t port;
public:
SrsKafkaBroker();
virtual ~SrsKafkaBroker();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
struct SrsKafkaPartitionMetadata : public ISrsCodec
{
public:
int16_t error_code;
int32_t partition_id;
int32_t leader;
SrsKafkaArray<int32_t> replicas;
SrsKafkaArray<int32_t> isr;
public:
SrsKafkaPartitionMetadata();
virtual ~SrsKafkaPartitionMetadata();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
struct SrsKafkaTopicMetadata : public ISrsCodec
{
public:
int16_t error_code;
SrsKafkaString name;
SrsKafkaArray<SrsKafkaPartitionMetadata> metadatas;
public:
SrsKafkaTopicMetadata();
virtual ~SrsKafkaTopicMetadata();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* response for the metadata request from broker.
* The response contains metadata for each partition,
* with partitions grouped together by topic. This
* metadata refers to brokers by their broker id.
* The brokers each have a host and port.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
*/
class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse
{
public:
SrsKafkaArray<SrsKafkaBroker> brokers;
SrsKafkaArray<SrsKafkaTopicMetadata> metadatas;
public:
SrsKafkaTopicMetadataResponse();
virtual ~SrsKafkaTopicMetadataResponse();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* the messages for producer to send.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
*/
struct SrsKafkaProducerPartitionMessages : public ISrsCodec
{
public:
/**
* The partition that data is being published to.
*/
int32_t partition;
/**
* The size, in bytes, of the message set that follows.
*/
int32_t message_set_size;
/**
* messages in set.
*/
SrsKafkaRawMessageSet messages;
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
struct SrsKafkaProducerTopicMessages : public ISrsCodec
{
public:
/**
* The topic that data is being published to.
*/
SrsKafkaString topic_name;
/**
* messages of partitions.
*/
SrsKafkaArray<SrsKafkaProducerPartitionMessages> partitions;
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* the request for producer to send message.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
*/
class SrsKafkaProducerRequest : public SrsKafkaRequest
{
public:
/**
* This field indicates how many acknowledgements the servers should receive
* before responding to the request. If it is 0 the server will not send any
* response (this is the only case where the server will not reply to a request).
* If it is 1, the server will wait the data is written to the local log
* before sending a response. If it is -1 the server will block until the
* message is committed by all in sync replicas before sending a response.
* For any number > 1 the server will block waiting for this number of
* acknowledgements to occur (but the server will never wait for more
* acknowledgements than there are in-sync replicas).
*/
int16_t required_acks;
/**
* This provides a maximum time in milliseconds the server can await the receipt
* of the number of acknowledgements in RequiredAcks. The timeout is not an exact
* limit on the request time for a few reasons: (1) it does not include network
* latency, (2) the timer begins at the beginning of the processing of this request
* so if many requests are queued due to server overload that wait time will not
* be included, (3) we will not terminate a local write so if the local write
* time exceeds this timeout it will not be respected. To get a hard timeout of
* this type the client should use the socket timeout.
*/
int32_t timeout;
/**
* messages of topics.
*/
SrsKafkaArray<SrsKafkaProducerTopicMessages> topics;
public:
SrsKafkaProducerRequest();
virtual ~SrsKafkaProducerRequest();
// Interface ISrsCodec
public:
virtual int nb_bytes();
virtual srs_error_t encode(SrsBuffer* buf);
virtual srs_error_t decode(SrsBuffer* buf);
};
/**
* the pool to discover responses.
* @param CorrelationId This is a user-supplied integer. It will be passed back
* in the response by the server, unmodified. It is useful for matching
* request and response between the client and server.
* @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
*/
class SrsKafkaCorrelationPool
{
private:
static SrsKafkaCorrelationPool* _instance;
public:
static SrsKafkaCorrelationPool* instance();
private:
std::map<int32_t, SrsKafkaApiKey> correlation_ids;
private:
SrsKafkaCorrelationPool();
public:
virtual ~SrsKafkaCorrelationPool();
public:
/**
* generate a global correlation id.
*/
virtual int32_t generate_correlation_id();
/**
* set the correlation id to specified request key.
*/
virtual SrsKafkaApiKey set(int32_t correlation_id, SrsKafkaApiKey request);
/**
* unset the correlation id.
* @return the previous api key; unknown if not set.
*/
virtual SrsKafkaApiKey unset(int32_t correlation_id);
/**
* get the key by specified correlation id.
* @return the specified api key; unknown if no correlation id.
*/
virtual SrsKafkaApiKey get(int32_t correlation_id);
};
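Editor's note: the pool exists because a response carries only the echoed correlation id, not an api key, so the sender must remember which decoder to use. A stand-in sketch of that matching with a plain std::map:
#include <map>
enum ApiKey { ApiUnknown = -1, ApiMetadata = 3 };
std::map<int, ApiKey> pending; // correlation id -> api key of the request.
int next_id = 0;
int send_metadata_request()
{
    int cid = ++next_id;        // generate_correlation_id().
    pending[cid] = ApiMetadata; // set(cid, key) before sending.
    // ... encode and send the request carrying cid ...
    return cid;
}
// On a response, recover the api key to pick the right decoder, then unset(cid).
ApiKey on_response(int cid)
{
    std::map<int, ApiKey>::iterator it = pending.find(cid);
    if (it == pending.end()) {
        return ApiUnknown;
    }
    ApiKey key = it->second;
    pending.erase(it);
    return key;
}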
/**
* the kafka protocol stack, used to send and recv kafka messages.
*/
class SrsKafkaProtocol
{
private:
ISrsProtocolReadWriter* skt;
SrsFastStream* reader;
public:
SrsKafkaProtocol(ISrsProtocolReadWriter* io);
virtual ~SrsKafkaProtocol();
public:
/**
* write the message to kafka server.
* @param msg the msg to send. user must not free it again.
*/
virtual srs_error_t send_and_free_message(SrsKafkaRequest* msg);
/**
* read the message from kafka server.
* @param pmsg output the received message. user must free it.
*/
virtual srs_error_t recv_message(SrsKafkaResponse** pmsg);
public:
/**
* expect specified message.
*/
template<typename T>
srs_error_t expect_message(T** pmsg)
{
srs_error_t err = srs_success;
while (true) {
SrsKafkaResponse* res = NULL;
if ((err = recv_message(&res)) != srs_success) {
return srs_error_wrap(err, "recv message");
}
// drop responses that do not match.
T* msg = dynamic_cast<T*>(res);
if (!msg) {
srs_info("kafka drop response.");
srs_freep(res);
continue;
}
*pmsg = msg;
break;
}
return err;
}
};
/**
* the kafka client, for producer or consumer.
*/
class SrsKafkaClient
{
private:
SrsKafkaProtocol* protocol;
public:
SrsKafkaClient(ISrsProtocolReadWriter* io);
virtual ~SrsKafkaClient();
public:
/**
* fetch the metadata from broker for topic.
*/
virtual srs_error_t fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg);
/**
* write the messages to partition of topic.
*/
virtual srs_error_t write_messages(std::string topic, int32_t partition, std::vector<SrsJsonObject*>& msgs);
};
// convert kafka array[string] to vector[string]
extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr);
extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<int32_t>* arr);
#endif
#endif