Edge: Fix flv edge crash when http unmount. v6.0.154 v7.0.13 (#4166)

Edge FLV is not working because it is stuck in an infinite loop waiting.
Previously, there was no need to wait for exit since resources were not
being cleaned up. Now, since resources need to be cleaned up, it must
wait for all active connections to exit, which causes this issue.

To reproduce the issue, start SRS edge, run the bellow command and press
`CTRL+C` to stop the request:

```bash
curl http://localhost:8080/live/livestream.flv -v >/dev/null
```

It will cause edge to fetch stream from origin, and free the consumer
when client quit. When `SrsLiveStream::do_serve_http` return, it will
free the consumer:

```cpp
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) {
    SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);
```

Keep in mind that in this moment, the stream is alive, because only set
to not alive after this function return:

```cpp
    alive_viewers_++;
    err = do_serve_http(w, r); // Free 'this' alive stream.
    alive_viewers_--; // Crash here, because 'this' is freed.
```

When freeing the consumer, it will cause the source to unpublish and
attempt to free the HTTP handler, which ultimately waits for the stream
not to be alive:

```cpp
SrsLiveConsumer::~SrsLiveConsumer() {
    source_->on_consumer_destroy(this);

void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer) {
    if (consumers.empty()) {
        play_edge->on_all_client_stop();

void SrsLiveSource::on_unpublish() {
    handler->on_unpublish(req);

void SrsHttpStreamServer::http_unmount(SrsRequest* r) {
    if (stream->entry) stream->entry->enabled = false;

    for (; i < 1024; i++) {
        if (!cache->alive() && !stream->alive()) {
            break;
        }
        srs_usleep(100 * SRS_UTIME_MILLISECONDS);
    }
```

After 120 seconds, it will free the stream and cause SRS to crash
because the stream is still active. In order to track this potential
issue, also add an important warning log:

```cpp
srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
```

SRS may crash if got this log.

---------

Co-authored-by: Jacob Su <suzp1984@gmail.com>
pull/4174/head
Winlin 5 months ago committed by GitHub
parent a7aa2eaf76
commit 740f0d38ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a> <a name="v7-changes"></a>
## SRS 7.0 Changelog ## SRS 7.0 Changelog
* v7.0, 2024-09-01, Merge [#4166](https://github.com/ossrs/srs/pull/4166): Edge: Fix flv edge crash when http unmount. v7.0.13 (#4166)
* v7.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v7.0.12 (#4162) * v7.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v7.0.12 (#4162)
* v7.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v7.0.11 (#4164) * v7.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v7.0.11 (#4164)
* v7.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v7.0.10 (#4157) * v7.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v7.0.10 (#4157)
@ -24,6 +25,7 @@ The changelog for SRS.
<a name="v6-changes"></a> <a name="v6-changes"></a>
## SRS 6.0 Changelog ## SRS 6.0 Changelog
* v6.0, 2024-09-01, Merge [#4166](https://github.com/ossrs/srs/pull/4166): Edge: Fix flv edge crash when http unmount. v6.0.154 (#4166)
* v6.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v6.0.153 (#4162) * v6.0, 2024-08-31, Merge [#4162](https://github.com/ossrs/srs/pull/4162): Fix #3767: RTMP: Do not response empty data packet. v6.0.153 (#4162)
* v6.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 (#4164) * v6.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 (#4164)
* v6.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v6.0.151 (#4157) * v6.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v6.0.151 (#4157)

@ -636,11 +636,26 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return srs_error_wrap(err, "http hook"); return srs_error_wrap(err, "http hook");
} }
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}
// Create consumer of source, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
if ((err = live_source->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
// When freeing the consumer, it may trigger the source unpublishing for edge. This will trigger the http
// unmount, which waiting for all http live stream to dispose, so we should free the consumer when this
// object is not alive.
SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);
// Add the viewer to the viewers list. // Add the viewer to the viewers list.
viewers_.push_back(hc); viewers_.push_back(hc);
// Serve the viewer connection. // Serve the viewer connection.
err = do_serve_http(w, r); err = do_serve_http(live_source.get(), consumer.get(), w, r);
// Remove viewer from the viewers list. // Remove viewer from the viewers list.
vector<ISrsExpire*>::iterator it = std::find(viewers_.begin(), viewers_.end(), hc); vector<ISrsExpire*>::iterator it = std::find(viewers_.begin(), viewers_.end(), hc);
@ -667,7 +682,7 @@ void SrsLiveStream::expire()
} }
} }
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) srs_error_t SrsLiveStream::do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -711,19 +726,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// Enter chunked mode, because we didn't set the content-length. // Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK); w->write_header(SRS_CONSTS_HTTP_OK);
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req); if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) {
if (!live_source.get()) {
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
}
// create consumer of souce, ignore gop cache, use the audio gop cache.
SrsLiveConsumer* consumer_raw = NULL;
if ((err = live_source->create_consumer(consumer_raw)) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
SrsUniquePtr<SrsLiveConsumer> consumer(consumer_raw);
if ((err = live_source->consumer_dumps(consumer.get(), true, true, !enc->has_cache())) != srs_success) {
return srs_error_wrap(err, "dumps consumer"); return srs_error_wrap(err, "dumps consumer");
} }
@ -744,7 +747,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
// if gop cache enabled for encoder, dump to consumer. // if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) { if (enc->has_cache()) {
if ((err = enc->dump_cache(consumer.get(), live_source->jitter())) != srs_success) { if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) {
return srs_error_wrap(err, "encoder dump cache"); return srs_error_wrap(err, "encoder dump cache");
} }
} }
@ -1106,6 +1109,10 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)
srs_usleep(100 * SRS_UTIME_MILLISECONDS); srs_usleep(100 * SRS_UTIME_MILLISECONDS);
} }
if (cache->alive() || stream->alive()) {
srs_warn("http: try to free a alive stream, cache=%d, stream=%d", cache->alive(), stream->alive());
}
// Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and // Unmount the HTTP handler, which will free the entry. Note that we must free it after cache and
// stream stopped for it uses it. // stream stopped for it uses it.
mux.unhandle(entry->mount, stream.get()); mux.unhandle(entry->mount, stream.get());

@ -199,7 +199,7 @@ public:
public: public:
virtual void expire(); virtual void expire();
private: private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t do_serve_http(SrsLiveSource* source, SrsLiveConsumer* consumer, ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r); virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r);
virtual void http_hooks_on_stop(ISrsHttpMessage* r); virtual void http_hooks_on_stop(ISrsHttpMessage* r);
virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs); virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);

@ -253,7 +253,7 @@ public:
void set_publish_stream(ISrsRtcPublishStream* v); void set_publish_stream(ISrsRtcPublishStream* v);
// Consume the shared RTP packet, user must free it. // Consume the shared RTP packet, user must free it.
srs_error_t on_rtp(SrsRtpPacket* pkt); srs_error_t on_rtp(SrsRtpPacket* pkt);
// Set and get stream description for souce // Set and get stream description for source
bool has_stream_desc(); bool has_stream_desc();
void set_stream_desc(SrsRtcSourceDescription* stream_desc); void set_stream_desc(SrsRtcSourceDescription* stream_desc);
std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type); std::vector<SrsRtcTrackDescription*> get_track_desc(std::string type, std::string media_type);

@ -55,7 +55,7 @@ extern std::string srs_path_build_timestamp(std::string template_path);
// @return an int error code. // @return an int error code.
extern srs_error_t srs_kill_forced(int& pid); extern srs_error_t srs_kill_forced(int& pid);
// Current process resouce usage. // Current process resource usage.
// @see: man getrusage // @see: man getrusage
class SrsRusage class SrsRusage
{ {

@ -9,6 +9,6 @@
#define VERSION_MAJOR 6 #define VERSION_MAJOR 6
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 153 #define VERSION_REVISION 154
#endif #endif

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7 #define VERSION_MAJOR 7
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 12 #define VERSION_REVISION 13
#endif #endif
Loading…
Cancel
Save