From 2a8db3ab83ebb92b22f4ea43321312ebfd37853a Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 21 May 2014 09:31:38 +0800 Subject: [PATCH] refine the re of ingest flv, re cleanup always sleep --- trunk/research/librtmp/srs_ingest_flv.c | 76 ++++++++++++++++--------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/trunk/research/librtmp/srs_ingest_flv.c b/trunk/research/librtmp/srs_ingest_flv.c index 45476afb2..207f0aa9c 100644 --- a/trunk/research/librtmp/srs_ingest_flv.c +++ b/trunk/research/librtmp/srs_ingest_flv.c @@ -46,6 +46,7 @@ int flv_read_packet(int flv_fd, int* type, u_int32_t* timestamp, char** data, in #define RE_PULSE_MS 300 int64_t re_create(); void re_update(int64_t re, u_int32_t time); +void re_cleanup(int64_t re, u_int32_t time); int64_t tools_main_entrance_startup_time; int main(int argc, char** argv) @@ -115,15 +116,41 @@ int main(int argc, char** argv) return ret; } -int proxy(int flv_fd, srs_rtmp_t ortmp) +int do_proxy(int flv_fd, srs_rtmp_t ortmp, int64_t re, u_int32_t* ptimestamp) { int ret = 0; // packet data int type, size; - u_int32_t timestamp = 0; char* data = NULL; + trace("start ingest flv to RTMP stream"); + for (;;) { + if ((ret = flv_read_packet(flv_fd, &type, ptimestamp, &data, &size)) != 0) { + trace("irtmp get packet failed. ret=%d", ret); + return ret; + } + verbose("irtmp got packet: type=%s, time=%d, size=%d", + srs_type2string(type), timestamp, size); + + if ((ret = srs_write_packet(ortmp, type, *ptimestamp, data, size)) != 0) { + trace("irtmp get packet failed. ret=%d", ret); + return ret; + } + verbose("ortmp sent packet: type=%s, time=%d, size=%d", + srs_type2string(type), *ptimestamp, size); + + re_update(re, *ptimestamp); + } + + return ret; +} + +int proxy(int flv_fd, srs_rtmp_t ortmp) +{ + int ret = 0; + u_int32_t timestamp = 0; + if ((ret = flv_open_ic(flv_fd)) != 0) { return ret; } @@ -131,27 +158,12 @@ int proxy(int flv_fd, srs_rtmp_t ortmp) return ret; } - // re int64_t re = re_create(); - trace("start ingest flv to RTMP stream"); - for (;;) { - if ((ret = flv_read_packet(flv_fd, &type, ×tamp, &data, &size)) != 0) { - trace("irtmp get packet failed. ret=%d", ret); - return ret; - } - verbose("irtmp got packet: type=%s, time=%d, size=%d", - srs_type2string(type), timestamp, size); - - if ((ret = srs_write_packet(ortmp, type, timestamp, data, size)) != 0) { - trace("irtmp get packet failed. ret=%d", ret); - return ret; - } - verbose("ortmp sent packet: type=%s, time=%d, size=%d", - srs_type2string(type), timestamp, size); - - re_update(re, timestamp); - } + ret = do_proxy(flv_fd, ortmp, re, ×tamp); + + // for the last pulse, always sleep. + re_cleanup(re, timestamp); return ret; } @@ -190,13 +202,11 @@ int64_t re_create() int64_t deviation = re - tools_main_entrance_startup_time; trace("deviation is %d ms, pulse is %d ms", (int)(deviation), (int)(RE_PULSE_MS)); - // so, we adjust time to max(0, deviation - pulse/10) - // because the last pulse, we never sleep, so we use pulse/10, - // for example, when EOF at the 120ms of last pulse, - // these bytes is additional data and to fill the deviation. - int adjust = (int)(deviation - (RE_PULSE_MS / 10)); + // so, we adjust time to max(0, deviation) + // because the last pulse, we already sleeped + int adjust = (int)(deviation); if (adjust > 0) { - trace("adjust re time, sub %d ms", adjust); + trace("adjust re time for %d ms", adjust); re -= adjust; } else { trace("no need to adjust re time"); @@ -206,12 +216,24 @@ int64_t re_create() } void re_update(int64_t re, u_int32_t time) { + // send by pulse algorithm. int64_t now = srs_get_time_ms(); int64_t diff = time - (now -re); if (diff > RE_PULSE_MS) { usleep(diff * 1000); } } +void re_cleanup(int64_t re, u_int32_t time) +{ + // for the last pulse, always sleep. + // for the virtual live encoder long time publishing. + int64_t now = srs_get_time_ms(); + int64_t diff = time - (now -re); + if (diff > 0) { + trace("re_cleanup sleep for the last pulse for %d ms", (int)diff); + usleep(diff * 1000); + } +} int open_flv_file(char* in_flv_file) {