Logo Search packages:      
Sourcecode: mas version File versions

mas_split_device.c

/*
 * Copyright (c) 2001-2003 Shiman Associates Inc. All Rights Reserved.
 * 
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */

/* 2 OCT 2002 - rocko - verified reentrant
 * 2 OCT 2002 - rocko - verified timestamp clean
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include "mas/mas_dpi.h"
#include "profile.h"

struct port_node
{
    int32 portnum;
    struct port_node* next;
    struct port_node* prev;
};

struct split_state
{
    struct masd_dynport_pool dp_pool;
    int32  sink, defsrc, reaction;
    struct port_node* src_head;
    struct mas_data_characteristic* dc;
    struct mas_characteristic_matrix* cmatrix;
    int nsrcs;
};

static int32 delete_node( struct port_node* node );
static int32 append_node(struct port_node* head, struct port_node* node);

/*************************************************************************
 * ACTIONS
 *************************************************************************/

/* standard actions ****************************************************/
int32 mas_dev_init_instance( int32 , void* );
int32 mas_dev_show_state( int32 device_instance, void* predicate );
int32 mas_dev_configure_port( int32 device_instance, void* predicate );
int32 mas_dev_disconnect_port( int32 device_instance, void* predicate );


/* device specific actions *********************************************/
int32 mas_split_split( int32 , void* );

int32
mas_dev_init_instance( int32 device_instance, void* predicate )
{
    struct split_state*  state;

    /* Allocate state holder and cast it so we can work on it */
    state       = MAS_NEW(state);
    if ( state == 0 )
      return mas_error(MERR_MEMORY);

    masd_set_state(device_instance, state); /* set device state */
    
    masd_get_port_by_name( device_instance, "default_split_source", &state->defsrc );

    masd_get_port_by_name( device_instance, "sink", &state->sink );

    masd_get_port_by_name( device_instance, "reaction", &state->reaction );
    
    masd_init_dynport_pool( &state->dp_pool, device_instance, state->reaction, 4 );

    /* initialize active source port queue */
    state->src_head = MAS_NEW(state->src_head);
    
    return 0;
}

int32
mas_dev_exit_instance( int32 device_instance, void* predicate )
{
    struct split_state*  state;
    
    MASD_GET_STATE(device_instance, state);

    masd_cleanup_dynport_pool( &state->dp_pool, device_instance, state->reaction );

    /* TODO: there's more stuff to free here. */

    masc_rtfree( state );
    
    return 0;
}

int32
mas_dev_configure_port( int32 device_instance, void* predicate )
{
    struct split_state* state;
    int32*              dataflow_port_dependency;
    int32               err;
    int32               portnum = *(int32*)predicate;
    int32               new_default_portnum;
    char                name[MAX_PORT_NAME_LENGTH];
    struct port_node*   pn;
    struct mas_data_characteristic* odc;

    MASD_GET_STATE(device_instance, state);

    
    /* handle the sink */
    if ( portnum == state->sink )
    {
        err = masd_get_data_characteristic( portnum, &state->dc );
        if ( err < 0 ) return err;

        /* make a copy of the dc for the other port */
        odc = MAS_NEW( odc );
        masc_setup_dc( odc, state->dc->numkeys );
        masc_copy_dc( odc, state->dc );
        
        /* set the default source's data characteristic */
        masd_set_data_characteristic( state->defsrc, odc );

        err = masd_get_cmatrix_from_name( device_instance, MAS_CMATRIX_ANYTHING, &state->cmatrix ); 
        if ( err < 0 ) return err;
        
        /* schedule polling action for the sink we were called for */
        dataflow_port_dependency  = masc_rtalloc( sizeof portnum );
        *dataflow_port_dependency = portnum;
        err = masd_reaction_queue_action(state->reaction, device_instance, 
                                         "mas_split_split", 0, 0, 0, 0, 0,
                                         MAS_PRIORITY_DATAFLOW, 1, 1, 
                                         dataflow_port_dependency);   

        return 0;
    }
    
    /* so we are configuring a source... set the portname to something
       other than default_split_source */
    sprintf(name, "source %d", state->nsrcs);
    masd_set_port_name( portnum, name );
    pn = MAS_NEW(pn);
    pn->portnum = portnum;
    append_node(state->src_head, pn);
    state->nsrcs++;
    
    /** retrieve dynamic mas port  */
    err = masd_get_dynport( &state->dp_pool, device_instance, state->reaction, &new_default_portnum);
    if (err < 0)
    {
      masc_logerror( err, "couldn't retrieve dynamic port");
      return err;
    }

    /* make a copy of the dc for the new default port */
    odc = MAS_NEW( odc );
    masc_setup_dc( odc, state->dc->numkeys );
    masc_copy_dc( odc, state->dc );
        
    /* configure the new default port's info */
    state->defsrc = new_default_portnum;
    masd_set_port_type( new_default_portnum, MAS_SOURCE );    
    masd_set_port_name( new_default_portnum, "default_split_source" );
    masd_set_port_cmatrix ( new_default_portnum, state->cmatrix );
    masd_set_data_characteristic( new_default_portnum, odc );

    return 0;
}

int32
mas_dev_disconnect_port( int32 device_instance, void* predicate )
{
    struct split_state*  state;
    int32 portnum = *(int32*)predicate;
    struct port_node* pn;
    
    MASD_GET_STATE(device_instance, state);
    
    pn = state->src_head->next;
    
    while (pn != 0)
    {
        if ( pn->portnum == portnum )
        {
            delete_node(pn);

            /* recycle the source */
            masd_recycle_dynport( &state->dp_pool, device_instance, state->reaction, portnum );
            return 0;
        }
        pn = pn->next;
    }
    
    return mas_error(MERR_INVALID);
}

int32
mas_dev_show_state( int32 device_instance, void* predicate )
{
    return 0;
}

int32
mas_dev_terminate( int32 device_instance, void* predicate )
{
    return 0;
}

int32
mas_split_split( int32 device_instance, void* predicate )
{
    struct split_state* state;
    struct mas_data* d;
    struct mas_data* nd;
    struct port_node* pn;

    MASD_GET_STATE(device_instance, state);    
    
    /* fetch data */
    masd_get_data( state->sink, &d );
    nd = d; /* the first source will get the original data */

    /* trivial case: if there's no sources configured, just junk the
     * data */
    if ( state->src_head->next == 0 )
    {
        masc_strike_data( d );
        masc_rtfree( d );
    }

    /* loop through all sources */
    pn = state->src_head->next;
    while (pn != 0)
    {
        /* post data to this source */
        masd_post_data( pn->portnum, nd );

        /* copy the data, if we have to */
        if ( pn->next )
        {
            nd = masc_rtalloc( sizeof *nd );
            if ( nd == 0 ) return mas_error(MERR_MEMORY);
            memcpy(nd, d, sizeof *nd);
            
            nd->segment = masc_rtalloc( d->length );
            if ( nd->segment == 0 ) return mas_error(MERR_MEMORY);
            memcpy( nd->segment, d->segment, d->length );
        }

        pn = pn->next; /* advance */
    }
    
    return 0;
}

/*************************************************************************
 * LOCAL FUNCTIONS
 *************************************************************************/

int32
delete_node( struct port_node* node )
{
    if (node)
    {
        /* connect nodes on either side in list */
        if (node->prev != 0)
            node->prev->next = node->next;
        if (node->next != 0)
            node->next->prev = node->prev;

        masc_rtfree(node);
    }
    else return mas_error(MERR_MEMORY);

    return 0;
}

int32
append_node(struct port_node* head, struct port_node* node)
{
    struct port_node* tail = head;
    
    while( tail->next != NULL ) tail = tail->next;

    tail->next = node;
    node->prev = tail;

    return 0;
}

Generated by  Doxygen 1.6.0   Back to index