winlin

refine the re of ingest flv, re cleanup always sleep

@@ -46,6 +46,7 @@ int flv_read_packet(int flv_fd, int* type, u_int32_t* timestamp, char** data, in @@ -46,6 +46,7 @@ int flv_read_packet(int flv_fd, int* type, u_int32_t* timestamp, char** data, in
46 #define RE_PULSE_MS 300 46 #define RE_PULSE_MS 300
47 int64_t re_create(); 47 int64_t re_create();
48 void re_update(int64_t re, u_int32_t time); 48 void re_update(int64_t re, u_int32_t time);
  49 +void re_cleanup(int64_t re, u_int32_t time);
49 50
50 int64_t tools_main_entrance_startup_time; 51 int64_t tools_main_entrance_startup_time;
51 int main(int argc, char** argv) 52 int main(int argc, char** argv)
@@ -115,47 +116,58 @@ int main(int argc, char** argv) @@ -115,47 +116,58 @@ int main(int argc, char** argv)
115 return ret; 116 return ret;
116 } 117 }
117 118
118 -int proxy(int flv_fd, srs_rtmp_t ortmp) 119 +int do_proxy(int flv_fd, srs_rtmp_t ortmp, int64_t re, u_int32_t* ptimestamp)
119 { 120 {
120 int ret = 0; 121 int ret = 0;
121 122
122 // packet data 123 // packet data
123 int type, size; 124 int type, size;
124 - u_int32_t timestamp = 0;  
125 char* data = NULL; 125 char* data = NULL;
126 126
127 - if ((ret = flv_open_ic(flv_fd)) != 0) {  
128 - return ret;  
129 - }  
130 - if ((ret = connect_oc(ortmp)) != 0) {  
131 - return ret;  
132 - }  
133 -  
134 - // re  
135 - int64_t re = re_create();  
136 -  
137 trace("start ingest flv to RTMP stream"); 127 trace("start ingest flv to RTMP stream");
138 for (;;) { 128 for (;;) {
139 - if ((ret = flv_read_packet(flv_fd, &type, &timestamp, &data, &size)) != 0) { 129 + if ((ret = flv_read_packet(flv_fd, &type, ptimestamp, &data, &size)) != 0) {
140 trace("irtmp get packet failed. ret=%d", ret); 130 trace("irtmp get packet failed. ret=%d", ret);
141 return ret; 131 return ret;
142 } 132 }
143 verbose("irtmp got packet: type=%s, time=%d, size=%d", 133 verbose("irtmp got packet: type=%s, time=%d, size=%d",
144 srs_type2string(type), timestamp, size); 134 srs_type2string(type), timestamp, size);
145 135
146 - if ((ret = srs_write_packet(ortmp, type, timestamp, data, size)) != 0) { 136 + if ((ret = srs_write_packet(ortmp, type, *ptimestamp, data, size)) != 0) {
147 trace("irtmp get packet failed. ret=%d", ret); 137 trace("irtmp get packet failed. ret=%d", ret);
148 return ret; 138 return ret;
149 } 139 }
150 verbose("ortmp sent packet: type=%s, time=%d, size=%d", 140 verbose("ortmp sent packet: type=%s, time=%d, size=%d",
151 - srs_type2string(type), timestamp, size); 141 + srs_type2string(type), *ptimestamp, size);
152 142
153 - re_update(re, timestamp); 143 + re_update(re, *ptimestamp);
154 } 144 }
155 145
156 return ret; 146 return ret;
157 } 147 }
158 148
  149 +int proxy(int flv_fd, srs_rtmp_t ortmp)
  150 +{
  151 + int ret = 0;
  152 + u_int32_t timestamp = 0;
  153 +
  154 + if ((ret = flv_open_ic(flv_fd)) != 0) {
  155 + return ret;
  156 + }
  157 + if ((ret = connect_oc(ortmp)) != 0) {
  158 + return ret;
  159 + }
  160 +
  161 + int64_t re = re_create();
  162 +
  163 + ret = do_proxy(flv_fd, ortmp, re, &timestamp);
  164 +
  165 + // for the last pulse, always sleep.
  166 + re_cleanup(re, timestamp);
  167 +
  168 + return ret;
  169 +}
  170 +
159 int connect_oc(srs_rtmp_t ortmp) 171 int connect_oc(srs_rtmp_t ortmp)
160 { 172 {
161 int ret = 0; 173 int ret = 0;
@@ -190,13 +202,11 @@ int64_t re_create() @@ -190,13 +202,11 @@ int64_t re_create()
190 int64_t deviation = re - tools_main_entrance_startup_time; 202 int64_t deviation = re - tools_main_entrance_startup_time;
191 trace("deviation is %d ms, pulse is %d ms", (int)(deviation), (int)(RE_PULSE_MS)); 203 trace("deviation is %d ms, pulse is %d ms", (int)(deviation), (int)(RE_PULSE_MS));
192 204
193 - // so, we adjust time to max(0, deviation - pulse/10)  
194 - // because the last pulse, we never sleep, so we use pulse/10,  
195 - // for example, when EOF at the 120ms of last pulse,  
196 - // these bytes is additional data and to fill the deviation.  
197 - int adjust = (int)(deviation - (RE_PULSE_MS / 10)); 205 + // so, we adjust time to max(0, deviation)
  206 + // because the last pulse, we already sleeped
  207 + int adjust = (int)(deviation);
198 if (adjust > 0) { 208 if (adjust > 0) {
199 - trace("adjust re time, sub %d ms", adjust); 209 + trace("adjust re time for %d ms", adjust);
200 re -= adjust; 210 re -= adjust;
201 } else { 211 } else {
202 trace("no need to adjust re time"); 212 trace("no need to adjust re time");
@@ -206,12 +216,24 @@ int64_t re_create() @@ -206,12 +216,24 @@ int64_t re_create()
206 } 216 }
207 void re_update(int64_t re, u_int32_t time) 217 void re_update(int64_t re, u_int32_t time)
208 { 218 {
  219 + // send by pulse algorithm.
209 int64_t now = srs_get_time_ms(); 220 int64_t now = srs_get_time_ms();
210 int64_t diff = time - (now -re); 221 int64_t diff = time - (now -re);
211 if (diff > RE_PULSE_MS) { 222 if (diff > RE_PULSE_MS) {
212 usleep(diff * 1000); 223 usleep(diff * 1000);
213 } 224 }
214 } 225 }
  226 +void re_cleanup(int64_t re, u_int32_t time)
  227 +{
  228 + // for the last pulse, always sleep.
  229 + // for the virtual live encoder long time publishing.
  230 + int64_t now = srs_get_time_ms();
  231 + int64_t diff = time - (now -re);
  232 + if (diff > 0) {
  233 + trace("re_cleanup sleep for the last pulse for %d ms", (int)diff);
  234 + usleep(diff * 1000);
  235 + }
  236 +}
215 237
216 int open_flv_file(char* in_flv_file) 238 int open_flv_file(char* in_flv_file)
217 { 239 {