So far, we have seen only one thread executing. It is time to introduce concurrency. In Oz, a new concurrent thread of control is spawned by:
thread S end
Executing this statement forks a thread that runs concurrently with the current thread. The current thread resumes immediately with the next statement. Every thread that has not terminated and is not blocked will eventually be allocated a time slice of the processor. This means that threads are executed fairly.
However, there are three priority levels: high, medium, and low, which determine how often a runnable thread is allocated a time slice. In Oz, a high-priority thread cannot starve a low-priority one. Priority determines only how large a piece of the processor cake a thread can get.
Each thread has a unique name. To get the name of the current thread, the procedure Thread.this/1 is called. Having a reference to a thread, by using its name, enables operations on threads such as terminating a thread or raising an exception in it. Thread operations are defined in the base module Thread.
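For example, a forked thread can obtain its own name with Thread.this/1 and pass it back to the main thread through a dataflow variable; here is a minimal sketch:
declare T in
thread T = {Thread.this} end   % the forked thread publishes its own name
{Wait T}                       % wait until the name has arrived
{Browse T}                     % the Browser displays the thread name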
Let us see what we can do with threads. First, remember that each thread is a data-flow thread that blocks on data dependency. Consider the following program:
declare X0 X1 X2 X3 in
thread
   local Y0 Y1 Y2 Y3 in
      {Browse [Y0 Y1 Y2 Y3]}
      Y0 = X0+1
      Y1 = X1+Y0
      Y2 = X2+Y1
      Y3 = X3+Y2
      {Browse completed}
   end
end
{Browse [X0 X1 X2 X3]}
If you input this program and watch the display of the Browser tool, the variables will appear unbound. Observe now what happens when you input the following statements one at a time:
X0 = 0
X1 = 1
X2 = 2
X3 = 3
You will see how the thread resumes and then suspends again. First, when X0 is bound, the thread can execute Y0 = X0+1; it then suspends again because it needs the value of X1 while executing Y1 = X1+Y0, and so on.
fun {Map Xs F}
   case Xs
   of nil then nil
   [] X|Xr then thread {F X} end | {Map Xr F}
   end
end
The program shown in Figure 8.1 defines a concurrent Map
function. Notice that thread ... end
is used here as an expression. Let us discuss the behavior of this program. If we enter the following statements:
declare
F X Y Z
{Browse thread {Map X F} end}
a thread executing Map
is created. It will suspend immediately in the case-statement because X
is unbound. If we thereafter enter the following statements:
X = 1|2|Y
fun {F X} X*X end
the main thread will traverse the list, creating two threads for the first two elements of the list, thread {F 1} end and thread {F 2} end, and then it will suspend again on the tail of the list Y. Finally,
Y = 3|Z
Z = nil
will complete the computation of the main thread and the newly created thread thread {F 3} end
, resulting in the final list [1 4 9]
.
The program shown in Figure 8.2 is a concurrent divide-and-conquer program, which is a rather inefficient way to compute the Fibonacci function. This program creates an exponential number of threads! It shows how easy it is to create concurrent threads. You may use this program to test how many threads your Oz installation can create. Try
{Browse {Fib 25}}
while using the panel program in your Oz menu to watch the threads. If it works, try a larger number. The panel is shown in Figure 8.3.
fun {Fib X}
   case X
   of 0 then 1
   [] 1 then 1
   else thread {Fib X-1} end + {Fib X-2}
   end
end
The whole idea of explicit thread creation in Oz is to enable the programmer to structure his or her application in a modular way. In this respect the Mozart system is excellent. Threads are so cheap that one can afford to create, say, 100000 of them. As a comparison, thread creation in Mozart 1.0 is about 60 times faster than in Java JDK 1.2. If concurrency gives your program a simpler structure, use it without hesitation. However, a sequential program is always faster than a concurrent program with the same structure. The Fib program in Figure 8.2 is faster if you remove thread ... end. Therefore, create threads only when the application needs them, and not because concurrency is fun.
In the module Time, we can find a number of useful soft real-time procedures. Among them are:
{Alarm I ?U}
which immediately creates its own thread, and binds U to unit after I milliseconds.
{Delay I}
which suspends the executing thread for at least I milliseconds and then reduces to skip.
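For example, an alarm can be combined with data-flow synchronization. The following minimal sketch uses only the two procedures above (the delay of two seconds is arbitrary):
declare U in
{Alarm 2000 U}                     % U will be bound to unit after about two seconds
{Browse 'waiting for the alarm'}
{Wait U}                           % this thread blocks until the alarm fires
{Browse 'two seconds have passed'}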
local
   proc {Ping N}
      if N==0 then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end
   end
   proc {Pong N}
      {For 1 N 1
       proc {$ I} {Delay 600} {Browse pong} end}
      {Browse 'pong terminated'}
   end
in
   {Browse 'game started'}
   thread {Ping 50} end
   thread {Pong 50} end
end
The program shown in Figure 8.4 starts two threads: one displays ping every 500 milliseconds, and the other displays pong every 600 milliseconds. Some pings will be displayed immediately after one another because of the difference in period.
It is easy to make stand-alone applications in Mozart. We show this by making the program in Figure 8.4 stand-alone: we turn the program into a functor, as shown in Figure 8.5, and store it in a file PingPong.oz. Thereafter use the command:
ozc -x PingPong.oz
Now type PingPong in your shell to start the program.
functor
import
   Browser(browse:Browse) % import Browse from the Browser module
define
   proc {Ping N}
      if N==0 then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end
   end
   proc {Pong N}
      {For 1 N 1
       proc {$ I} {Delay 600} {Browse pong} end}
      {Browse 'pong terminated'}
   end
in
   {Browse 'game started'}
   thread {Ping 50} end
   thread {Pong 50} end
end
The data-flow property of Oz easily enables writing threads that communicate through streams in a producer-consumer pattern. A stream is a list that is created incrementally by one thread (the producer) and subsequently consumed by one or more threads (the consumers). The threads consume the same elements of the stream. For example, the program in Figure 8.6 is an example of stream communication, where the producer generates a list of numbers and the consumer sums all the numbers.
fun {Generator N}
   if N > 0 then N|{Generator N-1}
   else nil end
end
local
   fun {Sum1 L A}
      case L
      of nil then A
      [] X|Xs then {Sum1 Xs A+X}
      end
   end
in
   fun {Sum L} {Sum1 L 0} end
end
Try the program above by feeding the following statement:
{Browse thread {Sum thread {Generator 150000} end } end}
It should produce the number 11250075000
. Let us understand how stream communication works. A producer incrementally creates a stream (a list) of elements, as in the following example where it produces volvos. In general this happens in an eager fashion.
fun {Producer ...} ... volvo|{Producer ...} ... end
The consumer waits on the stream until items arrive, then the items are consumed as in:
proc {Consumer Ls ...}
   case Ls of volvo|Lr then
      ...              % consume the volvo
      {Consumer Lr}
   end
end
The data-flow behavior of the case-statement suspends the consumer until the arrival of the next item of the stream. The recursive call allows the consumer to iterate the action over again. The following pattern avoids the use of recursion by using an iterator instead:
proc {Consumer Ls ...}
   {ForAll Ls
    proc {$ Item}
       case Item of volvo then
          ...          % consume the volvo
       end
    end}
end
Figure 8.7 shows a simple example using the recursive pattern. The consumer counts the cars received; each time it has received 1000 cars it prints a message on the display of the Browser.
fun {Producer N}
   if N > 0 then
      volvo|{Producer N-1}
   else nil end
end
local
   proc {ConsumerN Ls N}
      case Ls of nil then skip
      [] volvo|Lr then
         if N mod 1000 == 0 then
            {Browse 'riding a new volvo'}
         end
         {ConsumerN Lr N+1}
      else
         {ConsumerN {List.drop Ls 1} N}
      end
   end
in
   proc {Consumer Ls} {ConsumerN Ls 1} end
end
You may run this program using:
{Consumer thread {Producer 10000} end}
Warning: When you feed a statement into the emulator, it is executed in its own thread. Therefore, feeding the above statement creates two threads: the main one runs the consumer, and the forked thread runs the producer.
Notice that the consumer was written using the recursive pattern. Can we write this program using the iterative ForAll/2 construct? This is not possible, because the consumer carries an extra argument N that accumulates a result which is passed to the next recursive call. The argument corresponds to some kind of state. In general, there are two solutions: we either introduce a stateful (mutable) data structure, which we will do in Section 9.4, or define another iterator that passes the state around. In our case, some iterators that fit our needs exist in the module List. First, we need an iterator that filters away all items except volvos. We can use {Filter Xs P ?Ys}, which outputs in Ys all the elements that satisfy the procedure P/2 used as a Boolean function. The second construct is {List.forAllInd Xs P}, which is similar to ForAll, but P/2 takes the index of the current element of the list, starting from 1, as its first argument, and the element of the list as its second argument.
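To see the two iterators in isolation, here is a small sketch (the lists are made up for the example):
declare Ys in
{Filter [volvo saab volvo bmw] fun {$ X} X == volvo end Ys}
{Browse Ys}                              % shows [volvo volvo]
{List.forAllInd [a b c]
 proc {$ I X} {Browse I#X} end}          % shows 1#a, 2#b and 3#c
With these two iterators, the consumer can be rewritten as follows: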
proc {Consumer Ls}
   fun {IsVolvo X} X == volvo end
   Ls1
in
   thread Ls1 = {Filter Ls IsVolvo} end
   {List.forAllInd Ls1
    proc {$ N X}
       if N mod 1000 == 0 then
          {Browse 'riding a new volvo'}
       end
    end}
end
Try to run the program using the following statement:
{Consumer thread {Producer 5000000} end}
Switch on the panel and observe the memory behavior of the program. You will quickly notice that this program does not behave well. The reason has to do with asynchronous message passing: if the producer sends messages, i.e., creates new elements in the stream, at a faster rate than the consumer can consume them, more and more buffering will be needed until the system starts to break down. There are a number of ways to solve this problem. One is to create a bounded buffer between producers and consumers, which we will discuss later. Another way is to change the thread execution speed (by changing the threads' priorities) so that consumers get more time slices than producers.
The modules Thread
and Property
provide a number of operations pertinent to threads. Some of these are summarized in Table 8.1.
Procedure | Description |
---|---|
{Thread.state T} | Returns the current state of T |
{Thread.suspend T} | Suspends T |
{Thread.resume T} | Resumes T |
{Thread.terminate T} | Terminates T |
{Thread.injectException T E} | Raises exception E in T |
{Thread.this} | Returns the current thread |
{Thread.setPriority T P} | Sets T's priority to P |
{Thread.setThisPriority P} | Sets the current thread's priority to P |
{Property.get priorities} | Gets the system-priority ratios |
{Property.put priorities P} | Sets the system-priority ratios |
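For example, a thread name obtained with Thread.this/1 can be inspected and terminated using the operations of Table 8.1; a small sketch (the delays are arbitrary):
declare T in
thread T = {Thread.this} {Delay 60000} end   % a thread that would run for a minute
{Wait T}
{Browse {Thread.state T}}    % typically shows 'blocked' while the thread sits in the Delay
{Thread.terminate T}
{Delay 100}                  % give the emulator a moment
{Browse {Thread.state T}}    % should now show 'terminated'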
Oz has three priority levels. The system procedure
{Property.put priorities priorities(high:X medium:Y)}
sets the processor-time ratio to X:1 between high-priority threads and medium-priority threads. It also sets the processor-time ratio to Y:1 between medium-priority threads and low-priority threads. X and Y are integers. So, if we execute
{Property.put priorities priorities(high:10 medium:10)}
for each 10
time-slices allocated to runnable high-priority threads, the system will allocate one time-slice for medium-priority threads, and similarly between medium and low priority threads. Within the same priority level, scheduling is fair and round-robin. Now let us make our producer-consumer program work. We give the producer low priority, and the consumer high. We also set the priority ratios to 10:1
and 10:1
.
local L in
   {Property.put priorities priorities(high:10 medium:10)}
   thread
      {Thread.setThisPriority low}
      L = {Producer 5000000}
   end
   thread
      {Thread.setThisPriority high}
      {Consumer L}
   end
end
An extreme alternative solution is to make the producer lazy, only producing an item when the consumer requests one. A consumer, in this case, constructs the stream with unbound variables (empty boxes). The producer waits for the unbound variables (empty boxes) to appear on the stream. It then binds the variables (fills the boxes). The general pattern of the producer is as follows.
proc {Producer Xs}
   case Xs of X|Xr then I in
      ...            % produce the item I
      X = I
      {Producer Xr}
   end
end
The general pattern of the consumer is as follows.
proc {Consumer ... Xs}
   X Xr in
   ...
   Xs = X|Xr
   ...            % consume X
   {Consumer ... Xr}
end
The program shown in Figure 8.8 is a demand-driven version of the program in Figure 8.7. You can run it with a very large number of volvos!
local
   proc {Producer Xs}
      case Xs of X|Xr then X = volvo {Producer Xr}
      [] nil then {Browse 'end of line'}
      end
   end
   proc {Consumer N Xs}
      if N=<0 then Xs=nil
      else X|Xr = Xs in
         if X == volvo then
            if N mod 1000 == 0 then
               {Browse 'riding a new volvo'}
            end
            {Consumer N-1 Xr}
         else
            {Consumer N Xr}
         end
      end
   end
in
   {Consumer 10000000 thread {Producer $} end}
end
There is another way to program demand-driven computations. This uses the notion of a future and the ByNeed primitive operation. A future is a read-only capability of a logic variable. For example, to create a future of the variable X we apply the operation !! to obtain a future Y:
Y = !!X
A thread trying to use the value of a future, e.g. using Y
, will suspend until the variable of the future, e.g. X
, gets bound.
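A small sketch of this behavior (the value 5 is arbitrary):
declare X Y in
Y = !!X                    % Y is a read-only view (future) of X
thread {Browse Y+1} end    % this thread suspends: it needs the value of Y
X = 5                      % binding X resumes the thread, which displays 6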
One way to execute a procedure lazily, i.e. in a demand-driven manner, is to use the operation {ByNeed +P ?F}
. ByNeed
takes a one-argument procedure P
, and returns a future F
. When a thread tries to access the value of F
, the procedure {P X}
is called, and its result value X
is bound to F
. This allows us to perform demand-driven computations in a straightforward manner. For example by feeding
declare Y
{ByNeed proc {$ X} X=1 end Y}
{Browse Y}
we will observe that Y
becomes a future, i.e. we will see Y<Future>
in the Browser. If we try to access the value of Y
, it will get bound to 1
. One way to access Y
is by performing the operation {Wait Y}
which triggers the producing procedure.
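Continuing the example above:
{Wait Y}      % touching the future calls the procedure, which binds Y to 1
{Browse Y}    % the Browser now shows 1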
Now we can rewrite the program of Figure 8.8 as shown in Figure 8.9. It looks very similar to the program in Figure 8.7.
local
   proc {Producer Xs}
      Xs = volvo|{ByNeed proc {$ Xr} {Producer Xr} end $}
   end
   proc {Consumer N Xs}
      if N>0 then
         case Xs of X|Xr then
            if X==volvo then
               if N mod 1000 == 0 then
                  {Browse 'riding a new volvo'}
               end
               {Consumer N-1 Xr}
            else {Consumer N Xr} end
         end
      end
   end
in
   {Consumer 10000000 thread {Producer $} end}
end
We have seen how threads are forked using the statement thread S end. A natural question that arises is how to join a forked thread back into the original thread of control. In fact, this is a special case of detecting the termination of multiple threads and making another thread wait on that event. The general scheme is quite easy because Oz is a data-flow language:
T1:          thread ... X1=unit end
T2:          thread ... X2=X1 end
             ...
TN:          thread ... XN=XN-1 end
MainThread:  {Wait XN}
When all threads terminate, the variables X1 ... XN will be merged together and bound to unit. {Wait XN} suspends the main thread until XN is bound.
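As a concrete instance of the scheme with two threads (the delays are arbitrary):
declare X1 X2 in
thread {Delay 1000} {Browse first#done} X1=unit end
thread {Delay 2000} {Browse second#done} X2=X1 end
{Wait X2}
{Browse 'both threads have terminated'}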
In Figure 8.10 we define a higher-order construct (combinator), that implements the concurrent-composition control construct that has been outlined above. It takes a single argument that is a list of nullary procedures. When it is executed, the procedures are forked concurrently. The next statement is executed only when all procedures in the list terminate.
local
   proc {Conc1 Ps I O}
      case Ps of P|Pr then M in
         thread {P} M = I end
         {Conc1 Pr M O}
      [] nil then O = I
      end
   end
in
   proc {Conc Ps} {Wait {Conc1 Ps unit $}} end
end
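For example, the combinator can be used as follows (again with arbitrary delays):
{Conc [proc {$} {Delay 1000} {Browse ping#done} end
       proc {$} {Delay 2000} {Browse pong#done} end]}
{Browse 'both procedures have terminated'}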
The program in Figure 8.5 did not terminate properly when the Ping and Pong threads terminated. This problem can be remedied now. If we use Application.exit/1, a stand-alone application terminates, aborting any remaining threads. We can arrange things such that the main thread calls it only when the Ping and the Pong threads have terminated. This is shown in Figure 8.11.
functor
import
   Browser(browse:Browse) % import Browse from the Browser module
   Application
define
   proc {Ping N}
      if N==0 then {Browse 'ping terminated'}
      else {Delay 500} {Browse ping} {Ping N-1} end
   end
   proc {Pong N}
      {For 1 N 1
       proc {$ I} {Delay 600} {Browse pong} end}
      {Browse 'pong terminated'}
   end
   X1 X2
in
   {Browse 'game started'}
   thread {Ping 50} X1=unit end
   thread {Pong 50} X2=X1 end
   {Wait X2}
   {Application.exit 0}
end