11 Objects and Concurrency

As we have seen, objects in Oz are stateful data structures. Threads are the active computation entities. Threads can communicate either by message passing using ports, or through shared objects. Communication through shared objects requires the ability to serialize concurrent operations on objects so that the object state is kept coherent after each such operation. In Oz, we separate the issue of acquiring exclusive access to an object from the object system. This gives us the ability to perform coarse-grained atomic operations on a set of objects, a very important requirement, e.g. in distributed database systems. The basic mechanism in Oz for getting exclusive access is the lock.

11.1 Locks

The purpose of a lock is to mediate exclusive access to a shared resource between threads. Such a mechanism is typically made safer and more robust by restricting this exclusive access to a critical region. On entry into the region, the lock is secured and the thread is granted exclusive access rights to the resource, and when execution leaves the region, whether normally or through an exception, the lock is released. A concurrent attempt to obtain the same lock will block until the thread currently holding it has released it.

11.1.1 Simple Locks

In the case of a simple lock, a nested attempt by the same thread to reacquire the same lock during the dynamic scope of a critical section guarded by the lock will block. We say reentrancy is not supported. Simple locks can be modeled in Oz as follows, where Code is a nullary procedure encapsulating the computation to be performed in the critical section. The lock is represented as a procedure: when applied to some code, it tries to get the lock by waiting until Old gets bound to unit. Notice that the lock is released upon normal as well as abnormal exit.

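proc {NewSimpleLock ?Lock}
   Cell = {NewCell unit}
in
   proc {Lock Code}
      Old New in
      try
         % Atomically fetch the previous holder's variable and install ours
         {Exchange Cell Old New}
         {Wait Old} {Code}   % block until the previous holder has released
      finally New=unit end   % release on normal or abnormal exit
   end
end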

11.1.2 Atomic Exchange on Object Attributes

Another implementation uses an object to represent the lock, as shown below. Notice the use of the construct:

Old = lck := New

Similar to the Exchange operation on cells, this is an atomic exchange on an object attribute.

class SimpleLock
   attr lck:unit
   meth init skip end
   meth 'lock'(Code)
      Old New in
      try
         Old = lck := New   % atomic exchange on the attribute
         {Wait Old} {Code}
      finally New = unit end
   end
end
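
The exchange operation can also be tried in isolation. The following is a minimal sketch, not part of the lock implementations above; the class Slot and its method swap are hypothetical names introduced only for illustration.

```oz
declare C Slot S in
% Exchange on a cell: Old receives the previous content, 2 is stored.
C = {NewCell 1}
local Old in
   {Exchange C Old 2}
   {Browse Old#{Access C}}   % shows 1#2
end
% The same operation on an object attribute.
% Slot and swap are hypothetical names used only in this sketch.
class Slot from BaseObject
   attr val:1
   meth init skip end
   meth swap(New ?Old)
      Old = val := New   % Old gets the previous value of val
   end
end
S = {New Slot init}
{Browse {S swap(2 $)}}   % shows 1
```

In both cases reading the old value and storing the new one happen as a single step, which is what makes the construct suitable for building locks.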

11.2 Thread-Reentrant Locks

In Oz, the computational unit is the thread. Therefore an appropriate locking mechanism should grant exclusive access rights to threads. As a consequence the non-reentrant simple lock mechanism presented above is inadequate. A thread-reentrant lock allows the same thread to reenter the lock, i.e. to enter a dynamically nested critical region guarded by the same lock. Such a lock can be held by at most one thread at a time. Concurrent threads that attempt to get the same lock are queued. When the lock is released, it is granted to the thread standing first in line, and so on. Thread-reentrant locks can be modeled in Oz as follows:

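class ReentrantLock from SimpleLock
   attr Current:unit
   meth 'lock'(Code)
      ThisThread = {Thread.this} in
      if ThisThread == @Current then
         {Code}   % this thread already holds the lock: just run the code
      else
         proc {Code1}
            try
               Current := ThisThread
               {Code}
            finally
               Current := unit
            end
         end
      in
         SimpleLock, 'lock'(Code1)
      end
   end
end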

Thread-reentrant locks are given syntactic and implementational support in Oz. They are implemented as a subtype of chunks. Oz provides the following syntax for guarded critical regions:

lock E then S end

E is an expression that evaluates to a lock. The construct blocks until the lock has been acquired and S has been executed. If E is not a lock, then a type error is raised.
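
As a small illustration of reentrancy with the built-in locks, consider the following sketch. It only uses NewLock and the lock statement; the atoms inner and second are arbitrary labels chosen here. The first thread reacquires the lock it already holds, while the second thread has to wait until the lock is released.

```oz
declare L Done in
L = {NewLock}
thread
   lock L then
      {Delay 500}
      % Nested acquisition by the same thread does not block.
      lock L then {Browse inner} end
   end
end
thread
   {Delay 100}   % give the first thread time to take the lock
   lock L then {Browse second} end
   Done = unit
end
{Wait Done}
```

With these delays, inner should appear in the browser before second, since the second thread is queued until the outer critical region of the first thread is left.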

11.2.1 Arrays

Oz has arrays as a chunk subtype. Operations on arrays are defined in module Array.

As a simple illustration of the use of locks consider the program in Figure 11.1. The procedure Switch transforms negative elements of an array to positive, and zero elements to the atom zero. The procedure Zero resets all elements to zero.

declare A L Switch Zero in
A = {NewArray 1 100 ~5}
L = {NewLock}
proc {Switch A}
   {For {Array.low A} {Array.high A} 1
    proc {$ I}
       X = A.I in
       if X<0 then A.I := ~X
       elseif X == 0 then A.I := zero end
       {Delay 100}
    end}
end
proc {Zero A}
   {For {Array.low A} {Array.high A} 1
    proc {$ I} A.I := 0 {Delay 100} end}
end

Figure 11.1: Using a lock

Try the following program.

local X Y in
   thread {Zero A} X = unit end
   thread {Switch A} Y = X end
   {Wait Y}
   {For 1 10 1 proc {$ I} {Browse A.I} end}
end

The elements of the array will be a mix of 0 and zero.

Assume that we want to perform the procedures Zero and Switch, each atomically but in an arbitrary order. To do this we can use locks as in the following example.

local X Y in
   thread
      {Delay 100}
      lock L then {Zero A} end
      X = unit
   end
   thread
      lock L then {Switch A} end
      Y = X
   end
   {Wait Y}
   {For 1 10 1 proc {$ I} {Browse A.I} end}
end

By moving the delay statement above from the first thread to the second, we observe that all the elements of the array get either the value zero or the value 0. We have no mixed values.
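
The same technique extends to an atomic operation over several stateful entities, each guarded by its own lock. The following is a minimal sketch under stated assumptions: NewAccount, Transfer, and the record fields id, lck and bal are hypothetical names introduced here, and the locks are always taken in increasing id order so that two opposite transfers cannot wait for each other in a cycle.

```oz
declare NewAccount Transfer Acc1 Acc2 in
% An account is a record holding its own lock and a cell for the balance.
fun {NewAccount Id Amount}
   account(id:Id lck:{NewLock} bal:{NewCell Amount})
end
% Move N units from From to To as one atomic step.
proc {Transfer From To N}
   First Second in
   % Fixed acquisition order: the account with the smaller id first.
   if From.id < To.id then First = From Second = To
   else First = To Second = From end
   lock First.lck then
      lock Second.lck then
         {Assign From.bal ({Access From.bal} - N)}
         {Assign To.bal ({Access To.bal} + N)}
      end
   end
end
Acc1 = {NewAccount 1 100}
Acc2 = {NewAccount 2 100}
local D1 D2 in
   thread {For 1 50 1 proc {$ _} {Transfer Acc1 Acc2 1} end} D1 = unit end
   thread {For 1 50 1 proc {$ _} {Transfer Acc2 Acc1 1} end} D2 = unit end
   {Wait D1} {Wait D2}
   {Browse ({Access Acc1.bal} + {Access Acc2.bal})}   % the total stays 200
end
```

Acquiring the locks in an arbitrary order instead could deadlock as soon as two threads transfer in opposite directions at the same time.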


11.3 Locking Objects

To guarantee mutual exclusion on objects one may use the locks described in the previous subsection. Alternatively, we may declare in the class that its instance objects can be locked with a default lock existing in the objects when they are created. A class with an implicit lock is declared as follows:

class C from ....
   prop locking
   ....
end

This does not automatically lock the object when one of its methods is called. Instead we have to use the construct:

lock S end

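inside any method to guarantee exclusive access when S is executed. Remember that our locks are thread-reentrant. This implies that a method executing inside lock ... end may call other locking methods of the same object from the same thread without blocking itself.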

Of course, if we use multiple threads calling methods in multiple objects, we might deadlock if there is any cyclic dependency. Writing nontrivial concurrent programs needs careful understanding of the dependency patterns between threads. In such programs deadlock may occur whether locks are used or not. It suffices to have a cyclic communication pattern for deadlock to occur.

The program in Figure 10.3 can be refined to work in a concurrent environment as follows:

class CCounter from Counter
   prop locking
   meth inc(Value)
      lock Counter,inc(Value) end
   end
   meth init(Value)
      lock Counter,init(Value) end
   end
end
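
The effect of thread-reentrancy on object locks can be seen in the following self-contained sketch. The class Account and its methods are hypothetical names used only here; depositTwice holds the object's lock while calling deposit, which takes the same lock again in the same thread.

```oz
declare Account Acc in
% Account is a hypothetical class used only in this sketch.
class Account from BaseObject
   prop locking
   attr bal:0
   meth init skip end
   meth deposit(N)
      lock bal := @bal + N end
   end
   meth depositTwice(N)
      % Nested self calls re-enter the object's lock in the same thread.
      lock {self deposit(N)} {self deposit(N)} end
   end
   meth get(?X)
      lock X = @bal end
   end
end
Acc = {New Account init}
local D1 D2 in
   thread {For 1 100 1 proc {$ _} {Acc depositTwice(1)} end} D1 = unit end
   thread {For 1 100 1 proc {$ _} {Acc deposit(1)} end} D2 = unit end
   {Wait D1} {Wait D2}
   {Browse {Acc get($)}}   % shows 300
end
```

With a non-reentrant lock, depositTwice would block forever on its first nested call.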

Let us now study a number of interesting examples where threads not only perform atomic transactions on objects, but also synchronize through objects.

11.4 Concurrent FIFO Channel

The first example shows a concurrent channel, which is shared among an arbitrary number of threads. Any producing thread may put information in the channel asynchronously. A consuming thread has to wait until information exists in the channel. Waiting threads are served fairly. Figure 11.2 shows one possible realization. This program relies on the use of logic variables to achieve the desired synchronization. The method put/1 inserts an element in the channel. A thread executing the method get/1 will wait until an element is put in the channel. Multiple consuming threads reserve their place in the channel, thereby achieving fairness. Notice that {Wait I} is done outside the exclusive region. If waiting were done inside lock ... end, the program would deadlock, because a producer could never acquire the lock to insert the awaited element. So, as a rule of thumb: do not wait inside an exclusive region if the action that wakes you up has to acquire the same lock.

class Channel from BaseObject
   prop locking
   attr f r
   meth init
      X in f := X r := X   % front and rear share one unbound stream
   end
   meth put(I)
      X in lock @r=I|X r:=X end
   end
   meth get(?I)
      X in lock @f=I|X f:=X end {Wait I}
   end
end

Figure 11.2: An Asynchronous Channel Class

11.5 Monitors

The next example shows a traditional way to write monitors. We start by defining a class that defines the notion of events and the monitor operations notify(Event) and wait(Event) by specializing the class Channel.

class Event from Channel
   meth wait
      Channel , get(_)
   end
   meth notify
      Channel , put(unit)
   end
end

We show here an example of a unit buffer in the traditional monitor style. The unit buffer behaves in a way very similar to a channel when it comes to consumers: each consumer waits until the buffer is full. In the case of producers, only one is allowed to insert an item into the empty buffer; other producers have to suspend until the item is consumed. The program in Figure 11.3 shows a single-buffer monitor. Here we had to program a signaling mechanism for producers and consumers. Observe the pattern in the put/1 and get/1 methods. Most execution is done in an exclusive region. If waiting is necessary, it is done outside the exclusive region. This is achieved with an auxiliary variable X, which gets bound to yes if the operation succeeded and to no if the thread has to wait and retry. The get/1 method sets the empty flag and notifies one producer (if any) in a single atomic step. The put/1 method does the reciprocal action.

class UnitBufferM
   attr item empty psignal csignal
   prop locking
   meth init
      empty := true
      psignal := {New Event init}
      csignal := {New Event init}
   end
   meth put(I)
      X in
      lock
         if @empty then
            item := I
            empty := false
            X = yes
            {@csignal notify}
         else X = no end
      end
      if X == no then   % wait outside the exclusive region, then retry
         {@psignal wait}
         {self put(I)}
      end
   end
   meth get(I)
      X in
      lock
         if {Not @empty} then
            I = @item
            empty := true
            {@psignal notify}
            X = yes
         else X = no end
      end
      if X == no then   % wait outside the exclusive region, then retry
         {@csignal wait}
         {self get(I)}
      end
   end
end

Figure 11.3: A Unit Buffer Monitor

Try the above example by running the following code:

local
   UB = {New UnitBufferM init}
in
   {For 1 15 1
    proc{$ I} thread {UB put(I)} {Delay 500} end end}
   {For 1 15 1
    proc{$ I} thread {UB get({Browse})} {Delay 500} end end}
end

11.5.1 Bounded Buffers Oz Style

In Oz, it is very rare to write programs in the monitor style shown above. In general it is very awkward. There is a simpler, nontraditional way to write a UnitBuffer class, made possible by the combination of objects and logic variables. Figure 11.4 shows a simple definition. No locking is needed directly.

class UnitBuffer from BaseObject
   attr prodq buffer
   meth init
      buffer := {New Channel init}
      prodq := {New Event init}
      {@prodq notify}   % one initial permit: the buffer starts empty
   end
   meth put(I)
      {@prodq wait}
      {@buffer put(I)}
   end
   meth get(?I)
      {@buffer get(I)}
      {@prodq notify}
   end
end

Figure 11.4: Unit Buffer

A simple generalization of the above program leads to a bounded buffer class of arbitrary size, shown below. The put and get methods are the same as before. Only the initialization method is changed.

class BoundedBuffer from UnitBuffer
   attr prodq buffer
   meth init(N)
      buffer := {New Channel init}
      prodq := {New Event init}
      {For 1 N 1 proc {$ _} {@prodq notify} end}   % N initial permits
   end
end

11.6 Active Objects

An active object is a thread whose behavior is described by a class definition. Communication with active objects is through asynchronous message passing. An active object reacts to received messages by executing the corresponding methods in its associated class. An active object executes one method at a time. Therefore locking is not needed for methods performed by an active object. The interface to an active object is through Oz ports. Clients of an active object send messages to the object by sending messages to its associated port. We will show how to create this abstraction generically. Since active objects resemble servers receiving messages from clients through a network, we call this abstraction the server abstraction. To create a server S from a class Class we execute:

S = {NewServer Class init}

Here init is the initial object construction method. To get the basic idea, we first show a simplified form of the NewServer function:

fun {NewServer Class Init}
   S    % The stream of the port
   Port = {NewPort S}
   Object = {New Class Init}
in
   thread {ForAll S
           proc{$ M} {Object M} end}
   end
   Port
end
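
To see the idea at work, here is a self-contained sketch of a client talking to an active object. NewActive is a hypothetical name that mirrors the simplified function above, and the class Counter defined here is likewise only an illustration. Messages sent by one thread to a port arrive in the order they were sent, so the final get message sees the effect of all preceding inc messages.

```oz
declare NewActive Counter Srv in
% NewActive is a hypothetical name mirroring the simplified server function.
fun {NewActive Class Init}
   S    % The stream of the port
   Port = {NewPort S}
   Object = {New Class Init}
in
   % A single thread applies the object to each message in turn.
   thread {ForAll S proc {$ M} {Object M} end} end
   Port
end
class Counter from BaseObject
   attr val
   meth init(V) val := V end
   meth inc(N) val := @val + N end
   meth get(?V) V = @val end
end
Srv = {NewActive Counter init(0)}
{For 1 10 1 proc {$ I} {Send Srv inc(I)} end}
local V in
   {Send Srv get(V)}   % asynchronous: V is bound later by the server
   {Browse V}          % shows 55 once the server has answered
end
```

No lock appears anywhere in Counter: serialization comes from the single thread that consumes the port's stream.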

We would like to add the ability to terminate the thread by making a protected method Close accessible to the methods in Class. This leads us to the following extension of the above function. We use the exception handling mechanism to jump out of the receiving loop.

local
   class Server
      attr close:Close
      meth Close raise closeException end end
   end
in
   fun {NewServer Class Init}
      S    % The stream of the port
      Port = {NewPort S}
      Object = {New class $ from Server Class end Init}
   in
      thread
         try {ForAll S
              proc{$ M} {Object M} end}
         catch closeException then skip end
      end
      Port
   end
end

Seif Haridi and Nils Franzén
Version 1.4.0 (20080702)