8 Concurrency

So far, we have seen only one thread executing. It is time to introduce concurrency. In Oz a new concurrent thread of control is spawned by:

thread S end

Executing this statement forks a thread that runs concurrently with the current thread. The current thread resumes immediately with the next statement. Each nonterminating thread that is not blocked will eventually be allocated a time slice of the processor. This means that threads are executed fairly.
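
As a minimal sketch of this behavior, the following statements can be fed to the system. The atoms and the one-second delay are illustrative choices, and Delay is described in Section 8.1. The current thread should display its message right away, while the forked thread displays its message only after the delay:

thread 
   {Delay 1000}   % suspend the forked thread for at least one second
   {Browse 'from the forked thread'}
end 
{Browse 'from the current thread'}   % not held up by the Delay above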

However, there are three priority levels (high, medium, and low) that determine how often a runnable thread is allocated a time slice. In Oz, a high-priority thread cannot starve a low-priority one. Priority determines only how large a piece of the processor cake a thread can get.

Each thread has a unique name. To get the name of the current thread, the procedure Thread.this/1 is called. Having a reference to a thread, by using its name, enables operations on threads such as terminating a thread or raising an exception in a thread. Thread operations are defined in the base module Thread.
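
A small sketch of how a thread name might be obtained and used is given below. The variable name T, the atom, and the delay are assumptions made for illustration; Thread.this and Thread.state are the operations listed in Table 8.1:

declare T in 
thread 
   T = {Thread.this}   % bind T to the name of the forked thread
   {Delay 2000}
   {Browse 'worker done'}
end 
{Browse {Thread.state T}}   % waits until T is bound, then shows the worker's state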

Let us see what we can do with threads. First, remember that each thread is a data-flow thread that blocks on data dependency. Consider the following program:

declare X0 X1 X2 X3 in 
thread 
   local Y0 Y1 Y2 Y3 in 
      {Browse [Y0 Y1 Y2 Y3]}  
      Y0 = X0+1
      Y1 = X1+Y0
      Y2 = X2+Y1
      Y3 = X3+Y2
      {Browse completed}
   end 
end 
{Browse [X0 X1 X2 X3]}

If you input this program and watch the display of the Browser tool, the variables will appear unbound. Observe now what happens when you input the following statements one at a time:

X0 = 0
X1 = 1
X2 = 2
X3 = 3

You will see how the thread resumes and then suspends again. First, when X0 is bound, the thread can execute Y0 = X0+1. It then suspends again because it needs the value of X1 while executing Y1 = X1+Y0, and so on.


fun {Map Xs F}
   case Xs
   of nil then nil
   [] X|Xr then thread {F X} end |{Map Xr F}
   end 
end

Figure 8.1: A concurrent Map function


The program shown in Figure 8.1 defines a concurrent Map function. Notice that thread ... end is used here as an expression. Let us discuss the behavior of this program. If we enter the following statements:

declare 
F X Y Z
{Browse thread {Map X F} end}

a thread executing Map is created. It will suspend immediately in the case-statement because X is unbound. If we thereafter enter the following statements:

X = 1|2|Y
fun {F X} X*X end

the main thread will traverse the list, creating two threads for the first two elements of the list, thread {F 1} end and thread {F 2} end, and then it will suspend again on the tail of the list Y. Finally,

Y = 3|Z
Z = nil

will complete the computation of the main thread and the newly created thread thread {F 3} end, resulting in the final list [1 4 9].
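
For comparison, here is a hedged sketch that calls the Map of Figure 8.1 on a list that is already complete. The anonymous function and the Delay are illustrative additions. Because each application of the function runs in its own thread, the Browser should first display a list of unbound elements, which are filled in as the threads finish:

{Browse {Map [1 2 3] fun {$ X} {Delay 1000} X*X end}}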

The program shown in Figure 8.2 is a concurrent divide-and-conquer program, which is a rather inefficient way to compute the Fibonacci function. This program creates an exponential number of threads! See how easy it is to create concurrent threads. You may use this program to test how many threads your Oz installation can create. Try

{Browse {Fib 25}}

while using the panel program in your Oz menu to see the threads. If it works, try a larger number. The panel is shown in Figure 8.3.


fun {Fib X}
   case X
   of 0 then 1
   [] 1 then 1
   else thread {Fib X-1} end  + {Fib X-2} end 
end

Figure 8.2: A concurrent Fibonacci function



Figure 8.3: The Mozart Panel showing thread creation {Fib 26 X}


The whole idea of explicit thread creation in Oz is to enable the programmer to structure an application in a modular way. In this respect the Mozart system is excellent. Threads are so cheap that one can afford to create, say, 100000 of them. As a comparison, thread creation in Mozart 1.0 is about 60 times faster than in Java JDK 1.2. If concurrency gives your program an easier structure, then use it without hesitation. However, sequential programs are always faster than concurrent programs having the same structure. The Fib program in Figure 8.2 is faster if you remove thread ... end. Therefore, create threads only when the application needs them, and not because concurrency is fun.
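
To try the comparison yourself, a sequential version can be written as below. The name SeqFib is our own, chosen so that it does not replace the Fib of Figure 8.2; the body is that of Figure 8.2 with thread ... end removed:

declare 
fun {SeqFib X}
   case X
   of 0 then 1
   [] 1 then 1
   else {SeqFib X-1} + {SeqFib X-2} end   % no thread is forked here
end 
in 
{Browse {SeqFib 25}}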

8.1 Time

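In module Time, we can find a number of useful soft real-time procedures. Among them are {Delay I}, which suspends the executing thread for at least I milliseconds, and {Alarm I ?U}, which returns immediately and binds U to unit after I milliseconds.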


local 
   proc {Ping N}
      if N==0 then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end 
   end 
   proc {Pong N}
      {For 1 N 1  
         proc {$ I} {Delay 600} {Browse pong} end }
      {Browse 'pong terminated'}
   end 
in 
   {Browse 'game started'}
   thread {Ping 50} end 
   thread {Pong 50} end 
end

Figure 8.4: A 'Ping Pong' program


The program shown in Figure 8.4 starts two threads: one displays ping every 500 milliseconds, and the other displays pong every 600 milliseconds. Some pings will be displayed immediately after each other because of the difference between the two periods.
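
Timers can also be used without suspending the current thread right away. As a sketch, assuming the base procedure {Alarm I ?U}, which binds U to unit after I milliseconds (the variable name and the atoms are illustrative), a thread can start a timer, continue working, and synchronize on the timer later:

declare U in 
{Alarm 1000 U}             % returns at once; U is bound about one second later
{Browse 'timer started'}
{Wait U}                   % suspend until the alarm binds U
{Browse 'timer expired'}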

8.1.1 Making a Standalone Application

It is easy to make stand-alone applications in Mozart. We show this by making the program in Figure 8.4 stand-alone: we turn the program into a functor, as shown in Figure 8.5, and store it in the file PingPong.oz. Thereafter use the command:

 ozc -x PingPong.oz 

Now type PingPong in your shell to start the program [1].


functor 
import 
   Browser(browse:Browse) %Import Browse from Browser module
define 
   proc {Ping N}
      if N==0 then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end 
   end 
   proc {Pong N}
      {For 1 N 1  
         proc {$ I} {Delay 600} {Browse pong} end }
      {Browse 'pong terminated'}
   end 
in 
   {Browse 'game started'}
   thread {Ping 50} end 
   thread {Pong 50} end 
end

Figure 8.5: A 'Ping Pong' program stand-alone


8.2 Stream Communication

The data-flow property of Oz easily enables writing threads that communicate through streams in a producer-consumer pattern. A stream is a list that is created incrementally by one thread (the producer) and subsequently consumed by one or more threads (the consumers). The threads consume the same elements of the stream. For example, the program in Figure 8.6 uses stream communication: the producer generates a list of numbers and the consumer sums all the numbers.


fun {Generator N}
   if N > 0 then N|{Generator N-1}
   else nil end 
end 
local 
    fun {Sum1 L A}
      case L
      of nil then A
      [] X|Xs then {Sum1 Xs A+X}
      end 
    end 
in fun {Sum L} {Sum1 L 0} end 
end

Figure 8.6: Summing the elements in a list


Try the program above by running the following program:

{Browse thread {Sum thread {Generator 150000} end } end}

It should produce the number 11250075000. Let us understand the working of stream communication. A producer incrementally creates a stream (a list) of elements as in the following example where it is producing volvo's. This happens in general in an eager fashion.

fun {Producer ...} ... volvo|{Producer ...} ... end

The consumer waits on the stream until items arrive, then the items are consumed as in:

proc {Consumer Ls ...}
   case Ls of volvo|Lr then 
      'Consume volvo' ...  
      {Consumer Lr ...}
   end 
end

The data-flow behavior of the case-statement suspends the consumer until the arrival of the next item of the stream. The recursive call allows the consumer to iterate the action over again. The following pattern avoids the use of recursion by using an iterator instead:

proc {Consumer Ls ...}
   {ForAll Ls
    proc {$ Item}
       case Item of volvo then  
          'Consume volvo' ...  
       end 
    end}
end

Figure 8.7 shows a simple example using this pattern. The consumer counts the cars received. Each time it receives 1000 cars it prints a message on the display of the Browser.


fun {Producer N}
   if N > 0 then 
      volvo|{Producer N-1}
   else nil end 
end 
local 
   proc {ConsumerN Ls N}
      case Ls of nil then skip 
      [] volvo|Lr then 
         if N mod 1000 == 0 then 
            {Browse 'riding a new volvo'}
         end 
         {ConsumerN Lr N+1}
      else 
         {ConsumerN {List.drop Ls 1} N}
      end 
   end 
in 
   proc {Consumer Ls} {ConsumerN Ls 1} end 
end

Figure 8.7: Producing volvo's


You may run this program using:

{Consumer thread {Producer 10000} end}

Warning: When you feed a statement into the emulator, it is executed in its own thread. Therefore, after feeding the above statement, two threads are created. The main one is for the consumer, and the forked thread is for the producer.

Notice that the consumer was written using the recursive pattern. Can we write this program using the iterative ForAll/2 construct? This is not possible, because the consumer carries an extra argument N that accumulates a result which is passed to the next recursive call. The argument corresponds to some kind of state. In general, there are two solutions. We either introduce a stateful (mutable) data structure, which we will do in Section 9.4, or define another iterator that passes the state around. In our case, some iterators that fit our needs exist in the module List. First, we need an iterator that filters away all items except volvo's. We can use {Filter Xs P ?Ys}, which outputs in Ys all the elements that satisfy the procedure P/2 used as a Boolean function. The second construct is {List.forAllInd Xs P}, which is similar to ForAll, but P/2 takes the index of the current element of the list, starting from 1, as its first argument, and the element of the list as its second argument. Here is the program:

proc {Consumer Ls}
   fun {IsVolvo X} X == volvo end 
   Ls1
in 
   thread Ls1 = {Filter Ls IsVolvo} end 
   {List.forAllInd Ls1
      proc {$ N X}
         if N mod 1000 == 0 then 
           {Browse 'riding a new volvo'}
         end 
      end}
end
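
The other option mentioned above, an iterator that passes the state around, can be sketched with FoldL from the base environment. The function name CountVolvos is hypothetical; the accumulator N plays the role of the extra argument of ConsumerN, and Producer is the one from Figure 8.7:

declare 
fun {CountVolvos Ls}
   % N is the count so far, X is the current element of the stream
   {FoldL Ls fun {$ N X} if X == volvo then N+1 else N end end 0}
end 
in 
{Browse thread {CountVolvos thread {Producer 10000} end} end}

Once the producer has closed the stream with nil, the Browser should display 10000.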

8.3 Thread Priority and Real Time

Try to run the program using the following statement:

{Consumer thread {Producer 5000000} end}

Switch on the panel and observe the memory behavior of the program. You will quickly notice that this program does not behave well. The reason has to do with asynchronous message passing. If the producer sends messages, i.e. creates new elements in the stream, at a faster rate than the consumer can consume them, increasingly more buffering will be needed until the system starts to break down [2]. There are a number of ways to solve this problem. One is to create a bounded buffer between producers and consumers, which we will discuss later. Another way is to change the thread execution speed (by changing the threads' priorities) so that consumers get more time slices than producers.

The modules Thread and Property provide a number of operations pertinent to threads. Some of these are summarized in Table 8.1.


Procedure                                         Description

{Thread.state +T ?A}                              Returns current state of T
{Thread.suspend +T}                               Suspends T
{Thread.resume +T}                                Resumes T
{Thread.terminate +T}                             Terminates T
{Thread.injectException +T +E}                    Raises exception E in T
{Thread.this ?T}                                  Returns the current thread T
{Thread.setPriority +T +P}                        Sets T's priority
{Thread.setThisPriority +P}                       Sets current thread's priority
{Property.get priorities ?Pr}                     Gets system-priority ratios
{Property.put priorities p(high:+X medium:+Y)}    Sets system-priority ratios

Table 8.1: Thread operations


Oz has three priority levels. The system procedure

{Property.put priorities p(high:X medium:Y)}

sets the processor-time ratio to X:1 between high-priority threads and medium-priority threads. It also sets the processor-time ratio to Y:1 between medium-priority threads and low-priority threads. X and Y are integers. So, if we execute

{Property.put priorities p(high:10 medium:10)}

for each 10 time-slices allocated to runnable high-priority threads, the system will allocate one time-slice for medium-priority threads, and similarly between medium and low priority threads. Within the same priority level, scheduling is fair and round-robin. Now let us make our producer-consumer program work. We give the producer low priority, and the consumer high. We also set the priority ratios to 10:1 and 10:1.

local L in 
   {Property.put priorities p(high:10 medium:10)}
   thread 
      {Thread.setThisPriority low}
      L = {Producer 5000000}
   end 
   thread 
      {Thread.setThisPriority high}
      {Consumer L}
   end 
end
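
To check which ratios are in effect, the current setting can be read back with Property.get from Table 8.1. This is only a sketch; the exact form of the value that is displayed may depend on the Mozart version:

{Browse {Property.get priorities}}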

8.4 Demand-driven Execution

An extreme alternative solution is to make the producer lazy, only producing an item when the consumer requests one. A consumer, in this case, constructs the stream with unbound variables (empty boxes). The producer waits for the unbound variables (empty boxes) to appear on the stream. It then binds the variables (fills the boxes). The general pattern of the producer is as follows.

proc {Producer Xs}
   case Xs of X|Xr then  
      I in 'Produce I'  
      X=I ...  
      {Producer Xr}  
   end 
end

The general pattern of the consumer is as follows.

proc {Consumer ... Xs}
   X Xr in  
      ...  
      Xs = X|Xr  
      'Consume X' 
      ... {Consumer ... Xr}
end  

The program shown in Figure 8.8 is a demand-driven version of the program in Figure 8.7. You can run it with a very large number of volvo's!


local 
   proc {Producer Xs}
      case Xs of X|Xr then X = volvo {Producer Xr}
      [] nil then {Browse 'end of line'}
      end 
   end 
   proc {Consumer N Xs}
      if N=<0 then Xs=nil
      else X|Xr = Xs in 
         if X == volvo then 
            if N mod 1000 == 0 then 
               {Browse 'riding a new volvo'}
            end 
            {Consumer N-1 Xr}
         else 
            {Consumer N Xr}
         end 
      end 
   end 
in 
   {Consumer 10000000 thread {Producer $} end}
end

Figure 8.8: Producing Volvo's lazily


8.4.1 Futures

There is another way to program demand-driven computations. This uses the notion of future and the ByNeed primitive operation. A future is a read-only capability of a logic variable. For example, to create a future Y of the variable X, we apply the operation !! to X:

 Y = !!X

A thread trying to use the value of a future, e.g. using Y, will suspend until the variable of the future, e.g. X, gets bound.
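
A short sketch of this behavior follows; the variable names and the value 41 are illustrative. The forked thread needs the value of the future Y, so it cannot complete the addition until X is bound:

declare X Y in 
Y = !!X
thread {Browse Y+1} end   % needs the value of Y, which is not yet available
X = 41                    % binding X lets the thread continue and display 42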

One way to execute a procedure lazily, i.e. in a demand-driven manner, is to use the operation {ByNeed +P ?F}. ByNeed takes a one-argument procedure P and returns a future F. When a thread tries to access the value of F, the procedure {P X} is called, and its result value X is bound to F. This allows us to perform demand-driven computations in a straightforward manner. For example, by feeding

declare Y
{ByNeed proc {$ X} X=1 end Y}
{Browse Y}

we will observe that Y becomes a future, i.e. we will see Y<Future> in the Browser. If we try to access the value of Y, it will get bound to 1. One way to access Y is by performing the operation {Wait Y} which triggers the producing procedure.
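
As a further sketch, ByNeed can describe a conceptually infinite list whose elements are computed only when they are needed. The function name Ints and the variable name Xs are hypothetical:

declare 
fun {Ints N}
   % the list cell is built only when some thread needs it
   {ByNeed proc {$ Xs} Xs = N|{Ints N+1} end}
end 
Xs = {Ints 1}
in 
{Browse {Nth Xs 3}}   % needs the first three elements only; displays 3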

Now we can rewrite the program of Figure 8.8 as shown in Figure 8.9. This looks very similar to Figure 8.7.


local 
   proc {Producer Xs}
      % Producer itself is the one-argument procedure that ByNeed runs on demand
      Xs = volvo|{ByNeed Producer $}
   end 
   proc {Consumer N Xs}
      if N>0 then 
         case Xs of X|Xr then 
            if X==volvo then 
              if N mod 1000 == 0 then 
                 {Browse 'riding a new volvo'}
              end 
              {Consumer N-1 Xr}
            else {Consumer N Xr} end 
         end   
      end 
   end 
in 
   {Consumer 10000000 thread {Producer $} end}
end

Figure 8.9: Producing Volvo's using ByNeed


8.5 Thread Termination-Detection

We have seen how threads are forked using the statement thread S end. A natural question that arises is how to join back a forked thread into the original thread of control. In fact, this is a special case of detecting termination of multiple threads, and making another thread wait on that event. The general scheme is quite easy because Oz is a data-flow language.

thread T1 X1=unit end 
thread T2 X2=X1 end 
... 
thread TN XN=XN-1 end 
{Wait XN}
MainThread

When all threads terminate, the variables X1 ... XN will be merged together and bound to unit. {Wait XN} suspends the main thread until XN is bound.

In Figure 8.10 we define a higher-order construct (combinator), that implements the concurrent-composition control construct that has been outlined above. It takes a single argument that is a list of nullary procedures. When it is executed, the procedures are forked concurrently. The next statement is executed only when all procedures in the list terminate.


local 
    proc {Conc1 Ps I O}
      case Ps of P|Pr then M in  
         thread {P} M = I end 
         {Conc1 Pr M O}
      [] nil then O = I
      end 
    end 
in 
   proc {Conc Ps} {Wait {Conc1 Ps unit $}} end 
end

Figure 8.10: Concurrent Composition
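
A possible use of Conc is sketched below; the procedures in the list, their atoms, and their delays are illustrative. The last message should appear only after both procedures have finished:

{Conc [proc {$} {Delay 300} {Browse first} end 
       proc {$} {Delay 100} {Browse second} end]}
{Browse 'all done'}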


The program in Figure 8.5 did not terminate properly when the Ping and the Pong threads terminated. This problem can be remedied now. If we use Application.exit/1, a stand-alone application terminates, aborting any remaining threads. We can arrange things such that the main thread terminates only when the Ping and the Pong threads terminate. This is shown in Figure 8.11.


functor 
import 
   Browser(browse:Browse) %Import Browse from Browser module
   Application
define 
   proc {Ping N}
      if N==0 then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end 
   end 
   proc {Pong N}
      {For 1 N 1  
         proc {$ I} {Delay 600} {Browse pong} end }
      {Browse 'pong terminated'}
   end 
   X1 X2
in 
   {Browse 'game started'}
   thread {Ping 50} X1=unit end 
   thread {Pong 50} X2=X1 end 
   {Wait X2}
   {Application.exit 0}
end

Figure 8.11: A 'Ping Pong' program stand-alone



1. To terminate this program in the OS shell you have to type CONTROL-C. We will see later how to terminate it properly.
2. Ironically, in the Mozart system, stream communication across sites (using the distributed programming capabilities) works better, because of a designed flow-control mechanism that suspends producers when the network buffers are full.

Seif Haridi and Nils Franzén
Version 1.4.0 (20080702)