diff --git a/README.md b/README.md index ed98922bc..d6e5d070c 100755 --- a/README.md +++ b/README.md @@ -348,7 +348,8 @@ Remark: ### SRS 2.0 history -* v2.0, 2015-05-30, fix [#420](https://github.com/simple-rtmp-server/srs/issues/420) remove ts for hls ram mode. +* v2.0, 2015-06-06, fix [#421](https://github.com/simple-rtmp-server/srs/issues/421) drop video for unkown RTMP header. +* v2.0, 2015-06-05, fix [#420](https://github.com/simple-rtmp-server/srs/issues/420) remove ts for hls ram mode. * v2.0, 2015-05-30, fix [#209](https://github.com/simple-rtmp-server/srs/issues/209) cleanup hls when stop and timeout. 2.0.173. * v2.0, 2015-05-29, fix [#409](https://github.com/simple-rtmp-server/srs/issues/409) support pure video hls. 2.0.172. * v2.0, 2015-05-28, support [srs-dolphin][srs-dolphin], the multiple-process SRS. diff --git a/trunk/src/app/srs_app_ingest.cpp b/trunk/src/app/srs_app_ingest.cpp index 2a5293e95..51803c05d 100644 --- a/trunk/src/app/srs_app_ingest.cpp +++ b/trunk/src/app/srs_app_ingest.cpp @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_INGEST +#include using namespace std; #include @@ -39,11 +40,9 @@ using namespace std; // ingest never sleep a long time, for we must start the stream ASAP. #define SRS_AUTO_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL) -SrsIngesterFFMPEG::SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, string _vhost, string _id) +SrsIngesterFFMPEG::SrsIngesterFFMPEG() { - ffmpeg = _ffmpeg; - vhost = _vhost; - id = _id; + ffmpeg = NULL; } SrsIngesterFFMPEG::~SrsIngesterFFMPEG() @@ -51,6 +50,53 @@ SrsIngesterFFMPEG::~SrsIngesterFFMPEG() srs_freep(ffmpeg); } +int SrsIngesterFFMPEG::initialize(SrsFFMPEG* ff, string v, string i) +{ + int ret = ERROR_SUCCESS; + + ffmpeg = ff; + vhost = v; + id = i; + starttime = srs_get_system_time_ms(); + + return ret; +} + +string SrsIngesterFFMPEG::uri() +{ + return vhost + "/" + id; +} + +int SrsIngesterFFMPEG::alive() +{ + return (int)(srs_get_system_time_ms() - starttime); +} + +bool SrsIngesterFFMPEG::equals(string v) +{ + return vhost == v; +} + +bool SrsIngesterFFMPEG::equals(string v, string i) +{ + return vhost == v && id == i; +} + +int SrsIngesterFFMPEG::start() +{ + return ffmpeg->start(); +} + +void SrsIngesterFFMPEG::stop() +{ + ffmpeg->stop(); +} + +int SrsIngesterFFMPEG::cycle() +{ + return ffmpeg->cycle(); +} + void SrsIngesterFFMPEG::fast_stop() { ffmpeg->fast_stop(); @@ -129,6 +175,8 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest // get all engines. std::vector engines = _srs_config->get_transcode_engines(ingest); + + // create ingesters without engines. if (engines.empty()) { SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) { @@ -139,12 +187,17 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest return ret; } - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0()); + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(); + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) { + srs_freep(ingester); + return ret; + } + ingesters.push_back(ingester); return ret; } - // create engine + // create ingesters with engine for (int i = 0; i < (int)engines.size(); i++) { SrsConfDirective* engine = engines[i]; SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin); @@ -156,8 +209,13 @@ int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest } return ret; } + + SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(); + if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) { + srs_freep(ingester); + return ret; + } - SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG(ffmpeg, vhost->arg0(), ingest->arg0()); ingesters.push_back(ingester); } @@ -196,13 +254,13 @@ int SrsIngester::cycle() SrsIngesterFFMPEG* ingester = *it; // start all ffmpegs. - if ((ret = ingester->ffmpeg->start()) != ERROR_SUCCESS) { + if ((ret = ingester->start()) != ERROR_SUCCESS) { srs_error("ingest ffmpeg start failed. ret=%d", ret); return ret; } // check ffmpeg status. - if ((ret = ingester->ffmpeg->cycle()) != ERROR_SUCCESS) { + if ((ret = ingester->cycle()) != ERROR_SUCCESS) { srs_error("ingest ffmpeg cycle failed. ret=%d", ret); return ret; } @@ -376,11 +434,14 @@ void SrsIngester::show_ingest_log_message() return; } + // random choose one ingester to report. + int index = rand() % (int)ingesters.size(); + SrsIngesterFFMPEG* ingester = ingesters.at(index); + // reportable if (pprint->can_print()) { - // TODO: FIXME: show more info. - srs_trace("-> "SRS_CONSTS_LOG_INGESTER - " time=%"PRId64", ingesters=%d", pprint->age(), (int)ingesters.size()); + srs_trace("-> "SRS_CONSTS_LOG_INGESTER" time=%"PRId64", ingesters=%d, #%d(alive=%ds, %s)", + pprint->age(), (int)ingesters.size(), index, ingester->alive() / 1000, ingester->uri().c_str()); } } @@ -407,16 +468,15 @@ int SrsIngester::on_reload_vhost_removed(string vhost) for (it = ingesters.begin(); it != ingesters.end();) { SrsIngesterFFMPEG* ingester = *it; - if (ingester->vhost != vhost) { + if (!ingester->equals(vhost)) { ++it; continue; } // stop the ffmpeg and free it. - ingester->ffmpeg->stop(); + ingester->stop(); - srs_trace("reload stop ingester, " - "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str()); + srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str()); srs_freep(ingester); @@ -436,16 +496,15 @@ int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id) for (it = ingesters.begin(); it != ingesters.end();) { SrsIngesterFFMPEG* ingester = *it; - if (ingester->vhost != vhost || ingester->id != ingest_id) { + if (!ingester->equals(vhost, ingest_id)) { ++it; continue; } // stop the ffmpeg and free it. - ingester->ffmpeg->stop(); + ingester->stop(); - srs_trace("reload stop ingester, " - "vhost=%s, id=%s", vhost.c_str(), ingester->id.c_str()); + srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str()); srs_freep(ingester); diff --git a/trunk/src/app/srs_app_ingest.hpp b/trunk/src/app/srs_app_ingest.hpp index ede1e5211..15137d035 100644 --- a/trunk/src/app/srs_app_ingest.hpp +++ b/trunk/src/app/srs_app_ingest.hpp @@ -45,14 +45,26 @@ class SrsPithyPrint; */ class SrsIngesterFFMPEG { -public: +private: std::string vhost; std::string id; SrsFFMPEG* ffmpeg; - - SrsIngesterFFMPEG(SrsFFMPEG* _ffmpeg, std::string _vhost, std::string _id); + int64_t starttime; +public: + SrsIngesterFFMPEG(); virtual ~SrsIngesterFFMPEG(); - +public: + virtual int initialize(SrsFFMPEG* ff, std::string v, std::string i); + // the ingest uri, [vhost]/[ingest id] + virtual std::string uri(); + // the alive in ms. + virtual int alive(); + virtual bool equals(std::string v, std::string i); + virtual bool equals(std::string v); +public: + virtual int start(); + virtual void stop(); + virtual int cycle(); // @see SrsFFMPEG.fast_stop(). virtual void fast_stop(); }; diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index 9a4dcb5e8..b986b8bd6 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -61,8 +61,9 @@ public: */ virtual bool can_handle() = 0; /** - * process the received message. - */ + * process the received message. + * @remark user must free this message. + */ virtual int handle(SrsCommonMessage* msg) = 0; /** * when recv message error. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e2613343a..b08f01d81 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -916,7 +916,6 @@ int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg srs_error("fmle decode unpublish message failed. ret=%d", ret); return ret; } - SrsAutoFree(SrsPacket, pkt); // for flash, any packet is republish. diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 2aa5e157e..042913b51 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -604,15 +604,15 @@ int SrsGopCache::cache(SrsSharedPtrMessage* shared_msg) // the gop cache know when to gop it. SrsSharedPtrMessage* msg = shared_msg; - - // disable gop cache when not h.264 - if (!SrsFlvCodec::video_is_h264(msg->payload, msg->size)) { - srs_info("gop donot cache video for none h.264"); - return ret; - } // got video, update the video count if acceptable if (msg->is_video()) { + // drop video when not h.264 + if (!SrsFlvCodec::video_is_h264(msg->payload, msg->size)) { + srs_info("gop cache drop video for none h.264"); + return ret; + } + cached_video_count++; audio_after_last_video_count = 0; } @@ -1464,11 +1464,25 @@ int SrsSource::on_audio(SrsCommonMessage* shared_audio) } srs_info("Audio dts=%"PRId64", size=%d", msg.timestamp, msg.size); + // directly process the audio message. if (!mix_correct) { return on_audio_imp(&msg); } - return do_mix_correct(&msg); + // insert msg to the queue. + mix_queue->push(msg.copy()); + + // fetch someone from mix queue. + SrsSharedPtrMessage* m = mix_queue->pop(); + if (!m) { + return ret; + } + + // consume the monotonically increase message. + ret = on_audio_imp(m); + srs_freep(m); + + return ret; } int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) @@ -1619,6 +1633,18 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) { int ret = ERROR_SUCCESS; + // drop any unknown header video. + // @see https://github.com/simple-rtmp-server/srs/issues/421 + if (!SrsFlvCodec::video_is_acceptable(shared_video->payload, shared_video->size)) { + char b0 = 0x00; + if (shared_video->size > 0) { + b0 = shared_video->payload[0]; + } + + srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0); + return ret; + } + // convert shared_video to msg, user should not use shared_video again. // the payload is transfer to msg, and set to NULL in shared_video. SrsSharedPtrMessage msg; @@ -1628,11 +1654,26 @@ int SrsSource::on_video(SrsCommonMessage* shared_video) } srs_info("Video dts=%"PRId64", size=%d", msg.timestamp, msg.size); + // directly process the audio message. if (!mix_correct) { return on_video_imp(&msg); } - return do_mix_correct(&msg); + // insert msg to the queue. + mix_queue->push(msg.copy()); + + // fetch someone from mix queue. + SrsSharedPtrMessage* m = mix_queue->pop(); + if (!m) { + return ret; + } + SrsAutoFree(SrsSharedPtrMessage, m); + + // consume the monotonically increase message. + ret = on_video_imp(m); + srs_freep(m); + + return ret; } int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) @@ -1766,29 +1807,6 @@ int SrsSource::on_video_imp(SrsSharedPtrMessage* msg) return ret; } -int SrsSource::do_mix_correct(SrsSharedPtrMessage* msg) -{ - int ret = ERROR_SUCCESS; - - // insert msg to the queue. - mix_queue->push(msg->copy()); - - // fetch someone from mix queue. - SrsSharedPtrMessage* m = mix_queue->pop(); - if (!m) { - return ret; - } - SrsAutoFree(SrsSharedPtrMessage, m); - - // consume the monotonically increase message. - if (m->is_audio()) { - return on_audio_imp(m); - } - - srs_assert(m->is_video()); - return on_video_imp(m); -} - int SrsSource::on_aggregate(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index f7a75d4ac..1f6d1c0fc 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -536,8 +536,6 @@ public: virtual int on_video(SrsCommonMessage* video); private: virtual int on_video_imp(SrsSharedPtrMessage* video); -private: - virtual int do_mix_correct(SrsSharedPtrMessage* msg); public: virtual int on_aggregate(SrsCommonMessage* msg); /** diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp index 21981506c..0436c25fa 100644 --- a/trunk/src/kernel/srs_kernel_codec.cpp +++ b/trunk/src/kernel/srs_kernel_codec.cpp @@ -266,6 +266,28 @@ bool SrsFlvCodec::audio_is_aac(char* data, int size) return sound_format == SrsCodecAudioAAC; } +bool SrsFlvCodec::video_is_acceptable(char* data, int size) +{ + // 1bytes required. + if (size < 1) { + return false; + } + + char frame_type = data[0]; + char codec_id = frame_type & 0x0f; + frame_type = (frame_type >> 4) & 0x0f; + + if (frame_type < 1 || frame_type > 5) { + return false; + } + + if (codec_id < 2 || codec_id > 7) { + return false; + } + + return true; +} + string srs_codec_avc_nalu2str(SrsAvcNaluType nalu_type) { switch (nalu_type) { diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp index 702106101..238feac76 100644 --- a/trunk/src/kernel/srs_kernel_codec.hpp +++ b/trunk/src/kernel/srs_kernel_codec.hpp @@ -222,6 +222,12 @@ public: * check codec aac. */ static bool audio_is_aac(char* data, int size); + /** + * check the video RTMP/flv header info, + * @return true if video RTMP/flv header is ok. + * @remark all type of audio is possible, no need to check audio. + */ + static bool video_is_acceptable(char* data, int size); }; /** diff --git a/trunk/src/protocol/srs_protocol_buffer.cpp b/trunk/src/protocol/srs_protocol_buffer.cpp index 24e64a48f..176131282 100644 --- a/trunk/src/protocol/srs_protocol_buffer.cpp +++ b/trunk/src/protocol/srs_protocol_buffer.cpp @@ -97,8 +97,8 @@ void SrsFastBuffer::set_buffer(int buffer_size) } // realloc for buffer change bigger. - int start = p - buffer; - int nb_bytes = end - p; + int start = (int)(p - buffer); + int nb_bytes = (int)(end - p); buffer = (char*)realloc(buffer, nb_resize_buf); nb_buffer = nb_resize_buf;