support fmle publish. fix bug of rtmp chunk stream.

pull/133/head
winlin 11 years ago
parent e598616e14
commit 33c586181d

@ -45,4 +45,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <stddef.h>
#include <sys/types.h>
// free the p and set to NULL.
// p must be a T*.
#define srs_freep(p) \
if (p) { \
delete p; \
p = NULL; \
} \
(void)0
// free the p which represents a array
#define srs_freepa(p) \
if (p) { \
delete[] p; \
p = NULL; \
} \
(void)0
#endif

@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_amf0.hpp>
#include <utility>
#include <srs_core_log.hpp>
#include <srs_core_error.hpp>
#include <srs_core_stream.hpp>
@ -169,33 +171,76 @@ SrsAmf0ObjectEOF::~SrsAmf0ObjectEOF()
{
}
SrsAmf0Object::SrsAmf0Object()
SrsUnSortedHashtable::SrsUnSortedHashtable()
{
marker = RTMP_AMF0_Object;
}
SrsAmf0Object::~SrsAmf0Object()
SrsUnSortedHashtable::~SrsUnSortedHashtable()
{
std::map<std::string, SrsAmf0Any*>::iterator it;
std::vector<SrsObjectPropertyType>::iterator it;
for (it = properties.begin(); it != properties.end(); ++it) {
SrsAmf0Any* any = it->second;
delete any;
SrsObjectPropertyType& elem = *it;
SrsAmf0Any* any = elem.second;
srs_freep(any);
}
properties.clear();
}
SrsAmf0Any* SrsAmf0Object::get_property(std::string name)
int SrsUnSortedHashtable::size()
{
std::map<std::string, SrsAmf0Any*>::iterator it;
return (int)properties.size();
}
if ((it = properties.find(name)) == properties.end()) {
return NULL;
std::string SrsUnSortedHashtable::key_at(int index)
{
srs_assert(index < size());
SrsObjectPropertyType& elem = properties[index];
return elem.first;
}
SrsAmf0Any* SrsUnSortedHashtable::value_at(int index)
{
srs_assert(index < size());
SrsObjectPropertyType& elem = properties[index];
return elem.second;
}
void SrsUnSortedHashtable::set(std::string key, SrsAmf0Any* value)
{
std::vector<SrsObjectPropertyType>::iterator it;
for (it = properties.begin(); it != properties.end(); ++it) {
SrsObjectPropertyType& elem = *it;
std::string name = elem.first;
SrsAmf0Any* any = elem.second;
if (key == name) {
srs_freep(any);
properties.erase(it);
break;
}
}
return it->second;
properties.push_back(std::make_pair(key, value));
}
SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name)
SrsAmf0Any* SrsUnSortedHashtable::get_property(std::string name)
{
std::vector<SrsObjectPropertyType>::iterator it;
for (it = properties.begin(); it != properties.end(); ++it) {
SrsObjectPropertyType& elem = *it;
std::string key = elem.first;
SrsAmf0Any* any = elem.second;
if (key == name) {
return any;
}
}
return NULL;
}
SrsAmf0Any* SrsUnSortedHashtable::ensure_property_string(std::string name)
{
SrsAmf0Any* prop = get_property(name);
@ -210,6 +255,45 @@ SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name)
return prop;
}
SrsAmf0Object::SrsAmf0Object()
{
marker = RTMP_AMF0_Object;
}
SrsAmf0Object::~SrsAmf0Object()
{
}
int SrsAmf0Object::size()
{
return properties.size();
}
std::string SrsAmf0Object::key_at(int index)
{
return properties.key_at(index);
}
SrsAmf0Any* SrsAmf0Object::value_at(int index)
{
return properties.value_at(index);
}
void SrsAmf0Object::set(std::string key, SrsAmf0Any* value)
{
properties.set(key, value);
}
SrsAmf0Any* SrsAmf0Object::get_property(std::string name)
{
return properties.get_property(name);
}
SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name)
{
return properties.ensure_property_string(name);
}
SrsASrsAmf0EcmaArray::SrsASrsAmf0EcmaArray()
{
marker = RTMP_AMF0_EcmaArray;
@ -217,38 +301,36 @@ SrsASrsAmf0EcmaArray::SrsASrsAmf0EcmaArray()
SrsASrsAmf0EcmaArray::~SrsASrsAmf0EcmaArray()
{
std::map<std::string, SrsAmf0Any*>::iterator it;
for (it = properties.begin(); it != properties.end(); ++it) {
SrsAmf0Any* any = it->second;
delete any;
}
properties.clear();
}
SrsAmf0Any* SrsASrsAmf0EcmaArray::get_property(std::string name)
int SrsASrsAmf0EcmaArray::size()
{
std::map<std::string, SrsAmf0Any*>::iterator it;
if ((it = properties.find(name)) == properties.end()) {
return NULL;
}
return properties.size();
}
return it->second;
std::string SrsASrsAmf0EcmaArray::key_at(int index)
{
return properties.key_at(index);
}
SrsAmf0Any* SrsASrsAmf0EcmaArray::ensure_property_string(std::string name)
SrsAmf0Any* SrsASrsAmf0EcmaArray::value_at(int index)
{
SrsAmf0Any* prop = get_property(name);
return properties.value_at(index);
}
if (!prop) {
return NULL;
}
void SrsASrsAmf0EcmaArray::set(std::string key, SrsAmf0Any* value)
{
properties.set(key, value);
}
if (!prop->is_string()) {
return NULL;
}
SrsAmf0Any* SrsASrsAmf0EcmaArray::get_property(std::string name)
{
return properties.get_property(name);
}
return prop;
SrsAmf0Any* SrsASrsAmf0EcmaArray::ensure_property_string(std::string name)
{
return properties.ensure_property_string(name);
}
int srs_amf0_read_utf8(SrsStream* stream, std::string& value)
@ -877,14 +959,14 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value)
// AMF0 Object EOF.
if (property_name.empty() || !property_value || property_value->is_object_eof()) {
if (property_value) {
delete property_value;
srs_freep(property_value);
}
srs_info("amf0 read object EOF.");
break;
}
// add property
value->properties[property_name] = property_value;
value->set(property_name, property_value);
}
return ret;
@ -906,10 +988,9 @@ int srs_amf0_write_object(SrsStream* stream, SrsAmf0Object* value)
srs_verbose("amf0 write object marker success");
// value
std::map<std::string, SrsAmf0Any*>::iterator it;
for (it = value->properties.begin(); it != value->properties.end(); ++it) {
std::string name = it->first;
SrsAmf0Any* any = it->second;
for (int i = 0; i < value->size(); i++) {
std::string name = value->key_at(i);
SrsAmf0Any* any = value->value_at(i);
if ((ret = srs_amf0_write_utf8(stream, name)) != ERROR_SUCCESS) {
srs_error("write object property name failed. ret=%d", ret);
@ -986,14 +1067,14 @@ int srs_amf0_read_ecma_array(SrsStream* stream, SrsASrsAmf0EcmaArray*& value)
// AMF0 Object EOF.
if (property_name.empty() || !property_value || property_value->is_object_eof()) {
if (property_value) {
delete property_value;
srs_freep(property_value);
}
srs_info("amf0 read ecma_array EOF.");
break;
}
// add property
value->properties[property_name] = property_value;
value->set(property_name, property_value);
}
return ret;
@ -1025,10 +1106,9 @@ int srs_amf0_write_ecma_array(SrsStream* stream, SrsASrsAmf0EcmaArray* value)
srs_verbose("amf0 write ecma_array count success. count=%d", value->count);
// value
std::map<std::string, SrsAmf0Any*>::iterator it;
for (it = value->properties.begin(); it != value->properties.end(); ++it) {
std::string name = it->first;
SrsAmf0Any* any = it->second;
for (int i = 0; i < value->size(); i++) {
std::string name = value->key_at(i);
SrsAmf0Any* any = value->value_at(i);
if ((ret = srs_amf0_write_utf8(stream, name)) != ERROR_SUCCESS) {
srs_error("write ecma_array property name failed. ret=%d", ret);
@ -1091,10 +1171,9 @@ int srs_amf0_get_object_size(SrsAmf0Object* obj)
int size = 1;
std::map<std::string, SrsAmf0Any*>::iterator it;
for (it = obj->properties.begin(); it != obj->properties.end(); ++it) {
std::string name = it->first;
SrsAmf0Any* value = it->second;
for (int i = 0; i < obj->size(); i++){
std::string name = obj->key_at(i);
SrsAmf0Any* value = obj->value_at(i);
size += srs_amf0_get_utf8_size(name);
size += srs_amf0_get_any_size(value);
@ -1113,10 +1192,9 @@ int srs_amf0_get_ecma_array_size(SrsASrsAmf0EcmaArray* arr)
int size = 1 + 4;
std::map<std::string, SrsAmf0Any*>::iterator it;
for (it = arr->properties.begin(); it != arr->properties.end(); ++it) {
std::string name = it->first;
SrsAmf0Any* value = it->second;
for (int i = 0; i < arr->size(); i++){
std::string name = arr->key_at(i);
SrsAmf0Any* value = arr->value_at(i);
size += srs_amf0_get_utf8_size(name);
size += srs_amf0_get_any_size(value);

@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <string>
#include <map>
#include <vector>
class SrsStream;
class SrsAmf0Object;
@ -139,6 +139,30 @@ struct SrsAmf0ObjectEOF : public SrsAmf0Any
virtual ~SrsAmf0ObjectEOF();
};
/**
* to ensure in inserted order.
* for the FMLE will crash when AMF0Object is not ordered by inserted,
* if ordered in map, the string compare order, the FMLE will creash when
* get the response of connect app.
*/
struct SrsUnSortedHashtable
{
private:
typedef std::pair<std::string, SrsAmf0Any*> SrsObjectPropertyType;
std::vector<SrsObjectPropertyType> properties;
public:
SrsUnSortedHashtable();
virtual ~SrsUnSortedHashtable();
virtual int size();
virtual std::string key_at(int index);
virtual SrsAmf0Any* value_at(int index);
virtual void set(std::string key, SrsAmf0Any* value);
virtual SrsAmf0Any* get_property(std::string name);
virtual SrsAmf0Any* ensure_property_string(std::string name);
};
/**
* 2.5 Object Type
* anonymous-object-type = object-marker *(object-property)
@ -146,12 +170,19 @@ struct SrsAmf0ObjectEOF : public SrsAmf0Any
*/
struct SrsAmf0Object : public SrsAmf0Any
{
std::map<std::string, SrsAmf0Any*> properties;
private:
SrsUnSortedHashtable properties;
public:
SrsAmf0ObjectEOF eof;
SrsAmf0Object();
virtual ~SrsAmf0Object();
virtual int size();
virtual std::string key_at(int index);
virtual SrsAmf0Any* value_at(int index);
virtual void set(std::string key, SrsAmf0Any* value);
virtual SrsAmf0Any* get_property(std::string name);
virtual SrsAmf0Any* ensure_property_string(std::string name);
};
@ -164,13 +195,20 @@ struct SrsAmf0Object : public SrsAmf0Any
*/
struct SrsASrsAmf0EcmaArray : public SrsAmf0Any
{
private:
SrsUnSortedHashtable properties;
public:
int32_t count;
std::map<std::string, SrsAmf0Any*> properties;
SrsAmf0ObjectEOF eof;
SrsASrsAmf0EcmaArray();
virtual ~SrsASrsAmf0EcmaArray();
virtual int size();
virtual std::string key_at(int index);
virtual SrsAmf0Any* value_at(int index);
virtual void set(std::string key, SrsAmf0Any* value);
virtual SrsAmf0Any* get_property(std::string name);
virtual SrsAmf0Any* ensure_property_string(std::string name);
};

@ -55,7 +55,7 @@ void SrsBuffer::append(char* bytes, int size)
{
std::vector<char> vec(bytes, bytes + size);
data.insert(data.begin(), vec.begin(), vec.end());
data.insert(data.end(), vec.begin(), vec.end());
}
int SrsBuffer::ensure_buffer_bytes(SrsSocket* skt, int required_size)

@ -42,20 +42,9 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
SrsClient::~SrsClient()
{
if (ip) {
delete[] ip;
ip = NULL;
}
if (req) {
delete req;
req = NULL;
}
if (rtmp) {
delete rtmp;
rtmp = NULL;
}
srs_freepa(ip);
srs_freep(req);
srs_freep(rtmp);
}
int SrsClient::do_cycle()

@ -199,6 +199,7 @@ messages.
#define RTMP_AMF0_COMMAND_RESULT "_result"
#define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream"
#define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish"
#define RTMP_AMF0_COMMAND_PUBLISH "publish"
#define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess"
/****************************************************************************
@ -260,23 +261,13 @@ SrsProtocol::~SrsProtocol()
for (it = chunk_streams.begin(); it != chunk_streams.end(); ++it) {
SrsChunkStream* stream = it->second;
if (stream) {
delete stream;
}
srs_freep(stream);
}
chunk_streams.clear();
if (buffer) {
delete buffer;
buffer = NULL;
}
if (skt) {
delete skt;
skt = NULL;
}
srs_freep(buffer);
srs_freep(skt);
}
int SrsProtocol::recv_message(SrsMessage** pmsg)
@ -302,13 +293,13 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
delete msg;
srs_freep(msg);
continue;
}
if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
srs_error("hook the received msg failed. ret=%d", ret);
delete msg;
srs_freep(msg);
return ret;
}
@ -615,13 +606,36 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
{
int ret = ERROR_SUCCESS;
// when not exists cached msg, means get an new message,
// the fmt must be type0/type1 which means new message.
if (!chunk->msg && fmt != RTMP_FMT_TYPE0 && fmt != RTMP_FMT_TYPE1) {
/**
* we should not assert anything about fmt, for the first packet.
* (when first packet, the chunk->msg is NULL).
* the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet.
* the previous packet is:
* 04 // fmt=0, cid=4
* 00 00 1a // timestamp=26
* 00 00 9d // payload_length=157
* 08 // message_type=8(audio)
* 01 00 00 00 // stream_id=1
* the current packet maybe:
* c4 // fmt=3, cid=4
* it's ok, for the packet is audio, and timestamp delta is 26.
* the current packet must be parsed as:
* fmt=0, cid=4
* timestamp=26+26=52
* payload_length=157
* message_type=8(audio)
* stream_id=1
* so we must update the timestamp even fmt=3 for first packet.
*/
// fresh packet used to update the timestamp even fmt=3 for first packet.
bool is_fresh_packet = !chunk->msg;
// but, we can ensure that when a chunk stream is fresh,
// the fmt must be 0, a new stream.
if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) {
ret = ERROR_RTMP_CHUNK_START;
srs_error("chunk stream start, "
"fmt must be %d or %d, actual is %d. ret=%d",
RTMP_FMT_TYPE0, RTMP_FMT_TYPE1, fmt, ret);
srs_error("chunk stream is fresh, "
"fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret);
return ret;
}
@ -636,13 +650,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
// create msg when new chunk stream start
if (!chunk->msg) {
srs_assert(fmt == RTMP_FMT_TYPE0 || fmt == RTMP_FMT_TYPE1);
chunk->msg = new SrsMessage();
srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid);
}
// read message header from socket to buffer.
static char mh_sizes[] = {11, 7, 1, 0};
static char mh_sizes[] = {11, 7, 3, 0};
mh_size = mh_sizes[(int)fmt];
srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size);
@ -656,8 +669,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
// parse the message header.
// see also: ngx_rtmp_recv
if (fmt <= RTMP_FMT_TYPE2) {
int32_t timestamp_delta;
char* pp = (char*)&timestamp_delta;
char* pp = (char*)&chunk->header.timestamp_delta;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
@ -667,13 +679,13 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
// 6.1.2.1. Type 0
// For a type-0 chunk, the absolute timestamp of the message is sent
// here.
chunk->header.timestamp = timestamp_delta;
chunk->header.timestamp = chunk->header.timestamp_delta;
} else {
// 6.1.2.2. Type 1
// 6.1.2.3. Type 2
// For a type-1 or type-2 chunk, the difference between the previous
// chunk's timestamp and the current chunk's timestamp is sent here.
chunk->header.timestamp += timestamp_delta;
chunk->header.timestamp += chunk->header.timestamp_delta;
}
// fmt: 0
@ -689,7 +701,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
// 0x00ffffff), this value MUST be 16777215, and the extended
// timestamp header MUST be present. Otherwise, this value SHOULD be
// the entire delta.
chunk->extended_timestamp = (timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);
chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP);
if (chunk->extended_timestamp) {
chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP;
}
@ -722,6 +734,10 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp);
}
} else {
// update the timestamp even fmt=3 for first stream
if (is_fresh_packet && !chunk->extended_timestamp) {
chunk->header.timestamp += chunk->header.timestamp_delta;
}
srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d",
fmt, mh_size, chunk->extended_timestamp);
}
@ -754,6 +770,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz
// copy header to msg
chunk->msg->header = chunk->header;
// increase the msg count, the chunk stream can accept fmt=1/2/3 message now.
chunk->msg_count++;
return ret;
}
@ -802,7 +821,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
buffer->erase(bh_size + mh_size + payload_size);
chunk->msg->size += payload_size;
srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size);
// got entire RTMP message?
if (chunk->header.payload_length == chunk->msg->size) {
@ -826,8 +845,10 @@ SrsMessageHeader::SrsMessageHeader()
{
message_type = 0;
payload_length = 0;
timestamp = 0;
timestamp_delta = 0;
stream_id = 0;
timestamp = 0;
}
SrsMessageHeader::~SrsMessageHeader()
@ -849,20 +870,23 @@ bool SrsMessageHeader::is_window_ackledgement_size()
return message_type == RTMP_MSG_WindowAcknowledgementSize;
}
bool SrsMessageHeader::is_set_chunk_size()
{
return message_type == RTMP_MSG_SetChunkSize;
}
SrsChunkStream::SrsChunkStream(int _cid)
{
fmt = 0;
cid = _cid;
extended_timestamp = false;
msg = NULL;
msg_count = 0;
}
SrsChunkStream::~SrsChunkStream()
{
if (msg) {
delete msg;
msg = NULL;
}
srs_freep(msg);
}
SrsMessage::SrsMessage()
@ -875,20 +899,9 @@ SrsMessage::SrsMessage()
SrsMessage::~SrsMessage()
{
if (payload) {
delete[] payload;
payload = NULL;
}
if (packet) {
delete packet;
packet = NULL;
}
if (stream) {
delete stream;
stream = NULL;
}
srs_freep(payload);
srs_freep(packet);
srs_freep(stream);
}
int SrsMessage::decode_packet()
@ -962,6 +975,10 @@ int SrsMessage::decode_packet()
srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
packet = new SrsFMLEStartPacket();
return packet->decode(stream);
} else if(command == RTMP_AMF0_COMMAND_PUBLISH) {
srs_info("decode the AMF0/AMF3 command(publish message).");
packet = new SrsPublishPacket();
return packet->decode(stream);
}
// default packet to drop message.
@ -972,6 +989,10 @@ int SrsMessage::decode_packet()
srs_verbose("start to decode set ack window size message.");
packet = new SrsSetWindowAckSizePacket();
return packet->decode(stream);
} else if(header.is_set_chunk_size()) {
srs_verbose("start to decode set chunk size message.");
packet = new SrsSetChunkSizePacket();
return packet->decode(stream);
} else {
// default packet to drop message.
srs_trace("drop the unknown message, type=%d", header.message_type);
@ -1008,9 +1029,7 @@ int SrsMessage::get_perfer_cid()
void SrsMessage::set_packet(SrsPacket* pkt, int stream_id)
{
if (packet) {
delete packet;
}
srs_freep(packet);
packet = pkt;
@ -1029,10 +1048,7 @@ int SrsMessage::encode_packet()
}
// realloc the payload.
size = 0;
if (payload) {
delete[] payload;
payload = NULL;
}
srs_freepa(payload);
return packet->encode(size, (char*&)payload);
}
@ -1087,14 +1103,14 @@ int SrsPacket::encode(int& psize, char*& ppayload)
if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
srs_error("initialize the stream failed. ret=%d", ret);
delete[] payload;
srs_freepa(payload);
return ret;
}
}
if ((ret = encode_packet(&stream)) != ERROR_SUCCESS) {
srs_error("encode the packet failed. ret=%d", ret);
delete[] payload;
srs_freepa(payload);
return ret;
}
@ -1132,10 +1148,7 @@ SrsConnectAppPacket::SrsConnectAppPacket()
SrsConnectAppPacket::~SrsConnectAppPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
srs_freep(command_object);
}
int SrsConnectAppPacket::decode(SrsStream* stream)
@ -1189,15 +1202,8 @@ SrsConnectAppResPacket::SrsConnectAppResPacket()
SrsConnectAppResPacket::~SrsConnectAppResPacket()
{
if (props) {
delete props;
props = NULL;
}
if (info) {
delete info;
info = NULL;
}
srs_freep(props);
srs_freep(info);
}
int SrsConnectAppResPacket::get_perfer_cid()
@ -1259,10 +1265,7 @@ SrsCreateStreamPacket::SrsCreateStreamPacket()
SrsCreateStreamPacket::~SrsCreateStreamPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
srs_freep(command_object);
}
int SrsCreateStreamPacket::decode(SrsStream* stream)
@ -1305,10 +1308,7 @@ SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, doubl
SrsCreateStreamResPacket::~SrsCreateStreamResPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
srs_freep(command_object);
}
int SrsCreateStreamResPacket::get_perfer_cid()
@ -1365,10 +1365,12 @@ SrsFMLEStartPacket::SrsFMLEStartPacket()
{
command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;
transaction_id = 0;
command_object = new SrsAmf0Null();
}
SrsFMLEStartPacket::~SrsFMLEStartPacket()
{
srs_freep(command_object);
}
int SrsFMLEStartPacket::decode(SrsStream* stream)
@ -1380,8 +1382,8 @@ int SrsFMLEStartPacket::decode(SrsStream* stream)
return ret;
}
if (command_name.empty()
|| command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM
|| command_name != RTMP_AMF0_COMMAND_FC_PUBLISH
|| (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM
&& command_name != RTMP_AMF0_COMMAND_FC_PUBLISH)
) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode FMLE start command_name failed. "
@ -1394,6 +1396,11 @@ int SrsFMLEStartPacket::decode(SrsStream* stream)
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
return ret;
@ -1414,14 +1421,8 @@ SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
SrsFMLEStartResPacket::~SrsFMLEStartResPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
if (args) {
delete args;
args = NULL;
}
srs_freep(command_object);
srs_freep(args);
}
int SrsFMLEStartResPacket::get_perfer_cid()
@ -1474,6 +1475,59 @@ int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
return ret;
}
SrsPublishPacket::SrsPublishPacket()
{
command_name = RTMP_AMF0_COMMAND_PUBLISH;
transaction_id = 0;
command_object = new SrsAmf0Null();
type = "live";
}
SrsPublishPacket::~SrsPublishPacket()
{
srs_freep(command_object);
}
int SrsPublishPacket::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode publish command_name failed. ret=%d", ret);
return ret;
}
if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PUBLISH) {
ret = ERROR_RTMP_AMF0_DECODE;
srs_error("amf0 decode publish command_name failed. "
"command_name=%s, ret=%d", command_name.c_str(), ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode publish transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode publish command_object failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode publish stream_name failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_string(stream, type)) != ERROR_SUCCESS) {
srs_error("amf0 decode publish type failed. ret=%d", ret);
return ret;
}
srs_info("amf0 decode publish packet success");
return ret;
}
SrsPlayPacket::SrsPlayPacket()
{
command_name = RTMP_AMF0_COMMAND_PLAY;
@ -1487,10 +1541,7 @@ SrsPlayPacket::SrsPlayPacket()
SrsPlayPacket::~SrsPlayPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
srs_freep(command_object);
}
int SrsPlayPacket::decode(SrsStream* stream)
@ -1551,15 +1602,8 @@ SrsPlayResPacket::SrsPlayResPacket()
SrsPlayResPacket::~SrsPlayResPacket()
{
if (command_object) {
delete command_object;
command_object = NULL;
}
if (desc) {
delete desc;
desc = NULL;
}
srs_freep(command_object);
srs_freep(desc);
}
int SrsPlayResPacket::get_perfer_cid()
@ -1621,10 +1665,7 @@ SrsOnBWDonePacket::SrsOnBWDonePacket()
SrsOnBWDonePacket::~SrsOnBWDonePacket()
{
if (args) {
delete args;
args = NULL;
}
srs_freep(args);
}
int SrsOnBWDonePacket::get_perfer_cid()
@ -1680,15 +1721,8 @@ SrsOnStatusCallPacket::SrsOnStatusCallPacket()
SrsOnStatusCallPacket::~SrsOnStatusCallPacket()
{
if (args) {
delete args;
args = NULL;
}
if (data) {
delete data;
data = NULL;
}
srs_freep(args);
srs_freep(data);
}
int SrsOnStatusCallPacket::get_perfer_cid()
@ -1748,10 +1782,7 @@ SrsOnStatusDataPacket::SrsOnStatusDataPacket()
SrsOnStatusDataPacket::~SrsOnStatusDataPacket()
{
if (data) {
delete data;
data = NULL;
}
srs_freep(data);
}
int SrsOnStatusDataPacket::get_perfer_cid()

@ -160,22 +160,29 @@ struct SrsMessageHeader
*/
int32_t payload_length;
/**
* Four-byte field that contains a timestamp of the message.
* Three-byte field that contains a timestamp delta of the message.
* The 4 bytes are packed in the big-endian order.
*/
int32_t timestamp;
int32_t timestamp_delta;
/**
* Three-byte field that identifies the stream of the message. These
* bytes are set in big-endian format.
*/
int32_t stream_id;
/**
* Four-byte field that contains a timestamp of the message.
* The 4 bytes are packed in the big-endian order.
*/
int32_t timestamp;
SrsMessageHeader();
virtual ~SrsMessageHeader();
bool is_amf0_command();
bool is_amf3_command();
bool is_window_ackledgement_size();
bool is_set_chunk_size();
};
/**
@ -207,6 +214,10 @@ public:
* partially read message.
*/
SrsMessage* msg;
/**
* decoded msg count, to identify whether the chunk stream is fresh.
*/
int64_t msg_count;
public:
SrsChunkStream(int _cid);
virtual ~SrsChunkStream();
@ -257,7 +268,7 @@ public:
* @stream_id, the id of stream which is created by createStream.
* @remark, user never free the pkt, the message will auto free it.
*/
virtual void set_packet(SrsPacket* pkt, int stream_id = 0);
virtual void set_packet(SrsPacket* pkt, int stream_id);
/**
* encode the packet to message payload bytes.
* @remark there exists empty packet, so maybe the payload is NULL.
@ -443,6 +454,7 @@ protected:
public:
std::string command_name;
double transaction_id;
SrsAmf0Null* command_object;
std::string stream_name;
public:
SrsFMLEStartPacket();
@ -479,6 +491,35 @@ protected:
virtual int encode_packet(SrsStream* stream);
};
/**
* FMLE/flash publish
* 4.2.6. Publish
* The client sends the publish command to publish a named stream to the
* server. Using this name, any client can play this stream and receive
* the published audio, video, and data messages.
*/
class SrsPublishPacket : public SrsPacket
{
private:
typedef SrsPacket super;
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsPublishPacket);
}
public:
std::string command_name;
double transaction_id;
SrsAmf0Null* command_object;
std::string stream_name;
std::string type;
public:
SrsPublishPacket();
virtual ~SrsPublishPacket();
public:
virtual int decode(SrsStream* stream);
};
/**
* 4.2.1. play
* The client sends this command to the server to play a stream.
@ -566,7 +607,7 @@ protected:
/**
* onStatus command, AMF0 Call
* @remark, user must set the stream_id in header.
* @remark, user must set the stream_id by SrsMessage.set_packet().
*/
class SrsOnStatusCallPacket : public SrsPacket
{
@ -596,7 +637,7 @@ protected:
/**
* onStatus data, AMF0 Data
* @remark, user must set the stream_id in header.
* @remark, user must set the stream_id by SrsMessage.set_packet().
*/
class SrsOnStatusDataPacket : public SrsPacket
{
@ -624,7 +665,7 @@ protected:
/**
* AMF0Data RtmpSampleAccess
* @remark, user must set the stream_id in header.
* @remark, user must set the stream_id by SrsMessage.set_packet().
*/
class SrsSampleAccessPacket : public SrsPacket
{

@ -51,8 +51,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodePublishStart "NetStream.Publish.Start"
#define StatusCodeDataStart "NetStream.Data.Start"
// FMLE
#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish"
SrsRequest::SrsRequest()
{
objectEncoding = RTMP_SIG_AMF0_VER;
@ -111,10 +115,7 @@ SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
SrsRtmp::~SrsRtmp()
{
if (protocol) {
delete protocol;
protocol = NULL;
}
srs_freep(protocol);
}
int SrsRtmp::handshake()
@ -210,7 +211,7 @@ int SrsRtmp::set_window_ack_size(int ack_size)
SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
pkt->ackowledgement_window_size = ack_size;
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send ack size message failed. ret=%d", ret);
@ -230,7 +231,7 @@ int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)
pkt->bandwidth = bandwidth;
pkt->type = type;
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send set bandwidth message failed. ret=%d", ret);
@ -249,23 +250,23 @@ int SrsRtmp::response_connect_app(SrsRequest* req)
SrsMessage* msg = new SrsMessage();
SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();
pkt->props->properties["fmsVer"] = new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER);
pkt->props->properties["capabilities"] = new SrsAmf0Number(123);
pkt->props->properties["mode"] = new SrsAmf0Number(1);
pkt->props->set("fmsVer", new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER));
pkt->props->set("capabilities", new SrsAmf0Number(127));
pkt->props->set("mode", new SrsAmf0Number(1));
pkt->info->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus);
pkt->info->properties[StatusCode] = new SrsAmf0String(StatusCodeConnectSuccess);
pkt->info->properties[StatusDescription] = new SrsAmf0String("Connection succeeded");
pkt->info->properties["objectEncoding"] = new SrsAmf0Number(req->objectEncoding);
pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess));
pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded"));
pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));
SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray();
pkt->info->properties["data"] = data;
pkt->info->set("data", data);
data->properties["version"] = new SrsAmf0String(RTMP_SIG_FMS_VER);
data->properties["server"] = new SrsAmf0String(RTMP_SIG_SRS_NAME);
data->properties["srs_url"] = new SrsAmf0String(RTMP_SIG_SRS_URL);
data->properties["srs_version"] = new SrsAmf0String(RTMP_SIG_SRS_VERSION);
data->set("version", new SrsAmf0String(RTMP_SIG_FMS_VER));
data->set("server", new SrsAmf0String(RTMP_SIG_SRS_NAME));
data->set("srs_url", new SrsAmf0String(RTMP_SIG_SRS_URL));
data->set("srs_version", new SrsAmf0String(RTMP_SIG_SRS_VERSION));
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send connect app response message failed. ret=%d", ret);
@ -283,7 +284,7 @@ int SrsRtmp::on_bw_done()
SrsMessage* msg = new SrsMessage();
SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket();
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onBWDone message failed. ret=%d", ret);
@ -345,7 +346,7 @@ int SrsRtmp::set_chunk_size(int chunk_size)
SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
pkt->chunk_size = chunk_size;
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send set chunk size message failed. ret=%d", ret);
@ -367,7 +368,7 @@ int SrsRtmp::start_play(int stream_id)
pkt->event_type = SrcPCUCStreamBegin;
pkt->event_data = stream_id;
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret);
@ -381,14 +382,13 @@ int SrsRtmp::start_play(int stream_id)
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus);
pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeStreamReset);
pkt->data->properties[StatusDescription] = new SrsAmf0String("Playing and resetting stream.");
pkt->data->properties[StatusDetails] = new SrsAmf0String("stream");
pkt->data->properties[StatusClientId] = new SrsAmf0String(RTMP_SIG_CLIENT_ID);
pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamReset));
pkt->data->set(StatusDescription, new SrsAmf0String("Playing and resetting stream."));
pkt->data->set(StatusDetails, new SrsAmf0String("stream"));
pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
@ -402,14 +402,13 @@ int SrsRtmp::start_play(int stream_id)
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus);
pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeStreamStart);
pkt->data->properties[StatusDescription] = new SrsAmf0String("Started playing stream.");
pkt->data->properties[StatusDetails] = new SrsAmf0String("stream");
pkt->data->properties[StatusClientId] = new SrsAmf0String(RTMP_SIG_CLIENT_ID);
pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamStart));
pkt->data->set(StatusDescription, new SrsAmf0String("Started playing stream."));
pkt->data->set(StatusDetails, new SrsAmf0String("stream"));
pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret);
@ -423,8 +422,7 @@ int SrsRtmp::start_play(int stream_id)
SrsMessage* msg = new SrsMessage();
SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret);
@ -438,10 +436,9 @@ int SrsRtmp::start_play(int stream_id)
SrsMessage* msg = new SrsMessage();
SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeDataStart);
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeDataStart));
msg->header.stream_id = stream_id;
msg->set_packet(pkt);
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret);
@ -463,7 +460,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea
SrsMessage* msg = new SrsMessage();
SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id);
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send createStream response message failed. ret=%d", ret);
@ -519,7 +516,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id
SrsMessage* msg = new SrsMessage();
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send releaseStream response message failed. ret=%d", ret);
@ -547,7 +544,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id
SrsMessage* msg = new SrsMessage();
SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send FCPublish response message failed. ret=%d", ret);
@ -575,7 +572,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id
SrsMessage* msg = new SrsMessage();
SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id);
msg->set_packet(pkt);
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send createStream response message failed. ret=%d", ret);
@ -584,6 +581,64 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id
srs_info("send createStream response message success.");
}
// publish
if (true) {
SrsMessage* msg = NULL;
SrsPublishPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsPublishPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("recv publish message failed. ret=%d", ret);
return ret;
}
srs_info("recv publish request message success.");
SrsAutoFree(SrsMessage, msg, false);
}
// publish response onFCPublish(NetStream.Publish.Start)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret);
return ret;
}
srs_info("send onFCPublish(NetStream.Publish.Start) message success.");
}
// publish response onStatus(NetStream.Publish.Start)
if (true) {
SrsMessage* msg = new SrsMessage();
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus));
pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart));
pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream."));
pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID));
msg->set_packet(pkt, stream_id);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret);
return ret;
}
srs_info("send onStatus(NetStream.Publish.Start) message success.");
}
while (true) {
SrsMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv identify client message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsMessage, msg, false);
}
return ret;
}

@ -45,7 +45,7 @@ SrsServer::~SrsServer()
{
for (std::vector<SrsConnection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
SrsConnection* conn = *it;
delete conn;
srs_freep(conn);
}
conns.clear();
}
@ -151,8 +151,8 @@ void SrsServer::remove(SrsConnection* conn)
srs_info("conn removed. conns=%d", (int)conns.size());
// all connections are created by server,
// so we delete it here.
delete conn;
// so we free it here.
srs_freep(conn);
}
int SrsServer::accept_client(st_netfd_t client_stfd)

Loading…
Cancel
Save