diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp index 287751585..b4c96129a 100644 --- a/trunk/src/app/srs_app_kafka.cpp +++ b/trunk/src/app/srs_app_kafka.cpp @@ -383,6 +383,13 @@ void SrsKafkaProducer::stop() int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) { + int ret = ERROR_SUCCESS; + + bool enabled = _srs_config->get_kafka_enabled(); + if (!enabled) { + return ret; + } + SrsJsonObject* obj = SrsJsonAny::object(); obj->set("msg", SrsJsonAny::str("accept")); @@ -392,6 +399,22 @@ int SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip) return worker->execute(new SrsKafkaMessage(this, key, obj)); } +int SrsKafkaProducer::on_close(int key) +{ + int ret = ERROR_SUCCESS; + + bool enabled = _srs_config->get_kafka_enabled(); + if (!enabled) { + return ret; + } + + SrsJsonObject* obj = SrsJsonAny::object(); + + obj->set("msg", SrsJsonAny::str("close")); + + return worker->execute(new SrsKafkaMessage(this, key, obj)); +} + int SrsKafkaProducer::send(int key, SrsJsonObject* obj) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp index 77a4c217c..b42f1c9d4 100644 --- a/trunk/src/app/srs_app_kafka.hpp +++ b/trunk/src/app/srs_app_kafka.hpp @@ -137,11 +137,16 @@ public: public: /** * when got any client connect to SRS, notify kafka. - * @param key the partition map key, a id or hash. + * @param key the partition map key, the client id or hash(ip). * @param type the type of client. * @param ip the peer ip of client. */ virtual int on_client(int key, SrsListenerType type, std::string ip) = 0; + /** + * when client close or disconnect for error. + * @param key the partition map key, the client id or hash(ip). + */ + virtual int on_close(int key) = 0; }; /** @@ -168,11 +173,10 @@ public: virtual int initialize(); virtual int start(); virtual void stop(); +// interface ISrsKafkaCluster public: - /** - * when got any client connect to SRS, notify kafka. - */ virtual int on_client(int key, SrsListenerType type, std::string ip); + virtual int on_close(int key); // for worker to call task to send object. public: /** diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 0ab1b58df..777f50c61 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1555,6 +1555,13 @@ int SrsRtmpConn::on_disconnect() int ret = ERROR_SUCCESS; http_hooks_on_close(); + +#ifdef SRS_AUTO_KAFKA + if ((ret = kafka->on_close(srs_id())) != ERROR_SUCCESS) { + srs_error("notify kafka failed. ret=%d", ret); + return ret; + } +#endif // TODO: implements it.