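/* mas_net_device.c
 *
 * NOTE: the lines "Logo Search packages:" and "Sourcecode: mas version
 * File versions" that preceded this file name appear to be web-listing
 * residue; they are kept here only as a comment so the file is valid C.
 */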

/*
 * Copyright (c) 2001-2003 Shiman Associates Inc. All Rights Reserved.
 * 
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
/*
 * Copyright (c) 2000, 2001 by Shiman Associates Inc. and Sun
 * Microsystems, Inc. All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions: The above
 * copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 *
 * Except as contained in this notice, the names of the authors or
 * copyright holders shall not be used in advertising or otherwise to
 * promote the sale, use or other dealings in this Software without
 * prior written authorization from the authors or copyright holders,
 * as applicable.
 *
 * All trademarks and registered trademarks mentioned herein are the
 * property of their respective owners. No right, title or interest in
 * or to any trademark, service mark, logo or trade name of the
 * authors or copyright holders or their licensors is granted.
 *
 */

/* 2 OCT 2002 - rocko - verified reentrant
 * 2 OCT 2002 - rocko - verified timestamp clean
 *
 */

#include "net_internal.h"
#include "auth_host.h"
#include "net_common.h"
#include "profile.h"

/* USES NON-THREADSAFE NETWORKING FUNCTIONS */


/* Convert a microsecond count to the fractional part of an NTP
 * timestamp, i.e. approximate x * 2^32 / 10^6 (= x * 4294.967296)
 * using integer shifts only: x*4096 + x*256 - x*3650/64
 * (= x * 4294.96875).
 *
 * The argument and the whole expansion are parenthesized so that the
 * macro is safe with expression arguments and inside larger
 * expressions.  Note that x is still evaluated three times, so do not
 * pass an expression with side effects. */
#define usec_to_ntp_frac(x)    ( ((x) << 12) + ((x) << 8) - (((x) * 3650) >> 6) )


/**** local prototypes ****************************************************/

/* Forward declarations of the file-local (static) helpers.  Their
 * definitions are not part of this section of the file; the grouping
 * comments below are inferred from names and signatures only --
 * confirm against the definitions. */

/* peer list node handling */
static struct net_rtp_peer_node* new_node( void );
static int32 delete_node( struct net_rtp_peer_node* node );
static int32 append_node(struct net_rtp_peer_node* head,
                         struct net_rtp_peer_node* node);
static int32 close_peer( struct net_rtp_state* state,
                         struct net_rtp_peer_node* node );
static int32 close_peers_with_control_ssrc( struct net_rtp_state* state, uint32 ssrc );
/* conversion of received RTP packets */
static int32 transform_rtppacket_to_data( struct rtp_packet* rtppacket, 
                                          struct mas_data** data_ptr );
static int32 transform_rtppacket_to_event( struct rtp_packet* rtppacket, 
                                           struct mas_event** event_ptr );
static int32 _print_sd( struct rtp_source_data* sd );
/* peer lookup and per-peer setup */
static struct net_rtp_peer_node* get_peer_from_port( struct net_rtp_peer_node* head, int32 portnum, int* is_source );
static int32 set_tsu_from_dc( struct net_rtp_peer_node* peer, struct mas_data_characteristic* dc, float* tsu_retval );
static int32 terminate_tracking_assemblage( struct net_rtp_state* state, uint32 ssrc );
static int32 make_tracking_assemblage( struct net_rtp_state* state, uint32 ssrc );
/* scheduling and port setup */
static int32 poll_data_if_necessary( struct net_rtp_state *state );
static int32 schedule_net_send( struct net_rtp_state *state, struct net_rtp_peer_node *peer, int32 dfdep_portnum );
static int32 setup_port( struct net_rtp_state *state, int16 type, char *cmatrix_name, int32 *portnum_retval );


/* The prototypes below are compiled out by the #if 0 guard. */
#if 0
static char* get_host_ip_addr_string( char* host );
static void print_rtp_packet(rtp_packet* packet);
static void print_sdes(struct rtp *session, uint32_t ssrc);
static void print_last_sr(struct rtp *session, uint32_t ssrc);
static void rtp_callback(struct rtp *session, rtp_event *event);
#endif

/*************************************************************************
 * ACTIONS
 *************************************************************************/

int32
mas_dev_init_library( )
{
    return 0;
}

int32
mas_dev_exit_library( )
{
    return 0;
}


/***************************************************************************
 * mas_dev_init_instance - standard device action
 *
 *  predicate: unused
 *
 * Allocates the per-instance state structure, registers it with
 * masd_set_state() and fills it in:
 *   - creates the head node (id 0) of the peer list,
 *   - clears the auth and listen fd sets,
 *   - records the local hostname and its short form (cut at the
 *     first '.'),
 *   - looks up the "reaction" port and initializes the dynamic port
 *     pool,
 *   - allows all hosts and marks the UNIX and TCP listeners as SETUP,
 *   - stores the profile version string,
 *   - tries to install a SIGIO action so that new connections do not
 *     have to be polled for.
 *
 * On a failure after masd_set_state() the partially initialized state
 * stays registered with the device; it is not freed here.
 *
 * returns: 0 on success; mas_error(MERR_MEMORY) if the state or the
 *          peer list head cannot be allocated; mas_error(MERR_NOTDEF)
 *          if the reaction port cannot be found; or the error from
 *          masd_init_dynport_pool().
 *
 ***************************************************************************/
int32
mas_dev_init_instance( int32 device_instance, void* predicate )
{
    struct net_rtp_state*  state;
    char*                  tempstr;
    int32                  err, return_value;

    masc_entering_log_level( "Initializing net device: mas_dev_init_instance()" );

    /* Allocate state holder and cast it so we can work on it */
    state = MAS_NEW( state );
    if ( state == 0 )
    {
        return_value = mas_error(MERR_MEMORY);
        masc_log_message(MAS_VERBLVL_ERROR, "Failed to allocate state buffer.");
        goto done;
    }

    masd_set_state(device_instance, state); /* set device state */

    state->device_instance = device_instance;

    /* new_node() is defined elsewhere in this file; a NULL result is
       treated as an allocation failure instead of being dereferenced. */
    state->peer_list_head = new_node();
    if ( state->peer_list_head == NULL )
    {
        return_value = mas_error(MERR_MEMORY);
        masc_log_message(MAS_VERBLVL_ERROR, "Failed to allocate the peer list head.");
        goto done;
    }
    state->peer_list_head->id = 0;
    state->next_peer_id = 1;
    FD_ZERO( &(state->fd_set_all_auth_sockets) );
    FD_ZERO( &(state->fd_set_all_listen_sockets) );

    /* get the hostname of the local machine */
    if ( gethostname( state->hostname, HOSTNAME_LEN - 1 ) < 0 )
    {
        /* The buffer contents are unspecified after a failure, so
           fall back to an empty name rather than using them. */
        masc_log_message( MAS_VERBLVL_ERROR, "Failed to get the local hostname." );
        state->hostname[0] = 0;
    }
    state->hostname[HOSTNAME_LEN - 1] = 0; /* just to be sure */
    strncpy( state->short_hostname, state->hostname, HOSTNAME_LEN - 1 );
    state->short_hostname[HOSTNAME_LEN - 1] = 0; /* just to be sure */
    tempstr = strchr( state->short_hostname, '.' );
    if ( tempstr != NULL )
        *tempstr = 0; /* terminate the short hostname at the first dot. */

    masc_log_message( MAS_VERBLVL_DEBUG, "Running on host: %s", state->hostname );

    /* set the reaction port in the state structure */
    err = masd_get_port_by_name( device_instance, "reaction",
                                 &state->reaction );
    if ( err < 0 )
    {
        return_value = mas_error(MERR_NOTDEF);
        masc_log_message(MAS_VERBLVL_ERROR, "Failed to get the reaction port.");
        goto done;
    }

    /* initialize dynamic port pool */
    err = masd_init_dynport_pool( &state->dp_pool, device_instance, state->reaction, DYNPORT_POOL_SIZE );
    if ( err < 0 )
    {
        return_value = err;
        masc_log_message(MAS_VERBLVL_ERROR, "Failed to initialize the dynamic port pool.");
        goto done;
    }

    /* default policy, for now: allow all connections */
    auth_host_allow_all( &state->hl );

    /* default policy, for now: listen for both UNIX and TCP connections */
    state->unix_listen_state = SETUP;
    state->tcp_listen_state = SETUP;

    /* store the version number string for future use */
    snprintf(state->version, sizeof state->version, "%d.%d.%d", profile_major_version, profile_minor_version, profile_teeny_version );

    /* install the asynchronous signal handler to avoid polling for
       new connections -- if we can. */
    err = masd_signal_action( SIGIO, MAS_SIGHNDL_MAS, device_instance, "mas_net_check_for_connections" );
    if ( err >= 0 )
    {
        state->has_signals = TRUE;
        masc_log_message( MAS_VERBLVL_INFO, "net: using signals to detect new connections.");
        /* ignore SIGPIPE and out-of-band signaling */
        masd_signal_action( SIGPIPE, MAS_SIGHNDL_IGNORE, 0, "" );
        masd_signal_action( SIGURG, MAS_SIGHNDL_IGNORE, 0, "" );
    }
    else
    {
        masc_log_message( MAS_VERBLVL_INFO, "net: can't use signals to detect new connections.  Polling instead.");
    }

    return_value = 0;

done:
    masc_exiting_log_level();
    return return_value;
}

/* Instance exit action: closes every peer on the instance's peer list
 * and then cleans up the dynamic port pool.  predicate is not used.
 * Always returns 0. */
int32
mas_dev_exit_instance( int32 device_instance, void* predicate )
{
    struct net_rtp_state*     state;
    struct net_rtp_peer_node* head;

    masd_get_state( device_instance, (void**)&state );
    head = state->peer_list_head;

    /* Keep closing the first real peer until only the head is left.
       NOTE(review): this terminates only if close_peer() unlinks the
       node it is given -- close_peer() is not shown here; confirm. */
    for ( ; head->next != 0; )
    {
        close_peer( state, head->next );
    }

    masd_cleanup_dynport_pool( &state->dp_pool, device_instance, state->reaction );

    /* need to free memory here... */

    return 0;
}

/* Terminate action: closes every peer on the instance's peer list.
 * predicate is not used.  Always returns 0. */
int32
mas_dev_terminate( int32 device_instance, void* predicate )
{
    struct net_rtp_state*     state;
    struct net_rtp_peer_node* head;

    masd_get_state( device_instance, (void**)&state );
    head = state->peer_list_head;

    /* Keep closing the first real peer until only the head is left.
       NOTE(review): this terminates only if close_peer() unlinks the
       node it is given -- close_peer() is not shown here; confirm. */
    for ( ; head->next != 0; )
    {
        close_peer( state, head->next );
    }

    /* need to free memory here... */

    return 0;
}

/* Port-disconnect action.  Currently a no-op: both arguments are
 * ignored and 0 is always returned. */
int32
mas_dev_disconnect_port( int32 device_instance, void* predicate )
{
    const int32 status = 0;

    /* nothing to do? */
    return status;
}

/* Port-configuration action.
 *
 *  predicate: pointer to the int32 number of the port being configured.
 *
 * Fetches the port's data characteristic, finds the peer that the
 * port belongs to, and asks set_tsu_from_dc() to derive the peer's tsu
 * from the characteristic.  A failure of set_tsu_from_dc() is not
 * treated as an error; the new tsu is only logged when it succeeds.
 *
 * returns: 0 on success; the error from masd_get_data_characteristic();
 *          or mas_error(MERR_INVALID) if no peer uses the port.
 */
int32
mas_dev_configure_port( int32 device_instance, void* predicate )
{
    struct net_rtp_state*           state;
    struct net_rtp_peer_node*       owner;
    struct mas_data_characteristic* characteristic;
    float  tsu;
    int    is_source = 0;
    int32  err, return_value;
    int32  portnum;

    masc_entering_log_level("Configuring net device port: mas_dev_configure_port()");

    MASD_GET_STATE(device_instance, state);

    portnum = *(int32*)predicate;

    err = masd_get_data_characteristic( portnum, &characteristic );
    if ( err < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Failed to get the data characteristic.");
        return_value = err;
        goto done;
    }

    owner = get_peer_from_port( state->peer_list_head, portnum, &is_source );
    if ( owner == 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Failed to get the peer from port.");
        return_value = mas_error(MERR_INVALID);
        goto done;
    }

    /* Best effort: only report the tsu when it could be set. */
    if ( set_tsu_from_dc( owner, characteristic, &tsu ) >= 0 )
    {
        masc_log_message( MAS_VERBLVL_DEBUG, "net: set tsu of peer %d to %f", owner->id, tsu );
    }

    return_value = 0;

done:
    masc_exiting_log_level();
    return return_value;
}

/***************************************************************************
 * mas_net_poll_data
 *
 *  predicate: unused
 *
 * Polls the connected network channels for incoming data with a
 * zero-timeout select().  Every peer whose RTP socket is readable is
 * flagged (data_in_rtp_socket, and data_in_rtcp_socket if its RTCP
 * socket is readable too).  If at least one peer has data, a
 * "mas_net_recv" action is queued whose predicate is the int16 count
 * of such peers.
 *
 * If there are no peers at all, polling is marked as unscheduled and
 * a "mas_sch_strike_event" action is queued instead.
 *
 * This relies on connect/accept having added the filedescriptors to
 * the select set stored in the state structure, and on it setting
 * max_auth_fd.
 *
 * returns: 0 on success; mas_error(MERR_MEMORY) if the predicate
 *          cannot be allocated; or the error from queuing the
 *          "mas_net_recv" action.
 *
 ***************************************************************************/

int32
mas_net_poll_data( int32 device_instance, void* predicate )
{
    struct net_rtp_state*     state;
    struct net_rtp_peer_node* node;
    struct timeval            timeout;
    fd_set                    local_fd_set;
    int16                     peers_with_data = 0;
    int16*                    reaction_predicate;
    int32                     err, return_value;

    masc_entering_log_level("Polling for network data: mas_net_poll_data()");

    masd_get_state(device_instance, (void**)&state);

    /* do we even have any peers? */
    if ( state->peer_list_head->next == NULL )
    {
        state->polling_scheduled = FALSE;
        masc_log_message( MAS_VERBLVL_DEBUG, "net: No more connections; striking the mas_net_poll_data action.");
        masd_reaction_queue_action_simple( state->reaction, MAS_SCH_INSTANCE, "mas_sch_strike_event", NULL, 0 );
        return_value = 0;
        goto done;
    }

    /* select() modifies the set, so work on a copy. */
    memcpy( &local_fd_set, &state->fd_set_all_auth_sockets, sizeof(fd_set));
    timeout.tv_sec = 0;  /* just poll */
    timeout.tv_usec = 0;

    /* select on the connected socket file descriptors */
    if ( select( state->max_auth_fd + 1, &local_fd_set, 0, 0, &timeout ) > 0 )
    {
        /** traverse node list, testing the filedescriptors. */
        for ( node = state->peer_list_head->next; node; node = node->next )
        {
            if ( FD_ISSET( node->session->rx_rtp_socket, &local_fd_set ) )
            {
                peers_with_data++;
                node->data_in_rtp_socket = 1;

                /* don't count rtcp sockets towards peers with data */
                /* only check them if the rtp socket had data on it */
                if ( FD_ISSET( node->session->rx_rtcp_socket, &local_fd_set ) )
                    node->data_in_rtcp_socket = 1;
            }
        }
    }

    if ( peers_with_data )
    {
        /* set up the predicate */
        reaction_predicate = masc_rtalloc( sizeof (int16) );
        if ( reaction_predicate == 0 )
        {
            masc_log_message(MAS_VERBLVL_ERROR, "Failed to allocate memory for the reaction predicate.");
            return_value = mas_error(MERR_MEMORY);
            goto done;
        }
        *reaction_predicate = peers_with_data;

        /* queue the read action */
        err = masd_reaction_queue_action_simple(state->reaction, device_instance, "mas_net_recv",  reaction_predicate, sizeof (int16) );
        if ( err < 0 )
        {
            /* Report the failure instead of silently claiming success.
               NOTE(review): ownership of reaction_predicate after a
               failed queue call is not visible from here, so it is not
               freed -- confirm against masd_reaction_queue_action_simple(). */
            masc_log_message(MAS_VERBLVL_ERROR, "Failed to queue the mas_net_recv action.");
            return_value = err;
            goto done;
        }
    }

    return_value = 0;

done:
    masc_exiting_log_level();
    return return_value;
}

int32
mas_net_send( int32 device_instance, void* predicate )
{
    struct net_rtp_state*  state;
    struct mas_data*       data;
    struct mas_package     package;
    struct net_rtp_peer_node* peer;
    int32                  id;
    int32                  portnum;
    int32                  err = 0, return_value;
    fd_set                 local_fd_set;
    struct timeval         timeout;

    masc_entering_log_level("Sending net data: mas_net_send()");

    masd_get_state(device_instance, (void**)&state);

    /* process predicate */
    masc_setup_package( &package, predicate, 0, MASC_PACKAGE_STATIC|MASC_PACKAGE_EXTRACT );
    masc_pull_int32( &package, &id );
    masc_pull_int32( &package, &portnum );
    masc_strike_package( &package );

    /* find the peer we have to deal with */
    peer = state->peer_list_head->next;
    while ( peer != 0) 
    {
        if ( peer->id == id ) break;
        else peer = peer->next;
    }
    if ( peer == 0 )
    {
      masc_log_message(MAS_VERBLVL_ERROR, "Unable to find peer id: %d", id);
      return_value = mas_error(MERR_INVALID);
      goto failure;
    }


    /* Make sure the send call isn't going to block. */

    FD_ZERO( &local_fd_set );
    FD_SET( peer->session->rx_rtp_socket, &local_fd_set);
    timeout.tv_sec = 0;  /* just poll */
    timeout.tv_usec = 0;

    /* select on the connected socket file descriptors */
    if ( select( peer->session->rx_rtp_socket + 1, NULL, &local_fd_set, NULL, &timeout ) >= 0 ) 
    {
        if ( ! FD_ISSET( peer->session->rx_rtp_socket, &local_fd_set ) )
        {
            /* Lengthen the period of the dataflow event, unless it's
               already marked as blocked.  By doing this, we avoid a
               crazy busy loop in the scheduler since we'll have data
               waiting on the sink.  For now, we'll do 40ms. */
            if ( !peer->send_blocked )
            {
                peer->send_blocked = TRUE;
                masc_setup_package( &package, NULL, 0, MASC_PACKAGE_NOFREE );
                masc_pushk_uint32( &package, "period", 40000 );
                masc_finalize_package( &package );
                
                masc_log_message( MAS_VERBLVL_DEBUG, "net: Send queue to peer %u is blocked.  Setting period to 40ms.", peer->ssrc );
                
                masd_reaction_queue_action_simple( state->reaction, MAS_SCH_INSTANCE, "mas_sch_set_event_period", package.contents, package.size );
                masc_strike_package( &package );
            }
            
            return_value = mas_error(MERR_XRUN);
            goto failure;
        }
    }
    else
    {
        return_value = mas_error(MERR_IO);
        goto failure;
    }

    /* We were blocked, but now we aren't. */
    if ( peer->send_blocked )
    {
        peer->send_blocked = FALSE;
        masc_setup_package( &package, NULL, 0, MASC_PACKAGE_NOFREE );
        masc_pushk_uint32( &package, "period", 1 );
        masc_finalize_package( &package );
        
        masc_log_message( MAS_VERBLVL_DEBUG, "net: Send queue to peer %u is now clear.", peer->ssrc );
        masd_reaction_queue_action_simple( state->reaction, MAS_SCH_INSTANCE, "mas_sch_set_event_period", package.contents, package.size );
        masc_strike_package( &package );
    }

    masd_get_data( portnum, &data );

    /* is this a response or a data send? */
    if ( peer->sink == portnum )
    {
        err = rtp_p2p_send( peer->session, data->segment, data->length,
                            data->header.type, data->header.mark,
                            data->header.media_timestamp, data->header.sequence );
    }
    else if ( peer->response == portnum )
    {
        err = rtp_p2p_send_control( peer->session, data->segment, data->length );
    }

    rtp_process_rtcp_if_any( peer->session );
    
    if (err < 0)
    {
        masc_log_message( MAS_VERBLVL_ERROR, "net: [error] mas_net_send: Failed %d byte packet to peer %d.", data->length, peer->id );
        masc_strike_data( data );
        masc_rtfree( data );
        return_value = mas_error(MERR_COMM) | mas_make_derror(-err) | MAS_ERR_ERROR;
      goto failure;
    }
    
    masc_strike_data( data );
    masc_rtfree( data );

    return_value = 0;
    goto success;
failure:
success:
    masc_exiting_log_level();
    return return_value;
}

/* Receive action, queued by mas_net_poll_data.
 *
 *  predicate: pointer to an int16 holding the number of peers that
 *             were flagged as having data on their RTP socket.
 *
 * Walks the peer list.  For each peer flagged with data_in_rtp_socket
 * the waiting packets are read into the RTP library's queue with
 * rtp_process_packets(); then packets are pulled out of that queue
 * one at a time until it is empty.  Packets of CONTROL channels are
 * turned into events and queued on the reaction port; all others are
 * turned into mas_data and posted to the peer's source port.
 *
 * A read error from the socket or the library queue closes the peer
 * and all peers with the same control SSRC.
 *
 * returns: 0 on success; a MERR_COMM error (with the library error as
 *          detail) on a receive error; or the error returned by the
 *          transform/post helpers.
 */
int32
mas_net_recv( int32 device_instance, void* predicate )
{
    struct net_rtp_state*     state;
    struct net_rtp_peer_node* node;
    struct mas_data*          data;
    struct mas_event*         event;
    int16                     peers_with_data = *(int16*)predicate;
    int32                     err, return_value;
    struct rtp_packet         rtppacket;
    int                       peer_rcvd_data = 0;
    int32                     queue_err = 0, net_err = 0;

    masc_entering_log_level("Recieving net data: mas_net_recv()");

    masd_get_state(device_instance, (void**)&state);
    node = state->peer_list_head->next;
    rtppacket.payload = 0;


    /* The outer loop traverses the list of peers until it reaches the
       end or "peers_with_data" is zero. */
    while (peers_with_data && node)
    {
        rtppacket.payload = 0;
        peer_rcvd_data = 0;
        err = 0;
        net_err = 0;

        if ( node->data_in_rtp_socket )
        {
            /* Read every complete packet that's waiting on the
               socket, process for timing data and add to jrtplib's
               packet queue. */
            net_err = rtp_process_packets( node->session, node->data_in_rtp_socket, node->data_in_rtcp_socket, TRUE );

            if ( net_err < 0 )
            {
                masc_log_message(MAS_VERBLVL_ERROR, "net: Couldn't read data from network socket of peer %u.  Disconnecting peer.", node->ssrc );
                return_value = mas_error(MERR_COMM) | mas_make_derror(-net_err);
                goto kill_peers;
            }

            /* Set data_in_rtp/rtcp_sockets to FALSE, so that when we
               loop, we will NOT receive any more data from the
               network - we'll only get data that's in the RTP
               library's queue. */
            node->data_in_rtp_socket = FALSE;
            node->data_in_rtcp_socket = FALSE;

            /* Test to see if there's data in the queue - there
             * SHOULD be! */
            node->data_in_library = rtp_is_data_in_library_queue(node->session);

        }

        /* This loop will pull packets in from jrtplib until there are
           no more packets waiting in jrtplib's packet queue.  When
           the loop exits, peer_rcvd_data contains the number of
           packets received in the loop. */

        while ( node->data_in_library )
        {
            /* NOTE: the C wrapper around jrtplib memcpy()s the stuff
               from the packet into our packet structure, then deletes
               the jrtplib packet.  We can hold onto it as long as
               convenient.  We have to mark the mas_data struct as
               having an allocated length so the segment will be
               free()d eventually. */

            rtppacket.payload = 0;
            queue_err = 0;

            /* DO NOT read in any packets from the socket. */
            /* Just test to see if there's data in jrtplib's
             * queue. */
            node->data_in_library = rtp_is_data_in_library_queue(node->session);

            /* here, net_err is from rtp_process_packets above, or it
               is zero. */
            /* Retrieve the next packet from jrtplib's packet queue. */
            if ( node->data_in_library && net_err >= 0 )
            {
                err = rtp_p2p_retrieve_packet_from_library( node->session, &rtppacket );
                if ( err == 0 )
                {
                    /* one packet received */
                    queue_err = 1;
                }
                else
                {
                    /* receive error */
                    queue_err = err;
                }

                /* Again, Test to see if there's data in the queue */
                node->data_in_library = rtp_is_data_in_library_queue(node->session);
                if ( node->data_in_library < 0 )
                    queue_err = node->data_in_library;
            }
            else
            {
                /* no packets received */
                queue_err = 0;
            }

            /* socket buffer is clear */

            /* if we get a recv error, close the peer and all peers
               with the same SSRC.  jrtplib may be using this as an
               opportunity to send RTCP packets, and the error may
               have come from send() or sendto(). */
            if ( queue_err < 0 || net_err < 0 )
            {
                masc_log_message(MAS_VERBLVL_ERROR, "net: Couldn't read data from network queue of peer %u.  Disconnecting peer.", node->ssrc );
                return_value = mas_error(MERR_COMM) | mas_make_derror(-queue_err);
                goto kill_peers;
            }
            else if ( queue_err > 0 )         /* received packet */
            {
                mas_assert( rtppacket.ssrc == node->ssrc, "Why isn't this the SSRC I was expecting?");
                peer_rcvd_data++;

                /* Defensive check: a negative data_in_library has
                   already been folded into queue_err above, so this
                   is not expected to trigger. */
                if (node->data_in_library < 0)
                {
                    /* save the error before clearing the field, and
                       report the saved value */
                    err = node->data_in_library;
                    node->data_in_library = 0;
                    if (rtppacket.payload != 0 )
                        masc_rtfree( rtppacket.payload );
                    masc_log_message(MAS_VERBLVL_ERROR, "Error, data_in_library < 0.");
                    return_value = mas_error(MERR_COMM) | mas_make_derror(-err) | MAS_ERR_WARNING;
                    goto failure;
                }

                switch ( node->type )
                {
                case MAS_CHANNEL_TYPE_CONTROL:
                    err = transform_rtppacket_to_event( &rtppacket, &event );
                    if ( err < 0 )
                    {
                        masc_log_message(MAS_VERBLVL_ERROR, "Error transforming rtp packet into an event.");
                        /* The payload is released by this function on
                           the success path below, so release it here
                           too rather than leaking it. */
                        if ( rtppacket.payload != 0 )
                        {
                            masc_rtfree( rtppacket.payload );
                            rtppacket.payload = 0;
                        }
                        return_value = err;
                        goto failure;
                    }

                    /* set the response port back to us */
                    event->response = node->response;

                    /* Make sure the event is associated with a
                       particular source. */
                    event->source_device_subscript = node->ssrc;

                    /* queue the event */
                    err = masd_reaction_queue_event( state->reaction, event );
                    /* we're done with the payload */
                    masc_rtfree( rtppacket.payload );
                    break;
                default: /* fall through */
                case MAS_CHANNEL_TYPE_DATA:
                    /* NOTE(review): who owns rtppacket.payload and
                       data on the two failure paths below cannot be
                       told from here; nothing is freed -- confirm
                       against transform_rtppacket_to_data(). */
                    err = transform_rtppacket_to_data( &rtppacket, &data );
                    if ( err < 0 )
                    {
                        masc_log_message(MAS_VERBLVL_ERROR, "Error transforming rtp packet into data.");
                        return_value = err;
                        goto failure;
                    }

                    /* post data to port queue */
                    err = masd_post_data( node->source, data );
                    if ( err < 0 )
                    {
                        masc_log_message(MAS_VERBLVL_ERROR, "Error posting data.");
                        return_value = err;
                        goto failure;
                    }
                    break;
                }
            }
            else
            {
                /* received nothing, queue_err == 0 */
                if (rtppacket.payload != 0)
                    masc_rtfree( rtppacket.payload );

                /* Defensive check, see the matching one above. */
                if (node->data_in_library < 0)
                {
                    err = node->data_in_library;
                    node->data_in_library = 0;
                    masc_log_message(MAS_VERBLVL_ERROR, "Error, data_in_library < 0.");
                    return_value = mas_error(MERR_COMM) | mas_make_derror(-err) | MAS_ERR_WARNING;
                    goto failure;
                }
            }

        } /* end while ( node->data_in_library ) */

        /* loop to next peer */
        if ( peer_rcvd_data )
            peers_with_data--;

        node = node->next;
    }

    return_value = 0;
    goto success;

 kill_peers:
    if ( node )
    {
        /* node is not touched again after this call */
        close_peers_with_control_ssrc( state, node->ssrc );
        if ( rtppacket.payload != 0 )
            masc_rtfree( rtppacket.payload );
    }
    /* fallthrough */
failure:
    /* fallthrough */
success:
    masc_exiting_log_level();
    return return_value;
}

/***************************************************************************
 * mas_net_check_for_connections
 *
 *  predicate: unused
 *
 * Polls the listening network transports for connection attempts
 * with a zero-timeout select().  If any listening socket is readable,
 * a "mas_net_accept" action is queued whose predicate is an int32
 * bitmap of the session types (RTP_SESSTYPE_TCP / RTP_SESSTYPE_UNIX)
 * that are in the LISTENING state and have a connection pending.
 *
 * returns: 0 on success; mas_error(MERR_MEMORY) if the bitmap cannot
 *          be allocated; or the error from queuing "mas_net_accept".
 *          The session type bitmap itself is not returned; it is
 *          passed on as the predicate of the queued action.
 *          See jrtplib/rtp_transport.h for the bitmap values.
 *
 ***************************************************************************/

int32
mas_net_check_for_connections(int32 device_instance, void* predicate)
{
    int                    err, return_value;
    struct net_rtp_state*  state;
    struct timeval         tv;
    fd_set                 local_fd_set;
    int32*                 session_type_bitmap;


    masc_entering_log_level("Checking for connections: mas_net_check_for_connections()");

    masd_get_state(device_instance, (void**)&state);

    tv.tv_sec = 0;
    tv.tv_usec = 0;
    /* copy our shadow copy */
    memcpy( &local_fd_set, &state->fd_set_all_listen_sockets,
            sizeof(fd_set));

    if (select(state->max_listen_fd + 1, &local_fd_set, 0, 0, &tv) > 0)
    {
        session_type_bitmap = MAS_NEW( session_type_bitmap );
        if ( session_type_bitmap == 0)
        {
            masc_log_message(MAS_VERBLVL_ERROR, "Error allocating memory for session type bitmap.");
            return_value = mas_error(MERR_MEMORY);
            goto done;
        }

        /* The bits are OR-ed in below, so start from a known value
           instead of relying on MAS_NEW to hand back zeroed memory. */
        *session_type_bitmap = 0;

        /* check for each session type */
        if ( state->tcp_listen_state == LISTENING && FD_ISSET(state->rtp_listen_tcp, &local_fd_set) )
            *session_type_bitmap |= RTP_SESSTYPE_TCP;
        if ( state->unix_listen_state == LISTENING && FD_ISSET(state->rtp_listen_unix, &local_fd_set) )
            *session_type_bitmap |= RTP_SESSTYPE_UNIX;

        /* queue the accept action */
        err = masd_reaction_queue_action_simple(state->reaction, device_instance, "mas_net_accept",  session_type_bitmap, sizeof (int32) );
        if ( err < 0 )
        {
            /* Report the failure instead of silently claiming success.
               NOTE(review): ownership of session_type_bitmap after a
               failed queue call is not visible from here, so it is not
               freed -- confirm against masd_reaction_queue_action_simple(). */
            masc_log_message(MAS_VERBLVL_ERROR, "Failed to queue the mas_net_accept action.");
            return_value = err;
            goto done;
        }
    }

    return_value = 0;

done:
    masc_exiting_log_level();
    return return_value;
}

int32
mas_net_accept(int32 device_instance, void* predicate)
{
    int                             err, return_value;
    int                             reterrcode;
    struct net_rtp_state*           state;
    struct net_rtp_peer_node* node;
    MAS_SOCKET                      rtpsock;
    MAS_SOCKET                      rtcpsock;
    struct sockaddr_un              rtcp_peer_addr_unix;
    struct sockaddr_in              rtcp_peer_addr_tcp;
    int32                           *peer_id;
    int                             sesstype = *(int32*)predicate;
    int res;

    masc_entering_log_level("Accepting net connection: mas_net_accept().");

    masd_get_state(device_instance, (void**)&state);

    node = new_node();

    /** UNIX session ***************************************************/
    if ( sesstype & RTP_SESSTYPE_UNIX )
    {
        node->session_type = RTP_SESSTYPE_UNIX;
    
        /* allocate the peer address structure */
        node->peer_addr = masc_rtalloc(sizeof(struct sockaddr_un));
        if ( node->peer_addr == 0 )
      { 
          masc_log_message(MAS_VERBLVL_ERROR, "Error allocating memory for peer address.");
          return_value = mas_error(MERR_MEMORY);
          goto failure;
      }
        memset( node->peer_addr, 0, sizeof (struct sockaddr_un) );

        /* accept the connection */
        err = rtp_transport_stream_accept(node->session_type,
                                          state->rtp_listen_unix,
                                          state->rtcp_listen_unix,
                                          &rtpsock, &rtcpsock,
                                          (struct sockaddr*)node->peer_addr, 
                                          (struct sockaddr*)&rtcp_peer_addr_unix,
                                          &reterrcode);
        if (err < 0) 
        {
          masc_log_message(MAS_VERBLVL_ERROR, "Error accepting rtp transport stream.");

            if (err == ERR_RTP_NULLPTR) return_value = mas_error(MERR_NULLPTR);
            else return_value = mas_error(MERR_CANTCREATE | mas_make_serror(reterrcode));

          goto failure;
        }
        
        /* fix: on some systems (i.e. Solaris), we may inherit the FASYNC flag
           from the listening socket, which would cause us to receive
           SIGIO's when we don't want them. Turn it off. */
        if( state->has_signals )
        {
            res = fcntl( rtpsock, F_GETFL, 0 );
            if( res & FASYNC )
            {
                masc_log_message( MAS_VERBLVL_DEBUG, "turning off FASYNC flag on new unix socket, because it was ON" );
                res = res & (~FASYNC);
                res = fcntl( rtpsock, F_SETFL, res );
                mas_assert( res==0, "couldn't turn off signals" );
            }
        }
        
        /* create the session structure */
        node->session = MAS_NEW( node->session );
        if ( node->session == 0 )
      {
          masc_log_message(MAS_VERBLVL_ERROR, "Error allocating memory for session structure.");
          return_value = mas_error(MERR_MEMORY);
          goto failure;
      }
    
        /* create the session */
        err = rtp_create_stream_pair_p2p_session(node->session, 0, 0,
                                                 node->session_type,
                                                 rtpsock, rtcpsock);
        if (err < 0)
      {
          masc_log_message(MAS_VERBLVL_ERROR, "Error creating stream pair session.");
          return_value = err;
          goto failure;
      }

        node->state = AUTH1;
        
        sesstype = 0; /* prevent session types below from being
                       * accepted this time */
    }

    /** TCP session ****************************************************/
    if ( sesstype & RTP_SESSTYPE_TCP )
    {
        node->session_type = RTP_SESSTYPE_TCP;

        /* allocate the peer address structure */
        node->peer_addr = masc_rtalloc(sizeof(struct sockaddr_in));
        if ( node->peer_addr == 0 )
      {
          masc_log_message(MAS_VERBLVL_ERROR, "Error allocating memory for peer address.");
          return_value = mas_error(MERR_MEMORY);
          goto failure;
      }
        memset( node->peer_addr, 0, sizeof (struct sockaddr_in) );

        /* accept the control connection */
        err = rtp_transport_stream_accept(node->session_type, state->rtp_listen_tcp, state->rtcp_listen_tcp, &rtpsock, &rtcpsock, (struct sockaddr*)node->peer_addr,  (struct sockaddr*)&rtcp_peer_addr_tcp, &reterrcode);
        if (err < 0)
        {
          masc_log_message(MAS_VERBLVL_ERROR, "Error accepting transport stream.");
            if (err == ERR_RTP_NULLPTR) return_value = mas_error(MERR_NULLPTR);
            else return_value = mas_error(MERR_CANTCREATE | mas_make_serror(reterrcode));
          goto failure;
        }

        /* fix: on some systems (i.e. Solaris), we may inherit the FASYNC flag
           from the listening socket, which would cause us to receive
           SIGIO's when we don't want them. Turn it off. */
        if( state->has_signals )
        {
            res = fcntl( rtpsock, F_GETFL, 0 );
            if( res & FASYNC )
            {
                masc_log_message( MAS_VERBLVL_DEBUG, "turning off FASYNC flag on new tcp socket, because it was ON" );
                res = res & (~FASYNC);
                res = fcntl( rtpsock, F_SETFL, res );
                mas_assert( res==0, "couldn't turn off signals" );
            }
        }

        
        err = auth_host_authenticate_addr4( &state->hl,  mas_ntohl(((struct sockaddr_in*)node->peer_addr)->sin_addr.s_addr) );
        if ( err != MAS_SUCCESS )
        {
            masc_log_message(MAS_VERBLVL_ERROR, "net: refused connect" );
            return_value = mas_error(MERR_INVALID);
            goto failure;
        }
        
        /* create the session structure */
        node->session = MAS_NEW( node->session );
        if ( node->session == 0 )
      {
          masc_log_message(MAS_VERBLVL_ERROR, "Error allocating memory for peer session.");
          return_value = mas_error(MERR_MEMORY);
          goto failure;
      }
    
        /* create the session */
        err = rtp_create_stream_pair_p2p_session(node->session, mas_ntohl(((struct sockaddr_in*)node->peer_addr)->sin_addr.s_addr), RTP_TCP_LISTEN_PORT, node->session_type, rtpsock, rtcpsock);
        if (err < 0)
      {
          masc_log_message(MAS_VERBLVL_ERROR, "Error creating stream pair session.");
          return_value = err;
          goto failure;
      }

        node->state = AUTH1;
        
        sesstype = 0; /* prevent session types below from being
                       * accepted this time */
    }
    
    if ( sesstype & RTP_SESSTYPE_UDP )
    {
      masc_log_message(MAS_VERBLVL_ERROR, "Error session type UDP unsupported.");
      return_value = mas_error(MERR_NOSUPP);
      goto failure;
    }

    if ( sesstype & RTP_SESSTYPE_FIFO )
    {
      masc_log_message(MAS_VERBLVL_ERROR, "Error session type FIFO unsupported.");
      return_value = mas_error(MERR_NOSUPP);
      goto failure;
    }

    if ( sesstype & RTP_SESSTYPE_XCLIENTMSG)
    {
      masc_log_message(MAS_VERBLVL_ERROR, "Error session type XCLIENTMSG unsupported.");
      return_value = mas_error(MERR_NOSUPP);
      goto failure;
    }

    /* increment connection id counter */
    node->id = state->next_peer_id++;
    
    /* add the new connection node to the list */
    append_node(state->peer_list_head, node);

    /* stuff the predicate */
    peer_id = masc_rtalloc( sizeof (int32) );
    if ( peer_id == 0)
    {
      masc_log_message(MAS_VERBLVL_ERROR, "Error allocating memory for peer id.");
      return_value = mas_error(MERR_MEMORY);
      goto failure;
    }
    *(int32*)peer_id = node->id;
    
    /* queue the authenticate action */
    masd_reaction_queue_action_simple(state->reaction, device_instance, "mas_net_auth1", (void*)peer_id, sizeof (int32));

    return_value = 0;
    goto success;
failure:
success:
    masc_exiting_log_level();
    return return_value;
}

/***************************************************************************
 * mas_net_connect
 *
 *  predicate: package with the keys "t" (int32 channel type), "ssrc"
 *             (uint32 SSRC of the controlling peer), "hn" (name of the
 *             host to connect to) and, for data channels only, "cn".
 *
 * Opens an outgoing TCP RTP/RTCP stream pair to the named host on
 * behalf of an already-known control peer, exchanges authentication
 * messages with the remote end, sets up a sink and a source port for
 * the new peer and queues a response package holding "lsrc"/"lsnk"
 * (plus "rsrc"/"rsnk" for data channels).
 *
 * returns: 0 on success.  On any error the error value is returned and
 *          is also queued to the client with masd_error_response().
 *
 ***************************************************************************/
int32
mas_net_connect(int32 device_instance, void* predicate)
{
    char                            portname[MAX_PORT_NAME_LENGTH];
    int                             err, return_value;
    int                             reterrcode;
    struct net_rtp_state*           state;
    struct net_rtp_peer_node*       peer;
    MAS_SOCKET                      rtpsock;
    MAS_SOCKET                      rtcpsock;
    struct sockaddr_in              rtcp_peer_addr_tcp;
    struct sockaddr_in*             peer_in;
    int32                           channel_type;
    char*                           display_host = NULL;
    char*                           channel_name = NULL;
    struct mas_package              package;
    struct rtp_packet               packet;
    struct hostent*                 hp;
    int32                           remote_source = 0, remote_sink = 0;
    uint32                          control_ssrc;
    struct net_rtp_peer_node        *control_peer;
    char                            *arg;
    char *authmsg = NULL;
    char *rarg = NULL;
    int authmsg_len = 0;

    masc_entering_log_level("Connecting net device: mas_net_connect()");

    masd_get_state(device_instance, (void**)&state);

    /* parse the predicate package */
    masc_setup_package( &package, predicate, 0, MASC_PACKAGE_STATIC|MASC_PACKAGE_EXTRACT);
    masc_pullk_int32( &package, "t", &channel_type );
    masc_pullk_uint32( &package, "ssrc", &control_ssrc );
    masc_pullk_string( &package, "hn", &display_host, FALSE );
    /* the channel name is pulled for data channels but is not used
     * anywhere else in this function */
    if ( channel_type == MAS_CHANNEL_TYPE_DATA )
        masc_pullk_string( &package, "cn", &channel_name, FALSE );
    masc_strike_package( &package );

    /* without a host name there is nothing to connect to, and the
     * string comparisons below would dereference a bad pointer */
    if ( display_host == NULL )
    {
        masc_log_message( MAS_VERBLVL_ERROR, "net: [error] no host name in connect request." );
        return_value = mas_error(MERR_INVALID);
        goto failure;
    }

    /* Find the controlling peer in our list */
    for ( control_peer = state->peer_list_head->next; control_peer != NULL; control_peer = control_peer->next )
    {
        if ( control_peer->ssrc == control_ssrc && control_peer->type == MAS_CHANNEL_TYPE_CONTROL )
            break;
    }

    if ( control_peer == NULL )
    {
        masc_log_message( MAS_VERBLVL_ERROR, "net: [error] no control channel with ssrc %u found.", control_ssrc );
        return_value = mas_error(MERR_INVALID);
        goto failure;
    }

    /** TCP session ****************************************************/
    /* avoid "Snake biting its tail" - check to make sure we're not
     * connecting to ourselves */
    masc_log_message( MAS_VERBLVL_DEBUG, "Connecting TCP session to '%s'", display_host );

    /* compare the display host against both our full and our short
     * hostname; a match on either means "same host" */
    if ( strncmp(display_host, state->hostname, HOSTNAME_LEN-1) == 0
         || strncmp(display_host, state->short_hostname, HOSTNAME_LEN-1) == 0 )
    {
        return_value = mas_error(MERR_INVALID) | mas_make_derror(1);
        masc_log_message(MAS_VERBLVL_ERROR, "net: [error] display and local are the same hosts.");
        goto failure;
    }

    /* create new node */
    peer = new_node();
    if ( peer == NULL )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: [error] allocating memory for peer node.");
        return_value = mas_error(MERR_MEMORY);
        goto failure;
    }

    peer->type = MAS_CHANNEL_TYPE_DATA; /* no matter what.  It's the other
                                           end that has to differentiate
                                           between CONTROL and DATA. */
    peer->session_type = RTP_SESSTYPE_TCP;
    peer->control_ssrc = control_ssrc;

    /* allocate the peer address structure */
    peer->peer_addr = masc_rtalloc(sizeof(struct sockaddr_in));
    if ( peer->peer_addr == 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: [error] allocating memory for peer addr.");
        return_value = mas_error(MERR_MEMORY);
        goto failure;
    }
    memset( peer->peer_addr, 0, sizeof (struct sockaddr_in) );
    peer_in = (struct sockaddr_in*)peer->peer_addr;

    /* resolve the host; only an IPv4 result can be copied into the
     * sockaddr_in below */
    hp = gethostbyname( display_host );
    if ( hp == NULL || hp->h_addrtype != AF_INET || hp->h_addr_list[0] == NULL )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: [error] unable to lookup hostname: %s", display_host);
        return_value = mas_error(MERR_NOTDEF);
        goto failure;
    }

    /* RTP address: first address of the host, RTP listen port */
    memcpy( &peer_in->sin_addr.s_addr, hp->h_addr_list[0], sizeof peer_in->sin_addr.s_addr );
    peer_in->sin_port = mas_htons( RTP_TCP_LISTEN_PORT );
    peer_in->sin_family = AF_INET;

    /* RTCP address: same host, RTCP listen port */
    memset( &rtcp_peer_addr_tcp, 0, sizeof rtcp_peer_addr_tcp );
    rtcp_peer_addr_tcp.sin_addr.s_addr = peer_in->sin_addr.s_addr;
    rtcp_peer_addr_tcp.sin_port = mas_htons( RTCP_TCP_LISTEN_PORT );
    rtcp_peer_addr_tcp.sin_family = AF_INET;

    /* try to connect */
    err = rtp_transport_stream_connect(peer->session_type, &rtpsock, &rtcpsock, (struct sockaddr*)peer->peer_addr, (struct  sockaddr*)&rtcp_peer_addr_tcp, &reterrcode);
    if (err < 0)
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: [error] failed to connect RTP transport stream.");
        if (err == ERR_RTP_NULLPTR)
            return_value = mas_error(MERR_NULLPTR);
        else
            return_value = mas_error(MERR_CANTCREATE | mas_make_serror(reterrcode));
        goto failure;
    }

    /* create the session structure */
    peer->session = MAS_NEW( peer->session );
    if ( peer->session == 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error allocating memory for peer session.");
        return_value = mas_error(MERR_MEMORY);
        goto failure;
    }

    /* create the session */
    err = rtp_create_stream_pair_p2p_session(peer->session, mas_ntohl(peer_in->sin_addr.s_addr),  RTP_TCP_LISTEN_PORT, peer->session_type, rtpsock, rtcpsock);
    if (err < 0)
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error creating stream pair session.");
        return_value = err;
        goto failure;
    }

    /* increment connection id counter */
    peer->id = state->next_peer_id++;

    /* add the new connection node to the list */
    append_node(state->peer_list_head, peer);

    /* set our ssrc to the ssrc of the associated control channel */
    /* be sure this gets set BEFORE authentication */
    rtp_set_local_ssrc( peer->session, control_ssrc );

    /* Control channels use 32-bit NTP timestamp */
    if ( channel_type == MAS_CHANNEL_TYPE_CONTROL )
        rtp_set_tsu(peer->session, 1.52590218967E-5);

    /********************************************************************
     **** No identity verification is sent.  If it were to be done, it
     **** would be done here by putting appropriate stuff into the
     **** info package.
     ********************************************************************/

    if ( channel_type == MAS_CHANNEL_TYPE_CONTROL )
        arg = "CONTROL";
    else
        arg = "DATA";

    /* forward the control peer's info package, if it has one */
    if ( control_peer->infopack.contents == NULL )
        net_create_authmsg( state->version, arg, NULL, &authmsg, &authmsg_len );
    else
        net_create_authmsg( state->version, arg, &control_peer->infopack, &authmsg, &authmsg_len );

    if ( authmsg == NULL )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: [error] unable to create authentication message.");
        return_value = mas_error(MERR_MEMORY);
        goto failure;
    }

    /* the message buffer is released whether or not the send worked */
    err = rtp_p2p_send_control(peer->session, authmsg, authmsg_len);
    masc_rtfree( authmsg );
    authmsg = NULL;
    if ( err < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error, failed to send rtp control.");
        return_value = mas_error(MERR_COMM);
        goto failure;
    }

    /* receive the server response; clear the payload pointer first so
     * the error path never inspects an indeterminate value */
    packet.payload = NULL;
    err = rtp_p2p_recv( peer->session, &packet, TRUE, FALSE );
    if ( err < 0 )
    {
        if ( packet.payload != NULL )
            masc_rtfree( packet.payload );
        return_value = mas_error(MERR_IO);
        goto failure;
    }

    /* parse it, then drop the raw payload on success and failure alike */
    err = net_parse_authmsg( packet.payload, packet.payload_len, &rarg, &peer->infopack, &peer->version );
    if ( packet.payload != NULL )
    {
        masc_rtfree( packet.payload );
        packet.payload = NULL;
    }
    if ( err < 0 )
    {
        /* go through the common failure path so that the client gets an
         * error response and the log level is restored */
        return_value = err;
        goto failure;
    }

    /* the remote end must have answered "OK" */
    if ( rarg == NULL || rarg[0] != 'O' || rarg[1] != 'K' )
    {
        if ( rarg != NULL )
            masc_rtfree( rarg );
        return_value = mas_error( MERR_INVALID );
        goto failure;
    }

    masc_rtfree( rarg );
    rarg = NULL;

    /* grab our peer's SSRC */
    rtp_get_peer_ssrc( peer->session, &peer->ssrc );

    /* detect error condition */
    if ( masc_test_key( &peer->infopack, "err" ) == 0 )
    {
        masc_pullk_int32( &peer->infopack, "err", &err );
        return_value = err;
        goto failure;
    }

    /* grab the remote end's port numbers, if this is a data channel */
    if ( channel_type == MAS_CHANNEL_TYPE_DATA )
    {
        masc_pullk_int32( &peer->infopack, "src", &remote_source );
        masc_pullk_int32( &peer->infopack, "snk", &remote_sink );
    }

    /** retrieve dynamic mas ports and store in peer */
    /*** SINK *********/
    err = setup_port( state, MAS_SINK, MAS_CMATRIX_ANYTHING, &peer->sink );
    if (err < 0)
    {
        masc_log_message( MAS_VERBLVL_ERROR, "net: Couldn't not set up sink for peer %u", peer->ssrc );
        return_value = err;
        goto failure;
    }

    /* schedule our dataflow dependency on the sink for sending */
    schedule_net_send( state, peer, peer->sink );

    /*** SOURCE ********/
    err = setup_port( state, MAS_SOURCE, MAS_CMATRIX_ANYTHING, &peer->source );
    if (err < 0)
    {
        masc_log_message( MAS_VERBLVL_ERROR, "net: Couldn't not set up source for peer %u", peer->ssrc );
        return_value = err;
        goto failure;
    }

    /*** SET THE PORT NAMES ****************/
    /* display_host comes from the request, so bound the formatting */
    snprintf( portname, sizeof portname, "%u to %s sink", peer->ssrc, display_host );
    err = masd_set_port_name( peer->sink, portname );

    snprintf( portname, sizeof portname, "%u to %s source", peer->ssrc, display_host );
    err = masd_set_port_name( peer->source, portname );

    /* form the reply */
    masc_setup_package( &package, NULL, 0, MASC_PACKAGE_NOFREE );
    masc_pushk_int32( &package, "lsrc", peer->source );
    masc_pushk_int32( &package, "lsnk", peer->sink );
    if ( channel_type == MAS_CHANNEL_TYPE_DATA )
    {
        masc_pushk_int32( &package, "rsrc", remote_source );
        masc_pushk_int32( &package, "rsnk", remote_sink );
    }
    masc_finalize_package( &package );

    /* ...and send it */
    masd_reaction_queue_response( state->reaction, package.contents, package.size );
    masc_strike_package( &package );

    masc_log_message(MAS_VERBLVL_DEBUG, "net: added channel to peer %u", peer->ssrc);

    /* add RTP socket to select() fd_set bitmap */
    FD_SET(peer->session->rx_rtp_socket, &state->fd_set_all_auth_sockets);
    FD_SET(peer->session->rx_rtcp_socket, &state->fd_set_all_auth_sockets);
    state->max_auth_fd = max(state->max_auth_fd, peer->session->rx_rtp_socket);
    state->max_auth_fd = max(state->max_auth_fd, peer->session->rx_rtcp_socket);

    poll_data_if_necessary( state );

    return_value = 0;
    goto success;

 failure:
    /* NOTE(review): the peer node, its peer_addr, its session and the
     * connected sockets are not released on the error paths above, and a
     * peer that was already appended to the list stays there.
     * mas_net_auth1 tears down a listed peer with close_peer(); confirm
     * the right teardown for a not-yet-listed node before adding it. */

    /* return error to client. */
    masd_error_response( state->reaction, return_value );

 success:
    masc_exiting_log_level();
    return return_value;
}

/***************************************************************************
 * mas_net_auth1
 *
 *  predicate: pointer to the int32 id of the peer to authenticate
 *
 * Receives and parses the authentication message of a newly accepted
 * peer, records whether it asked for a CONTROL or a DATA channel, sets
 * up the ports that go with that channel type, answers with an "OK"
 * authentication message and adds the peer's sockets to the fd_set of
 * authenticated sockets.
 *
 * returns: 0 on success.  On error the peer (if it was found) is closed
 *          with close_peer() and the error value is returned.
 *
 ***************************************************************************/
int32
mas_net_auth1( int32 device_instance, void* predicate )
{
    char                      *authmsg = NULL;
    int                       authmsg_len = 0;
    char                      nametemp[MAX_PORT_NAME_LENGTH];
    char                      portname[MAX_PORT_NAME_LENGTH];
    char                      channeltype[MAX_PORT_NAME_LENGTH];
    char buffer[1024];
    struct net_rtp_peer_node* peer;
    struct net_rtp_state*     state;
    int32                     peer_id = *(int32*)predicate;
    struct rtp_packet         rtppacket;
    int32                     err, return_value;
    struct mas_package        package;
    struct mas_package        *reply = NULL; /* set once 'package' is built */
    char *arg = NULL;

    /* the failure path inspects the payload pointer, so it must have a
     * defined value even when we bail out before receiving anything */
    rtppacket.payload = NULL;

    masc_entering_log_level("net: mas_net_auth1");

    masd_get_state(device_instance, (void**)&state);

    /* find the peer we're authenticating */
    for ( peer = state->peer_list_head->next; peer != NULL; peer = peer->next )
    {
        if ( peer->id == peer_id )
            break;
    }
    if ( peer == NULL )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: unable to find peer id: %d", peer_id);
        return_value = mas_error(MERR_INVALID);
        goto failure;
    }

    /***** AUTHENTICATION *******************************************/

    /* retrieve the next packet from the library's buffers */
    err = rtp_p2p_recv( peer->session, &rtppacket, TRUE, FALSE );
    if ( err < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: Error recieving authentication packet.  Disconnecting peer.");
        return_value = mas_error(MERR_COMM) | mas_make_derror(-err) | MAS_ERR_WARNING;
        goto failure;
    }

    /* parse the message sent by the peer */
    err = net_parse_authmsg( rtppacket.payload, rtppacket.payload_len, &arg, &peer->infopack, &peer->version );
    if ( err < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: Error in peer's authentication packet.  Disconnecting peer.");
        return_value = mas_error(MERR_COMM) | mas_make_derror(-err) | MAS_ERR_WARNING;
        /* take the failure path: it frees the payload, closes the peer
         * and restores the log level */
        goto failure;
    }

    if ( rtppacket.payload != NULL )
        masc_rtfree( rtppacket.payload );
    rtppacket.payload = NULL;

    /********************************************************************
     **** No identity verification is done.  If it were to be done, it
     **** would be done here, using the info package pulled out of the
     **** authmsg.
     ********************************************************************/

    if ( peer->version )
        masc_log_message( MAS_VERBLVL_DEBUG, "net: authenticating client with maslib version %s", peer->version );

    if ( peer->infopack.contents )
    {
        masc_log_message( MAS_VERBLVL_DEBUG, "net: client's information package follows:");
        masc_debug_package( &peer->infopack, 0 );
    }

    /* Set OUR ssrc for this peer.  This is the SSRC that the peer
     * will see on packets from US. */
    if ( state->ssrc == 0 )
        rtp_get_local_ssrc( peer->session, &state->ssrc );
    else
        rtp_set_local_ssrc( peer->session, state->ssrc );

    /* test for channel type in the arg; a missing arg is treated like
     * an unrecognized one */
    if ( arg != NULL && strncmp( arg, "CONTROL", 7 ) == 0 )
    {
        /* Control channels use 32-bit NTP timestamp */
        rtp_set_tsu(peer->session, 1.52590218967E-5);
        peer->type = MAS_CHANNEL_TYPE_CONTROL;
        strcpy( channeltype, "control");
    }
    else if ( arg != NULL && strncmp( arg, "DATA", 4 ) == 0 )
    {
        peer->type = MAS_CHANNEL_TYPE_DATA;
        strcpy( channeltype, "data");
    }
    else
    {
        strcpy( channeltype, "undefined type");
    }

    /* the argument string is ours to release (mas_net_connect frees the
     * one it gets from net_parse_authmsg the same way) */
    if ( arg != NULL )
    {
        masc_rtfree( arg );
        arg = NULL;
    }

    /* reset the state of the socket & library buffers */
    peer->data_in_rtp_socket = FALSE;
    peer->data_in_rtcp_socket = FALSE;

    peer->data_in_library = rtp_is_data_in_library_queue(peer->session);
    if (peer->data_in_library < 0)
        peer->data_in_library = 0;

    /* grab our peer's SSRC */
    rtp_get_peer_ssrc( peer->session, &peer->ssrc );

    /** special handling for DATA channel */
    if ( peer->type == MAS_CHANNEL_TYPE_DATA )
    {
        /*** SINK *********/
        err = setup_port( state, MAS_SINK, MAS_CMATRIX_ANYTHING, &peer->sink );
        if (err < 0)
        {
            masc_log_message( MAS_VERBLVL_ERROR, "net: Couldn't not set up sink for peer %u", peer->ssrc );
            return_value = err;
            goto failure;
        }

        /* schedule our dataflow dependency on the sink for sending */
        schedule_net_send( state, peer, peer->sink );

        /*** SOURCE ********/
        err = setup_port( state, MAS_SOURCE, MAS_CMATRIX_ANYTHING, &peer->source );
        if (err < 0)
        {
            masc_log_message( MAS_VERBLVL_ERROR, "net: Couldn't not set up source for peer %u", peer->ssrc );
            return_value = err;
            goto failure;
        }

        /*** SET THE PORT NAMES ****************/
        snprintf( portname, sizeof portname, "%u %s sink", peer->ssrc, channeltype );
        err = masd_set_port_name( peer->sink, portname );

        snprintf( portname, sizeof portname, "%u %s source", peer->ssrc, channeltype );
        err = masd_set_port_name( peer->source, portname );

        /* HACK: this is an issue.  For lossless connections, this is
           fine.  For UDP, we can't really rely on this making it to
           the other side.  In that case, it needs to go over the
           control connection belonging to this SSRC */
        masc_setup_package( &package, buffer, sizeof buffer, MASC_PACKAGE_STATIC );
        masc_pushk_int32( &package, "src", peer->source );
        masc_pushk_int32( &package, "snk", peer->sink );
        masc_finalize_package( &package );
        reply = &package;

        /* TEMPORARY, 44kHz srate */
        rtp_set_tsu(peer->session, 2.26757E-5);

        masc_log_message( MAS_VERBLVL_DEBUG, "net: adding data channel for peer %u", peer->ssrc);
    } /* ENDIF peer->type == MAS_CHANNEL_TYPE_DATA */

    /** special handling for CONTROL channel */
    if ( peer->type == MAS_CHANNEL_TYPE_CONTROL )
    {
        /* transport part of the port name; stays empty for session
         * types that are not handled below */
        const char *transport = "";

        /** retrieve dynamic mas ports and store in peer */
        err = setup_port( state, MAS_RESPONSE, "", &peer->response );
        if (err < 0)
        {
            masc_log_message( MAS_VERBLVL_ERROR, "net: Couldn't not set up response port for peer %u", peer->ssrc );
            return_value = err;
            goto failure;
        }

        /* schedule our dataflow dependency on the response port for sending */
        schedule_net_send( state, peer, peer->response );

        /** construct the mas_port name */
        switch ( peer->session_type )
        {
        case RTP_SESSTYPE_UNIX:
            transport = "UNIX";
            break;
        case RTP_SESSTYPE_TCP:
            transport = inet_ntoa(((struct sockaddr_in*)peer->peer_addr)->sin_addr);
            break;
        default:
            break;
        }
        snprintf( portname, sizeof portname, "%s %u", transport, peer->ssrc );
        masc_log_message( MAS_VERBLVL_DEBUG, "net: authenticated peer %d: %s", peer->id, portname);
        snprintf( nametemp, sizeof nametemp, "%s response", portname );
        err = masd_set_port_name( peer->response, nametemp );

        /* create a tracking assemblage for this new peer, uniquely
           identified with the SSRC.  When the peer disconnects, this
           will allow the assembler to terminate its devices. */
        make_tracking_assemblage( state, peer->ssrc );

        masc_setup_package( &package, buffer, sizeof buffer, MASC_PACKAGE_STATIC );
        masc_pushk_string( &package, "build date", profile_build_date );
        masc_finalize_package( &package );
        reply = &package;
    } /* ENDIF peer->type == MAS_CHANNEL_TYPE_CONTROL */

    /* send our auth response.  'reply' is NULL when neither branch above
     * built a package; mas_net_connect also passes NULL to
     * net_create_authmsg() when it has no package to attach. */
    net_create_authmsg( state->version, "OK", reply, &authmsg, &authmsg_len );
    if ( authmsg != NULL )
    {
        err = rtp_p2p_send_control( peer->session, authmsg, authmsg_len );
        if ( err < 0 )
            masc_log_message( MAS_VERBLVL_ERROR, "net: failed to send authentication response to peer %u", peer->ssrc );
        masc_rtfree( authmsg );
    }
    else
    {
        masc_log_message( MAS_VERBLVL_ERROR, "net: unable to create authentication response for peer %u", peer->ssrc );
    }
    if ( reply != NULL )
        masc_strike_package( reply );

    /* add RTP socket to select() fd_set bitmap */
    FD_SET(peer->session->rx_rtp_socket, &state->fd_set_all_auth_sockets);
    FD_SET(peer->session->rx_rtcp_socket, &state->fd_set_all_auth_sockets);
    state->max_auth_fd = max(state->max_auth_fd, peer->session->rx_rtp_socket);
    state->max_auth_fd = max(state->max_auth_fd, peer->session->rx_rtcp_socket);

    poll_data_if_necessary( state );

    return_value = 0;

    goto success;

 failure:
    if ( arg != NULL )
        masc_rtfree( arg );
    if ( rtppacket.payload != NULL )
        masc_rtfree(rtppacket.payload);
    if ( peer != NULL )
        close_peer( state, peer );

success:
    masc_exiting_log_level();
    return return_value;
}

/***************************************************************************
 * mas_net_listen
 *
 *  predicate: unused
 *
 * Starts listening to network transports for connection attempts.
 *  
 * returns: error
 *
 ***************************************************************************/
int32
mas_net_listen(int32 device_instance, void* predicate)
{
    int err, return_value;
    struct net_rtp_state*  state;
    struct sockaddr_in rtp_addr_in;
    struct sockaddr_in rtcp_addr_in;
    struct sockaddr_un rtp_addr_un;
    struct sockaddr_un rtcp_addr_un;
    char rtp_unix_listen_path[MAX_FNAME_LENGTH];
    char rtcp_unix_listen_path[MAX_FNAME_LENGTH];
    int was_listening = FALSE;
    
    masd_get_state(device_instance, (void**)&state);

    if ( state->unix_listen_state == LISTENING || state->tcp_listen_state == LISTENING )
        was_listening = TRUE;
    
    if ( state->unix_listen_state == SETUP )
    {
        /*** Do UNIX domain sockets ****************************************/
        if ( mkdir(MAS_UNIXSOCKDIR, 0777) < 0)
        {
            if (errno != EEXIST)
            {
                masc_log_message(MAS_VERBLVL_ERROR, "Error, can't create directory for socket: %s", MAS_UNIXSOCKDIR); 
                return_value = mas_error(MERR_COMM);
                goto failure;
            }
        }
        if (chmod(MAS_UNIXSOCKDIR, 0777) < 0)
        {
            masc_log_message(MAS_VERBLVL_ERROR, "Error, can't change permissions on directory for socket: %s", MAS_UNIXSOCKDIR);
            return_value = mas_error(MERR_COMM);
            goto failure;
        }
    
        /* create rtp/rtcp UNIX filenames - these are static */
        masc_strlcpy( rtp_unix_listen_path, MAS_UNIXSOCKDIR, sizeof rtp_unix_listen_path );
        masc_strlcat( rtp_unix_listen_path, "/", sizeof rtp_unix_listen_path );
        masc_strlcpy( rtcp_unix_listen_path, rtp_unix_listen_path, sizeof rtcp_unix_listen_path );
        masc_strlcat( rtp_unix_listen_path, RTP_UNIX_LISTEN_FILE, sizeof rtp_unix_listen_path );
        masc_strlcat( rtcp_unix_listen_path, RTCP_UNIX_LISTEN_FILE, sizeof rtcp_unix_listen_path );

        rtp_addr_un.sun_family = AF_UNIX;
        masc_strlcpy(rtp_addr_un.sun_path, rtp_unix_listen_path, RTP_UNIX_PATH_MAX);

        rtcp_addr_un.sun_family = AF_UNIX;
        masc_strlcpy(rtcp_addr_un.sun_path, rtcp_unix_listen_path, RTP_UNIX_PATH_MAX);

        err = rtp_transport_stream_listen(RTP_SESSTYPE_UNIX, &(state->rtp_listen_unix), &(state->rtcp_listen_unix), (struct sockaddr*)&rtp_addr_un, (struct sockaddr*)&rtcp_addr_un);
        if (err < 0)
        {
            masc_log_message(MAS_VERBLVL_ERROR, "Error listening to UNIX rtp transport stream.");
            return_value = mas_error(MERR_COMM);
            goto failure;
        }

        state->unix_listen_state = LISTENING;
        masc_log_message(MAS_VERBLVL_DEBUG, "net: listening for UNIX connections");

        /* use signals to detect incoming connections */
        if ( state->has_signals )
        {
            if ( fcntl( state->rtp_listen_unix, F_SETOWN, getpid() ) < 0 )
                mas_assert( FALSE, "");
            
            if ( fcntl( state->rtp_listen_unix, F_SETFL, FASYNC ) < 0 )
                mas_assert( FALSE, "");
        }
    }

    if ( state->unix_listen_state == STRIKE )
    {
        RTPCLOSESOCKET( state->rtp_listen_unix );
        RTPCLOSESOCKET( state->rtcp_listen_unix );
        state->rtp_listen_unix = -1;
        state->rtcp_listen_unix = -1;
        state->unix_listen_state = NOT_LISTENING;
        masc_log_message(MAS_VERBLVL_DEBUG, "net: no longer listening for UNIX connections.");
    }

    /*** Do TCP sockets *************************************************/

    if ( state->tcp_listen_state == SETUP )
    {
        rtp_addr_in.sin_family = AF_INET;
        rtp_addr_in.sin_port = mas_htons(RTP_TCP_LISTEN_PORT);
        rtp_addr_in.sin_addr.s_addr = mas_htonl(INADDR_ANY);

        rtcp_addr_in.sin_family = AF_INET;
        rtcp_addr_in.sin_port = mas_htons(RTCP_TCP_LISTEN_PORT);
        rtcp_addr_in.sin_addr.s_addr = mas_htonl(INADDR_ANY);

        err = rtp_transport_stream_listen(RTP_SESSTYPE_TCP,
                                          &(state->rtp_listen_tcp),
                                          &(state->rtcp_listen_tcp),
                                          (struct sockaddr*)&rtp_addr_in,
                                          (struct sockaddr*)&rtcp_addr_in);
        if (err < 0)
        {
            masc_log_message(MAS_VERBLVL_ERROR, "Error listening to TCP rtp transport stream.");
            return_value = mas_error(MERR_COMM);
            goto failure;
        }

        state->tcp_listen_state = LISTENING;
        masc_log_message(MAS_VERBLVL_DEBUG, "net: listening for TCP connections");

        /* use signals to detect incoming connections */
        if ( state->has_signals )
        {
            if ( fcntl( state->rtp_listen_tcp, F_SETOWN, getpid() ) < 0 )
                mas_assert( FALSE, "");
            
            if ( fcntl( state->rtp_listen_tcp, F_SETFL, FASYNC ) < 0 )
                mas_assert( FALSE, "");
        }
    }
    
    if ( state->tcp_listen_state == STRIKE )
    {
        RTPCLOSESOCKET( state->rtp_listen_tcp );
        RTPCLOSESOCKET( state->rtcp_listen_tcp );
        state->rtp_listen_tcp = -1;
        state->rtcp_listen_tcp = -1;
        state->tcp_listen_state = NOT_LISTENING;
        masc_log_message(MAS_VERBLVL_DEBUG, "net: no longer listening for TCP connections.");
    }
    
    FD_ZERO(&state->fd_set_all_listen_sockets);
    state->max_listen_fd = 0;

    /* set up select() fd_set bitmap */
    if ( state->unix_listen_state == LISTENING )
    {
        FD_SET(state->rtp_listen_unix, &state->fd_set_all_listen_sockets);
        FD_SET(state->rtcp_listen_unix, &state->fd_set_all_listen_sockets);
        state->max_listen_fd = max(state->max_listen_fd, state->rtp_listen_unix);
        state->max_listen_fd = max(state->max_listen_fd, state->rtcp_listen_unix);
    }

    if ( state->tcp_listen_state == LISTENING )
    {
        /* set up select() fd_set bitmap */
        FD_SET(state->rtp_listen_tcp, &state->fd_set_all_listen_sockets);
        FD_SET(state->rtcp_listen_tcp, &state->fd_set_all_listen_sockets);
        state->max_listen_fd = max(state->max_listen_fd, state->rtp_listen_tcp);
        state->max_listen_fd = max(state->max_listen_fd, state->rtcp_listen_tcp);
    }

    if ( !was_listening && !state->has_signals )
    {
        /* We're not capable of using asynchronous signals to detect
         * incoming connections.  Instead, we gotta poll!  Schedule
         * periodic reaction, 0.1 second period */
        masd_reaction_queue_action(state->reaction, device_instance, "mas_net_check_for_connections", 0, 0, 0, 0, 0, MAS_PRIORITY_ROUNDTUIT, 100000, 0, 0);
    }
    
    return_value = 0;
    goto success;
failure:
success:
    masc_exiting_log_level();
    return return_value;
}

/***************************************************************************
 * mas_dev_show_state - standard debugging action
 *
 *  predicate: unused
 *
 * Logs the local SSRC followed by one section per peer in the peer
 * list: channel type, session type (plus IP and port for TCP and UDP
 * peers that have an address), sink and source port numbers, the
 * pending-data flags and the RTP source data of the peer's session.
 * Everything is written through masc_log_message().
 *
 * returns: always 0
 *
 ***************************************************************************/
int32
mas_dev_show_state( int32 device_instance, void* predicate )
{
    struct net_rtp_state* state;
    struct net_rtp_peer_node* first_peer;
    struct net_rtp_peer_node* cur;
    struct sockaddr_in* peer_in;
    const char* type_name;

    masd_get_state(device_instance, (void**)&state);

    masc_log_message(0, "== net state ==================================================");
    masc_log_message(0, "           local SSRC: %u", state->ssrc);

    /* the list head is only a placeholder; real peers start behind it */
    first_peer = state->peer_list_head->next;
    if ( first_peer == 0 )
        masc_log_message(0, "No peers.");

    for ( cur = first_peer; cur != 0; cur = cur->next )
    {
        masc_log_message(0, "");
        masc_log_message(0, "-- peer %d, ssrc: %u -----------------------------------------", cur->id, cur->ssrc);

        if ( cur->type == MAS_CHANNEL_TYPE_DATA )
            type_name = "data";
        else
            type_name = "control";
        masc_log_message(0, "             type: %s", type_name);

        switch (cur->session_type)
        {
        case RTP_SESSTYPE_NONE:
            masc_log_message(0, "          session: NONE");
            break;
        case RTP_SESSTYPE_UNIX:
            masc_log_message(0, "          session: UNIX");
            break;
        case RTP_SESSTYPE_FIFO:
            masc_log_message(0, "          session: FIFO");
            break;
        case RTP_SESSTYPE_XCLIENTMSG:
            masc_log_message(0, "          session: XCLIENTMSG");
            break;
        case RTP_SESSTYPE_TCP:
            masc_log_message(0, "          session: TCP");
            if (cur->peer_addr)
            {
                /* the stored address is read as an IPv4 socket address */
                peer_in = (struct sockaddr_in*)cur->peer_addr;
                masc_log_message(0, "               IP: %s", inet_ntoa(peer_in->sin_addr));
                masc_log_message(0, "             port: %d", mas_ntohs( peer_in->sin_port ) );
            }
            break;
        case RTP_SESSTYPE_UDP:
            masc_log_message(0, "          session: UDP");
            if (cur->peer_addr)
            {
                peer_in = (struct sockaddr_in*)cur->peer_addr;
                masc_log_message(0, "               IP: %s", inet_ntoa(peer_in->sin_addr));
                masc_log_message(0, "             port: %d", mas_ntohs( peer_in->sin_port ) );
            }
            break;
        default:
            masc_log_message(0, "          session: [unknown]");
        }

        masc_log_message(0, "             sink: %d", cur->sink);
        masc_log_message(0, "           source: %d", cur->source);

        if (cur->data_in_rtp_socket)
            masc_log_message(0, "     data in rtp socket");
        if (cur->data_in_rtcp_socket)
            masc_log_message(0, "     data in rtcp socket");
        if (cur->data_in_library)
            masc_log_message(0, "     data in library");

        /* refresh the session's source data before dumping it */
        rtp_update_sd( cur->session );
        _print_sd( &(cur->session->sd) );
    }

    return 0;
}

/***************************************************************************
 * mas_get - property query action
 *
 *  predicate: request package; masd_get_pre() extracts the return port,
 *             the key and the argument package from it
 *
 * Supported keys:
 *   "list"  - pushes the names of the supported keys.
 *   "stats" - takes a port number as argument and pushes statistics of
 *             the peer owning that port: "tsu" always, then "packrcvd"
 *             and "jitter" when the port is the peer's source, or "rtt"
 *             (milliseconds), "fraclost", "packlost" and "rjitter" when
 *             it is the peer's sink.
 *
 * When the request fails after it has been decoded, an "err" entry
 * holding the error code is added to the response.  The response is
 * posted with masd_get_post().  When the request cannot be decoded at
 * all, no response is posted because the return port is unknown.
 *
 * returns: 0 on success, a negative error otherwise
 *
 ***************************************************************************/
int32
mas_get( int32 device_instance, void* predicate )
{
    struct net_rtp_state* state;
    int32 err, return_value = 0;
    int32 retport;
    char* key;
    struct mas_package arg;
    struct mas_package r_package;
    /* list of nuggets.  preserve the terminator */
    static char* nuggets[] = 
        { "list", "stats", "" };
    int i, n=0;
    

    masc_entering_log_level("Getting property: mas_get()");
    
    masd_get_state(device_instance, (void**)&state);

    /* Use the standard get_nugget wrapper. */
    err = masd_get_pre( predicate, &retport, &key, &arg );
    if ( err < 0 )
    {
        /* Nothing was decoded: key, retport and arg may be unset and the
         * response package does not exist yet, so none of them may be
         * touched.  Bail out without posting a response. */
        masc_log_message(MAS_VERBLVL_ERROR, "Error getting key from package.");
        masc_exiting_log_level();
        return err;
    }

    /* construct our response */
    masc_setup_package( &r_package, NULL, 0, MASC_PACKAGE_NOFREE );
    
    /* count the defined nuggets */
    while ( *nuggets[n] != 0 ) n++;

    i = masc_get_string_index(key, nuggets, n);

    switch(i)
    {
    case 0: /*list*/
        masc_push_strings( &r_package, nuggets, n );
        break;
    case 1: /*stats*/
    {
        struct net_rtp_peer_node* peer;
        struct rtp_source_data* sd;
        int    is_source = FALSE;
        int32  portnum;
        
        if ( arg.contents == 0 )
        {
            return_value = mas_error(MERR_INVALID);
            break;
        }
        
        /* without a port number there is no peer to look up */
        if ( masc_pull_int32( &arg, &portnum ) < 0 )
        {
            return_value = mas_error(MERR_INVALID);
            break;
        }

        /* find the peer we have to deal with */
        peer = get_peer_from_port( state->peer_list_head, portnum, &is_source);
        /* no such peer? */
        if ( peer == 0 )
        {
            return_value = mas_error(MERR_INVALID);
            break;
        }

        /* collect statistics */
        rtp_update_sd( peer->session );
        sd = &peer->session->sd;

        masc_pushk_float( &r_package, "tsu", sd->tsunit );
        
        if ( is_source )
        {
            /* receive-side statistics */
            masc_pushk_int32( &r_package, "packrcvd", sd->stats.numpacketsreceived );
            masc_pushk_int32( &r_package, "jitter", sd->stats.djitter );
        }
        else
        {
            /* values taken from the round trip time and the receiver
             * report fields of the source data */
            masc_pushk_float( &r_package, "rtt", sd->stats.rtt.tv_sec*1000.0 + sd->stats.rtt.tv_usec/1000.0 );
            masc_pushk_float( &r_package, "fraclost", (float)sd->rr.fractionlost/2.56 );
            masc_pushk_uint32( &r_package, "packlost", sd->rr.packetslost);
            masc_pushk_uint32( &r_package, "rjitter", sd->rr.jitter);
        }
    }
    break;
    default:
        return_value = mas_error(MERR_INVALID);
        break;
    }

    /* return an error */
    if ( return_value < 0 )
        masc_pushk_int32( &r_package, "err", return_value );

    masc_finalize_package( &r_package );

    /* post the response where it belongs and free the data structures
     * we abused */
    masd_get_post( state->reaction, retport, key, &arg, &r_package );
        
    masc_exiting_log_level();
    return return_value;
}


/***************************************************************************
 * mas_set - property set action
 *
 *  predicate: request package; masd_set_pre() extracts the key and the
 *             argument package from it
 *
 * Supported keys:
 *   "tsu"     - arguments "portnum" and "tsu"; sets the timestamp unit
 *               of the session of the peer owning that port.
 *   "mashost" - argument is a host string handed to auth_host_parse();
 *               afterwards the TCP listen state is set to STRIKE (host
 *               list has no members) or SETUP (not yet listening) and
 *               the "mas_net_listen" action is queued.
 * Unknown keys are ignored.
 *
 * returns: the result of masd_set_post() on the normal path, a negative
 *          error when the request cannot be decoded or a "tsu" request
 *          is incomplete or names an unknown port.  A failure while
 *          handling "mashost" is logged but not returned.
 *
 ***************************************************************************/
int32
mas_set( int32 device_instance, void* predicate )
{
    struct net_rtp_state*  state;
    int32 err, return_value;
    char* key;
    struct mas_package arg;
    /* list of nuggets.  preserve the terminator */
    static char* nuggets[] = 
        { "tsu", "mashost", "" };
    int i, n=0;
    

    masc_entering_log_level("Setting property: mas_set()");

    masd_get_state(device_instance, (void**)&state);

    /* Use the standard set_nugget wrapper. */
    err = masd_set_pre( predicate, &key, &arg );
    if ( err < 0 )
    {
        /* key may not have been set when decoding failed, so it must
         * not be printed */
        masc_log_message(MAS_VERBLVL_ERROR, "Error setting key from package.");
        return_value = err;
        goto done;
    }

    /* count the defined nuggets */
    while ( *nuggets[n] != 0 ) n++;

    i = masc_get_string_index(key, nuggets, n);

    switch(i)
    {
    case 0: /*tsu*/
    {
        float tsu;
        int is_source;
        int32 portnum;
        struct net_rtp_peer_node* peer;

        if ( masc_pullk_int32( &arg, "portnum", &portnum ) < 0 )
        {
            masc_log_message(MAS_VERBLVL_ERROR, "net: mas_set('tsu') is missing the 'portnum' argument.");
            /* cleanup after our mess */
            masd_set_post( key, &arg );
            return_value = mas_error(MERR_INVALID);
            goto done;
        }

        /* find the peer we have to deal with */
        peer = get_peer_from_port( state->peer_list_head, portnum, &is_source);
        /* no such peer? */
        if ( peer == 0 )
        {
            masc_log_message(MAS_VERBLVL_DEBUG, "net: mas_set('tsu') failed to find peer of port %d", portnum );
            /* cleanup after our mess */
            masd_set_post( key, &arg );
            masc_log_message(MAS_VERBLVL_ERROR, "Error getting peer from port.");
            return_value = mas_error(MERR_INVALID);
            goto done;
        }

        if ( masc_pullk_float( &arg, "tsu", &tsu ) < 0 )
        {
            masc_log_message(MAS_VERBLVL_ERROR, "net: mas_set('tsu') is missing the 'tsu' argument.");
            /* cleanup after our mess */
            masd_set_post( key, &arg );
            return_value = mas_error(MERR_INVALID);
            goto done;
        }

        rtp_set_tsu( peer->session, tsu );
        masc_log_message(MAS_VERBLVL_DEBUG, "net: mas_set('tsu') set tsu of port %d to %f", portnum, tsu );
        break;
    }
    case 1: /* mashost */
    {
        char *hn;
        
        if ( masc_pull_string( &arg, &hn, FALSE ) < 0 )
        {
            masc_log_message( MAS_VERBLVL_ERROR, "net: mas_set(mashost): failed");
            break;
        }

        err = auth_host_parse( &state->hl, hn );
        
        if ( err < 0 )
        {
            masc_log_message( MAS_VERBLVL_ERROR, "net: mas_set(mashost): failed");
            break;
        }
        
        if ( state->hl.members == 0 )
        {
            /* no hosts in the list: request teardown of the TCP listener */
            state->tcp_listen_state = STRIKE;
            masd_reaction_queue_action_simple(state->reaction, device_instance, "mas_net_listen", NULL, 0 );
        }
        else if ( state->tcp_listen_state != LISTENING )
        {
            /* hosts are listed and we are not listening yet */
            state->tcp_listen_state = SETUP;
            masd_reaction_queue_action_simple(state->reaction, device_instance, "mas_net_listen", NULL, 0 );
        }
        
        break;
    }
    default:
        break;
    }

    /* cleanup after our mess */
    err = masd_set_post( key, &arg );

    return_value = err;

done:
    masc_exiting_log_level();
    return return_value;
}

/*************************************************************************
 * LOCAL FUNCTIONS
 *************************************************************************/
/*
 * close_peer - tear down a single peer and release what it holds.
 *
 *  state: device state owning the peer list, fd sets and port pool
 *  peer:  the peer to close; it is freed and must not be used afterwards
 *
 * Queues termination of the peer's tracking assemblage, removes the
 * peer's receive sockets from the select auth bitmap, destroys the RTP
 * session, recycles the peer's dynamic ports and finally frees the
 * peer's address, session and list node.
 *
 * returns: always 0
 */
int32
close_peer( struct net_rtp_state* state, struct net_rtp_peer_node* peer )
{
    /* terminate the peer's assembler tracking assemblage */
    terminate_tracking_assemblage( state, peer->ssrc );
    
    /* Pull the socket fd's out of the select auth bitmap.  This is done
     * while the session is still intact, so the descriptors read here
     * are the ones that were registered; the session is destroyed only
     * afterwards. */
    if ( peer->session_type == RTP_SESSTYPE_TCP || 
         peer->session_type == RTP_SESSTYPE_UNIX || 
         peer->session_type == RTP_SESSTYPE_UDP )
    {
        FD_CLR( peer->session->rx_rtp_socket,
                &state->fd_set_all_auth_sockets );
        FD_CLR( peer->session->rx_rtcp_socket,
                &state->fd_set_all_auth_sockets );
    }

    /* destroy the jrtplib session object */
    rtp_destroy_session( peer->session );
    /** jrtplib handles closing the sockets for us */


    /* TODO: shrink select auth fd bitmap
     *
     *
     */
    

    /* recycle the mas ports and connections */
    if ( peer->type == MAS_CHANNEL_TYPE_DATA )
    {
        masd_recycle_dynport( &state->dp_pool, state->device_instance, state->reaction, peer->sink );
        masd_recycle_dynport( &state->dp_pool, state->device_instance, state->reaction, peer->source );
    }
    if ( peer->type == MAS_CHANNEL_TYPE_CONTROL )
    {
        masd_recycle_dynport( &state->dp_pool, state->device_instance, state->reaction, peer->response );
    }

    /* destroy the peer */
    masc_rtfree( peer->peer_addr );
    masc_rtfree( peer->session );
    delete_node( peer );

    return 0;
}

/*
 * close_peers_with_control_ssrc - close every peer tied to an SSRC.
 *
 * Walks the peer list (skipping the placeholder head) and calls
 * close_peer() on each peer whose own ssrc or control_ssrc equals the
 * given value.
 *
 * returns: always 0
 */
int32
close_peers_with_control_ssrc( struct net_rtp_state* state, uint32 ssrc )
{
    struct net_rtp_peer_node* cur;
    struct net_rtp_peer_node* following;

    for ( cur = state->peer_list_head->next; cur != 0; cur = following )
    {
        /* remember the successor first: close_peer() frees the node */
        following = cur->next;

        if ( cur->ssrc != ssrc && cur->control_ssrc != ssrc )
            continue;

        close_peer( state, cur );
    }

    return 0;
}

/*
 * terminate_tracking_assemblage - queue an event asking the assembler
 * to terminate the tracking assemblage of a source.
 *
 *  state: device state; its reaction port receives the event
 *  ssrc:  stored in the event's source_device_subscript
 *
 * The event is addressed to MAS_ASM_INSTANCE with the action name
 * "mas_asm_terminate_tracking_assemblage".
 *
 * returns: the result of masd_reaction_queue_event(), or a MERR_MEMORY
 *          error when the event or its action name cannot be allocated
 */
int32
terminate_tracking_assemblage( struct net_rtp_state* state, uint32 ssrc )
{
    struct mas_event* event;
    char*  action_name;

    /* Duplicate the name first so that no event is ever queued with a
     * NULL action name. */
    action_name = strdup( "mas_asm_terminate_tracking_assemblage" );
    if ( action_name == NULL ) return mas_error( MERR_MEMORY );

    event = MAS_NEW( event );
    if ( event == NULL )
    {
        free( action_name );
        return mas_error( MERR_MEMORY );
    }
    masc_setup_event( event );
    
    event->device_instance = MAS_ASM_INSTANCE;
    event->action_name = action_name;
    event->source_device_subscript = ssrc;
    
    return masd_reaction_queue_event(state->reaction, event );
}

/*
 * make_tracking_assemblage - queue an event asking the assembler to
 * create a tracking assemblage for a source.
 *
 *  state: device state; its reaction port receives the event
 *  ssrc:  stored in the event's source_device_subscript
 *
 * The event is addressed to MAS_ASM_INSTANCE with the action name
 * "mas_asm_make_tracking_assemblage".
 *
 * returns: the result of masd_reaction_queue_event(), or a MERR_MEMORY
 *          error when the event or its action name cannot be allocated
 */
int32
make_tracking_assemblage( struct net_rtp_state* state, uint32 ssrc )
{
    struct mas_event* event;
    char*  action_name;

    /* Duplicate the name first so that no event is ever queued with a
     * NULL action name. */
    action_name = strdup( "mas_asm_make_tracking_assemblage" );
    if ( action_name == NULL ) return mas_error( MERR_MEMORY );

    event = MAS_NEW( event );
    if ( event == NULL )
    {
        free( action_name );
        return mas_error( MERR_MEMORY );
    }
    masc_setup_event( event );

    event->device_instance = MAS_ASM_INSTANCE;
    event->action_name = action_name;
    event->source_device_subscript = ssrc;
    
    return masd_reaction_queue_event(state->reaction, event );
}

/*
 * transform_rtppacket_to_data - wrap an RTP packet in a new mas_data.
 *
 *  rtppacket: packet supplying timestamp, sequence number, payload
 *             type, marker bit and payload
 *  data_ptr:  receives the newly allocated mas_data on success
 *
 * The payload is not copied: the new mas_data's segment points at the
 * packet's payload buffer.  The NTP header fields are set to zero.
 *
 * returns: 0 on success, a MERR_MEMORY error if allocation fails (in
 *          which case *data_ptr is left untouched)
 */
int32
transform_rtppacket_to_data( struct rtp_packet* rtppacket, 
                             struct mas_data** data_ptr )
{
    struct mas_data* wrapped;

    wrapped = MAS_NEW( wrapped );
    if ( wrapped == NULL )
        return mas_error(MERR_MEMORY);

    /* header fields */
    wrapped->header.type            = rtppacket->pt;
    wrapped->header.mark            = rtppacket->m;
    wrapped->header.sequence        = rtppacket->seq;
    wrapped->header.media_timestamp = rtppacket->ts;
    wrapped->header.ntp_seconds     = 0;
    wrapped->header.ntp_fraction    = 0;

    /* payload: share the packet's buffer */
    wrapped->segment          = rtppacket->payload;
    wrapped->length           = rtppacket->payload_len;
    wrapped->allocated_length = rtppacket->payload_len;

    *data_ptr = wrapped;
    return 0;
}

int32
transform_rtppacket_to_event( struct rtp_packet* rtppacket, struct mas_event** event_ptr )
{
    struct mas_package package;
    struct mas_event* event;
    int32 err, return_value;
    
    masc_entering_log_level("Transforming rtppacket to event.");

    event = MAS_NEW( event );
    if ( event == NULL )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error making MAS event.");
      return_value = mas_error(MERR_MEMORY);
      goto failure;
    }

    masc_setup_package( &package, rtppacket->payload, 0, MASC_PACKAGE_STATIC|MASC_PACKAGE_EXTRACT );
    err = masc_unpack_event( &package, event );
    masc_strike_package( &package );
    if ( err < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error unpacking MAS event.");
      return_value = err;
      goto failure;
    }

    
    *event_ptr = event;

    return_value = 0;
    goto success;
failure:
    if ( event != NULL ) masc_rtfree( event );
success:
    masc_exiting_log_level();
    return return_value;
}

/*
 * _print_sd - log the contents of an RTP source data record.
 *
 *  sd: the source data to dump
 *
 * Writes, through masc_log_message(), the general source fields, the
 * last sender report and receiver report (or "NONE" when none was
 * received), the non-empty source description (SDES) items and the
 * reception statistics.
 *
 * returns: always 0
 */
int32
_print_sd( struct rtp_source_data* sd )
{
    int    i;
    char   tempstr[256];
    double ntime;
    
    masc_log_message(0, "");
    masc_log_message(0, "- source data ------------------------------------");
    masc_log_message(0, "                     ssrc: %u", sd->ssrc);
    masc_log_message(0, "        has sent new data: %s", (sd->hassentnewdata)?"yes":"no");
    masc_log_message(0, "                is a CSRC: %s", (sd->isaCSRC)?"yes":"no");
    masc_log_message(0, "                       ip: %u", sd->ip);
    masc_log_message(0, "                 rtp port: %d", sd->rtpport);
    masc_log_message(0, "                rtcp port: %d", sd->rtcpport);
    masc_log_message(0, "     timestamp unit (tsu): %f", sd->tsunit);
    masc_log_message(0, "");
    masc_log_message(0, "  - sender report --------------");
    if ( !sd->sr.srreceived ) masc_log_message(0, "    NONE");
    else
    {
        masc_timeval_to_double(&(sd->sr.srtime), &ntime);
        masc_log_message(0, "           received SR at: %lf sec", ntime);
        masc_ntp_to_double(&(sd->sr.ntp), &ntime);
        masc_log_message(0, "                   NTP ts: %lf sec", ntime);
        masc_log_message(0, "                   RTP ts: %u tsu", sd->sr.ts);
        masc_log_message(0, "             packet count: %u", sd->sr.packetcount);
        masc_log_message(0, "               byte count: %u", sd->sr.bytecount);
    }
    
    masc_log_message(0, "");
    masc_log_message(0, "  - receiver report ------------");
    if ( !sd->rr.rrreceived ) masc_log_message(0, "    NONE");
    else
    {
        masc_timeval_to_double(&(sd->rr.rrtime), &ntime);
        masc_log_message(0, "             RR timestamp: %lf sec", ntime);
        masc_log_message(0, "            fraction lost: %f%% of whole", (float)sd->rr.fractionlost/2.56);
        masc_log_message(0, "             packets lost: %u", sd->rr.packetslost);
        masc_log_message(0, "extended highest seq. num: %u", sd->rr.exthighseqnum);
        masc_log_message(0, "           receive jitter: %u tsu", sd->rr.jitter);
        masc_ntp32_to_double( sd->rr.lsr, &ntime );
        masc_log_message(0, "         last SR received: %lf sec", ntime);
        masc_ntp32_to_double( sd->rr.dlsr, &ntime );
        masc_log_message(0, "       time since last SR: %lf sec", ntime);
    }

    masc_log_message(0, "");
    masc_log_message(0, "  - source description fields --");
    for (i=0; i<RTP_NUM_SDES_INDICES; i++)
    {
        if ( sd->sdes.sdesinfolen[i] > 0 )
        {
            size_t len = (size_t)sd->sdes.sdesinfolen[i];

            /* never copy more than the local buffer can hold, leaving
             * room for the terminator added below */
            if ( len > sizeof tempstr - 1 ) len = sizeof tempstr - 1;

            memcpy(tempstr, sd->sdes.sdesinfo[i], len);
            tempstr[len] = 0; /* the null isn't there */
            switch (i)
            {
            case 0: masc_log_message(0, "                    CNAME: %s", tempstr); break;
            case 1: masc_log_message(0, "                     NAME: %s", tempstr); break;
            case 2: masc_log_message(0, "                    EMAIL: %s", tempstr); break;
            case 3: masc_log_message(0, "                    PHONE: %s", tempstr); break;
            case 4: masc_log_message(0, "                      LOC: %s", tempstr); break;
            case 5: masc_log_message(0, "                     TOOL: %s", tempstr); break;
            case 6: masc_log_message(0, "                     NOTE: %s", tempstr); break;
            case 7: masc_log_message(0, "                     PRIV: %s", tempstr); break;
            default: break;
            }
        }
    }

    masc_log_message(0, "");
    masc_log_message(0, "  - statistics -----------------");
    masc_log_message(0, "            has sent data: %s", (sd->stats.hassentdata)?"yes":"no");
    masc_log_message(0, "         packets received: %d", sd->stats.numpacketsreceived);
    masc_log_message(0, "              new packets: %d", sd->stats.numnewpackets);
    masc_log_message(0, "                   cycles: %d", sd->stats.numcycles);
    masc_log_message(0, "     base sequence number: %u", sd->stats.seqbase);
    masc_log_message(0, "extended highest seq. num: %u", sd->stats.maxseq);
    masc_log_message(0, "   previous high seq. num: %u", sd->stats.prevmaxseq);
    masc_log_message(0, "          previous RTP ts: %u tsu", sd->stats.prevts);
    masc_log_message(0, "                   jitter: %u tsu", sd->stats.jitter);
    masc_log_message(0, "                  djitter: %f tsu", sd->stats.djitter);
    masc_log_message(0, "        last message time: %u sec", sd->stats.lastmsgtime);
    masc_timeval_to_double( &(sd->stats.prevpacktime), &ntime);
    masc_log_message(0, "     previous packet time: %lf sec", ntime);
    /* NOTE(review): the converted value is not used below; the round
     * trip time is printed in milliseconds straight from the timeval. */
    masc_timeval_to_double( &(sd->stats.rtt), &ntime );
    masc_log_message(0, "          round trip time: %lf ms", sd->stats.rtt.tv_sec*1000.0 + sd->stats.rtt.tv_usec/1000.0);
    return 0;
}

/*** rtp connection manager linked list handling routines *************/

/*
 * new_node - allocate a peer list node.
 *
 * The node's session type is set to RTP_SESSTYPE_NONE.
 *
 * returns: the new node, or 0 (after logging an error) if allocation
 *          fails
 */
struct net_rtp_peer_node*
new_node( void )
{
    struct net_rtp_peer_node* fresh;

    masc_entering_log_level("Creating node.");

    fresh = MAS_NEW( fresh );
    if ( fresh != 0 )
        fresh->session_type = RTP_SESSTYPE_NONE;
    else
        masc_log_message(MAS_VERBLVL_ERROR, "Error, unable to allocate memory for node.");

    masc_exiting_log_level();
    return fresh;
}

/*
 * delete_node - unlink a node from the peer list and free it.
 *
 *  node: node to remove; its neighbours (if any) are linked to each
 *        other before the node itself is released with masc_rtfree()
 *
 * returns: 0 on success, a MERR_MEMORY error if node is NULL
 */
int32
delete_node( struct net_rtp_peer_node* node )
{
    int32 status;

    masc_entering_log_level("Deleting node.");

    if ( node == 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error, can not free a NULL node.");
        status = mas_error(MERR_MEMORY);
    }
    else
    {
        struct net_rtp_peer_node* before = node->prev;
        struct net_rtp_peer_node* after  = node->next;

        /* bridge the gap the node leaves behind */
        if ( before != 0 )
            before->next = after;
        if ( after != 0 )
            after->prev = before;

        masc_rtfree(node);
        status = 0;
    }

    masc_exiting_log_level();
    return status;
}

/*
 * append_node - attach a node to the end of the peer list.
 *
 *  head: first node of the list; must not be NULL
 *  node: node to attach behind the current last node
 *
 * Only the last node's next pointer and node's prev pointer are
 * written; node's own next pointer is left as it is.
 *
 * returns: always 0
 */
int32
append_node(struct net_rtp_peer_node* head,
            struct net_rtp_peer_node* node)
{
    struct net_rtp_peer_node* last;

    /* advance to the node that has no successor */
    for ( last = head; last->next != NULL; last = last->next )
        ;

    node->prev = last;
    last->next = node;

    return 0;
}

/*
 * get_peer_from_port - find the peer that owns a port number.
 *
 *  head:      head of the peer list; the head itself is not examined
 *  portnum:   port number to look for
 *  is_source: set to TRUE when portnum is the found peer's source port,
 *             FALSE when it is its sink port; not written when no peer
 *             matches
 *
 * returns: the first peer whose source or sink equals portnum, or 0
 *          when there is none or head is NULL
 */
struct net_rtp_peer_node*
get_peer_from_port( struct net_rtp_peer_node* head, int32 portnum, int* is_source )
{
    struct net_rtp_peer_node* peer;
    struct net_rtp_peer_node* return_value;

    masc_entering_log_level("Getting peer from port: get_peer_from_port().");

    if ( ! head )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error, received a NULL head.");
        return_value = 0;
        goto done;
    }
    
    peer = head->next;
    while ( peer != 0) 
    {
        if ( peer->source == portnum )
        {
            *is_source = TRUE;
            break;
        }
        else if ( peer->sink == portnum )
        {
            *is_source = FALSE;
            break;
        }
        else peer = peer->next;
    }

    /* peer is 0 here when the whole list was walked without a match */
    return_value = peer;

done:
    masc_exiting_log_level();
    return return_value;
}

/*
 * set_tsu_from_dc - derive the timestamp unit (tsu) of a peer's session
 * from a data characteristic and apply it with rtp_set_tsu().
 *
 *  peer:       peer whose session receives the tsu
 *  dc:         data characteristic searched for "mt rate" and
 *              "sampling rate"
 *  tsu_retval: receives the tsu that was set; written on success only
 *
 * A positive "mt rate" overrides the sampling rate.  Otherwise the
 * sampling rate is used, assuming one tick == one sampling instant,
 * independent of the number of channels.  The tsu is the reciprocal of
 * the chosen rate.
 *
 * returns: 0 on success; a MERR_INVALID error when no usable (positive)
 *          rate is available or rtp_set_tsu() fails
 */
int32
set_tsu_from_dc( struct net_rtp_peer_node* peer, struct mas_data_characteristic* dc, float* tsu_retval )
{
    int i;
    int rate = 0;
    int mt_rate = 0;
    float tsu = 0.0;
    int32   return_value;

    masc_entering_log_level("Setting tsu from dc: set_tsu_from_dc().");
    
    i = masc_get_index_of_key(dc, "mt rate");
    if ( i >= 0 )
        mt_rate = atoi( dc->values[i] );

    if ( mt_rate > 0 )
    {
        tsu = 1/(float)mt_rate;
    }
    else
    {
        /* No usable media timestamp rate, set tsu using sampling rate */
        i = masc_get_index_of_key(dc, "sampling rate");
        if ( i < 0 )
        {
            masc_log_message(MAS_VERBLVL_ERROR, "Error getting index of key: sampling rate.");
            return_value = mas_error(MERR_INVALID);
            goto done;
        }
        
        rate = atoi( dc->values[i] );
        if ( rate <= 0 )
        {
            /* a zero rate would divide by zero, a negative one would
             * give a negative tsu */
            masc_log_message(MAS_VERBLVL_ERROR, "Error, invalid sampling rate: %d.", rate);
            return_value = mas_error(MERR_INVALID);
            goto done;
        }
        tsu = 1/(float)rate;
    }
    
    if ( rtp_set_tsu( peer->session, tsu ) < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "Error setting rtp tsu.");
        return_value = mas_error(MERR_INVALID);
        goto done;
    }

    *tsu_retval = tsu;
    
    return_value = 0;

done:
    masc_exiting_log_level();
    return return_value;
}

/*
 * poll_data_if_necessary - make sure the periodic "mas_net_poll_data"
 * action is scheduled.
 *
 *  state: device state; polling_scheduled records whether the action
 *         has already been queued
 *
 * The flag is raised only once the action has actually been queued, so
 * a failed attempt is retried on the next call.
 *
 * returns: 0 when polling was already scheduled, otherwise the result
 *          of masd_reaction_queue_action()
 */
int32
poll_data_if_necessary( struct net_rtp_state *state )
{
    int32 err;

    if ( state->polling_scheduled )
        return 0;

    /* schedule periodic reaction, max(MAS_MIN_SLEEP,10ms) period */
    masc_log_message( MAS_VERBLVL_DEBUG, "net: scheduling mas_net_poll_data action");
    err = masd_reaction_queue_action(state->reaction, state->device_instance, "mas_net_poll_data", 0, 0, 0, 0, 0, MAS_PRIORITY_ROUNDTUIT, max(MAS_MIN_SLEEP/1000, 10000), 0, 0);
    if ( err >= 0 )
        state->polling_scheduled = TRUE;

    return err;
}

/*
 * schedule_net_send - queue a "mas_net_send" action for a peer.
 *
 *  state:         device state supplying the reaction port and the
 *                 device instance
 *  peer:          peer to send for; its id is the first value of the
 *                 action's predicate package
 *  dfdep_portnum: port the action depends on for dataflow; also the
 *                 second value of the predicate package
 *
 * The port dependency is passed to masd_reaction_queue_action() in a
 * freshly allocated int32 that is not freed here (presumably the
 * reaction queue takes it over - TODO confirm).
 *
 * returns: the result of masd_reaction_queue_action(), or a MERR_MEMORY
 *          error when the dependency cannot be allocated
 */
int32
schedule_net_send( struct net_rtp_state *state, struct net_rtp_peer_node *peer, int32 dfdep_portnum )
{
    int32 *dataflow_port_dependency;
    struct mas_package package;
    int32 err;
    
    /* schedule our dataflow dependency on the sink for sending */
    dataflow_port_dependency = masc_rtalloc( sizeof (int32) );
    if ( dataflow_port_dependency == NULL )
        return mas_error( MERR_MEMORY );
    *dataflow_port_dependency = dfdep_portnum;

    masc_setup_package( &package, NULL, 0, MASC_PACKAGE_NOFREE ) ;
    masc_push_int32( &package, peer->id );
    masc_push_int32( &package, dfdep_portnum );
    masc_finalize_package( &package );
    err = masd_reaction_queue_action(state->reaction, state->device_instance, "mas_net_send", package.contents, package.size, 0, 0, 0, MAS_PRIORITY_DATAFLOW, 1, 1,  dataflow_port_dependency);
    masc_strike_package( &package );
    return err;
}

/*
 * setup_port - obtain a dynamic port and configure it.
 *
 *  state:          device state supplying the port pool, the device
 *                  instance and the reaction port
 *  type:           port type handed to masd_set_port_type()
 *  cmatrix_name:   name of the characteristic matrix to attach; NULL or
 *                  an empty string attaches none
 *  portnum_retval: receives the number of the new port
 *
 * returns: 0 on success, otherwise the negative error of the step that
 *          failed.  A port obtained before a later step fails is not
 *          given back here.
 */
int32
setup_port( struct net_rtp_state *state, int16 type, char *cmatrix_name, int32 *portnum_retval )
{
    struct mas_characteristic_matrix *matrix;
    int32 status;

    status = masd_get_dynport( &state->dp_pool, state->device_instance, state->reaction, portnum_retval );
    if ( status < 0 )
    {
        masc_log_message( MAS_VERBLVL_ERROR, "net: couldn't retrieve dynamic port" );
        return status;
    }

    status = masd_set_port_type( *portnum_retval, type );
    if ( status < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: Error setting port type to %d.", type);
        return status;
    }

    /* no matrix requested: the port is ready */
    if ( cmatrix_name == NULL || cmatrix_name[0] == 0 )
        return 0;

    status = masd_get_cmatrix_from_name( state->device_instance, cmatrix_name, &matrix );
    if ( status < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: Error getting characteristic matrix from name '%s'", cmatrix_name);
        return status;
    }

    status = masd_set_port_cmatrix ( *portnum_retval, matrix );
    if ( status < 0 )
    {
        masc_log_message(MAS_VERBLVL_ERROR, "net: Error setting port characteristic matrix.");
        return status;
    }

    /* the matrix is static, don't free it. */
    return 0;
}

/* not used? */
#if 0
/*
 * get_host_ip_addr_string - look up a host name and return its first
 * address in dotted notation.
 *
 *  hostname: name passed to gethostbyname()
 *
 * returns: the string produced by inet_ntoa() for the first entry of
 *          the host's address list, or 0 when the lookup fails.  The
 *          string lives in inet_ntoa()'s static buffer, so it is
 *          overwritten by the next inet_ntoa() call and must not be
 *          freed.
 */
char* 
get_host_ip_addr_string( char* hostname )
{
    struct hostent* h;
    char* addr;

    h = gethostbyname(hostname);
    if ( h == 0 ) return 0;
    /* the first address is read as an IPv4 address (struct in_addr) */
    else addr = inet_ntoa(*((struct in_addr*)h->h_addr_list[0]));

    return addr;
}
#endif


Generated by  Doxygen 1.6.0   Back to index