Logo Search packages:      
Sourcecode: mas version File versions

mas_frag_device.c

/*
 * Copyright (c) 2001-2003 Shiman Associates Inc. All Rights Reserved.
 * 
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */


#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include "mas/mas_dpi.h"
#include "profile.h"


#define DEFAULT_OUT_PACKET_SIZE 576
#define CIRCBUF_SIZE 131072
#define MIN_SEGMENT_ALLOC 3072

/* FILE *fp; */


struct circular_buffer
{
    char buf[CIRCBUF_SIZE];
    int  write_head; /* next free position */
    int  read_head; /* start of unused data */
};


struct frag_state
{
    int32 device_instance;
    int32 reaction;
    int32 sink;
    int32 source;

    int bytes_per_sample;
    struct circular_buffer cb;

    uint16 out_packet_size;    
    uint32 media_ts;
    uint32 sequence;
    uint32 expected_in_ts;

    uint8  need_mark;
};


/* local functions */


/* stock circular buffer routines, but changed to work with bytes
   instead of int16's */

void
circular_buffer_init( struct circular_buffer *cb )
{
    memset( cb->buf, 0, CIRCBUF_SIZE );
    cb->write_head = 0;
    cb->read_head = 0;
}


void
circular_buffer_add( struct circular_buffer *cb, char *data, int length )
{
    char *buf;
    int remain;
    int fit;

    buf = cb->buf;
    
    remain = cb->write_head+length - CIRCBUF_SIZE;
    
    if( remain > 0 )
    {
        fit = CIRCBUF_SIZE - cb->write_head;

        if( fit>0 ) 
            memcpy( buf+cb->write_head, data, fit );
        
        memcpy( buf, data+fit , remain );

        cb->write_head = remain;
    }
    else
    {
        memcpy( buf+cb->write_head, data, length );
        cb->write_head += length;
    }
}


void
circular_buffer_get( struct circular_buffer *cb, char *data, int length )
{
    char *buf;
    int remain;
    int fit;

    buf = cb->buf;
    
    remain = cb->read_head+length - CIRCBUF_SIZE;
    
    if( remain > 0 )
    {
        fit = CIRCBUF_SIZE - cb->read_head;

        if( fit>0 ) 
            memcpy( data, buf+cb->read_head, fit );
        
        memcpy( data+fit, buf, remain );

        cb->read_head = remain;
    }
    else
    {
        memcpy( data, buf+cb->read_head, length );
        cb->read_head += length;
    }
}


int
circular_buffer_get_len( struct circular_buffer *cb )
{
    return ( (cb->write_head - cb->read_head + CIRCBUF_SIZE)%CIRCBUF_SIZE );
}



/* standard device actions */


int32
mas_dev_init_instance( int32 device_instance, void* predicate )
{
    struct frag_state*  state;
    
    /* Allocate state holder and cast it so we can work on it */
    state = MAS_NEW(state);
    if ( state == 0 )
      return mas_error(MERR_MEMORY);

    masd_set_state(device_instance, state); /* set device state */

    /* memset( state, 0, sizeof(struct frag_state) ); */
    state->device_instance = device_instance;
    state->out_packet_size = DEFAULT_OUT_PACKET_SIZE;
    circular_buffer_init( &state->cb );
    
    
    masd_get_port_by_name( device_instance, "sink", &state->sink );
    masd_get_port_by_name( device_instance, "source", &state->source );
    masd_get_port_by_name( device_instance, "reaction", &state->reaction );

/*     fp = fopen( "/home/silvio/out.raw", "w" ); */
    
    return 0;
}



int32
mas_dev_configure_port( int32 device_instance, void* predicate )
{
    struct frag_state*  state;
    int32* dataflow_port_dependency;
    int32 portnum = *(int32*)predicate;
    struct mas_data_characteristic* dc;
    int32 err;
    uint8 format, resolution, channels, endian;
    uint32 srate;
    struct mas_data_characteristic *odc;

    masd_get_state(device_instance, (void**)&state);

    /* interpret the dc */
    masd_get_data_characteristic( portnum, &dc );

    /* make a copy of the dc for the other port */
    odc = MAS_NEW( odc );
    masc_setup_dc( odc, dc->numkeys );
    masc_copy_dc( odc, dc );

    if ( portnum == state->sink )
    {
        err = masc_scan_audio_basic_dc( dc, &format, &srate, &resolution, &channels, &endian );

        switch( (int)resolution )
        {
        case 8:
            state->bytes_per_sample = (int)channels;
            break;
            
        case 16:
            state->bytes_per_sample = 2 * (int)channels;
            break;
            
        default:
            state->bytes_per_sample = 4 * (int)channels;
        }
        
        
        /* schedule dataflow dependency on incoming data */
        dataflow_port_dependency = masc_rtalloc( sizeof (int32) );
        *dataflow_port_dependency = state->sink;
        err = masd_reaction_queue_action(state->reaction, device_instance, 
                                         "mas_frag_post", 0, 0, 0, 0, 0,
                                         MAS_PRIORITY_DATAFLOW, 1, 1, 
                                         dataflow_port_dependency);
        if ( err < 0 ) return err;

        /* set the other dc */
        masd_set_data_characteristic( state->source, odc );
    }
    else
    {
        /* set the other dc */
        masd_set_data_characteristic( state->sink, odc );
    }
    
    
    return 0;
}



int32
mas_dev_disconnect_port( int32 device_instance, void* predicate )
{
    struct frag_state*  state;

    masd_get_state(device_instance, (void**)&state);
    
    return 0;
}


int32 mas_dev_terminate( int32 device_instance, void* predicate )
{
    return 0;
}


int32
mas_dev_exit_instance( int32 device_instance, void* predicate )
{
    struct frag_state*  state;

    masd_get_state(device_instance, (void**)&state);
    masc_rtfree( state );
    
    return 0;
}



int32
mas_dev_show_state( int32 device_instance, void* predicate )
{
    return 0;
}



int32
mas_frag_post( int32 device_instance, void* predicate )
{
    struct frag_state* state;
    struct mas_data*   data;
    struct mas_data*   newdata;
    int                can_reuse_data;
    int32              err;

    masd_get_state(device_instance, (void**)&state);

    masd_get_data( state->sink, &data );
    if ( data == 0 ) return mas_error(MERR_INVALID);
    if (data->length == 0) return mas_error(MERR_INVALID);

    
    can_reuse_data = 1;

    
    /* exceptional conditions: either data was dropped, or we got a
       mark bit. In these cases, scrap buffered data and start from
       scratch */

    if( data->header.mark )
    {
        masc_log_message( MAS_VERBLVL_DEBUG, "frag: got a marked packet... passing mark along." );
    }
    
        
    if( data->header.mark || (data->header.media_timestamp != state->expected_in_ts) )
    {
        if( !data->header.mark )
        {
            masc_log_message( MAS_VERBLVL_DEBUG, "frag: data->header.media_timestamp < expected_ts (%lu < %lu); scrapping whatever I didn't get to send out yet and generating a mark bit.", data->header.media_timestamp, state->expected_in_ts );
        }

        state->cb.read_head   = state->cb.write_head = 0;
        state->expected_in_ts = data->header.media_timestamp;
        state->media_ts       = data->header.media_timestamp;
        
        state->need_mark = 1;
    }
    
    state->expected_in_ts += (data->length / state->bytes_per_sample);
    
    
    circular_buffer_add( &state->cb, data->segment, data->length );
    
    
    while( circular_buffer_get_len(&state->cb) >= state->out_packet_size )
    {
        if( can_reuse_data && (data->allocated_length >= state->out_packet_size) )
        {
            newdata          = data;
            can_reuse_data   = 0;
        }
        else
        {
            newdata          = MAS_NEW(newdata);
            if ( newdata == NULL )
                return mas_error(MERR_MEMORY);
            
            newdata->length  = state->out_packet_size;
            newdata->allocated_length = max( state->out_packet_size, MIN_SEGMENT_ALLOC );
            newdata->segment = masc_rtalloc( newdata->allocated_length );
            if ( newdata->segment == NULL )
                return mas_error(MERR_MEMORY);
        }

        
        /* timestamp, sequence and mark bit considerations */
        
        newdata->header.media_timestamp = state->media_ts;
        state->media_ts += state->out_packet_size / state->bytes_per_sample;

        if( state->need_mark )
        {
            state->need_mark     = 0;
            newdata->header.mark = 1;
        }
        
        newdata->header.sequence = state->sequence++;
        
        /* leaving out NTP timestamps for now... */
        
        newdata->length          = state->out_packet_size;


        /* finally, retrieve the data from our buffer */
        circular_buffer_get( &state->cb, newdata->segment, state->out_packet_size );

/*         fwrite( newdata->segment, 1, state->out_packet_size, fp ); */
        
        err = masd_post_data( state->source, newdata );
        if ( err < 0 ) return err;
    }

    
    /* if we were unable to reuse it, free up the old data struct */
    if( can_reuse_data )
    {
        masc_strike_data( data );
        masc_rtfree( data );
    }

    return 0;
}

int32
mas_get( int32 device_instance, void* predicate )
{
    struct frag_state*  state;
    int32 err;
    int32 retport;
    char* key;
    struct mas_package arg;
    struct mas_package r_package;
    /* list of nuggets.  preserve the terminator */
    static char* nuggets[] =
        { "list", "packet_size", "" };
    int i, n=0;
    
    masd_get_state(device_instance, (void**)&state);

    /* Use the standard get_nugget wrapper. */
    err = masd_get_pre( predicate, &retport, &key, &arg );
    if ( err < 0 ) return err;

    /* construct our response */
    masc_setup_package( &r_package, NULL, 0, MASC_PACKAGE_NOFREE );
    
    /* count the defined nuggets */
    while ( *nuggets[n] != 0 ) n++;

    i = masc_get_string_index(key, nuggets, n);

    switch( i )
    {
    case 0: /*list*/
        masc_push_strings( &r_package, nuggets, n );
        break;
    case 1: /*packet_size*/
        masc_pushk_uint16( &r_package, "packet_size", state->out_packet_size );
        break;
    default:
        break;
    }

    masc_finalize_package( &r_package );
    
    /* post the response where it belongs and free the data structures
     * we abused */
    err = masd_get_post( state->reaction, retport, key, &arg, &r_package );

    return err;
}


int32
mas_set( int32 device_instance, void* predicate )
{
    struct frag_state*  state;
    int32 err;
    char* key;
    struct mas_package arg;
    /* list of nuggets.  preserve the terminator */
    static char* nuggets[] =
        { "packet_size", "" };
    int i, n=0;

    masd_get_state(device_instance, (void**)&state);

    /* Use the standard get_nugget wrapper. */
    err = masd_set_pre( predicate, &key, &arg );
    if ( err < 0 ) return err;

    /* count the defined nuggets */
    while ( *nuggets[n] != 0 ) n++;

    i = masc_get_string_index(key, nuggets, n);

    switch(i)
    {
    case 0: /*packet_size*/
    {
        masc_pull_uint16( &arg, &state->out_packet_size );
        masc_log_message( MAS_VERBLVL_DEBUG, "frag: mas_set(packet_size) setting packet size to %d bytes", state->out_packet_size );
    }
    break;
    
    default:
        break;
    }

    /* cleanup after our mess */
    err = masd_set_post( key, &arg );

    return err;
}


Generated by  Doxygen 1.6.0   Back to index