Bounded Buffer with Entry-less Get Queue (Matthew Heaney)

In an earlier article, we implemented a bounded buffer using a protected
type with an entry to put an item, and another entry to get an item.
Here we present another bounded buffer, without the entry for getting an
item, implemented using suspension objects.

The basic idea is that we're going to implement the reader queue
ourselves, instead of letting the runtime system do it.

A reader "registers" with the buffer to state his intention to read,
passing in a reference to a suspension object.  He then immediately
waits for the suspension object to become true.  When it does, the
reader calls a protected procedure to get the item.

Reading looks like this:

  Buffer.Wait_To_Get (My_Suspension_Object'Access);

  Suspend_Until_True (My_Suspension_Object);

  Buffer.Get (Item);
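
The protocol above relies on the semantics of suspension objects from
Ada.Synchronous_Task_Control.  As a minimal stand-alone sketch (the
names SO_Demo, Ready and Waiter are made up for illustration, and no
buffer is involved), the following shows a task blocking in
Suspend_Until_True until another task calls Set_True:

```ada
with Ada.Synchronous_Task_Control; use Ada.Synchronous_Task_Control;
with Ada.Text_IO;

procedure SO_Demo is

   Ready : Suspension_Object;  --  initial state is False

   task Waiter;

   task body Waiter is
   begin
      --  Blocks until Ready becomes True; the state is reset to
      --  False as the task is released.
      Suspend_Until_True (Ready);
      Ada.Text_IO.Put_Line ("resumed");
   end Waiter;

begin
   delay 0.1;         --  give Waiter time to suspend (illustration only)
   Set_True (Ready);  --  releases Waiter; never blocks the caller
end SO_Demo;
```

If Set_True happens before the task reaches Suspend_Until_True, the call
simply returns without blocking, which is why a reader can register with
the buffer first and suspend afterwards.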


The writer's interaction with the buffer is unchanged.  He calls a
protected entry to put an item in the queue.  If the item queue is full,
then the writer blocks until it becomes not full (because a reader came
along and read one of the items).

Writing thus looks like this:

  Buffer.Put (Item);


It's up to us as implementors to decide whether to place an upper limit
on the number of readers that may be waiting to get an item, or to have
no limit at all.  However, this implementation would typically be used
in a restricted runtime (a la Ravenscar), so we decide to bound the
maximum number of readers.

Therefore we pass in the size of the item queue and the size of the
reader queue as discriminants of the protected type.  The spec of the
buffer type looks like this:

  generic

    type Item_Type is private;

  package Buffers is

     ...

     protected type Buffer_Type
       (Size        : Positive;
        Max_Readers : Positive) is

        entry Put
          (Item : in Item_Type);

        procedure Wait_To_Get
          (SO : access Suspension_Object);

        procedure Get
          (Item : out Item_Type);

     private

        Item_Queue : Item_Queues.Queue_Type (Size);

        Reader_Queue : SO_Queues.Queue_Type (Max_Readers);

     end Buffer_Type;

  end Buffers;


The Item_Queue, unchanged from our earlier example, is a bounded queue
of items.  We keep a writer blocked if the item queue is full, and keep
a reader suspended if the item queue is empty.

The Reader_Queue is a bounded queue of suspension object pointers, which
we use to keep track of who's waiting to get an item.

That's it for the spec.  Next up is the implementation of the protected
operations.  Obviously it's going to be a little more complex than our
earlier example, because we have to manage the reader queue ourselves.

The protected operation Put is implemented using an entry, which means
it has a barrier expression.  Here the barrier is closed if the item
queue is full.

When the barrier opens (the item queue becomes "not full"), we add the
item to the item queue, and then check to see if there are any waiting
readers.  If so, we resume the reader at the front of the reader queue.

The protected entry Put looks like this:

    entry Put (Item : in Item_Type)
       when not Is_Full (Item_Queue) is
    begin
       Add (Item, To => Item_Queue);

       if not Is_Empty (Reader_Queue) then
          Set_True (Get_Front (Reader_Queue).all);
          Pop (Reader_Queue);
       end if;
    end Put;


A reader states his intention to read by calling procedure Wait_To_Get,
passing in a pointer to a suspension object.  No blocking is performed
directly by the protected object, because this is a protected procedure,
not an entry.

If the item queue is empty, we add the reader (really, a pointer to the
reader's suspension object) to the reader queue, and set the suspension
object to false, so that the reader blocks in his subsequent call to
Suspend_Until_True.  Otherwise, we just set it to true, so he doesn't
actually do any waiting.  Like this:

    procedure Wait_To_Get
      (SO : access Suspension_Object) is
    begin
       if Is_Empty (Item_Queue) then
          Add (Item => SO_Access (SO), To => Reader_Queue);
          Set_False (SO.all);
       else
          Set_True (SO.all);
       end if;
    end Wait_To_Get;


The procedure Get returns the item at the front of the item queue, and
pops the queue.  As in Put, some reader queue management is then
performed.  If there's a reader waiting to get an item, and there's an
item, then we resume the front-most reader.

    procedure Get
      (Item : out Item_Type) is
    begin
       Item := Get_Front (Item_Queue);
       Pop (Item_Queue);

       if not Is_Empty (Reader_Queue)
         and not Is_Empty (Item_Queue)
       then
          Set_True (Get_Front (Reader_Queue).all);
          Pop (Reader_Queue);
       end if;
    end Get;


Note that the buffer was implemented so that readers never queue on an
entry; the buffer queues them itself.  This means it can be used in
restricted runtimes like Ravenscar, which limit the maximum number of
callers queued on an entry to one, as long as no more than one writer at
a time waits on Put.  (The Ravenscar profile also limits the number of
entries per protected object to one.)

Like our earlier buffer example, this buffer suffers from the problem of
not being able to let a writer announce that no more items will be
placed in the queue.  Suspended readers will therefore remain suspended
forever.

However, in a later article we showed how to solve this problem, by
providing another procedure to announce "end of file," and passing back
an EOF flag to the reader.  We should be able to use the same technique
here.
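
One way the EOF technique might carry over is sketched below.  This is
not the code from the other article: Put_EOF, the EOF out-parameter, the
Done flag and the single Reader slot are all assumptions made for
illustration, and the buffer is cut down to a non-generic, single-reader
buffer of Integer so that the example stands alone.

```ada
with Ada.Synchronous_Task_Control; use Ada.Synchronous_Task_Control;
with Ada.Text_IO;

procedure EOF_Demo is

   type SO_Access is access all Suspension_Object;
   type Item_Array is array (1 .. 4) of Integer;

   --  Hypothetical cut-down buffer: one reader, items of type Integer.
   protected Buffer is
      entry Put (Item : in Integer);
      procedure Put_EOF;
      procedure Wait_To_Get (SO : access Suspension_Object);
      procedure Get (Item : out Integer; EOF : out Boolean);
   private
      Items  : Item_Array;
      F, B   : Positive := 1;
      Length : Natural := 0;
      Reader : SO_Access;          --  the waiting reader, if any
      Done   : Boolean := False;   --  set once EOF is announced
   end Buffer;

   protected body Buffer is

      entry Put (Item : in Integer) when Length < Items'Length is
      begin
         Items (B) := Item;
         B := B mod Items'Length + 1;
         Length := Length + 1;
         if Reader /= null then
            Set_True (Reader.all);
            Reader := null;
         end if;
      end Put;

      procedure Put_EOF is
      begin
         Done := True;
         if Reader /= null then    --  release a reader that would
            Set_True (Reader.all); --  otherwise wait forever
            Reader := null;
         end if;
      end Put_EOF;

      procedure Wait_To_Get (SO : access Suspension_Object) is
      begin
         if Length = 0 and not Done then
            Reader := SO_Access (SO);
            Set_False (SO.all);
         else
            Set_True (SO.all);
         end if;
      end Wait_To_Get;

      procedure Get (Item : out Integer; EOF : out Boolean) is
      begin
         --  With a single reader, an empty queue here implies Done.
         EOF := Length = 0;
         if EOF then
            Item := 0;
         else
            Item := Items (F);
            F := F mod Items'Length + 1;
            Length := Length - 1;
         end if;
      end Get;

   end Buffer;

   Reader_SO : aliased Suspension_Object;

   task Reader_Task;

   task body Reader_Task is
      Item : Integer;
      EOF  : Boolean;
      Sum  : Integer := 0;
   begin
      loop
         Buffer.Wait_To_Get (Reader_SO'Access);
         Suspend_Until_True (Reader_SO);
         Buffer.Get (Item, EOF);
         exit when EOF;
         Sum := Sum + Item;
      end loop;
      Ada.Text_IO.Put_Line ("sum =" & Integer'Image (Sum));
   end Reader_Task;

begin
   for I in 1 .. 10 loop
      Buffer.Put (I);
   end loop;
   Buffer.Put_EOF;
end EOF_Demo;
```

Items already in the queue are still delivered after Put_EOF; the reader
sees EOF only once the queue has drained, so here it sums all ten items
(55) before leaving its loop.  Supporting several readers would mean
releasing every queued suspension object in Put_EOF, which this sketch
does not attempt.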


package body Buffers is

   use Item_Queues;
   use SO_Queues;


   protected body Buffer_Type is


      entry Put (Item : in Item_Type)
         when not Is_Full (Item_Queue) is
      begin
         Add (Item, To => Item_Queue);

         if not Is_Empty (Reader_Queue) then
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
         end if;
      end Put;


      procedure Wait_To_Get
        (SO : access Suspension_Object) is
      begin
         if Is_Empty (Item_Queue) then
            Add (Item => SO_Access (SO), To => Reader_Queue);
            Set_False (SO.all);
         else
            Set_True (SO.all);
         end if;
      end Wait_To_Get;


      procedure Get
        (Item : out Item_Type) is
      begin
         Item := Get_Front (Item_Queue);
         Pop (Item_Queue);

         if not Is_Empty (Reader_Queue)
           and not Is_Empty (Item_Queue)
         then
            Set_True (Get_Front (Reader_Queue).all);
            Pop (Reader_Queue);
         end if;
      end Get;


   end Buffer_Type;


end Buffers;



with Ada.Synchronous_Task_Control;  use Ada.Synchronous_Task_Control;
with Queues.Bounded;

pragma Elaborate (Queues.Bounded);

generic

   type Item_Type is private;

package Buffers is

   pragma Elaborate_Body;


   package Item_Queues is
     new Queues.Bounded (Item_Type);


   type SO_Access is
      access all Suspension_Object;

   for SO_Access'Storage_Size use 0;

   package SO_Queues is
     new Queues.Bounded (SO_Access);



   protected type Buffer_Type
     (Size        : Positive;
      Max_Readers : Positive) is

      entry Put
        (Item : in Item_Type);

      procedure Wait_To_Get
        (SO : access Suspension_Object);

      procedure Get
        (Item : out Item_Type);

   private

      Item_Queue : Item_Queues.Queue_Type (Size);

      Reader_Queue : SO_Queues.Queue_Type (Max_Readers);

   end Buffer_Type;


end Buffers;



package body Queues.Bounded is

   procedure Add
     (Item : in     Item_Type;
      To   : in out Queue_Type) is
   begin
      pragma Assert (To.Length < To.Size);
      To.Items (To.B) := Item;
      To.B := To.B mod To.Size + 1;
      To.Length := To.Length + 1;
   end Add;


   procedure Pop
     (Queue : in out Queue_Type) is
   begin
      pragma Assert (Queue.Length > 0);
      Queue.F := Queue.F mod Queue.Size + 1;
      Queue.Length := Queue.Length - 1;
   end;


   function Get_Front
     (Queue : Queue_Type) return Item_Type is
   begin
      pragma Assert (Queue.Length > 0);
      return Queue.Items (Queue.F);
   end;


   function Is_Empty
     (Queue : Queue_Type) return Boolean is
   begin
      return Queue.Length = 0;
   end;


   function Is_Full
     (Queue : Queue_Type) return Boolean is
   begin
      return Queue.Length = Queue.Size;
   end;


end Queues.Bounded;




generic
   type Item_Type is private;
package Queues.Bounded is

   type Queue_Type (Size : Positive) is limited private;

   procedure Add
     (Item : in     Item_Type;
      To   : in out Queue_Type);

   procedure Pop
     (Queue : in out Queue_Type);

   function Get_Front
     (Queue : Queue_Type) return Item_Type;

   function Is_Empty
     (Queue : Queue_Type) return Boolean;

   function Is_Full
     (Queue : Queue_Type) return Boolean;

private

   type Item_Array is array (Positive range <>) of Item_Type;

   type Queue_Type (Size : Positive) is
      limited record
         Items  : Item_Array (1 .. Size);
         Length : Natural := 0;
         F, B   : Positive := 1;
      end record;

end Queues.Bounded;




package Queues is

   pragma Pure;

end Queues;
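

To show how the pieces fit together, here is a small driver that might
be used with the packages listed above.  It is only a sketch: the names
Buffer_Demo, Integer_Buffers and Reader_SO are invented for the example,
it assumes the Buffers and Queues units above are compiled alongside it,
and it uses a single reader and a single writer to keep things simple.

```ada
with Ada.Synchronous_Task_Control; use Ada.Synchronous_Task_Control;
with Ada.Text_IO;
with Buffers;

procedure Buffer_Demo is

   package Integer_Buffers is new Buffers (Item_Type => Integer);

   Buffer : Integer_Buffers.Buffer_Type (Size => 4, Max_Readers => 1);

   --  Declared at the same level as the instantiation, so that the
   --  conversion to SO_Access inside Wait_To_Get passes its
   --  accessibility check.
   Reader_SO : aliased Suspension_Object;

   task Reader;

   task body Reader is
      Item : Integer;
   begin
      for I in 1 .. 10 loop
         Buffer.Wait_To_Get (Reader_SO'Access);  --  register
         Suspend_Until_True (Reader_SO);         --  wait for an item
         Buffer.Get (Item);                      --  fetch it
         Ada.Text_IO.Put_Line ("got" & Integer'Image (Item));
      end loop;
   end Reader;

begin
   --  The environment task plays the writer; Put blocks whenever
   --  all four slots of the item queue are occupied.
   for I in 1 .. 10 loop
      Buffer.Put (I * I);
   end loop;
end Buffer_Demo;
```

Because the reader knows in advance how many items to expect, it can
terminate without any end-of-file notification.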


(c) 1998-2004 All Rights Reserved David Botton