diff --git a/trunk/src/app/srs_app_coworkers.cpp b/trunk/src/app/srs_app_coworkers.cpp
index be9cba5d0..2dae87871 100644
--- a/trunk/src/app/srs_app_coworkers.cpp
+++ b/trunk/src/app/srs_app_coworkers.cpp
@@ -58,30 +58,50 @@ SrsCoWorkers* SrsCoWorkers::instance()
return _instance;
}
-SrsJsonAny* SrsCoWorkers::dumps(string vhost, string app, string stream)
+SrsJsonAny* SrsCoWorkers::dumps(string vhost, string coworker, string app, string stream)
{
SrsRequest* r = find_stream_info(vhost, app, stream);
if (!r) {
// TODO: FIXME: Find stream from our origin util return to the start point.
return SrsJsonAny::null();
}
-
- vector service_ports = _srs_config->get_listens();
- if (service_ports.empty()) {
- return SrsJsonAny::null();
+
+ // The service port parsing from listen port.
+ string listen_host;
+ int listen_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
+ vector listen_hostports = _srs_config->get_listens();
+ if (!listen_hostports.empty()) {
+ string list_hostport = listen_hostports.at(0);
+
+ if (list_hostport.find(":") != string::npos) {
+ srs_parse_hostport(list_hostport, listen_host, listen_port);
+ } else {
+ listen_port = ::atoi(list_hostport.c_str());
+ }
}
-
- string service_ip = srs_get_public_internet_address();
- string service_hostport = service_ports.at(0);
-
- int service_port = SRS_CONSTS_RTMP_DEFAULT_PORT;
- if (service_hostport.find(":") != string::npos) {
- string service_host;
- srs_parse_hostport(service_hostport, service_host, service_port);
- } else {
- service_port = ::atoi(service_hostport.c_str());
+
+ // The ip of server, we use the request coworker-host as ip, if listen host is localhost or loopback.
+ // For example, the server may behind a NAT(192.x.x.x), while its ip is a docker ip(172.x.x.x),
+ // we should use the NAT(192.x.x.x) address as it's the exposed ip.
+ // @see https://github.com/ossrs/srs/issues/1501
+ string service_ip;
+ if (listen_host != SRS_CONSTS_LOCALHOST && listen_host != SRS_CONSTS_LOOPBACK && listen_host != SRS_CONSTS_LOOPBACK6) {
+ service_ip = listen_host;
}
-
+ if (service_ip.empty()) {
+ int coworker_port;
+ string coworker_host = coworker;
+ if (coworker.find(":") != string::npos) {
+ srs_parse_hostport(coworker, coworker_host, coworker_port);
+ }
+
+ service_ip = coworker_host;
+ }
+ if (service_ip.empty()) {
+ service_ip = srs_get_public_internet_address();
+ }
+
+ // The backend API endpoint.
string backend = _srs_config->get_http_api_listen();
if (backend.find(":") == string::npos) {
backend = service_ip + ":" + backend;
@@ -89,10 +109,13 @@ SrsJsonAny* SrsCoWorkers::dumps(string vhost, string app, string stream)
// The routers to detect loop and identify path.
SrsJsonArray* routers = SrsJsonAny::array()->append(SrsJsonAny::str(backend.c_str()));
-
+
+ srs_trace("Redirect vhost=%s, path=%s/%s to ip=%s, port=%d, api=%s",
+ vhost.c_str(), app.c_str(), stream.c_str(), service_ip.c_str(), listen_port, backend.c_str());
+
return SrsJsonAny::object()
->set("ip", SrsJsonAny::str(service_ip.c_str()))
- ->set("port", SrsJsonAny::integer(service_port))
+ ->set("port", SrsJsonAny::integer(listen_port))
->set("vhost", SrsJsonAny::str(r->vhost.c_str()))
->set("api", SrsJsonAny::str(backend.c_str()))
->set("routers", routers);
diff --git a/trunk/src/app/srs_app_coworkers.hpp b/trunk/src/app/srs_app_coworkers.hpp
index b7be73dca..c25f1f300 100644
--- a/trunk/src/app/srs_app_coworkers.hpp
+++ b/trunk/src/app/srs_app_coworkers.hpp
@@ -46,7 +46,7 @@ private:
public:
static SrsCoWorkers* instance();
public:
- virtual SrsJsonAny* dumps(std::string vhost, std::string app, std::string stream);
+ virtual SrsJsonAny* dumps(std::string vhost, std::string coworker, std::string app, std::string stream);
private:
virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream);
public:
diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp
index 11582373c..a2d95e706 100644
--- a/trunk/src/app/srs_app_edge.cpp
+++ b/trunk/src/app/srs_app_edge.cpp
@@ -100,10 +100,13 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
string _schema, _vhost, _app, _stream, _param, _host;
srs_discovery_tc_url(redirect, _schema, _host, _vhost, _app, _stream, _port, _param);
- srs_warn("RTMP redirect %s:%d to %s:%d stream=%s", server.c_str(), port, _host.c_str(), _port, _stream.c_str());
server = _host;
port = _port;
}
+
+ // Remember the current selected server.
+ selected_ip = server;
+ selected_port = port;
// support vhost tranform for edge,
// @see https://github.com/ossrs/srs/issues/372
@@ -144,6 +147,12 @@ void SrsEdgeRtmpUpstream::close()
srs_freep(sdk);
}
+void SrsEdgeRtmpUpstream::selected(string& server, int& port)
+{
+ server = selected_ip;
+ port = selected_port;
+}
+
void SrsEdgeRtmpUpstream::set_recv_timeout(srs_utime_t tm)
{
sdk->set_recv_timeout(tm);
@@ -160,7 +169,7 @@ SrsEdgeIngester::SrsEdgeIngester()
edge = NULL;
req = NULL;
- upstream = new SrsEdgeRtmpUpstream(redirect);
+ upstream = new SrsEdgeRtmpUpstream("");
lb = new SrsLbRoundRobin();
trd = new SrsDummyCoroutine();
}
@@ -243,7 +252,8 @@ srs_error_t SrsEdgeIngester::cycle()
srs_error_t SrsEdgeIngester::do_cycle()
{
srs_error_t err = srs_success;
-
+
+ std::string redirect;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "do cycle pull");
@@ -252,10 +262,6 @@ srs_error_t SrsEdgeIngester::do_cycle()
srs_freep(upstream);
upstream = new SrsEdgeRtmpUpstream(redirect);
- // we only use the redict once.
- // reset the redirect to empty, for maybe the origin changed.
- redirect = "";
-
if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
return srs_error_wrap(err, "on source id changed");
}
@@ -267,11 +273,21 @@ srs_error_t SrsEdgeIngester::do_cycle()
if ((err = edge->on_ingest_play()) != srs_success) {
return srs_error_wrap(err, "notify edge play");
}
+
+ // set to larger timeout to read av data from origin.
+ upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
- err = ingest();
+ err = ingest(redirect);
// retry for rtmp 302 immediately.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
+ int port;
+ string server;
+ upstream->selected(server, port);
+
+ string url = req->get_stream_url();
+ srs_warn("RTMP redirect %s from %s:%d to %s", url.c_str(), server.c_str(), port, redirect.c_str());
+
srs_error_reset(err);
continue;
}
@@ -286,15 +302,16 @@ srs_error_t SrsEdgeIngester::do_cycle()
return err;
}
-srs_error_t SrsEdgeIngester::ingest()
+srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
srs_error_t err = srs_success;
SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
-
- // set to larger timeout to read av data from origin.
- upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
+
+ // we only use the redict once.
+ // reset the redirect to empty, for maybe the origin changed.
+ redirect = "";
while (true) {
srs_error_t err = srs_success;
@@ -318,7 +335,7 @@ srs_error_t SrsEdgeIngester::ingest()
srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg);
- if ((err = process_publish_message(msg)) != srs_success) {
+ if ((err = process_publish_message(msg, redirect)) != srs_success) {
return srs_error_wrap(err, "process message");
}
}
@@ -326,7 +343,7 @@ srs_error_t SrsEdgeIngester::ingest()
return err;
}
-srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
+srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
srs_error_t err = srs_success;
diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp
index 24512dc86..f9ead1f63 100644
--- a/trunk/src/app/srs_app_edge.hpp
+++ b/trunk/src/app/srs_app_edge.hpp
@@ -80,6 +80,7 @@ public:
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0;
virtual void close() = 0;
public:
+ virtual void selected(std::string& server, int& port) = 0;
virtual void set_recv_timeout(srs_utime_t tm) = 0;
virtual void kbps_sample(const char* label, int64_t age) = 0;
};
@@ -91,6 +92,10 @@ private:
// use this as upstream.
std::string redirect;
SrsSimpleRtmpClient* sdk;
+private:
+ // Current selected server, the ip:port.
+ std::string selected_ip;
+ int selected_port;
public:
// @param rediect, override the server. ignore if empty.
SrsEdgeRtmpUpstream(std::string r);
@@ -101,6 +106,7 @@ public:
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual void close();
public:
+ virtual void selected(std::string& server, int& port);
virtual void set_recv_timeout(srs_utime_t tm);
virtual void kbps_sample(const char* label, int64_t age);
};
@@ -115,8 +121,6 @@ private:
SrsCoroutine* trd;
SrsLbRoundRobin* lb;
SrsEdgeUpstream* upstream;
- // For RTMP 302 redirect.
- std::string redirect;
public:
SrsEdgeIngester();
virtual ~SrsEdgeIngester();
@@ -131,8 +135,8 @@ public:
private:
virtual srs_error_t do_cycle();
private:
- virtual srs_error_t ingest();
- virtual srs_error_t process_publish_message(SrsCommonMessage* msg);
+ virtual srs_error_t ingest(std::string& redirect);
+ virtual srs_error_t process_publish_message(SrsCommonMessage* msg, std::string& redirect);
};
// The edge used to forward stream to origin.
diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp
index a41d74d01..bd6c01ced 100644
--- a/trunk/src/app/srs_app_http_api.cpp
+++ b/trunk/src/app/srs_app_http_api.cpp
@@ -1290,6 +1290,7 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
string vhost = r->query_get("vhost");
string app = r->query_get("app");
string stream = r->query_get("stream");
+ string coworker = r->query_get("coworker");
data->set("query", SrsJsonAny::object()
->set("ip", SrsJsonAny::str(ip.c_str()))
->set("vhost", SrsJsonAny::str(vhost.c_str()))
@@ -1297,7 +1298,7 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
->set("stream", SrsJsonAny::str(stream.c_str())));
SrsCoWorkers* coworkers = SrsCoWorkers::instance();
- data->set("origin", coworkers->dumps(vhost, app, stream));
+ data->set("origin", coworkers->dumps(vhost, coworker, app, stream));
return srs_api_response(w, r, obj->dumps());
}
@@ -1342,7 +1343,7 @@ srs_error_t SrsHttpApi::do_cycle()
{
srs_error_t err = srs_success;
- srs_trace("api get peer ip success. ip=%s", ip.c_str());
+ srs_trace("API server client, ip=%s", ip.c_str());
// initialize parser
if ((err = parser->initialize(HTTP_REQUEST, true)) != srs_success) {
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index e1dabc1c3..867c4aea1 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -610,7 +610,8 @@ srs_error_t SrsRtmpConn::playing(SrsSource* source)
int port;
string host;
string url = "http://" + coworkers.at(i) + "/api/v1/clusters?"
- + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream;
+ + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream
+ + "&coworker=" + coworkers.at(i);
if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {
return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
}
@@ -1179,7 +1180,8 @@ srs_error_t SrsRtmpConn::do_token_traverse_auth(SrsRtmpClient* client)
}
// for token tranverse, always take the debug info(which carries token).
- if ((err = client->connect_app(req->app, req->tcUrl, req, true, NULL)) != srs_success) {
+ SrsServerInfo si;
+ if ((err = client->connect_app(req->app, req->tcUrl, req, true, &si)) != srs_success) {
return srs_error_wrap(err, "rtmp: connect tcUrl");
}
diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp
index 8f4e8f406..385fca95c 100644
--- a/trunk/src/core/srs_core.hpp
+++ b/trunk/src/core/srs_core.hpp
@@ -27,7 +27,7 @@
// The version config.
#define VERSION_MAJOR 3
#define VERSION_MINOR 0
-#define VERSION_REVISION 63
+#define VERSION_REVISION 67
// The macros generated by configure script.
#include
diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp
index bb6d85292..8df891fd2 100644
--- a/trunk/src/kernel/srs_kernel_consts.hpp
+++ b/trunk/src/kernel/srs_kernel_consts.hpp
@@ -116,6 +116,8 @@
///////////////////////////////////////////////////////////
#define SRS_CONSTS_NULL_FILE "/dev/null"
#define SRS_CONSTS_LOCALHOST "127.0.0.1"
+#define SRS_CONSTS_LOOPBACK "0.0.0.0"
+#define SRS_CONSTS_LOOPBACK6 "::"
// The signal defines.
// To reload the config file and apply new config.
diff --git a/trunk/src/kernel/srs_kernel_error.cpp b/trunk/src/kernel/srs_kernel_error.cpp
index 9ea9534de..de13304db 100644
--- a/trunk/src/kernel/srs_kernel_error.cpp
+++ b/trunk/src/kernel/srs_kernel_error.cpp
@@ -81,7 +81,7 @@ std::string SrsCplxError::description() {
next = this;
while (next) {
- ss << "thread #" << next->cid << ": "
+ ss << "thread [" << next->cid << "]: "
<< next->func << "() [" << next->file << ":" << next->line << "]"
<< "[errno=" << next->rerrno << "]";
diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp
index d9432ceb5..64642514b 100644
--- a/trunk/src/kernel/srs_kernel_flv.cpp
+++ b/trunk/src/kernel/srs_kernel_flv.cpp
@@ -284,6 +284,11 @@ int SrsSharedPtrMessage::count()
bool SrsSharedPtrMessage::check(int stream_id)
{
+ // Ignore error when message has no payload.
+ if (!ptr) {
+ return true;
+ }
+
// we donot use the complex basic header,
// ensure the basic header is 1bytes.
if (ptr->header.perfer_cid < 2) {
diff --git a/trunk/src/kernel/srs_kernel_stream.cpp b/trunk/src/kernel/srs_kernel_stream.cpp
index 9873ced63..cec9f3d9a 100755
--- a/trunk/src/kernel/srs_kernel_stream.cpp
+++ b/trunk/src/kernel/srs_kernel_stream.cpp
@@ -68,3 +68,8 @@ void SrsSimpleStream::append(const char* bytes, int size)
data.insert(data.end(), bytes, bytes + size);
}
+
+void SrsSimpleStream::append(SrsSimpleStream* src)
+{
+ append(src->bytes(), src->length());
+}
diff --git a/trunk/src/kernel/srs_kernel_stream.hpp b/trunk/src/kernel/srs_kernel_stream.hpp
index 699673076..67e57ccb5 100644
--- a/trunk/src/kernel/srs_kernel_stream.hpp
+++ b/trunk/src/kernel/srs_kernel_stream.hpp
@@ -63,6 +63,7 @@ public:
* @remark assert size is positive.
*/
virtual void append(const char* bytes, int size);
+ virtual void append(SrsSimpleStream* src);
};
#endif
diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp
index 93ea6a895..9a2b7c46c 100644
--- a/trunk/src/kernel/srs_kernel_utility.cpp
+++ b/trunk/src/kernel/srs_kernel_utility.cpp
@@ -231,9 +231,9 @@ string srs_any_address_for_listener()
}
if (ipv6_active && !ipv4_active) {
- return "::";
+ return SRS_CONSTS_LOOPBACK6;
}
- return "0.0.0.0";
+ return SRS_CONSTS_LOOPBACK;
}
void srs_parse_endpoint(string hostport, string& ip, int& port)
diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp
index 5d2387aef..0099d5791 100644
--- a/trunk/src/protocol/srs_rtmp_stack.cpp
+++ b/trunk/src/protocol/srs_rtmp_stack.cpp
@@ -979,18 +979,18 @@ srs_error_t SrsProtocol::read_basic_header(char& fmt, int& cid)
// 2-63, 1B chunk header
if (cid > 1) {
return err;
- }
-
// 64-319, 2B chunk header
- if (cid == 0) {
+ } else if (cid == 0) {
if ((err = in_buffer->grow(skt, 1)) != srs_success) {
return srs_error_wrap(err, "basic header requires 2 bytes");
}
-
+
cid = 64;
cid += (uint8_t)in_buffer->read_1byte();
- // 64-65599, 3B chunk header
- } else if (cid == 1) {
+ // 64-65599, 3B chunk header
+ } else {
+ srs_assert(cid == 1);
+
if ((err = in_buffer->grow(skt, 2)) != srs_success) {
return srs_error_wrap(err, "basic header requires 3 bytes");
}
@@ -998,9 +998,6 @@ srs_error_t SrsProtocol::read_basic_header(char& fmt, int& cid)
cid = 64;
cid += (uint8_t)in_buffer->read_1byte();
cid += ((uint8_t)in_buffer->read_1byte()) * 256;
- } else {
- srs_error("invalid path, impossible basic header.");
- srs_assert(false);
}
return err;
diff --git a/trunk/src/service/srs_service_log.cpp b/trunk/src/service/srs_service_log.cpp
index 895658560..d92cb5ce7 100644
--- a/trunk/src/service/srs_service_log.cpp
+++ b/trunk/src/service/srs_service_log.cpp
@@ -43,7 +43,11 @@ SrsThreadContext::~SrsThreadContext()
int SrsThreadContext::generate_id()
{
- static int id = 100;
+ static int id = 0;
+
+ if (id == 0) {
+ id = (100 + ((int)(int64_t)this)%1000);
+ }
int gid = id++;
cache[srs_thread_self()] = gid;
diff --git a/trunk/src/service/srs_service_rtmp_conn.cpp b/trunk/src/service/srs_service_rtmp_conn.cpp
index 366af9839..b5525e4fa 100644
--- a/trunk/src/service/srs_service_rtmp_conn.cpp
+++ b/trunk/src/service/srs_service_rtmp_conn.cpp
@@ -139,7 +139,8 @@ srs_error_t SrsBasicRtmpClient::do_connect_app(string local_ip, bool debug)
// upnode server identity will show in the connect_app of client.
// @see https://github.com/ossrs/srs/issues/160
// the debug_srs_upnode is config in vhost and default to true.
- if ((err = client->connect_app(req->app, tc_url, req, debug, NULL)) != srs_success) {
+ SrsServerInfo si;
+ if ((err = client->connect_app(req->app, tc_url, req, debug, &si)) != srs_success) {
return srs_error_wrap(err, "connect app tcUrl=%s, debug=%d", tc_url.c_str(), debug);
}
diff --git a/trunk/src/utest/srs_utest_protocol.cpp b/trunk/src/utest/srs_utest_protocol.cpp
index 1a6a7a6e3..645e53296 100644
--- a/trunk/src/utest/srs_utest_protocol.cpp
+++ b/trunk/src/utest/srs_utest_protocol.cpp
@@ -95,6 +95,7 @@ MockBufferIO::MockBufferIO()
{
rtm = stm = SRS_UTIME_NO_TIMEOUT;
rbytes = sbytes = 0;
+ in_err = out_err = srs_success;
}
MockBufferIO::~MockBufferIO()
@@ -109,6 +110,10 @@ MockBufferIO* MockBufferIO::append(string data)
srs_error_t MockBufferIO::read_fully(void* buf, size_t size, ssize_t* nread)
{
+ if (in_err != srs_success) {
+ return srs_error_copy(in_err);
+ }
+
if (in_buffer.length() < (int)size) {
return srs_error_new(ERROR_SOCKET_READ, "read");
}
@@ -124,6 +129,10 @@ srs_error_t MockBufferIO::read_fully(void* buf, size_t size, ssize_t* nread)
srs_error_t MockBufferIO::write(void* buf, size_t size, ssize_t* nwrite)
{
+ if (out_err != srs_success) {
+ return srs_error_copy(out_err);
+ }
+
sbytes += size;
if (nwrite) {
*nwrite = size;
@@ -165,6 +174,10 @@ int64_t MockBufferIO::get_send_bytes()
srs_error_t MockBufferIO::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
srs_error_t err = srs_success;
+
+ if (out_err != srs_success) {
+ return srs_error_copy(out_err);
+ }
ssize_t total = 0;
for (int i = 0; i size;
+ payload = this->payload;
+ this->payload = NULL;
+ return srs_success;
}
};
@@ -54,12 +81,16 @@ VOID TEST(ProtoStackTest, PacketEncode)
char* payload;
if (true) {
- MockErrorPacket pkt;
+ MockPacket pkt;
+ pkt.size = 1024;
+
HELPER_EXPECT_FAILED(pkt.encode(size, payload));
}
if (true) {
- MockErrorPacket pkt;
+ MockPacket pkt;
+ pkt.size = 1024;
+
SrsBuffer b;
HELPER_EXPECT_FAILED(pkt.decode(&b));
}
@@ -72,7 +103,9 @@ VOID TEST(ProtoStackTest, PacketEncode)
}
if (true) {
- MockErrorPacket pkt;
+ MockPacket pkt;
+ pkt.size = 1024;
+
EXPECT_EQ(1024, pkt.get_size());
}
}
@@ -147,5 +180,1088 @@ VOID TEST(ProtoStackTest, ManualFlush)
HELPER_EXPECT_SUCCESS(p.manual_response_flush());
EXPECT_EQ(12+6, io.out_buffer.length());
}
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ // Always response ACK message.
+ HELPER_EXPECT_SUCCESS(p.set_in_window_ack_size(1));
+
+ // When not auto response, need to flush it manually.
+ p.set_auto_response(false);
+ HELPER_EXPECT_SUCCESS(p.response_ping_message(1024));
+ EXPECT_EQ(0, io.out_buffer.length());
+
+ // If not flushed, the packets will be destroyed.
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ p.set_recv_buffer(0);
+ p.set_recv_buffer(131072 * 10);
+
+ p.set_merge_read(true, NULL);
+ p.set_merge_read(false, NULL);
+ }
+}
+
+VOID TEST(ProtoStackTest, SendPacketsError)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ MockPacket* pkt = new MockPacket();
+ pkt->size = 1024;
+ HELPER_EXPECT_FAILED(p.send_and_free_packet(pkt, 1));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsPacket* pkt = new SrsPacket();
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(pkt, 1));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ MockPacket2* pkt = new MockPacket2();
+ pkt->size = 1024;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(pkt, 1));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsCommonMessage pkt;
+ pkt.header.initialize_audio(200, 1000, 1);
+ pkt.create_payload(256);
+ pkt.size = 256;
+
+ SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
+ msg->create(&pkt);
+ SrsAutoFree(SrsSharedPtrMessage, msg);
+
+ SrsSharedPtrMessage* msgs[10240];
+ for (int i = 0; i < 10240; i++) {
+ msgs[i] = msg->copy();
+ }
+
+ io.out_err = srs_error_new(1, "fail");
+ HELPER_EXPECT_FAILED(p.send_and_free_messages(msgs, 10240, 1));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ // Always response ACK message.
+ HELPER_EXPECT_SUCCESS(p.set_in_window_ack_size(1));
+
+ // When not auto response, need to flush it manually.
+ p.set_auto_response(false);
+ HELPER_EXPECT_SUCCESS(p.response_acknowledgement_message());
+ EXPECT_EQ(0, io.out_buffer.length());
+
+ io.out_err = srs_error_new(1, "fail");
+ HELPER_EXPECT_FAILED(p.manual_response_flush());
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ MockPacket2* pkt = new MockPacket2();
+ pkt->size = 16;
+ pkt->payload = new char[16];
+
+ io.out_err = srs_error_new(1, "fail");
+ HELPER_EXPECT_FAILED(p.send_and_free_packet(pkt, 1));
+ }
+}
+
+VOID TEST(ProtoStackTest, SendHugePacket)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ MockPacket2* pkt = new MockPacket2();
+ pkt->size = 1024;
+ pkt->payload = new char[1024];
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(pkt, 1));
+ }
+}
+
+VOID TEST(ProtoStackTest, SendZeroMessages)
+{
+ srs_error_t err;
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+ HELPER_EXPECT_SUCCESS(p.send_and_free_message(NULL, 0));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+ SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
+ HELPER_EXPECT_SUCCESS(p.send_and_free_message(msg, 1));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+ SrsSharedPtrMessage* msgs[1024];
+ for (int i = 0; i < 1024; i++) {
+ msgs[i] = new SrsSharedPtrMessage();
+ }
+ HELPER_EXPECT_SUCCESS(p.send_and_free_messages(msgs, 1024, 0));
+ }
+}
+
+VOID TEST(ProtoStackTest, HugeMessages)
+{
+ srs_error_t err;
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsCommonMessage pkt;
+ pkt.header.initialize_audio(200, 1000, 1);
+ pkt.create_payload(256);
+ pkt.size = 256;
+
+ SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
+ msg->create(&pkt);
+
+ HELPER_EXPECT_SUCCESS(p.send_and_free_message(msg, 1));
+ EXPECT_EQ(269, io.out_buffer.length());
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsCommonMessage pkt;
+ pkt.header.initialize_audio(200, 1000, 1);
+ pkt.create_payload(256);
+ pkt.size = 256;
+
+ SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
+ msg->create(&pkt);
+ SrsAutoFree(SrsSharedPtrMessage, msg);
+
+ SrsSharedPtrMessage* msgs[1024];
+ for (int i = 0; i < 1024; i++) {
+ msgs[i] = msg->copy();
+ }
+
+ HELPER_EXPECT_SUCCESS(p.send_and_free_messages(msgs, 1024, 1));
+ EXPECT_EQ(269*1024, io.out_buffer.length());
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsCommonMessage pkt;
+ pkt.header.initialize_audio(200, 1000, 1);
+ pkt.create_payload(256);
+ pkt.size = 256;
+
+ SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
+ msg->create(&pkt);
+ SrsAutoFree(SrsSharedPtrMessage, msg);
+
+ SrsSharedPtrMessage* msgs[10240];
+ for (int i = 0; i < 10240; i++) {
+ msgs[i] = msg->copy();
+ }
+
+ HELPER_EXPECT_SUCCESS(p.send_and_free_messages(msgs, 10240, 1));
+ EXPECT_EQ(269*10240, io.out_buffer.length());
+ }
+}
+
+VOID TEST(ProtoStackTest, DecodeMessages)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ // AMF0 message with 1B should fail.
+ SrsCommonMessage msg;
+ msg.header.initialize_amf0_script(1, 1);
+ msg.create_payload(1);
+ msg.size = 1;
+
+ SrsPacket* pkt;
+ HELPER_EXPECT_FAILED(p.decode_message(&msg, &pkt));
+ }
+}
+
+VOID TEST(ProtoStackTest, OnDecodeMessages)
+{
+ srs_error_t err;
+
+ SrsSimpleStream bytes;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
+ pkt->chunk_size = 0;
+
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(pkt, 1));
+ bytes.append(&io.out_buffer);
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ // Always response ACK message.
+ HELPER_EXPECT_SUCCESS(p.set_in_window_ack_size(1));
+
+ SrsCommonMessage* msg;
+ io.in_buffer.append(&bytes);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ srs_freep(msg);
+ }
+}
+
+SrsCommonMessage* _create_amf0(char* bytes, int size, int stream_id)
+{
+ SrsCommonMessage* msg = new SrsCommonMessage();
+ msg->header.initialize_amf0_script(size, stream_id);
+ msg->create_payload(size);
+ memcpy(msg->payload, bytes, size);
+ msg->size = size;
+ return msg;
+}
+
+VOID TEST(ProtoStackTest, OnDecodeMessages2)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x17, 0x02, 0x00, 0x01, 's', 0x00, 0,0,0,0,0,0,0,0, 0x03,0,0,9};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+ msg->header.message_type = RTMP_MSG_AMF3CommandMessage;
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+ HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt));
+
+ SrsCallPacket* call = (SrsCallPacket*)pkt;
+ EXPECT_STREQ("s", call->command_name.c_str());
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x17, 0x02, 0x00, 0x01, 's'};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+ msg->header.message_type = RTMP_MSG_AMF3CommandMessage;
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x00};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+ msg->header.message_type = 0xff;
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 0x01, 's'};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+ msg->header.message_type = RTMP_MSG_AMF0DataMessage;
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt));
+ }
+}
+
+VOID TEST(ProtoStackTest, OnDecodeMessages3)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, '_','r','e','s','u','l','t'};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+ msg->header.message_type = RTMP_MSG_AMF0DataMessage;
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Decode the response failed, no transaction ID was set by request.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x17, 0x02, 0x00, 0x07, '_','r','e','s','u','l','t'};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+ msg->header.message_type = RTMP_MSG_AMF3DataMessage;
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Decode the response failed, no transaction ID was set by request.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x17, 0x02, 0x00, 0x07, '_','r','e','s','u','l','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+ msg->header.message_type = RTMP_MSG_AMF3CommandMessage;
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Decode the response failed, no transaction ID was set by request.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsConnectAppPacket* request = new SrsConnectAppPacket();
+ request->transaction_id = 0.0;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(request, 1));
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, '_','r','e','s','u','l','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the response packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsCreateStreamPacket* request = new SrsCreateStreamPacket();
+ request->transaction_id = 0.0;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(request, 1));
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, '_','r','e','s','u','l','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the response packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsFMLEStartPacket* request = SrsFMLEStartPacket::create_FC_publish("livestream");
+ request->transaction_id = 0.0;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(request, 1));
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, '_','r','e','s','u','l','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the response packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsFMLEStartPacket* request = SrsFMLEStartPacket::create_release_stream("livestream");
+ request->transaction_id = 0.0;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(request, 1));
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, '_','r','e','s','u','l','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the response packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsFMLEStartPacket* request = SrsFMLEStartPacket::create_release_stream("livestream");
+ request->command_name = RTMP_AMF0_COMMAND_UNPUBLISH;
+ request->transaction_id = 0.0;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(request, 1));
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, '_','r','e','s','u','l','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the response packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsFMLEStartPacket* request = new SrsFMLEStartPacket();
+ request->command_name = "srs";
+ request->transaction_id = 0.0;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(request, 1));
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, '_','r','e','s','u','l','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the response packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+}
+
+VOID TEST(ProtoStackTest, OnDecodeMessages4)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 0x07, 'c','o','n','n','e','c','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 12, 'c','r','e','a','t','e','S','t','r','e','a','m', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 4, 'p','l','a','y', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 5, 'p','a','u','s','e', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 13, 'r','e','l','e','a','s','e','S','t','r','e','a','m', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 9, 'F','C','P','u','b','l','i','s','h', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 7, 'p','u','b','l','i','s','h', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 11, 'F','C','U','n','p','u','b','l','i','s','h', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 13, '@','s','e','t','D','a','t','a','F','r','a','m','e', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 10, 'o','n','M','e','t','a','D','a','t','a', 03,0,0,9};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ HELPER_EXPECT_SUCCESS(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 22, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','F','i','n','i','s','h','e','d', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 21, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','P','l','a','y','i','n','g', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 24, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','P','u','b','l','i','s','h','i','n','g', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 31, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','a','r','t','i','n','g','P','l','a','y','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 34, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','a','r','t','i','n','g','P','u','b','l','i','s','h','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 28, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','a','r','t','P','l','a','y','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 31, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','a','r','t','P','u','b','l','i','s','h','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 30, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','o','p','p','e','d','P','l','a','y','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 27, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','o','p','P','l','a','y','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 30, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','o','p','P','u','b','l','i','s','h','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 33, 'o','n','S','r','s','B','a','n','d','C','h','e','c','k','S','t','o','p','p','e','d','P','u','b','l','i','s','h','B','y','t','e','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 17, 'f','i','n','a','l','C','l','i','e','n','t','P','a','c','k','e','t', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 11, 'c','l','o','s','e','S','t','r','e','a','m', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x02, 0x00, 3, 's','r','s', 0x00,0,0,0,0,0,0,0,0};
+ SrsCommonMessage* msg = _create_amf0((char*)bytes, sizeof(bytes), 1);
+ msg->header.message_type = RTMP_MSG_AMF0CommandMessage;
+ SrsAutoFree(SrsCommonMessage, msg);
+
+ SrsPacket* pkt;
+ SrsAutoFree(SrsPacket, pkt);
+
+ // Without enough data, it fail when decoding the request packet.
+ HELPER_EXPECT_FAILED(p.decode_message(msg, &pkt));
+ }
+}
+
+VOID TEST(ProtoStackTest, RecvMessage)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x01, 0x00, 0x00};
+ io.in_buffer.append((char*)bytes, sizeof(bytes));
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x00, 0x00};
+ io.in_buffer.append((char*)bytes, sizeof(bytes));
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x00};
+ io.in_buffer.append((char*)bytes, sizeof(bytes));
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+}
+
+VOID TEST(ProtoStackTest, RecvMessage2)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x03, 0,0,0, 0,0,4, 0, 0,0,0,0, 1,2,3};
+ io.in_buffer.append((char*)bytes, sizeof(bytes));
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ p.in_chunk_size = 3;
+
+ uint8_t bytes[] = {0x03, 0,0,0, 0,0,4, 0, 0,0,0,0, 1,2,3};
+ io.in_buffer.append((char*)bytes, sizeof(bytes));
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+
+ uint8_t bytes2[] = {0x43, 0,0,0, 0,0,5, 0, 0,0,0,0, 1,2,3};
+ io.in_buffer.append((char*)bytes2, sizeof(bytes2));
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x03};
+ io.in_buffer.append((char*)bytes, sizeof(bytes));
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ uint8_t bytes[] = {0x43, 0,0,0, 0,0,0, 0};
+ io.in_buffer.append((char*)bytes, sizeof(bytes));
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_FAILED(p.recv_message(&msg));
+ }
+}
+
+VOID TEST(ProtoStackTest, RecvMessage3)
+{
+ if (true) {
+ SrsRequest req;
+ req.ip = "10.11.12.13";
+
+ SrsRequest* cp = req.copy();
+ EXPECT_STREQ("10.11.12.13", cp->ip.c_str());
+ srs_freep(cp);
+ }
+
+ if (true) {
+ SrsRequest req;
+ req.ip = "10.11.12.13";
+
+ SrsAmf0Object* obj = SrsAmf0Any::object();
+ obj->set("id", SrsAmf0Any::str("srs"));
+ req.args = obj;
+
+ SrsRequest* cp = req.copy();
+ EXPECT_STREQ("10.11.12.13", cp->ip.c_str());
+
+ SrsAmf0Object* cpa = dynamic_cast(cp->args);
+ SrsAmf0Any* cps = cpa->ensure_property_string("id");
+ EXPECT_STREQ("srs", cps->to_str().c_str());
+ srs_freep(cp);
+ }
+
+ if (true) {
+ SrsRequest req;
+ EXPECT_STREQ("//", req.get_stream_url().c_str());
+ }
+
+ if (true) {
+ SrsRequest req;
+ EXPECT_STREQ("", req.schema.c_str());
+
+ req.as_http();
+ EXPECT_STREQ("http", req.schema.c_str());
+ }
+
+ if (true) {
+ SrsResponse res;
+ EXPECT_EQ(1, res.stream_id);
+ }
+
+ if (true) {
+ EXPECT_STREQ("Play", srs_client_type_string(SrsRtmpConnPlay).c_str());
+ EXPECT_STREQ("flash-publish", srs_client_type_string(SrsRtmpConnFlashPublish).c_str());
+ EXPECT_STREQ("fmle-publish", srs_client_type_string(SrsRtmpConnFMLEPublish).c_str());
+ EXPECT_STREQ("haivision-publish", srs_client_type_string(SrsRtmpConnHaivisionPublish).c_str());
+ EXPECT_STREQ("Unknown", srs_client_type_string(SrsRtmpConnType(0x0f)).c_str());
+
+ EXPECT_TRUE(srs_client_type_is_publish(SrsRtmpConnFlashPublish));
+ EXPECT_TRUE(srs_client_type_is_publish(SrsRtmpConnFMLEPublish));
+ EXPECT_TRUE(srs_client_type_is_publish(SrsRtmpConnHaivisionPublish));
+ EXPECT_FALSE(srs_client_type_is_publish(SrsRtmpConnPlay));
+ }
+}
+
+VOID TEST(ProtoStackTest, RecvMessage4)
+{
+ srs_error_t err;
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
+ pkt->chunk_size = 256;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(pkt, 0));
+
+ io.in_buffer.append(&io.out_buffer);
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
+
+ EXPECT_EQ(256, p.out_chunk_size);
+ }
+
+ if (true) {
+ MockBufferIO io;
+ SrsProtocol p(&io);
+
+ SrsUserControlPacket* pkt = new SrsUserControlPacket();
+ pkt->event_type = SrcPCUCSetBufferLength;
+ pkt->extra_data = 256;
+ HELPER_EXPECT_SUCCESS(p.send_and_free_packet(pkt, 0));
+
+ io.in_buffer.append(&io.out_buffer);
+
+ SrsCommonMessage* msg;
+ SrsAutoFree(SrsCommonMessage, msg);
+ HELPER_EXPECT_SUCCESS(p.recv_message(&msg));
+
+ EXPECT_EQ(256, p.in_buffer_length);
+ }
}