ATC: Interrupting Blocking Calls


In this example we have a bounded buffer with operations to Put and Get
characters.  A producer thread puts characters into the buffer, and a
consumer thread gets characters.

If the buffer is full, we want the producer to block until there is more
room in the buffer.  If the buffer is empty, we want the consumer to
block until there's something in the buffer.

We also want to be able to tell the waiting consumer to stop getting
characters.  This requires some way of interrupting the blocked task.

Implementation

The bounded buffer is implemented as an array encapsulated by a protected
type, with a discriminant that indicates the size:

  generic
     type Item_Type is private;
  package Buffers is

     type Item_Array is array (Positive range <>) of Item_Type;

     protected type Buffer_Type (Size : Positive) is

        ...

     private

        Items     : Item_Array (1 .. Size);
        Get_Index : Positive := 1;
        Put_Index : Positive := Size;
        Count     : Natural := 0;

     end Buffer_Type;

  end Buffers;


We want the producer to block if the buffer is full, so we implement the
Put operation as a protected entry:

      entry Put (Item : in Item_Type);

The entry barrier tests whether the number of items is less than the
buffer size.  When it is, the entry body copies the item into the array:

      entry Put (Item : in Item_Type)
        when Count < Size is
      begin
         Put_Index := Put_Index mod Size + 1;
         Items (Put_Index) := Item;
         Count := Count + 1;
      end;

We implement Get using separate operations for waiting until there's
an item, and actually fetching the item:

      entry Wait_For_Item;

      procedure Get (Item : out Item_Type);


We've done it this way because we only want to interrupt waiting for an
item, not retrieving one.  If we were to interrupt a combined
wait-and-fetch operation, then an item could get lost.

The entry Wait_For_Item has only a barrier to test whether there's an
item in the buffer:

      entry Wait_For_Item when Count > 0 is
      begin
         null;
      end;

The protected procedure Get copies the oldest item from the array,
advances the index (wrapping around at Size), and decrements the count:

      procedure Get (Item : out Item_Type) is
      begin
         Item := Items (Get_Index);
         Get_Index := Get_Index mod Size + 1;
         Count := Count - 1;
      end;
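
The index arithmetic shared by Put and Get can be traced with a small
sequential sketch.  It deliberately drops the protected type and the
barriers, so it shows only how the two indexes wrap; the procedure name
Ring_Index_Demo and the size of 3 are assumptions made for the
illustration.  Because Put_Index starts at Size, the first Put lands in
slot 1.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Ring_Index_Demo is

   Size : constant Positive := 3;   --  illustrative size, not from the article

   --  Same state as the private part of Buffer_Type, without the protection.
   Items     : String (1 .. Size) := (others => ' ');
   Get_Index : Positive := 1;
   Put_Index : Positive := Size;
   Count     : Natural  := 0;

   procedure Put (Item : in Character) is
   begin
      Put_Index := Put_Index mod Size + 1;   --  advance first, then store
      Items (Put_Index) := Item;
      Count := Count + 1;
      Put_Line ("Put '" & Item & "' into slot" & Positive'Image (Put_Index));
   end Put;

   procedure Get (Item : out Character) is
   begin
      Item := Items (Get_Index);             --  read first, then advance
      Put_Line ("Got '" & Item & "' from slot" & Positive'Image (Get_Index));
      Get_Index := Get_Index mod Size + 1;
      Count := Count - 1;
   end Get;

   C : Character;

begin
   Put ('a');
   Put ('b');
   Put ('c');   --  the array is now full
   Get (C);     --  frees slot 1
   Put ('d');   --  wraps around: 3 mod 3 + 1 = 1
   Get (C);
   Get (C);
   Get (C);
   Put_Line ("Items left:" & Natural'Image (Count));
end Ring_Index_Demo;
```

In the real buffer the barriers Count < Size and Count > 0 are what stop
a caller from overrunning or underrunning the array; this sketch relies
on the calls being made in a safe order.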


Our consumer abstraction binds to a character buffer via an access
discriminant, and provides an operation to shut down the thread:

  package Consumers is

     type Consumer_Type (Buffer : access Buffer_Type) is
        limited private;

     procedure Shutdown (Consumer : in out Consumer_Type);

  private

     ...

  end Consumers;


The Consumer_Type has an active thread of control, implemented as a task
component.  The type is not implemented as a task directly, because (as
we shall see) it needs another component.

  private

     task type Consumer_Task_Type (Consumer : access Consumer_Type) is
     end;

     ...

     type Consumer_Type (Buffer : access Buffer_Type) is
        limited record
           ...
           Consumer_Task : Consumer_Task_Type (Consumer_Type'Access);
        end record;

  end Consumers;


The Consumer_Task_Type has its own discriminant, to bind to the
enclosing Consumer_Type.  This gives the task access to the Buffer
and to the other components of the same record.

The consumer task is implemented as a loop that waits for a character,
fetches it, then displays it:

      Main:
      loop
         <wait for character>

         Buffer.Get (C);
         Put_Line ("Consumed '" & C & "'");
      end loop Main;


If we were to implement the wait as a straight call to the buffer's
protected entry:

         Buffer.Wait_For_Item;

then there would be no way to interrupt the entry call once in progress.
You can't use a task entry for this purpose, because the task is not
waiting to accept a call on it: it's blocked on the buffer call.

In order to interrupt the waiting task, we use Asynchronous Transfer of
Control (ATC).  By moving the wait to the abortable part,

         select
            <triggering alternative>
            exit Main;
         then abort
            Buffer.Wait_For_Item;
         end select;

then we can interrupt the call when the triggering alternative is
selected.  Its triggering statement can be either a delay statement or
an entry call.
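
As a minimal sketch of the delay form, the following program abandons a
blocked entry call after one second.  The names Timeout_Demo and
Never_Open are hypothetical, and the protected object exists only to
give the abortable part something to block on.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Timeout_Demo is

   --  Hypothetical protected object whose barrier never becomes True,
   --  so any caller of Wait stays queued until it is aborted.
   protected Never_Open is
      entry Wait;
   private
      Open : Boolean := False;
   end Never_Open;

   protected body Never_Open is
      entry Wait when Open is
      begin
         null;
      end Wait;
   end Never_Open;

begin
   select
      delay 1.0;                       --  triggering statement
      Put_Line ("Gave up waiting.");
   then abort
      Never_Open.Wait;                 --  abortable part: blocks on the barrier
      Put_Line ("Entry call completed.");
   end select;
end Timeout_Demo;
```

When the only goal is a timeout on a single entry call, a timed entry
call (select ... or delay ... end select) is the simpler tool.  ATC is
used here because the consumer needs an entry call, not a delay, as its
trigger.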

The task itself can't handle the shutdown, because it's blocked waiting
for a character from the buffer.  So we introduce an internal protected
type with a procedure to receive the shutdown request from a client, and
an entry to act as the triggering statement for the task:

   protected type Trigger_Type is
      procedure Shutdown;
      entry Stop_Consuming;
   private
      Stop_Requested : Boolean := False;
   end Trigger_Type;


The protected procedure Shutdown just sets a flag that indicates
we're done:

      procedure Shutdown is
      begin
         Stop_Requested := True;
      end;


The protected entry Stop_Consuming blocks until that flag is True:

      entry Stop_Consuming when Stop_Requested is
      begin
         null;
      end;


The complete implementation of the Consumer_Type thus looks like this:

   type Consumer_Type (Buffer : access Buffer_Type) is
      limited record
         Trigger       : Trigger_Type;
         Consumer_Task : Consumer_Task_Type (Consumer_Type'Access);
      end record;


The consumer task calls the Stop_Consuming entry of the trigger:

         select
            Consumer.Trigger.Stop_Consuming;
            exit Main;
         then abort
            Buffer.Wait_For_Item;
         end select;


This (asynchronous) select statement will complete either because the
trigger accepts the Stop_Consuming entry, or because the Buffer accepts
the Wait_For_Item entry.

Shutdown is implemented by calling the trigger's protected Shutdown
procedure:

   procedure Shutdown (Consumer : in out Consumer_Type) is
   begin
      Consumer.Trigger.Shutdown;
   end;

This sets the Stop_Requested flag to True, which immediately forces the
barrier for Stop_Consuming to be reevaluated.  The trigger object
accepts the entry call, which aborts the Wait_For_Item call already in
progress.
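
The whole handshake can be tried in isolation with a single-file sketch.
It assumes a stand-in protected object, Gate, whose Wait_For_Item barrier
never opens, so the only way out of the select is the trigger.  The names
Trigger_Demo, Gate and Consumer, and the half-second delay, are
illustrative and not part of the original packages.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Trigger_Demo is

   --  Stand-in for the buffer: no item ever arrives.
   protected Gate is
      entry Wait_For_Item;
   end Gate;

   --  Same shape as Trigger_Type, declared as a single object.
   protected Trigger is
      procedure Shutdown;
      entry Stop_Consuming;
   private
      Stop_Requested : Boolean := False;
   end Trigger;

   protected body Gate is
      entry Wait_For_Item when False is
      begin
         null;
      end Wait_For_Item;
   end Gate;

   protected body Trigger is
      procedure Shutdown is
      begin
         Stop_Requested := True;
      end Shutdown;

      entry Stop_Consuming when Stop_Requested is
      begin
         null;
      end Stop_Consuming;
   end Trigger;

   task Consumer;

   task body Consumer is
   begin
      select
         Trigger.Stop_Consuming;          --  triggering entry call
         Put_Line ("Consumer: stop requested.");
      then abort
         Gate.Wait_For_Item;              --  blocks until aborted
         Put_Line ("Consumer: item available.");
      end select;
   end Consumer;

begin
   delay 0.5;          --  give the consumer time to block
   Trigger.Shutdown;   --  opens the barrier and releases the consumer
end Trigger_Demo;
```

The main procedure does not return until the Consumer task has
terminated, because the task depends on it.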

There's one more issue to discuss.  Text_IO is not a concurrent
abstraction, and it's possible that simultaneous calls to Put_Line can
produce interleaved output.

The package Message_IO provides a thread-safe version of Put_Line that
synchronizes its callers:

  package Message_IO is

     procedure Put_Line (Message : in String);

  end Message_IO;


The body of Message_IO declares a semaphore that is seized and released
during calls to Put_Line:

  package body Message_IO is

     Semaphore : aliased Semaphore_Type;

     procedure Put_Line (Message : in String) is
        Control : Semaphore_Control (Semaphore'Access);
     begin
        Ada.Text_IO.Put_Line (Message);
     end;

  end Message_IO;


We use a controlled object to automatically seize and release the
semaphore, which guarantees that the semaphore will get released even if
there's an exception.
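
The following single-file sketch is meant to show that release on
exception.  It assumes an Ada 2005 or later compiler, since it derives
from Limited_Controlled inside a procedure and uses the overriding
keyword; Ada 95 requires controlled types to be declared at library
level, as the packages in this example do.  Guard_Demo, Guard, Is_Held
and Fail_While_Holding are hypothetical names added for the
demonstration.

```ada
with Ada.Finalization;
with Ada.Text_IO; use Ada.Text_IO;

procedure Guard_Demo is

   protected type Semaphore_Type is
      entry Seize;
      procedure Release;
      function Is_Held return Boolean;   --  hypothetical query, demo only
   private
      In_Use : Boolean := False;
   end Semaphore_Type;

   protected body Semaphore_Type is
      entry Seize when not In_Use is
      begin
         In_Use := True;
      end Seize;

      procedure Release is
      begin
         In_Use := False;
      end Release;

      function Is_Held return Boolean is
      begin
         return In_Use;
      end Is_Held;
   end Semaphore_Type;

   --  Seizes the semaphore when declared, releases it when finalized.
   type Guard (Semaphore : access Semaphore_Type) is
     new Ada.Finalization.Limited_Controlled with null record;

   overriding procedure Initialize (G : in out Guard);
   overriding procedure Finalize   (G : in out Guard);

   procedure Initialize (G : in out Guard) is
   begin
      G.Semaphore.Seize;
   end Initialize;

   procedure Finalize (G : in out Guard) is
   begin
      G.Semaphore.Release;
   end Finalize;

   Lock : aliased Semaphore_Type;

   procedure Fail_While_Holding is
      Held : Guard (Lock'Access);   --  semaphore seized here
   begin
      raise Constraint_Error;       --  leave the scope abnormally
   end Fail_While_Holding;

begin
   begin
      Fail_While_Holding;
   exception
      when Constraint_Error =>
         null;
   end;
   --  If finalization ran, the semaphore is free again.
   Put_Line ("Held after exception: " & Boolean'Image (Lock.Is_Held));
end Guard_Demo;
```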

Note that we can't just wrap the Text_IO.Put_Line call in a protected
procedure:

  protected body Synchronization is

     procedure Do_Put_Line (S : String) is
     begin
        Text_IO.Put_Line (S);
     end;

  end Synchronization;

because Text_IO.Put_Line is a "potentially blocking operation," and it
is a bounded error to invoke a potentially blocking operation during a
protected action.  See RM95 9.5.1 (8 - 18).

For whatever reason, the test program dumps core as it terminates
because of a segmentation violation.  I don't know why...


Complete Source

package body Binary_Semaphores.Controls is

   procedure Initialize (Control : in out Semaphore_Control) is
   begin
      Control.Semaphore.Seize;
   end;

   procedure Finalize (Control : in out Semaphore_Control) is
   begin
      Control.Semaphore.Release;
   end;

end Binary_Semaphores.Controls;

with Ada.Finalization;

package Binary_Semaphores.Controls is

   pragma Preelaborate;

   type Semaphore_Control (Semaphore : access Semaphore_Type) is
      limited private;

private

   use Ada.Finalization;

   type Semaphore_Control (Semaphore : access Semaphore_Type) is
     new Limited_Controlled with null record;

   procedure Initialize (Control : in out Semaphore_Control);

   procedure Finalize (Control : in out Semaphore_Control);

end Binary_Semaphores.Controls;

package body Binary_Semaphores is

   protected body Semaphore_Type is

      procedure Release is
      begin
         In_Use := False;
      end;

      entry Seize when not In_Use is
      begin
         In_Use := True;
      end;

   end Semaphore_Type;

end Binary_Semaphores;

package Binary_Semaphores is

   pragma Pure;


   protected type Semaphore_Type is

      procedure Release;

      entry Seize;

   private

      In_Use : Boolean := False;

   end Semaphore_Type;

end Binary_Semaphores;

package body Buffers is

   protected body Buffer_Type is

      entry Put (Item : in Item_Type)
        when Count < Size is
      begin
         Put_Index := Put_Index mod Size + 1;
         Items (Put_Index) := Item;
         Count := Count + 1;
      end;

      entry Wait_For_Item when Count > 0 is
      begin
         null;
      end;

      procedure Get (Item : out Item_Type) is
      begin
         Item := Items (Get_Index);
         Get_Index := Get_Index mod Size + 1;
         Count := Count - 1;
      end;

      function Is_Full return Boolean is
      begin
         return Count = Size;
      end;

      function Is_Empty return Boolean is
      begin
         return Count = 0;
      end;

   end Buffer_Type;

end Buffers;



generic
   type Item_Type is private;
package Buffers is

   type Item_Array is array (Positive range <>) of Item_Type;

   protected type Buffer_Type (Size : Positive) is

      entry Put (Item : in Item_Type);

      entry Wait_For_Item;

      procedure Get (Item : out Item_Type);

      function Is_Full return Boolean;

      function Is_Empty return Boolean;

   private

      Items     : Item_Array (1 .. Size);
      Get_Index : Positive := 1;
      Put_Index : Positive := Size;
      Count     : Natural := 0;

   end Buffer_Type;

end Buffers;



with Buffers;

package Character_Buffers is new Buffers (Character);

with Message_IO; use Message_IO;

package body Consumers is

   procedure Shutdown (Consumer : in out Consumer_Type) is
   begin
      Consumer.Trigger.Shutdown;
   end;


   protected body Trigger_Type is

      procedure Shutdown is
      begin
         Stop_Requested := True;
      end;

      entry Stop_Consuming when Stop_Requested is
      begin
         null;
      end;

   end Trigger_Type;


   task body Consumer_Task_Type is

      C : Character;

      Buffer : Buffer_Type renames Consumer.Buffer.all;

   begin

      Main:
      loop

         select
            Consumer.Trigger.Stop_Consuming;
            exit Main;
         then abort
            Buffer.Wait_For_Item;
         end select;

         Buffer.Get (C);
         Put_Line ("Consumed '" & C & "'");

      end loop Main;

      while not Buffer.Is_Empty loop
         Buffer.Get (C);
         Put_Line ("Consumed '" & C & "'");
      end loop;

      Put_Line ("Consumer done.");

   end Consumer_Task_Type;

end Consumers;

with Character_Buffers;  use Character_Buffers;

package Consumers is

   type Consumer_Type (Buffer : access Buffer_Type) is
      limited private;

   procedure Shutdown (Consumer : in out Consumer_Type);

private

   task type Consumer_Task_Type (Consumer : access Consumer_Type) is
   end;

   protected type Trigger_Type is
      procedure Shutdown;
      entry Stop_Consuming;
   private
      Stop_Requested : Boolean := False;
   end Trigger_Type;

   type Consumer_Type (Buffer : access Buffer_Type) is
      limited record
         Trigger       : Trigger_Type;
         Consumer_Task : Consumer_Task_Type (Consumer_Type'Access);
      end record;

end Consumers;

with Binary_Semaphores.Controls;
with Ada.Text_IO;

package body Message_IO is

   use Binary_Semaphores;
   Semaphore : aliased Semaphore_Type;


   procedure Put_Line (Message : in String) is
      use Controls;
      Control : Semaphore_Control (Semaphore'Access);
   begin
      Ada.Text_IO.Put_Line (Message);
   end;

end Message_IO;

package Message_IO is

   procedure Put_Line (Message : in String);

end Message_IO;


with Ada.Text_IO;
with Message_IO;         use Message_IO;
with Character_Buffers;  use Character_Buffers;
with Consumers;          use Consumers;

procedure Test_Buffers is

   Buffer : aliased Buffer_Type (Size => 5);

   Consumer : Consumer_Type (Buffer'Access);

   C : Character;

begin

   Put_Line ("Enter characters, and use '0' to indicate you're done.");

   loop
      Ada.Text_IO.Get (C);
      Buffer.Put (C);
      exit when C = '0';
   end loop;

   Put_Line ("Shutting down consumer");
   Shutdown (Consumer);
   Put_Line ("Shutting down complete; exiting.");

end Test_Buffers;

Contributed by: Matthew Heaney
Contributed on: May 24, 1999
License: Public Domain
