winlin

support set chunk size, PCUC 4bytes packets

@@ -108,17 +108,49 @@ int SrsClient::do_cycle() @@ -108,17 +108,49 @@ int SrsClient::do_cycle()
108 } 108 }
109 srs_verbose("on_bw_done success"); 109 srs_verbose("on_bw_done success");
110 110
  111 + int stream_id = SRS_DEFAULT_SID;
111 SrsClientType type; 112 SrsClientType type;
112 - std::string stream_name;  
113 - if ((ret = rtmp->identify_client(SRS_DEFAULT_SID, type, stream_name)) != ERROR_SUCCESS) { 113 + if ((ret = rtmp->identify_client(stream_id, type, req->stream)) != ERROR_SUCCESS) {
114 srs_error("identify client failed. ret=%d", ret); 114 srs_error("identify client failed. ret=%d", ret);
115 return ret; 115 return ret;
116 } 116 }
117 - srs_verbose("identify client success. type=%d", type); 117 + srs_verbose("identify client success. type=%d, stream_name=%s", type, req->stream.c_str());
  118 +
  119 + // TODO: read from config.
  120 + int chunk_size = 4096;
  121 + if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
  122 + srs_error("set chunk size failed. ret=%d", ret);
  123 + return ret;
  124 + }
  125 + srs_verbose("set chunk size success");
  126 +
  127 + switch (type) {
  128 + case SrsClientPlay: {
  129 + srs_verbose("start to play stream %s.", req->stream.c_str());
  130 +
  131 + if ((ret = rtmp->start_play(stream_id)) != ERROR_SUCCESS) {
  132 + srs_error("start to play stream failed. ret=%d", ret);
  133 + return ret;
  134 + }
  135 + srs_info("start to play stream %s success", req->stream.c_str());
  136 + return streaming_play();
  137 + }
  138 + default: {
  139 + ret = ERROR_SYSTEM_CLIENT_INVALID;
  140 + srs_info("invalid client type=%d. ret=%d", type, ret);
  141 + return ret;
  142 + }
  143 + }
118 144
119 return ret; 145 return ret;
120 } 146 }
121 147
  148 +int SrsClient::streaming_play()
  149 +{
  150 + int ret = ERROR_SUCCESS;
  151 + return ret;
  152 +}
  153 +
122 int SrsClient::get_peer_ip() 154 int SrsClient::get_peer_ip()
123 { 155 {
124 int ret = ERROR_SUCCESS; 156 int ret = ERROR_SUCCESS;
@@ -50,6 +50,7 @@ public: @@ -50,6 +50,7 @@ public:
50 protected: 50 protected:
51 virtual int do_cycle(); 51 virtual int do_cycle();
52 private: 52 private:
  53 + virtual int streaming_play();
53 virtual int get_peer_ip(); 54 virtual int get_peer_ip();
54 }; 55 };
55 56
@@ -59,8 +59,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -59,8 +59,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
59 #define ERROR_RTMP_MESSAGE_DECODE 307 59 #define ERROR_RTMP_MESSAGE_DECODE 307
60 #define ERROR_RTMP_MESSAGE_ENCODE 308 60 #define ERROR_RTMP_MESSAGE_ENCODE 308
61 #define ERROR_RTMP_AMF0_ENCODE 309 61 #define ERROR_RTMP_AMF0_ENCODE 309
  62 +#define ERROR_RTMP_CHUNK_SIZE 310
62 63
63 #define ERROR_SYSTEM_STREAM_INIT 400 64 #define ERROR_SYSTEM_STREAM_INIT 400
64 #define ERROR_SYSTEM_PACKET_INVALID 401 65 #define ERROR_SYSTEM_PACKET_INVALID 401
  66 +#define ERROR_SYSTEM_CLIENT_INVALID 402
65 67
66 #endif 68 #endif
@@ -50,20 +50,6 @@ with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID @@ -50,20 +50,6 @@ with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID
50 #define RTMP_MSG_SetPeerBandwidth 0x06 50 #define RTMP_MSG_SetPeerBandwidth 0x06
51 #define RTMP_MSG_EdgeAndOriginServerCommand 0x07 51 #define RTMP_MSG_EdgeAndOriginServerCommand 0x07
52 /** 52 /**
53 -* The server sends this event to test whether the client is reachable.  
54 -*  
55 -* Event data is a 4-byte timestamp, representing the local server time when the server dispatched the command.  
56 -* The client responds with PingResponse on receiving PingRequest.  
57 -*/  
58 -#define RTMP_MSG_PCUC_PingRequest 0x06  
59 -  
60 -/**  
61 -* The client sends this event to the server in response to the ping request.  
62 -*  
63 -* The event data is a 4-byte timestamp, which was received with the PingRequest request.  
64 -*/  
65 -#define RTMP_MSG_PCUC_PingResponse 0x07  
66 -/**  
67 3. Types of messages 53 3. Types of messages
68 The server and the client send messages over the network to 54 The server and the client send messages over the network to
69 communicate with each other. The messages can be of any type which 55 communicate with each other. The messages can be of any type which
@@ -184,6 +170,7 @@ messages. @@ -184,6 +170,7 @@ messages.
184 * independently for each direction. 170 * independently for each direction.
185 */ 171 */
186 #define RTMP_DEFAULT_CHUNK_SIZE 128 172 #define RTMP_DEFAULT_CHUNK_SIZE 128
  173 +#define RTMP_MIN_CHUNK_SIZE 2
187 174
188 /** 175 /**
189 * 6.1. Chunk Format 176 * 6.1. Chunk Format
@@ -316,7 +303,7 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -316,7 +303,7 @@ int SrsProtocol::recv_message(SrsMessage** pmsg)
316 } 303 }
317 304
318 if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) { 305 if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
319 - srs_error("update context when received msg. ret=%d", ret); 306 + srs_error("hook the received msg failed. ret=%d", ret);
320 delete msg; 307 delete msg;
321 return ret; 308 return ret;
322 } 309 }
@@ -438,6 +425,11 @@ int SrsProtocol::send_message(SrsMessage* msg) @@ -438,6 +425,11 @@ int SrsProtocol::send_message(SrsMessage* msg)
438 } 425 }
439 } while (p < (char*)msg->payload + msg->size); 426 } while (p < (char*)msg->payload + msg->size);
440 427
  428 + if ((ret = on_send_message(msg)) != ERROR_SUCCESS) {
  429 + srs_error("hook the send message failed. ret=%d", ret);
  430 + return ret;
  431 + }
  432 +
441 return ret; 433 return ret;
442 } 434 }
443 435
@@ -448,6 +440,7 @@ int SrsProtocol::on_recv_message(SrsMessage* msg) @@ -448,6 +440,7 @@ int SrsProtocol::on_recv_message(SrsMessage* msg)
448 srs_assert(msg != NULL); 440 srs_assert(msg != NULL);
449 441
450 switch (msg->header.message_type) { 442 switch (msg->header.message_type) {
  443 + case RTMP_MSG_SetChunkSize:
451 case RTMP_MSG_WindowAcknowledgementSize: 444 case RTMP_MSG_WindowAcknowledgementSize:
452 if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { 445 if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
453 srs_error("decode packet from message payload failed. ret=%d", ret); 446 srs_error("decode packet from message payload failed. ret=%d", ret);
@@ -465,6 +458,36 @@ int SrsProtocol::on_recv_message(SrsMessage* msg) @@ -465,6 +458,36 @@ int SrsProtocol::on_recv_message(SrsMessage* msg)
465 srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); 458 srs_trace("set ack window size to %d", pkt->ackowledgement_window_size);
466 break; 459 break;
467 } 460 }
  461 + case RTMP_MSG_SetChunkSize: {
  462 + SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(msg->get_packet());
  463 + srs_assert(pkt != NULL);
  464 +
  465 + in_chunk_size = pkt->chunk_size;
  466 +
  467 + srs_trace("set input chunk size to %d", pkt->chunk_size);
  468 + break;
  469 + }
  470 + }
  471 +
  472 + return ret;
  473 +}
  474 +
  475 +int SrsProtocol::on_send_message(SrsMessage* msg)
  476 +{
  477 + int ret = ERROR_SUCCESS;
  478 +
  479 + srs_assert(msg != NULL);
  480 +
  481 + switch (msg->header.message_type) {
  482 + case RTMP_MSG_SetChunkSize: {
  483 + SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(msg->get_packet());
  484 + srs_assert(pkt != NULL);
  485 +
  486 + in_chunk_size = pkt->chunk_size;
  487 +
  488 + srs_trace("set output chunk size to %d", pkt->chunk_size);
  489 + break;
  490 + }
468 } 491 }
469 492
470 return ret; 493 return ret;
@@ -1580,6 +1603,70 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream) @@ -1580,6 +1603,70 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream)
1580 return ret; 1603 return ret;
1581 } 1604 }
1582 1605
  1606 +SrsSetChunkSizePacket::SrsSetChunkSizePacket()
  1607 +{
  1608 + chunk_size = RTMP_DEFAULT_CHUNK_SIZE;
  1609 +}
  1610 +
  1611 +SrsSetChunkSizePacket::~SrsSetChunkSizePacket()
  1612 +{
  1613 +}
  1614 +
  1615 +int SrsSetChunkSizePacket::decode(SrsStream* stream)
  1616 +{
  1617 + int ret = ERROR_SUCCESS;
  1618 +
  1619 + if (!stream->require(4)) {
  1620 + ret = ERROR_RTMP_MESSAGE_DECODE;
  1621 + srs_error("decode chunk size failed. ret=%d", ret);
  1622 + return ret;
  1623 + }
  1624 +
  1625 + chunk_size = stream->read_4bytes();
  1626 + srs_info("decode chunk size success. chunk_size=%d", chunk_size);
  1627 +
  1628 + if (chunk_size < RTMP_MIN_CHUNK_SIZE) {
  1629 + ret = ERROR_RTMP_CHUNK_SIZE;
  1630 + srs_error("invalid chunk size. min=%d, actual=%d, ret=%d",
  1631 + ERROR_RTMP_CHUNK_SIZE, chunk_size, ret);
  1632 + return ret;
  1633 + }
  1634 +
  1635 + return ret;
  1636 +}
  1637 +
  1638 +int SrsSetChunkSizePacket::get_perfer_cid()
  1639 +{
  1640 + return RTMP_CID_ProtocolControl;
  1641 +}
  1642 +
  1643 +int SrsSetChunkSizePacket::get_message_type()
  1644 +{
  1645 + return RTMP_MSG_SetChunkSize;
  1646 +}
  1647 +
  1648 +int SrsSetChunkSizePacket::get_size()
  1649 +{
  1650 + return 4;
  1651 +}
  1652 +
  1653 +int SrsSetChunkSizePacket::encode_packet(SrsStream* stream)
  1654 +{
  1655 + int ret = ERROR_SUCCESS;
  1656 +
  1657 + if (!stream->require(4)) {
  1658 + ret = ERROR_RTMP_MESSAGE_ENCODE;
  1659 + srs_error("encode chunk packet failed. ret=%d", ret);
  1660 + return ret;
  1661 + }
  1662 +
  1663 + stream->write_4bytes(chunk_size);
  1664 +
  1665 + srs_verbose("encode chunk packet success. ack_size=%d", chunk_size);
  1666 +
  1667 + return ret;
  1668 +}
  1669 +
1583 SrsSetPeerBandwidthPacket::SrsSetPeerBandwidthPacket() 1670 SrsSetPeerBandwidthPacket::SrsSetPeerBandwidthPacket()
1584 { 1671 {
1585 bandwidth = 0; 1672 bandwidth = 0;
@@ -1624,3 +1711,47 @@ int SrsSetPeerBandwidthPacket::encode_packet(SrsStream* stream) @@ -1624,3 +1711,47 @@ int SrsSetPeerBandwidthPacket::encode_packet(SrsStream* stream)
1624 return ret; 1711 return ret;
1625 } 1712 }
1626 1713
  1714 +SrsPCUC4BytesPacket::SrsPCUC4BytesPacket()
  1715 +{
  1716 + event_type = 0;
  1717 + event_data = 0;
  1718 +}
  1719 +
  1720 +SrsPCUC4BytesPacket::~SrsPCUC4BytesPacket()
  1721 +{
  1722 +}
  1723 +
  1724 +int SrsPCUC4BytesPacket::get_perfer_cid()
  1725 +{
  1726 + return RTMP_CID_ProtocolControl;
  1727 +}
  1728 +
  1729 +int SrsPCUC4BytesPacket::get_message_type()
  1730 +{
  1731 + return RTMP_MSG_UserControlMessage;
  1732 +}
  1733 +
  1734 +int SrsPCUC4BytesPacket::get_size()
  1735 +{
  1736 + return 2 + 4;
  1737 +}
  1738 +
  1739 +int SrsPCUC4BytesPacket::encode_packet(SrsStream* stream)
  1740 +{
  1741 + int ret = ERROR_SUCCESS;
  1742 +
  1743 + if (!stream->require(6)) {
  1744 + ret = ERROR_RTMP_MESSAGE_ENCODE;
  1745 + srs_error("encode set bandwidth packet failed. ret=%d", ret);
  1746 + return ret;
  1747 + }
  1748 +
  1749 + stream->write_2bytes(event_type);
  1750 + stream->write_4bytes(event_data);
  1751 +
  1752 + srs_verbose("encode PCUC packet success. "
  1753 + "event_type=%d, event_data=%d", event_type, event_data);
  1754 +
  1755 + return ret;
  1756 +}
  1757 +
@@ -113,6 +113,10 @@ private: @@ -113,6 +113,10 @@ private:
113 */ 113 */
114 virtual int on_recv_message(SrsMessage* msg); 114 virtual int on_recv_message(SrsMessage* msg);
115 /** 115 /**
  116 + * when message sentout, update the context.
  117 + */
  118 + virtual int on_send_message(SrsMessage* msg);
  119 + /**
116 * try to recv interlaced message from peer, 120 * try to recv interlaced message from peer,
117 * return error if error occur and nerver set the pmsg, 121 * return error if error occur and nerver set the pmsg,
118 * return success and pmsg set to NULL if no entire message got, 122 * return success and pmsg set to NULL if no entire message got,
@@ -533,6 +537,36 @@ protected: @@ -533,6 +537,36 @@ protected:
533 }; 537 };
534 538
535 /** 539 /**
  540 +* 7.1. Set Chunk Size
  541 +* Protocol control message 1, Set Chunk Size, is used to notify the
  542 +* peer about the new maximum chunk size.
  543 +*/
  544 +class SrsSetChunkSizePacket : public SrsPacket
  545 +{
  546 +private:
  547 + typedef SrsPacket super;
  548 +protected:
  549 + virtual const char* get_class_name()
  550 + {
  551 + return CLASS_NAME_STRING(SrsSetChunkSizePacket);
  552 + }
  553 +public:
  554 + int32_t chunk_size;
  555 +public:
  556 + SrsSetChunkSizePacket();
  557 + virtual ~SrsSetChunkSizePacket();
  558 +public:
  559 + virtual int decode(SrsStream* stream);
  560 +public:
  561 + virtual int get_perfer_cid();
  562 +public:
  563 + virtual int get_message_type();
  564 +protected:
  565 + virtual int get_size();
  566 + virtual int encode_packet(SrsStream* stream);
  567 +};
  568 +
  569 +/**
536 * 5.6. Set Peer Bandwidth (6) 570 * 5.6. Set Peer Bandwidth (6)
537 * The client or the server sends this message to update the output 571 * The client or the server sends this message to update the output
538 * bandwidth of the peer. 572 * bandwidth of the peer.
@@ -561,6 +595,57 @@ protected: @@ -561,6 +595,57 @@ protected:
561 virtual int encode_packet(SrsStream* stream); 595 virtual int encode_packet(SrsStream* stream);
562 }; 596 };
563 597
  598 +enum SrcPCUCEventType
  599 +{
  600 + // generally, 4bytes event-data
  601 + SrcPCUCStreamBegin = 0x00,
  602 + SrcPCUCStreamEOF = 0x01,
  603 + SrcPCUCStreamDry = 0x02,
  604 + SrcPCUCSetBufferLength = 0x03, // 8bytes event-data
  605 + SrcPCUCStreamIsRecorded = 0x04,
  606 + SrcPCUCPingRequest = 0x06,
  607 + SrcPCUCPingResponse = 0x07,
  608 +};
  609 +
  610 +/**
  611 +* for the EventData is 4bytes.
  612 +* Stream Begin(=0) 4-bytes stream ID
  613 +* Stream EOF(=1) 4-bytes stream ID
  614 +* StreamDry(=2) 4-bytes stream ID
  615 +* StreamIsRecorded(=4) 4-bytes stream ID
  616 +* PingRequest(=6) 4-bytes timestamp local server time
  617 +* PingResponse(=7) 4-bytes timestamp received ping request.
  618 +*
  619 +* 3.7. User Control message
  620 +* +------------------------------+-------------------------
  621 +* | Event Type ( 2- bytes ) | Event Data
  622 +* +------------------------------+-------------------------
  623 +* Figure 5 Pay load for the ‘User Control Message’.
  624 +*/
  625 +class SrsPCUC4BytesPacket : public SrsPacket
  626 +{
  627 +private:
  628 + typedef SrsPacket super;
  629 +protected:
  630 + virtual const char* get_class_name()
  631 + {
  632 + return CLASS_NAME_STRING(SrsPCUC4BytesPacket);
  633 + }
  634 +public:
  635 + int16_t event_type;
  636 + int32_t event_data;
  637 +public:
  638 + SrsPCUC4BytesPacket();
  639 + virtual ~SrsPCUC4BytesPacket();
  640 +public:
  641 + virtual int get_perfer_cid();
  642 +public:
  643 + virtual int get_message_type();
  644 +protected:
  645 + virtual int get_size();
  646 + virtual int encode_packet(SrsStream* stream);
  647 +};
  648 +
564 /** 649 /**
565 * expect a specified message, drop others util got specified one. 650 * expect a specified message, drop others util got specified one.
566 * @pmsg, user must free it. NULL if not success. 651 * @pmsg, user must free it. NULL if not success.
@@ -303,6 +303,50 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st @@ -303,6 +303,50 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st
303 return ret; 303 return ret;
304 } 304 }
305 305
  306 +int SrsRtmp::set_chunk_size(int chunk_size)
  307 +{
  308 + int ret = ERROR_SUCCESS;
  309 +
  310 + SrsMessage* msg = new SrsMessage();
  311 + SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
  312 +
  313 + pkt->chunk_size = chunk_size;
  314 + msg->set_packet(pkt);
  315 +
  316 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  317 + srs_error("send set chunk size message failed. ret=%d", ret);
  318 + return ret;
  319 + }
  320 + srs_info("send set chunk size message success. chunk_size=%d", chunk_size);
  321 +
  322 + return ret;
  323 +}
  324 +
  325 +int SrsRtmp::start_play(int stream_id)
  326 +{
  327 + int ret = ERROR_SUCCESS;
  328 +
  329 + // StreamBegin
  330 + if (true) {
  331 + SrsMessage* msg = new SrsMessage();
  332 + SrsPCUC4BytesPacket* pkt = new SrsPCUC4BytesPacket();
  333 +
  334 + pkt->event_type = SrcPCUCStreamBegin;
  335 + pkt->event_data = stream_id;
  336 + msg->set_packet(pkt);
  337 +
  338 + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
  339 + srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret);
  340 + return ret;
  341 + }
  342 + srs_info("send PCUC(StreamBegin) message success.");
  343 + }
  344 +
  345 + srs_info("start play success.");
  346 +
  347 + return ret;
  348 +}
  349 +
306 int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) 350 int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name)
307 { 351 {
308 int ret = ERROR_SUCCESS; 352 int ret = ERROR_SUCCESS;
@@ -343,7 +387,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea @@ -343,7 +387,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea
343 SrsPacket* pkt = msg->get_packet(); 387 SrsPacket* pkt = msg->get_packet();
344 if (dynamic_cast<SrsPlayPacket*>(pkt)) { 388 if (dynamic_cast<SrsPlayPacket*>(pkt)) {
345 SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt); 389 SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);
346 - type = SrsClientPublish; 390 + type = SrsClientPlay;
347 stream_name = play->stream_name; 391 stream_name = play->stream_name;
348 srs_trace("identity client type=play, stream_name=%s", stream_name.c_str()); 392 srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());
349 return ret; 393 return ret;
@@ -99,6 +99,15 @@ public: @@ -99,6 +99,15 @@ public:
99 * @type, output the client type. 99 * @type, output the client type.
100 */ 100 */
101 virtual int identify_client(int stream_id, SrsClientType& type, std::string& stream_name); 101 virtual int identify_client(int stream_id, SrsClientType& type, std::string& stream_name);
  102 + /**
  103 + * set the chunk size when client type identified.
  104 + */
  105 + virtual int set_chunk_size(int chunk_size);
  106 + /**
  107 + * when client type is play, response with
  108 + * StreamBegin, onStatus(NetStream.Play.Reset), onStatus(NetStream.Play.Start).
  109 + */
  110 + virtual int start_play(int stream_id);
102 private: 111 private:
103 virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); 112 virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
104 }; 113 };