As we have seen, objects in Oz are stateful data structures. Threads are the active computation entities. Threads can communicate either by message passing using ports, or through common shared objects. Communication through shared objects requires the ability to serialize concurrent operations on an object so that its state is kept coherent after each such operation. In Oz, we separate the issue of acquiring exclusive access to an object from the object system. This gives us the ability to perform coarse-grained atomic operations on a set of objects, a very important requirement in, e.g., distributed database systems. The basic mechanism in Oz to get exclusive access is through locks.
The purpose of a lock is to mediate exclusive access to a shared resource between threads. Such a mechanism is typically made safer and more robust by restricting this exclusive access to a critical region. On entry into the region, the lock is secured and the thread is granted exclusive access rights to the resource, and when execution leaves the region, whether normally or through an exception, the lock is released. A concurrent attempt to obtain the same lock will block until the thread currently holding it has released it.
In the case of a simple lock, a nested attempt by the same thread to reacquire the same lock during the dynamic scope of a critical section guarded by that lock will block. We say reentrancy is not supported. Simple locks can be modeled in Oz as follows, where Code is a nullary procedure encapsulating the computation to be performed in the critical section. The lock is represented as a procedure: when applied to some code, it tries to acquire the lock by waiting until Old gets bound to unit. Notice that the lock is released upon normal as well as abnormal exit.
proc {NewSimpleLock ?Lock}
   Cell = {NewCell unit}
in
   proc {Lock Code}
      Old New in
      try
         {Exchange Cell Old New}
         {Wait Old} {Code}
      finally New = unit end
   end
end
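As a small usage sketch (the names MyLock, a1, a2, b1, b2 below are purely illustrative), two threads serialize their critical sections through one simple lock:

declare MyLock in
MyLock = {NewSimpleLock}
thread {MyLock proc {$} {Browse a1} {Delay 100} {Browse a2} end} end
thread {MyLock proc {$} {Browse b1} {Delay 100} {Browse b2} end} end
% Each thread's two Browse calls appear together: the other thread
% blocks in {MyLock ...} until the lock has been released.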
Another implementation, shown below, uses an object to implement the lock. Notice the use of the construct:

Old = lck := New

Similar to the Exchange operation on cells, this is an atomic exchange on an object attribute.
class SimpleLock
   attr lck:unit
   meth init skip end
   meth 'lock'(Code)
      Old New in
      try
         Old = lck := New
         {Wait Old} {Code}
      finally New = unit end
   end
end
In Oz, the computational unit is the thread. Therefore an appropriate locking mechanism should grant exclusive access rights to threads. As a consequence, the non-reentrant simple lock mechanism presented above is inadequate. A thread-reentrant lock allows the same thread to reenter the lock, i.e. to enter a dynamically nested critical region guarded by the same lock. Such a lock can be acquired by at most one thread at a time. Concurrent threads that attempt to acquire the same lock are queued. When the lock is released, it is granted to the thread first in the queue, and so on. Thread-reentrant locks can be modeled in Oz as follows:
class ReentrantLock from SimpleLock
   attr Current:unit
   meth 'lock'(Code)
      ThisThread = {Thread.this} in
      if ThisThread == @Current then
         {Code}
      else
         proc {Code1}
            try
               Current := ThisThread
               {Code}
            finally
               Current := unit
            end
         end
      in
         SimpleLock, 'lock'(Code1)
      end
   end
end
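The following sketch (illustrative only) shows the difference: the nested acquisition below succeeds with a ReentrantLock, whereas the same nesting on a SimpleLock object would block forever.

declare RL in
RL = {New ReentrantLock init}
{RL 'lock'(proc {$}
              {Browse outer}
              % the same thread re-enters the lock it already holds
              {RL 'lock'(proc {$} {Browse inner} end)}
           end)}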
Thread-reentrant locks are given syntactic and implementation support in Oz. They are implemented as a subtype of chunks. Oz provides the following syntax for guarded critical regions:
lock E then S end

E is an expression that evaluates to a lock. The construct waits until the lock can be acquired, executes S with exclusive access, and releases the lock when S terminates, normally or through an exception. If E is not a lock, then a type error is raised.
{NewLock L} creates a new lock L.
{IsLock E} returns true iff E is a lock.
Oz has arrays as a chunk subtype. Operations on arrays are defined in the module Array.
{NewArray +L +H +I ?A} creates an array A, where L is the lower-bound index, H is the higher-bound index, and I is the initial value of the array elements.
{Array.low +A ?L} returns the lower index.
{Array.high +A ?H} returns the higher index.
R = A.I returns A[I] in R.
A.I := X assigns X to the entry A[I].
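For instance, the following small snippet (illustrative names only) creates a three-element array and updates it:

declare A in
A = {NewArray 1 3 0}        % indices 1..3, every element initialized to 0
A.2 := 7                    % update entry 2
{Browse A.2}                % shows 7
{Browse {Array.high A}}     % shows 3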
As a simple illustration of the use of locks, consider the program in Figure 11.1. The procedure Switch transforms negative elements of an array to positive ones, and zero elements to the atom zero. The procedure Zero resets all elements to the integer 0.
declare A L Switch Zero in
A = {NewArray 1 100 ~5}
L = {NewLock}
proc {Switch A}
   {For {Array.low A} {Array.high A} 1
    proc {$ I}
       X = A.I in
       if X < 0 then A.I := ~X
       elseif X == 0 then A.I := zero end
       {Delay 100}
    end}
end
proc {Zero A}
   {For {Array.low A} {Array.high A} 1
    proc {$ I} A.I := 0 {Delay 100} end}
end
Try the following program.
local X Y in
   thread {Zero A} X = unit end
   thread {Switch A} Y = X end
   {Wait Y}
   {For 1 10 1 proc {$ I} {Browse A.I} end}
end
The elements of the array will be a mix of 0 and zero.
Assume that we want to perform the procedures Zero and Switch, each atomically, but in an arbitrary order. To do this we can use locks as in the following example.
local X Y in
   thread
      {Delay 100}
      lock L then {Zero A} end
      X = unit
   end
   thread
      lock L then {Switch A} end
      Y = X
   end
   {Wait Y}
   {For 1 10 1 proc {$ I} {Browse A.I} end}
end
By moving the delay statement above from the first thread to the second, we observe that all the elements of the array get either the value zero or the value 0. We have no mixed values.
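The same idea extends to coarse-grained atomic operations over several shared structures: acquire all the locks involved, always in the same fixed order to avoid deadlock, before touching any of the structures. The sketch below is illustrative only; the names A1, A2, LA1, LA2, and Transfer are not part of the tutorial.

declare A1 A2 LA1 LA2 Transfer in
A1 = {NewArray 1 10 100}
A2 = {NewArray 1 10 0}
LA1 = {NewLock}
LA2 = {NewLock}
% Move the value at index I from A1 to A2 as one atomic step.
% Both locks are always taken in the order LA1 then LA2, so two
% concurrent Transfer calls cannot deadlock with each other.
proc {Transfer I}
   lock LA1 then
      lock LA2 then
         A2.I := A1.I
         A1.I := 0
      end
   end
end
thread {Transfer 3} end
thread {Transfer 7} end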
To guarantee mutual exclusion on objects one may use the locks described in the previous subsection. Alternatively, we may declare in the class that its instances are lockable; each instance then carries a default lock created together with the object. A class with an implicit lock is declared as follows:
class C from ...
   prop locking
   ...
end
This does not automatically lock the object when one of its methods is called. Instead we have to use the construct

lock S end

inside any method to guarantee exclusive access when S is executed. Remember that our locks are thread-reentrant. This implies that:
if we take all the objects we have constructed and enclose each method body with lock ... end, and execute our program with only one thread, then the program will behave exactly as before.
Of course, if we use multiple threads calling methods in multiple objects, we might deadlock if there is any cyclic dependency. Writing nontrivial concurrent programs needs careful understanding of the dependency patterns between threads. In such programs deadlock may occur whether locks are used or not. It suffices to have a cyclic communication pattern for deadlock to occur.
The program in Figure 10.3 can be refined to work in a concurrent environment as follows:
class CCounter from Counter
   prop locking
   meth inc(Value)
      lock Counter, inc(Value) end
   end
   meth init(Value)
      lock Counter, init(Value) end
   end
end
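Assuming the Counter class of Figure 10.3 is in scope, several threads can now safely increment one shared instance (illustrative code):

declare C in
C = {New CCounter init(0)}
thread {For 1 1000 1 proc {$ _} {C inc(1)} end} end
thread {For 1 1000 1 proc {$ _} {C inc(1)} end} end
% Each inc(1) runs under the object's implicit lock, so no increments are lost.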
Let us now study a number of interesting examples where threads not only perform atomic transactions on objects, but also synchronize through objects.
The first example shows a concurrent channel, which is shared among an arbitrary number of threads. Any producing thread may put information in the channel asynchronously. A consuming thread has to wait until information exists in the channel. Waiting threads are served fairly. Figure 11.2 shows one possible realization. This program relies on the use of logical variables to achieve the desired synchronization. The method put/1 inserts an element in the channel. A thread executing the method get/1 will wait until an element is put in the channel. Multiple consuming threads reserve their place in the channel, thereby achieving fairness. Notice that {Wait I} is done outside the exclusive region. If waiting were done inside lock ... end, the program would deadlock. So, as a rule of thumb:

Do not wait inside an exclusive region if the waking-up action has to acquire the same lock.
class Channel from BaseObject
   prop locking
   attr f r
   meth init
      X in f := X r := X
   end
   meth put(I)
      X in lock @r = I|X r := X end
   end
   meth get(?I)
      X in lock @f = I|X f := X end {Wait I}
   end
end
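A small illustrative test (Ch and the atom hello are illustrative names): a consumer thread blocks in get/1 until a producer puts an element.

declare Ch in
Ch = {New Channel init}
thread X in {Ch get(X)} {Browse consumed(X)} end   % blocks: the channel is empty
{Delay 1000}
{Ch put(hello)}   % the waiting consumer is released with X = hello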
The next example shows a traditional way to write monitors. We start by defining a class Event that provides the monitor operations notify and wait, by specializing the class Channel.
class Event from Channel
   meth wait
      Channel, get(_)
   end
   meth notify
      Channel, put(unit)
   end
end
We show here an example of a unit buffer in the traditional monitor style. For consumers, the unit buffer behaves very much like a channel: each consumer waits until the buffer is full. For producers, only one is allowed to insert an item into the empty buffer; other producers have to suspend until the item is consumed. The program in Figure 11.3 shows a single-buffer monitor. Here we had to program a signaling mechanism for producers and consumers. Observe the pattern in the put/1 and get/1 methods. Most of the execution is done in an exclusive region. If waiting is necessary, it is done outside the exclusive region. This is done by using an auxiliary variable X, which gets bound to yes when the operation succeeds and to no when the caller has to wait. The get/1 method sets the empty flag and notifies one waiting producer (if any) as a single atomic step. The put/1 method does the reciprocal action.
class UnitBufferM
   attr item empty psignal csignal
   prop locking
   meth init
      empty := true
      psignal := {New Event init}
      csignal := {New Event init}
   end
   meth put(I)
      X in
      lock
         if @empty then
            item := I
            empty := false
            X = yes
            {@csignal notify}
         else X = no end
      end
      if X == no then
         {@psignal wait}
         {self put(I)}
      end
   end
   meth get(I)
      X in
      lock
         if {Not @empty} then
            I = @item
            empty := true
            {@psignal notify}
            X = yes
         else X = no end
      end
      if X == no then
         {@csignal wait}
         {self get(I)}
      end
   end
end
Try the above example by running the following code:
local
   UB = {New UnitBufferM init}
in
   {For 1 15 1
    proc {$ I} thread {UB put(I)} {Delay 500} end end}
   {For 1 15 1
    proc {$ I} thread X in {UB get(X)} {Browse X} {Delay 500} end end}
end
In Oz, it is very rare to write programs in the monitor style shown above; in general it is very awkward. There is a simpler, non-traditional way to write a UnitBuffer class, thanks to the combination of objects and logic variables. Figure 11.4 shows a simple definition. No locking is needed directly.
class UnitBuffer from BaseObject
   attr prodq buffer
   meth init
      buffer := {New Channel init}
      prodq := {New Event init}
      {@prodq notify}
   end
   meth put(I)
      {@prodq wait}
      {@buffer put(I)}
   end
   meth get(?I)
      {@buffer get(I)}
      {@prodq notify}
   end
end
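The dataflow version can be exercised with essentially the same test as the monitor version (illustrative):

declare UB in
UB = {New UnitBuffer init}
{For 1 15 1
 proc {$ I} thread {UB put(I)} {Delay 500} end end}
{For 1 15 1
 proc {$ I} thread X in {UB get(X)} {Browse X} {Delay 500} end end}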
A simple generalization of the above program leads to a bounded buffer class of arbitrary size. This is shown below. The put and get methods are the same as before; only the initialization method is changed.
class BoundedBuffer from UnitBuffer
   attr prodq buffer
   meth init(N)
      buffer := {New Channel init}
      prodq := {New Event init}
      {For 1 N 1 proc {$ _} {@prodq notify} end}
   end
end
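For example, with a buffer of size 3 the first three producers below proceed immediately, while the fourth blocks until a consumer performs a get (illustrative sketch; BB is an assumed name):

declare BB in
BB = {New BoundedBuffer init(3)}
{For 1 4 1 proc {$ I} thread {BB put(I)} {Browse put(I)} end end}
{Delay 1000}
thread X in {BB get(X)} {Browse got(X)} end   % releases the fourth producer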
An active object is a thread whose behavior is described by a class definition. Communication with active objects is through asynchronous message passing. An active object reacts to received messages by executing the corresponding methods of its associated class. An active object executes one method at a time, so locking is not needed for methods performed by an active object. The interface to an active object is an Oz port: clients send messages to the object by sending them to its associated port. We will show how to create this abstraction generically. Since active objects resemble servers receiving messages from clients through a network, we call this abstraction the server abstraction. To create a server S from a class Class we execute:

S = {NewServer Class init}

Here init is the initial object-construction method. To get the basic idea, we first show a simplified form of the NewServer function. The following function creates a port Port, creates an object Object, and finally creates a thread that serves the messages sent to the port by applying the corresponding class methods.
fun {NewServer Class Init}
   S                          % The stream of the port
   Port = {NewPort S}
   Object = {New Class Init}
in
   thread
      {ForAll S proc {$ M} {Object M} end}
   end
   Port
end
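As an illustration (again assuming the Counter class of Figure 10.3), a counter server is created and used as follows; clients communicate with {Send ...} rather than by direct method application:

declare CounterServer in
CounterServer = {NewServer Counter init(0)}
{Send CounterServer inc(1)}     % asynchronous: the sender does not wait
{Send CounterServer inc(5)}     % messages are served one at a time, in order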
We would like to add the ability to terminate the thread by making a protected method Close accessible to the methods of Class. This leads us to the following extension of the above function. We use the exception handling mechanism to jump out of the receiving loop.
local
   class Server
      attr close:Close
      meth Close raise closeException end end
   end
in
   fun {NewServer Class Init}
      S                          % The stream of the port
      Port = {NewPort S}
      Object = {New class $ from Server Class end Init}
   in
      thread
         try
            {ForAll S proc {$ M} {Object M} end}
         catch closeException then skip end
      end
      Port
   end
end
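To terminate such a server, some method of Class has to invoke the protected method through the inherited close attribute. The Logger class below is a hypothetical illustration of this pattern:

declare Logger LogServer in
class Logger from BaseObject
   meth init skip end
   meth log(X) {Browse X} end
   % works only when the class is combined with Server by NewServer,
   % which provides the close attribute holding the protected method name
   meth stop {self @close} end
end
LogServer = {NewServer Logger init}
{Send LogServer log(hello)}
{Send LogServer stop}   % the server thread leaves its receive loop and terminates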