Logo Search packages:      
Sourcecode: mas version File versions

mas_sbuf_device.c

/*
 * Copyright (c) 2001-2003 Shiman Associates Inc. All Rights Reserved.
 * 
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */

/* 2 OCT 2002 - rocko - verified reentrant
 * 3 OCT 2002 - rocko - media timestamp clean
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <math.h>
#include "mas/mas_dpi.h"
#include "profile.h"

static char* stream_state_desc[] = { "Stopped.", "Paused.", "Playing.", "Buffering.", "Beginning playback.", "Finishing playback.", "" };

enum sbuf_stream_state
{
    STOP_STATE = 0,
    PAUSE_STATE = 1,
    PLAY_STATE = 2,
    BUFFERING_STATE = 3,
    START_STATE = 4,
    FINISH_STATE = 5
};

#define DEFAULT_BUFCAP_S 30
#define DEFAULT_BUFFER_TIME_MS 100
#define DEFAULT_DROP_TIME_MS 200
#define EXPECTED_TIME_FACTOR  0.70710678 /* 1 / sqrt( 2 ).  Used when
                                          * deciding to rebuffer on
                                          * mark-bit.  See below. */

/* by default, keep the buffer half full */
#define DEFAULT_POSTOUT_TIME_MS (DEFAULT_BUFFER_TIME_MS/2)

#define HALF_UINT32_MAX 2147483647U

struct sbuf_state
{
    int32 device_instance;
    int32 reaction;
    int32 sink;
    int32 source;

    int srate;
    int bpstc;
    enum sbuf_stream_state strst;
    int set_clkid;
    int drop;
    int poll_scheduled;
    int pom; /* play on mark */

    uint32 est_period_us;
    uint32 est_period_mt;
    int32 est_period_clkid;
    uint32 est_length;

    uint32 drop_time_ms;   /* When dropping packets, don't just drop
                            * one packet, drop drop_time_ms
                            * milliseconds of packets. */
    uint32 buffer_time_ms;  /* buffer fill time in milliseconds */
    uint32 postout_time_ms; /* postout time in milliseconds, see below. */
    uint32 drop_time_mt;   /* drop time in media timestamp units */
    uint32 buffer_time_mt;  /* Buffer fill time in media timestamp units. */
    uint32 postout_time_mt; /* Packets are posted to the source if the
                             * timestamp is due between now and
                             * postout_time_mt from now. */

    uint32 bufcap_s;  /* maximum buffer capacity, in seconds */
    uint32 bufcap_mt; /* "                      , in media timestamp units */

    int32  tb_mt;  /* result of total_buffer_media_time(), and
                    * adjusted incrementally by post and poll. */
    
    uint32 mcref;  /* wall clock time at media time reference */
    uint32 mtref;  /* media time at wall clock reference */
    uint32 mcnow;  /* wall clock time now */
    uint32 mtnow;  /* media time now */
    uint32 mtdu;   /* media time drop until */
    
    struct mas_data* head;
    struct mas_data* tail;

    uint32 count;   /* count of packets received at sink */
    uint32 dropped; /* total count of dropped packets */
    int32 contiguous_drop_mt; /* In current run of dropped packets,
                               * how much media time has been dropped.
                               * This is zero when data can be posted
                               * to the source. */
    
    int8 synth_ts; /* synthesize our own media timestamps */
    uint32 smt_next;  /* Synthetic media time for the next packet to
                       * be placed in the queue.  Only used if
                       * synth_mt is true. */
    uint32 sseq_next;

    int8 keep; /* If set, sbuf retains the data in the buffer using
                  keep_head.  Stop doesn't delete, it only resets the
                  head to keep_head. */
    struct mas_data* keep_head;
};

/* local prototypes ****************************************************/
static void estimate_params( struct sbuf_state* state, struct mas_data* data );
static int32 pollit( struct sbuf_state* state );
static void append_data( struct sbuf_state* state, struct mas_data* data );
static void delete_first( struct sbuf_state* state );
static int check_buffer_fullness( struct sbuf_state* state );
static void set_ref_mark( struct sbuf_state* state );
static int do_we_trust_timestamps( struct sbuf_state* state, struct mas_data* data1, struct mas_data* data2 );
static void synthesize_timestamp( struct sbuf_state* state, struct mas_data* data );
static uint32 calc_mtlen( struct sbuf_state* state, struct mas_data* data );
static uint32 synthesize_next_mt( struct sbuf_state* state, struct mas_data* data );
static uint32 total_buffer_media_time( struct sbuf_state* state );
static void set_clkid( struct sbuf_state* state );
static void change_state( struct sbuf_state* state, enum sbuf_stream_state strst );
static void round_buffer_times( struct sbuf_state* state );
static uint32 round2packet( struct sbuf_state* state, uint32 mt );

int32
mas_dev_init_instance( int32 device_instance, void* predicate )
{
    struct sbuf_state*  state;
    
    /* Allocate state holder and cast it so we can work on it */
    state = MAS_NEW(state);
    if ( state == 0 )
      return mas_error(MERR_MEMORY);

    masd_set_state(device_instance, state); /* set device state */
    state->device_instance = device_instance;
    
    masd_get_port_by_name( device_instance, "sink", &state->sink );
    masd_get_port_by_name( device_instance, "source", &state->source );
    masd_get_port_by_name( device_instance, "reaction", &state->reaction );

    /* initialize list */
    state->head = MAS_NEW(state->head);
    state->tail = state->head;

    state->drop_time_ms = DEFAULT_DROP_TIME_MS;
    state->buffer_time_ms = DEFAULT_BUFFER_TIME_MS;
    state->postout_time_ms = DEFAULT_POSTOUT_TIME_MS;
    state->bufcap_s = DEFAULT_BUFCAP_S;

    change_state( state, STOP_STATE );
    masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Buffering %d ms; will post %d ms.", state->buffer_time_ms, state->postout_time_ms );
    masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Dropping %d ms at a time.", state->drop_time_ms );
    masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Using clock %d.", state->est_period_clkid );
    masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: %d seconds max. buffer capacity.", state->bufcap_s );

    return 0;
}

int32
mas_dev_configure_port( int32 device_instance, void* predicate )
{
    struct sbuf_state*  state;
    int32* dataflow_port_dependency;
    int32 portnum = *(int32*)predicate;
    struct mas_data_characteristic *dc, *odc;
    int32 err;
    uint8 format, resolution, channels, endian;
    uint32 srate;

    masd_get_state(device_instance, (void**)&state);

    /* interpret the dc */
    err = masd_get_data_characteristic( portnum, &dc );
    if ( err < 0 )
        return mas_error(MERR_INVALID);
    
    if ( portnum == state->sink )
    {
        err = masc_scan_audio_basic_dc( dc, &format, &srate, &resolution, &channels, &endian );
        if ( err < 0 )
            return mas_error(MERR_INVALID);

        state->srate = (int)srate;
        state->bpstc = masc_get_audio_basic_bpstc( resolution, channels );
        if ( state->bpstc < 0 )
            return mas_error(MERR_INVALID);
        state->buffer_time_mt = state->buffer_time_ms * state->srate / 1000;
        state->postout_time_mt = state->postout_time_ms * state->srate / 1000;
        state->drop_time_mt = state->drop_time_ms * state->srate / 1000;
        state->bufcap_mt = state->bufcap_s * state->srate;

        /* grab the clock to use for the periodic polling event */
        state->est_period_clkid = masd_mc_match_rate( state->srate );
        if ( state->est_period_clkid < 0 )
            state->est_period_clkid = MAS_MC_SYSCLK_US;
        
        /* schedule dataflow dependency on incoming data */
        dataflow_port_dependency = masc_rtalloc( sizeof (int32) );
        *dataflow_port_dependency = state->sink;
        err = masd_reaction_queue_action(state->reaction, device_instance, 
                                         "mas_sbuf_post", 0, 0, 0, 0, 0,
                                         MAS_PRIORITY_DATAFLOW, 1, 1, 
                                         dataflow_port_dependency);
        if ( err < 0 ) return err;

        /* make a copy of the dc for the other port */
        odc = MAS_NEW( odc );
        masc_setup_dc( odc, dc->numkeys );
        masc_copy_dc( odc, dc );
    
        /* set the other dc */
        masd_set_data_characteristic( state->source, odc );
    }
    else if ( portnum == state->source )
    {
        /* make a copy of the dc for the other port */
        odc = MAS_NEW( odc );
        masc_setup_dc( odc, dc->numkeys );
        masc_copy_dc( odc, dc );
        
        /* set the other dc */
        masd_set_data_characteristic( state->sink, odc );
    }
    else
    {
        return mas_error(MERR_NOTDEF);
    }
    
    return 0;
}

int32
mas_dev_disconnect_port( int32 device_instance, void* predicate )
{
    struct sbuf_state*  state;

    masd_get_state(device_instance, (void**)&state);

    state->strst = STOP_STATE;
    return 0;
}

int32
mas_dev_exit_instance( int32 device_instance, void* predicate )
{
    struct sbuf_state*  state;
    masd_get_state(device_instance, (void**)&state);

    while ( state->head->next != 0 )
        delete_first( state );

    masc_rtfree( state );
    
    return 0;
}

int32
mas_dev_terminate( int32 device_instance, void* predicate )
{
    struct sbuf_state*  state;
    masd_get_state(device_instance, (void**)&state);

    /* should we cough up what's in the buffer? */
    change_state( state, STOP_STATE );
    return 0;
}

int32
mas_dev_show_state( int32 device_instance, void* predicate )
{
    return 0;
}

int32
mas_source_play( int32 device_instance, void* predicate )
{
    struct sbuf_state* state;
    int32 err;
    
    masd_get_state(device_instance, (void**)&state);

    /* ignore multiple play actions */
    if ( state->strst == PLAY_STATE )
        return 0;

    change_state( state, BUFFERING_STATE );

    /* check the buffer, if it's full, start polling right away */
    if ( check_buffer_fullness( state ) )
    {
        /* Begin polling, establishing start of queue as new ref
         * mark. */
        change_state( state, START_STATE );
        err = pollit( state );
        masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: Buffer full.  Starting playback." );
        if (err < 0 )
            masc_logerror( err, "sbuf: Error queuing mas_source_poll action.");
        return err;
    }
    
    return 0;

}

int32
mas_source_rebuffer( int32 device_instance, void* predicate )
{
    struct sbuf_state* state;
    
    masd_get_state(device_instance, (void**)&state);

    if ( state->strst == STOP_STATE )
        return mas_error(MERR_INVALID);

    while ( state->head->next != 0 && !state->head->next->header.mark )
        delete_first( state );
    
    change_state( state, BUFFERING_STATE );

    return 0;
}


int32
mas_source_play_on_mark( int32 device_instance, void* predicate )
{
    struct sbuf_state* state;
    int32 err;
    
    masd_get_state(device_instance, (void**)&state);

    /* don't do this if we're already playing */
    if ( state->strst != STOP_STATE && state->strst != PAUSE_STATE )
        return 0;
    
    if ( state->strst == STOP_STATE && !state->keep )
    {

        if ( state->head->next != 0 && !state->head->next->header.mark )
        {
            masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Deleting old data in buffer.");
        }
        
        while ( state->head->next != 0 && !state->head->next->header.mark )
            delete_first( state );
    }
        
    change_state( state, BUFFERING_STATE );

    if ( state->head->next != 0 && state->head->next->header.mark )
    {
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Received marked packet.");
        state->pom = FALSE; /* got mark, now buffering */
    }
    else
    {
        state->pom = TRUE; /* no marked packet yet */
    }
    

    /* check the buffer, if it's full, start polling right away */
    if ( check_buffer_fullness( state ) )
    {
        /* Begin polling, establishing start of queue as new ref
         * mark. */
        change_state( state, START_STATE );
        err = pollit( state );
        state->pom = FALSE;
        masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: Buffer full.  Starting playback." );
        if (err < 0 )
            masc_logerror( err, "sbuf: Error queuing mas_source_poll action.");
        return err;
    }
    
    return 0;

}

int32
mas_source_stop( int32 device_instance, void* predicate )
{
    struct sbuf_state* state;

    masd_get_state(device_instance, (void**)&state);

    change_state( state, STOP_STATE ); /* this'll cause the poll action to bail */
    state->synth_ts = FALSE; /* redo decision to synthesize new timestamps */

    if ( state->keep )
    {
        /* Reset the buffer's head to keep_head and find the tail. */
        state->head->next = state->keep_head;
        state->tail = state->keep_head;
        while (state->tail->next)
            state->tail = state->tail->next;
        masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: rewinding kept buffer.");
    }
    else
    {
        /* Destroy what's in the buffer. */
        while ( state->head->next != 0 )
            delete_first( state );
    }
    

    return 0;
}

int32
mas_source_pause( int32 device_instance, void* predicate )
{
    struct sbuf_state* state;
    
    masd_get_state(device_instance, (void**)&state);

    change_state( state, PAUSE_STATE ); /* this'll cause the poll action to bail */

    /* don't blow away the buffer, since we'll probably resume. */
    
    return 0;
}

int32
mas_source_poll( int32 device_instance, void* predicate )
{
    struct sbuf_state* state;
    struct mas_data* d;
    struct mas_data* d_copy;
    uint32 min_expected_time_mt;
    
    masd_get_state(device_instance, (void**)&state);

    /* Do we need to stop? */
    if ( state->strst == STOP_STATE || state->strst == PAUSE_STATE || state->strst == BUFFERING_STATE )
    {
        state->poll_scheduled = FALSE;
        masd_reaction_queue_action_simple( state->reaction, MAS_SCH_INSTANCE, "mas_sch_strike_event", NULL, 0 );
        return 0;
    }
    

    /* Check to see if we have to change the clock id. */
    if ( state->set_clkid )
    {
        state->set_clkid = FALSE;
        set_clkid( state );
    }
    
    /* Do we have nothing to do? */
    if ( state->head->next == 0 )
        return 0;

    /** Compute the media timestamp of NOW, based on the master clock
     ** and the relative timestamps.
     **
     ** This should wrap correctly. mcnow == mcref for the first
     ** packet after buffering.
     ***/

    /** get the current time */
    masd_mc_val( state->est_period_clkid, &state->mcnow );
    state->mtnow = state->mtref + ( state->mcnow - state->mcref );

    /* If this packet is marked, we may have to stop polling and
       rebuffer.  */
    if ( (state->strst == PLAY_STATE) && state->head->next->header.mark )
    {
        /* This is a tricky one: we do not want to rebuffer the data
         * stream if we don't have to - that would only add latency.
         * The idea: if we receive a marked packet and the buffer is
         * "about as full as we expect it to be" then don't rebuffer
         * and clear the mark on this packet.
         *
         * Implementation: at any moment, the expected time in the
         * buffer is the postout_time.  However, one expects this to
         * vary from measurement to measurement; a better idea is to
         * collect statistics and say that values within one standard
         * deviation of the expected value are what we're looking for.
         *
         * For the time being, we're accepting greater than 0.707
         * postout_time rounded to the nearest packet boundary.  I
         * think this is a pretty good approximation.  As a final
         * check, this should never be so small as to be rounded to
         * zero -- then it would never rebuffer.  Clamp the minimum to
         * one packet size.
         */

        min_expected_time_mt = round2packet( state, EXPECTED_TIME_FACTOR * ( state->buffer_time_mt - state->postout_time_mt ));
        min_expected_time_mt = max( min_expected_time_mt, state->est_period_mt );

        /* This is "less than or equal to" on purpose.  We've gotta
         * have more than one packet in the buffer, or the one packet
         * with the mark bit could be enough to meet the expected
         * time. */
        if ( ( total_buffer_media_time( state ) <= min_expected_time_mt ) || ( state->head->next->header.media_timestamp - ( state->mtnow - state->est_period_mt ) >= HALF_UINT32_MAX ) )
        {
            masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Received a marked packet, re-buffering.");
            change_state( state, BUFFERING_STATE );
            state->synth_ts = FALSE; /* redo decision to synthesize new
                                      * timestamps */
            state->poll_scheduled = FALSE;
            masd_reaction_queue_action_simple( state->reaction, MAS_SCH_INSTANCE, "mas_sch_strike_event", NULL, 0 );
            return 0;
        }
        else /* buffer is full and packet is new enough */
        {
            masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: marked packet, but it was on-time, and the buffer isn't running dry.");
            masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: not re-buffering.");
            state->head->next->header.mark = FALSE;
        }
    }
    
    /* if we're just starting up after buffering, mark this packet as
       the start of a "talkspurt" (rfc 1890), and establish this as
       the beginning of a new stream. */
    if ( state->strst == START_STATE )
    {
        /* Set our reference mark at this packet. */
        change_state( state, PLAY_STATE );
        set_ref_mark( state );
        /* The mcnow time from the ref mark overrides the time
         * computed above. */
    }

    /*******************************************************************
     ** Drop old stuff.
     **
     ** Marked packets and those following them in
     ** this run are never deleted. */
    d = state->head->next;
    while ( d )
    {
        /** Drop packets that are too old.
         **
         ** We have one packet of "wiggle room".  Hence: state->mtnow
         ** - state->est_period_mt. */
        if ( ( d->header.media_timestamp - ( state->mtnow - state->est_period_mt ) >= HALF_UINT32_MAX ) && !d->header.mark )
        {
            masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: dropped old packet %u, lagging by %u", d->header.sequence, state->mtnow - d->header.media_timestamp);
            state->contiguous_drop_mt += d->length / state->bpstc;
            /* drop the next "drop_time_ms" milliseconds of packets */
            state->mtdu = d->header.media_timestamp + state->drop_time_mt;
            d = d->next; /* advance */
            if ( !state->keep )
                delete_first( state );
            state->drop = TRUE;

            /** Don't drop TOO much data!
             **
             ** Stop dropping to satisfy mtdu policy if we've already
             ** dropped enough. */
            if ( state->contiguous_drop_mt >= state->drop_time_mt )
            {
                state->drop = FALSE;
            }
            else
            {
                /* reduce mtdu by the amount we've already dropped */
                state->mtdu -= state->contiguous_drop_mt;
            }
            state->dropped++;
        }
        else
        {
            /* newer entries in the queue won't be dropped, because
               they're newer, right?! */
            /* head is unchanged */
            break;
        }
    }

    /** Enforce "mtdu" policy: if we drop data, drop at least
     ** drop_time_ms milliseconds of it.
     **
     ** Doing this, we'll hear (for audio) a discernable gap, rather
     ** than high-frequency noise (crackling, static, etc.)
     **
     ** When a packet was dropped above, state->mtdu was set to
     ** drop_time_ms (converted to media time) from now.  However, we
     ** want to avoid dropping more data than we need to if we've
     ** already dropped enough data to satisfy this policy.  The
     ** solution: reduce mtdu by the media time of the contiguous
     ** dropped data packets.  This way, if we've already dropped more
     ** than mtdu packets (in media time) we don't drop any more. */

    d = state->head->next;
    while ( d && state->drop )
    {
        /* overflow safe */
        if ( ( d->header.media_timestamp - state->mtdu >= HALF_UINT32_MAX ) && state->drop && !d->header.mark )
        {
            state->contiguous_drop_mt += d->length / state->bpstc;
            masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: dropped packet %u within mtdu.", d->header.sequence);
            d = d->next; /* advance */
            if ( !state->keep )
                delete_first( state );
            state->dropped++;
        }
        else
        {
            state->drop = FALSE;

            /* newer entries in the queue won't be dropped, because
               they're newer, right?! */
            /* head is unchanged, headmt is unchanged */
            break;
        }
    }
    
    /*****************************************************************
     * Send packets when they're due.
     * 
     * Play zero or more packets.  If there's nothing in the queue,
     * bail; don't play silence.  The buffering scheme will fill the
     * buffer to the time given as state->buffer_time_mt before
     * starting playback.  Once playback is started, and we get here,
     * our strategy is not to keep a full buffer, but rather, keep
     * some data in it, forcing the rest out the output pipe --
     * essentially keeping the output pipe partially filled.  If the
     * rates are constant, this keeps some data "out there" between
     * the buffer and the destination.  So, instead of doling out one
     * packet at a time here, we may pump out two or three depending
     * on the times of the packets in our buffer.
     *
     * Summary: Post a packet out if it is valid within
     * state->postout_time_mt from now (mtnow).
     *
     * */
    for ( d = state->head->next; d; d = state->head->next )
    {
        /* If the packet's time is between now and postout_time_mt
           from now OR the packet is older than now, then play it out.
           This is "less-than" not "less-than-or-equal-to" on purpose.
           */
        if ( ( d->header.media_timestamp - state->mtnow < state->postout_time_mt ) || ( d->header.media_timestamp - state->mtnow >= HALF_UINT32_MAX ) )
        {
            /* pry it out of the queue */
            state->head->next = d->next;
            /* check on the damn tail */
            if ( state->tail == d ) state->tail = state->head;

#if (DEBUG >= 2)
            /* log the current play position */
            masc_log_message( 0, "played: mcnow %u, mtnow %u, mcref %u, mtref %u, headmt %u", state->mcnow, state->mtnow, state->mcref, state->mtref, state->headmt);
#endif
            /* If we're keeping the data, we need to copy each packet
               we output so we can hang onto the original. */
            if ( state->keep )
            {
                d_copy = masc_rtalloc( sizeof *d );
                mas_assert( d_copy != 0, "Memory error." ); 
                memcpy( d_copy, d, sizeof *d );
                d_copy->segment = masc_rtalloc( d->allocated_length );
                mas_assert( d_copy->segment != 0, "Memory error." );
                memcpy( d_copy->segment, d->segment, d->length );
                d = d_copy; /* just forget about the "old" d */
            }
            else
            {
                /* decrease the total buffer media time */
                state->tb_mt -= calc_mtlen( state, d );
            }
            

            /* break the linked list */
            d->next = 0;

            /* post output */
            masd_post_data( state->source, d );
            /* don't touch "d" after this, it's gone */
            
            /* reset our contiguous dropped packet media time counter */
            if ( state->contiguous_drop_mt > 0 )
            {
                masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: Back to normal.  Contiguous dropped data: %.f ms", (float)state->contiguous_drop_mt * 1000.0 / (float)state->srate);
                state->contiguous_drop_mt = 0;
            }
            
            
            /* If a packet other than the first in the queue has the mark
               bit set, then stop posting data for this action.  This
               leaves the marked packet at the head of the queue so it can
               be handled properly as the start of a new stream sequence. */
            if ( state->head->next )
                if ( state->head->next->header.mark )
                    break;
        }
        else
        {
            /* this and any subsequent packets are too new to played
               yet. */
            break;
        }
    }

    return 0;
}

int32
mas_sbuf_post( int32 device_instance, void* predicate )
{
    struct sbuf_state* state;
    struct mas_data*     data;
    int32 err;

    masd_get_state(device_instance, (void**)&state);

    masd_get_data( state->sink, &data );
    if ( data == 0 ) return mas_error(MERR_INVALID);
    if (data->length == 0) return mas_error(MERR_INVALID);
    
    /* Playback on mark:
       
       Discard packet unless it has a mark bit set.  If it does, clear
       the pom flag and begin buffering. */
    if ( state->pom )
    {
        if ( ! data->header.mark )
        {
            masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Waiting for marked packet; deleting unmarked packet %d.", data->header.sequence);
            
            masc_strike_data( data );
            masc_rtfree( data );
            return 0;
        }
        else
        {
            state->pom = FALSE;
            masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Marked packet received; buffering.");
        }
    }
    
    /* estimate the polling period from this data segment */
    if ( state->est_period_us == 0 )
    {
        estimate_params( state, data );
        round_buffer_times( state );
        masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: estimated period: %u [%d], %0.3f ms", state->est_period_mt, state->est_period_clkid, state->est_period_us/1000.0);
    }
    else
    {
        if( ( data->length / state->bpstc) != state->est_period_mt )
        {
            masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: Oops, size of incoming packet is %d, but my period is %u.", data->length / state->bpstc, state->est_period_mt );
            masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: I'm not set up to handle this yet, but I'll keep going.");
        }
        
    }
    

    data->next = 0; /* just in case */

    /** stuff to do before we append the packet to the queue -- only
     * if we're not playing. */
    if ( state->strst != PLAY_STATE )
    {
        /* Should we synthesize the incoming media timestamps?  Only
         * check if we're not already synthesizing them. */
        if ( !state->synth_ts )
        {
            if ( ! do_we_trust_timestamps( state, state->tail, data ) )
            {
                state->synth_ts = TRUE;
                masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: No input timestamps detected.  Synthesizing our own.");

                /* if there's a tail packet in the queue, we need to
                   synthesize a timestamp for it. */
                if ( state->tail )
                    synthesize_timestamp( state, state->tail );
            }
        }
    }

    append_data( state, data );

    /*** stuff to do after we append the packet to the queue */

    /* adjust total buffer media time */
    state->tb_mt += calc_mtlen( state, data );
    
    if ( state->strst == BUFFERING_STATE )
    {
        /* check the buffer, if it's full, start polling.  */
        if ( check_buffer_fullness( state ) )
        {
            masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: Buffer full." );
            /* Begin polling, establishing start of queue as new ref mark. */
            change_state( state, START_STATE );
            err = pollit( state );
            if (err < 0 )
                masc_logerror( err, "sbuf: error queuing mas_source_poll action.");
        }
    }

    /* If we're keeping this buffer, and keep_head is null, set the
       keep_head to point to the current head of the buffer. */
    if ( state->keep && !state->keep_head)
        state->keep_head = state->head->next;
    
    /* synthesize timestamp */
    if ( state->synth_ts )
        synthesize_timestamp( state, data );

    state->count++;

    /* Check to make sure we're not exceeding the allowed buffer
       capacity! */
    if ( state->tb_mt > state->bufcap_mt )
    {
        /* Uhh... we better double-check. */
        if ( total_buffer_media_time( state ) > state->bufcap_mt )
        {
            masc_log_message( MAS_VERBLVL_WARNING, "sbuf: [warning] exceeding alloted buffer capacity of %d seconds (%d KB).", state->bufcap_s, state->bufcap_mt * state->bpstc / 1024);
            masc_log_message( MAS_VERBLVL_WARNING, "sbuf: [warning] deleting some data to compensate.");
            delete_first( state );
        }
        else
        {
            masc_log_message( MAS_VERBLVL_WARNING, "sbuf: [warning] incremental total buffer media time and recomputed don't match.");
            masc_log_message( MAS_VERBLVL_WARNING, "sbuf: [warning] using recomputed.");
        }
    }
    
    return 0;
}

int32
mas_get( int32 device_instance, void* predicate )
{
    struct sbuf_state*  state;
    int32 err;
    int32 retport;
    char* key;
    struct mas_package arg;
    struct mas_package r_package;
    /* list of nuggets.  preserve the terminator */
    static char* nuggets[] = 
        { "list", "buftime_ms", "inbuf_ms", "dropped", "postout_time_ms", "drop_time_ms", "mc_clkid", "keep", "bufcap_s", "" };
    int i, n=0;
    
    masd_get_state(device_instance, (void**)&state);

    /* Use the standard get_nugget wrapper. */
    err = masd_get_pre( predicate, &retport, &key, &arg );
    if ( err < 0 ) return err;

    /* construct our response */
    masc_setup_package( &r_package, NULL, 0, MASC_PACKAGE_NOFREE );
    
    /* count the defined nuggets */
    while ( *nuggets[n] != 0 ) n++;

    i = masc_get_string_index(key, nuggets, n);

    switch(i)
    {
    case 0: /*list*/
        masc_push_strings( &r_package, nuggets, n );
        break;
    case 1: /*buftime_ms*/
        masc_pushk_uint32( &r_package, "buftime_ms", state->buffer_time_ms);
        break;
    case 2: /*inbuf_ms*/
        if ( state->srate == 0 )
        {
            masc_pushk_uint32( &r_package, "inbuf_ms", 0);
        }
        else
        {
            masc_pushk_uint32( &r_package, "inbuf_ms", 1000 * total_buffer_media_time( state )/state->srate);
        }
        break;
    case 3: /*dropped*/
        masc_pushk_uint32( &r_package, "dropped", state->dropped );
        break;
    case 4: /*postout_time_ms*/
        if ( state->srate == 0 )
        {
            masc_pushk_uint32( &r_package, "postout_time_ms", 0);
        }
        else
        {
            masc_pushk_uint32( &r_package, "postout_time_ms", 1000 * state->postout_time_mt/state->srate);
        }
        break;
    case 5: /*drop_time_ms*/
        masc_pushk_uint32( &r_package, "drop_time_ms", state->drop_time_ms);
        break;
    case 6: /*mc_clkid*/
        masc_pushk_int32( &r_package, "clkid", state->est_period_clkid );
        break;
    case 7: /*keep*/
        masc_pushk_int8( &r_package, "keep", state->keep );
        break;
    case 8: /*bufcap_s*/
        masc_pushk_uint32( &r_package, "bufcap_s", state->bufcap_s );
        break;
    default:
        break;
    }

    masc_finalize_package( &r_package );
    
    /* post the response where it belongs and free the data structures
     * we abused */
    err = masd_get_post( state->reaction, retport, key, &arg, &r_package );

    return err;
}

int32
mas_set( int32 device_instance, void* predicate )
{
    struct sbuf_state*  state;
    int32 err;
    char* key;
    struct mas_package arg;
    /* list of nuggets.  preserve the terminator */
    static char* nuggets[] = 
        { "buftime_ms", "postout_time_ms", "drop_time_ms", "mc_clkid", "keep", "bufcap_s", "" };
    int i, n=0;

    masd_get_state(device_instance, (void**)&state);

    /* Use the standard get_nugget wrapper. */
    err = masd_set_pre( predicate, &key, &arg );
    if ( err < 0 ) return err;

    /* count the defined nuggets */
    while ( *nuggets[n] != 0 ) n++;

    i = masc_get_string_index(key, nuggets, n);

    switch(i)
    {
    case 0: /*buftime_ms*/
    {
        uint32 btms;
        masc_pull_uint32( &arg, &btms );
        state->buffer_time_ms = btms;
        state->buffer_time_mt = btms * state->srate / 1000;
        /* set postout time to 1/2 buffer time.  */
        state->postout_time_ms = btms >> 1;
        state->postout_time_mt = state->buffer_time_mt >> 1;
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: mas_set(buftime_ms): %d ms buffer; %d ms postout.", state->buffer_time_ms, state->postout_time_ms );
        round_buffer_times( state );
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: after rounding to packet size: %d ms buffer; %d ms postout.", state->buffer_time_ms, state->postout_time_ms );
    }
    break;
    case 1: /*postout_time_ms*/
    {
        uint32 potms;
        masc_pull_uint32( &arg, &potms );
        state->postout_time_ms = potms;
        state->postout_time_mt = potms * state->srate / 1000;
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: mas_set(postout_time_ms): %d ms postout.", state->postout_time_ms );
        round_buffer_times( state );
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: after rounding to packet size: %d ms postout.", state->buffer_time_ms, state->postout_time_ms );
    }
    break;
    case 2: /*drop_time_ms*/
        masc_pull_uint32( &arg, &state->drop_time_ms );
        state->drop_time_mt = state->drop_time_ms * state->srate / 1000;
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: mas_set(drop_time_ms): dropping %d ms at a time.", state->drop_time_ms );
        round_buffer_times( state );
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: after rounding to packet size: dropping %d ms at a time.", state->drop_time_ms );
        break;
    case 3: /*mc_clkid*/
        masc_pull_int32( &arg, &state->est_period_clkid );
        state->set_clkid = TRUE;
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: mas_set(mc_clkid): using clock %d.", state->est_period_clkid );
        break;
    case 4: /*keep*/
    {
        int8 tk;
        masc_pull_int8( &arg, &tk );
        if ( tk && !state->keep )
        {
            /* If we're transitioning from not-keeping the buffer to
               keeping it, set the keep_head to the current buffer's
               head. */
            state->keep_head = state->head->next;
        }
        else if (!tk)
        {
            /* Otherwise, if we're shutting keep off, just null it. */
            state->keep_head = 0;
        }
        state->keep = tk;
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: mas_set(keep): %s.", state->keep?"true":"false" );
    }
    break;
    case 5: /*bufcap_s*/
    {
        uint32 bcs;
        masc_pull_uint32( &arg, &bcs );
        state->bufcap_s = bcs;
        state->bufcap_mt = bcs * state->srate;
        masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: mas_set(bufcap_s): %d s max. buffer capacity (%d KB).", state->bufcap_s, state->bufcap_mt * state->bpstc / 1024);
    }
    break;
    default:
        masc_log_message( MAS_VERBLVL_ERROR, "sbuf: mas_set(%s) unknown key", key );
        break;
    }

    /* cleanup after our mess */
    err = masd_set_post( key, &arg );

    return err;
}


/***** LOCAL FUNCTIONS ********************************************/

/* If the media timestamp of data2 is greater than or equal to that of
   data1 plus its time length, we can probably trust the
   timestamps.  */
int
do_we_trust_timestamps( struct sbuf_state* state, struct mas_data* data1, struct mas_data* data2 )
{
    if ( data1 == 0 || data2 == 0)
        return TRUE;
    
    if ( data1 == data2 )
        return TRUE;
    
    if ( data2->header.media_timestamp - data1->header.media_timestamp < calc_mtlen( state, data1 ) )
        return FALSE;

    return TRUE;
}

void
synthesize_timestamp( struct sbuf_state* state, struct mas_data* data )
{
    data->header.media_timestamp = state->smt_next;
    data->header.sequence = state->sseq_next;
    state->smt_next = synthesize_next_mt( state, data );
    state->sseq_next++;
}

/* compute media timestamp length of this segment */
uint32
calc_mtlen( struct sbuf_state* state, struct mas_data* data )
{
    if ( state->bpstc == 0 )
        return 0;
    
    return data->length / state->bpstc;
}

/* synthesize the media timestamp of the next packet, incrementing
   this packet's timestamp by its time length */
uint32
synthesize_next_mt( struct sbuf_state* state, struct mas_data* data )
{
    return data->header.media_timestamp + calc_mtlen( state, data );
}

/* Estimate the length and polling period, based on the incoming data. */    
void
estimate_params( struct sbuf_state* state, struct mas_data* data )
{
    if ( state->bpstc == 0 )
        return;
    /* keep the parentheses, they prevent overflow */
    state->est_period_mt = data->length / state->bpstc;
    state->est_period_us = (uint32) (1.0E6 * ((double)data->length / (double) ( state->bpstc * state->srate ) ) );
    state->est_length = data->length;
}

int32
pollit( struct sbuf_state* state )
{
    uint32 period;
    int32  err = 0;

    if ( !state->poll_scheduled )
    {
        /* schedule the periodic post_frame action */
        if ( state->est_period_clkid != MAS_MC_SYSCLK_US )
            period = state->est_period_mt;
        else
            period = state->est_period_us;

        err = masd_reaction_queue_periodic( state->reaction, state->device_instance, "mas_source_poll", 0, 0, MAS_PRIORITY_ASAP, period, state->est_period_clkid );
    }

    state->poll_scheduled = TRUE;

    return err;
}

void
append_data( struct sbuf_state* state, struct mas_data* data )
{
    state->tail->next = data;
    state->tail = data;
}

void
delete_first( struct sbuf_state* state )
{
    struct mas_data* d;

    d = state->head->next;
    if ( d )
    {
        /* first, subtract the packet's time from the total buffer
           media time. */
        state->tb_mt -= calc_mtlen( state, d );

        /* then manage the linked list */
        state->head->next = d->next;
        if ( state->tail == d ) state->tail = state->head;
        masc_strike_data( d );
        masc_rtfree( d );
    }
}

int
check_buffer_fullness( struct sbuf_state* state )
{
    if ( total_buffer_media_time( state ) >= state->buffer_time_mt )
        return TRUE;
    else return FALSE;
}

/* This measures the total media time stored in the buffer, not
   accounting for gaps in the media timestamps. */
uint32
total_buffer_media_time( struct sbuf_state* state )
{
    uint32 bmt; /* buffer media time */
    struct mas_data* d;

    state->tb_mt = 0;
    if ( !state->head->next )
        return 0;

    /* compute time in buffer */
    bmt = 0;
    d = state->head->next;
    while (d)
    {
        bmt += calc_mtlen( state, d );
        d = d->next;
    }

    state->tb_mt = bmt;
    return bmt;
}

/* Establish a new stream beginning. */
void
set_ref_mark( struct sbuf_state* state )
{
    masd_mc_val( state->est_period_clkid, & state->mcnow );

    /* set the reference mark at the current wall clock time and the
       media timestamp of the head of the queue. */
    state->mtref = state->head->next->header.media_timestamp;
    state->mcref = state->mcnow;
    state->mtnow = state->mtref;

    /* mark the first packet of a new stream of data (called a
       "talkspurt" in RFC 1890) */
    state->head->next->header.mark = TRUE;

    masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: Setting reference mark at beginning of buffer.");
    masc_log_message( MAS_VERBLVL_DEBUG, "sbuf: master clock: %u [%d], media timestamp %u.", state->mcref, state->est_period_clkid, state->mtref);

    /* clear drop until */
    state->mtdu = state->mtnow;
    state->drop = FALSE;
}

void
set_clkid( struct sbuf_state* state )
{
    struct mas_package p;
        
    masc_setup_package( &p, NULL, 0, MASC_PACKAGE_NOFREE );
    masc_push_int32( &p, state->est_period_clkid );
    masc_finalize_package( &p );

    masd_reaction_queue_action_simple( state->reaction, MAS_SCH_INSTANCE, "mas_sch_set_event_clkid", p.contents, p.size );
    masc_strike_package( &p );
}

void
change_state( struct sbuf_state* state, enum sbuf_stream_state strst )
{
    state->strst = strst;

    masc_log_message(MAS_VERBLVL_DEBUG, "sbuf: %s", stream_state_desc[strst] );
}

void
round_buffer_times( struct sbuf_state* state )
{
    if ( state->srate == 0 )
        return;
    
    state->buffer_time_mt = round2packet( state, state->buffer_time_mt );
    state->postout_time_mt = round2packet( state, state->postout_time_mt );
    state->drop_time_mt = round2packet( state, state->drop_time_mt );
    state->buffer_time_ms = state->buffer_time_mt * 1000 / state->srate;
    state->postout_time_ms = state->postout_time_mt * 1000 / state->srate;
    state->drop_time_ms = state->drop_time_mt * 1000 / state->srate;
}

uint32
round2packet( struct sbuf_state* state, uint32 mt )
{
    if ( state->srate == 0 || state->est_period_mt == 0 )
        return mt;
    return state->est_period_mt * (uint32)floor(( (double)mt / (double)state->est_period_mt ) + 0.5);
}

Generated by  Doxygen 1.6.0   Back to index