Friday 30 May 2008

Reading 8. Concurrent Haskell and HSTM

1. Gentle introduction to IO Monad in Haskell

Our following talk about model of Software Transactional Memory in Haskell will be meaningless without discussing concepts of concurrent Haskell.
Main ideas of concurrent Haskell were described in paper Concurrent Haskell by Simon Peyton Jones, Andrew Gordon and Sigbjorn Finne. In actual fact concurrent Haskell is simple extension of pure lazy-evaluated functional Haskell language. It adds two main new ingredients to Haskell:
  • processes, and a mechanism for process initiation
  • atomically-mutable state, to support inter-process communication and cooperation
Following the tradition it may seem strange to talk about some concurrency in pure functional languages like Haskell is because of concurrency concepts suppose existence of mutable entities and states. The most common way to emulate mutable state in Haskell is to wrap our computation result into Monad entity. From this point of view our program become a sequence of new monads creation. This approach helps also to emulate strict sequence of computations which is non-obvious in lazy-evaluated Haskell.
Let's recall what Monad is. Speaking strictly, Monad is tuple (M, return, >>=) where M is type designator, return is operator to wrap our computation into monad entity and >>= or bind operator represents monadic evaluations themselves. Their types are correspondingly

type Monad = M a
(>>=) :: M a → (a → M b) → M b
return :: a → M a

In some it is convenient to use reduced bind operator:

(>>) :: M a → M b → M b

In other words it ignores value wrapped into its first argument.
Most common examples of monads in Haskell are [], Maybe and IO. The last is most interesting for our purposes. In a non-strict language it is completely impractical to perform input/output using “side-effecting functions", because the order in which sub-expressions are evaluated | and indeed whether they are evaluated at all | is determined by the context in which the result of the expression is used, and hence is hard to predict. This difficulty can be addressed by treating an I/O-performing computation as a state transformer; that is, a function that transforms the current state of the world to a new state. In addition, we need the ability for an I/O-performing computation to return a result. This reasoning leads to the following type definition:

type IO a = World -> (a, World)

That is, a value of type IO t takes a world state as input and delivers a modified world state together with a value of type t. Of course, the implementation performs the I/O right away - thereby modifying the state of the world “in place".
We call a value of type IO t an action. Here are two useful complete actions:

hGetChar :: Handle -> IO Char
hPutChar :: Handle -> Char -> IO ()

The action hGetChar reads a character from the specified handle (which identifies some file or other byte stream) and returns it as the result of the action. hPutChar takes a handle and a character and return an action that writes the character to the specified stream.
Actions can be combined in sequences using infix combinators >> and >>= described above. For example here is an action that reads a character from the standard input, and then prints it twice to the standard output:

hGetChar stdin >>= \c ->
hPutChar stdout c >>
hPutChar stdout c

The notation \c->E, for some expression E, denotes a lambda abstraction. In Haskell, the scope of a lambda abstraction extends as far to the right as possible; in this example the body of the \c-abstraction includes everything after the \c. The sequencing combinators, >> and >>=, feed the result state of their left hand argument to the input of their right hand argument, thereby forcing the two actions (via the data dependency) to be performed in the correct order. The combinator >> throws away the result of its first argument, while >>= takes the result of its first argument and passes it on to its second argument. The similarity of monadic I/O-performing programs to imperative programs is no surprise: when performing I/O we specifically want to impose a total order on I/O operations.
It is often also useful to have an action that performs no I/O, and immediately returns a specified value using return operator. For example, an echo action that reads a character, prints it and returns the character read, might look like this:

echo :: IO Char
echo = hGetChar stdin >>= \c →
hPutChar stdout >>
return c

So, the resulting program which can be compiled to do something might look like

main :: IO ()
main = echo >>= \c ->
if c == '\n'
then return ()
else main

In principle, then, a program is just a state transformer that is applied to the real world to give a new world. In practice, however, it is crucial that the side-effects the program specifies are performed incrementally, and not all at once when the program finishes.

2. Processes in Haskell

Concurrent Haskell provides a new primitive forkIO, which starts a concurrent process:

forkIO :: IO () -> IO ()

forkIO is an action which takes an action, a, as its argument and spawns a concurrent process to perform that action. The I/O and other side effects performed by a are interleaved in an unspecified fashion with those that follow the forkIO. Here's an example:


let
-- loop ch prints an infinite sequence of ch's
loop ch = hPutChar stdout ch >> loop ch
in
forkIO (loop 'a') >>
loop 'z'

The forkIO spawns a process which perform the action loop 'a'. Meanwhile, the “parent“ process continues on to perform loop 'z'. The result is than an infinite sequence of interleaved 'a's and 'z's appears on the screen. The exact interleaving is unspecified.
As a more realistic example of forkIO in action a mail tool night incorporate the following loop:

mailLoop
= getButtonPress b >>= \ v ->
case v of
Compose -> forkIO doCompose >>
mailLoop
...other things
doCompose :: IO () -- Pop up and manage
doCompose = ... -- composition window

Here, getButtonPress is very like hGetChar; it awaits the next button press on button b, and then delivers a value indicating which button was pressed. This value is then treated by the case expression. If its value is Compose, then the action doCompose is forked to handle an independent composition window, while the main process continues with the next getButtonPress.
There are some interesting questions related to concurrency in Haskell.

1. Let's imagine that we have to evaluate value named 'c'. In common Haskell this value is represented internally by pointer to some closure which will be called and evaluate this value when someone will need in it. That's the famous Haskell's laziness.
Now each of concurrent processes may need in this value. Then the first who provoked 'c''s evaluation replaces pointer to 'c''s closure to some temporary object named thunk. Thunk indicates that value 'c' is currently evaluating. Other processes wait until evaluation ends.

2.
Since the parent and child processes may both mutate (parts of) the same shared state (namely, the world), forkIO immediately introduces non-determinism. For example, if one process decides to read a file, and the other deletes it, the effect of running the program will be unpredictable. While this non-determinism is not desirable, it is not avoidable; indeed, every concurrent language is non-deterministic. The only way to enforce determinism would be by somehow constraining the two processes to work on separate parts of the state (different files, in our example). The trouble is that essentially all the interesting applications of concurrency involve the deliberate and controlled mutation of shared state, such as screen real estate, the file system, or the internal data structures of the program. The right solution, therefore, is to provide mechanisms which allow (though alas they cannot enforce) the safe mutation of shared state section.

3. forkIO is asymmetrical: when a process executes a forkIO, it spawns a child process that executes concurrently with the continued execution of the parent. It would have been possible to design a symmetrical fork, an approach taken by Jones & Hudak:

symFork :: IO a -> IO b -> IO (a,b)

The idea here is symFork p1 p2 is an action that forks two processes, p1 and p2. When both complete, the symFork pairs their results together and returns this pair as its result. We rejected this approach because it forces us to synchronize on the termination of the forked process. If the desired behavior is that the forked process lives as long as it desires, then we have to provide the whole of the rest of the parent as the other argument to symFork, which is extremely inconvenient.

3. Synchronization and communication

To make our processes interact with each other and organize synchronization between them we introduce spectial type

type MVar a

This is simple memory cell which can contain any value of type a or be empty. We define following primitive operations on MVars:

newMVar :: IO (MVar a)
creates a new MVar.

takeMVar :: MVar a -> IO a
blocks until the location is non-empty, then reads and returns the value, leaving the location empty.

putMVar :: MVar a -> a -> IO ()
writes a value into the specified location. If there are one or more processes blocked in takeMVar on that location, one is thereby allowed to proceed. It is an error to perform putMVar on a location which already contains a value.

Notice that MVar can be considered in different ways:
as a channel for messages exchange between processes
MVar () is simple semaphor, putMVar denotes rising and takeMVar is sinking

With MVar we now can solve simple problem of “Producer and Customer” in case when Producer produces faster than customer can take. For this we'll make buffered slot CVar for Customer to take from.

type CVar a = (MVar a, -- Producer -> consumer
MVar ()) -- Consumer -> producer

newCVar :: IO (CVar a)
newCVar
= newMVar >>= \ data_var ->
newMVar >>= \ ack_var ->
putMVar ack_var ( ) >>
return (data_var, ack_var)

putCVar :: CVar a -> a -> IO ()
putCVar (data_var,ack_var) val
= takeMVar ack_var >>
putMVar data_var val

getCVar :: CVar a -> IO a
getCVar (data_var,ack_var)
= takeMVar data_var >>= \ val ->
putMVar ack_var () >>
return val

4. Haskell Software transactional Memory (HSTM)

Implementation of transactional memory in Haskell resembles IO abstraction. It introduces terms of special monadic type which represent atomic blocks. It also adds special mechanism to compose two transaction as alternatives. Main characteristics of HSTM look like this:

Strong or Weak Isolation Strong
Transaction Granularity Word
Direct or Deferred Update Deferred (cloned replacement)
Concurrency Control Optimistic
Synchronization Blocking
Conflict Detection Late
Inconsistent Reads Inconsistency toleration
Conflict Resolution None
Nested Transaction None (not allowed by type system)
Exceptions Abort

So, we add new type of terms into language – STM monad. It has semantics similar to IO monad but it “marks” terms which will be treated as atomic blocks. To wrap them into habitual atomic block we use function atomic.

atomic :: STM a → STM a

By analogy with MVar type in plain concurrent Haskell we introduce also TVar type (transactional) to represent values stable against transactional operations.

type TVar a
readTVar :: TVar a → STM a
writeTVar :: TVar a → a → STM()

For example let's consider such variable containing some integer values and operations on it.

type Resource = TVar Int
putR :: Resource -> Int -> STM ()
putR r i = do { v <- readTVar r; writeTVar r (v+i) }

The atomic function transactionally committed (or aborted) these updates:

main = do { ...; atomic (putR r 3); ... }

HSTM also introduced an explicit retry statement as a coordination mechanism between transactions. The retry statement aborts the current transaction and prevents it from reexecuting until at least one of the TVars accessed by the transaction changes value. For example,

getR :: Resource → Int → STM ()
getR r i = do { v <- readTVar r
; if (v < i) then retry
else writeTVar r (v-i) }

atomically extracts i units from a Resource. It uses a retry statement to abort an enclosing transaction if the Resource does not contain enough units. If this function executes retry, r is the only TVar read, so the transaction re-executes when r changes value.
HSTM also introduced the binary orElse operator for composing two transactions. This operator first starts its left-hand transaction. If this transaction commits, the orElse operator finishes. However, if this transaction retries, the operator tries the right-hand transaction instead. If this one commits, the orElse operator finishes. If it retries, the entire orElse statement waits for changes in the set of TVars read by both transactions before retrying. For example, this operator turns getR into an operation that returns a true/false success/failure result:

nonBlockGetR :: Resource -> Int ->STM Bool
nonBlockGetR r i = do { getR r i ; return True }‘orElse‘ return false

Notice, that retry operator “retries” largest enclosing term which has STM type.
Some words about implementation. Al transaction treads and writes to TVars deal with special transactional log which hides these variables references form other transactions. When the transaction commits, it first validates its log entries, to ensure that no other transaction modified the TVars values. If valid, the transaction installs the new values in these variables. If validation fails, the log is discarded and the transaction re-executed.
If a transaction invokes retry, the transaction is validated (to avoid retries caused by inconsistent execution) and the log discarded after recording all TVars read by the transaction. The system binds the transaction’s thread to each of these variables. When a transaction updates one of these variables, it also restarts the thread, which re-executes the transaction.
The orElse statement requires a closed nested transaction to surround each of the two alternatives, so that either one can abort without terminating the surrounding transaction. If either transaction completes successfully, its log is merged with the surrounding transaction’s log, which can commit. If either or both transactions invoke retry, the outer transaction waits on the union of the TVars read by the transactions that retried.

5. Garbage Collection in Concurrent Haskell


At the end it would be good to tell some words about garbage collection in Concurrent Haskell. It's interesting problem to collect processes which become “useless”. There is obvious strategy to do it if we ensure, that process we want to collect will not have “side effects” further. We can formulate two rules to do it:
  1. Running process cannot be collected
  2. We can collect process which holds some MVar if this variable is now inaccessible for any other non-garbage process.
At last classic “mark-and-sweep” tracing procedure can be implemented on processes:
  1. When tracing accessible heap objects, treat all runnable processes as roots.
  2. When some MVar is identified as reachable, identify all processes blocked by it as reachable ones..

List of used papers:

  1. Simon Peyton Jones, Andrew Gordon, Sigbjorn Finne “Concurrent Haskell”
  2. Paul Hudak, Simon Peyton Jones, Philip Wadler, Brian Boutel, Jon Fairbairn, Joseph Fasel, María M. Guzmán, Kevin Hammond, John Hughes, Thomas Johnsson, Dick Kieburtz, Rishiyur Nikhil, Will Partain, John Peterson “Report on the programming language Haskell: a non-strict, purely functional language version 1.2”




Wednesday 28 May 2008

Reading 7. HTM. Multiple atomic read-write operations

Stone et al., IEEE Concurrency 1993: Multiple atomic read-write operations ("Oklahoma Update")

Let's extend compare&swap command set to support multiple memory locations.
  • read-and-reserve - reads a memory location into a specified general-purpose register, places a reservation on the location’s address in the reservation register, and clears the reservation register’s data field.
  • store-contingent - locally updates the reservation register’s data field without obtaining write permissions.
  • write-if-reserved - specifies a set of reservation registers and updates the memory locations reserved by those registers. It is used to initiate the commit process. It attempts to obtain exclusive ownership for each of the addresses in the reservation registers. If the reservations remain valid during this process, the instruction updates memory with the modified data from the reservation registers. The instruction returns an indication whether the update succeeded or not.
Example

Having such commands we can write i.e. synchronized queue:
void Enqueue(newpointer) {
Memory[newpointer].next = NULL;
status = 0;

while (!status) {
last_pointer = Read_and_Reserve(Memory[tail].next, reservation1);
if (last_pointer == NULL) {
// this is an empty queue
first_pointer = Read_and_Reserve(Memory[head].next, reservation2);
Store_Contingent(newpointer, reservation1);
Store_Contingent(newpointer, reservation2);
status = Write_If_Reserved(reservation1, reservation2);
} else {
// non-empty queue
temp_pointer = Read_and_Reserve(Memory[last_pointer].next, reservation2);
Store_Contingent(newpointer, reservation1);
Store_Contingent(newpointer, reservation2);
status = Write_If_Reserved(reservation1, reservation2);
}
} // repeat until successful

return;
}
Implementation

Compared to compare&swap decomposition, hardware implementation for Oklahoma Update is much more complex.
Instead of one xa-address register this implementation requires number of reservation registers per CPU to store memory locations and various flags. Those are places
read-and-reserve and store-contingent writes to. And write-if-reserved can be called commit operation since it commits all local writes to shared memory.

Basically, commit operation consists of two phases:
  • Requesting write permissions. CPU ensures having exclusive write lock to every location from reservation registers. To avoid deadlocking to commit operations from other processors, CPU acquires locks in address-ascending manner.

    If it can't obtain every one lock entire conflict resolution takes place. CPU may try to restart the process after some time or abort the operation.
  • Commiting data values. As soon as processor has locks it starts uninterruptable operation to write data.
Interactions between the new instructions and regular store operations introduce forward-progress concerns. Regular stores do not participate in the new instructions’ conflict resolution mechanism. If a regular store from one processor conflicted with an address specified in a reservation register of another processor, this processor would abort its update. See more on this in the original paper.

Classification

Strong or weak isolationStrong
Transaction granularityCache line
Direct or deferred updateDeferred (in reservation registers)
Concurrency controlOptimistic. Commit initiates acquiring ownership
Conflict detectionLate write-write conflict (if not a regular store)
Late write-read conflict (if not a regular store)
Inconsistent readsNone
Conflict resolutionAddress-based rwo-phase commit
Nested transactionN/A


Sources
  • J.R. Larus, R.Rajwar Transactional Memory, 4.3.4
  • J. M. Stone, H. S. Stone, P. Heidelberger and J. Turek, “Multiple reservations and the Oklahoma update,” IEEE Concurrency, Vol. 1(4), pp. 58–71 Nov. 1993.

Reading 7. HTM. Compare&Swap decomposition

Jensen,Hagensen, and Broughton, UCRL 1987: support for optimistic synchronization using single memory location.

Complex compare&swap instruction can be splitted into simplier parts:
  • sync load. One memory location are loaded into special register (called xa-address), that indicates the processor requires exclusive access to this address, and loads the accessed data into a general register.
  • sync store. Stores data to previously saved address in xa-address if no other processor stored anything there (i.e. no conflict occurred) or conflict resolution decided to allow write by this processor.
  • sync clear. Clears xa-address.
A simple locking can be implemented in those instructions as follows:

// if (lock == 0) { lock = ProcessID; } % atomically
// else goto LockHeld... % lock was held
Retry:
sync_load R10, lock ; declare exclusive intent
jump_q .neq (R10,0), LockHeld ; test for zero
sync_clear ; lock non-zero, hence abort
load R10, ProcessID ; prepare to update lock
sync_store R10, lock ; update lock if not aborted
goto Retry ; try the update again
MyLock: ; got the lock
Implementation

sync_store instruction broadcasts write operation on all processors thus detecting conflicts with other processors having same xa-address. If such conflict detected, one processor clears own xa-address or aborts store operation (depends on conflict resolution scheme).

Real-world usage

MIPS, Alpha, PowerPC implemented variations to this scheme

Classification

Strong or weak isolation
Strong
Transaction granularity
Cache line
Direct or deferred update
Conditional direct store (single word)
Concurrency control
Optimistic (single word)
Conflict detection
N/A
Inconsistent reads
None
Conflict resolution
Processor id or first to request ownership
Nested transaction
N/A


Sources
  • J.R. Larus, R.Rajwar Transactional Memory, 4.3.3
  • E. H. Jensen, G. W. Hagensen and J. M. Broughton, “A new approach to exclusive data access in shared memory multiprocessors,” Lawrence Livermore National Laboratory, Technical Report UCRL-97663, Nov. 1987.

Reading 7. HTM. A side story

Knight 1986, paralleling a single-threaded programs

A compiler divides a program into a series of code blocks called transactions. For doing the division, the compiler assumes that these transactions do not have memory dependencies. These blocks then execute optimistically on the processors. The hardware enforces correct execution and uses caches to detect when a memory dependence violation between threads occurs.

This is the first paper, that proposed to use caches and cache coherence to maintain ordering among speculatively parallelized regions of a sequential code in the presence of unknown memory dependencies. While the paper did not directly address explicitly parallel programming, it set the groundwork for using caches and coherence protocols for future transactional memory proposals.

Implementation

Firstly, the compiler divides a program into sequence of "mostly independent" series of blocks (transactions). Than those blocks runs on shared-memory multiprocessors with own register state. All write memory operations cached in confirm cache. Every processor also stores all read operations in special dependency cache. Also write operations from other processors detected and used to update dependency cache (so called, bus sneaking).

All transactions are committed one-by-one as prescribed by original single-threaded program. Stored dependencies used to detect write conflicts. If one occurs, failed transaction simply runs again.

Classification
Strong or weak isolation Strong
Transaction granularity Cache line
Direct or deferred update Deferred (in cache)
Concurrency control Optimistic. Commit serialized globally
Conflict detection Late write-write conflict
Late write-read conflict
Inconsistent reads No
Conflict resolution Program order (sequential program)
Nested transaction N/A


Sources