|
|
|
@ -516,7 +516,12 @@ int SrsRtmpConn::playing(SrsSource* source)
|
|
|
|
|
SrsAutoFree(SrsConsumer, consumer);
|
|
|
|
|
srs_verbose("consumer created success.");
|
|
|
|
|
|
|
|
|
|
// TODO: FIXME: remove it.
|
|
|
|
|
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
|
|
|
|
|
// disable the timeout.
|
|
|
|
|
// TODO: FIXME: maybe can use larger timeout?
|
|
|
|
|
rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);
|
|
|
|
|
rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT);
|
|
|
|
|
|
|
|
|
|
SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER);
|
|
|
|
|
|
|
|
|
@ -525,12 +530,30 @@ int SrsRtmpConn::playing(SrsSource* source)
|
|
|
|
|
bool user_specified_duration_to_stop = (req->duration > 0);
|
|
|
|
|
int64_t starttime = -1;
|
|
|
|
|
|
|
|
|
|
pollfd pds[2];
|
|
|
|
|
// poll the client incoming fd.
|
|
|
|
|
pds[0].fd = st_netfd_fileno(stfd);
|
|
|
|
|
pds[0].events = POLLIN;
|
|
|
|
|
// poll the consumer queue pipe.
|
|
|
|
|
pds[1].fd = st_netfd_fileno(consumer->pipe_fd());
|
|
|
|
|
pds[1].events = POLLIN;
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
// collect elapse for pithy print.
|
|
|
|
|
pithy_print.elapse();
|
|
|
|
|
|
|
|
|
|
pds[0].revents = 0;
|
|
|
|
|
pds[1].revents = 0;
|
|
|
|
|
|
|
|
|
|
// wait for packet incoming or data outgoing.
|
|
|
|
|
if (st_poll(pds, 2, ST_UTIME_NO_TIMEOUT) <= 0) {
|
|
|
|
|
srs_error("st_poll failed.");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// packet incoming, read from RTMP.
|
|
|
|
|
// read from client.
|
|
|
|
|
if (true) {
|
|
|
|
|
if (pds[0].revents & POLLIN) {
|
|
|
|
|
SrsMessage* msg = NULL;
|
|
|
|
|
ret = rtmp->recv_message(&msg);
|
|
|
|
|
srs_verbose("play loop recv message. ret=%d", ret);
|
|
|
|
@ -553,49 +576,52 @@ int SrsRtmpConn::playing(SrsSource* source)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// get messages from consumer.
|
|
|
|
|
int count = 0;
|
|
|
|
|
if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("get messages from consumer failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// reportable
|
|
|
|
|
if (pithy_print.can_print()) {
|
|
|
|
|
kbps->sample();
|
|
|
|
|
srs_trace("-> "SRS_CONSTS_LOG_PLAY
|
|
|
|
|
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
|
|
|
|
|
pithy_print.age(), count,
|
|
|
|
|
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
|
|
|
|
|
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sendout messages
|
|
|
|
|
// @remark, becareful, all msgs must be free explicitly,
|
|
|
|
|
// free by send_and_free_message or srs_freep.
|
|
|
|
|
for (int i = 0; i < count; i++) {
|
|
|
|
|
SrsSharedPtrMessage* msg = msgs.msgs[i];
|
|
|
|
|
|
|
|
|
|
// the send_message will free the msg,
|
|
|
|
|
// so set the msgs[i] to NULL.
|
|
|
|
|
msgs.msgs[i] = NULL;
|
|
|
|
|
// data outgoing, sendout packets.
|
|
|
|
|
if (pds[1].revents & POLLIN) {
|
|
|
|
|
// get messages from consumer.
|
|
|
|
|
int count = 0;
|
|
|
|
|
if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("get messages from consumer failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// reportable
|
|
|
|
|
if (pithy_print.can_print()) {
|
|
|
|
|
kbps->sample();
|
|
|
|
|
srs_trace("-> "SRS_CONSTS_LOG_PLAY
|
|
|
|
|
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
|
|
|
|
|
pithy_print.age(), count,
|
|
|
|
|
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
|
|
|
|
|
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// only when user specifies the duration,
|
|
|
|
|
// we start to collect the durations for each message.
|
|
|
|
|
if (user_specified_duration_to_stop) {
|
|
|
|
|
// foreach msg, collect the duration.
|
|
|
|
|
// @remark: never use msg when sent it, for the protocol sdk will free it.
|
|
|
|
|
if (starttime < 0 || starttime > msg->header.timestamp) {
|
|
|
|
|
// sendout messages
|
|
|
|
|
// @remark, becareful, all msgs must be free explicitly,
|
|
|
|
|
// free by send_and_free_message or srs_freep.
|
|
|
|
|
for (int i = 0; i < count; i++) {
|
|
|
|
|
SrsSharedPtrMessage* msg = msgs.msgs[i];
|
|
|
|
|
|
|
|
|
|
// the send_message will free the msg,
|
|
|
|
|
// so set the msgs[i] to NULL.
|
|
|
|
|
msgs.msgs[i] = NULL;
|
|
|
|
|
|
|
|
|
|
// only when user specifies the duration,
|
|
|
|
|
// we start to collect the durations for each message.
|
|
|
|
|
if (user_specified_duration_to_stop) {
|
|
|
|
|
// foreach msg, collect the duration.
|
|
|
|
|
// @remark: never use msg when sent it, for the protocol sdk will free it.
|
|
|
|
|
if (starttime < 0 || starttime > msg->header.timestamp) {
|
|
|
|
|
starttime = msg->header.timestamp;
|
|
|
|
|
}
|
|
|
|
|
duration += msg->header.timestamp - starttime;
|
|
|
|
|
starttime = msg->header.timestamp;
|
|
|
|
|
}
|
|
|
|
|
duration += msg->header.timestamp - starttime;
|
|
|
|
|
starttime = msg->header.timestamp;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// no need to assert msg, for the rtmp will assert it.
|
|
|
|
|
if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("send message to client failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
|
|
// no need to assert msg, for the rtmp will assert it.
|
|
|
|
|
if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
|
|
|
|
|
srs_error("send message to client failed. ret=%d", ret);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|