Logo Search packages:      
Sourcecode: mas version File versions

scheduler.c

/*
 * Copyright (c) 2001-2003 Shiman Associates Inc. All Rights Reserved.
 * 
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
/*
 *
 * Copyright (c) 2000, 2001 by Shiman Associates Inc. and Sun
 * Microsystems, Inc. All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions: The above
 * copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 *
 * Except as contained in this notice, the names of the authors or
 * copyright holders shall not be used in advertising or otherwise to
 * promote the sale, use or other dealings in this Software without
 * prior written authorization from the authors or copyright holders,
 * as applicable.
 *
 * All trademarks and registered trademarks mentioned herein are the
 * property of their respective owners. No right, title or interest in
 * or to any trademark, service mark, logo or trade name of the
 * authors or copyright holders or their licensors is granted.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef _POSIX_C_SOURCE
#include <sys/types.h>
#include <unistd.h>
#endif
#include "mas_internal.h"
#include "assembler.h"
#include "scheduler.h"
#include "mc.h"
#include "mas/mas_nanosleep_reality.h"
#include "mas_dpi.h"
#include "signal_handler.h"
#include "procstat.h"

#define MAX_SLEEP_TIME_US 20000
#define HALF_UINT32_MAX 2147483647U
#define MAS_ACTIVE_INCREASE 0.5
#define EVENT_FREQ_WINSIZE 3

/* list of actions.  preserve the terminator */
static char* actions[] =  { "mas_sch_move_data", "mas_sch_shutdown", "mas_sch_end", "mas_sch_response", "mas_sch_priority", "mas_sch_set_event_period", "mas_sch_set_event_clkid", "mas_sch_log_event", "mas_sch_update_event_freq", "mas_sch_strike_event", "" };

static struct mas_event* _event_queue_head;
static struct mas_mc_cb* _mc_cb;
static int _finished;
static struct mas_stats _event_freq_stats;
static int32 _event_counter = 0;
static int   _handle_signal = 0;
struct mas_event *_current_event = 0;


/*** A good percentage of the computations and debug messages below
     rely on _sclkid == MAS_MC_SYSCLK_US.  Don't change unless you
     really know what you're doing! */
static int _sclkid = MAS_MC_SYSCLK_US; /* scheduler clock used for loop */

/* local prototypes *************************************/
static int32 are_port_dependencies_met( struct mas_event* event );
static int32 are_signal_dependencies_met( struct mas_event* event );
static int32 event_is_pending_before( struct mas_event* event, uint32
                              before_time_us ); 
static int32 process_reaction_port( int32 portnum, struct mas_event**
                            retval_event);
static int32 adjust_event_time_for_action_time( struct mas_event* event );
static struct mas_event* get_event_by_id( struct mas_event* head, int id );
static void compute_event_sc_times( struct mas_event* event );
static void update_jitter( struct mas_event* event, struct mas_stats* jitterstats, uint32 now_ts_sc );
static void update_event_freq( void );
static int32 update_event_delta( struct mas_event *event );

/* static variables *************************************/

char** 
mas_sch_actions( void )
{
    return actions;
}

int32
do_action( int32 device_instance, int action_index, void* predicate)
{
    int     (*action)(int32 , void* ) = NULL;
    int32   err;

    err = mas_asm_get_action_from_index(device_instance, action_index, &action); 
    if (err < 0) return mas_error(MERR_NOTDEF);

    err = (*action)(device_instance, predicate);

    return err;
}

int32
do_event( struct mas_event* event )
{
    int32 retval = 0;
    int32 err;
    struct mas_event* reaction_event;
    struct mas_data*  data;
    struct mas_package package;
    int n = 0;
    char action_debug[256];
    static uint32 expected_ts_clk = 0;
    static uint32 ts_us_ref = 0; 

    _current_event = event;
    snprintf(action_debug, 255, "action '%s' on device %d", event->action_name, event->device_instance );
    masc_entering_log_level( action_debug );
    masc_log_message( MAS_VERBLVL_DEBUG+3, "* \"%s\" on device %d", event->action_name, event->device_instance); 
    
    _event_counter++;
    
    if ( event->device_instance == MAS_SCH_INSTANCE )
    {
        if ( ! event->valid_action_index )
        {
            /* count the defined actions */
            while ( *actions[n] != 0 ) n++;
            
            event->action_index = masc_get_string_index(event->action_name, actions, n);
            event->valid_action_index = TRUE;
        }
        
        /**
         ** Be sure to add your new action to the "actions" array
         ** defined at the top of this file.
         **
         **/
        switch (event->action_index)
        {
        case 0: /*mas_sch_move_data*/
          /* predicate is source, sink */
          masd_get_data( *(int32*)(event->predicate), &data );
/*             masc_log_message( 0, "sch: moved data seq # %d from port %d to port %d.", data->header.sequence, *(int32*)(event->predicate), *((int32*)(event->predicate)+1) ); */
          masd_post_data( *((int32*)(event->predicate)+1), data );
            break;
      case 1: /*mas_sch_shutdown*/
            /* This begins the shutdown procedure. */
            mas_sig_shutdown = TRUE;
            break;
        case 2: /*mas_sch_end*/
            /* This bails out of the main loop. */
            _finished = TRUE;
        case 3: /*mas_sch_response*/
          if ( event->response > 0 )
          {
            /* post the predicate to a response port */
                data = MAS_NEW( data );
            data->length = event->predicate_length;
            data->allocated_length = data->length;
            data->segment = event->predicate;
            event->predicate = 0;
            masd_post_data( event->response, data );
          } 
            break;
        case 4: /* mas_sch_priority */
          if ( event->response > 0 )
          {
                /* post "yes" package */
                data = MAS_NEW( data );
                masc_setup_package( &package, NULL, 0, MASC_PACKAGE_NOFREE );
                masc_pushk_int32( &package, "pri", masc_get_real_time_priority() );
#ifdef _POSIX_C_SOURCE
                masc_pushk_int32( &package, "pid", getpid() );
#else
                masc_pushk_int32( &package, "pid", 0 );
#endif
                masc_finalize_package( &package );
                data->segment = package.contents;
                data->allocated_length = package.allocated_size;
            data->length = package.size;
            masd_post_data( event->response, data );
                masc_strike_package( &package );
          } 
            break;
        case 5: /*mas_sch_set_event_period*/
        {
            uint32 period;
            
            struct mas_event* source_event = 0;
            masc_setup_package( &package, event->predicate, event->predicate_length, MASC_PACKAGE_NOFREE|MASC_PACKAGE_EXTRACT );
            masc_pull_uint32( &package, &period );
            masc_strike_package( &package );

            if ( event->source_event_id != 0 )
                source_event = get_event_by_id( _event_queue_head, event->source_event_id );

            if ( source_event == 0 )
            {
                masc_log_message( MAS_VERBLVL_ERROR, "error: nonexistent event %u.", event->source_event_id );
                retval = mas_error(MERR_INVALID);
            }
            
            if ( retval >= 0 ) source_event->period = period;
            
            break;
        }
        case 6: /*mas_sch_set_event_clkid*/
        {
            int32 clkid;
            uint32 ts;

            struct mas_event* source_event = 0;
            masc_setup_package( &package, event->predicate, event->predicate_length, MASC_PACKAGE_NOFREE|MASC_PACKAGE_EXTRACT );
            masc_pull_int32( &package, &clkid );
            masc_strike_package( &package );

            if ( event->source_event_id != 0 )
                source_event = get_event_by_id( _event_queue_head, event->source_event_id );

            if ( source_event == 0 )
            {
                masc_log_message( MAS_VERBLVL_ERROR, "sch: error: nonexistent event %u.", event->source_event_id );
                retval = mas_error(MERR_INVALID);
            }

            if ( retval >= 0 )
            {
                /* We have to convert the act_time clock values.
                   Force a recompute of the scheduler-clock values and
                   adjust either the periodic event times or the
                   single event time. */
                masc_log_message( MAS_VERBLVL_DEBUG, "sch: changing clkid of event %d.  Old clkid %d, new clkid %d.", source_event->id, source_event->clkid, clkid );
                source_event->sc_valid = FALSE;
                if ( !source_event->act_time )
                {
                    mas_mc_convert( _mc_cb, source_event->clkid, source_event->next_act_time, clkid, &ts );
                    masc_log_message( MAS_VERBLVL_DEBUG, "sch: event %d next_act_time was %u, is now %u.", source_event->id, source_event->next_act_time, ts );
                    source_event->next_act_time = ts;
                    mas_mc_convert( _mc_cb, source_event->clkid, source_event->last_act_time, clkid, &ts );
                    masc_log_message( MAS_VERBLVL_DEBUG, "sch: event %d last_act_time was %u, is now %u.", source_event->id, source_event->last_act_time, ts );
                    source_event->last_act_time = ts;
                }
                else
                {
                    mas_mc_convert( _mc_cb, source_event->clkid, source_event->act_time, clkid, &ts );
                    masc_log_message( MAS_VERBLVL_DEBUG, "sch: event %d act_time was %u, is now %u.", source_event->id, source_event->act_time, ts );
                    source_event->act_time = ts;
                }
                
                source_event->clkid = clkid;
            }
            
            break;
        }
        case 7: /*mas_sch_log_event*/
        {
            /* This function can be used to test scheduler accuracy. */
            uint32 ts_us, ts_clk;
            uint32 expected_ts_us;
            int32  ts_error;

            mas_mc_get_val( _mc_cb, event->clkid, &ts_clk );
            mas_mc_convert( _mc_cb, event->clkid, ts_clk, MAS_MC_SYSCLK_US, &ts_us );

            if ( expected_ts_clk == 0 )
            {
                expected_ts_clk = ts_clk;
                ts_us_ref = ts_us;
            }
            else
            {
                expected_ts_clk += event->period;
            }
            
            mas_mc_convert( _mc_cb, event->clkid, expected_ts_clk, MAS_MC_SYSCLK_US, &expected_ts_us );

            /* handle unsigned integer wrapping cases correctly.
               In a perfect universe, we'd want x = a-b, except that here,
               a and b are unsigned.  We handle the following cases:
               
               if a>>b, x = -(signed)(b-a)   : b wrapped before a
               if a<<b, x =  (signed)(a-b)   : a wrapped before b
               if a>b,  x =  (signed)(a-b)
               if a<b,  x = -(signed)(b-a)
               
            */
            
            if ( ts_us - expected_ts_us >= HALF_UINT32_MAX )
            {
                ts_error = (int32)(expected_ts_us - ts_us);
            }
            else
            {
                ts_error = -1 * (int32)(ts_us - expected_ts_us);
            }
            
            masc_log_message( 0, "mas_sch_log_event: scheduling error: %u %d", ts_us-ts_us_ref, ts_error );
            masi_print_event( event );
            break;
        }
        case 8: /*mas_sch_update_event_freq*/
        {
            update_event_freq();
            break;
        }
        case 9: /*mas_sch_strike_event*/
        {
            struct mas_event* source_event = 0;
            
            if ( event->source_event_id != 0 )
                source_event = get_event_by_id( _event_queue_head, event->source_event_id );

            if ( source_event == 0 )
            {
                masc_log_message( MAS_VERBLVL_ERROR, "sch: error: nonexistent event %u.", event->source_event_id );
                retval = mas_error(MERR_INVALID);
            }

            if ( retval >= 0 )
            {
                masc_log_message( MAS_VERBLVL_DEBUG, "sch: striking event %u", source_event->id );
                masc_strike_event( source_event );
                masc_rtfree( source_event );
            }
            
            break;
        }
        default: /* else: just discard the event */
            masc_log_message( MAS_VERBLVL_ERROR, "sch error: unknown action %s.", event->action_name );
            break;
        }
    }     /* test if this is an assembler action */
    else if (strncmp(event->action_name, "mas_asm_", 8) == 0 || event->device_instance == MAS_ASM_INSTANCE)
    {
      retval = mas_asm_action_handler( event );
    }
    else if (event->device_instance == MAS_MC_INSTANCE)
    {
      retval = mas_mc_action_handler( _mc_cb, event );
    }
    else /* it's not an assembler action */
    {
        /* make sure we have an action index */
        retval = mas_asm_set_event_action_index( event );
        if ( retval < 0 )
            goto done;

        retval = do_action( event->device_instance, event->action_index, event->predicate );
      /* get the reaction port if we haven't already */
        if ( event->reaction == 0 )
        {
            err = masd_get_port_by_name ( event->device_instance, "reaction", &event->reaction);
            if ( err < 0 )
                goto done;
        }
        
        /* We're not handling this quite right: If there's an error,
           we might need to send a response back to the guy who queued
           the event! */
        /**** get all the events */
        while ( process_reaction_port( event->reaction, &reaction_event) == 0 )
        {
            if ( reaction_event != 0 )
            {
                /* set the source event for this reaction event */
                reaction_event->source_event_id = event->id;

                /* the reaction event's source is the current device */
                reaction_event->source_device_instance = event->device_instance;
                    
                /* if this is a response, set the event's response port */
                if ( strcmp( reaction_event->action_name, "mas_sch_response" ) == 0 )
                {
                    /* Don't queue response events that don't have
                       a destination */
                    if ( event->response == 0 ) 
                    {
                        masc_strike_event( reaction_event );
                        masc_rtfree( reaction_event );
                        reaction_event = 0;
                    }
                    else reaction_event->response = event->response;
                }

                /* check again, then schedule. */
                if ( reaction_event != 0 )
                {
                    err = mas_asm_schedule_event_struct( reaction_event );
                    if ( err < 0 )
                    {
                        masc_log_message( err, "Error queuing event.  Event to follow" );
                        masi_print_event( reaction_event );
                    }
                }

                reaction_event = 0;
            }
        }
    }

 done:
    masc_log_message( MAS_VERBLVL_DEBUG+3, "* returned %d", err);

    /* log errors if they occur */
    if ( retval < 0 )
    {
        if ( ! mas_get_severity(retval) ) 
            retval |= MAS_ERR_ERROR;
        masc_logerror(retval, "sch: Error triggering action.  Event structure to follow.");
        masi_print_event( event );
    }
        
    masc_exiting_log_level();
    _current_event = 0;
    return retval;
}

int32 scheduler_bootstrap( int flags )
{
    int32 net_device_instance = 0;
    int32 anx_instance = 0;
    int32 mix_instance = 0;
    int32 err;
    int32 audio_sink, mix_source;
    struct mas_data_characteristic dc;
    struct mas_package p;
    struct mas_package arg;

    masc_entering_log_level("Bootstrapping initial assemblage");
    
    _event_queue_head = mas_asm_get_event_queue_head( );
    _mc_cb = mas_asm_get_mc_cb( );
    
    err = mas_asm_instantiate_device("net", 0, NULL, &net_device_instance);
    if (err < 0)
    {
      err |= MAS_ERR_CRITICAL;
      masc_logerror(err, "Can't instantiate device.");
        masc_exiting_log_level();
      return err;
    }

    mas_asm_schedule_event_simple( net_device_instance, "mas_net_listen", 0); 

    if ( flags & LOAD_ANX_ASMG )
    {
        err = mas_asm_instantiate_device("anx", 0, "anx 0", &anx_instance);
        if (err < 0)
        {
            err |= MAS_ERR_CRITICAL;
            masc_logerror(err, "Can't instantiate anx device.");
            masc_exiting_log_level();
            return err;
        }
        
        err = mas_asm_instantiate_device("mix", 0, "anx 0", &mix_instance);
        if (err < 0)
        {
            err |= MAS_ERR_CRITICAL;
            masc_logerror(err, "Can't instantiate mix device.");
            masc_exiting_log_level();
            return err;
        }

        masc_setup_dc(&dc, 6);
        masc_append_dc_key_value(&dc, "format", "linear");
        masc_append_dc_key_value(&dc, "resolution", "16");
        masc_append_dc_key_value(&dc, "channels", "2");
        masc_append_dc_key_value(&dc, "sampling rate", "44100");
        masc_append_dc_key_value(&dc, "endian", "host");
        err = masd_get_port_by_name(MAS_ALL_DEVICES, "audio_sink", &audio_sink);
        if ( err < 0 ) 
            masc_logerror(err, "couldnt get the audio sink");
        err = masd_get_port_by_name(MAS_ALL_DEVICES, "mix_source", &mix_source);
        if ( err < 0 ) 
            masc_logerror(err, "couldnt get the mix source");
        
        err = mas_asm_connect_source_sink(mix_source, audio_sink, &dc);
        if ( err < 0 )
            masc_logerror(err, "couldn't connect mix source to anx sink");

        masc_strike_dc( &dc );

        /* Set the clock on the mix device to the sample clock from
         * the anx device. */
        masc_setup_package( &arg, NULL, 0, 0 ); /* free when striking */
        masc_pushk_int32( &arg, "mc_clkid", 9 ); /* ugly, I know... */
        masc_finalize_package( &arg );
        masc_setup_package( &p, NULL, 0, MASC_PACKAGE_NOFREE );
        masc_pushk_string( &p, "key", "mc_clkid" );
        masc_pushk_package( &p, "arg", &arg );
        masc_finalize_package( &p );
        masc_strike_package( &arg );
        
        mas_asm_schedule_event_simple( mix_instance, "mas_set", p.contents );
        masc_strike_package( &p );
    }
    
    /* compute the event frequency */
    mas_asm_schedule_event_periodic( MAS_SCH_INSTANCE, "mas_sch_update_event_freq", 0, MAS_PRIORITY_ASAP, 1000000, MAS_MC_SYSCLK );

#if 0
    /* test the scheduler's periodic response */
    err = mas_asm_schedule_event_periodic( MAS_SCH_INSTANCE, "mas_sch_log_event", 0, MAS_PRIORITY_ASAP, 144, MAS_MC_44100 );
#endif

#if 0
    /* log the master clock values */
    mas_asm_schedule_event_periodic( MAS_MC_INSTANCE, "mas_mc_log_clock_values", 0, MAS_PRIORITY_ASAP, 1000000, MAS_MC_SYSCLK );
#endif
    
    masc_log_message( 0, "" );
    masi_print_instantiated_devices();
    masc_log_message( 0, "" );
    masi_print_assemblages();
    masc_log_message( 0, "" );
    masi_print_ports();
    masc_log_message( 0, "" );
    masi_print_loaded_device_libraries();
    
    masc_exiting_log_level();
    return 0;
}

int32
scheduler_main( int flags )
{
    /* timestamps in _sclkid units */
    u_int32            now_ts             = 0;
    u_int32            start_ts           = 0;/*invalid after wrap*/
    u_int32            action_complete_ts = 0;
    u_int32            last_ts            = 0;
    int32              err = 0;
    int32              triggered_pending_event = 0;
    struct mas_stats   jitterstats;
    struct mas_event*  event;
    struct mas_event*  next_event; /* in case the current event is
                                    * destroyed */

    _finished = 0;

    mas_asm_init();

    /** bootstrap the scheduler's event queue */
    scheduler_bootstrap( flags );

    /** init statistics for jitter and event frequency */ 
    masc_stats_init( &jitterstats, 512, MASC_STATS_ALL );
    masc_stats_init( &_event_freq_stats, EVENT_FREQ_WINSIZE, MASC_STATS_MEAN|MASC_STATS_MINMAX|MASC_STATS_WIN_MEAN|MASC_STATS_WIN_MINMAX );
    
    masc_entering_log_level( "** Scheduler main loop **************" );
    mas_mc_get_val( _mc_cb, _sclkid, &start_ts );
    
    /* new strategy - sleep when you can.
       Do all pending events.  Then sleep, knowing our minimum sleep
       time. */
    while ( !_finished )
    {
      last_ts = now_ts;

      /* timestamp the head of the loop */
        mas_mc_get_val( _mc_cb, _sclkid, &now_ts );
        masc_log_message( MAS_VERBLVL_DEBUG+5, "sch: time at head of loop: %u us from start.\n", now_ts - start_ts );
        
        /* note the the time spent */
        masc_log_message( MAS_VERBLVL_DEBUG+5, "sch: last action: %u us.\n", now_ts - last_ts );

        /** look for pending events in the queue */
      /** do all pending events */
      /** if there aren't any in the queue, then sleep */
      triggered_pending_event = FALSE;
      event = _event_queue_head;
        next_event = _event_queue_head->next;

        /** Check to see if a signal has been raised.  If so, handle
         ** it.  If we were handling a signal before, we're surely
         ** done with it now, so null it out. */
        _handle_signal = 0;
        if ( mas_sig_numraised > 0 )
        {
            _handle_signal = mas_sig_stack[--mas_sig_numraised] + 1;
        }
        
      while ( ( event = next_event ) && !_finished )
      {
            /* For safe removal of event later */
            next_event = event->next;

            /* Derive scheduler clock (usually MAS_MC_SYSCLK_US) times
               from the clocks required by the event.  If the event's
               clocks are the same as the scheduler's clocks, this is
               just a copy. */
            compute_event_sc_times( event );
            
           /* do the event sooner to account for the average time
             * this action has been taking */
            adjust_event_time_for_action_time( event );

            /* check dependencies and pending time */
            /* MAS_MIN_SLEEP/2000 converts the value in nanoseconds to
               microseconds, then divides it by two.  This centers the
               jitter around zero so that roughly half of events are
               early by half the minimum sleep time and half of events
               are late by the same amount.  */
          if ( event_is_pending_before( event, now_ts + MAS_MIN_SLEEP/2000 ) )
          {

#if (DEBUG >= 3)
            /* show the event queue */
            masi_print_event_queue(_event_queue_head);
            masi_print_ports();
#endif

                if ( event->period )
                {
                    /* Set the last action timestamp for the periodic
                   event.  If the event's clocks are the same as
                   the scheduler's, then use the previously
                   computed timestamp.  Otherwise, fetch a new
                   one. */
                    if ( event->clkid == _sclkid )
                        event->last_act_time = now_ts;
                    else
                    {
                        if ( mas_mc_get_val( _mc_cb, event->clkid, &event->last_act_time ) < 0 )
                            masc_logerror( mas_error(MERR_INVALID), "Error reading clock %d.", event->clkid );
                    }
                    
                    /* if event hasn't been done before, set the next
                       act time to this time.  We're sure we haven't lost
                       any events. */
                    if ( event->count == 0 )
                  event->next_act_time = event->last_act_time;

                    update_event_delta( event );
                }
                    
                /****** DO THE EVENT *******/
            err = do_event( event );
                /***************************/

                /* re-set next_event, so we can pick up last-minute
                 * changes to the schedule. */
                next_event = event->next; 

            /* grab timestamp & update the action statistics */
                mas_mc_get_val( _mc_cb, _sclkid, &action_complete_ts );
            mas_asm_record_action_stats( event, action_complete_ts - now_ts );

                /* update the scheduler's jitter measure */
                update_jitter( event, &jitterstats, now_ts );

                /* Update periodic event structures if they aren't
                   flagged to be destroyed. */
                if ( event->period )
            {
                    /* this is a special case.  These events are
                       periodic, but there's no way we can keep up
                       with them.  So, to avoid problems, we just set
                       the time so it's "always due".  The actual
                       minimum usable period length is machine
                       dependent.  So, this only counts for 1us. */
                    if ( event->period == 1 && event->clkid == MAS_MC_SYSCLK_US )
                    {
                        event->next_act_time = event->last_act_time;
                    }
                    else
                    {
                        /* Act again in one full period... */
                        event->next_act_time += event->period;

                        /* NOTE: Since this function operates on the
                         * scheduler clock (sc) counters that are
                         * derived from the clocks required by the
                         * event, we don't have to account for the
                         * adjustment time in the update of
                         * next_act_time anymore.  next_act_time is
                         * incremented by one period for each action,
                         * next_act_time_sc is derived from it, and
                         * adjusted to account for the action time.
                         */
                    }
                    
                    /* the time for the event is no longer adjusted */
                    event->adjusted = FALSE;

                    /* and the scheduler clock-based counters are no
                       longer valid */
                    event->sc_valid = FALSE;
                    
                    /* increment event's counter */
                    event->count++;
            }
                else /* event is not periodic - destroy it */
            {
                    /* Destroy the event if it isn't periodic, or if
                     * the destroy flag is set. */
                masc_strike_event( event );
                    masc_rtfree( event );
            }
                    
                triggered_pending_event = TRUE;

            last_ts = now_ts;
            now_ts = action_complete_ts;

                /* note the the time spent */
                masc_log_message( MAS_VERBLVL_DEBUG+5, "sch: last action: %u us.\n", now_ts - last_ts );
                masc_log_message( MAS_VERBLVL_DEBUG+5, "sch: time now: %u us from start.\n", now_ts - start_ts );
        
          }
      }

      /* test for shutdown signal */
      if ( mas_sig_shutdown )
        {
            mas_sig_shutdown = FALSE;
            mas_asm_killall();
        }
      
      /* see if we did one run through the queue without finding
         stuff to do */
      if ( ! triggered_pending_event )
      {
          /**** Process cpu-usage modulating sleepytime ***********************/

          /* We wrap the call to nanosleep with timestamp counters, so
             we can keep an eye on how long we actually slept.  We work
             with nanoseconds to control nanosleep(), but with
             microseconds to see how long we slept.  You can only fit
             about 4 seconds worth of nanoseconds in a 32-bit unsigned
             int. */
          
           masc_log_message(MAS_VERBLVL_DEBUG+5, "sch: sleeping for %d ms", MAS_MIN_SLEEP/1000000); 
             masc_realsleep(MAS_MIN_SLEEP);
      }
    }
    
    masc_exiting_log_level();
    
    masc_log_message(MAS_VERBLVL_DEBUG, "");
    masc_log_message(MAS_VERBLVL_DEBUG, "************************************************************************");
    masc_log_message(MAS_VERBLVL_DEBUG, "sch: Leaving main scheduler loop.");

    masc_log_message(0, "");
    masc_log_message(0, "Scheduler jitter:");
    if ( jitterstats.count == 0 )
    {
        masc_log_message( 0, "                   NO DATA", jitterstats.count);
    }
    else
    {
        masc_stats_recompute_window( &jitterstats );
        masc_log_message( 0, "                                  N  mean(ms)  sdev(ms)   min(ms)   max(ms)");
        
        masc_log_message( 0, "                   total: % 9lu % 9.2f       n/a % 9.2f % 9.2f", jitterstats.count, jitterstats.mean/1000, jitterstats.min/1000, jitterstats.max/1000);
        masc_log_message( 0, "                windowed: % 9lu % 9.2f % 9.2f % 9.2f % 9.2f", jitterstats.N, jitterstats.win_mean/1000, jitterstats.win_std_dev/1000, jitterstats.win_min/1000, jitterstats.win_max/1000);
    }
    masc_log_message(0, "");

    mas_mc_exit(); /* THIS IS TEMPORARY -- SHOULD BE ASM_EXIT */
    
    masc_exiting_log_level();
    return 0;
}

/***************************************************************************
 * process_reaction_port
 *
 * arguments:
 *  1. portnum
 *  2. pointer to pointer to mas_event struct
 *
 *  Reads a mas_event structure on a reaction port.  Event structure
 *  is allocated by device.
 *
 * returns: MERR_NOTDEF if port is not defined or has no data.
 *
 ***************************************************************************/
int32
process_reaction_port( int32 portnum, struct mas_event** retval_event)
{
    struct mas_data* data;
    int32 err;

    if ( ( err = masd_get_data( portnum, &data )) < 0 )
      return err;
    
    if ( data == NULL ) return mas_error(MERR_NOTDEF);
    
    /* Cast the segment to an event struct -- it's always from inside
       this process. */
    *retval_event = (struct mas_event*)data->segment;
    data->segment = NULL;
    
    /* destroy the data structure, but NOT the segment, since we need
     * to return that. */
    masc_rtfree( data );

    return 0;
}

int32
are_signal_dependencies_met( struct mas_event* event )
{
    /* true for no dependencies */
    if ( event->signal_dep == 0 )
        return TRUE;

    if ( event->signal_dep == _handle_signal )
        return TRUE;
    
    return FALSE;
}

/***************************************************************************
 * are_port_dependencies_met
 *
 * arguments:
 *  1. pointer to struct mas_event
 *
 *  Returns true if the port dependencies of an event are met or if
 *  there are no dependencies.
 *
 * returns: MERR_NULLPTR if event is NULL.
 *
 ***************************************************************************/
int32
are_port_dependencies_met( struct mas_event* event )
{
    int i;

    if ( event == 0 ) return mas_error(MERR_NULLPTR);
    
    /* true for no dependencies */
    if ( event->num_port_dependencies == 0 ) return TRUE;
    
    for (i=0; i<event->num_port_dependencies; i++)
    {
      /* bail if we haven't met a dependency */
      if ( !mas_asm_is_data_on_port( event->port_dependencies[i] ) )
          return FALSE;
    }

    /* we're here, so all dependencies are met */
    return TRUE;
}

int32
event_is_pending_before( struct mas_event* event, uint32 before_time_sc )
{
    int is_pending_now = TRUE;

    /* Check when the periodic event will come up, but only if it has
     * already been acted on once.
     */
    if ( ( event->period > 1 ) && event->count ) 
    {
        /* KNOW WHAT YOU'RE DOING HERE -- The expression below will
           avoid problems due to integer overflow.  For instance, lets
           say the timer would overflow after it reached 9.  If
           before_time_sc == 1 and next_act_time_sc == 9, then
           (before_time_sc - next_act_time_sc) == 2.  This expression
           relies on the assumption that all periods will be less than
           half the total time that can be represented in the timer
           (UINT32_MAX). */

        /* SPECIAL NOTE: An epsilon period (1) event is "always
         * pending", requiring (typically) a seperate dataflow
         * dependency. */

        if (( event->period > 1 ) && ( before_time_sc - event->next_act_time_sc >= HALF_UINT32_MAX ))
        {
          is_pending_now = FALSE;
        }
    }
    else if ( event->act_time_sc )
    {
        /* not periodic, but timed with act_time_sc */
        
        /* For now, just handle like this.  Eventually, we'll need to
           work on a coarse resolution timer. */

        if ( before_time_sc - event->act_time_sc >= HALF_UINT32_MAX )
          is_pending_now = FALSE;
    }
    
    /* only return true if event is pending AND the dependencies are
       met, otherwise say "it's not pending before". */
    /* Perhaps we should do some priority processing here.  Dataflow
       events should behave as described, but maybe other kinds of
       events don't? */
    if ( is_pending_now ) 
      if ( are_port_dependencies_met( event ) )
            if ( are_signal_dependencies_met( event ) )
                return TRUE;
    
    return 0;
}

int32
adjust_event_time_for_action_time( struct mas_event* event )
{
    int32 err;
    struct mas_stats* stats;

    if ( !(event->adjusted) )
    {
        /* just check to make sure we have an action index */
        err = mas_asm_set_event_action_index( event );
        if (err < 0) return err;
        
        err = mas_asm_get_action_stats_ro( event->device_instance, event->action_index, &stats );
        if (err < 0) return err;

        if ( stats )
        {
            /* we're using the windowed mean now, since its a better
               indicator of the current state of the server than the
               cumulative mean */
            if ( ! ( stats->valid & MASC_STATS_WIN_MEAN ) )
                masc_stats_recompute_window( stats );

            event->adjustment_sc = (uint32)stats->win_mean;
            event->next_act_time_sc -= event->adjustment_sc;
            event->adjusted = TRUE;

            masc_log_message( MAS_VERBLVL_DEBUG+3, "adjust_event_time_for_action_time: scheduling %s by %d us earlier", event->action_name, event->adjustment_sc);
        }
    }
    
    return 0;
}

struct mas_event*
get_event_by_id( struct mas_event* head, int id )
{
    struct mas_event* event;

    event = head;
    while (event)
    {
        if ( event->id == id )
            return event;

        event = event->next;
    }

    return 0;
}

void
compute_event_sc_times( struct mas_event* event )
{
    int32 err;
    uint32 fts = 0;

    if ( !event->sc_valid )
    {
        /* only convert periodic event-related times if the event is
           periodic */
        if ( event->period )
        {
            /* don't compute if the times aren't set anyway */
            if ( event->next_act_time == 0 )
                return;
            
            err = mas_mc_convert( _mc_cb, event->clkid, event->last_act_time, _sclkid, &event->last_act_time_sc );
            if ( err < 0 )
            {
                fts = event->last_act_time;
                goto fail;
            }
            
            err = mas_mc_convert( _mc_cb, event->clkid, event->next_act_time, _sclkid, &event->next_act_time_sc );
            if ( err < 0 )
            {
                fts = event->next_act_time;
                goto fail;
            }
            
        }
        else if ( event->act_time )
        {
            /* only convert this time if the event isn't periodic and
               it has act_time set */
            err = mas_mc_convert( _mc_cb, event->clkid, event->act_time, _sclkid, &event->act_time_sc );
            if ( err < 0 )
            {
                fts = event->act_time;
                goto fail;
            }
        }
        
        event->sc_valid = TRUE;
    }

    return;
    
 fail:
    masc_entering_log_level( "sch: compute_event_sc_times" );
    masc_log_message( MAS_VERBLVL_ERROR, "Couldn't convert time %u on clock %d to scheduler's clock.", fts, event->clkid );
    masi_print_event( event );
    masc_exiting_log_level();
    
    return;
}

/* Jitter is measured as (time - expected time).  Positive jitter
   measures indicate that the event happened late, negative jitter
   means the event happened early. */
void
update_jitter( struct mas_event* event, struct mas_stats* jitterstats, uint32 now_ts_sc )
{
    int32  ts_error;

    /* only compute jitter for periodic events (for now) */
    if ( event->period > 1000 && event->count > 0 )
    {
        if ( event->next_act_time_sc - now_ts_sc >= HALF_UINT32_MAX )
        {
            ts_error = (int32)( now_ts_sc - event->next_act_time_sc );
        }
        else
        {
            ts_error = -1 * (int32)( event->next_act_time_sc - now_ts_sc );
        }
        masc_stats_update( jitterstats, (double)ts_error );
    }
    
    return;
}

void
update_event_freq( void )
{
    static int first = 0;
    
    /* throw out the first result */
    if ( first == 0 )
    {
        _event_counter = 0;
        first = 1;
        return;
    }
    
    masc_stats_update( &_event_freq_stats, _event_counter );

    /* TODO: create an activity scale, so that this has more than two
     * values: idle and active. */
    if ( _event_counter >= _event_freq_stats.min + ( _event_freq_stats.min * MAS_ACTIVE_INCREASE ) )
    {
        procstat_update( "active" );
    }
    else
    {
        procstat_update( "idle" );
    }
    
    _event_counter = 0;
}

int32
update_event_delta( struct mas_event *event )
{
    /* Only valid for periodic events */
    if ( event->period == 0 )
        return mas_error(MERR_INVALID);

    /* ...and only those that have been triggered once already */
    if ( event->count == 0 )
    {
        event->delta = 0;
        return 0;
    }
    
    if ( event->next_act_time - event->last_act_time >= HALF_UINT32_MAX )
    {
        event->delta = (int32)( event->last_act_time - event->next_act_time );
    }
    else
    {
        event->delta = -1 * (int32)( event->next_act_time - event->last_act_time );
    }

    return 0;
}

int32
masd_sch_action_delta( int32 *delta_ts )
{
    *delta_ts = 0;

    /* Only valid for periodic events */
    if ( _current_event->period == 0 )
        return mas_error(MERR_INVALID);
    
    *delta_ts = _current_event->delta;

    return 0;
}



Generated by  Doxygen 1.6.0   Back to index