diff --git a/README.md b/README.md
index 03a801028..6871c2d78 100755
--- a/README.md
+++ b/README.md
@@ -166,6 +166,7 @@ Please select according to languages:
 
 ### V3 changes
 
+* v3.0, 2019-10-03, Remove KAFKA. 3.0.53
 * v3.0, 2019-05-14, Covert Kernel File reader/writer. 3.0.52
 * v3.0, 2019-04-30, Refine typo in files. 3.0.51
 * v3.0, 2019-04-25, Upgrade http-parser from 2.1 to 2.9.2 and cover it. 3.0.50
@@ -712,7 +713,6 @@ Comparing with other media servers, SRS is much better and stronger, for details
 |   Reload      |   Stable  |   X       |   X       |   X       |   X       |
 |   Forward     |   Stable  |   X       |   X       |   X       |   X       |
 |   ATC         |   Stable  |   X       |   X       |   X       |   X       |
-|   KAFKA       | Experiment|   X       |   X       |   X       |   X       |
 
 #### Stream Service
 
diff --git a/trunk/auto/auto_headers.sh b/trunk/auto/auto_headers.sh
index dcaba18eb..15efa6fe7 100755
--- a/trunk/auto/auto_headers.sh
+++ b/trunk/auto/auto_headers.sh
@@ -79,12 +79,6 @@ fi
 echo "" >> $SRS_AUTO_HEADERS_H
 
 # auto headers in depends.
-if [ $SRS_KAFKA = YES ]; then
-    srs_define_macro "SRS_AUTO_KAFKA" $SRS_AUTO_HEADERS_H
-else
-    srs_undefine_macro "SRS_AUTO_KAFKA" $SRS_AUTO_HEADERS_H
-fi
-
 if [ $SRS_NGINX = YES ]; then
     srs_define_macro "SRS_AUTO_NGINX" $SRS_AUTO_HEADERS_H
 else
diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh
index 6787d89cf..d938fb52c 100755
--- a/trunk/auto/options.sh
+++ b/trunk/auto/options.sh
@@ -18,7 +18,6 @@ help=no
 SRS_HDS=NO
 SRS_NGINX=NO
 SRS_FFMPEG_TOOL=NO
-SRS_KAFKA=NO
 SRS_LIBRTMP=NO
 SRS_RESEARCH=YES
 SRS_UTEST=YES
@@ -117,7 +116,6 @@ Options:
   --with-hds                enable hds streaming, mux RTMP to F4M/F4V files.
   --with-nginx              enable delivery HTTP stream with nginx.
   --with-stream-caster      enable stream caster to serve other stream over other protocol.
-  --with-kafka              enable srs kafka producer to report to kafka.
   --with-ffmpeg             enable transcoding tool ffmpeg.
   --with-transcode          enable transcoding features.
   --with-ingest             enable ingest features.
@@ -138,7 +136,6 @@ Options:
   --without-hds             disable hds, the adobe http dynamic streaming.
   --without-nginx           disable delivery HTTP stream with nginx.
   --without-stream-caster   disable stream caster, only listen and serve RTMP/HTTP.
-  --without-kafka           disable the srs kafka producer.
   --without-ffmpeg          disable the ffmpeg transcode tool feature.
   --without-transcode       disable the transcoding feature.
   --without-ingest          disable the ingest feature.
@@ -230,7 +227,6 @@ function parse_user_option() {
         --with-ingest)                  SRS_INGEST=YES              ;;
         --with-stat)                    SRS_STAT=YES                ;;
         --with-stream-caster)           SRS_STREAM_CASTER=YES       ;;
-        --with-kafka)                   SRS_KAFKA=YES               ;;
         --with-librtmp)                 SRS_LIBRTMP=YES             ;;
         --with-research)                SRS_RESEARCH=YES            ;;
         --with-utest)                   SRS_UTEST=YES               ;;
@@ -251,7 +247,6 @@ function parse_user_option() {
         --without-ingest)               SRS_INGEST=NO               ;;
         --without-stat)                 SRS_STAT=NO                 ;;
         --without-stream-caster)        SRS_STREAM_CASTER=NO        ;;
-        --without-kafka)                SRS_KAFKA=NO                ;;
         --without-librtmp)              SRS_LIBRTMP=NO              ;;
         --without-research)             SRS_RESEARCH=NO             ;;
         --without-utest)                SRS_UTEST=NO                ;;
@@ -387,7 +382,6 @@ function apply_user_presets() {
         SRS_HDS=NO
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=NO
         SRS_LIBRTMP=NO
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -399,7 +393,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=YES
         SRS_FFMPEG_TOOL=YES
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=YES
         SRS_UTEST=YES
@@ -411,7 +404,6 @@ function apply_user_presets() {
         SRS_HDS=NO
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=NO
         SRS_LIBRTMP=NO
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -423,7 +415,6 @@ function apply_user_presets() {
         SRS_HDS=NO
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=NO
         SRS_LIBRTMP=NO
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -435,7 +426,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -448,7 +438,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -460,7 +449,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=NO
         SRS_UTEST=YES
@@ -472,7 +460,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=NO
         SRS_UTEST=YES
@@ -487,7 +474,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=YES
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=YES
         SRS_UTEST=YES
@@ -499,7 +485,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=YES
         SRS_LIBRTMP=NO
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -511,7 +496,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=YES
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=NO
         SRS_UTEST=YES
@@ -523,7 +507,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=NO
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -535,7 +518,6 @@ function apply_user_presets() {
         SRS_HDS=YES
         SRS_NGINX=NO
         SRS_FFMPEG_TOOL=YES
-        SRS_KAFKA=YES
         SRS_LIBRTMP=YES
         SRS_RESEARCH=NO
         SRS_UTEST=NO
@@ -595,7 +577,6 @@ function apply_user_detail_options() {
         SRS_INGEST=NO
         SRS_STAT=NO
         SRS_STREAM_CASTER=NO
-        SRS_KAFKA=NO
         SRS_LIBRTMP=YES
         SRS_RESEARCH=YES
         SRS_UTEST=NO
@@ -627,7 +608,6 @@ SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}"
     if [ $SRS_HTTP_CALLBACK = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-callback"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-callback"; fi
     if [ $SRS_HTTP_SERVER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-server"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-server"; fi
     if [ $SRS_STREAM_CASTER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-stream-caster"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-stream-caster"; fi
-    if [ $SRS_KAFKA = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-kafka"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-kafka"; fi
     if [ $SRS_HTTP_API = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-api"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-api"; fi
     if [ $SRS_LIBRTMP = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-librtmp"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-librtmp"; fi
     if [ $SRS_RESEARCH = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-research"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-research"; fi
@@ -713,7 +693,6 @@ function check_option_conflicts() {
     if [ $SRS_SSL = RESERVED ]; then echo "you must specifies the ssl, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_FFMPEG_TOOL = RESERVED ]; then echo "you must specifies the ffmpeg, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_STREAM_CASTER = RESERVED ]; then echo "you must specifies the stream-caster, see: ./configure --help"; __check_ok=NO; fi
-    if [ $SRS_KAFKA = RESERVED ]; then echo "you must specifies the kafka, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_LIBRTMP = RESERVED ]; then echo "you must specifies the librtmp, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_RESEARCH = RESERVED ]; then echo "you must specifies the research, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_UTEST = RESERVED ]; then echo "you must specifies the utest, see: ./configure --help"; __check_ok=NO; fi
diff --git a/trunk/auto/summary.sh b/trunk/auto/summary.sh
index 476aa457e..f19a27722 100755
--- a/trunk/auto/summary.sh
+++ b/trunk/auto/summary.sh
@@ -12,7 +12,6 @@ SrsHttpCallbackSummaryColor="\${YELLOW}(Disabled) "; if [ $SRS_HTTP_CALLBACK = Y
 SrsHttpServerSummaryColor="\${YELLOW}(Disabled) "; if [ $SRS_HTTP_SERVER = YES ]; then SrsHttpServerSummaryColor="\${GREEN}"; fi
 SrsHttpApiSummaryColor="\${YELLOW}(Disabled) "; if [ $SRS_HTTP_API = YES ]; then SrsHttpApiSummaryColor="\${GREEN}"; fi
 SrsStreamCasterSummaryColor="\${YELLOW}(Disabled) "; if [ $SRS_STREAM_CASTER = YES ]; then SrsStreamCasterSummaryColor="\${GREEN}"; fi
-SrsKafkaSummaryColor="\${YELLOW}(Disabled) "; if [ $SRS_KAFKA = YES ]; then SrsKafkaSummaryColor="\${GREEN}"; fi
 SrsLibrtmpSummaryColor="\${YELLOW}(Disabled) "; if [ $SRS_LIBRTMP = YES ]; then SrsLibrtmpSummaryColor="\${GREEN}"; fi
 SrsLibrtmpSSLSummaryColor="\${YELLOW}(Disabled) "; if [ $SRS_LIBRTMP = YES ]; then if [ $SRS_SSL = YES ]; then SrsLibrtmpSSLSummaryColor="\${GREEN}"; fi fi
 SrsResearchSummaryColor="\${GREEN}(Disabled) "; if [ $SRS_RESEARCH = YES ]; then SrsResearchSummaryColor="\${GREEN}"; fi
diff --git a/trunk/configure b/trunk/configure
index a324d6905..301b997f5 100755
--- a/trunk/configure
+++ b/trunk/configure
@@ -195,7 +195,7 @@ ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot})
 MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_rtmp_stack"
         "srs_rtmp_handshake" "srs_protocol_utility" "srs_rtmp_msg_array" "srs_protocol_stream"
         "srs_raw_avc" "srs_rtsp_stack" "srs_http_stack" "srs_protocol_kbps" "srs_protocol_json"
-        "srs_kafka_stack" "srs_protocol_format")
+        "srs_protocol_format")
 PROTOCOL_INCS="src/protocol"; MODULE_DIR=${PROTOCOL_INCS} . auto/modules.sh
 PROTOCOL_OBJS="${MODULE_OBJS[@]}"
 #
@@ -225,7 +225,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
             "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static"
             "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds"
             "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call"
-            "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_kafka"
+            "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec"
             "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
             "srs_app_coworkers")
     DEFINES=""
@@ -577,11 +577,6 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
     else
         echo -e "${GREEN}Note: StreamCaster is disabled.${BLACK}"
     fi
-    if [ $SRS_KAFKA = YES ]; then
-        echo -e "${GREEN}Kafka is enabled.${BLACK}"
-    else
-        echo -e "${YELLOW}Warning: Kafka is disabled.${BLACK}"
-    fi
     if [ $SRS_HDS = YES ]; then
         echo -e "${YELLOW}Experiment: HDS is enabled.${BLACK}"
     else
diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp
index 0210eed1d..a0adb9d07 100644
--- a/trunk/src/app/srs_app_config.cpp
+++ b/trunk/src/app/srs_app_config.cpp
@@ -367,7 +367,6 @@ srs_error_t srs_config_transform_vhost(SrsConfDirective* root)
                 SrsConfDirective* hstrs = conf->get("hstrs");
                 conf->remove(hstrs);
                 srs_freep(hstrs);
-                srs_warn("Always enable hstrs, please never config it.");
             }
             
             // SRS3.0, change the refer style
@@ -1555,7 +1554,6 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf)
     }
     
     // TODO: FIXME: support reload stream_caster.
-    // TODO: FIXME: support reload kafka.
     
     // merge config: vhost
     if ((err = reload_vhost(old_root)) != srs_success) {
@@ -2139,19 +2137,6 @@ srs_error_t SrsConfig::global_to_json(SrsJsonObject* obj)
                 }
             }
             obj->set(dir->name, sobj);
-        } else if (dir->name == "kafka") {
-            SrsJsonObject* sobj = SrsJsonAny::object();
-            for (int j = 0; j < (int)dir->directives.size(); j++) {
-                SrsConfDirective* sdir = dir->directives.at(j);
-                if (sdir->name == "enabled") {
-                    sobj->set(sdir->name, sdir->dumps_arg0_to_boolean());
-                } else if (sdir->name == "brokers") {
-                    sobj->set(sdir->name, sdir->dumps_args());
-                } else if (sdir->name == "topic") {
-                    sobj->set(sdir->name, sdir->dumps_arg0_to_str());
-                }
-            }
-            obj->set(dir->name, sobj);
         } else if (dir->name == "stream_caster") {
             SrsJsonObject* sobj = SrsJsonAny::object();
             for (int j = 0; j < (int)dir->directives.size(); j++) {
@@ -3511,7 +3496,7 @@ srs_error_t SrsConfig::check_normal_config()
             && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
             && n != "max_connections" && n != "daemon" && n != "heartbeat"
             && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
-            && n != "http_server" && n != "stream_caster" && n != "kafka"
+            && n != "http_server" && n != "stream_caster"
             && n != "utc_time" && n != "work_dir" && n != "asprocess"
             ) {
             return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
@@ -3545,15 +3530,6 @@ srs_error_t SrsConfig::check_normal_config()
             }
         }
     }
-    if (true) {
-        SrsConfDirective* conf = root->get("kafka");
-        for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
-            string n = conf->at(i)->name;
-            if (n != "enabled" && n != "brokers" && n != "topic") {
-                return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal kafka.%s", n.c_str());
-            }
-        }
-    }
     if (true) {
         SrsConfDirective* conf = get_heartbeart();
         for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
@@ -4208,55 +4184,6 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf)
     return ::atoi(conf->arg0().c_str());
 }
 
-bool SrsConfig::get_kafka_enabled()
-{
-    static bool DEFAULT = false;
-    
-    SrsConfDirective* conf = root->get("kafka");
-    if (!conf) {
-        return DEFAULT;
-    }
-    
-    conf = conf->get("enabled");
-    if (!conf || conf->arg0().empty()) {
-        return DEFAULT;
-    }
-    
-    return SRS_CONF_PERFER_FALSE(conf->arg0());
-}
-
-SrsConfDirective* SrsConfig::get_kafka_brokers()
-{
-    SrsConfDirective* conf = root->get("kafka");
-    if (!conf) {
-        return NULL;
-    }
-    
-    conf = conf->get("brokers");
-    if (!conf || conf->args.empty()) {
-        return NULL;
-    }
-    
-    return conf;
-}
-
-string SrsConfig::get_kafka_topic()
-{
-    static string DEFAULT = "srs";
-    
-    SrsConfDirective* conf = root->get("kafka");
-    if (!conf) {
-        return DEFAULT;
-    }
-    
-    conf = conf->get("topic");
-    if (!conf || conf->arg0().empty()) {
-        return DEFAULT;
-    }
-    
-    return conf->arg0();
-}
-
 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
 {
     srs_assert(root);
diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp
index 9d30cae86..6ff503bb9 100644
--- a/trunk/src/app/srs_app_config.hpp
+++ b/trunk/src/app/srs_app_config.hpp
@@ -455,14 +455,6 @@ public:
     virtual int get_stream_caster_rtp_port_min(SrsConfDirective* conf);
     // Get the max udp port for rtp of stream caster rtsp.
     virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf);
-// kafka section.
-public:
-    // Whether the kafka enabled.
-    virtual bool get_kafka_enabled();
-    // Get the broker list, each is format in <ip:port>.
-    virtual SrsConfDirective* get_kafka_brokers();
-    // Get the kafka topic to use for srs.
-    virtual std::string get_kafka_topic();
 // vhost specified section
 public:
     // Get the vhost directive by vhost name.
diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp
deleted file mode 100644
index 6ce886dc9..000000000
--- a/trunk/src/app/srs_app_kafka.cpp
+++ /dev/null
@@ -1,659 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-#include <srs_app_kafka.hpp>
-
-#include <vector>
-using namespace std;
-
-#include <srs_kernel_error.hpp>
-#include <srs_kernel_log.hpp>
-#include <srs_app_config.hpp>
-#include <srs_app_async_call.hpp>
-#include <srs_app_utility.hpp>
-#include <srs_kernel_utility.hpp>
-#include <srs_protocol_utility.hpp>
-#include <srs_kernel_balance.hpp>
-#include <srs_kafka_stack.hpp>
-#include <srs_core_autofree.hpp>
-#include <srs_protocol_json.hpp>
-
-#ifdef SRS_AUTO_KAFKA
-
-#define SRS_KAFKA_PRODUCER_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
-#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1
-
-std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
-{
-    vector<string> bs;
-    for (int i = 0; i < metadata->brokers.size(); i++) {
-        SrsKafkaBroker* broker = metadata->brokers.at(i);
-        
-        string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str();
-        if (broker->port > 0) {
-            hostport += ":" + srs_int2str(broker->port);
-        }
-        
-        bs.push_back(hostport);
-    }
-    
-    vector<string> ps;
-    for (int i = 0; i < metadata->metadatas.size(); i++) {
-        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
-        
-        for (int j = 0; j < topic->metadatas.size(); j++) {
-            string desc = "topic=" + topic->name.to_str();
-            
-            SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
-            
-            desc += "?partition=" + srs_int2str(partition->partition_id);
-            desc += "&leader=" + srs_int2str(partition->leader);
-            
-            vector<string> replicas = srs_kafka_array2vector(&partition->replicas);
-            desc += "&replicas=" + srs_join_vector_string(replicas, ",");
-            
-            ps.push_back(desc);
-        }
-    }
-    
-    std::stringstream ss;
-    ss << "brokers=" << srs_join_vector_string(bs, ",");
-    ss << ", " << srs_join_vector_string(ps, ", ");
-    
-    return ss.str();
-}
-
-std::string srs_kafka_summary_partitions(const vector<SrsKafkaPartition*>& partitions)
-{
-    vector<string> ret;
-    
-    vector<SrsKafkaPartition*>::const_iterator it;
-    for (it = partitions.begin(); it != partitions.end(); ++it) {
-        SrsKafkaPartition* partition = *it;
-        
-        string desc = "tcp://";
-        desc += partition->host + ":" + srs_int2str(partition->port);
-        desc += "?broker=" + srs_int2str(partition->broker);
-        desc += "&partition=" + srs_int2str(partition->id);
-        ret.push_back(desc);
-    }
-    
-    return srs_join_vector_string(ret, ", ");
-}
-
-void srs_kafka_metadata2connector(string topic_name, SrsKafkaTopicMetadataResponse* metadata, vector<SrsKafkaPartition*>& partitions)
-{
-    for (int i = 0; i < metadata->metadatas.size(); i++) {
-        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
-        
-        for (int j = 0; j < topic->metadatas.size(); j++) {
-            SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
-            
-            SrsKafkaPartition* p = new SrsKafkaPartition();
-            
-            p->topic = topic_name;
-            p->id = partition->partition_id;
-            p->broker = partition->leader;
-            
-            for (int i = 0; i < metadata->brokers.size(); i++) {
-                SrsKafkaBroker* broker = metadata->brokers.at(i);
-                if (broker->node_id == p->broker) {
-                    p->host = broker->host.to_str();
-                    p->port = broker->port;
-                    break;
-                }
-            }
-            
-            partitions.push_back(p);
-        }
-    }
-}
-
-SrsKafkaPartition::SrsKafkaPartition()
-{
-    id = broker = 0;
-    port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
-    
-    transport = NULL;
-    kafka = NULL;
-}
-
-SrsKafkaPartition::~SrsKafkaPartition()
-{
-    disconnect();
-}
-
-string SrsKafkaPartition::hostport()
-{
-    if (ep.empty()) {
-        ep = host + ":" + srs_int2str(port);
-    }
-    
-    return ep;
-}
-
-srs_error_t SrsKafkaPartition::connect()
-{
-    srs_error_t err = srs_success;
-    
-    if (transport) {
-        return err;
-    }
-    transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT);
-    kafka = new SrsKafkaClient(transport);
-    
-    if ((err = transport->connect()) != srs_success) {
-        disconnect();
-        return srs_error_wrap(err, "connect to %s partition=%d failed", hostport().c_str(), id);
-    }
-    
-    srs_trace("connect at %s, partition=%d, broker=%d", hostport().c_str(), id, broker);
-    
-    return err;
-}
-
-srs_error_t SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
-{
-    return kafka->write_messages(topic, id, *pc);
-}
-
-void SrsKafkaPartition::disconnect()
-{
-    srs_freep(kafka);
-    srs_freep(transport);
-}
-
-SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j)
-{
-    producer = p;
-    key = k;
-    obj = j;
-}
-
-SrsKafkaMessage::~SrsKafkaMessage()
-{
-    srs_freep(obj);
-}
-
-srs_error_t SrsKafkaMessage::call()
-{
-    srs_error_t err = producer->send(key, obj);
-    
-    // the obj is manged by producer now.
-    obj = NULL;
-    
-    return srs_error_wrap(err, "kafka send");
-}
-
-string SrsKafkaMessage::to_string()
-{
-    return "kafka";
-}
-
-SrsKafkaCache::SrsKafkaCache()
-{
-    count = 0;
-    nb_partitions = 0;
-}
-
-SrsKafkaCache::~SrsKafkaCache()
-{
-    map<int32_t, SrsKafkaPartitionCache*>::iterator it;
-    for (it = cache.begin(); it != cache.end(); ++it) {
-        SrsKafkaPartitionCache* pc = it->second;
-        
-        for (vector<SrsJsonObject*>::iterator it2 = pc->begin(); it2 != pc->end(); ++it2) {
-            SrsJsonObject* obj = *it2;
-            srs_freep(obj);
-        }
-        pc->clear();
-        
-        srs_freep(pc);
-    }
-    cache.clear();
-}
-
-void SrsKafkaCache::append(int key, SrsJsonObject* obj)
-{
-    count++;
-    
-    int partition = 0;
-    if (nb_partitions > 0) {
-        partition = key % nb_partitions;
-    }
-    
-    SrsKafkaPartitionCache* pc = NULL;
-    map<int32_t, SrsKafkaPartitionCache*>::iterator it = cache.find(partition);
-    if (it == cache.end()) {
-        pc = new SrsKafkaPartitionCache();
-        cache[partition] = pc;
-    } else {
-        pc = it->second;
-    }
-    
-    pc->push_back(obj);
-}
-
-int SrsKafkaCache::size()
-{
-    return count;
-}
-
-bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc)
-{
-    map<int32_t, SrsKafkaPartitionCache*>::iterator it;
-    for (it = cache.begin(); it != cache.end(); ++it) {
-        int32_t key = it->first;
-        SrsKafkaPartitionCache* pc = it->second;
-        
-        if (!pc->empty()) {
-            *pkey = (int)key;
-            *ppc = pc;
-            return true;
-        }
-    }
-    
-    return false;
-}
-
-srs_error_t SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
-{
-    srs_error_t err = srs_success;
-    
-    // ensure the key exists.
-    srs_assert (cache.find(key) != cache.end());
-    
-    // the cache is vector, which is continous store.
-    // we remember the messages we have written and clear it when completed.
-    int nb_msgs = (int)pc->size();
-    if (pc->empty()) {
-        return err;
-    }
-    
-    // connect transport.
-    if ((err = partition->connect()) != srs_success) {
-        return srs_error_wrap(err, "connect partition");
-    }
-    
-    // write the json objects.
-    if ((err = partition->flush(pc)) != srs_success) {
-        return srs_error_wrap(err, "flush partition");
-    }
-    
-    // free all wrote messages.
-    for (vector<SrsJsonObject*>::iterator it = pc->begin(); it != pc->end(); ++it) {
-        SrsJsonObject* obj = *it;
-        srs_freep(obj);
-    }
-    
-    // remove the messages from cache.
-    if ((int)pc->size() == nb_msgs) {
-        pc->clear();
-    } else {
-        pc->erase(pc->begin(), pc->begin() + nb_msgs);
-    }
-    
-    return err;
-}
-
-ISrsKafkaCluster::ISrsKafkaCluster()
-{
-}
-
-ISrsKafkaCluster::~ISrsKafkaCluster()
-{
-}
-
-// @global kafka event producer, user must use srs_initialize_kafka to initialize it.
-ISrsKafkaCluster* _srs_kafka = NULL;
-
-srs_error_t srs_initialize_kafka()
-{
-    srs_error_t err = srs_success;
-    
-    SrsKafkaProducer* kafka = new SrsKafkaProducer();
-    _srs_kafka = kafka;
-    
-    if ((err = kafka->initialize()) != srs_success) {
-        return srs_error_wrap(err, "initialize kafka producer");
-    }
-    
-    if ((err = kafka->start()) != srs_success) {
-        return srs_error_wrap(err, "start kafka producer");
-    }
-    
-    return err;
-}
-
-void srs_dispose_kafka()
-{
-    SrsKafkaProducer* kafka = dynamic_cast<SrsKafkaProducer*>(_srs_kafka);
-    if (!kafka) {
-        return;
-    }
-    
-    kafka->stop();
-    
-    srs_freep(kafka);
-    _srs_kafka = NULL;
-}
-
-SrsKafkaProducer::SrsKafkaProducer()
-{
-    metadata_ok = false;
-    metadata_expired = srs_cond_new();
-    
-    lock = srs_mutex_new();
-    trd = new SrsDummyCoroutine();
-    worker = new SrsAsyncCallWorker();
-    cache = new SrsKafkaCache();
-    
-    lb = new SrsLbRoundRobin();
-}
-
-SrsKafkaProducer::~SrsKafkaProducer()
-{
-    clear_metadata();
-    
-    srs_freep(lb);
-    
-    srs_freep(worker);
-    srs_freep(trd);
-    srs_freep(cache);
-    
-    srs_mutex_destroy(lock);
-    srs_cond_destroy(metadata_expired);
-}
-
-srs_error_t SrsKafkaProducer::initialize()
-{
-    enabled = _srs_config->get_kafka_enabled();
-    srs_info("initialize kafka ok, enabled=%d.", enabled);
-    return srs_success;
-}
-
-srs_error_t SrsKafkaProducer::start()
-{
-    srs_error_t err = srs_success;
-    
-    if (!enabled) {
-        return err;
-    }
-    
-    if ((err = worker->start()) != srs_success) {
-        return srs_error_wrap(err, "async worker");
-    }
-    
-    srs_freep(trd);
-    trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id());
-    if ((err = trd->start()) != srs_success) {
-        return srs_error_wrap(err, "coroutine");
-    }
-    
-    refresh_metadata();
-    
-    return err;
-}
-
-void SrsKafkaProducer::stop()
-{
-    if (!enabled) {
-        return;
-    }
-    
-    trd->stop();
-    worker->stop();
-}
-
-srs_error_t SrsKafkaProducer::send(int key, SrsJsonObject* obj)
-{
-    srs_error_t err = srs_success;
-    
-    // cache the json object.
-    cache->append(key, obj);
-    
-    // too few messages, ignore.
-    if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) {
-        return err;
-    }
-    
-    // too many messages, warn user.
-    if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {
-        srs_warn("kafka cache too many messages: %d", cache->size());
-    }
-    
-    // sync with backgound metadata worker.
-    SrsLocker(lock);
-    
-    // flush message when metadata is ok.
-    if (metadata_ok) {
-        err = flush();
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
-{
-    srs_error_t err = srs_success;
-    
-    if (!enabled) {
-        return err;
-    }
-    
-    SrsJsonObject* obj = SrsJsonAny::object();
-    
-    obj->set("msg", SrsJsonAny::str("accept"));
-    obj->set("type", SrsJsonAny::integer(type));
-    obj->set("ip", SrsJsonAny::str(ip.c_str()));
-    
-    return worker->execute(new SrsKafkaMessage(this, key, obj));
-}
-
-srs_error_t SrsKafkaProducer::on_close(int key)
-{
-    srs_error_t err = srs_success;
-    
-    if (!enabled) {
-        return err;
-    }
-    
-    SrsJsonObject* obj = SrsJsonAny::object();
-    
-    obj->set("msg", SrsJsonAny::str("close"));
-    
-    return worker->execute(new SrsKafkaMessage(this, key, obj));
-}
-
-#define SRS_KAKFA_CIMS (3 * SRS_UTIME_SECONDS)
-
-srs_error_t SrsKafkaProducer::cycle()
-{
-    srs_error_t err = srs_success;
-    
-    // wait for the metadata expired.
-    // when metadata is ok, wait for it expired.
-    if (metadata_ok) {
-        srs_cond_wait(metadata_expired);
-    }
-    
-    // request to lock to acquire the socket.
-    SrsLocker(lock);
-    
-    while (true) {
-        if ((err = do_cycle()) != srs_success) {
-            srs_warn("KafkaProducer: Ignore error, %s", srs_error_desc(err).c_str());
-            srs_freep(err);
-        }
-        
-        if ((err = trd->pull()) != srs_success) {
-            return srs_error_wrap(err, "kafka cycle");
-        }
-    
-        srs_usleep(SRS_KAKFA_CIMS);
-    }
-    
-    return err;
-}
-
-void SrsKafkaProducer::clear_metadata()
-{
-    vector<SrsKafkaPartition*>::iterator it;
-    
-    for (it = partitions.begin(); it != partitions.end(); ++it) {
-        SrsKafkaPartition* partition = *it;
-        srs_freep(partition);
-    }
-    
-    partitions.clear();
-}
-
-srs_error_t SrsKafkaProducer::do_cycle()
-{
-    srs_error_t err = srs_success;
-    
-    // ignore when disabled.
-    if (!enabled) {
-        return err;
-    }
-    
-    // when kafka enabled, request metadata when startup.
-    if ((err = request_metadata()) != srs_success) {
-        return srs_error_wrap(err, "request metadata");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducer::request_metadata()
-{
-    srs_error_t err = srs_success;
-    
-    // ignore when disabled.
-    if (!enabled) {
-        return err;
-    }
-    
-    // select one broker to connect to.
-    SrsConfDirective* brokers = _srs_config->get_kafka_brokers();
-    if (!brokers) {
-        srs_warn("ignore for empty brokers.");
-        return err;
-    }
-    
-    std::string server;
-    int port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
-    if (true) {
-        srs_assert(!brokers->args.empty());
-        std::string broker = lb->select(brokers->args);
-        srs_parse_endpoint(broker, server, port);
-    }
-    
-    std::string topic = _srs_config->get_kafka_topic();
-    if (true) {
-        std::string senabled = srs_bool2switch(enabled);
-        std::string sbrokers = srs_join_vector_string(brokers->args, ",");
-        srs_trace("kafka request enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s",
-                  senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
-    }
-    
-    SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT);
-    SrsAutoFree(SrsTcpClient, transport);
-    
-    SrsKafkaClient* kafka = new SrsKafkaClient(transport);
-    SrsAutoFree(SrsKafkaClient, kafka);
-    
-    // reconnect to kafka server.
-    if ((err = transport->connect()) != srs_success) {
-        return srs_error_wrap(err, "connect %s:%d failed", server.c_str(), port);
-    }
-    
-    // do fetch medata from broker.
-    SrsKafkaTopicMetadataResponse* metadata = NULL;
-    if ((err = kafka->fetch_metadata(topic, &metadata)) != srs_success) {
-        return srs_error_wrap(err, "fetch metadata");
-    }
-    SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
-    
-    // we may need to request multiple times.
-    // for example, the first time to create a none-exists topic, then query metadata.
-    if (!metadata->metadatas.empty()) {
-        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(0);
-        if (topic->metadatas.empty()) {
-            srs_warn("topic %s metadata empty, retry.", topic->name.to_str().c_str());
-            return err;
-        }
-    }
-    
-    // show kafka metadata.
-    string summary = srs_kafka_metadata_summary(metadata);
-    srs_trace("kafka metadata: %s", summary.c_str());
-    
-    // generate the partition info.
-    srs_kafka_metadata2connector(topic, metadata, partitions);
-    srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str());
-    
-    // update the total partition for cache.
-    cache->nb_partitions = (int)partitions.size();
-    
-    metadata_ok = true;
-    
-    return err;
-}
-
-void SrsKafkaProducer::refresh_metadata()
-{
-    clear_metadata();
-    
-    metadata_ok = false;
-    srs_cond_signal(metadata_expired);
-    srs_trace("kafka async refresh metadata in background");
-}
-
-srs_error_t SrsKafkaProducer::flush()
-{
-    srs_error_t err = srs_success;
-    
-    // flush all available partition caches.
-    while (true) {
-        int key = -1;
-        SrsKafkaPartitionCache* pc = NULL;
-        
-        // all flushed, or no kafka partition to write to.
-        if (!cache->fetch(&key, &pc) || partitions.empty()) {
-            break;
-        }
-        
-        // flush specified partition.
-        srs_assert(key >= 0 && pc);
-        SrsKafkaPartition* partition = partitions.at(key % partitions.size());
-        if ((err = cache->flush(partition, key, pc)) != srs_success) {
-            return srs_error_wrap(err, "flush partition");
-        }
-    }
-    
-    return err;
-}
-
-#endif
-
diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp
deleted file mode 100644
index 835a09686..000000000
--- a/trunk/src/app/srs_app_kafka.hpp
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-#ifndef SRS_APP_KAFKA_HPP
-#define SRS_APP_KAFKA_HPP
-
-#include <srs_core.hpp>
-
-#include <map>
-#include <vector>
-
-class SrsLbRoundRobin;
-class SrsAsyncCallWorker;
-class SrsTcpClient;
-class SrsKafkaClient;
-class SrsJsonObject;
-class SrsKafkaProducer;
-
-#include <srs_app_thread.hpp>
-#include <srs_app_server.hpp>
-#include <srs_app_async_call.hpp>
-
-#ifdef SRS_AUTO_KAFKA
-
-// The partition messages cache.
-typedef std::vector<SrsJsonObject*> SrsKafkaPartitionCache;
-
-// The kafka partition info.
-struct SrsKafkaPartition
-{
-private:
-    std::string ep;
-    // Not NULL when connected.
-    SrsTcpClient* transport;
-    SrsKafkaClient* kafka;
-public:
-    int id;
-    std::string topic;
-    // leader.
-    int broker;
-    std::string host;
-    int port;
-public:
-    SrsKafkaPartition();
-    virtual ~SrsKafkaPartition();
-public:
-    virtual std::string hostport();
-    virtual srs_error_t connect();
-    virtual srs_error_t flush(SrsKafkaPartitionCache* pc);
-private:
-    virtual void disconnect();
-};
-
-// The following is all types of kafka messages.
-class SrsKafkaMessage : public ISrsAsyncCallTask
-{
-private:
-    SrsKafkaProducer* producer;
-    int key;
-    SrsJsonObject* obj;
-public:
-    SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j);
-    virtual ~SrsKafkaMessage();
-// Interface ISrsAsyncCallTask
-public:
-    virtual srs_error_t call();
-    virtual std::string to_string();
-};
-
-// A message cache for kafka.
-class SrsKafkaCache
-{
-public:
-    // The total partitions,
-    // for the key to map to the parition by key%nb_partitions.
-    int nb_partitions;
-private:
-    // Total messages for all partitions.
-    int count;
-    // The key is the partition id, value is the message set to write to this partition.
-    // @remark, when refresh metadata, the partition will increase,
-    //      so maybe some message will dispatch to new partition.
-    std::map< int32_t, SrsKafkaPartitionCache*> cache;
-public:
-    SrsKafkaCache();
-    virtual ~SrsKafkaCache();
-public:
-    virtual void append(int key, SrsJsonObject* obj);
-    virtual int size();
-    // Fetch out a available partition cache.
-    // @return true when got a key and pc; otherwise, false.
-    virtual bool fetch(int* pkey, SrsKafkaPartitionCache** ppc);
-    // Flush the specified partition cache.
-    virtual srs_error_t flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc);
-};
-
-// The kafka cluster interface.
-class ISrsKafkaCluster
-{
-public:
-    ISrsKafkaCluster();
-    virtual ~ISrsKafkaCluster();
-public:
-    // When got any client connect to SRS, notify kafka.
-    // @param key the partition map key, the client id or hash(ip).
-    // @param type the type of client.
-    // @param ip the peer ip of client.
-    virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip) = 0;
-    // When client close or disconnect for error.
-    // @param key the partition map key, the client id or hash(ip).
-    virtual srs_error_t on_close(int key) = 0;
-};
-
-// @global kafka event producer.
-extern ISrsKafkaCluster* _srs_kafka;
-// kafka initialize and disposer for global object.
-extern srs_error_t srs_initialize_kafka();
-extern void srs_dispose_kafka();
-
-// The kafka producer used to save log to kafka cluster.
-class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISrsKafkaCluster
-{
-private:
-    // TODO: FIXME: support reload.
-    bool enabled;
-    srs_mutex_t lock;
-    SrsCoroutine* trd;
-private:
-    bool metadata_ok;
-    srs_cond_t metadata_expired;
-public:
-    std::vector<SrsKafkaPartition*> partitions;
-    SrsKafkaCache* cache;
-private:
-    SrsLbRoundRobin* lb;
-    SrsAsyncCallWorker* worker;
-public:
-    SrsKafkaProducer();
-    virtual ~SrsKafkaProducer();
-public:
-    virtual srs_error_t initialize();
-    virtual srs_error_t start();
-    virtual void stop();
-// internal: for worker to call task to send object.
-public:
-    // Send json object to kafka cluster.
-    // The producer will aggregate message and send in kafka message set.
-    // @param key the key to map to the partition, user can use cid or hash.
-    // @param obj the json object; user must never free it again.
-    virtual srs_error_t send(int key, SrsJsonObject* obj);
-// Interface ISrsKafkaCluster
-public:
-    virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip);
-    virtual srs_error_t on_close(int key);
-// Interface ISrsReusableThreadHandler
-public:
-    virtual srs_error_t cycle();
-private:
-    virtual void clear_metadata();
-    virtual srs_error_t do_cycle();
-    virtual srs_error_t request_metadata();
-    // Set the metadata to invalid and refresh it.
-    virtual void refresh_metadata();
-    virtual srs_error_t flush();
-};
-
-#endif
-
-#endif
-
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index 9fc028ef6..850e4a1cc 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -55,7 +55,6 @@ using namespace std;
 #include <srs_app_statistic.hpp>
 #include <srs_protocol_utility.hpp>
 #include <srs_protocol_json.hpp>
-#include <srs_app_kafka.hpp>
 
 // the timeout in srs_utime_t to wait encoder to republish
 // if timeout, close the connection.
@@ -154,13 +153,6 @@ srs_error_t SrsRtmpConn::do_cycle()
     
     srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
     
-    // notify kafka cluster.
-#ifdef SRS_AUTO_KAFKA
-    if ((err = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != srs_success) {
-        return srs_error_wrap(err, "kafka on client");
-    }
-#endif
-    
     rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);
     rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
 
@@ -1194,12 +1186,6 @@ srs_error_t SrsRtmpConn::on_disconnect()
     
     http_hooks_on_close();
     
-#ifdef SRS_AUTO_KAFKA
-    if ((err = _srs_kafka->on_close(srs_id())) != srs_success) {
-        return srs_error_wrap(err, "kafka on close");
-    }
-#endif
-    
     // TODO: FIXME: Implements it.
     
     return err;
diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp
index b84d6f4a8..100fb0025 100644
--- a/trunk/src/app/srs_app_rtmp_conn.hpp
+++ b/trunk/src/app/srs_app_rtmp_conn.hpp
@@ -54,9 +54,6 @@ class SrsSecurity;
 class ISrsWakable;
 class SrsCommonMessage;
 class SrsPacket;
-#ifdef SRS_AUTO_KAFKA
-class ISrsKafkaCluster;
-#endif
 
 // The simple rtmp client for SRS.
 class SrsSimpleRtmpClient : public SrsBasicRtmpClient
diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp
index 4d2241bc9..1901da4db 100644
--- a/trunk/src/app/srs_app_server.cpp
+++ b/trunk/src/app/srs_app_server.cpp
@@ -49,7 +49,6 @@ using namespace std;
 #include <srs_app_caster_flv.hpp>
 #include <srs_core_mem_watch.hpp>
 #include <srs_kernel_consts.hpp>
-#include <srs_app_kafka.hpp>
 #include <srs_app_thread.hpp>
 #include <srs_app_coworkers.hpp>
 
@@ -523,10 +522,6 @@ void SrsServer::dispose()
     
     // @remark don't dispose ingesters, for too slow.
     
-#ifdef SRS_AUTO_KAFKA
-    srs_dispose_kafka();
-#endif
-    
     // dispose the source for hls and dvr.
     SrsSource::dispose_all();
     
@@ -590,13 +585,6 @@ srs_error_t SrsServer::initialize_st()
     // set current log id.
     _srs_context->generate_id();
     
-    // initialize the conponents that depends on st.
-#ifdef SRS_AUTO_KAFKA
-    if ((err = srs_initialize_kafka()) != srs_success) {
-        return srs_error_wrap(err, "initialize kafka");
-    }
-#endif
-    
     // check asprocess.
     bool asprocess = _srs_config->get_asprocess();
     if (asprocess && ppid == 1) {
diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp
index b8366ea21..aad92c653 100644
--- a/trunk/src/app/srs_app_server.hpp
+++ b/trunk/src/app/srs_app_server.hpp
@@ -50,9 +50,6 @@ class ISrsUdpHandler;
 class SrsUdpListener;
 class SrsTcpListener;
 class SrsAppCasterFlv;
-#ifdef SRS_AUTO_KAFKA
-class SrsKafkaProducer;
-#endif
 class SrsCoroutineManager;
 
 // The listener type for server to identify the connection,
diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp
index 5f7e22218..9ea41f532 100644
--- a/trunk/src/core/srs_core.hpp
+++ b/trunk/src/core/srs_core.hpp
@@ -27,7 +27,7 @@
 // The version config.
 #define VERSION_MAJOR       3
 #define VERSION_MINOR       0
-#define VERSION_REVISION    52
+#define VERSION_REVISION    53
 
 // The macros generated by configure script.
 #include <srs_auto_headers.hpp>
diff --git a/trunk/src/kernel/srs_kernel_balance.hpp b/trunk/src/kernel/srs_kernel_balance.hpp
index d6def79b1..5efc89514 100644
--- a/trunk/src/kernel/srs_kernel_balance.hpp
+++ b/trunk/src/kernel/srs_kernel_balance.hpp
@@ -31,7 +31,7 @@
 
 /**
  * the round-robin load balance algorithm,
- * used for edge pull, kafka and other multiple server feature.
+ * used for edge pull and other multiple server feature.
  */
 class SrsLbRoundRobin
 {
diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp
index 8f09912ff..bb6d85292 100644
--- a/trunk/src/kernel/srs_kernel_consts.hpp
+++ b/trunk/src/kernel/srs_kernel_consts.hpp
@@ -390,13 +390,5 @@
 #define SRS_CONSTS_RTSP_RTSPVersionNotSupported_str             "RTSP Version Not Supported"
 #define SRS_CONSTS_RTSP_OptionNotSupported_str                  "Option not support"
 
-///////////////////////////////////////////////////////////
-// KAFKA consts values
-///////////////////////////////////////////////////////////
-#define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092
-
-// The common io timeout, for both recv and send.
-#define SRS_CONSTS_KAFKA_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
-
 #endif
 
diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp
index 45a7751c7..88672030f 100644
--- a/trunk/src/kernel/srs_kernel_error.hpp
+++ b/trunk/src/kernel/srs_kernel_error.hpp
@@ -284,7 +284,7 @@
 #define ERROR_OCLUSTER_REDIRECT             3091
 
 ///////////////////////////////////////////////////////
-// HTTP/StreamCaster/KAFKA protocol error.
+// HTTP/StreamCaster protocol error.
 ///////////////////////////////////////////////////////
 #define ERROR_HTTP_PATTERN_EMPTY            4000
 #define ERROR_HTTP_PATTERN_DUPLICATED       4001
@@ -316,14 +316,6 @@
 #define ERROR_AVC_NALU_UEV                  4027
 #define ERROR_AAC_BYTES_INVALID             4028
 #define ERROR_HTTP_REQUEST_EOF              4029
-#define ERROR_KAFKA_CODEC_STRING            4030
-#define ERROR_KAFKA_CODEC_BYTES             4031
-#define ERROR_KAFKA_CODEC_REQUEST           4032
-#define ERROR_KAFKA_CODEC_RESPONSE          4033
-#define ERROR_KAFKA_CODEC_ARRAY             4034
-#define ERROR_KAFKA_CODEC_METADATA          4035
-#define ERROR_KAFKA_CODEC_MESSAGE           4036
-#define ERROR_KAFKA_CODEC_PRODUCER          4037
 #define ERROR_HTTP_302_INVALID              4038
 #define ERROR_BASE64_DECODE                 4039
 
diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp
index 39d518fa0..22932c160 100644
--- a/trunk/src/main/srs_main_server.cpp
+++ b/trunk/src/main/srs_main_server.cpp
@@ -228,7 +228,6 @@ void show_macro_features()
         ss << ", trans:" << srs_bool2switch(true);
         // inge(ingest)
         ss << ", inge:" << srs_bool2switch(true);
-        ss << ", kafka:" << srs_bool2switch(SRS_AUTO_KAFKA_BOOL);
         ss << ", stat:" << srs_bool2switch(true);
         ss << ", nginx:" << srs_bool2switch(SRS_AUTO_NGINX_BOOL);
         // ff(ffmpeg)
diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp
deleted file mode 100644
index 8c73b5b70..000000000
--- a/trunk/src/protocol/srs_kafka_stack.cpp
+++ /dev/null
@@ -1,1415 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-#include <srs_kafka_stack.hpp>
-
-#include <sstream>
-using namespace std;
-
-#include <srs_kernel_error.hpp>
-#include <srs_core_autofree.hpp>
-#include <srs_kernel_log.hpp>
-#include <srs_protocol_io.hpp>
-#include <srs_protocol_stream.hpp>
-#include <srs_kernel_utility.hpp>
-#include <srs_protocol_utility.hpp>
-#include <srs_protocol_json.hpp>
-
-#ifdef SRS_AUTO_KAFKA
-
-#define SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS 300000
-
-SrsKafkaString::SrsKafkaString()
-{
-    _size = -1;
-    data = NULL;
-}
-
-SrsKafkaString::SrsKafkaString(string v)
-{
-    _size = -1;
-    data = NULL;
-    
-    set_value(v);
-}
-
-SrsKafkaString::~SrsKafkaString()
-{
-    srs_freepa(data);
-}
-
-bool SrsKafkaString::null()
-{
-    return _size == -1;
-}
-
-bool SrsKafkaString::empty()
-{
-    return _size <= 0;
-}
-
-string SrsKafkaString::to_str()
-{
-    string ret;
-    if (_size > 0) {
-        ret.append(data, _size);
-    }
-    return ret;
-}
-
-void SrsKafkaString::set_value(string v)
-{
-    // free previous data.
-    srs_freepa(data);
-    
-    // copy new value to data.
-    _size = (int16_t)v.length();
-    
-    srs_assert(_size > 0);
-    data = new char[_size];
-    memcpy(data, v.data(), _size);
-}
-
-int SrsKafkaString::nb_bytes()
-{
-    return _size == -1? 2 : 2 + _size;
-}
-
-srs_error_t SrsKafkaString::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(2)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires 2 only %d bytes", buf->left());
-    }
-    buf->write_2bytes(_size);
-    
-    if (_size <= 0) {
-        return err;
-    }
-    
-    if (!buf->require(_size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires %d only %d bytes", _size, buf->left());
-    }
-    buf->write_bytes(data, _size);
-    
-    return err;
-}
-
-srs_error_t SrsKafkaString::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(2)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires 2 only %d bytes", buf->left());
-    }
-    _size = buf->read_2bytes();
-    
-    if (_size != -1 && _size < 0) {
-        return srs_error_new(ERROR_KAFKA_CODEC_STRING, "invalid size=%d", _size);
-    }
-    
-    if (_size <= 0) {
-        return err;
-    }
-    
-    if (!buf->require(_size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires %d only %d bytes", _size, buf->left());
-    }
-    
-    srs_freepa(data);
-    data = new char[_size];
-    
-    buf->read_bytes(data, _size);
-    
-    return err;
-}
-
-SrsKafkaBytes::SrsKafkaBytes()
-{
-    _size = -1;
-    _data = NULL;
-}
-
-SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v)
-{
-    _size = -1;
-    _data = NULL;
-    
-    set_value(v, nb_v);
-}
-
-SrsKafkaBytes::~SrsKafkaBytes()
-{
-    srs_freepa(_data);
-}
-
-char* SrsKafkaBytes::data()
-{
-    return _data;
-}
-
-int SrsKafkaBytes::size()
-{
-    return _size;
-}
-
-bool SrsKafkaBytes::null()
-{
-    return _size == -1;
-}
-
-bool SrsKafkaBytes::empty()
-{
-    return _size <= 0;
-}
-
-void SrsKafkaBytes::set_value(string v)
-{
-    set_value(v.data(), (int)v.length());
-}
-
-void SrsKafkaBytes::set_value(const char* v, int nb_v)
-{
-    // free previous data.
-    srs_freepa(_data);
-    
-    // copy new value to data.
-    _size = (int16_t)nb_v;
-    
-    srs_assert(_size > 0);
-    _data = new char[_size];
-    memcpy(_data, v, _size);
-}
-
-uint32_t SrsKafkaBytes::crc32(uint32_t previous)
-{
-    char bsize[4];
-    SrsBuffer(bsize, 4).write_4bytes(_size);
-    
-    if (_size <= 0) {
-        return srs_crc32_ieee(bsize, 4, previous);
-    }
-    
-    uint32_t crc = srs_crc32_ieee(bsize, 4, previous);
-    crc = srs_crc32_ieee(_data, _size, crc);
-    
-    return crc;
-}
-
-int SrsKafkaBytes::nb_bytes()
-{
-    return 4 + (_size == -1? 0 : _size);
-}
-
-srs_error_t SrsKafkaBytes::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires 4 only %d bytes", buf->left());
-    }
-    buf->write_4bytes(_size);
-    
-    if (_size <= 0) {
-        return err;
-    }
-    
-    if (!buf->require(_size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires %d only %d bytes", _size, buf->left());
-    }
-    buf->write_bytes(_data, _size);
-    
-    return err;
-}
-
-srs_error_t SrsKafkaBytes::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires 4 only %d bytes", buf->left());
-    }
-    _size = buf->read_4bytes();
-    
-    if (_size != -1 && _size < 0) {
-        return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "invalid size=%d", _size);
-    }
-    
-    if (!buf->require(_size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires %d only %d bytes", _size, buf->left());
-    }
-    
-    srs_freepa(_data);
-    _data = new char[_size];
-    buf->read_bytes(_data, _size);
-    
-    return err;
-}
-
-SrsKafkaRequestHeader::SrsKafkaRequestHeader()
-{
-    _size = 0;
-    _api_key = api_version = 0;
-    _correlation_id = 0;
-    client_id = new SrsKafkaString();
-}
-
-SrsKafkaRequestHeader::~SrsKafkaRequestHeader()
-{
-    srs_freep(client_id);
-}
-
-int SrsKafkaRequestHeader::header_size()
-{
-    return 2 + 2 + 4 + client_id->nb_bytes();
-}
-
-int SrsKafkaRequestHeader::message_size()
-{
-    return _size - header_size();
-}
-
-int SrsKafkaRequestHeader::total_size()
-{
-    return 4 + _size;
-}
-
-void SrsKafkaRequestHeader::set_total_size(int s)
-{
-    _size = s - 4;
-}
-
-int32_t SrsKafkaRequestHeader::correlation_id()
-{
-    return _correlation_id;
-}
-
-void SrsKafkaRequestHeader::set_correlation_id(int32_t cid)
-{
-    _correlation_id = cid;
-}
-
-SrsKafkaApiKey SrsKafkaRequestHeader::api_key()
-{
-    return (SrsKafkaApiKey)_api_key;
-}
-
-void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key)
-{
-    _api_key = (int16_t)key;
-}
-
-bool SrsKafkaRequestHeader::is_producer_request()
-{
-    return _api_key == SrsKafkaApiKeyProduceRequest;
-}
-
-bool SrsKafkaRequestHeader::is_fetch_request()
-{
-    return _api_key == SrsKafkaApiKeyFetchRequest;
-}
-
-bool SrsKafkaRequestHeader::is_offset_request()
-{
-    return _api_key == SrsKafkaApiKeyOffsetRequest;
-}
-
-bool SrsKafkaRequestHeader::is_metadata_request()
-{
-    return _api_key == SrsKafkaApiKeyMetadataRequest;
-}
-
-bool SrsKafkaRequestHeader::is_offset_commit_request()
-{
-    return _api_key == SrsKafkaApiKeyOffsetCommitRequest;
-}
-
-bool SrsKafkaRequestHeader::is_offset_fetch_request()
-{
-    return _api_key == SrsKafkaApiKeyOffsetFetchRequest;
-}
-
-bool SrsKafkaRequestHeader::is_consumer_metadata_request()
-{
-    return _api_key == SrsKafkaApiKeyConsumerMetadataRequest;
-}
-
-int SrsKafkaRequestHeader::nb_bytes()
-{
-    return 4 + header_size();
-}
-
-srs_error_t SrsKafkaRequestHeader::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4 + _size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_REQUEST, "requires %d only %d bytes", 4 + _size, buf->left());
-    }
-    
-    buf->write_4bytes(_size);
-    buf->write_2bytes(_api_key);
-    buf->write_2bytes(api_version);
-    buf->write_4bytes(_correlation_id);
-    
-    if ((err = client_id->encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode client_id");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaRequestHeader::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_REQUEST, "requires %d only %d bytes", 4, buf->left());
-    }
-    _size = buf->read_4bytes();
-    
-    if (_size <= 0) {
-        srs_warn("kafka got empty request");
-        return err;
-    }
-    
-    if (!buf->require(_size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_REQUEST, "requires %d only %d bytes", _size, buf->left());
-    }
-    _api_key = buf->read_2bytes();
-    api_version = buf->read_2bytes();
-    _correlation_id = buf->read_4bytes();
-    
-    if ((err = client_id->decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode client_id");
-    }
-    
-    return err;
-}
-
-SrsKafkaResponseHeader::SrsKafkaResponseHeader()
-{
-    _size = 0;
-    _correlation_id = 0;
-}
-
-SrsKafkaResponseHeader::~SrsKafkaResponseHeader()
-{
-}
-
-int SrsKafkaResponseHeader::header_size()
-{
-    return 4;
-}
-
-int SrsKafkaResponseHeader::message_size()
-{
-    return _size - header_size();
-}
-
-int SrsKafkaResponseHeader::total_size()
-{
-    return 4 + _size;
-}
-
-void SrsKafkaResponseHeader::set_total_size(int s)
-{
-    _size = s - 4;
-}
-
-int32_t SrsKafkaResponseHeader::correlation_id()
-{
-    return _correlation_id;
-}
-
-int SrsKafkaResponseHeader::nb_bytes()
-{
-    return 4 + header_size();
-}
-
-srs_error_t SrsKafkaResponseHeader::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4 + _size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_RESPONSE, "requires %d only %d bytes", 4 + _size, buf->left());
-    }
-    
-    buf->write_4bytes(_size);
-    buf->write_4bytes(_correlation_id);
-    
-    return err;
-}
-
-srs_error_t SrsKafkaResponseHeader::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_RESPONSE, "requires %d only %d bytes", 4, buf->left());
-    }
-    _size = buf->read_4bytes();
-    
-    if (_size <= 0) {
-        srs_warn("kafka got empty response");
-        return err;
-    }
-    
-    if (!buf->require(_size)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_RESPONSE, "requires %d only %d bytes", _size, buf->left());
-    }
-    _correlation_id = buf->read_4bytes();
-    
-    return err;
-}
-
-SrsKafkaRawMessage::SrsKafkaRawMessage()
-{
-    offset = 0;
-    message_size = 0;
-    
-    crc = 0;
-    magic_byte = attributes = 0;
-    key = new SrsKafkaBytes();
-    value = new SrsKafkaBytes();
-}
-
-SrsKafkaRawMessage::~SrsKafkaRawMessage()
-{
-    srs_freep(key);
-    srs_freep(value);
-}
-
-srs_error_t SrsKafkaRawMessage::create(SrsJsonObject* obj)
-{
-    srs_error_t err = srs_success;
-    
-    // current must be 0.
-    magic_byte = 0;
-    
-    // no compression codec.
-    attributes = 0;
-    
-    // dumps the json to string.
-    value->set_value(obj->dumps());
-    
-    // crc32 message.
-    crc = srs_crc32_ieee(&magic_byte, 1);
-    crc = srs_crc32_ieee(&attributes, 1, crc);
-    crc = key->crc32(crc);
-    crc = value->crc32(crc);
-    
-    srs_info("crc32 message is %#x", crc);
-    
-    message_size = raw_message_size();
-    
-    return err;
-}
-
-int SrsKafkaRawMessage::raw_message_size()
-{
-    return 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
-}
-
-int SrsKafkaRawMessage::nb_bytes()
-{
-    return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
-}
-
-srs_error_t SrsKafkaRawMessage::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 8 + 4 + 4 + 1 + 1;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_MESSAGE, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_8bytes(offset);
-    buf->write_4bytes(message_size);
-    buf->write_4bytes(crc);
-    buf->write_1bytes(magic_byte);
-    buf->write_1bytes(attributes);
-    
-    if ((err = key->encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode key");
-    }
-    
-    if ((err = value->encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode value");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaRawMessage::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 8 + 4 + 4 + 1 + 1;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_MESSAGE, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    offset = buf->read_8bytes();
-    message_size = buf->read_4bytes();
-    crc = buf->read_4bytes();
-    magic_byte = buf->read_1bytes();
-    attributes = buf->read_1bytes();
-    
-    if ((err = key->decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode key");
-    }
-    
-    if ((err = value->decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode value");
-    }
-    
-    return err;
-}
-
-SrsKafkaRawMessageSet::SrsKafkaRawMessageSet()
-{
-}
-
-SrsKafkaRawMessageSet::~SrsKafkaRawMessageSet()
-{
-    vector<SrsKafkaRawMessage*>::iterator it;
-    for (it = messages.begin(); it != messages.end(); ++it) {
-        SrsKafkaRawMessage* message = *it;
-        srs_freep(message);
-    }
-    messages.clear();
-}
-
-void SrsKafkaRawMessageSet::append(SrsKafkaRawMessage* msg)
-{
-    messages.push_back(msg);
-}
-
-int SrsKafkaRawMessageSet::nb_bytes()
-{
-    int s = 0;
-    
-    vector<SrsKafkaRawMessage*>::iterator it;
-    for (it = messages.begin(); it != messages.end(); ++it) {
-        SrsKafkaRawMessage* message = *it;
-        s += message->nb_bytes();
-    }
-    
-    return s;
-}
-
-srs_error_t SrsKafkaRawMessageSet::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    vector<SrsKafkaRawMessage*>::iterator it;
-    for (it = messages.begin(); it != messages.end(); ++it) {
-        SrsKafkaRawMessage* message = *it;
-        if ((err = message->encode(buf)) != srs_success) {
-            return srs_error_wrap(err, "encode message");
-        }
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaRawMessageSet::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    while (!buf->empty()) {
-        SrsKafkaRawMessage* message = new SrsKafkaRawMessage();
-        
-        if ((err = message->decode(buf)) != srs_success) {
-            srs_freep(message);
-            return srs_error_wrap(err, "decode message");
-        }
-        
-        messages.push_back(message);
-    }
-    
-    return err;
-}
-
-SrsKafkaRequest::SrsKafkaRequest()
-{
-    header.set_correlation_id(SrsKafkaCorrelationPool::instance()->generate_correlation_id());
-}
-
-SrsKafkaRequest::~SrsKafkaRequest()
-{
-}
-
-void SrsKafkaRequest::update_header(int s)
-{
-    header.set_total_size(s);
-}
-
-int32_t SrsKafkaRequest::correlation_id()
-{
-    return header.correlation_id();
-}
-
-SrsKafkaApiKey SrsKafkaRequest::api_key()
-{
-    return header.api_key();
-}
-
-int SrsKafkaRequest::nb_bytes()
-{
-    return header.nb_bytes();
-}
-
-srs_error_t SrsKafkaRequest::encode(SrsBuffer* buf)
-{
-    return header.encode(buf);
-}
-
-srs_error_t SrsKafkaRequest::decode(SrsBuffer* buf)
-{
-    return header.decode(buf);
-}
-
-SrsKafkaResponse::SrsKafkaResponse()
-{
-}
-
-SrsKafkaResponse::~SrsKafkaResponse()
-{
-}
-
-void SrsKafkaResponse::update_header(int s)
-{
-    header.set_total_size(s);
-}
-
-int SrsKafkaResponse::nb_bytes()
-{
-    return header.nb_bytes();
-}
-
-srs_error_t SrsKafkaResponse::encode(SrsBuffer* buf)
-{
-    return header.encode(buf);
-}
-
-srs_error_t SrsKafkaResponse::decode(SrsBuffer* buf)
-{
-    return header.decode(buf);
-}
-
-SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest()
-{
-    header.set_api_key(SrsKafkaApiKeyMetadataRequest);
-}
-
-SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest()
-{
-}
-
-void SrsKafkaTopicMetadataRequest::add_topic(string topic)
-{
-    topics.append(new SrsKafkaString(topic));
-}
-
-int SrsKafkaTopicMetadataRequest::nb_bytes()
-{
-    return SrsKafkaRequest::nb_bytes() + topics.nb_bytes();
-}
-
-srs_error_t SrsKafkaTopicMetadataRequest::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaRequest::encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode request");
-    }
-    
-    if ((err = topics.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode topics");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaRequest::decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode request");
-    }
-    
-    if ((err = topics.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode topics");
-    }
-    
-    return err;
-}
-
-SrsKafkaBroker::SrsKafkaBroker()
-{
-    node_id = port = 0;
-}
-
-SrsKafkaBroker::~SrsKafkaBroker()
-{
-}
-
-int SrsKafkaBroker::nb_bytes()
-{
-    return 4 + host.nb_bytes() + 4;
-}
-
-srs_error_t SrsKafkaBroker::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left());
-    }
-    buf->write_4bytes(node_id);
-    
-    if ((err = host.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "host");
-    }
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left());
-    }
-    buf->write_4bytes(port);
-    
-    return err;
-}
-
-srs_error_t SrsKafkaBroker::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left());
-    }
-    node_id = buf->read_4bytes();
-    
-    if ((err = host.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "host");
-    }
-    
-    if (!buf->require(4)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left());
-    }
-    port = buf->read_4bytes();
-    
-    return err;
-}
-
-SrsKafkaPartitionMetadata::SrsKafkaPartitionMetadata()
-{
-    error_code = 0;
-    partition_id = 0;
-    leader = 0;
-}
-
-SrsKafkaPartitionMetadata::~SrsKafkaPartitionMetadata()
-{
-}
-
-int SrsKafkaPartitionMetadata::nb_bytes()
-{
-    return 2 + 4 + 4 + replicas.nb_bytes() + isr.nb_bytes();
-}
-
-srs_error_t SrsKafkaPartitionMetadata::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 2 + 4 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_2bytes(error_code);
-    buf->write_4bytes(partition_id);
-    buf->write_4bytes(leader);
-    
-    if ((err = replicas.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "replicas");
-    }
-    if ((err = isr.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "isr");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaPartitionMetadata::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 2 + 4 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    error_code = buf->read_2bytes();
-    partition_id = buf->read_4bytes();
-    leader = buf->read_4bytes();
-    
-    if ((err = replicas.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "replicas");
-    }
-    if ((err = isr.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "isr");
-    }
-    
-    return err;
-}
-
-SrsKafkaTopicMetadata::SrsKafkaTopicMetadata()
-{
-    error_code = 0;
-}
-
-SrsKafkaTopicMetadata::~SrsKafkaTopicMetadata()
-{
-}
-
-int SrsKafkaTopicMetadata::nb_bytes()
-{
-    return 2 + name.nb_bytes() + metadatas.nb_bytes();
-}
-
-srs_error_t SrsKafkaTopicMetadata::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(2)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 2, buf->left());
-    }
-    buf->write_2bytes(error_code);
-    
-    if ((err = name.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "name");
-    }
-    
-    if ((err = metadatas.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaTopicMetadata::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(2)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 2, buf->left());
-    }
-    error_code = buf->read_2bytes();
-    
-    if ((err = name.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "name");
-    }
-    
-    if ((err = metadatas.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse()
-{
-}
-
-SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse()
-{
-}
-
-int SrsKafkaTopicMetadataResponse::nb_bytes()
-{
-    return SrsKafkaResponse::nb_bytes() + brokers.nb_bytes() + metadatas.nb_bytes();
-}
-
-srs_error_t SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaResponse::encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode response");
-    }
-    
-    if ((err = brokers.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "brokers");
-    }
-    
-    if ((err = metadatas.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaResponse::decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode response");
-    }
-    
-    if ((err = brokers.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "brokers");
-    }
-    
-    if ((err = metadatas.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-int SrsKafkaProducerPartitionMessages::nb_bytes()
-{
-    return 4 + 4 + messages.nb_bytes();
-}
-
-srs_error_t SrsKafkaProducerPartitionMessages::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 4 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_4bytes(partition);
-    buf->write_4bytes(message_set_size);
-    
-    if ((err = messages.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "messages");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducerPartitionMessages::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 4 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    partition = buf->read_4bytes();
-    message_set_size = buf->read_4bytes();
-    
-    // for the message set decode util empty, we must create a new buffer when
-    // there exists other objects after message set.
-    if (buf->left() != message_set_size) {
-        SrsBuffer* tbuf = new SrsBuffer(buf->data() + buf->pos(), message_set_size);
-        SrsAutoFree(SrsBuffer, tbuf);
-        
-        if ((err = messages.decode(buf)) != srs_success) {
-            return srs_error_wrap(err, "messages");
-        }
-    } else {
-        if ((err = messages.decode(buf)) != srs_success) {
-            return srs_error_wrap(err, "messages");
-        }
-    }
-    
-    return err;
-}
-
-int SrsKafkaProducerTopicMessages::nb_bytes()
-{
-    return topic_name.nb_bytes() + partitions.nb_bytes();
-}
-
-srs_error_t SrsKafkaProducerTopicMessages::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = topic_name.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "topic_name");
-    }
-    
-    if ((err = partitions.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "partitions");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducerTopicMessages::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = topic_name.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "topic_name");
-    }
-    
-    if ((err = partitions.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "partitions");
-    }
-    
-    return err;
-}
-
-SrsKafkaProducerRequest::SrsKafkaProducerRequest()
-{
-    required_acks = 0;
-    timeout = 0;
-}
-
-SrsKafkaProducerRequest::~SrsKafkaProducerRequest()
-{
-}
-
-int SrsKafkaProducerRequest::nb_bytes()
-{
-    return SrsKafkaRequest::nb_bytes() + 2 + 4 + topics.nb_bytes();
-}
-
-srs_error_t SrsKafkaProducerRequest::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaRequest::encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode request");
-    }
-    
-    int nb_required = 2 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_2bytes(required_acks);
-    buf->write_4bytes(timeout);
-    
-    if ((err = topics.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode topics");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducerRequest::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaRequest::decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode request");
-    }
-    
-    int nb_required = 2 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    required_acks = buf->read_2bytes();
-    timeout = buf->read_4bytes();
-    
-    if ((err = topics.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode topics");
-    }
-    
-    return err;
-}
-
-SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::_instance = new SrsKafkaCorrelationPool();
-
-SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::instance()
-{
-    return _instance;
-}
-
-SrsKafkaCorrelationPool::SrsKafkaCorrelationPool()
-{
-}
-
-SrsKafkaCorrelationPool::~SrsKafkaCorrelationPool()
-{
-    correlation_ids.clear();
-}
-
-int32_t SrsKafkaCorrelationPool::generate_correlation_id()
-{
-    static int32_t cid = 1;
-    return cid++;
-}
-
-SrsKafkaApiKey SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request)
-{
-    SrsKafkaApiKey previous = SrsKafkaApiKeyUnknown;
-    
-    std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id);
-    if (it != correlation_ids.end()) {
-        previous = it->second;
-    }
-    
-    correlation_ids[correlation_id] = request;
-    
-    return previous;
-}
-
-SrsKafkaApiKey SrsKafkaCorrelationPool::unset(int32_t correlation_id)
-{
-    std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id);
-    
-    if (it != correlation_ids.end()) {
-        SrsKafkaApiKey key = it->second;
-        correlation_ids.erase(it);
-        return key;
-    }
-    
-    return SrsKafkaApiKeyUnknown;
-}
-
-SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id)
-{
-    if (correlation_ids.find(correlation_id) == correlation_ids.end()) {
-        return SrsKafkaApiKeyUnknown;
-    }
-    
-    return correlation_ids[correlation_id];
-}
-
-SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReadWriter* io)
-{
-    skt = io;
-    reader = new SrsFastStream();
-}
-
-SrsKafkaProtocol::~SrsKafkaProtocol()
-{
-    srs_freep(reader);
-}
-
-srs_error_t SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
-{
-    srs_error_t err = srs_success;
-    
-    // TODO: FIXME: refine for performance issue.
-    SrsAutoFree(SrsKafkaRequest, msg);
-    
-    int size = msg->nb_bytes();
-    if (size <= 0) {
-        return err;
-    }
-    
-    // update the header of message.
-    msg->update_header(size);
-    
-    // cache the request correlation id to discovery response message.
-    SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance();
-    pool->set(msg->correlation_id(), msg->api_key());
-    
-    // TODO: FIXME: refine for performance issue.
-    char* bytes = new char[size];
-    SrsAutoFreeA(char, bytes);
-    
-    // TODO: FIXME: refine for performance issue.
-    SrsBuffer* buf = new SrsBuffer(bytes, size);
-    SrsAutoFree(SrsBuffer, buf);
-    
-    if ((err = msg->encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode msg");
-    }
-    
-    if ((err = skt->write(bytes, size, NULL)) != srs_success) {
-        return srs_error_wrap(err, "write msg");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg)
-{
-    *pmsg = NULL;
-    
-    srs_error_t err = srs_success;
-    
-    while (true) {
-        SrsKafkaResponseHeader header;
-        
-        // ensure enough bytes for response header.
-        if ((err = reader->grow(skt, header.nb_bytes())) != srs_success) {
-            return srs_error_wrap(err, "grow buffer");
-        }
-        
-        // decode response header.
-        SrsBuffer* buf = new SrsBuffer(reader->bytes(), reader->size());
-        SrsAutoFree(SrsBuffer, buf);
-        
-        if ((err = header.decode(buf)) != srs_success) {
-            return srs_error_wrap(err, "decode header");
-        }
-        
-        // skip the used buffer for header.
-        buf->skip(-1 * buf->pos());
-        
-        // fetch cached api key.
-        SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance();
-        SrsKafkaApiKey key = pool->unset(header.correlation_id());
-        srs_info("kafka got %d bytes response, key=%d", header.total_size(), header.correlation_id());
-        
-        // create message by cached api key.
-        SrsKafkaResponse* res = NULL;
-        switch (key) {
-            case SrsKafkaApiKeyMetadataRequest:
-                srs_info("kafka got metadata response");
-                res = new SrsKafkaTopicMetadataResponse();
-                break;
-            case SrsKafkaApiKeyUnknown:
-            default:
-                break;
-        }
-        
-        // ensure enough bytes to decode message.
-        if ((err = reader->grow(skt, header.total_size())) != srs_success) {
-            srs_freep(res);
-            return srs_error_wrap(err, "grow buffer");
-        }
-        
-        // dropped message, fetch next.
-        if (!res) {
-            reader->skip(header.total_size());
-            srs_warn("kafka ignore unknown message, size=%d.", header.total_size());
-            continue;
-        }
-        
-        // parse the whole message.
-        if ((err = res->decode(buf)) != srs_success) {
-            srs_freep(res);
-            return srs_error_wrap(err, "decode response");
-        }
-        
-        *pmsg = res;
-        break;
-    }
-    
-    return err;
-}
-
-SrsKafkaClient::SrsKafkaClient(ISrsProtocolReadWriter* io)
-{
-    protocol = new SrsKafkaProtocol(io);
-}
-
-SrsKafkaClient::~SrsKafkaClient()
-{
-    srs_freep(protocol);
-}
-
-srs_error_t SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** pmsg)
-{
-    *pmsg = NULL;
-    
-    srs_error_t err = srs_success;
-    
-    SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest();
-    
-    req->add_topic(topic);
-    
-    if ((err = protocol->send_and_free_message(req)) != srs_success) {
-        return srs_error_wrap(err, "send request");
-    }
-    
-    if ((err = protocol->expect_message(pmsg)) != srs_success) {
-        return srs_error_wrap(err, "expect message");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector<SrsJsonObject*>& msgs)
-{
-    srs_error_t err = srs_success;
-    
-    SrsKafkaProducerRequest* req = new SrsKafkaProducerRequest();
-    
-    // 0 the server will not send any response.
-    req->required_acks = 0;
-    // timeout of producer message.
-    req->timeout = SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS;
-    
-    // create the topic and partition to write message to.
-    SrsKafkaProducerTopicMessages* topics = new SrsKafkaProducerTopicMessages();
-    SrsKafkaProducerPartitionMessages* partitions = new SrsKafkaProducerPartitionMessages();
-    
-    topics->partitions.append(partitions);
-    req->topics.append(topics);
-    
-    topics->topic_name.set_value(topic);
-    partitions->partition = partition;
-    
-    // convert json objects to kafka raw messages.
-    vector<SrsJsonObject*>::iterator it;
-    for (it = msgs.begin(); it != msgs.end(); ++it) {
-        SrsJsonObject* obj = *it;
-        SrsKafkaRawMessage* msg = new SrsKafkaRawMessage();
-        
-        if ((err = msg->create(obj)) != srs_success) {
-            srs_freep(msg);
-            srs_freep(req);
-            return srs_error_wrap(err, "create message");
-        }
-        
-        partitions->messages.append(msg);
-    }
-    
-    partitions->message_set_size = partitions->messages.nb_bytes();
-    
-    // write to kafka cluster.
-    if ((err = protocol->send_and_free_message(req)) != srs_success) {
-        return srs_error_wrap(err, "send request");
-    }
-    
-    return err;
-}
-
-vector<string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr)
-{
-    vector<string> strs;
-    
-    for (int i = 0; i < arr->size(); i++) {
-        SrsKafkaString* elem = arr->at(i);
-        strs.push_back(elem->to_str());
-    }
-    
-    return strs;
-}
-
-vector<string> srs_kafka_array2vector(SrsKafkaArray<int32_t>* arr)
-{
-    vector<string> strs;
-    
-    for (int i = 0; i < arr->size(); i++) {
-        int32_t elem = arr->at(i);
-        strs.push_back(srs_int2str(elem));
-    }
-    
-    return strs;
-}
-
-#endif
-
diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp
deleted file mode 100644
index 8665d74fa..000000000
--- a/trunk/src/protocol/srs_kafka_stack.hpp
+++ /dev/null
@@ -1,932 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-#ifndef SRS_PROTOCOL_KAFKA_HPP
-#define SRS_PROTOCOL_KAFKA_HPP
-
-#include <srs_core.hpp>
-
-#include <vector>
-#include <string>
-#include <map>
-
-#include <srs_kernel_buffer.hpp>
-#include <srs_kernel_error.hpp>
-#include <srs_kernel_log.hpp>
-
-class SrsFastStream;
-class ISrsProtocolReadWriter;
-class SrsJsonObject;
-
-#ifdef SRS_AUTO_KAFKA
-
-/**
- * the api key used to identify the request type.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
- */
-enum SrsKafkaApiKey
-{
-    SrsKafkaApiKeyUnknown = -1,
-    
-    SrsKafkaApiKeyProduceRequest = 0,
-    SrsKafkaApiKeyFetchRequest = 1,
-    SrsKafkaApiKeyOffsetRequest = 2,
-    SrsKafkaApiKeyMetadataRequest = 3,
-    /* Non-user facing control APIs 4-7 */
-    SrsKafkaApiKeyOffsetCommitRequest = 8,
-    SrsKafkaApiKeyOffsetFetchRequest = 9,
-    SrsKafkaApiKeyConsumerMetadataRequest = 10,
-};
-
-/**
- * These types consist of a signed integer giving a length N followed by N bytes of content.
- * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
- */
-class SrsKafkaString : public ISrsCodec
-{
-private:
-    int16_t _size;
-    char* data;
-public:
-    SrsKafkaString();
-    SrsKafkaString(std::string v);
-    virtual ~SrsKafkaString();
-public:
-    virtual bool null();
-    virtual bool empty();
-    virtual std::string to_str();
-    virtual void set_value(std::string v);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * These types consist of a signed integer giving a length N followed by N bytes of content.
- * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
- */
-class SrsKafkaBytes : public ISrsCodec
-{
-private:
-    int32_t _size;
-    char* _data;
-public:
-    SrsKafkaBytes();
-    SrsKafkaBytes(const char* v, int nb_v);
-    virtual ~SrsKafkaBytes();
-public:
-    virtual char* data();
-    virtual int size();
-    virtual bool null();
-    virtual bool empty();
-    virtual void set_value(std::string v);
-    virtual void set_value(const char* v, int nb_v);
-    virtual uint32_t crc32(uint32_t previous);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * This is a notation for handling repeated structures. These will always be encoded as an
- * int32 size containing the length N followed by N repetitions of the structure which can
- * itself be made up of other primitive types. In the BNF grammars below we will show an
- * array of a structure foo as [foo].
- *
- * Usage:
- *      SrsKafkaArray<SrsKafkaBytes> body;
- *      body.append(new SrsKafkaBytes());
- * @remark array elem is the T*, which must be ISrsCodec*
- *
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
- */
-template<typename T>
-class SrsKafkaArray : public ISrsCodec
-{
-private:
-    int32_t length;
-    std::vector<T*> elems;
-    typedef typename std::vector<T*>::iterator SrsIterator;
-public:
-    SrsKafkaArray()
-    {
-        length = 0;
-    }
-    virtual ~SrsKafkaArray()
-    {
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            T* elem = *it;
-            srs_freep(elem);
-        }
-        elems.clear();
-    }
-public:
-    virtual void append(T* elem)
-    {
-        length++;
-        elems.push_back(elem);
-    }
-    virtual int size()
-    {
-        return length;
-    }
-    virtual bool empty()
-    {
-        return elems.empty();
-    }
-    virtual T* at(int index)
-    {
-        return elems.at(index);
-    }
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes()
-    {
-        int s = 4;
-        
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            T* elem = *it;
-            s += elem->nb_bytes();
-        }
-        
-        return s;
-    }
-    virtual srs_error_t encode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        if (!buf->require(4)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
-        }
-        buf->write_4bytes(length);
-        
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            T* elem = *it;
-            if ((err = elem->encode(buf)) != srs_success) {
-                return srs_error_wrap(err, "encode elem");
-            }
-        }
-        
-        return err;
-    }
-    virtual srs_error_t decode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        if (!buf->require(4)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
-        }
-        length = buf->read_4bytes();
-        
-        for (int i = 0; i < length; i++) {
-            T* elem = new T();
-            if ((err = elem->decode(buf)) != srs_success) {
-                srs_freep(elem);
-                return srs_error_wrap(err, "decode elem");
-            }
-            
-            elems.push_back(elem);
-        }
-        
-        return err;
-    }
-};
-template<>
-class SrsKafkaArray<int32_t> : public ISrsCodec
-{
-private:
-    int32_t length;
-    std::vector<int32_t> elems;
-    typedef std::vector<int32_t>::iterator SrsIterator;
-public:
-    SrsKafkaArray()
-    {
-        length = 0;
-    }
-    virtual ~SrsKafkaArray()
-    {
-        elems.clear();
-    }
-public:
-    virtual void append(int32_t elem)
-    {
-        length++;
-        elems.push_back(elem);
-    }
-    virtual int size()
-    {
-        return length;
-    }
-    virtual bool empty()
-    {
-        return elems.empty();
-    }
-    virtual int32_t at(int index)
-    {
-        return elems.at(index);
-    }
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes()
-    {
-        return 4 + 4 * (int)elems.size();
-    }
-    virtual srs_error_t encode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        int nb_required = 4 + sizeof(int32_t) * (int)elems.size();
-        if (!buf->require(nb_required)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", nb_required, buf->left());
-        }
-        buf->write_4bytes(length);
-        
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            int32_t elem = *it;
-            buf->write_4bytes(elem);
-        }
-        
-        return err;
-    }
-    virtual srs_error_t decode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        if (!buf->require(4)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
-        }
-        length = buf->read_4bytes();
-        
-        for (int i = 0; i < length; i++) {
-            if (!buf->require(sizeof(int32_t))) {
-                return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", sizeof(int32_t), buf->left());
-                
-            }
-            
-            int32_t elem = buf->read_4bytes();
-            elems.push_back(elem);
-        }
-        
-        return err;
-    }
-};
-
-/**
- * the header of request, includes the size of request.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
- */
-class SrsKafkaRequestHeader : public ISrsCodec
-{
-private:
-    /**
-     * The MessageSize field gives the size of the subsequent request or response
-     * message in bytes. The client can read requests by first reading this 4 byte
-     * size as an integer N, and then reading and parsing the subsequent N bytes
-     * of the request.
-     */
-    int32_t _size;
-private:
-    /**
-     * This is a numeric id for the API being invoked (i.e. is it
-     * a metadata request, a produce request, a fetch request, etc).
-     * @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
-     */
-    int16_t _api_key;
-    /**
-     * This is a numeric version number for this api. We version each API and
-     * this version number allows the server to properly interpret the request
-     * as the protocol evolves. Responses will always be in the format corresponding
-     * to the request version. Currently the supported version for all APIs is 0.
-     */
-    int16_t api_version;
-    /**
-     * This is a user-supplied integer. It will be passed back in
-     * the response by the server, unmodified. It is useful for matching
-     * request and response between the client and server.
-     */
-    int32_t _correlation_id;
-    /**
-     * This is a user supplied identifier for the client application.
-     * The user can use any identifier they like and it will be used
-     * when logging errors, monitoring aggregates, etc. For example,
-     * one might want to monitor not just the requests per second overall,
-     * but the number coming from each client application (each of
-     * which could reside on multiple servers). This id acts as a
-     * logical grouping across all requests from a particular client.
-     */
-    SrsKafkaString* client_id;
-public:
-    SrsKafkaRequestHeader();
-    virtual ~SrsKafkaRequestHeader();
-private:
-    /**
-     * the layout of request:
-     *      +-----------+----------------------------------+
-     *      |  4B _size |      [_size] bytes               |
-     *      +-----------+------------+---------------------+
-     *      |  4B _size |   header   |    message          |
-     *      +-----------+------------+---------------------+
-     *      |  total size = 4 + header + message           |
-     *      +----------------------------------------------+
-     * where the header is specifies this request header without the start 4B size.
-     * @remark size = 4 + header + message.
-     */
-    virtual int header_size();
-    /**
-     * the size of message, the bytes left after the header.
-     */
-    virtual int message_size();
-    /**
-     * the total size of the request, includes the 4B size.
-     */
-    virtual int total_size();
-public:
-    /**
-     * when got the whole message size, update the header.
-     * @param s the whole message, including the 4 bytes size size.
-     */
-    virtual void set_total_size(int s);
-    /**
-     * get the correlation id for message.
-     */
-    virtual int32_t correlation_id();
-    /**
-     * set the correlation id for message.
-     */
-    virtual void set_correlation_id(int32_t cid);
-    /**
-     * get the api key of header for message.
-     */
-    virtual SrsKafkaApiKey api_key();
-    /**
-     * set the api key of header for message.
-     */
-    virtual void set_api_key(SrsKafkaApiKey key);
-public:
-    /**
-     * the api key enumeration.
-     * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
-     */
-    virtual bool is_producer_request();
-    virtual bool is_fetch_request();
-    virtual bool is_offset_request();
-    virtual bool is_metadata_request();
-    virtual bool is_offset_commit_request();
-    virtual bool is_offset_fetch_request();
-    virtual bool is_consumer_metadata_request();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the header of response, include the size of response.
- * The response will always match the paired request (e.g. we will
- * send a MetadataResponse in return to a MetadataRequest).
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses
- */
-class SrsKafkaResponseHeader : public ISrsCodec
-{
-private:
-    /**
-     * The MessageSize field gives the size of the subsequent request or response
-     * message in bytes. The client can read requests by first reading this 4 byte
-     * size as an integer N, and then reading and parsing the subsequent N bytes
-     * of the request.
-     */
-    int32_t _size;
-private:
-    /**
-     * This is a user-supplied integer. It will be passed back in
-     * the response by the server, unmodified. It is useful for matching
-     * request and response between the client and server.
-     */
-    int32_t _correlation_id;
-public:
-    SrsKafkaResponseHeader();
-    virtual ~SrsKafkaResponseHeader();
-private:
-    /**
-     * the layout of response:
-     *      +-----------+----------------------------------+
-     *      |  4B _size |      [_size] bytes               |
-     *      +-----------+------------+---------------------+
-     *      |  4B _size |  4B header |    message          |
-     *      +-----------+------------+---------------------+
-     *      |  total size = 4 + 4 + message                |
-     *      +----------------------------------------------+
-     * where the header is specifies this request header without the start 4B size.
-     * @remark size = 4 + 4 + message.
-     */
-    virtual int header_size();
-    /**
-     * the size of message, the bytes left after the header.
-     */
-    virtual int message_size();
-public:
-    /**
-     * the total size of the request, includes the 4B size and message body.
-     */
-    virtual int total_size();
-public:
-    /**
-     * when got the whole message size, update the header.
-     * @param s the whole message, including the 4 bytes size size.
-     */
-    virtual void set_total_size(int s);
-    /**
-     * get the correlation id of response message.
-     */
-    virtual int32_t correlation_id();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the kafka message in message set.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
- */
-struct SrsKafkaRawMessage : public ISrsCodec
-{
-    // metadata.
-public:
-    /**
-     * This is the offset used in kafka as the log sequence number. When the
-     * producer is sending messages it doesn't actually know the offset and
-     * can fill in any value here it likes.
-     */
-    int64_t offset;
-    /**
-     * the size of this message.
-     */
-    int32_t message_size;
-    // message.
-public:
-    /**
-     * The CRC is the CRC32 of the remainder of the message bytes.
-     * This is used to check the integrity of the message on the broker and consumer.
-     */
-    int32_t crc;
-    /**
-     * This is a version id used to allow backwards compatible evolution
-     * of the message binary format. The current value is 0.
-     */
-    int8_t magic_byte;
-    /**
-     * This byte holds metadata attributes about the message.
-     * The lowest 2 bits contain the compression codec used
-     * for the message. The other bits should be set to 0.
-     */
-    int8_t attributes;
-    /**
-     * The key is an optional message key that was used for
-     * partition assignment. The key can be null.
-     */
-    SrsKafkaBytes* key;
-    /**
-     * The value is the actual message contents as an opaque byte array.
-     * Kafka supports recursive messages in which case this may itself
-     * contain a message set. The message can be null.
-     */
-    SrsKafkaBytes* value;
-public:
-    SrsKafkaRawMessage();
-    virtual ~SrsKafkaRawMessage();
-public:
-    /**
-     * create message from json object.
-     */
-    virtual srs_error_t create(SrsJsonObject* obj);
-private:
-    /**
-     * get the raw message, bytes after the message_size.
-     */
-    virtual int raw_message_size();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * a set of kafka message.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
- * @remark because the message set are not preceded by int32, so we decode the buffer util empty.
- */
-class SrsKafkaRawMessageSet : public ISrsCodec
-{
-private:
-    std::vector<SrsKafkaRawMessage*> messages;
-public:
-    SrsKafkaRawMessageSet();
-    virtual ~SrsKafkaRawMessageSet();
-public:
-    virtual void append(SrsKafkaRawMessage* msg);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the kafka request message, for protocol to send.
- */
-class SrsKafkaRequest : public ISrsCodec
-{
-protected:
-    SrsKafkaRequestHeader header;
-public:
-    SrsKafkaRequest();
-    virtual ~SrsKafkaRequest();
-public:
-    /**
-     * update the size in header.
-     * @param s an int value specifies the size of message in header.
-     */
-    virtual void update_header(int s);
-    /**
-     * get the correlation id of header for message.
-     */
-    virtual int32_t correlation_id();
-    /**
-     * get the api key of request.
-     */
-    virtual SrsKafkaApiKey api_key();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the kafka response message, for protocol to recv.
- */
-class SrsKafkaResponse : public ISrsCodec
-{
-protected:
-    SrsKafkaResponseHeader header;
-public:
-    SrsKafkaResponse();
-    virtual ~SrsKafkaResponse();
-public:
-    /**
-     * update the size in header.
-     * @param s an int value specifies the size of message in header.
-     */
-    virtual void update_header(int s);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * request the metadata from broker.
- * This API answers the following questions:
- *      What topics exist?
- *      How many partitions does each topic have?
- *      Which broker is currently the leader for each partition?
- *      What is the host and port for each of these brokers?
- * This is the only request that can be addressed to any broker in the cluster.
- *
- * Since there may be many topics the client can give an optional list of topic
- * names in order to only return metadata for a subset of topics.
- *
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
- */
-class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest
-{
-private:
-    SrsKafkaArray<SrsKafkaString> topics;
-public:
-    SrsKafkaTopicMetadataRequest();
-    virtual ~SrsKafkaTopicMetadataRequest();
-public:
-    virtual void add_topic(std::string topic);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the metadata response data.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
- */
-struct SrsKafkaBroker : public ISrsCodec
-{
-public:
-    int32_t node_id;
-    SrsKafkaString host;
-    int32_t port;
-public:
-    SrsKafkaBroker();
-    virtual ~SrsKafkaBroker();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-struct SrsKafkaPartitionMetadata : public ISrsCodec
-{
-public:
-    int16_t error_code;
-    int32_t partition_id;
-    int32_t leader;
-    SrsKafkaArray<int32_t> replicas;
-    SrsKafkaArray<int32_t> isr;
-public:
-    SrsKafkaPartitionMetadata();
-    virtual ~SrsKafkaPartitionMetadata();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-struct SrsKafkaTopicMetadata : public ISrsCodec
-{
-public:
-    int16_t error_code;
-    SrsKafkaString name;
-    SrsKafkaArray<SrsKafkaPartitionMetadata> metadatas;
-public:
-    SrsKafkaTopicMetadata();
-    virtual ~SrsKafkaTopicMetadata();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * response for the metadata request from broker.
- * The response contains metadata for each partition,
- * with partitions grouped together by topic. This
- * metadata refers to brokers by their broker id.
- * The brokers each have a host and port.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
- */
-class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse
-{
-public:
-    SrsKafkaArray<SrsKafkaBroker> brokers;
-    SrsKafkaArray<SrsKafkaTopicMetadata> metadatas;
-public:
-    SrsKafkaTopicMetadataResponse();
-    virtual ~SrsKafkaTopicMetadataResponse();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-
-/**
- * the messages for producer to send.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
- */
-struct SrsKafkaProducerPartitionMessages : public ISrsCodec
-{
-public:
-    /**
-     * The partition that data is being published to.
-     */
-    int32_t partition;
-    /**
-     * The size, in bytes, of the message set that follows.
-     */
-    int32_t message_set_size;
-    /**
-     * messages in set.
-     */
-    SrsKafkaRawMessageSet messages;
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-struct SrsKafkaProducerTopicMessages : public ISrsCodec
-{
-public:
-    /**
-     * The topic that data is being published to.
-     */
-    SrsKafkaString topic_name;
-    /**
-     * messages of partitions.
-     */
-    SrsKafkaArray<SrsKafkaProducerPartitionMessages> partitions;
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the request for producer to send message.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
- */
-class SrsKafkaProducerRequest : public SrsKafkaRequest
-{
-public:
-    /**
-     * This field indicates how many acknowledgements the servers should receive
-     * before responding to the request. If it is 0 the server will not send any
-     * response (this is the only case where the server will not reply to a request).
-     * If it is 1, the server will wait the data is written to the local log
-     * before sending a response. If it is -1 the server will block until the
-     * message is committed by all in sync replicas before sending a response.
-     * For any number > 1 the server will block waiting for this number of
-     * acknowledgements to occur (but the server will never wait for more
-     * acknowledgements than there are in-sync replicas).
-     */
-    int16_t required_acks;
-    /**
-     * This provides a maximum time in milliseconds the server can await the receipt
-     * of the number of acknowledgements in RequiredAcks. The timeout is not an exact
-     * limit on the request time for a few reasons: (1) it does not include network
-     * latency, (2) the timer begins at the beginning of the processing of this request
-     * so if many requests are queued due to server overload that wait time will not
-     * be included, (3) we will not terminate a local write so if the local write
-     * time exceeds this timeout it will not be respected. To get a hard timeout of
-     * this type the client should use the socket timeout.
-     */
-    int32_t timeout;
-    /**
-     * messages of topics.
-     */
-    SrsKafkaArray<SrsKafkaProducerTopicMessages> topics;
-public:
-    SrsKafkaProducerRequest();
-    virtual ~SrsKafkaProducerRequest();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the poll to discovery reponse.
- * @param CorrelationId This is a user-supplied integer. It will be passed back
- *          in the response by the server, unmodified. It is useful for matching
- *          request and response between the client and server.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
- */
-class SrsKafkaCorrelationPool
-{
-private:
-    static SrsKafkaCorrelationPool* _instance;
-public:
-    static SrsKafkaCorrelationPool* instance();
-private:
-    std::map<int32_t, SrsKafkaApiKey> correlation_ids;
-private:
-    SrsKafkaCorrelationPool();
-public:
-    virtual ~SrsKafkaCorrelationPool();
-public:
-    /**
-     * generate a global correlation id.
-     */
-    virtual int32_t generate_correlation_id();
-    /**
-     * set the correlation id to specified request key.
-     */
-    virtual SrsKafkaApiKey set(int32_t correlation_id, SrsKafkaApiKey request);
-    /**
-     * unset the correlation id.
-     * @return the previous api key; unknown if not set.
-     */
-    virtual SrsKafkaApiKey unset(int32_t correlation_id);
-    /**
-     * get the key by specified correlation id.
-     * @return the specified api key; unknown if no correlation id.
-     */
-    virtual SrsKafkaApiKey get(int32_t correlation_id);
-};
-
-/**
- * the kafka protocol stack, use to send and recv kakfa messages.
- */
-class SrsKafkaProtocol
-{
-private:
-    ISrsProtocolReadWriter* skt;
-    SrsFastStream* reader;
-public:
-    SrsKafkaProtocol(ISrsProtocolReadWriter* io);
-    virtual ~SrsKafkaProtocol();
-public:
-    /**
-     * write the message to kafka server.
-     * @param msg the msg to send. user must not free it again.
-     */
-    virtual srs_error_t send_and_free_message(SrsKafkaRequest* msg);
-    /**
-     * read the message from kafka server.
-     * @param pmsg output the received message. user must free it.
-     */
-    virtual srs_error_t recv_message(SrsKafkaResponse** pmsg);
-public:
-    /**
-     * expect specified message.
-     */
-    template<typename T>
-    srs_error_t expect_message(T** pmsg)
-    {
-        srs_error_t err = srs_success;
-        
-        while (true) {
-            SrsKafkaResponse* res = NULL;
-            if ((err = recv_message(&res)) != srs_success) {
-                return srs_error_wrap(err, "recv message");
-            }
-            
-            // drop not matched.
-            T* msg = dynamic_cast<T*>(res);
-            if (!msg) {
-                srs_info("kafka drop response.");
-                srs_freep(res);
-                continue;
-            }
-            
-            *pmsg = msg;
-            break;
-        }
-        
-        return err;
-    }
-};
-
-/**
- * the kafka client, for producer or consumer.
- */
-class SrsKafkaClient
-{
-private:
-    SrsKafkaProtocol* protocol;
-public:
-    SrsKafkaClient(ISrsProtocolReadWriter* io);
-    virtual ~SrsKafkaClient();
-public:
-    /**
-     * fetch the metadata from broker for topic.
-     */
-    virtual srs_error_t fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg);
-    /**
-     * write the messages to partition of topic.
-     */
-    virtual srs_error_t write_messages(std::string topic, int32_t partition, std::vector<SrsJsonObject*>& msgs);
-};
-
-// convert kafka array[string] to vector[string]
-extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr);
-extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<int32_t>* arr);
-
-#endif
-
-#endif
-
diff --git a/trunk/src/utest/srs_utest_kernel.cpp b/trunk/src/utest/srs_utest_kernel.cpp
index bd4f7c4be..2e4e0960c 100644
--- a/trunk/src/utest/srs_utest_kernel.cpp
+++ b/trunk/src/utest/srs_utest_kernel.cpp
@@ -935,6 +935,87 @@ VOID TEST(KernelFLVTest, CoverVodStreamErrorCase)
 	}
 }
 
+VOID TEST(KernelFLVTest, CoverFLVVodSHCase)
+{
+	srs_error_t err;
+
+	if (true) {
+		uint8_t buf[] = {
+			0x12, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x09, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x08, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00
+		};
+
+		MockSrsFileReader r((const char*)buf, sizeof(buf));
+		HELPER_EXPECT_SUCCESS(r.open(""));
+
+		SrsFlvVodStreamDecoder d;
+		HELPER_EXPECT_SUCCESS(d.initialize(&r));
+
+		int64_t start = -1; int size = 0;
+		HELPER_EXPECT_SUCCESS(d.read_sequence_header_summary(&start, &size));
+		EXPECT_EQ(15, start);
+		EXPECT_EQ(30, size);
+	}
+
+	if (true) {
+		uint8_t buf[] = {
+			0x12, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x09, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x09, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00
+		};
+
+		MockSrsFileReader r((const char*)buf, sizeof(buf));
+		HELPER_EXPECT_SUCCESS(r.open(""));
+
+		SrsFlvVodStreamDecoder d;
+		HELPER_EXPECT_SUCCESS(d.initialize(&r));
+
+		int64_t start = -1; int size = 0;
+		HELPER_EXPECT_SUCCESS(d.read_sequence_header_summary(&start, &size));
+		EXPECT_EQ(15, start);
+		EXPECT_EQ(15, size);
+	}
+
+	if (true) {
+		uint8_t buf[] = {
+			0x12, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x08, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x08, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00
+		};
+
+		MockSrsFileReader r((const char*)buf, sizeof(buf));
+		HELPER_EXPECT_SUCCESS(r.open(""));
+
+		SrsFlvVodStreamDecoder d;
+		HELPER_EXPECT_SUCCESS(d.initialize(&r));
+
+		int64_t start = -1; int size = 0;
+		HELPER_EXPECT_SUCCESS(d.read_sequence_header_summary(&start, &size));
+		EXPECT_EQ(15, start);
+		EXPECT_EQ(15, size);
+	}
+
+	if (true) {
+		uint8_t buf[] = {
+			0x12, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x12, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00,
+			0x12, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00, 0x00,0x00,0x00, 0x00,0x00,0x00,0x00
+		};
+
+		MockSrsFileReader r((const char*)buf, sizeof(buf));
+		HELPER_EXPECT_SUCCESS(r.open(""));
+
+		SrsFlvVodStreamDecoder d;
+		HELPER_EXPECT_SUCCESS(d.initialize(&r));
+
+		int64_t start = -1; int size = 0;
+		HELPER_EXPECT_FAILED(d.read_sequence_header_summary(&start, &size));
+		EXPECT_EQ(-1, start);
+		EXPECT_EQ(0, size);
+	}
+}
+
 /**
 * test the flv decoder,
 * exception: file stream not open.