winlin

merge wenjie. fix jw/flower player pause bug, which send closeStream actually.

@@ -289,6 +289,7 @@ See also: [Performance Test Guide](https://github.com/winlinvip/simple-rtmp-serv @@ -289,6 +289,7 @@ See also: [Performance Test Guide](https://github.com/winlinvip/simple-rtmp-serv
289 * nginx v1.5.0: 139524 lines <br/> 289 * nginx v1.5.0: 139524 lines <br/>
290 290
291 ### History 291 ### History
  292 +* v1.0, 2014-01-11, fix jw/flower player pause bug, which send closeStream actually.
292 * v1.0, 2014-01-05, add wiki [Build](https://github.com/winlinvip/simple-rtmp-server/wiki/Build), [Performance](https://github.com/winlinvip/simple-rtmp-server/wiki/Performance), [Cluster](https://github.com/winlinvip/simple-rtmp-server/wiki/Cluster) 293 * v1.0, 2014-01-05, add wiki [Build](https://github.com/winlinvip/simple-rtmp-server/wiki/Build), [Performance](https://github.com/winlinvip/simple-rtmp-server/wiki/Performance), [Cluster](https://github.com/winlinvip/simple-rtmp-server/wiki/Cluster)
293 * v1.0, 2014-01-01, change listen(512), chunk-size(60000), to improve performance. 294 * v1.0, 2014-01-01, change listen(512), chunk-size(60000), to improve performance.
294 * v1.0, 2013-12-27, merge from wenjie, the bandwidth test feature. 295 * v1.0, 2013-12-27, merge from wenjie, the bandwidth test feature.
@@ -168,6 +168,41 @@ int SrsClient::service_cycle() @@ -168,6 +168,41 @@ int SrsClient::service_cycle()
168 } 168 }
169 srs_verbose("on_bw_done success"); 169 srs_verbose("on_bw_done success");
170 170
  171 + while (true) {
  172 + ret = stream_service_cycle();
  173 +
  174 + // stream service must terminated with error, never success.
  175 + srs_assert(ret != ERROR_SUCCESS);
  176 +
  177 + // when not system control error, fatal error, return.
  178 + if (!srs_is_system_control_error(ret)) {
  179 + srs_error("stream service cycle failed. ret=%d", ret);
  180 + return ret;
  181 + }
  182 +
  183 + // for "some" system control error,
  184 + // logical accept and retry stream service.
  185 + if (ret == ERROR_CONTROL_RTMP_CLOSE) {
  186 + // set timeout to a larger value, for user paused.
  187 + rtmp->set_recv_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
  188 + rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
  189 +
  190 + srs_trace("control message(close) accept, retry stream service.");
  191 + continue;
  192 + }
  193 +
  194 + // for other system control message, fatal error.
  195 + srs_error("control message(%d) reject as error. ret=%d", ret, ret);
  196 + return ret;
  197 + }
  198 +
  199 + return ret;
  200 +}
  201 +
  202 +int SrsClient::stream_service_cycle()
  203 +{
  204 + int ret = ERROR_SUCCESS;
  205 +
171 SrsClientType type; 206 SrsClientType type;
172 if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) { 207 if ((ret = rtmp->identify_client(res->stream_id, type, req->stream)) != ERROR_SUCCESS) {
173 srs_error("identify client failed. ret=%d", ret); 208 srs_error("identify client failed. ret=%d", ret);
@@ -176,6 +211,11 @@ int SrsClient::service_cycle() @@ -176,6 +211,11 @@ int SrsClient::service_cycle()
176 req->strip(); 211 req->strip();
177 srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); 212 srs_trace("identify client success. type=%d, stream_name=%s", type, req->stream.c_str());
178 213
  214 + // client is identified, set the timeout to service timeout.
  215 + rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US);
  216 + rtmp->set_send_timeout(SRS_SEND_TIMEOUT_US);
  217 +
  218 + // set timeout to larger.
179 int chunk_size = config->get_chunk_size(req->vhost); 219 int chunk_size = config->get_chunk_size(req->vhost);
180 if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) { 220 if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
181 srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret); 221 srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);
@@ -341,7 +381,9 @@ int SrsClient::playing(SrsSource* source) @@ -341,7 +381,9 @@ int SrsClient::playing(SrsSource* source)
341 return ret; 381 return ret;
342 } 382 }
343 if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) { 383 if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
  384 + if (!srs_is_system_control_error(ret)) {
344 srs_error("process play control message failed. ret=%d", ret); 385 srs_error("process play control message failed. ret=%d", ret);
  386 + }
345 return ret; 387 return ret;
346 } 388 }
347 } 389 }
@@ -555,6 +597,13 @@ int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* @@ -555,6 +597,13 @@ int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage*
555 } 597 }
556 srs_info("decode the amf0/amf3 command packet success."); 598 srs_info("decode the amf0/amf3 command packet success.");
557 599
  600 + SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(msg->get_packet());
  601 + if (close) {
  602 + ret = ERROR_CONTROL_RTMP_CLOSE;
  603 + srs_trace("system control message: rtmp close stream. ret=%d", ret);
  604 + return ret;
  605 + }
  606 +
558 SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(msg->get_packet()); 607 SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(msg->get_packet());
559 if (!pause) { 608 if (!pause) {
560 srs_info("ignore all amf0/amf3 command except pause."); 609 srs_info("ignore all amf0/amf3 command except pause.");
@@ -700,4 +749,3 @@ void SrsClient::on_stop() @@ -700,4 +749,3 @@ void SrsClient::on_stop()
700 } 749 }
701 #endif 750 #endif
702 } 751 }
703 -  
@@ -71,6 +71,8 @@ public: @@ -71,6 +71,8 @@ public:
71 private: 71 private:
72 // when valid and connected to vhost/app, service the client. 72 // when valid and connected to vhost/app, service the client.
73 virtual int service_cycle(); 73 virtual int service_cycle();
  74 + // stream(play/publish) service cycle, identify client first.
  75 + virtual int stream_service_cycle();
74 virtual int check_vhost(); 76 virtual int check_vhost();
75 virtual int playing(SrsSource* source); 77 virtual int playing(SrsSource* source);
76 virtual int publish(SrsSource* source, bool is_fmle); 78 virtual int publish(SrsSource* source, bool is_fmle);
@@ -22,3 +22,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -22,3 +22,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 */ 22 */
23 23
24 #include <srs_core_error.hpp> 24 #include <srs_core_error.hpp>
  25 +
  26 +bool srs_is_system_control_error(int error_code)
  27 +{
  28 + return error_code == ERROR_CONTROL_RTMP_CLOSE;
  29 +}
  30 +
@@ -147,4 +147,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -147,4 +147,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
147 #define ERROR_HTTP_DATA_INVLIAD 801 147 #define ERROR_HTTP_DATA_INVLIAD 801
148 #define ERROR_HTTP_PARSE_HEADER 802 148 #define ERROR_HTTP_PARSE_HEADER 802
149 149
  150 +// system control message,
  151 +// not an error, but special control logic.
  152 +// sys ctl: rtmp close stream, support replay.
  153 +#define ERROR_CONTROL_RTMP_CLOSE 900
  154 +
  155 +/**
  156 +* whether the error code is an system control error.
  157 +*/
  158 +extern bool srs_is_system_control_error(int error_code);
  159 +
150 #endif 160 #endif
@@ -196,6 +196,7 @@ messages. @@ -196,6 +196,7 @@ messages.
196 */ 196 */
197 #define RTMP_AMF0_COMMAND_CONNECT "connect" 197 #define RTMP_AMF0_COMMAND_CONNECT "connect"
198 #define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream" 198 #define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream"
  199 +#define RTMP_AMF0_COMMAND_CLOSE_STREAM "closeStream"
199 #define RTMP_AMF0_COMMAND_PLAY "play" 200 #define RTMP_AMF0_COMMAND_PLAY "play"
200 #define RTMP_AMF0_COMMAND_PAUSE "pause" 201 #define RTMP_AMF0_COMMAND_PAUSE "pause"
201 #define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone" 202 #define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone"
@@ -1363,6 +1364,10 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) @@ -1363,6 +1364,10 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
1363 srs_info("decode the AMF0/AMF3 band width check message."); 1364 srs_info("decode the AMF0/AMF3 band width check message.");
1364 packet = new SrsBandwidthPacket(); 1365 packet = new SrsBandwidthPacket();
1365 return packet->decode(stream); 1366 return packet->decode(stream);
  1367 + } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) {
  1368 + srs_info("decode the AMF0/AMF3 closeStream message.");
  1369 + packet = new SrsCloseStreamPacket();
  1370 + return packet->decode(stream);
1366 } 1371 }
1367 1372
1368 // default packet to drop message. 1373 // default packet to drop message.
@@ -2064,6 +2069,41 @@ int SrsCreateStreamResPacket::encode_packet(SrsStream* stream) @@ -2064,6 +2069,41 @@ int SrsCreateStreamResPacket::encode_packet(SrsStream* stream)
2064 return ret; 2069 return ret;
2065 } 2070 }
2066 2071
  2072 +SrsCloseStreamPacket::SrsCloseStreamPacket()
  2073 +{
  2074 + command_name = RTMP_AMF0_COMMAND_CLOSE_STREAM;
  2075 + transaction_id = 0;
  2076 + command_object = new SrsAmf0Null();
  2077 +}
  2078 +
  2079 +SrsCloseStreamPacket::~SrsCloseStreamPacket()
  2080 +{
  2081 + srs_freep(command_object);
  2082 +}
  2083 +
  2084 +int SrsCloseStreamPacket::decode(SrsStream* stream)
  2085 +{
  2086 + int ret = ERROR_SUCCESS;
  2087 +
  2088 + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
  2089 + srs_error("amf0 decode closeStream command_name failed. ret=%d", ret);
  2090 + return ret;
  2091 + }
  2092 +
  2093 + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
  2094 + srs_error("amf0 decode closeStream transaction_id failed. ret=%d", ret);
  2095 + return ret;
  2096 + }
  2097 +
  2098 + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
  2099 + srs_error("amf0 decode closeStream command_object failed. ret=%d", ret);
  2100 + return ret;
  2101 + }
  2102 + srs_info("amf0 decode closeStream packet success");
  2103 +
  2104 + return ret;
  2105 +}
  2106 +
2067 SrsFMLEStartPacket::SrsFMLEStartPacket() 2107 SrsFMLEStartPacket::SrsFMLEStartPacket()
2068 { 2108 {
2069 command_name = RTMP_AMF0_COMMAND_CREATE_STREAM; 2109 command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;
@@ -3325,4 +3365,3 @@ int SrsUserControlPacket::encode_packet(SrsStream* stream) @@ -3325,4 +3365,3 @@ int SrsUserControlPacket::encode_packet(SrsStream* stream)
3325 3365
3326 return ret; 3366 return ret;
3327 } 3367 }
3328 -  
@@ -56,6 +56,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -56,6 +56,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
56 // if timeout, close the connection. 56 // if timeout, close the connection.
57 #define SRS_RECV_TIMEOUT_US 30*1000*1000L 57 #define SRS_RECV_TIMEOUT_US 30*1000*1000L
58 58
  59 +// the timeout to wait client data, when client paused
  60 +// if timeout, close the connection.
  61 +#define SRS_PAUSED_SEND_TIMEOUT_US 30*60*1000*1000L
  62 +
  63 +// the timeout to send data to client, when client paused
  64 +// if timeout, close the connection.
  65 +#define SRS_PAUSED_RECV_TIMEOUT_US 30*60*1000*1000L
  66 +
59 // when stream is busy, for example, streaming is already 67 // when stream is busy, for example, streaming is already
60 // publishing, when a new client to request to publish, 68 // publishing, when a new client to request to publish,
61 // sleep a while and close the connection. 69 // sleep a while and close the connection.
@@ -625,6 +633,28 @@ protected: @@ -625,6 +633,28 @@ protected:
625 virtual int get_size(); 633 virtual int get_size();
626 virtual int encode_packet(SrsStream* stream); 634 virtual int encode_packet(SrsStream* stream);
627 }; 635 };
  636 +/**
  637 +* client close stream packet.
  638 +*/
  639 +class SrsCloseStreamPacket : public SrsPacket
  640 +{
  641 +private:
  642 + typedef SrsPacket super;
  643 +protected:
  644 + virtual const char* get_class_name()
  645 + {
  646 + return CLASS_NAME_STRING(SrsCloseStreamPacket);
  647 + }
  648 +public:
  649 + std::string command_name;
  650 + double transaction_id;
  651 + SrsAmf0Null* command_object;
  652 +public:
  653 + SrsCloseStreamPacket();
  654 + virtual ~SrsCloseStreamPacket();
  655 +public:
  656 + virtual int decode(SrsStream* stream);
  657 +};
628 658
629 /** 659 /**
630 * FMLE start publish: ReleaseStream/PublishStream 660 * FMLE start publish: ReleaseStream/PublishStream
@@ -171,7 +171,7 @@ void SrsRequest::strip() @@ -171,7 +171,7 @@ void SrsRequest::strip()
171 trim(stream, "/ \n\r\t"); 171 trim(stream, "/ \n\r\t");
172 } 172 }
173 173
174 -std::string& SrsRequest::trim(string& str, string chs) 174 +string& SrsRequest::trim(string& str, string chs)
175 { 175 {
176 for (int i = 0; i < (int)chs.length(); i++) { 176 for (int i = 0; i < (int)chs.length(); i++) {
177 char ch = chs.at(i); 177 char ch = chs.at(i);
@@ -695,7 +695,7 @@ int SrsRtmp::on_bw_done() @@ -695,7 +695,7 @@ int SrsRtmp::on_bw_done()
695 return ret; 695 return ret;
696 } 696 }
697 697
698 -int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name) 698 +int SrsRtmp::identify_client(int stream_id, SrsClientType& type, string& stream_name)
699 { 699 {
700 type = SrsClientUnknown; 700 type = SrsClientUnknown;
701 int ret = ERROR_SUCCESS; 701 int ret = ERROR_SUCCESS;
@@ -723,13 +723,15 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st @@ -723,13 +723,15 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st
723 SrsPacket* pkt = msg->get_packet(); 723 SrsPacket* pkt = msg->get_packet();
724 if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) { 724 if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
725 srs_info("identify client by create stream, play or flash publish."); 725 srs_info("identify client by create stream, play or flash publish.");
726 - return identify_create_stream_client(  
727 - dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name); 726 + return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, type, stream_name);
728 } 727 }
729 if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) { 728 if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
730 srs_info("identify client by releaseStream, fmle publish."); 729 srs_info("identify client by releaseStream, fmle publish.");
731 - return identify_fmle_publish_client(  
732 - dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name); 730 + return identify_fmle_publish_client(dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);
  731 + }
  732 + if (dynamic_cast<SrsPlayPacket*>(pkt)) {
  733 + srs_info("level0 identify client by play.");
  734 + return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type, stream_name);
733 } 735 }
734 736
735 srs_trace("ignore AMF0/AMF3 command message."); 737 srs_trace("ignore AMF0/AMF3 command message.");
@@ -1165,16 +1167,12 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea @@ -1165,16 +1167,12 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea
1165 1167
1166 SrsPacket* pkt = msg->get_packet(); 1168 SrsPacket* pkt = msg->get_packet();
1167 if (dynamic_cast<SrsPlayPacket*>(pkt)) { 1169 if (dynamic_cast<SrsPlayPacket*>(pkt)) {
1168 - SrsPlayPacket* play = dynamic_cast<SrsPlayPacket*>(pkt);  
1169 - type = SrsClientPlay;  
1170 - stream_name = play->stream_name;  
1171 - srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());  
1172 - return ret; 1170 + srs_info("level1 identify client by play.");
  1171 + return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type, stream_name);
1173 } 1172 }
1174 if (dynamic_cast<SrsPublishPacket*>(pkt)) { 1173 if (dynamic_cast<SrsPublishPacket*>(pkt)) {
1175 srs_info("identify client by publish, falsh publish."); 1174 srs_info("identify client by publish, falsh publish.");
1176 - return identify_flash_publish_client(  
1177 - dynamic_cast<SrsPublishPacket*>(pkt), type, stream_name); 1175 + return identify_flash_publish_client(dynamic_cast<SrsPublishPacket*>(pkt), type, stream_name);
1178 } 1176 }
1179 1177
1180 srs_trace("ignore AMF0/AMF3 command message."); 1178 srs_trace("ignore AMF0/AMF3 command message.");
@@ -1216,3 +1214,16 @@ int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& @@ -1216,3 +1214,16 @@ int SrsRtmp::identify_flash_publish_client(SrsPublishPacket* req, SrsClientType&
1216 1214
1217 return ret; 1215 return ret;
1218 } 1216 }
  1217 +
  1218 +int SrsRtmp::identify_play_client(SrsPlayPacket* req, SrsClientType& type, string& stream_name)
  1219 +{
  1220 + int ret = ERROR_SUCCESS;
  1221 +
  1222 + type = SrsClientPlay;
  1223 + stream_name = req->stream_name;
  1224 +
  1225 + srs_trace("identity client type=play, stream_name=%s", stream_name.c_str());
  1226 +
  1227 + return ret;
  1228 +}
  1229 +
@@ -40,6 +40,7 @@ class SrsFMLEStartPacket; @@ -40,6 +40,7 @@ class SrsFMLEStartPacket;
40 class SrsPublishPacket; 40 class SrsPublishPacket;
41 class SrsSharedPtrMessage; 41 class SrsSharedPtrMessage;
42 class SrsOnMetaDataPacket; 42 class SrsOnMetaDataPacket;
  43 +class SrsPlayPacket;
43 44
44 /** 45 /**
45 * the original request from client. 46 * the original request from client.
@@ -226,6 +227,8 @@ private: @@ -226,6 +227,8 @@ private:
226 virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); 227 virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name);
227 virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name); 228 virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, SrsClientType& type, std::string& stream_name);
228 virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name); 229 virtual int identify_flash_publish_client(SrsPublishPacket* req, SrsClientType& type, std::string& stream_name);
  230 +private:
  231 + virtual int identify_play_client(SrsPlayPacket* req, SrsClientType& type, std::string& stream_name);
229 }; 232 };
230 233
231 #endif 234 #endif