Bounded Buffer with Entry-less Get Queue - Update (Matthew Heaney)

A couple of days ago (24 Oct 1999) I sent out an example of a bounded
buffer that didn't use a protected entry to queue readers, an
implementation suitable for use in restricted runtime environments such
as Ravenscar.

However, that example contains an error that can result in trying to
read from an empty item queue.  This article explains what's wrong with
the earlier example, and provides a new version of the code that doesn't
contain the error.

Suppose there are no readers, and a writer puts an item in the queue.
Sometime after that, a pair of readers come along at about the same time
to read an item.  In theory, the first reader should be released to read
the item, and the second reader should suspend.  But that's not what
happens:

                                             Item Q        Reader Q
1)  Writer puts item                         1 item        empty

2a) Reader calls Wait_To_Get, and is         1 item        empty
    released immediately, because the
    item queue isn't empty

2b) Reader calls Wait_To_Get, and is         1 item        empty
    also released immediately, because the
    item queue isn't empty (the first reader
    hasn't actually called Get yet to read it)

3a) Reader calls Get and pops the            empty         empty
    item queue

3b) Reader calls Get and pops the            <empty>       empty
    item queue


The error occurs in step 3b, when the second reader attempts to fetch
the front item in the item queue.  The first reader popped the item
queue in step 3a, so the item queue is now empty when the second reader
tries to read it.  So the item queue raises an error, to indicate that
you're trying to read from an empty queue.

The problem is that there's a gap between the time when the queue is
logically popped (at the time a reader is resumed) and when the queue is
physically popped (at the time the reader calls Get).  The buffer
algorithm is based on the physical state of the item queue, when it
should be based on the logical state.
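
The scenario can be replayed sequentially with plain counters.  The
sketch below is only a model, not the buffer itself: Physical, Logical
and Waiting are hypothetical names standing in for the length of the
item queue, its logical length, and the length of the reader queue.
Because the decision is made on the logical count, the second reader is
suspended instead of being released.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Logical_Vs_Physical is

   --  Hypothetical counters; they only model the queues in the article.
   Physical : Natural := 0;  --  items actually stored in the item queue
   Logical  : Natural := 0;  --  items not yet promised to a reader
   Waiting  : Natural := 0;  --  readers parked on the reader queue

begin
   --  1) Writer puts an item; no reader is waiting.
   Physical := Physical + 1;
   Logical  := Logical + 1;

   --  2a, 2b) Two readers arrive before either one has called Get.
   for Reader in 1 .. 2 loop
      if Logical > 0 then
         Logical := Logical - 1;   --  the item is promised at release time
         Put_Line ("Reader" & Integer'Image (Reader) & " released");
      else
         Waiting := Waiting + 1;
         Put_Line ("Reader" & Integer'Image (Reader) & " suspended");
      end if;
   end loop;

   --  3a) The released reader finally calls Get and pops the item queue.
   Physical := Physical - 1;

   --  Only one reader was released, so nobody pops an empty queue.
   if Physical /= 0 or Logical /= 0 or Waiting /= 1 then
      raise Program_Error;
   end if;
end Logical_Vs_Physical;
```

Testing Physical instead of Logical in step 2 would release both
readers, which is exactly the error described above.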

It's not a show-stopper.  We just have to maintain a separate variable to
indicate the logical length of the item queue, and re-write the
algorithm to use that value instead.  As we did in the earlier article,
let's discuss the implementation of each protected operation.

Here's what Put looks like:

      entry Put (Item : in Item_Type)
         when not Is_Full (Item_Queue) is
      begin
         Add (Item, To => Item_Queue);

         if Is_Empty (Reader_Queue) then
            Item_Count := Item_Count + 1;
         else
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
         end if;
      end Put;

The Put entry still has a barrier that opens only when the item queue
isn't full.  Here, we use the physical length of the item queue, not the
logical length, because we can't put another item in the queue until it
really is not full.  (Logically it may be not full, because the
reader(s) have been released but haven't actually gotten around to
calling Get.  So the writer still has to wait.)

Item_Count is the variable we use to keep track of the logical length of
the item queue.  If there are no waiting readers, we increment the
Item_Count.  However, if there is a waiting reader, we can release him
so he can read the item we just placed in the queue.

Prior to Get'ing an item, a reader calls Wait_To_Get:

      procedure Wait_To_Get
        (SO : access Suspension_Object) is
      begin
         if Item_Count = 0 then
            Add (Item => SO_Access (SO), To => Reader_Queue);
            Set_False (SO.all);
         else
            pragma Assert (Is_Empty (Reader_Queue));
            Set_True (SO.all);
            Item_Count := Item_Count - 1;
         end if;
      end Wait_To_Get;


If the item queue is logically empty, then the reader is suspended, and
added to the queue of waiting readers.  (Note that the item queue may be
physically non-empty, because readers have been resumed but haven't
actually gotten around to calling Get.)

If the item queue is logically non-empty, then there's an item available
for reading, and we can resume the reader immediately, and decrement the
logical length of the item queue.

Note that we always decrement the logical length of the item queue at
the time when we release a reader.  We don't wait until he's actually
read the item (during Get), because that would be equivalent to a
physical queue length (which we know is an incorrect algorithm).

There's a check in there to assert that the reader queue is empty when
the item queue is logically non-empty.  We can reason about this as
follows.  First of all, if there were readers waiting to get an item,
they got there first, and therefore should be resumed first.  To release
this reader right away would be the equivalent of him jumping to the
front of the queue, ahead of the existing (waiting) readers.

But this can't happen (I think...).  If there are items in the queue, it
can only mean that when a writer came around to write, there weren't any
waiting readers, so the Item_Count was incremented during Put.  If there
had been waiting readers, then the writer would have resumed the
front-most reader, and the Item_Count would have stayed at 0.
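
One way to gain some confidence in this argument is to drive a model of
the three operations through a long pseudo-random sequence and check the
invariant after every step.  The sketch below is an assumption-laden
model, not the buffer: it uses plain counters (all names are
hypothetical), assumes a reader only calls Get after it has been
released, and assumes there are never more readers than Max_Readers.

```ada
with Ada.Text_IO;

procedure Check_Invariant is

   --  All names below are hypothetical; they only model the counters.
   Size        : constant := 3;  --  capacity of the item queue
   Max_Readers : constant := 2;  --  number of reader tasks

   type Word is mod 2 ** 16;
   Seed : Word := 1;             --  state of a small pseudo-random generator

   Physical : Natural := 0;  --  length of the item queue
   Logical  : Natural := 0;  --  plays the role of Item_Count
   Waiting  : Natural := 0;  --  length of the reader queue
   Released : Natural := 0;  --  readers resumed that haven't called Get yet

begin
   for Step in 1 .. 10_000 loop
      Seed := Seed * 25_173 + 13_849;

      case (Seed / 256) mod 3 is
         when 0 =>
            --  Put: only runs when the barrier is open.
            if Physical < Size then
               Physical := Physical + 1;
               if Waiting = 0 then
                  Logical := Logical + 1;
               else
                  Waiting  := Waiting - 1;
                  Released := Released + 1;
               end if;
            end if;

         when 1 =>
            --  Wait_To_Get: only called by a reader that is idle.
            if Waiting + Released < Max_Readers then
               if Logical = 0 then
                  Waiting := Waiting + 1;
               else
                  Logical  := Logical - 1;
                  Released := Released + 1;
               end if;
            end if;

         when others =>
            --  Get: only called by a reader that has been released.
            if Released > 0 then
               Physical := Physical - 1;
               Released := Released - 1;
               if Waiting > 0 and Logical > 0 then
                  Waiting  := Waiting - 1;
                  Logical  := Logical - 1;
                  Released := Released + 1;
               end if;
            end if;
      end case;

      --  The logical length is the physical length minus the promised
      --  items, and items and waiting readers never coexist.
      if Logical + Released /= Physical
        or (Logical > 0 and Waiting > 0)
      then
         raise Program_Error;
      end if;
   end loop;

   Ada.Text_IO.Put_Line ("invariant held at every step");
end Check_Invariant;
```

A run like this is not a proof, but it exercises the same case analysis:
Put only increments the count when nobody is waiting, and Wait_To_Get
only queues a reader when the count is zero.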

That leaves the last protected operation, Get:

      procedure Get
        (Item : out Item_Type) is
      begin
         Item := Get_Front (Item_Queue);
         Pop (Item_Queue);

         if not Is_Empty (Reader_Queue)
           and Item_Count > 0
         then
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
            Item_Count := Item_Count - 1;
         end if;
      end Get;


As before, we make a copy of the front-most item, and then pop the item
queue.  Note that we don't have to decrement the logical queue length,
because that was already done at the time when this reader was released.

If there are any more items, and there's another waiting reader, then we
release the reader, and decrement the logical length.  We see here how
the logical length can differ from the physical length, because we
decrement the logical length now, at the time we release a reader; the
physical length won't get decremented until the time of actual reading
(the item queue is popped during Get).
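
For completeness, here is a sketch of how a caller might use the three
operations together.  It assumes the Buffers package listed at the end
of this article; the procedure name Reader_Demo, the instantiation
Integer_Buffers, and the discriminant values are made up for
illustration.  A single task plays both writer and reader here, so the
suspension object is already true by the time we wait on it.

```ada
with Ada.Synchronous_Task_Control;  use Ada.Synchronous_Task_Control;
with Buffers;

procedure Reader_Demo is

   --  Hypothetical instantiation of the article's generic package.
   package Integer_Buffers is new Buffers (Integer);

   Buffer : Integer_Buffers.Buffer_Type (Size => 4, Max_Readers => 2);

   SO   : aliased Suspension_Object;  --  one per reader
   Item : Integer;

begin
   --  Writer side: no reader is waiting, so Item_Count becomes 1.
   Buffer.Put (42);

   --  Reader side, step 1: register interest.  An item is logically
   --  available, so the buffer sets SO to true right away.
   Buffer.Wait_To_Get (SO'Access);

   --  Step 2: block (outside the protected object) until released.
   Suspend_Until_True (SO);

   --  Step 3: fetch the item that was reserved for this reader.
   Buffer.Get (Item);

   if Item /= 42 then
      raise Program_Error;
   end if;
end Reader_Demo;
```

The call to Suspend_Until_True is made outside the protected object
because it is a potentially blocking operation; only Set_True and
Set_False are called from inside protected actions.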

Simon Wright pointed out to me that in a Ravenscar profile, you're only
allowed to have barriers implemented using a simple Boolean variable,
not a Boolean expression.  We can accommodate this additional
restriction by declaring another protected variable, and explicitly
setting it during the execution of protected operations:

      entry Put (Item : in Item_Type)
         when Item_Queue_Is_Not_Full is
      begin
         Add (Item, To => Item_Queue);
         Item_Queue_Is_Not_Full := not Is_Full (Item_Queue);
         ...
      end Put;


      procedure Get
        (Item : out Item_Type) is
      begin
         Item := Get_Front (Item_Queue);
         Pop (Item_Queue);
         Item_Queue_Is_Not_Full := True;  -- we just popped, so there's room
         ...
      end Get;
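
The same idea can be seen in isolation in the small program below.  It
is a sketch only: the protected object Slots and its operations are
hypothetical, and it is declared locally to keep the example short,
whereas a real Ravenscar program would declare it at library level.  The
point is just that the barrier names a Boolean variable, and that every
operation that changes the state recomputes that variable.

```ada
with Ada.Text_IO;

procedure Barrier_Demo is

   Capacity : constant := 2;

   protected Slots is
      entry Acquire;
      procedure Release;
   private
      Used     : Natural := 0;
      Not_Full : Boolean := True;  --  the barrier variable
   end Slots;

   protected body Slots is

      entry Acquire when Not_Full is
      begin
         Used     := Used + 1;
         Not_Full := Used < Capacity;  --  recompute after every change
      end Acquire;

      procedure Release is
      begin
         Used     := Used - 1;
         Not_Full := True;             --  freeing a slot always leaves room
      end Release;

   end Slots;

begin
   Slots.Acquire;
   Slots.Acquire;   --  Not_Full is now False; another Acquire would block
   Slots.Release;   --  reopens the barrier
   Slots.Acquire;
   Ada.Text_IO.Put_Line ("done");
end Barrier_Demo;
```

Forgetting to update the variable in one of the operations leaves the
barrier stale, so it pays to set it next to every statement that
changes the underlying state.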



--STX
package body Buffers is

   use Item_Queues;
   use SO_Queues;


   protected body Buffer_Type is


      entry Put (Item : in Item_Type)
         when not Is_Full (Item_Queue) is
      begin
         Add (Item, To => Item_Queue);

         if Is_Empty (Reader_Queue) then
            Item_Count := Item_Count + 1;
         else
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
         end if;
      end Put;


      procedure Wait_To_Get
        (SO : access Suspension_Object) is
      begin
         if Item_Count = 0 then
            Add (Item => SO_Access (SO), To => Reader_Queue);
            Set_False (SO.all);
         else
            pragma Assert (Is_Empty (Reader_Queue));
            Set_True (SO.all);
            Item_Count := Item_Count - 1;
         end if;
      end Wait_To_Get;


      procedure Get
        (Item : out Item_Type) is
      begin
         Item := Get_Front (Item_Queue);
         Pop (Item_Queue);

         if not Is_Empty (Reader_Queue)
           and Item_Count > 0
         then
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
            Item_Count := Item_Count - 1;
         end if;
      end Get;


   end Buffer_Type;


end Buffers;
with Ada.Synchronous_Task_Control;  use Ada.Synchronous_Task_Control;
with Queues.Bounded;

pragma Elaborate (Queues.Bounded);

generic

   type Item_Type is private;

package Buffers is

   pragma Elaborate_Body;


   package Item_Queues is
     new Queues.Bounded (Item_Type);


   type SO_Access is
      access all Suspension_Object;

   for SO_Access'Storage_Size use 0;

   package SO_Queues is
     new Queues.Bounded (SO_Access);



   protected type Buffer_Type
     (Size        : Positive;
      Max_Readers : Positive) is

      entry Put
        (Item : in Item_Type);

      procedure Wait_To_Get
        (SO : access Suspension_Object);

      procedure Get
        (Item : out Item_Type);

   private

      Item_Queue : Item_Queues.Queue_Type (Size);

      Reader_Queue : SO_Queues.Queue_Type (Max_Readers);

      Item_Count : Natural := 0;
      -- The difference between the actual number of items in the item
      -- queue, and the number of readers who have been resumed but
      -- haven't fetched their item yet.

   end Buffer_Type;


end Buffers;
package body Queues.Bounded is

   procedure Add
     (Item : in     Item_Type;
      To   : in out Queue_Type) is
   begin
      pragma Assert (To.Length < To.Size);
      To.Items (To.B) := Item;
      To.B := To.B mod To.Size + 1;
      To.Length := To.Length + 1;
   end Add;


   procedure Pop
     (Queue : in out Queue_Type) is
   begin
      pragma Assert (Queue.Length > 0);
      Queue.F := Queue.F mod Queue.Size + 1;
      Queue.Length := Queue.Length - 1;
   end;


   function Get_Front
     (Queue : Queue_Type) return Item_Type is
   begin
      pragma Assert (Queue.Length > 0);
      return Queue.Items (Queue.F);
   end;


   function Is_Empty
     (Queue : Queue_Type) return Boolean is
   begin
      return Queue.Length = 0;
   end;


   function Is_Full
     (Queue : Queue_Type) return Boolean is
   begin
      return Queue.Length = Queue.Size;
   end;


   function Get_Length
     (Queue : Queue_Type) return Natural is
   begin
      return Queue.Length;
   end;

end Queues.Bounded;




generic
   type Item_Type is private;
package Queues.Bounded is

   type Queue_Type (Size : Positive) is limited private;

   procedure Add
     (Item : in     Item_Type;
      To   : in out Queue_Type);

   procedure Pop
     (Queue : in out Queue_Type);

   function Get_Front
     (Queue : Queue_Type) return Item_Type;

   function Is_Empty
     (Queue : Queue_Type) return Boolean;

   function Is_Full
     (Queue : Queue_Type) return Boolean;

   function Get_Length
     (Queue : Queue_Type) return Natural;

private

   type Item_Array is array (Positive range <>) of Item_Type;

   type Queue_Type (Size : Positive) is
      limited record
         Items  : Item_Array (1 .. Size);
         Length : Natural := 0;
         F, B   : Positive := 1;
      end record;

end Queues.Bounded;




package Queues is

   pragma Pure;

end Queues;

