<< Prev | - Up - | Next >> |
Oz provides a set of stateful data types. These include ports, objects, arrays, and dictionaries (hash tables). These data types are abstract in the sense that they are characterized only by the set of operations performed on the members of the type. Their implementation is always hidden, and in fact different implementations exist but their corresponding behavior remains the same. For example, objects are implemented in a totally different way depending on the optimization level of the compiler. Each member is always unique by conceptually tagging it with an Oz-name upon creation. A member is created by an explicit creation operation. A type test operation always exists. In addition, a member ceases to exist when it is no longer accessible.
Port is such an abstract data-type. A Port P
is an asynchronous communication channel that can be shared among several senders. A port has a stream associated with it. The operation: {Port.new S ?P}
creates a port P
and initially connects it to the variable S
taking the role of a stream. The operation: {Port.send P M}
will append the message M
to the end of the stream associated with P
. The port keeps track of the end of the stream as its next insertion point. The operation {IsPort P ?B}
checks whether P
is a port. In order to protect the stream S
from being bound by mistake S
is actually a future. The following program shows a simple example using ports:
declare S P
P = {Port.new S}
{Browse S}
{Port.send P 1}
{Port.send P 2}
If you enter the above statements incrementally you will observe that S
gets incrementally more defined.
S
1|
1|2|_
Ports are more expressive abstractions than pure stream communication, which was discussed in Section 8.2, since they can be shared among multiple threads, and can be embedded in other data structures. Ports are the main message passing mechanism between threads in Oz.
Ports are used as a communication entry point to servers. The program shown in Figure 9.1 defines a thread that acts as FIFO queue server. It has two ports, one for inserting items to the queue using put
, and the other for fetch items out of the queue using get
. The use of single-assignment (logic) variables makes the server insensitive to the relative arrival order of get
and put
requests. get
requests can arrive even when the queue is empty. A server is created by {NewQueueServer ?Q}
. This procedure returns back a record Q
with features put
and get
each holding a unary procedure. A client thread having access to Q
can request services by invoking these procedure. Notice how results are returned back through logic variables. A client requesting an Item in the queue will call {Q.get I}
. The server will eventually answer back by binding I
to an item.
declare
fun {NewQueueServer}
Given GivePort={Port.new Given}
Taken TakePort={Port.new Taken}
in
Given = Taken
queue(put:proc {$ X} {Port.send GivePort X} end
get:proc {$ X} {Port.send TakePort X} end)
end
Try the following sequence of statements. The program will not work. So, what is the problem?
declare
thread Q = {NewQueueServer} end
{Q.put 1}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Browse {Q.get $}}
{Q.put 2}
{Q.put 3}
The problem is that Given = Taken
is trying to impose equality between two futures. Remember that Given
and Taken
are futures that can only be read and cannot be bound. So the thread corresponding to the queue server will suspend in the statement Given = Taken
. This problem is remedied by running this statement in its own thread as shown in Figure 9.2 1.
The program works as follows. {Q.put I0} {Q.put I1} ... {Q.put In}
will incrementally add the elements I0 I1 ... In
to the stream Given
, resulting in I0|I1|...|In|<Future1>
. {Q.get X0} {Q.put X1} ... {Q.put Xn}
will add the elements X0 X1 ... Xn
to the stream Taken
resulting in X0|X1|...|Xn|<Future2>
. The equality constraint Given = Taken
will bind Xi's
to Ii's
.
declare
fun {NewQueueServer}
Given GivePort={Port.new Given}
Taken TakePort={Port.new Taken}
in
thread Given=Taken end
queue(put:proc {$ X} {Port.send GivePort X} end
get:proc {$ X} {Port.send TakePort X} end)
end
Warning: The code above is correct but, due to a limitation in the current Oz implementation, leaks memory. As a workaround, one can use the code below as a drop-in replacement.
declare
fun {NewQueueServer}
Given GivePort={Port.new Given}
Taken TakePort={Port.new Taken}
in
thread
for X in Given Y in Taken do
X=Y
end
end
queue(put:proc {$ X} {Port.send GivePort X} end
get:proc {$ X} {Port.send TakePort X} end)
end
Ports are actually stateful data structures. A port keeps a local state internally tracking the end of its associated stream. Oz provides two primitive devices to construct abstract stateful data-types chunks and cells. All others subtypes of chunks can be defined in terms of chunks and cells.
A chunk is similar to a record except that the label of a chunk is an oz-name, and there is no arity operation available on chunks. This means one can hide certain components of a chunk if the feature of the component is an oz-name that is visible only (by lexical scoping) to user-defined operations on the chunk.
A chunk is created by the procedure {NewChunk Record}
. This creates a chunk with the same components as the record, but having a unique label. The following program creates a chunk.
local X in
X={NewChunk f(c:3 a:1 b:2)}
{Browse X}
{Browse X.c}
end
This will display the following.
<Ch>(a:1 b:2 c:3)
3
Warning: As a syntactic convenience, one can equate an expression E
at an expression position with a variable X = E
, and use X
to refer to the value of the expression. Using this notation the above program could be written as
local X in
{Browse X={NewChunk f(c:3 a:1 b:2)}}
{Browse X.c}
end
In Figure 9.3, we show an example of using the information hiding ability of chunks to implement Ports.
A cell could be seen as a chunk with a mutable single component. A cell is created as follows.
{NewCell X ?C}
A cell is created with the initial content X
. C
is bound to a cell. The Table 9.1 shows the operations on a cell.
Operation | Description |
---|---|
| Creates a cell C with content X |
| Returns the content of C in X |
| Modifies the content of C to Y |
| Tests if C is a cell |
| Swaps atomically the content of C from X to Y |
Check the following program. The last statement increments the cell by one. If we leave out thread ... end
the program deadlocks. Do you know why?
local I O X in
I = {NewCell a} {Browse @I}
{Assign I b} {Browse @I}
{Assign I X} {Browse @I}
X = 5*5
{Exchange I O thread O+1 end} {Browse @I}
end
Cells and higher-order iterators allow conventional assignment-based programming in Oz. The following program accumulates in the cell J
the value of .
declare J in
J = {NewCell 0}
{For 1 10 1
proc {$ I}
O N in
{Exchange J O N}
N = O+I
end}
{Browse @J}
Ports described in Section 9.1 can be implemented by chunks and cells in a secure way, i.e. as an abstract data type that cannot be forged. The program in Figure 9.3 shows an implementation of Ports. Initially an Oz-name is created locally, which is accessible only by the Port operations. A port is created as a chunk that has one component, which is a cell. The cell is initialized to the stream associated with the port. The type test IsPort
is done by checking the feature Port
. Sending a message to a port results in updating the stream atomically, and updating the cell to point to the tail of the stream.
declare Port in
local
PortTag = {NewName} %New Oz name
fun {NewPort S}
C = {NewCell S} in
{NewChunk port(PortTag:C)}
end
fun {IsPort ?P}
{Chunk.hasFeature P PortTag} %Checks a chunk feature
end
proc {Send P M}
Ms Mr in
{Exchange P.PortTag Ms Mr}
Ms = M|Mr
end
in Port = port(new:NewPort
is:IsPort
send:Send)
end
The implementation in Figure 9.3 does not protect the stream of the port. Protection of the stream is done using a future as follows.
declare Port in
local
PortTag = {NewName} %New Oz name
fun {NewPort FS}
S C = {NewCell S} in
FS = !!S % Create a future
{NewChunk port(PortTag:C)}
end
fun {IsPort ?P}
{Chunk.hasFeature P PortTag} %Checks a chunk feature
end
proc {Send P M}
Ms Mr in
{Exchange P.PortTag Ms Mr}
Ms = M|!!Mr
end
in Port = port(new:NewPort
is:IsPort
send:Send)
end
<< Prev | - Up - | Next >> |