提交 326289ca authored 作者: Anthony Minessale's avatar Anthony Minessale

FS-7083 #resolve #comment this should do it. The problem is linked to…

FS-7083 #resolve #comment this should do it.  The problem is linked to side-effects from the read thread being delayed by writing to the file handle.  It was so much worse on mp3 because the shout encoder blocks while its churning the data and delays it more.  This patch adds a dedicated thread for writing to the file and the channel_variable RECORD_USE_THREAD=false will disable it and sync may still be maintained at the cost of dropping more data from the audio signal.
上级 dc8c8cc7
......@@ -185,9 +185,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
uint32_t blen;
switch_codec_implementation_t read_impl = { 0 };
int16_t *tp;
switch_size_t do_read = 0, do_write = 0;
int fill_read = 0, fill_write = 0;
switch_size_t do_read = 0, do_write = 0, has_read = 0, has_write = 0, fill_read = 0, fill_write = 0;
switch_core_session_get_read_impl(bug->session, &read_impl);
......@@ -213,50 +211,47 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
frame->datalen = 0;
if (switch_test_flag(bug, SMBF_READ_STREAM)) {
has_read = 1;
switch_mutex_lock(bug->read_mutex);
do_read = switch_buffer_inuse(bug->raw_read_buffer);
switch_mutex_unlock(bug->read_mutex);
}
if (switch_test_flag(bug, SMBF_WRITE_STREAM)) {
has_write = 1;
switch_mutex_lock(bug->write_mutex);
do_write = switch_buffer_inuse(bug->raw_write_buffer);
switch_mutex_unlock(bug->write_mutex);
}
if (bug->record_frame_size && bug->record_pre_buffer_max && (do_read || do_write) && bug->record_pre_buffer_count < bug->record_pre_buffer_max) {
bug->record_pre_buffer_count++;
return SWITCH_STATUS_FALSE;
} else {
uint32_t frame_size;
switch_codec_implementation_t read_impl = { 0 };
//switch_codec_implementation_t other_read_impl = { 0 };
//switch_core_session_t *other_session;
switch_core_session_get_read_impl(bug->session, &read_impl);
frame_size = read_impl.decoded_bytes_per_packet;
bug->record_frame_size = frame_size;
#if 0
if (do_read && do_write) {
if (switch_core_session_get_partner(bug->session, &other_session) == SWITCH_STATUS_SUCCESS) {
switch_core_session_get_read_impl(other_session, &other_read_impl);
switch_core_session_rwunlock(other_session);
if (read_impl.actual_samples_per_second == other_read_impl.actual_samples_per_second) {
if (read_impl.decoded_bytes_per_packet < other_read_impl.decoded_bytes_per_packet) {
frame_size = read_impl.decoded_bytes_per_packet;
}
} else {
if (read_impl.decoded_bytes_per_packet > other_read_impl.decoded_bytes_per_packet) {
frame_size = read_impl.decoded_bytes_per_packet;
}
}
}
}
bug->record_frame_size = bytes = frame_size;
}
#endif
if (bug->record_frame_size && do_write > do_read && do_write > (bug->record_frame_size * 2)) {
switch_mutex_lock(bug->write_mutex);
switch_buffer_toss(bug->raw_write_buffer, bug->record_frame_size);
do_write = switch_buffer_inuse(bug->raw_write_buffer);
switch_mutex_unlock(bug->write_mutex);
}
if ((has_read && !do_read)) {
fill_read = 1;
}
if ((has_write && !do_write)) {
fill_write = 1;
}
......@@ -274,10 +269,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
}
}
fill_read = !do_read;
fill_write = !do_write;
if ((fill_read && fill_write) || (!fill && fill_read)) {
if ((fill_read && fill_write) || (fill && (fill_read || fill_write))) {
return SWITCH_STATUS_FALSE;
}
......@@ -384,17 +376,26 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b
frame->rate = read_impl.actual_samples_per_second;
frame->codec = NULL;
if (fill_read && fill_write) {
return SWITCH_STATUS_BREAK;
}
if (fill_read || fill_write) {
return SWITCH_STATUS_BREAK;
if (switch_test_flag(bug, SMBF_STEREO)) {
frame->datalen *= 2;
frame->channels = 2;
}
memcpy(bug->session->recur_buffer, frame->data, frame->datalen);
bug->session->recur_buffer_len = frame->datalen;
if (has_read) {
switch_mutex_lock(bug->read_mutex);
do_read = switch_buffer_inuse(bug->raw_read_buffer);
switch_mutex_unlock(bug->read_mutex);
}
if (has_write) {
switch_mutex_lock(bug->write_mutex);
do_write = switch_buffer_inuse(bug->raw_write_buffer);
switch_mutex_unlock(bug->write_mutex);
}
return SWITCH_STATUS_SUCCESS;
}
......
......@@ -1044,6 +1044,10 @@ struct record_helper {
switch_bool_t hangup_on_error;
switch_codec_implementation_t read_impl;
switch_bool_t speech_detected;
switch_buffer_t *thread_buffer;
switch_thread_t *thread;
switch_mutex_t *buffer_mutex;
int thread_ready;
const char *completion_cause;
};
......@@ -1110,6 +1114,55 @@ static void send_record_stop_event(switch_channel_t *channel, switch_codec_imple
}
}
static void *SWITCH_THREAD_FUNC recording_thread(switch_thread_t *thread, void *obj)
{
switch_media_bug_t *bug = (switch_media_bug_t *) obj;
switch_core_session_t *session = switch_core_media_bug_get_session(bug);
switch_channel_t *channel = switch_core_session_get_channel(session);
struct record_helper *rh;
switch_size_t bsize = SWITCH_RECOMMENDED_BUFFER_SIZE, samples = 0, inuse = 0;
unsigned char *data = switch_core_session_alloc(session, bsize);
int channels = switch_core_media_bug_test_flag(bug, SMBF_STEREO) ? 2 : 1;
if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) {
return NULL;
}
rh = switch_core_media_bug_get_user_data(bug);
switch_buffer_create_dynamic(&rh->thread_buffer, 1024 * 512, 1024 * 64, 0);
rh->thread_ready = 1;
while(switch_test_flag(rh->fh, SWITCH_FILE_OPEN)) {
switch_mutex_lock(rh->buffer_mutex);
inuse = switch_buffer_inuse(rh->thread_buffer);
if (rh->thread_ready && switch_channel_up_nosig(channel) && inuse < bsize) {
switch_mutex_unlock(rh->buffer_mutex);
switch_yield(20000);
continue;
} else if ((!rh->thread_ready || switch_channel_down_nosig(channel)) && !inuse) {
break;
}
samples = switch_buffer_read(rh->thread_buffer, data, bsize) / 2 / channels;
switch_mutex_unlock(rh->buffer_mutex);
if (switch_core_file_write(rh->fh, data, &samples) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file);
/* File write failed */
set_completion_cause(rh, "uri-failure");
if (rh->hangup_on_error) {
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
switch_core_session_reset(session, SWITCH_TRUE, SWITCH_TRUE);
}
}
}
switch_core_session_rwunlock(session);
return NULL;
}
static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type)
{
switch_core_session_t *session = switch_core_media_bug_get_session(bug);
......@@ -1123,18 +1176,40 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
switch (type) {
case SWITCH_ABC_TYPE_INIT:
if (switch_event_create(&event, SWITCH_EVENT_RECORD_START) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Record-File-Path", rh->file);
switch_channel_event_set_data(channel, event);
switch_event_fire(&event);
}
rh->silence_time = switch_micro_time_now();
rh->silence_timeout_ms = rh->initial_timeout_ms;
rh->speech_detected = SWITCH_FALSE;
rh->completion_cause = NULL;
{
const char *var = switch_channel_get_variable(channel, "RECORD_USE_THREAD");
if (zstr(var) || switch_true(var)) {
switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool = switch_core_session_get_pool(session);
int sanity = 200;
switch_core_session_get_read_impl(session, &rh->read_impl);
switch_mutex_init(&rh->buffer_mutex, SWITCH_MUTEX_NESTED, pool);
switch_threadattr_create(&thd_attr, pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&rh->thread, thd_attr, recording_thread, bug, pool);
while(--sanity > 0 && !rh->thread_ready) {
switch_yield(10000);
}
}
switch_core_session_get_read_impl(session, &rh->read_impl);
if (switch_event_create(&event, SWITCH_EVENT_RECORD_START) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Record-File-Path", rh->file);
switch_channel_event_set_data(channel, event);
switch_event_fire(&event);
}
rh->silence_time = switch_micro_time_now();
rh->silence_timeout_ms = rh->initial_timeout_ms;
rh->speech_detected = SWITCH_FALSE;
rh->completion_cause = NULL;
switch_core_session_get_read_impl(session, &rh->read_impl);
}
break;
case SWITCH_ABC_TYPE_TAP_NATIVE_READ:
{
......@@ -1232,6 +1307,18 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE];
switch_frame_t frame = { 0 };
if (rh->thread_ready) {
switch_status_t st;
rh->thread_ready = 0;
switch_thread_join(&st, rh->thread);
}
if (rh->thread_buffer) {
switch_buffer_destroy(&rh->thread_buffer);
}
frame.data = data;
frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE;
......@@ -1313,18 +1400,24 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE];
switch_frame_t frame = { 0 };
switch_status_t status;
int i = 0;
frame.data = data;
frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE;
for (;;) {
status = switch_core_media_bug_read(bug, &frame, SWITCH_FALSE);
status = switch_core_media_bug_read(bug, &frame, i++ == 0 ? SWITCH_FALSE : SWITCH_TRUE);
if (status == SWITCH_STATUS_SUCCESS || status == SWITCH_STATUS_BREAK) {
len = (switch_size_t) frame.datalen / 2;
if (len && switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) {
if (status != SWITCH_STATUS_SUCCESS || !frame.datalen) {
break;
} else {
len = (switch_size_t) frame.datalen / 2 / frame.channels;
if (rh->thread_buffer) {
switch_mutex_lock(rh->buffer_mutex);
switch_buffer_write(rh->thread_buffer, mask ? null_data : data, frame.datalen);
switch_mutex_unlock(rh->buffer_mutex);
} else if (switch_core_file_write(rh->fh, mask ? null_data : data, &len) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error writing %s\n", rh->file);
/* File write failed */
set_completion_cause(rh, "uri-failure");
......@@ -1377,8 +1470,6 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s
rh->speech_detected = SWITCH_TRUE;
}
}
} else {
break;
}
}
}
......@@ -2269,7 +2360,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_record_session(switch_core_session_t
}
rh->hangup_on_error = hangup_on_error;
if ((status = switch_core_media_bug_add(session, "session_record", file,
record_callback, rh, to, flags, &bug)) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error adding media bug for file %s\n", file);
......
......@@ -1046,6 +1046,8 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime)
tfd = -1;
}
}
if (tfd > -1) MATRIX = 0;
}
#else
tfd = -1;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论