From 265b70863c750570432396e3bf0c80d1febe737e Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 5 Jan 2020 18:37:20 +0800 Subject: [PATCH] Improve test coverage for service TCP/UDP. --- trunk/src/service/srs_service_st.cpp | 89 ++++++---- trunk/src/utest/srs_utest_service.cpp | 237 ++++++++++++++++++++++++++ trunk/src/utest/srs_utest_service.hpp | 2 + 3 files changed, 292 insertions(+), 36 deletions(-) diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index 512ec563c..30f8b120c 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -187,6 +187,43 @@ srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t return srs_success; } +srs_error_t do_srs_tcp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) +{ + srs_error_t err = srs_success; + + // Detect alive for TCP connection. + // @see https://github.com/ossrs/srs/issues/1044 + if ((err = srs_fd_keepalive(fd)) != srs_success) { + return srs_error_wrap(err, "set keepalive"); + } + + if ((err = srs_fd_closeexec(fd)) != srs_success) { + return srs_error_wrap(err, "set closeexec"); + } + + if ((err = srs_fd_reuseaddr(fd)) != srs_success) { + return srs_error_wrap(err, "set reuseaddr"); + } + + if ((err = srs_fd_reuseport(fd)) != srs_success) { + return srs_error_wrap(err, "set reuseport"); + } + + if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) { + return srs_error_new(ERROR_SOCKET_BIND, "bind"); + } + + if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) { + return srs_error_new(ERROR_SOCKET_LISTEN, "listen"); + } + + if ((*pfd = srs_netfd_open_socket(fd)) == NULL){ + return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open"); + } + + return err; +} + srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) { srs_error_t err = srs_success; @@ -213,41 +250,41 @@ srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) r->ai_family, r->ai_socktype, r->ai_protocol); } - // Detect alive for TCP connection. - // @see https://github.com/ossrs/srs/issues/1044 - if ((err = srs_fd_keepalive(fd)) != srs_success) { + if ((err = do_srs_tcp_listen(fd, r, pfd)) != srs_success) { ::close(fd); - return srs_error_wrap(err, "set keepalive fd=%d", fd); + return srs_error_wrap(err, "fd=%d", fd); } + return err; +} + +srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) +{ + srs_error_t err = srs_success; + if ((err = srs_fd_closeexec(fd)) != srs_success) { ::close(fd); - return srs_error_wrap(err, "set closeexec fd=%d", fd); + return srs_error_wrap(err, "set closeexec"); } if ((err = srs_fd_reuseaddr(fd)) != srs_success) { ::close(fd); - return srs_error_wrap(err, "set reuseaddr fd=%d", fd); + return srs_error_wrap(err, "set reuseaddr"); } if ((err = srs_fd_reuseport(fd)) != srs_success) { ::close(fd); - return srs_error_wrap(err, "set reuseport fd=%d", fd); + return srs_error_wrap(err, "set reuseport"); } if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) { ::close(fd); - return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd); - } - - if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) { - ::close(fd); - return srs_error_new(ERROR_SOCKET_LISTEN, "listen fd=%d", fd); + return srs_error_new(ERROR_SOCKET_BIND, "bind"); } if ((*pfd = srs_netfd_open_socket(fd)) == NULL){ ::close(fd); - return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd); + return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open"); } return err; @@ -279,29 +316,9 @@ srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd) r->ai_family, r->ai_socktype, r->ai_protocol); } - if ((err = srs_fd_closeexec(fd)) != srs_success) { - ::close(fd); - return srs_error_wrap(err, "set closeexec fd=%d", fd); - } - - if ((err = srs_fd_reuseaddr(fd)) != srs_success) { - ::close(fd); - return srs_error_wrap(err, "set reuseaddr fd=%d", fd); - } - - if ((err = srs_fd_reuseport(fd)) != srs_success) { - ::close(fd); - return srs_error_wrap(err, "set reuseport fd=%d", fd); - } - - if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) { - ::close(fd); - return srs_error_new(ERROR_SOCKET_BIND, "bind fd=%d", fd); - } - - if ((*pfd = srs_netfd_open_socket(fd)) == NULL){ + if ((err = do_srs_udp_listen(fd, r, pfd)) != srs_success) { ::close(fd); - return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open fd=%d", fd); + return srs_error_wrap(err, "fd=%d", fd); } return err; diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index 9307ab48f..d42abd43d 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -679,3 +679,240 @@ VOID TEST(TCPServerTest, MessageWritev) } } +VOID TEST(TCPServerTest, TCPListen) +{ + srs_error_t err; + + // Failed for invalid ip. + if (true) { + srs_netfd_t pfd = NULL; + HELPER_EXPECT_FAILED(srs_tcp_listen("10.0.0.abc", 1935, &pfd)); + srs_close_stfd(pfd); + } + + // If listen multiple times, should success for we already set the REUSEPORT. + if (true) { + srs_netfd_t pfd = NULL; + HELPER_ASSERT_SUCCESS(srs_tcp_listen("127.0.0.1", 1935, &pfd)); + + srs_netfd_t pfd2 = NULL; + srs_error_t err2 = srs_tcp_listen("127.0.0.1", 1935, &pfd2); + + srs_close_stfd(pfd); + srs_close_stfd(pfd2); + HELPER_EXPECT_SUCCESS(err2); + } + + // Typical listen. + if (true) { + srs_netfd_t pfd = NULL; + HELPER_ASSERT_SUCCESS(srs_tcp_listen("127.0.0.1", 1935, &pfd)); + srs_close_stfd(pfd); + } +} + +VOID TEST(TCPServerTest, UDPListen) +{ + srs_error_t err; + + // Failed for invalid ip. + if (true) { + srs_netfd_t pfd = NULL; + HELPER_EXPECT_FAILED(srs_udp_listen("10.0.0.abc", 1935, &pfd)); + srs_close_stfd(pfd); + } + + // If listen multiple times, should success for we already set the REUSEPORT. + if (true) { + srs_netfd_t pfd = NULL; + HELPER_ASSERT_SUCCESS(srs_udp_listen("127.0.0.1", 1935, &pfd)); + + srs_netfd_t pfd2 = NULL; + srs_error_t err2 = srs_udp_listen("127.0.0.1", 1935, &pfd2); + + srs_close_stfd(pfd); + srs_close_stfd(pfd2); + HELPER_EXPECT_SUCCESS(err2); + } + + // Typical listen. + if (true) { + srs_netfd_t pfd = NULL; + HELPER_ASSERT_SUCCESS(srs_udp_listen("127.0.0.1", 1935, &pfd)); + srs_close_stfd(pfd); + } +} + +class MockOnCycleThread : public ISrsCoroutineHandler +{ +public: + SrsSTCoroutine trd; + srs_cond_t cond; + MockOnCycleThread() : trd("mock", this, 0) { + cond = srs_cond_new(); + }; + virtual ~MockOnCycleThread() { + srs_cond_destroy(cond); + } + virtual srs_error_t cycle() { + srs_error_t err = srs_success; + + for (;;) { + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_cond_signal(cond); + // If no one waiting on the cond, directly return event signal more than one time. + // If someone waiting, signal them more than one time. + srs_cond_signal(cond); + + if ((err = trd.pull()) != srs_success) { + return err; + } + } + + return err; + } +}; + +VOID TEST(TCPServerTest, ThreadCondWait) +{ + MockOnCycleThread trd; + trd.trd.start(); + + srs_usleep(20 * SRS_UTIME_MILLISECONDS); + srs_cond_wait(trd.cond); + trd.trd.stop(); +} + +class MockOnCycleThread2 : public ISrsCoroutineHandler +{ +public: + SrsSTCoroutine trd; + srs_mutex_t lock; + MockOnCycleThread2() : trd("mock", this, 0) { + lock = srs_mutex_new(); + }; + virtual ~MockOnCycleThread2() { + srs_mutex_destroy(lock); + } + virtual srs_error_t cycle() { + srs_error_t err = srs_success; + + for (;;) { + srs_mutex_lock(lock); + srs_usleep(10 * SRS_UTIME_MILLISECONDS); + srs_mutex_unlock(lock); + + srs_error_t err = trd.pull(); + if (err != srs_success) { + return err; + } + } + + return err; + } +}; + +VOID TEST(TCPServerTest, ThreadMutexWait) +{ + MockOnCycleThread2 trd; + trd.trd.start(); + + srs_usleep(20 * SRS_UTIME_MILLISECONDS); + + srs_mutex_lock(trd.lock); + trd.trd.stop(); + srs_mutex_unlock(trd.lock); +} + +class MockOnCycleThread3 : public ISrsCoroutineHandler +{ +public: + SrsSTCoroutine trd; + srs_netfd_t fd; + MockOnCycleThread3() : trd("mock", this, 0) { + }; + virtual ~MockOnCycleThread3() { + trd.stop(); + srs_close_stfd(fd); + } + virtual srs_error_t start(string ip, int port) { + srs_error_t err = srs_success; + if ((err = srs_tcp_listen(ip, port, &fd)) != srs_success) { + return err; + } + + return trd.start(); + } + virtual srs_error_t do_cycle(srs_netfd_t cfd) { + srs_error_t err = srs_success; + + SrsStSocket skt; + if ((err = skt.initialize(cfd)) != srs_success) { + return err; + } + + skt.set_recv_timeout(1 * SRS_UTIME_SECONDS); + skt.set_send_timeout(1 * SRS_UTIME_SECONDS); + + while (true) { + if ((err = trd.pull()) != srs_success) { + return err; + } + + char buf[5]; + if ((err = skt.read_fully(buf, 5, NULL)) != srs_success) { + return err; + } + if ((err = skt.write(buf, 5, NULL)) != srs_success) { + return err; + } + } + + return err; + } + virtual srs_error_t cycle() { + srs_error_t err = srs_success; + + srs_netfd_t cfd = srs_accept(fd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); + if (cfd == NULL) { + return err; + } + + err = do_cycle(cfd); + srs_close_stfd(cfd); + srs_freep(err); + + return err; + } +}; + +VOID TEST(TCPServerTest, TCPClientServer) +{ + srs_error_t err; + + MockOnCycleThread3 trd; + HELPER_ASSERT_SUCCESS(trd.start("127.0.0.1", 1935)); + + SrsTcpClient c("127.0.0.1", 1935, 1 * SRS_UTIME_SECONDS); + HELPER_ASSERT_SUCCESS(c.connect()); + + c.set_recv_timeout(1 * SRS_UTIME_SECONDS); + c.set_send_timeout(1 * SRS_UTIME_SECONDS); + + if (true) { + HELPER_ASSERT_SUCCESS(c.write((void*)"Hello", 5, NULL)); + + char buf[6]; HELPER_ARRAY_INIT(buf, 6, 0); + HELPER_ASSERT_SUCCESS(c.read(buf, 5, NULL)); + EXPECT_STREQ("Hello", buf); + } + + if (true) { + HELPER_ASSERT_SUCCESS(c.write((void*)"Hello", 5, NULL)); + + char buf[6]; HELPER_ARRAY_INIT(buf, 6, 0); + HELPER_ASSERT_SUCCESS(c.read_fully(buf, 5, NULL)); + EXPECT_STREQ("Hello", buf); + } +} + diff --git a/trunk/src/utest/srs_utest_service.hpp b/trunk/src/utest/srs_utest_service.hpp index 819272dc6..995b75e83 100644 --- a/trunk/src/utest/srs_utest_service.hpp +++ b/trunk/src/utest/srs_utest_service.hpp @@ -29,5 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include +#include + #endif