winlin

refine the protocol and amf0, extract the template method as global static method

@@ -50,6 +50,123 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -50,6 +50,123 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
50 // origin array whos data takes the same form as LengthValueBytes 50 // origin array whos data takes the same form as LengthValueBytes
51 #define RTMP_AMF0_OriginStrictArray 0x20 51 #define RTMP_AMF0_OriginStrictArray 0x20
52 52
  53 +// User defined
  54 +#define RTMP_AMF0_Invalid 0x3F
  55 +
  56 +SrsAmf0Any::SrsAmf0Any()
  57 +{
  58 + marker = RTMP_AMF0_Invalid;
  59 +}
  60 +
  61 +SrsAmf0Any::~SrsAmf0Any()
  62 +{
  63 +}
  64 +
  65 +bool SrsAmf0Any::is_string()
  66 +{
  67 + return marker == RTMP_AMF0_String;
  68 +}
  69 +
  70 +bool SrsAmf0Any::is_boolean()
  71 +{
  72 + return marker == RTMP_AMF0_Boolean;
  73 +}
  74 +
  75 +bool SrsAmf0Any::is_number()
  76 +{
  77 + return marker == RTMP_AMF0_Number;
  78 +}
  79 +
  80 +bool SrsAmf0Any::is_object()
  81 +{
  82 + return marker == RTMP_AMF0_Object;
  83 +}
  84 +
  85 +bool SrsAmf0Any::is_object_eof()
  86 +{
  87 + return marker == RTMP_AMF0_ObjectEnd;
  88 +}
  89 +
  90 +SrsAmf0String::SrsAmf0String()
  91 +{
  92 + marker = RTMP_AMF0_String;
  93 +}
  94 +
  95 +SrsAmf0String::~SrsAmf0String()
  96 +{
  97 +}
  98 +
  99 +SrsAmf0Boolean::SrsAmf0Boolean()
  100 +{
  101 + marker = RTMP_AMF0_Boolean;
  102 + value = false;
  103 +}
  104 +
  105 +SrsAmf0Boolean::~SrsAmf0Boolean()
  106 +{
  107 +}
  108 +
  109 +SrsAmf0Number::SrsAmf0Number()
  110 +{
  111 + marker = RTMP_AMF0_Number;
  112 + value = 0;
  113 +}
  114 +
  115 +SrsAmf0Number::~SrsAmf0Number()
  116 +{
  117 + marker = RTMP_AMF0_ObjectEnd;
  118 +}
  119 +
  120 +SrsAmf0ObjectEOF::SrsAmf0ObjectEOF()
  121 +{
  122 + utf8_empty = 0x00;
  123 +}
  124 +
  125 +SrsAmf0ObjectEOF::~SrsAmf0ObjectEOF()
  126 +{
  127 +}
  128 +
  129 +SrsAmf0Object::SrsAmf0Object()
  130 +{
  131 + marker = RTMP_AMF0_Object;
  132 +}
  133 +
  134 +SrsAmf0Object::~SrsAmf0Object()
  135 +{
  136 + std::map<std::string, SrsAmf0Any*>::iterator it;
  137 + for (it = properties.begin(); it != properties.end(); ++it) {
  138 + SrsAmf0Any* any = it->second;
  139 + delete any;
  140 + }
  141 + properties.clear();
  142 +}
  143 +
  144 +SrsAmf0Any* SrsAmf0Object::get_property(std::string name)
  145 +{
  146 + std::map<std::string, SrsAmf0Any*>::iterator it;
  147 +
  148 + if ((it = properties.find(name)) == properties.end()) {
  149 + return NULL;
  150 + }
  151 +
  152 + return it->second;
  153 +}
  154 +
  155 +SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name)
  156 +{
  157 + SrsAmf0Any* prop = get_property(name);
  158 +
  159 + if (!prop) {
  160 + return NULL;
  161 + }
  162 +
  163 + if (!prop->is_string()) {
  164 + return NULL;
  165 + }
  166 +
  167 + return prop;
  168 +}
  169 +
53 int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*&); 170 int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*&);
54 171
55 int srs_amf0_read_utf8(SrsStream* stream, std::string& value) 172 int srs_amf0_read_utf8(SrsStream* stream, std::string& value)
@@ -254,10 +371,12 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) @@ -254,10 +371,12 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value)
254 value = p; 371 value = p;
255 return ret; 372 return ret;
256 } 373 }
257 - default:  
258 - value = new SrsAmf0Any();  
259 - value->marker = stream->read_char(); 374 + case RTMP_AMF0_Invalid:
  375 + default: {
  376 + ret = ERROR_RTMP_AMF0_INVALID;
  377 + srs_error("invalid amf0 message type. marker=%#x, ret=%d", marker, ret);
260 return ret; 378 return ret;
  379 + }
261 } 380 }
262 381
263 return ret; 382 return ret;
@@ -343,91 +462,3 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value) @@ -343,91 +462,3 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value)
343 462
344 return ret; 463 return ret;
345 } 464 }
346 -  
347 -SrsAmf0Any::SrsAmf0Any()  
348 -{  
349 - marker = RTMP_AMF0_Null;  
350 -}  
351 -  
352 -SrsAmf0Any::~SrsAmf0Any()  
353 -{  
354 -}  
355 -  
356 -bool SrsAmf0Any::is_string()  
357 -{  
358 - return marker == RTMP_AMF0_String;  
359 -}  
360 -  
361 -bool SrsAmf0Any::is_boolean()  
362 -{  
363 - return marker == RTMP_AMF0_Boolean;  
364 -}  
365 -  
366 -bool SrsAmf0Any::is_number()  
367 -{  
368 - return marker == RTMP_AMF0_Number;  
369 -}  
370 -  
371 -bool SrsAmf0Any::is_object()  
372 -{  
373 - return marker == RTMP_AMF0_Object;  
374 -}  
375 -  
376 -bool SrsAmf0Any::is_object_eof()  
377 -{  
378 - return marker == RTMP_AMF0_ObjectEnd;  
379 -}  
380 -  
381 -SrsAmf0String::SrsAmf0String()  
382 -{  
383 - marker = RTMP_AMF0_String;  
384 -}  
385 -  
386 -SrsAmf0String::~SrsAmf0String()  
387 -{  
388 -}  
389 -  
390 -SrsAmf0Boolean::SrsAmf0Boolean()  
391 -{  
392 - marker = RTMP_AMF0_Boolean;  
393 - value = false;  
394 -}  
395 -  
396 -SrsAmf0Boolean::~SrsAmf0Boolean()  
397 -{  
398 -}  
399 -  
400 -SrsAmf0Number::SrsAmf0Number()  
401 -{  
402 - marker = RTMP_AMF0_Number;  
403 - value = 0;  
404 -}  
405 -  
406 -SrsAmf0Number::~SrsAmf0Number()  
407 -{  
408 - marker = RTMP_AMF0_ObjectEnd;  
409 -}  
410 -  
411 -SrsAmf0ObjectEOF::SrsAmf0ObjectEOF()  
412 -{  
413 - utf8_empty = 0x00;  
414 -}  
415 -  
416 -SrsAmf0ObjectEOF::~SrsAmf0ObjectEOF()  
417 -{  
418 -}  
419 -  
420 -SrsAmf0Object::SrsAmf0Object()  
421 -{  
422 - marker = RTMP_AMF0_Object;  
423 -}  
424 -  
425 -SrsAmf0Object::~SrsAmf0Object()  
426 -{  
427 - std::map<std::string, SrsAmf0Any*>::iterator it;  
428 - for (it = properties.begin(); it != properties.end(); ++it) {  
429 - SrsAmf0Any* any = it->second;  
430 - delete any;  
431 - }  
432 - properties.clear();  
433 -}  
@@ -37,46 +37,6 @@ class SrsStream; @@ -37,46 +37,6 @@ class SrsStream;
37 class SrsAmf0Object; 37 class SrsAmf0Object;
38 38
39 /** 39 /**
40 -* read amf0 utf8 string from stream.  
41 -* 1.3.1 Strings and UTF-8  
42 -* UTF-8 = U16 *(UTF8-char)  
43 -* UTF8-char = UTF8-1 | UTF8-2 | UTF8-3 | UTF8-4  
44 -* UTF8-1 = %x00-7F  
45 -* @remark only support UTF8-1 char.  
46 -*/  
47 -extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value);  
48 -  
49 -/**  
50 -* read amf0 string from stream.  
51 -* 2.4 String Type  
52 -* string-type = string-marker UTF-8  
53 -*/  
54 -extern int srs_amf0_read_string(SrsStream* stream, std::string& value);  
55 -  
56 -/**  
57 -* read amf0 boolean from stream.  
58 -* 2.4 String Type  
59 -* boolean-type = boolean-marker U8  
60 -* 0 is false, <> 0 is true  
61 -*/  
62 -extern int srs_amf0_read_boolean(SrsStream* stream, bool& value);  
63 -  
64 -/**  
65 -* read amf0 number from stream.  
66 -* 2.2 Number Type  
67 -* number-type = number-marker DOUBLE  
68 -*/  
69 -extern int srs_amf0_read_number(SrsStream* stream, double& value);  
70 -  
71 -/**  
72 -* read amf0 object from stream.  
73 -* 2.5 Object Type  
74 -* anonymous-object-type = object-marker *(object-property)  
75 -* object-property = (UTF-8 value-type) | (UTF-8-empty object-end-marker)  
76 -*/  
77 -extern int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value);  
78 -  
79 -/**  
80 * any amf0 value. 40 * any amf0 value.
81 * 2.1 Types Overview 41 * 2.1 Types Overview
82 * value-type = number-type | boolean-type | string-type | object-type 42 * value-type = number-type | boolean-type | string-type | object-type
@@ -166,7 +126,50 @@ struct SrsAmf0Object : public SrsAmf0Any @@ -166,7 +126,50 @@ struct SrsAmf0Object : public SrsAmf0Any
166 126
167 SrsAmf0Object(); 127 SrsAmf0Object();
168 virtual ~SrsAmf0Object(); 128 virtual ~SrsAmf0Object();
  129 +
  130 + virtual SrsAmf0Any* get_property(std::string name);
  131 + virtual SrsAmf0Any* ensure_property_string(std::string name);
169 }; 132 };
  133 +
  134 +/**
  135 +* read amf0 utf8 string from stream.
  136 +* 1.3.1 Strings and UTF-8
  137 +* UTF-8 = U16 *(UTF8-char)
  138 +* UTF8-char = UTF8-1 | UTF8-2 | UTF8-3 | UTF8-4
  139 +* UTF8-1 = %x00-7F
  140 +* @remark only support UTF8-1 char.
  141 +*/
  142 +extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value);
  143 +
  144 +/**
  145 +* read amf0 string from stream.
  146 +* 2.4 String Type
  147 +* string-type = string-marker UTF-8
  148 +*/
  149 +extern int srs_amf0_read_string(SrsStream* stream, std::string& value);
  150 +
  151 +/**
  152 +* read amf0 boolean from stream.
  153 +* 2.4 String Type
  154 +* boolean-type = boolean-marker U8
  155 +* 0 is false, <> 0 is true
  156 +*/
  157 +extern int srs_amf0_read_boolean(SrsStream* stream, bool& value);
  158 +
  159 +/**
  160 +* read amf0 number from stream.
  161 +* 2.2 Number Type
  162 +* number-type = number-marker DOUBLE
  163 +*/
  164 +extern int srs_amf0_read_number(SrsStream* stream, double& value);
  165 +
  166 +/**
  167 +* read amf0 object from stream.
  168 +* 2.5 Object Type
  169 +* anonymous-object-type = object-marker *(object-property)
  170 +* object-property = (UTF-8 value-type) | (UTF-8-empty object-end-marker)
  171 +*/
  172 +extern int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value);
170 173
171 /** 174 /**
172 * convert the any to specified object. 175 * convert the any to specified object.
@@ -33,6 +33,7 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) @@ -33,6 +33,7 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
33 : SrsConnection(srs_server, client_stfd) 33 : SrsConnection(srs_server, client_stfd)
34 { 34 {
35 ip = NULL; 35 ip = NULL;
  36 + req = new SrsRequest();
36 rtmp = new SrsRtmp(client_stfd); 37 rtmp = new SrsRtmp(client_stfd);
37 } 38 }
38 39
@@ -43,6 +44,11 @@ SrsClient::~SrsClient() @@ -43,6 +44,11 @@ SrsClient::~SrsClient()
43 ip = NULL; 44 ip = NULL;
44 } 45 }
45 46
  47 + if (req) {
  48 + delete req;
  49 + req = NULL;
  50 + }
  51 +
46 if (rtmp) { 52 if (rtmp) {
47 delete rtmp; 53 delete rtmp;
48 rtmp = NULL; 54 rtmp = NULL;
@@ -65,12 +71,13 @@ int SrsClient::do_cycle() @@ -65,12 +71,13 @@ int SrsClient::do_cycle()
65 } 71 }
66 srs_verbose("rtmp handshake success"); 72 srs_verbose("rtmp handshake success");
67 73
68 - SrsApp* app = NULL;  
69 - if ((ret = rtmp->connect_app(&app)) != ERROR_SUCCESS) { 74 + if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
70 srs_warn("rtmp connect vhost/app failed. ret=%d", ret); 75 srs_warn("rtmp connect vhost/app failed. ret=%d", ret);
71 return ret; 76 return ret;
72 } 77 }
73 - srs_verbose("rtmp connect vhost/app success"); 78 + srs_info("rtmp connect success. tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s",
  79 + req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
  80 + req->app.c_str());
74 81
75 return ret; 82 return ret;
76 } 83 }
@@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33 #include <srs_core_conn.hpp> 33 #include <srs_core_conn.hpp>
34 34
35 class SrsRtmp; 35 class SrsRtmp;
  36 +class SrsRequest;
36 37
37 /** 38 /**
38 * the client provides the main logic control for RTMP clients. 39 * the client provides the main logic control for RTMP clients.
@@ -41,6 +42,7 @@ class SrsClient : public SrsConnection @@ -41,6 +42,7 @@ class SrsClient : public SrsConnection
41 { 42 {
42 private: 43 private:
43 char* ip; 44 char* ip;
  45 + SrsRequest* req;
44 SrsRtmp* rtmp; 46 SrsRtmp* rtmp;
45 public: 47 public:
46 SrsClient(SrsServer* srs_server, st_netfd_t client_stfd); 48 SrsClient(SrsServer* srs_server, st_netfd_t client_stfd);
@@ -53,6 +53,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -53,6 +53,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
53 #define ERROR_RTMP_CHUNK_START 301 53 #define ERROR_RTMP_CHUNK_START 301
54 #define ERROR_RTMP_MSG_INVLIAD_SIZE 302 54 #define ERROR_RTMP_MSG_INVLIAD_SIZE 302
55 #define ERROR_RTMP_AMF0_DECODE 303 55 #define ERROR_RTMP_AMF0_DECODE 303
  56 +#define ERROR_RTMP_AMF0_INVALID 304
  57 +#define ERROR_RTMP_REQ_CONNECT 305
56 58
57 #define ERROR_SYSTEM_STREAM_INIT 400 59 #define ERROR_SYSTEM_STREAM_INIT 400
58 60
@@ -194,190 +194,6 @@ messages. @@ -194,190 +194,6 @@ messages.
194 */ 194 */
195 #define RTMP_AMF0_COMMAND_CONNECT "connect" 195 #define RTMP_AMF0_COMMAND_CONNECT "connect"
196 196
197 -SrsMessageHeader::SrsMessageHeader()  
198 -{  
199 - message_type = 0;  
200 - payload_length = 0;  
201 - timestamp = 0;  
202 - stream_id = 0;  
203 -}  
204 -  
205 -SrsMessageHeader::~SrsMessageHeader()  
206 -{  
207 -}  
208 -  
209 -SrsChunkStream::SrsChunkStream(int _cid)  
210 -{  
211 - fmt = 0;  
212 - cid = _cid;  
213 - extended_timestamp = false;  
214 - msg = NULL;  
215 -}  
216 -  
217 -SrsChunkStream::~SrsChunkStream()  
218 -{  
219 - if (msg) {  
220 - delete msg;  
221 - msg = NULL;  
222 - }  
223 -}  
224 -  
225 -SrsMessage::SrsMessage()  
226 -{  
227 - size = 0;  
228 - stream = NULL;  
229 - payload = NULL;  
230 - decoded_payload = NULL;  
231 -}  
232 -  
233 -SrsMessage::~SrsMessage()  
234 -{  
235 - if (payload) {  
236 - delete[] payload;  
237 - payload = NULL;  
238 - }  
239 -  
240 - if (decoded_payload) {  
241 - delete decoded_payload;  
242 - decoded_payload = NULL;  
243 - }  
244 -  
245 - if (stream) {  
246 - delete stream;  
247 - stream = NULL;  
248 - }  
249 -}  
250 -  
251 -SrsPacket* SrsMessage::get_packet()  
252 -{  
253 - if (!decoded_payload) {  
254 - srs_error("the payload is raw/undecoded, invoke decode_packet to decode it.");  
255 - }  
256 - srs_assert(decoded_payload != NULL);  
257 -  
258 - return decoded_payload;  
259 -}  
260 -  
261 -int SrsMessage::decode_packet()  
262 -{  
263 - int ret = ERROR_SUCCESS;  
264 -  
265 - srs_assert(payload != NULL);  
266 - srs_assert(size > 0);  
267 -  
268 - if (!stream) {  
269 - srs_verbose("create decode stream for message.");  
270 - stream = new SrsStream();  
271 - }  
272 -  
273 - if (header.message_type == RTMP_MSG_AMF0CommandMessage) {  
274 - srs_verbose("start to decode AMF0 command message.");  
275 -  
276 - // amf0 command message.  
277 - // need to read the command name.  
278 - if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) {  
279 - srs_error("initialize stream failed. ret=%d", ret);  
280 - return ret;  
281 - }  
282 - srs_verbose("decode stream initialized success");  
283 -  
284 - std::string command;  
285 - if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {  
286 - srs_error("decode AMF0 command name failed. ret=%d", ret);  
287 - return ret;  
288 - }  
289 - srs_verbose("AMF0 command message, command_name=%s", command.c_str());  
290 -  
291 - stream->reset();  
292 - if (command == RTMP_AMF0_COMMAND_CONNECT) {  
293 - srs_info("decode the AMF0 command(connect vhost/app message).");  
294 - decoded_payload = new SrsConnectAppPacket();  
295 - return decoded_payload->decode(stream);  
296 - }  
297 -  
298 - // default packet to drop message.  
299 - srs_trace("drop the AMF0 command message, command_name=%s", command.c_str());  
300 - decoded_payload = new SrsPacket();  
301 - return ret;  
302 - }  
303 -  
304 - // default packet to drop message.  
305 - srs_trace("drop the unknown message, type=%d", header.message_type);  
306 - decoded_payload = new SrsPacket();  
307 -  
308 - return ret;  
309 -}  
310 -  
311 -SrsPacket::SrsPacket()  
312 -{  
313 -}  
314 -  
315 -SrsPacket::~SrsPacket()  
316 -{  
317 -}  
318 -  
319 -int SrsPacket::decode(SrsStream* /*stream*/)  
320 -{  
321 - int ret = ERROR_SUCCESS;  
322 - return ret;  
323 -}  
324 -  
325 -SrsConnectAppPacket::SrsConnectAppPacket()  
326 -{  
327 - command_name = RTMP_AMF0_COMMAND_CONNECT;  
328 - transaction_id = 1;  
329 - command_object = NULL;  
330 -}  
331 -  
332 -SrsConnectAppPacket::~SrsConnectAppPacket()  
333 -{  
334 -}  
335 -  
336 -int SrsConnectAppPacket::decode(SrsStream* stream)  
337 -{  
338 - int ret = ERROR_SUCCESS;  
339 -  
340 - if ((ret = super::decode(stream)) != ERROR_SUCCESS) {  
341 - return ret;  
342 - }  
343 -  
344 - if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {  
345 - srs_error("amf0 decode connect command_name failed. ret=%d", ret);  
346 - return ret;  
347 - }  
348 - if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CONNECT) {  
349 - ret = ERROR_RTMP_AMF0_DECODE;  
350 - srs_error("amf0 decode connect command_name failed. "  
351 - "command_name=%s, ret=%d", command_name.c_str(), ret);  
352 - return ret;  
353 - }  
354 -  
355 - if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {  
356 - srs_error("amf0 decode connect transaction_id failed. ret=%d", ret);  
357 - return ret;  
358 - }  
359 - if (transaction_id != 1.0) {  
360 - ret = ERROR_RTMP_AMF0_DECODE;  
361 - srs_error("amf0 decode connect transaction_id failed. "  
362 - "required=%.1f, actual=%.1f, ret=%d", 1.0, transaction_id, ret);  
363 - return ret;  
364 - }  
365 -  
366 - if ((ret = srs_amf0_read_object(stream, command_object)) != ERROR_SUCCESS) {  
367 - srs_error("amf0 decode connect command_object failed. ret=%d", ret);  
368 - return ret;  
369 - }  
370 - if (command_object == NULL) {  
371 - ret = ERROR_RTMP_AMF0_DECODE;  
372 - srs_error("amf0 decode connect command_object failed. ret=%d", ret);  
373 - return ret;  
374 - }  
375 -  
376 - srs_info("amf0 decode connect packet success");  
377 -  
378 - return ret;  
379 -}  
380 -  
381 SrsProtocol::SrsProtocol(st_netfd_t client_stfd) 197 SrsProtocol::SrsProtocol(st_netfd_t client_stfd)
382 { 198 {
383 stfd = client_stfd; 199 stfd = client_stfd;
@@ -771,3 +587,187 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh @@ -771,3 +587,187 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh
771 return ret; 587 return ret;
772 } 588 }
773 589
  590 +SrsMessageHeader::SrsMessageHeader()
  591 +{
  592 + message_type = 0;
  593 + payload_length = 0;
  594 + timestamp = 0;
  595 + stream_id = 0;
  596 +}
  597 +
  598 +SrsMessageHeader::~SrsMessageHeader()
  599 +{
  600 +}
  601 +
  602 +SrsChunkStream::SrsChunkStream(int _cid)
  603 +{
  604 + fmt = 0;
  605 + cid = _cid;
  606 + extended_timestamp = false;
  607 + msg = NULL;
  608 +}
  609 +
  610 +SrsChunkStream::~SrsChunkStream()
  611 +{
  612 + if (msg) {
  613 + delete msg;
  614 + msg = NULL;
  615 + }
  616 +}
  617 +
  618 +SrsMessage::SrsMessage()
  619 +{
  620 + size = 0;
  621 + stream = NULL;
  622 + payload = NULL;
  623 + decoded_payload = NULL;
  624 +}
  625 +
  626 +SrsMessage::~SrsMessage()
  627 +{
  628 + if (payload) {
  629 + delete[] payload;
  630 + payload = NULL;
  631 + }
  632 +
  633 + if (decoded_payload) {
  634 + delete decoded_payload;
  635 + decoded_payload = NULL;
  636 + }
  637 +
  638 + if (stream) {
  639 + delete stream;
  640 + stream = NULL;
  641 + }
  642 +}
  643 +
  644 +SrsPacket* SrsMessage::get_packet()
  645 +{
  646 + if (!decoded_payload) {
  647 + srs_error("the payload is raw/undecoded, invoke decode_packet to decode it.");
  648 + }
  649 + srs_assert(decoded_payload != NULL);
  650 +
  651 + return decoded_payload;
  652 +}
  653 +
  654 +int SrsMessage::decode_packet()
  655 +{
  656 + int ret = ERROR_SUCCESS;
  657 +
  658 + srs_assert(payload != NULL);
  659 + srs_assert(size > 0);
  660 +
  661 + if (!stream) {
  662 + srs_verbose("create decode stream for message.");
  663 + stream = new SrsStream();
  664 + }
  665 +
  666 + if (header.message_type == RTMP_MSG_AMF0CommandMessage) {
  667 + srs_verbose("start to decode AMF0 command message.");
  668 +
  669 + // amf0 command message.
  670 + // need to read the command name.
  671 + if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) {
  672 + srs_error("initialize stream failed. ret=%d", ret);
  673 + return ret;
  674 + }
  675 + srs_verbose("decode stream initialized success");
  676 +
  677 + std::string command;
  678 + if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
  679 + srs_error("decode AMF0 command name failed. ret=%d", ret);
  680 + return ret;
  681 + }
  682 + srs_verbose("AMF0 command message, command_name=%s", command.c_str());
  683 +
  684 + stream->reset();
  685 + if (command == RTMP_AMF0_COMMAND_CONNECT) {
  686 + srs_info("decode the AMF0 command(connect vhost/app message).");
  687 + decoded_payload = new SrsConnectAppPacket();
  688 + return decoded_payload->decode(stream);
  689 + }
  690 +
  691 + // default packet to drop message.
  692 + srs_trace("drop the AMF0 command message, command_name=%s", command.c_str());
  693 + decoded_payload = new SrsPacket();
  694 + return ret;
  695 + }
  696 +
  697 + // default packet to drop message.
  698 + srs_trace("drop the unknown message, type=%d", header.message_type);
  699 + decoded_payload = new SrsPacket();
  700 +
  701 + return ret;
  702 +}
  703 +
  704 +SrsPacket::SrsPacket()
  705 +{
  706 +}
  707 +
  708 +SrsPacket::~SrsPacket()
  709 +{
  710 +}
  711 +
  712 +int SrsPacket::decode(SrsStream* /*stream*/)
  713 +{
  714 + int ret = ERROR_SUCCESS;
  715 + return ret;
  716 +}
  717 +
  718 +SrsConnectAppPacket::SrsConnectAppPacket()
  719 +{
  720 + command_name = RTMP_AMF0_COMMAND_CONNECT;
  721 + transaction_id = 1;
  722 + command_object = NULL;
  723 +}
  724 +
  725 +SrsConnectAppPacket::~SrsConnectAppPacket()
  726 +{
  727 +}
  728 +
  729 +int SrsConnectAppPacket::decode(SrsStream* stream)
  730 +{
  731 + int ret = ERROR_SUCCESS;
  732 +
  733 + if ((ret = super::decode(stream)) != ERROR_SUCCESS) {
  734 + return ret;
  735 + }
  736 +
  737 + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
  738 + srs_error("amf0 decode connect command_name failed. ret=%d", ret);
  739 + return ret;
  740 + }
  741 + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CONNECT) {
  742 + ret = ERROR_RTMP_AMF0_DECODE;
  743 + srs_error("amf0 decode connect command_name failed. "
  744 + "command_name=%s, ret=%d", command_name.c_str(), ret);
  745 + return ret;
  746 + }
  747 +
  748 + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
  749 + srs_error("amf0 decode connect transaction_id failed. ret=%d", ret);
  750 + return ret;
  751 + }
  752 + if (transaction_id != 1.0) {
  753 + ret = ERROR_RTMP_AMF0_DECODE;
  754 + srs_error("amf0 decode connect transaction_id failed. "
  755 + "required=%.1f, actual=%.1f, ret=%d", 1.0, transaction_id, ret);
  756 + return ret;
  757 + }
  758 +
  759 + if ((ret = srs_amf0_read_object(stream, command_object)) != ERROR_SUCCESS) {
  760 + srs_error("amf0 decode connect command_object failed. ret=%d", ret);
  761 + return ret;
  762 + }
  763 + if (command_object == NULL) {
  764 + ret = ERROR_RTMP_AMF0_DECODE;
  765 + srs_error("amf0 decode connect command_object failed. ret=%d", ret);
  766 + return ret;
  767 + }
  768 +
  769 + srs_info("amf0 decode connect packet success");
  770 +
  771 + return ret;
  772 +}
  773 +
@@ -47,6 +47,39 @@ class SrsChunkStream; @@ -47,6 +47,39 @@ class SrsChunkStream;
47 class SrsAmf0Object; 47 class SrsAmf0Object;
48 48
49 /** 49 /**
  50 +* the protocol provides the rtmp-message-protocol services,
  51 +* to recv RTMP message from RTMP chunk stream,
  52 +* and to send out RTMP message over RTMP chunk stream.
  53 +*/
  54 +class SrsProtocol
  55 +{
  56 +private:
  57 + std::map<int, SrsChunkStream*> chunk_streams;
  58 + st_netfd_t stfd;
  59 + SrsBuffer* buffer;
  60 + SrsSocket* skt;
  61 + int32_t in_chunk_size;
  62 + int32_t out_chunk_size;
  63 +public:
  64 + SrsProtocol(st_netfd_t client_stfd);
  65 + virtual ~SrsProtocol();
  66 +public:
  67 + /**
  68 + * recv a message with raw/undecoded payload from peer.
  69 + * the payload is not decoded, use srs_rtmp_expect_message<T> if requires
  70 + * specifies message.
  71 + * @pmsg, user must free it. NULL if not success.
  72 + * @remark, only when success, user can use and must free the pmsg.
  73 + */
  74 + virtual int recv_message(SrsMessage** pmsg);
  75 +private:
  76 + virtual int recv_interlaced_message(SrsMessage** pmsg);
  77 + virtual int read_basic_header(char& fmt, int& cid, int& size);
  78 + virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);
  79 + virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);
  80 +};
  81 +
  82 +/**
50 * 4.1. Message Header 83 * 4.1. Message Header
51 */ 84 */
52 struct SrsMessageHeader 85 struct SrsMessageHeader
@@ -162,7 +195,7 @@ class SrsConnectAppPacket : public SrsPacket @@ -162,7 +195,7 @@ class SrsConnectAppPacket : public SrsPacket
162 { 195 {
163 private: 196 private:
164 typedef SrsPacket super; 197 typedef SrsPacket super;
165 -private: 198 +public:
166 std::string command_name; 199 std::string command_name;
167 double transaction_id; 200 double transaction_id;
168 SrsAmf0Object* command_object; 201 SrsAmf0Object* command_object;
@@ -174,80 +207,48 @@ public: @@ -174,80 +207,48 @@ public:
174 }; 207 };
175 208
176 /** 209 /**
177 -* the protocol provides the rtmp-message-protocol services,  
178 -* to recv RTMP message from RTMP chunk stream,  
179 -* and to send out RTMP message over RTMP chunk stream. 210 +* expect a specified message, drop others util got specified one.
  211 +* @pmsg, user must free it. NULL if not success.
  212 +* @ppacket, store in the pmsg, user must never free it. NULL if not success.
  213 +* @remark, only when success, user can use and must free the pmsg/ppacket.
180 */ 214 */
181 -class SrsProtocol 215 +template<class T>
  216 +int srs_rtmp_expect_message(SrsProtocol* protocol, SrsMessage** pmsg, T** ppacket)
182 { 217 {
183 -private:  
184 - std::map<int, SrsChunkStream*> chunk_streams;  
185 - st_netfd_t stfd;  
186 - SrsBuffer* buffer;  
187 - SrsSocket* skt;  
188 - int32_t in_chunk_size;  
189 - int32_t out_chunk_size;  
190 -public:  
191 - SrsProtocol(st_netfd_t client_stfd);  
192 - virtual ~SrsProtocol();  
193 -public:  
194 - /**  
195 - * recv a message with raw/undecoded payload from peer.  
196 - * the payload is not decoded, use expect_message<T> if requires specifies message.  
197 - * @pmsg, user must free it. NULL if not success.  
198 - * @remark, only when success, user can use and must free the pmsg.  
199 - */  
200 - virtual int recv_message(SrsMessage** pmsg);  
201 -public:  
202 - /**  
203 - * expect a specified message, drop others util got specified one.  
204 - * @pmsg, user must free it. NULL if not success.  
205 - * @ppacket, store in the pmsg, user must never free it. NULL if not success.  
206 - * @remark, only when success, user can use and must free the pmsg/ppacket.  
207 - */  
208 - template<class T>  
209 - int expect_message(SrsMessage** pmsg, T** ppacket)  
210 - {  
211 - *pmsg = NULL;  
212 - *ppacket = NULL; 218 + *pmsg = NULL;
  219 + *ppacket = NULL;
  220 +
  221 + int ret = ERROR_SUCCESS;
  222 +
  223 + while (true) {
  224 + SrsMessage* msg = NULL;
  225 + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
  226 + srs_error("recv message failed. ret=%d", ret);
  227 + return ret;
  228 + }
  229 + srs_verbose("recv message success.");
213 230
214 - int ret = ERROR_SUCCESS; 231 + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {
  232 + delete msg;
  233 + srs_error("decode message failed. ret=%d", ret);
  234 + return ret;
  235 + }
215 236
216 - while (true) {  
217 - SrsMessage* msg = NULL;  
218 - if ((ret = recv_message(&msg)) != ERROR_SUCCESS) {  
219 - srs_error("recv message failed. ret=%d", ret);  
220 - return ret;  
221 - }  
222 - srs_verbose("recv message success.");  
223 -  
224 - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) {  
225 - delete msg;  
226 - srs_error("decode message failed. ret=%d", ret);  
227 - return ret;  
228 - }  
229 -  
230 - T* pkt = dynamic_cast<T*>(msg->get_packet());  
231 - if (!pkt) {  
232 - delete msg;  
233 - srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).",  
234 - msg->header.message_type, msg->header.payload_length,  
235 - msg->header.timestamp, msg->header.stream_id);  
236 - continue;  
237 - }  
238 -  
239 - *pmsg = msg;  
240 - *ppacket = pkt;  
241 - break; 237 + T* pkt = dynamic_cast<T*>(msg->get_packet());
  238 + if (!pkt) {
  239 + delete msg;
  240 + srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).",
  241 + msg->header.message_type, msg->header.payload_length,
  242 + msg->header.timestamp, msg->header.stream_id);
  243 + continue;
242 } 244 }
243 245
244 - return ret; 246 + *pmsg = msg;
  247 + *ppacket = pkt;
  248 + break;
245 } 249 }
246 -private:  
247 - virtual int recv_interlaced_message(SrsMessage** pmsg);  
248 - virtual int read_basic_header(char& fmt, int& cid, int& size);  
249 - virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size);  
250 - virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg);  
251 -}; 250 +
  251 + return ret;
  252 +}
252 253
253 #endif 254 #endif
@@ -28,6 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -28,6 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
28 #include <srs_core_socket.hpp> 28 #include <srs_core_socket.hpp>
29 #include <srs_core_protocol.hpp> 29 #include <srs_core_protocol.hpp>
30 #include <srs_core_auto_free.hpp> 30 #include <srs_core_auto_free.hpp>
  31 +#include <srs_core_amf0.hpp>
31 32
32 SrsRtmp::SrsRtmp(st_netfd_t client_stfd) 33 SrsRtmp::SrsRtmp(st_netfd_t client_stfd)
33 { 34 {
@@ -89,19 +90,28 @@ int SrsRtmp::handshake() @@ -89,19 +90,28 @@ int SrsRtmp::handshake()
89 return ret; 90 return ret;
90 } 91 }
91 92
92 -int SrsRtmp::connect_app(SrsApp** papp) 93 +int SrsRtmp::connect_app(SrsRequest* req)
93 { 94 {
94 int ret = ERROR_SUCCESS; 95 int ret = ERROR_SUCCESS;
95 96
96 SrsMessage* msg = NULL; 97 SrsMessage* msg = NULL;
97 SrsConnectAppPacket* pkt = NULL; 98 SrsConnectAppPacket* pkt = NULL;
98 - if ((ret = protocol->expect_message<SrsConnectAppPacket>(&msg, &pkt)) != ERROR_SUCCESS) { 99 + if ((ret = srs_rtmp_expect_message<SrsConnectAppPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
99 srs_error("expect connect app message failed. ret=%d", ret); 100 srs_error("expect connect app message failed. ret=%d", ret);
100 return ret; 101 return ret;
101 } 102 }
102 SrsAutoFree(SrsMessage, msg, false); 103 SrsAutoFree(SrsMessage, msg, false);
103 srs_info("get connect app message"); 104 srs_info("get connect app message");
104 105
  106 + SrsAmf0Any* prop = NULL;
  107 +
  108 + if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) {
  109 + ret = ERROR_RTMP_REQ_CONNECT;
  110 + srs_error("invalid request, must specifies the tcUrl. ret=%d", ret);
  111 + return ret;
  112 + }
  113 + req->tcUrl = srs_amf0_convert<SrsAmf0String>(prop)->value;
  114 +
105 return ret; 115 return ret;
106 } 116 }
107 117
@@ -36,10 +36,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -36,10 +36,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
36 36
37 class SrsProtocol; 37 class SrsProtocol;
38 38
39 -struct SrsApp 39 +/**
  40 +* the original request from client.
  41 +*/
  42 +struct SrsRequest
40 { 43 {
  44 + std::string tcUrl;
  45 +
  46 + std::string schema;
41 std::string vhost; 47 std::string vhost;
  48 + std::string port;
42 std::string app; 49 std::string app;
  50 + std::string stream;
43 }; 51 };
44 52
45 /** 53 /**
@@ -57,7 +65,7 @@ public: @@ -57,7 +65,7 @@ public:
57 virtual ~SrsRtmp(); 65 virtual ~SrsRtmp();
58 public: 66 public:
59 virtual int handshake(); 67 virtual int handshake();
60 - virtual int connect_app(SrsApp** papp); 68 + virtual int connect_app(SrsRequest* req);
61 }; 69 };
62 70
63 #endif 71 #endif