HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 v7.0.11 (#4164)

When stopping the stream, it will wait for the HTTP Streaming to exit.
If the HTTP Streaming goroutine hangs, it will not exit automatically.

```cpp
void SrsHttpStreamServer::http_unmount(SrsRequest* r)
{
    SrsUniquePtr<SrsLiveStream> stream(entry->stream);
    if (stream->entry) stream->entry->enabled = false;
    srs_usleep(...); // Wait for about 120s.
    mux.unhandle(entry->mount, stream.get()); // Free stream.
}

srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
    err = do_serve_http(w, r); // If stuck in here for 120s+
    alive_viewers_--; // Crash at here, because stream has been deleted.
```

We should notify http stream connection to interrupt(expire):

```cpp
void SrsHttpStreamServer::http_unmount(SrsRequest* r)
{
    SrsUniquePtr<SrsLiveStream> stream(entry->stream);
    if (stream->entry) stream->entry->enabled = false;
    stream->expire(); // Notify http stream to interrupt.
```

Note that we should notify all viewers pulling stream from this http
stream.

Note that we have tried to fix this issue, but only try to wait for all
viewers to quit, without interrupting the viewers, see
https://github.com/ossrs/srs/pull/4144


---------

Co-authored-by: Jacob Su <suzp1984@gmail.com>
pull/4162/head
Winlin 5 months ago committed by GitHub
parent f8319d6b6d
commit 05c3a422a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v7-changes"></a> <a name="v7-changes"></a>
## SRS 7.0 Changelog ## SRS 7.0 Changelog
* v7.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v7.0.11 (#4164)
* v7.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v7.0.10 (#4157) * v7.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v7.0.10 (#4157)
* v7.0, 2024-08-24, Merge [#4156](https://github.com/ossrs/srs/pull/4156): Build: Fix srs_mp4_parser compiling error. v7.0.9 (#4156) * v7.0, 2024-08-24, Merge [#4156](https://github.com/ossrs/srs/pull/4156): Build: Fix srs_mp4_parser compiling error. v7.0.9 (#4156)
* v7.0, 2024-08-22, Merge [#4154](https://github.com/ossrs/srs/pull/4154): ASAN: Disable memory leak detection by default. v7.0.8 (#4154) * v7.0, 2024-08-22, Merge [#4154](https://github.com/ossrs/srs/pull/4154): ASAN: Disable memory leak detection by default. v7.0.8 (#4154)
@ -22,6 +23,7 @@ The changelog for SRS.
<a name="v6-changes"></a> <a name="v6-changes"></a>
## SRS 6.0 Changelog ## SRS 6.0 Changelog
* v6.0, 2024-08-31, Merge [#4164](https://github.com/ossrs/srs/pull/4164): HTTP-FLV: Notify connection to expire when unpublishing. v6.0.152 (#4164)
* v6.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v6.0.151 (#4157) * v6.0, 2024-08-24, Merge [#4157](https://github.com/ossrs/srs/pull/4157): Fix crash when quiting. v6.0.151 (#4157)
* v6.0, 2024-08-24, Merge [#4156](https://github.com/ossrs/srs/pull/4156): Build: Fix srs_mp4_parser compiling error. v6.0.150 (#4156) * v6.0, 2024-08-24, Merge [#4156](https://github.com/ossrs/srs/pull/4156): Build: Fix srs_mp4_parser compiling error. v6.0.150 (#4156)
* v6.0, 2024-08-21, Merge [#4150](https://github.com/ossrs/srs/pull/4150): API: Support new HTTP API for VALGRIND. v6.0.149 (#4150) * v6.0, 2024-08-21, Merge [#4150](https://github.com/ossrs/srs/pull/4150): API: Support new HTTP API for VALGRIND. v6.0.149 (#4150)

@ -1206,8 +1206,6 @@ SrsGoApiSignal::~SrsGoApiSignal()
srs_error_t SrsGoApiSignal::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) srs_error_t SrsGoApiSignal::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{ {
srs_error_t err = srs_success;
std::string signal = r->query_get("signo"); std::string signal = r->query_get("signo");
srs_trace("query signo=%s", signal.c_str()); srs_trace("query signo=%s", signal.c_str());

@ -583,13 +583,15 @@ SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c)
cache = c; cache = c;
req = r->copy()->as_http(); req = r->copy()->as_http();
security_ = new SrsSecurity(); security_ = new SrsSecurity();
alive_viewers_ = 0;
} }
SrsLiveStream::~SrsLiveStream() SrsLiveStream::~SrsLiveStream()
{ {
srs_freep(req); srs_freep(req);
srs_freep(security_); srs_freep(security_);
// The live stream should never be destroyed when it's serving any viewers.
srs_assert(viewers_.empty());
} }
srs_error_t SrsLiveStream::update_auth(SrsRequest* r) srs_error_t SrsLiveStream::update_auth(SrsRequest* r)
@ -634,10 +636,18 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return srs_error_wrap(err, "http hook"); return srs_error_wrap(err, "http hook");
} }
alive_viewers_++; // Add the viewer to the viewers list.
viewers_.push_back(hc);
// Serve the viewer connection.
err = do_serve_http(w, r); err = do_serve_http(w, r);
alive_viewers_--;
// Remove viewer from the viewers list.
vector<ISrsExpire*>::iterator it = std::find(viewers_.begin(), viewers_.end(), hc);
srs_assert (it != viewers_.end());
viewers_.erase(it);
// Do hook after serving.
http_hooks_on_stop(r); http_hooks_on_stop(r);
return err; return err;
@ -645,7 +655,16 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
bool SrsLiveStream::alive() bool SrsLiveStream::alive()
{ {
return alive_viewers_ > 0; return !viewers_.empty();
}
void SrsLiveStream::expire()
{
vector<ISrsExpire*>::iterator it;
for (it = viewers_.begin(); it != viewers_.end(); ++it) {
ISrsExpire* conn = *it;
conn->expire();
}
} }
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
@ -1075,6 +1094,7 @@ void SrsHttpStreamServer::http_unmount(SrsRequest* r)
// Notify cache and stream to stop. // Notify cache and stream to stop.
if (stream->entry) stream->entry->enabled = false; if (stream->entry) stream->entry->enabled = false;
stream->expire();
cache->stop(); cache->stop();
// Wait for cache and stream to stop. // Wait for cache and stream to stop.

@ -11,6 +11,8 @@
#include <srs_app_security.hpp> #include <srs_app_security.hpp>
#include <srs_app_http_conn.hpp> #include <srs_app_http_conn.hpp>
#include <vector>
class SrsAacTransmuxer; class SrsAacTransmuxer;
class SrsMp3Transmuxer; class SrsMp3Transmuxer;
class SrsFlvTransmuxer; class SrsFlvTransmuxer;
@ -176,7 +178,7 @@ public:
// HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format. // HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format.
// TODO: FIXME: Rename to SrsHttpLive // TODO: FIXME: Rename to SrsHttpLive
class SrsLiveStream : public ISrsHttpHandler class SrsLiveStream : public ISrsHttpHandler, public ISrsExpire
{ {
private: private:
SrsRequest* req; SrsRequest* req;
@ -185,7 +187,7 @@ private:
// For multiple viewers, which means there will more than one alive viewers for a live stream, so we must // For multiple viewers, which means there will more than one alive viewers for a live stream, so we must
// use an int value to represent if there is any viewer is alive. We should never do cleanup unless all // use an int value to represent if there is any viewer is alive. We should never do cleanup unless all
// viewers closed the connection. // viewers closed the connection.
int alive_viewers_; std::vector<ISrsExpire*> viewers_;
public: public:
SrsLiveStream(SrsRequest* r, SrsBufferCache* c); SrsLiveStream(SrsRequest* r, SrsBufferCache* c);
virtual ~SrsLiveStream(); virtual ~SrsLiveStream();
@ -193,6 +195,9 @@ public:
public: public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual bool alive(); virtual bool alive();
// Interface ISrsExpire
public:
virtual void expire();
private: private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r); virtual srs_error_t http_hooks_on_play(ISrsHttpMessage* r);

@ -9,6 +9,6 @@
#define VERSION_MAJOR 6 #define VERSION_MAJOR 6
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 151 #define VERSION_REVISION 152
#endif #endif

@ -9,6 +9,6 @@
#define VERSION_MAJOR 7 #define VERSION_MAJOR 7
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 10 #define VERSION_REVISION 11
#endif #endif
Loading…
Cancel
Save