提交 5b499e8a authored 作者: Stefan Knoblich's avatar Stefan Knoblich

ftmod_misdn: Use a per-span I/O thread to handle B-channel data.

Move the B-channel message handling into a per-span I/O thread,
to solve most of the problems caused by the intermixed data + control
socket interface of mISDN, missing write poll() support on
mISDN B-channels and the FreeTDM I/O model. This eliminates most of
the audio problems (except for a few minor glitches).

A unix stream socket pair is used as a bi-directional pipe replacement
(the pipe code is still included in this commit, but will be removed later),
with the RX and TX buffer sizes carefully tuned to avoid excessive buffering
(= latency) and a deadlock situation between the write() call in ftdm_write()
and the code in misdn_span_run() that needs a minimum amount of data in the
TX buffer, before sending out a PH_DATA_REQ to the mISDN socket
(see misdn_span_run() comments for more details).

The minimum size for pipes is PAGE_SIZE (4k), which is ~500 ms worth of
audio. A socket pair RX/TX buffer size of 3k, seems to hold a maximum
amount of around 500 bytes data in practice, giving us a much lower
maximum latency than a unix pipe. (The socket pair might be replaced by a
custom ring buffer / fifo data structure to get even more fine grained
control of the maximum latency.)

The newly introduced span_start / span_stop callbacks in
ftdm_io_interface_t are used to start / stop the I/O thread. The callback
functions will wait up to 10 seconds for the thread to successfully
start up or shut down (using a mutex + condition var).

NOTE: Using any of the locking ftdm_span_() functions in the I/O will cause
      a deadlock between the I/O thread (trying to lock span->mutex) and the
      thread calling ftdm_start()/_stop() (holding the span->mutex).
      (The I/O thread currently uses direct span member access to avoid this.)

The I/O thread uses the epoll(7) family of functions for event handling.
An epoll context is created on startup and all B-channel sockets are
registered (READ, PRI and ERR). Before entering the event loop,
the I/O thread will send a signal on the condition variable, to
indicate it has completed the startup procedure.

Incoming b-channel and command pipe events are handled by the event loop.
Payload of incoming PH_DATA_IND frames (= audio data) is sent to the
rx_audio_pipe_in end of the b-channel's socket pair and, if enough data is
available, a PH_DATA_REQ of the same size is sent to the b-channel mISDN socket
to transmit audio.

A MISDN_CMD_STOP command on the event pipe will wake up the I/O thread and
cause it to shut down. All b-channels will be unregistered from the epoll context
and the epoll fd closed. The I/O thread terminates itself after signalling the
successfull shutdown on the condition variable.

TODOs:
    - Move D-Channel into I/O thread too

    - Custom FIFO/ring buffer for data (even lower latency)

    - Improve epoll() code (per-channel struct w/ callback, for epfd.data.ptr)

    - Use mISDN DSP for audio (e.g. tone generator, dtmf detector, echo cancel)

    - Use a per-port / span control socket to execute channel commands
      synchronously, or add misdn_commands (queue?) that can be used that way

    - Name I/O threads 'mISDN-%SPAN_NAME%', e.g. 'mISDN-M_BRI1'
      (= add ftdm_thread_set_namef(thread, fmt, ...) / ftdm_thread_set_name(thread, name))

TL;DR: "tweak", solves "booboo" with audio
Signed-off-by: 's avatarStefan Knoblich <stkn@openisdn.net>
上级 72d67cdb
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <sys/epoll.h>
/* this is how it should have been... /* this is how it should have been...
#ifdef HAVE_FREETDM_FREETDM_H #ifdef HAVE_FREETDM_FREETDM_H
...@@ -70,6 +71,7 @@ ...@@ -70,6 +71,7 @@
//#define MISDN_DEBUG_EVENTS //#define MISDN_DEBUG_EVENTS
//#define MISDN_DEBUG_IO //#define MISDN_DEBUG_IO
#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))
typedef enum { typedef enum {
MISDN_CAPS_NONE = 0, MISDN_CAPS_NONE = 0,
...@@ -145,7 +147,7 @@ const static struct { ...@@ -145,7 +147,7 @@ const static struct {
#undef MISDN_CONTROL_TYPE #undef MISDN_CONTROL_TYPE
}; };
#if 0 /* unused for now */ #ifdef MISDN_DEBUG_EVENTS
static const char *misdn_control2str(const int ctrl) static const char *misdn_control2str(const int ctrl)
{ {
int x; int x;
...@@ -219,6 +221,16 @@ static inline void misdn_convert_audio_bits(char *buf, int buflen) ...@@ -219,6 +221,16 @@ static inline void misdn_convert_audio_bits(char *buf, int buflen)
* mISDN <-> FreeTDM data structures * mISDN <-> FreeTDM data structures
***********************************************************************************/ ***********************************************************************************/
typedef enum {
MISDN_CMD_NONE = 0,
MISDN_CMD_STOP, /*!< Stop the I/O thread */
} misdn_cmd_t;
struct misdn_command {
misdn_cmd_t type;
/* union { } cmd; */ /*!< Command-specific parameters */
};
enum { enum {
MISDN_SPAN_NONE = 0, MISDN_SPAN_NONE = 0,
MISDN_SPAN_RUNNING = (1 << 0), MISDN_SPAN_RUNNING = (1 << 0),
...@@ -227,10 +239,18 @@ enum { ...@@ -227,10 +239,18 @@ enum {
struct misdn_span_private { struct misdn_span_private {
int flags; int flags;
int running;
int event_pipe_in;
int event_pipe_out;
/* event conditional */ /* event conditional */
pthread_mutex_t event_cond_mutex; pthread_mutex_t event_cond_mutex;
pthread_cond_t event_cond; pthread_cond_t event_cond;
/* start / stop feedback */
pthread_mutex_t ctrl_cond_mutex;
pthread_cond_t ctrl_cond;
}; };
struct misdn_event_queue; struct misdn_event_queue;
...@@ -243,9 +263,11 @@ struct misdn_chan_private { ...@@ -243,9 +263,11 @@ struct misdn_chan_private {
/* hw addr of channel */ /* hw addr of channel */
struct sockaddr_mISDN addr; struct sockaddr_mISDN addr;
/* audio tx pipe */ /* audio tx pipe (= socketpair ends) */
int audio_pipe_in; int tx_audio_pipe_in;
int audio_pipe_out; int tx_audio_pipe_out;
int rx_audio_pipe_in;
int rx_audio_pipe_out;
/* counters */ /* counters */
unsigned long tx_cnt; unsigned long tx_cnt;
...@@ -254,6 +276,14 @@ struct misdn_chan_private { ...@@ -254,6 +276,14 @@ struct misdn_chan_private {
unsigned long slip_rx_cnt; unsigned long slip_rx_cnt;
unsigned long slip_tx_cnt; unsigned long slip_tx_cnt;
unsigned long tx_pipe_wr_bytes; /*!< Number of bytes written into tx audio pipe */
unsigned long tx_pipe_rd_bytes; /*!< Number of bytes read from tx audio pipe */
unsigned long tx_miss_bytes; /*!< Number of bytes missing in short reads from tx audio pipe */
unsigned long tx_lost_bytes; /*!< Number of bytes lost in short writes to the mISDN B-Channel */
unsigned long tx_sent_bytes; /*!< Number of bytes successfully sent to the mISDN B-Channel */
unsigned long tx_pipe_under_cnt; /*!< Number of tx audio pipe underflows */
unsigned long tx_pipe_over_cnt; /*!< Number of tx audio pipe overflows */
struct misdn_event_queue *events; struct misdn_event_queue *events;
}; };
...@@ -557,11 +587,11 @@ static ftdm_status_t _misdn_toggle_channel(ftdm_channel_t *chan, int activate) ...@@ -557,11 +587,11 @@ static ftdm_status_t _misdn_toggle_channel(ftdm_channel_t *chan, int activate)
(activate) ? "activation" : "deactivation", strerror(errno)); (activate) ? "activation" : "deactivation", strerror(errno));
return FTDM_FAIL; return FTDM_FAIL;
} }
//#ifdef MISDN_DEBUG_EVENTS #ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s (%#x)', id %#x, while waiting for %s confirmation on %c-channel\n", ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s (%#x)', id %#x, while waiting for %s confirmation on %c-channel\n",
misdn_event2str(hh->prim), hh->prim, hh->id, (activate) ? "activation" : "deactivation", misdn_event2str(hh->prim), hh->prim, hh->id, (activate) ? "activation" : "deactivation",
ftdm_channel_get_type(chan) == FTDM_CHAN_TYPE_B ? 'B' : 'D'); ftdm_channel_get_type(chan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
//#endif #endif
switch (hh->prim) { switch (hh->prim) {
case PH_ACTIVATE_IND: case PH_ACTIVATE_IND:
case PH_ACTIVATE_CNF: case PH_ACTIVATE_CNF:
...@@ -685,10 +715,10 @@ static ftdm_status_t misdn_get_ph_info(ftdm_channel_t *chan, struct ph_info *inf ...@@ -685,10 +715,10 @@ static ftdm_status_t misdn_get_ph_info(ftdm_channel_t *chan, struct ph_info *inf
misdn_event2str(req), strerror(errno)); misdn_event2str(req), strerror(errno));
return FTDM_FAIL; return FTDM_FAIL;
} }
//#ifdef MISDN_DEBUG_EVENTS #ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s' while waiting for %s answer\n", ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s' while waiting for %s answer\n",
misdn_event2str(hh->prim), misdn_event2str(req)); misdn_event2str(hh->prim), misdn_event2str(req));
//#endif #endif
switch (hh->prim) { switch (hh->prim) {
case MPH_INFORMATION_IND: /* success */ case MPH_INFORMATION_IND: /* success */
if (retval < MISDN_HEADER_LEN + sizeof(*info)) { if (retval < MISDN_HEADER_LEN + sizeof(*info)) {
...@@ -936,7 +966,7 @@ static int misdn_handle_mph_information_ind(ftdm_channel_t *chan, const struct m ...@@ -936,7 +966,7 @@ static int misdn_handle_mph_information_ind(ftdm_channel_t *chan, const struct m
* mISDN <-> FreeTDM interface functions * mISDN <-> FreeTDM interface functions
***********************************************************************************/ ***********************************************************************************/
struct misdn_globals { static struct misdn_globals {
int sockfd; int sockfd;
} globals; } globals;
...@@ -1022,6 +1052,11 @@ static FIO_CLOSE_FUNCTION(misdn_close) ...@@ -1022,6 +1052,11 @@ static FIO_CLOSE_FUNCTION(misdn_close)
ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D'); ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
} }
ftdm_log_chan(ftdmchan, FTDM_LOG_NOTICE, "mISDN tx stats: wr: %lu, rd: %lu, tx: %lu, tx-lost: %lu, tx-miss: %lu, tx-under#: %lu, tx-over#: %lu\n",
chan_priv->tx_pipe_wr_bytes, chan_priv->tx_pipe_rd_bytes,
chan_priv->tx_sent_bytes, chan_priv->tx_lost_bytes, chan_priv->tx_miss_bytes,
chan_priv->tx_pipe_over_cnt, chan_priv->tx_pipe_under_cnt);
chan_priv->active = 0; chan_priv->active = 0;
} }
...@@ -1109,16 +1144,22 @@ static FIO_WAIT_FUNCTION(misdn_wait) ...@@ -1109,16 +1144,22 @@ static FIO_WAIT_FUNCTION(misdn_wait)
switch (ftdm_channel_get_type(ftdmchan)) { switch (ftdm_channel_get_type(ftdmchan)) {
case FTDM_CHAN_TYPE_B: case FTDM_CHAN_TYPE_B:
if (*flags & FTDM_WRITE) { if (*flags & FTDM_WRITE) {
pfds[nr_fds].fd = chan_priv->audio_pipe_in; pfds[nr_fds].fd = chan_priv->tx_audio_pipe_in;
pfds[nr_fds].events = POLLOUT; pfds[nr_fds].events = POLLOUT;
nr_fds++; nr_fds++;
} }
if (*flags & (FTDM_READ | FTDM_EVENTS)) { if (*flags & FTDM_READ) {
pfds[nr_fds].fd = chan_priv->rx_audio_pipe_out;
pfds[nr_fds].events = POLLIN;
nr_fds++;
}
/* if (*flags & (FTDM_READ | FTDM_EVENTS)) {
pfds[nr_fds].fd = ftdmchan->sockfd; pfds[nr_fds].fd = ftdmchan->sockfd;
pfds[nr_fds].events |= (*flags & FTDM_READ) ? POLLIN : 0; pfds[nr_fds].events |= (*flags & FTDM_READ) ? POLLIN : 0;
pfds[nr_fds].events |= (*flags & FTDM_EVENTS) ? POLLPRI : 0; pfds[nr_fds].events |= (*flags & FTDM_EVENTS) ? POLLPRI : 0;
nr_fds++; nr_fds++;
} }
*/
break; break;
default: default:
if (*flags & FTDM_READ) if (*flags & FTDM_READ)
...@@ -1149,7 +1190,7 @@ static FIO_WAIT_FUNCTION(misdn_wait) ...@@ -1149,7 +1190,7 @@ static FIO_WAIT_FUNCTION(misdn_wait)
switch (ftdm_channel_get_type(ftdmchan)) { switch (ftdm_channel_get_type(ftdmchan)) {
case FTDM_CHAN_TYPE_B: case FTDM_CHAN_TYPE_B:
if (pfds[0].revents & POLLOUT) if ((pfds[0].revents & POLLOUT) || (pfds[1].revents & POLLOUT))
*flags |= FTDM_WRITE; *flags |= FTDM_WRITE;
if ((pfds[0].revents & POLLIN) || (pfds[1].revents & POLLIN)) if ((pfds[0].revents & POLLIN) || (pfds[1].revents & POLLIN))
*flags |= FTDM_READ; *flags |= FTDM_READ;
...@@ -1171,10 +1212,10 @@ static FIO_WAIT_FUNCTION(misdn_wait) ...@@ -1171,10 +1212,10 @@ static FIO_WAIT_FUNCTION(misdn_wait)
/** /**
* Handle incoming mISDN message on d-channel * Handle incoming mISDN message on d-channel
* @param[in] ftdmchan * \param[in] ftdmchan
* @param[in] msg_buf * \param[in] msg_buf
* @param[in] msg_len * \param[in] msg_len
* @internal * \internal
*/ */
static ftdm_status_t misdn_handle_incoming(ftdm_channel_t *ftdmchan, const char *msg_buf, const int msg_len) static ftdm_status_t misdn_handle_incoming(ftdm_channel_t *ftdmchan, const char *msg_buf, const int msg_len)
{ {
...@@ -1258,13 +1299,6 @@ static FIO_READ_FUNCTION(misdn_read) ...@@ -1258,13 +1299,6 @@ static FIO_READ_FUNCTION(misdn_read)
int retval; int retval;
int maxretry = 10; int maxretry = 10;
if (!priv->active) {
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed channel\n");
/* ignore */
*datalen = 0;
return FTDM_SUCCESS;
}
/* nothing read yet */ /* nothing read yet */
*datalen = 0; *datalen = 0;
...@@ -1273,6 +1307,8 @@ static FIO_READ_FUNCTION(misdn_read) ...@@ -1273,6 +1307,8 @@ static FIO_READ_FUNCTION(misdn_read)
* we'll get a lot of "mISDN_send: error -12" message in dmesg otherwise * we'll get a lot of "mISDN_send: error -12" message in dmesg otherwise
* (= b-channel receive queue overflowing) * (= b-channel receive queue overflowing)
*/ */
switch (ftdm_channel_get_type(ftdmchan)) {
case FTDM_CHAN_TYPE_DQ921: {
while (maxretry--) { while (maxretry--) {
struct sockaddr_mISDN addr; struct sockaddr_mISDN addr;
socklen_t addrlen = sizeof(addr); socklen_t addrlen = sizeof(addr);
...@@ -1291,16 +1327,7 @@ static FIO_READ_FUNCTION(misdn_read) ...@@ -1291,16 +1327,7 @@ static FIO_READ_FUNCTION(misdn_read)
if (hh->prim == PH_DATA_IND) { if (hh->prim == PH_DATA_IND) {
*datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes); *datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes);
#ifdef MISDN_DEBUG_IO
ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n",
misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel);
if (*datalen > 0) {
char hbuf[MAX_DATA_MEM] = { 0 };
print_hex_bytes(data, *datalen, hbuf, sizeof(hbuf));
ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN read data: %s\n", hbuf);
}
#endif
if (*datalen <= 0) if (*datalen <= 0)
continue; continue;
...@@ -1309,76 +1336,33 @@ static FIO_READ_FUNCTION(misdn_read) ...@@ -1309,76 +1336,33 @@ static FIO_READ_FUNCTION(misdn_read)
* NOTE: audio data needs to be converted to a-law / u-law! * NOTE: audio data needs to be converted to a-law / u-law!
*/ */
memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen); memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen);
return FTDM_SUCCESS;
switch (ftdm_channel_get_type(ftdmchan)) { } else {
case FTDM_CHAN_TYPE_B: *datalen = 0;
hh->prim = PH_DATA_REQ; /* event */
hh->id = MISDN_ID_ANY; misdn_handle_incoming(ftdmchan, rbuf, retval);
bytes = *datalen;
/* Convert incoming audio data to *-law */
misdn_convert_audio_bits(data, *datalen);
/*
* Fetch required amount of audio from tx pipe, using the amount
* of received bytes as an indicator for how much free space the
* b-channel tx buffer has available.
*
* (see misdn_write() for the part that fills the tx pipe)
*
* NOTE: can't use blocking I/O here since both parts are serviced
* from the same thread
*/
if ((retval = read(priv->audio_pipe_out, rbuf + MISDN_HEADER_LEN, bytes)) < 0) {
if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read %d bytes of audio data: %s\n",
bytes, strerror(errno));
break;
} }
/* Tx pipe is empty, completely fill buffer up to "bytes" with silence value */
retval = 0;
} }
break;
/* }
* Use a-law / u-law silence to fill missing bytes, case FTDM_CHAN_TYPE_B: {
* in case there was not enough audio data available in the if (!priv->active) {
* tx pipe to satisfy the request. ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed b-channel\n");
*/ return FTDM_SUCCESS;
if (retval < bytes) {
memset(&rbuf[MISDN_HEADER_LEN + retval],
(ftdm_channel_get_codec(ftdmchan) == FTDM_CODEC_ALAW) ? 0x2a : 0xff,
bytes - retval);
} }
/* Convert outgoing audio data to wire format */ if ((retval = read(priv->rx_audio_pipe_out, data, bytes)) < 0) {
misdn_convert_audio_bits(rbuf + MISDN_HEADER_LEN, bytes); ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read audio data from rx pipe: %s\n",
bytes += MISDN_HEADER_LEN; strerror(errno));
return FTDM_FAIL;
/* Send converted audio to b-channel */
if ((retval = sendto(ftdmchan->sockfd, rbuf, bytes, 0, (struct sockaddr *)&priv->addr, sizeof(priv->addr))) < bytes) {
ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to send %d bytes of audio data: (%d) %s\n",
bytes, retval, strerror(errno));
} }
*datalen = retval;
break; break;
}
default: default:
break; break;
} }
return FTDM_SUCCESS; return FTDM_SUCCESS;
} else {
*datalen = 0;
#ifdef MISDN_DEBUG_IO
ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n",
misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel);
#endif
/* event */
misdn_handle_incoming(ftdmchan, rbuf, retval);
}
}
#ifdef MISDN_DEBUG_IO
ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN nothing received on %c-channel\n",
ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
#endif
return FTDM_SUCCESS;
} }
/** /**
...@@ -1427,11 +1411,18 @@ static FIO_WRITE_FUNCTION(misdn_write) ...@@ -1427,11 +1411,18 @@ static FIO_WRITE_FUNCTION(misdn_write)
* NOTE: can't use blocking I/O here since both parts are serviced * NOTE: can't use blocking I/O here since both parts are serviced
* from the same thread * from the same thread
*/ */
if ((retval = write(priv->audio_pipe_in, data, size)) < size) { if ((retval = write(priv->tx_audio_pipe_in, data, size)) < 0) {
ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error: %s\n", ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu]: %s\n",
strerror(errno)); priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes,
priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, strerror(errno));
return FTDM_FAIL; return FTDM_FAIL;
} else if (retval < size) {
priv->tx_pipe_over_cnt++;
ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "mISDN channel audio pipe short write [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu], expected: %d, written: %d\n",
priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes,
priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, size, retval);
} }
ACCESS_ONCE(priv->tx_pipe_wr_bytes) += retval;
*datalen = retval; *datalen = retval;
break; break;
default: default:
...@@ -1482,6 +1473,12 @@ static FIO_WRITE_FUNCTION(misdn_write) ...@@ -1482,6 +1473,12 @@ static FIO_WRITE_FUNCTION(misdn_write)
return FTDM_SUCCESS; return FTDM_SUCCESS;
} }
/**
* Carefully choosen size for socket send/recv buffers
* larger values will add more latency, while lower values will cause deadlocks
* (see misdn_span_run() comments below for an explanation)
*/
#define SOCKETPAIR_BUFFER_SIZE 3072
static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, struct mISDN_devinfo *devinfo, int start, int end) static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, struct mISDN_devinfo *devinfo, int start, int end)
{ {
...@@ -1556,9 +1553,11 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, ...@@ -1556,9 +1553,11 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type,
ftdm_log(FTDM_LOG_DEBUG, "mISDN opened socket (on chan:dev => %d:%d): %d\n", ftdm_log(FTDM_LOG_DEBUG, "mISDN opened socket (on chan:dev => %d:%d): %d\n",
addr.dev, addr.channel, sockfd); addr.dev, addr.channel, sockfd);
/* set non-blocking */ /* Set mISDN channel socket non-blocking */
if (fcntl(sockfd, F_SETFL, O_NONBLOCK) < 0) { if (fcntl(sockfd, F_SETFL, O_NONBLOCK) < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n", strerror(errno)); ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n",
strerror(errno));
close(sockfd);
return FTDM_FAIL; return FTDM_FAIL;
} }
...@@ -1604,25 +1603,109 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, ...@@ -1604,25 +1603,109 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type,
if (ftdmchan->type == FTDM_CHAN_TYPE_B) { if (ftdmchan->type == FTDM_CHAN_TYPE_B) {
int pipefd[2] = { -1, -1 }; int pipefd[2] = { -1, -1 };
ftdmchan->packet_len = 10 /* ms */ * (ftdmchan->rate / 1000); ftdmchan->packet_len = 30 /* ms */ * (ftdmchan->rate / 1000);
ftdmchan->effective_interval = ftdmchan->native_interval = ftdmchan->packet_len / 8; ftdmchan->effective_interval = ftdmchan->native_interval = ftdmchan->packet_len / 8;
ftdmchan->native_codec = ftdmchan->effective_codec = FTDM_CODEC_ALAW; ftdmchan->native_codec = ftdmchan->effective_codec = FTDM_CODEC_ALAW;
ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL); // ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL);
#ifdef USE_PIPE
/* /*
* Create audio tx pipe, use non-blocking I/O to avoid deadlock since both ends * Create audio tx pipe, use non-blocking I/O to avoid deadlock since both ends
* are used from the same thread * are used from the same thread
*/ */
if (pipe2(pipefd, O_NONBLOCK) < 0) { if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) {
ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio write pipe [%d:%d]: %s\n", ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio tx pipe [%d:%d]: %s\n",
addr.dev, x, strerror(errno)); addr.dev, x, strerror(errno));
close(sockfd); close(sockfd);
return FTDM_FAIL; return FTDM_FAIL;
} }
priv->audio_pipe_in = pipefd[1]; priv->tx_audio_pipe_in = pipefd[1];
priv->audio_pipe_out = pipefd[0]; priv->tx_audio_pipe_out = pipefd[0];
#if 1 || defined(HAVE_F_SETPIPE_SZ)
if (fcntl(priv->tx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) {
ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio tx pipe size [%d:%d]: %s\n",
addr.dev, x, strerror(errno));
}
#endif
/*
* Create audio rx pipe, use non-blocking I/O to avoid deadlock since both ends
* are used from the same thread
*/
if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) {
ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio rx pipe [%d:%d]: %s\n",
addr.dev, x, strerror(errno));
close(sockfd);
return FTDM_FAIL;
}
priv->rx_audio_pipe_in = pipefd[1];
priv->rx_audio_pipe_out = pipefd[0];
#if 1 || defined(HAVE_F_SETPIPE_SZ)
if (fcntl(priv->rx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) {
ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio rx pipe size [%d:%d]: %s\n",
addr.dev, x, strerror(errno));
}
#endif
#else /* !USE_PIPE */
/*
* Use a socket pair for audio rx/tx, allows for more fine-grained control
* of latency (= amounts of data in buffers)
*/
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pipefd) < 0) {
ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio socket pair [%d:%d]: %s\n",
addr.dev, x, strerror(errno));
close(sockfd);
return FTDM_FAIL;
} else {
int opt = SOCKETPAIR_BUFFER_SIZE;
socklen_t optlen = sizeof(opt);
if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[0] to non-blocking: %s\n",
strerror(errno));
close(sockfd);
close(pipefd[0]);
close(pipefd[1]);
return FTDM_FAIL;
}
if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[1] to non-blocking: %s\n",
strerror(errno));
close(sockfd);
close(pipefd[0]);
close(pipefd[1]);
return FTDM_FAIL;
}
/*
* Set RX/TX buffer sizes on each end of the socket pair
*/
if (setsockopt(pipefd[0], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) {
ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] RCVBUF: %s\n",
strerror(errno));
}
if (setsockopt(pipefd[0], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) {
ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] SNDBUF: %s\n",
strerror(errno));
}
if (setsockopt(pipefd[1], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) {
ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] RCVBUF: %s\n",
strerror(errno));
}
if (setsockopt(pipefd[1], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) {
ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] SNDBUF: %s\n",
strerror(errno));
}
priv->rx_audio_pipe_in = pipefd[1];
priv->rx_audio_pipe_out = pipefd[0];
priv->tx_audio_pipe_in = pipefd[0];
priv->tx_audio_pipe_out = pipefd[1];
}
#endif
} else { } else {
/* early activate D-Channel */ /* early activate D-Channel */
misdn_activate_channel(ftdmchan); misdn_activate_channel(ftdmchan);
...@@ -1738,6 +1821,8 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span) ...@@ -1738,6 +1821,8 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span)
/* allocate span private */ /* allocate span private */
if (!span_priv) { if (!span_priv) {
int pipe[2] = { -1, -1 };
/* /*
* Not perfect, there should be something like span_create too * Not perfect, there should be something like span_create too
*/ */
...@@ -1751,6 +1836,19 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span) ...@@ -1751,6 +1836,19 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span)
/* init event condition */ /* init event condition */
pthread_cond_init(&span_priv->event_cond, NULL); pthread_cond_init(&span_priv->event_cond, NULL);
pthread_mutex_init(&span_priv->event_cond_mutex, NULL); pthread_mutex_init(&span_priv->event_cond_mutex, NULL);
/* init control condition */
pthread_cond_init(&span_priv->ctrl_cond, NULL);
pthread_mutex_init(&span_priv->ctrl_cond_mutex, NULL);
/* create event pipe */
if (pipe2(pipe, O_CLOEXEC) < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN failed to create event pipe: %s\n",
strerror(errno));
return FTDM_FAIL;
}
span_priv->event_pipe_in = pipe[0];
span_priv->event_pipe_out = pipe[1];
} }
/* split channel list by ',' */ /* split channel list by ',' */
...@@ -1819,8 +1917,6 @@ static FIO_GET_ALARMS_FUNCTION(misdn_get_alarms) ...@@ -1819,8 +1917,6 @@ static FIO_GET_ALARMS_FUNCTION(misdn_get_alarms)
add event queues and data fifos, so we can sift all the add event queues and data fifos, so we can sift all the
messages we get to forward them to the right receiver messages we get to forward them to the right receiver
*/ */
ftdm_span_t *span = ftdm_channel_get_span(ftdmchan);
struct misdn_span_private *span_priv = ftdm_span_io_private(span);
char buf[MAX_DATA_MEM] = { 0 }; char buf[MAX_DATA_MEM] = { 0 };
struct sockaddr_mISDN addr; struct sockaddr_mISDN addr;
struct mISDNhead *hh; struct mISDNhead *hh;
...@@ -1910,31 +2006,50 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event) ...@@ -1910,31 +2006,50 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event)
int retval = 0, nr_events = 0; int retval = 0, nr_events = 0;
int i; int i;
clock_gettime(CLOCK_REALTIME, &ts);
ts_add_msec(&ts, ms);
for (i = 1; i <= ftdm_span_get_chan_count(span); i++) { for (i = 1; i <= ftdm_span_get_chan_count(span); i++) {
ftdm_channel_t *chan = ftdm_span_get_channel(span, i); ftdm_channel_t *chan = ftdm_span_get_channel(span, i);
struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan); struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
/* Skip channels that have event processing pending (Avoids event storms) */
if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT))
continue;
if (misdn_event_queue_has_data(chan_priv->events)) { if (misdn_event_queue_has_data(chan_priv->events)) {
#ifdef MISDN_DEBUG_EVENTS #ifdef MISDN_DEBUG_EVENTS
ftdm_log(FTDM_LOG_DEBUG, "mISDN channel %d:%d has event(s)\n", ftdm_log(FTDM_LOG_DEBUG, "mISDN channel %d:%d has event(s)\n",
ftdm_channel_get_span_id(chan), ftdm_channel_get_id(chan)); ftdm_channel_get_span_id(chan), ftdm_channel_get_id(chan));
#endif #endif
ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT); ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
chan->last_event_time = ftdm_current_time_in_ms(); chan->last_event_time = ftdm_current_time_in_ms();
nr_events++; nr_events++;
} }
} }
if (nr_events) if (nr_events) {
#ifdef MISDN_DEBUG_EVENTS
ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has %d new events pending (pre poll)\n",
ftdm_span_get_id(span), nr_events);
#endif
return FTDM_SUCCESS; return FTDM_SUCCESS;
}
#ifdef MISDN_DEBUG_EVENTS
ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has no events pending, polling for new events with %d ms timeout\n",
ftdm_span_get_id(span), ms);
#endif
/* Wait at least 1 ms, max 1 s */
ms = ftdm_clamp(ms, 1, 1000);
clock_gettime(CLOCK_REALTIME, &ts);
ts_add_msec(&ts, ms);
if ((retval = pthread_cond_timedwait(&span_priv->event_cond, &span_priv->event_cond_mutex, &ts))) { if ((retval = pthread_cond_timedwait(&span_priv->event_cond, &span_priv->event_cond_mutex, &ts))) {
switch (retval) { switch (retval) {
case ETIMEDOUT: case ETIMEDOUT:
// ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n", #ifdef MISDN_DEBUG_EVENTS
// ftdm_span_get_id(span), ms); ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n",
ftdm_span_get_id(span), ms);
#endif
return FTDM_TIMEOUT; return FTDM_TIMEOUT;
default: default:
ftdm_log(FTDM_LOG_DEBUG, "mISDN failed to poll for events on span %d: %s\n", ftdm_log(FTDM_LOG_DEBUG, "mISDN failed to poll for events on span %d: %s\n",
...@@ -1943,12 +2058,20 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event) ...@@ -1943,12 +2058,20 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event)
} }
} }
#ifdef MISDN_DEBUG_EVENTS
ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d received new event notification, checking channel event queues\n",
ftdm_span_get_id(span));
#endif
for (i = 1; i <= ftdm_span_get_chan_count(span); i++) { for (i = 1; i <= ftdm_span_get_chan_count(span); i++) {
ftdm_channel_t *chan = ftdm_span_get_channel(span, i); ftdm_channel_t *chan = ftdm_span_get_channel(span, i);
struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan); struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
/* Skip channels that have event processing pending (Avoids event storms) */
if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT))
continue;
if (misdn_event_queue_has_data(chan_priv->events)) { if (misdn_event_queue_has_data(chan_priv->events)) {
ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT); ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
chan->last_event_time = ftdm_current_time_in_ms(); chan->last_event_time = ftdm_current_time_in_ms();
nr_events++; nr_events++;
} }
...@@ -1956,6 +2079,57 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event) ...@@ -1956,6 +2079,57 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event)
return (nr_events) ? FTDM_SUCCESS : FTDM_TIMEOUT; /* no events? => timeout */ return (nr_events) ? FTDM_SUCCESS : FTDM_TIMEOUT; /* no events? => timeout */
} }
/**
* Retrieve event from channel
* \param ftdmchan Channel to retrieve event from
* \param event FreeTDM event to return
* \return Success or failure
*/
static FIO_CHANNEL_NEXT_EVENT_FUNCTION(misdn_channel_next_event)
{
struct misdn_chan_private *chan_priv = ftdm_chan_io_private(ftdmchan);
struct misdn_event *evt = NULL;
ftdm_span_t *span = ftdm_channel_get_span(ftdmchan);
uint32_t event_id = FTDM_OOB_INVALID;
ftdm_assert(span, "span == NULL");
ftdm_clear_io_flag(ftdmchan, FTDM_CHANNEL_IO_EVENT);
if (!(evt = misdn_event_queue_pop(chan_priv->events))) {
#ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n");
#endif
return FTDM_FAIL;
}
#ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "Got event '%s' from channel event queue\n",
misdn_event2str(evt->id));
#endif
/* Convert from misdn event to ftdm */
switch (evt->id) {
case PH_DEACTIVATE_IND:
event_id = FTDM_OOB_ALARM_TRAP;
ftdmchan->alarm_flags |= FTDM_ALARM_RED;
break;
case PH_ACTIVATE_IND:
event_id = FTDM_OOB_ALARM_CLEAR;
ftdmchan->alarm_flags &= ~FTDM_ALARM_RED;
break;
default:
ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "Unhandled event id %d (0x%x) %s\n",
evt->id, evt->id, misdn_event2str(evt->id));
}
ftdmchan->last_event_time = 0;
span->event_header.e_type = FTDM_EVENT_OOB;
span->event_header.enum_id = event_id;
span->event_header.channel = ftdmchan;
*event = &span->event_header;
return FTDM_SUCCESS;
}
/** /**
* \brief Retrieve event * \brief Retrieve event
* \param span FreeTDM span * \param span FreeTDM span
...@@ -1974,11 +2148,12 @@ static FIO_SPAN_NEXT_EVENT_FUNCTION(misdn_next_event) ...@@ -1974,11 +2148,12 @@ static FIO_SPAN_NEXT_EVENT_FUNCTION(misdn_next_event)
struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan); struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
struct misdn_event *evt = NULL; struct misdn_event *evt = NULL;
ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
if (!(evt = misdn_event_queue_pop(chan_priv->events))) { if (!(evt = misdn_event_queue_pop(chan_priv->events))) {
#ifdef MISDN_DEBUG_EVENTS #ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan_msg(chan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n"); ftdm_log_chan_msg(chan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n");
#endif #endif
ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
continue; continue;
} }
...@@ -2051,6 +2226,7 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy) ...@@ -2051,6 +2226,7 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy)
{ {
struct misdn_span_private *span_priv = ftdm_span_io_private(span); struct misdn_span_private *span_priv = ftdm_span_io_private(span);
/* free resources */
ftdm_span_io_private(span) = NULL; ftdm_span_io_private(span) = NULL;
ftdm_safe_free(span_priv); ftdm_safe_free(span_priv);
...@@ -2060,10 +2236,397 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy) ...@@ -2060,10 +2236,397 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy)
} }
/**
* Called by misdn_span_run() to handle incoming b-channel events
* \param[in] chan FreeTDM channel object
* \return FTDM_SUCCESS on success, FTDM_* on error
*/
static ftdm_status_t handle_b_channel_event(ftdm_channel_t *chan)
{
struct misdn_chan_private *priv = ftdm_chan_io_private(chan);
char buf[MAX_DATA_MEM] = { 0 };
struct mISDNhead *mh = (void *)buf;
int retval;
if ((retval = recvfrom(chan->sockfd, buf, sizeof(buf), 0, NULL, NULL)) <= 0) {
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to receive message: %s\n",
strerror(errno));
return FTDM_FAIL;
}
if (retval < MISDN_HEADER_LEN) {
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN message too short, min.: %d, read: %d\n",
(int)MISDN_HEADER_LEN, retval);
return FTDM_FAIL;
}
switch (mh->prim) {
case PH_DATA_IND: {
int datalen = retval - MISDN_HEADER_LEN;
char *data = buf + MISDN_HEADER_LEN;
/* Convert audio data */
misdn_convert_audio_bits(data, datalen);
/* Write audio into receive pipe */
if ((retval = write(priv->rx_audio_pipe_in, data, datalen)) < 0) {
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into rx pipe: %s\n",
strerror(errno));
return FTDM_FAIL;
} else if (retval < datalen) {
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN short write into rx pipe, written: %d, expected: %d\n",
retval, datalen);
return FTDM_FAIL;
}
/* Get receive buffer usage */
if (ioctl(priv->tx_audio_pipe_out, FIONREAD, &retval) < 0) {
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to get tx audio buffer usage: %s\n",
strerror(errno));
return FTDM_FAIL;
} else if (retval < datalen) {
// ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN has not enough bytes in tx audio pipe, available: %d, requested: %d\n",
// retval, datalen);
priv->tx_pipe_under_cnt++;
return FTDM_SUCCESS;
}
#ifdef MISDN_DEBUG_IO
ftdm_log_chan(chan, FTDM_LOG_INFO, "mISDN tx audio buffer usage: %d\n",
retval);
#endif
/* Get audio from tx pipe */
if ((retval = read(priv->tx_audio_pipe_out, data, datalen)) < 0) {
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to read audio data from tx pipe: %s\n",
strerror(errno));
return FTDM_FAIL;
} else if (retval == 0) {
ftdm_log_chan_msg(chan, FTDM_LOG_NOTICE, "mISDN tx pipe is empty\n");
priv->tx_pipe_under_cnt++;
return FTDM_SUCCESS;
} else if (retval < datalen) {
ftdm_log_chan(chan, FTDM_LOG_NOTICE, "mISDN short read from tx pipe, read: %d, expected: %d\n",
retval, datalen);
priv->tx_pipe_under_cnt++;
priv->tx_miss_bytes += ftdm_max(0, datalen - retval);
datalen = retval;
}
priv->tx_pipe_rd_bytes += retval;
if (!priv->active) {
/* discard */
return FTDM_SUCCESS;
}
/* Convert audio data */
misdn_convert_audio_bits(data, datalen);
/* Write to channel */
mh->prim = PH_DATA_REQ;
mh->id = 0;
datalen += MISDN_HEADER_LEN;
if ((retval = write(chan->sockfd, buf, datalen)) < 0) {
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into b-channel: %s\n",
strerror(errno));
return FTDM_FAIL;
} else if (retval < datalen) {
ftdm_log_chan(chan, FTDM_LOG_WARNING, "mISDN short write into b-channel, written: %d, expected: %d\n",
retval, datalen);
priv->tx_lost_bytes += ftdm_max(0, datalen - retval - MISDN_HEADER_LEN);
}
priv->tx_sent_bytes += ftdm_max(0, retval - MISDN_HEADER_LEN);
break;
}
case PH_DATA_CNF:
priv->tx_ack_cnt++;
break;
case PH_DEACTIVATE_IND:
priv->active = 0;
break;
case PH_ACTIVATE_IND:
priv->active = 1;
break;
default:
ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN received unknown/unhandled event primitive: (%d) %s\n",
mh->prim, misdn_event2str(mh->prim));
break;
}
return FTDM_SUCCESS;
}
/**
* Timeout (miliseconds) for epoll_wait()
*/
#define MISDN_EPOLL_WAIT_MAX_MSEC 1000
/**
* mISDN I/O thread
* This thread handles all of the B-Channel I/O, this avoids all of the hazzles with
* intermixed data + control frames on mISDN sockets and the missing write poll support on B-Channels.
*
* Each channel uses a unix stream socketpair as a two-way, pipe replacement for incoming and outgoing
* data. Socketpairs allow a more fine grained tuning of the buffer sizes (pipe are restricted to multiples of
* the native page size (with the smallest possible size (4k) being already 500ms worth of audio).
*
* The socketpair buffer sizes and the send algorithm have been carefully tuned to:
*
* - Minimize the risk of sending too much data and making the mISDN drivers unhappy, by
* sending PH_DATA_REQ only when there is as much data available as we have received in
* the PH_DATA_IND.
*
* - Avoid deadlocks between ftdm_write() trying to fill an almust full socket buffer and
* the I/O thread not having enough data to send a PH_DATA_REQ message.
* (The write() call will return EAGAIN since there is not ehough space free to send all audio data.)
*
* \param thread FreeTDM thread handle
* \param data Private data pointer passed to ftdm_thread_create_detached() (the span object)
* \return Always returns NULL (unused)
*
* \note
* ftdm_span_start/_stop() locks the span mutex,
* use direct access to span members to avoid deadlocking
*
* \todo
* Move D-Channel handling into the I/O thread too.
* Use custom ring buffer structures instead of socketpairs
* (for even more fine grained size control).
*/
static void *misdn_span_run(ftdm_thread_t *thread, void *data)
{
ftdm_span_t *span = data;
struct misdn_span_private *priv = ftdm_span_io_private(span);
struct epoll_event evh;
int epfd = -1;
int ret;
int i;
ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread initializing\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
/* Use epoll for event handling */
epfd = epoll_create(1);
if (epfd < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to create epoll context: %s\n",
ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
goto error;
}
ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding event pipe to epoll context\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
/* Add event pipe */
evh.events = EPOLLIN | EPOLLPRI | EPOLLERR;
evh.data.fd = priv->event_pipe_out;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, priv->event_pipe_out, &evh);
if (ret < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add event pipe to epoll context: %s\n",
ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
goto error;
}
ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channels to epoll context\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
/* Add b-channels */
for (i = 1; i <= span->chan_count; i++) {
ftdm_channel_t *chan = span->channels[i];
ftdm_assert(chan, "channel == NULL");
if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B)
continue;
ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channel [%d:%d] to epoll context\n",
ftdm_span_get_id(span), ftdm_span_get_name(span),
ftdm_channel_get_id(chan), ftdm_channel_get_ph_id(chan));
evh.events = EPOLLIN | EPOLLPRI | EPOLLERR;
evh.data.ptr = chan;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, chan->sockfd, &evh);
if (ret < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add b-channel [%d] socket to epoll context: %s\n",
ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno));
goto error;
}
}
ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread started\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
/* Notify world we're running */
priv->running = 1;
pthread_cond_signal(&priv->ctrl_cond);
while (priv->running > 0) {
struct epoll_event ev[10];
int timeout_ms = MISDN_EPOLL_WAIT_MAX_MSEC;
ret = epoll_wait(epfd, ev, ftdm_array_len(ev), timeout_ms);
if (ret < 0) {
switch (errno) {
case EAGAIN:
case EINTR:
continue;
default:
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] epoll_wait() failed: %s\n",
ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
goto error;
}
}
/* Check events */
for (i = 0; i < ret; i++) {
/* */
if (ev[i].data.fd == priv->event_pipe_out) {
struct misdn_command cmd;
/* event pipe */
ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] event pipe notification\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
ret = read(priv->event_pipe_out, &cmd, sizeof(cmd));
if (ret < sizeof(cmd)) {
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to read span thread command\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
continue;
}
switch (cmd.type) {
case MISDN_CMD_STOP:
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got STOP command\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
priv->running = -1;
break;
default:
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got unknown command: %d\n",
ftdm_span_get_id(span), ftdm_span_get_name(span), cmd.type);
}
} else {
ftdm_channel_t *chan = ev[i].data.ptr;
handle_b_channel_event(chan);
}
}
}
error:
ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread stopped\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
/* Remove epoll event sources */
for (i = 1; i <= span->chan_count; i++) {
ftdm_channel_t *chan = span->channels[i];
ftdm_assert(chan, "channel == NULL");
if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B)
continue;
ret = epoll_ctl(epfd, EPOLL_CTL_DEL, chan->sockfd, NULL);
if (ret < 0) {
ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to remove b-channel [%d] socket from epoll context: %s\n",
ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno));
}
}
/* Close epoll context */
if (epfd >= 0) close(epfd);
/* Notify world we stopped running */
priv->running = 0;
pthread_cond_signal(&priv->ctrl_cond);
return NULL;
}
/**
* Timeout (miliseconds) for span start/stop completion
*/
#define SPAN_DEFAULT_TIMEOUT_MSEC 10000
static FIO_SPAN_START_FUNCTION(misdn_span_start)
{
struct misdn_span_private *span_priv = ftdm_span_io_private(span);
struct timespec timeout;
int retval;
ftdm_log(FTDM_LOG_NOTICE, "mISDN starting span %d (%s)\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
span_priv->running = 0;
if (ftdm_thread_create_detached(misdn_span_run, span) != FTDM_SUCCESS) {
ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s)\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
return FTDM_FAIL;
}
/*
* Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to start up
*/
clock_gettime(CLOCK_REALTIME, &timeout);
ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC);
pthread_mutex_lock(&span_priv->ctrl_cond_mutex);
retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout);
if (retval == ETIMEDOUT) {
ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s) in 10 seconds\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
return FTDM_FAIL;
} else if (retval) {
ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s): %s\n",
ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
return FTDM_FAIL;
}
pthread_mutex_unlock(&span_priv->ctrl_cond_mutex);
return FTDM_SUCCESS;
}
static FIO_SPAN_STOP_FUNCTION(misdn_span_stop)
{
struct misdn_span_private *span_priv = ftdm_span_io_private(span);
struct timespec timeout;
struct misdn_command cmd;
int retval;
ftdm_log(FTDM_LOG_NOTICE, "mISDN stopping span %d (%s)\n",
ftdm_span_get_id(span), ftdm_span_get_name(span));
span_priv->running = -1;
/* Wake up thread */
cmd.type = MISDN_CMD_STOP;
retval = write(span_priv->event_pipe_in, &cmd, sizeof(cmd));
if (retval < sizeof(cmd)) {
ftdm_log(FTDM_LOG_WARNING, "mISDN failed to send STOP command to span thread\n");
}
/*
* Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to shut down
*/
clock_gettime(CLOCK_REALTIME, &timeout);
ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC);
pthread_mutex_lock(&span_priv->ctrl_cond_mutex);
retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout);
if (retval == ETIMEDOUT) {
ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread in 10 seconds\n");
return FTDM_FAIL;
} else if (retval) {
ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread: %s\n",
strerror(errno));
return FTDM_FAIL;
}
pthread_mutex_unlock(&span_priv->ctrl_cond_mutex);
return FTDM_SUCCESS;
}
/** /**
* \brief ftmod_misdn interface * \brief ftmod_misdn interface
*/ */
//static const ftdm_io_interface_t misdn_interface = {
static const ftdm_io_interface_t misdn_interface = { static const ftdm_io_interface_t misdn_interface = {
.name = "misdn", .name = "misdn",
...@@ -2080,8 +2643,12 @@ static const ftdm_io_interface_t misdn_interface = { ...@@ -2080,8 +2643,12 @@ static const ftdm_io_interface_t misdn_interface = {
.get_alarms = misdn_get_alarms, .get_alarms = misdn_get_alarms,
.configure = misdn_configure, /* configure global parameters */ .configure = misdn_configure, /* configure global parameters */
.configure_span = misdn_configure_span, /* assign channels to span */ .configure_span = misdn_configure_span, /* assign channels to span */
.channel_next_event = misdn_channel_next_event,
.channel_destroy = misdn_channel_destroy, /* clean up channel */ .channel_destroy = misdn_channel_destroy, /* clean up channel */
.span_destroy = misdn_span_destroy, /* clean up span */ .span_destroy = misdn_span_destroy, /* clean up span */
.span_start = misdn_span_start,
.span_stop = misdn_span_stop,
}; };
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论