4.4 Parallel Decomposition


Parallel programs are specifically designed
to take advantage of multiple CPUs for solving
computation-intensive problems. The main performance goals are
normally throughput and scalability: the number of
computations that can be performed per unit time, and the
potential for improvement when additional computational
resources are available. However, these are often intertwined
with other performance goals. For example, parallelism may
also improve response latencies for a service that hands off
work to a parallel execution facility.


Among the main challenges of
parallelism in the Java
programming language is to construct portable programs that can exploit
multiple CPUs when they are present, while at the same time
working well on single processors, as well as on time-shared
multiprocessors that are often processing unrelated
programs.


Some classic approaches to parallelism don't
mesh well with these goals. Approaches that assume particular
architectures, topologies, processor capabilities, or other
fixed environmental constraints are ill suited to commonly
available JVM implementations. While it is not a
crime to build run-time systems with extensions specifically
geared to particular parallel computers, and to write parallel
programs specifically targeted to them, the associated programming techniques
necessarily fall outside the scope of this book.
Also, RMI and other distributed frameworks can be used to
obtain parallelism across remote machines. In fact, most of
the designs discussed here can be adapted to use serialization
and remote invocation to achieve parallelism over local
networks. This is becoming a common and efficient means of
coarse-grained parallel processing. However, these mechanics
also lie outside the scope of this book.


We instead focus on three families of
task-based designs: fork/join parallelism, computation trees,
and barriers. These techniques can yield very efficient
programs that exploit multiple CPUs when present, yet still
maintain portability and sequential efficiency. Empirically,
they are known to scale well, at least up through dozens of
CPUs. Moreover, even when these kinds of task-based parallel
programs are tuned to maximally exploit a given hardware
platform, they require only minor retunings to maximally
exploit other platforms.


As of this writing, probably the most common
targets for these techniques are application servers and
compute servers that are often, but by no means always,
multiprocessors. In either case, we assume that CPU cycles are
usually available, so the main goal is to exploit them to
speed up the solution of computational problems. In other
words, these techniques are unlikely to be very helpful when
programs are run on computers that are already nearly
saturated.


4.4.1 Fork/Join


Fork/join decomposition relies on parallel
versions of divide-and-conquer techniques familiar in
sequential algorithm design. Solutions take the form:


pseudoclass Solver {                          // Pseudocode
  // ...
  Result solve(Param problem) {
    if (problem.size <= BASE_CASE_SIZE)
      return directlySolve(problem);
    else {
      Result l, r;
      IN-PARALLEL {
        l = solve(leftHalf(problem));
        r = solve(rightHalf(problem));
      }
      return combine(l, r);
    }
  }
}

It takes some hard work and inspiration to
invent a divide-and-conquer algorithm. But many common
computationally intensive problems have known solutions of
approximately this form. Of course, there may be more than two
recursive calls, multiple base cases, and arbitrary pre- and
post-processing surrounding any of the cases.


Familiar sequential examples include
quicksort, mergesort, and many data structure, matrix, and
image processing algorithms. Sequential recursive
divide-and-conquer designs are easy to parallelize when the
recursive tasks are completely independent; that is, when they
operate on different parts of a data set (for example
different sections of an array) or solve different
sub-problems, and need not otherwise communicate or coordinate
actions. This often holds in recursive algorithms, even those
not originally intended for parallel implementation.
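
For instance, the following sketch (a hypothetical seqSum helper,
shown only to illustrate this independence) sums an array by halves;
the two recursive calls read disjoint sections of the array, and so
could in principle run in parallel without any further coordination:

static long seqSum(long[] array, int lo, int hi) { // sum of array[lo..hi)
  if (hi - lo <= 1)            // base case: zero or one element
    return (hi == lo) ? 0 : array[lo];
  int mid = (lo + hi) >>> 1;   // split into disjoint halves
  return seqSum(array, lo, mid) + seqSum(array, mid, hi);
}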


Additionally, there are recursive versions of
algorithms (for example, matrix multiplication) that are not
used much in sequential contexts, but are more widely used on
multiprocessors because of their readily parallelizable form.
And other parallel algorithms perform extensive
transformations and preprocessing to convert problems into a
form that can be solved using fork/join techniques.
(See Further Readings in § 4.4.4.)


The IN-PARALLEL pseudocode is
implemented by forking and
later joining tasks performing
the recursive calls. However, before discussing how to do
this, we first examine issues and frameworks that permit
efficient parallel execution of recursively generated
tasks.


4.4.1.1 Task granularity and structure

Many of the design forces encountered when
implementing fork/join designs surround task granularity:


Maximizing
parallelism.
In general, the smaller the tasks, the
more opportunities for parallelism. All other
things being equal, using
many fine-grained tasks rather than only a few coarse-grained
tasks keeps more CPUs busy, improves load balancing, locality
and scalability, decreases the percentage of time that CPUs
must idly wait for one another, and leads to greater
throughput.


Minimizing
overhead.
Constructing and managing an
object to process a task in parallel, rather than just
invoking a method to process it serially, is the main
unavoidable overhead associated with task-based programming compared with
sequential solutions.
It is intrinsically more costly to create and use task objects
than to create and use stack-frames. Additionally, the use of
task objects can add to the amount of argument and result data
that must be transmitted and can impact garbage collection.
All other things being equal, total overhead is minimized when
there are only a few coarse-grained tasks.


Minimizing
contention.
A parallel decomposition is not going to
lead to much speed-up if each task frequently communicates
with others or must block waiting for resources held by
others. Tasks should be of a size and structure that maintain
as much independence as possible. They should minimize (in
most cases, eliminate) use of shared resources, global
(static) variables, locks, and other dependencies. Ideally,
each task would contain simple straight-line code that runs to
completion and then terminates. However, fork/join designs
require at least some minimal synchronization. The main object
that commences processing normally waits for all subtasks to
finish before proceeding.


Maximizing
locality.
Each subtask should be the only one operating
on some small piece of a problem, not only conceptually but
also at the level of lower-level resources and memory access
patterns. Refactorings that achieve good locality of reference
can significantly improve performance on modern heavily cached
processors. When dealing with large data sets, it is not
uncommon to partition computations into subtasks with good
locality even when parallelism is not the main goal. Recursive
decomposition is often a productive way to achieve this.
Parallelism accentuates the effects of locality. When parallel
tasks all access different parts of a data set (for example,
different regions of a common matrix), partitioning strategies
that reduce the need to transmit updates across caches often
achieve much better performance.


4.4.1.2 Frameworks

There is no general optimal solution to
granularity and related task structuring issues. Any choice
represents a compromise that best resolves the competing
forces for the problem at hand. However, it is possible to
build lightweight execution frameworks that support a wide
range of choices along the continuum.


Thread objects are unnecessarily
heavy vehicles for supporting purely computational fork/join
tasks. For example, these tasks never need to block on IO, and
never need to sleep. They require only an
operation to synchronize across subtasks. Worker thread
techniques discussed in § 4.1.4 can be extended to construct
frameworks efficiently supporting only the necessary
constructs. While there are several approaches, for
concreteness we'll limit discussion to a framework in
util.concurrent that restricts all tasks to be subclasses of
class FJTask. Here is a brief sketch of principal methods.
More details are discussed along with examples in § 4.4.1.4
through § 4.4.1.7.


abstract class FJTask implements Runnable {
  boolean isDone();                         // True after task is run
  void cancel();                            // Prematurely set as done
  void fork();                              // Start a dependent task
  void start();                             // Start an arbitrary task
  static void yield();                      // Allow another task to run
  void join();                              // Yield caller until done
  static void invoke(FJTask t);             // Directly run t
  static void coInvoke(FJTask t, FJTask u); // Fork and join t and u
  static void coInvoke(FJTask[] tasks);     // coInvoke all
  void reset();                             // Clear to allow reuse
}

An associated FJTaskRunnerGroup
class provides control and entry points into this framework. A
FJTaskRunnerGroup is constructed with a given number
of worker threads that should ordinarily be equal to the
number of CPUs on a system. The class supports method
invoke that starts up a main task, which will in turn
normally create many others.


FJTasks must employ only these task
control methods, not arbitrary Thread or monitor
methods. While the names of these operations are the same or
similar to those in class Thread, their
implementations are very different. In particular, there are
no general suspension facilities. For example, the
join operation is implemented simply by having the
underlying worker thread run other tasks to completion until
the target task is noticed to have completed (via
isDone). This wouldn't work at all with ordinary
threads, but is effective and efficient when all tasks are
structured as fork/join methods.


These kinds of trade-offs make
FJTask construction and invocation substantially
cheaper than would be possible for any class supporting the
full Thread interface. As of this writing, on at
least some platforms, the overhead of creating, running, and
otherwise managing a FJTask for the kinds of examples
illustrated below is only between four and ten times that of
performing equivalent sequential method calls.


The main effect is to lessen the impact of
overhead factors when making choices about task partitioning
and granularity. The granularity threshold for
using tasks can be fairly
small (on the order of a few thousand instructions even in
the most conservative cases) without noticeably degrading
performance on uniprocessors.
Programs can exploit as many CPUs as are available on even the
largest platforms without the need for special tools to
extract or manage parallelism. However, success also depends
on construction of task classes and methods that themselves
minimize overhead, avoid contention, and preserve
locality.


4.4.1.3 Defining tasks

Sequential divide-and-conquer algorithms can
be expressed as fork/join-based classes via the following
steps:




  1. Create a task class with:




    • Fields to hold arguments and results.
      Most should be strictly local to a task, never accessed
      from any other task. This eliminates the need for
      synchronization surrounding their use. However, in the
      typical case where result variables are accessed by other
      tasks, they should either be declared as volatile
      or be accessed only via synchronized methods.



    • A constructor that initializes argument
      variables.



    • A run method that executes the
      reworked method code.



  2. Replace the original recursive case with
    code that:




    • Creates subtask objects.



    • Forks each one to run in parallel.



    • Joins each of them.



    • Combines results by accessing result
      variables in the subtask objects.



  3. Replace (or extend) the original base case
    check with a threshold check.
    Problem sizes less than the threshold should use the
    original sequential code. This generalization of base case
    checks maintains efficiency when problem sizes are so small
    that task overhead overshadows potential gains from parallel
    execution. Tune performance by determining a good threshold
    size for the problem at hand.



  4. Replace the original method with one that
    creates the associated task, waits it out, and returns any
    results. (In the FJTask framework, the outermost
    call is performed via
    FJTaskRunnerGroup.invoke.)


4.4.1.4 Fibonacci


We'll illustrate the basic steps with a very
boring and unrealistic, but very simple classic example:
recursively computing fib, the
Fibonacci function. This function can be programmed
sequentially as:


int seqFib(int n) {
  if (n <= 1)
    return n;
  else
    return seqFib(n-1) + seqFib(n-2);
}

This example is unrealistic because there is
a much faster non-recursive solution for this particular
problem, but it is a favorite for demonstrating both recursion
and parallelism. Because it does so little other computation,
it makes the basic structure of fork/join designs easier to
see, yet it generates many recursive calls: at least fib(n)
calls to compute fib(n). The first few values of the
sequence are 0, 1, 1, 2, 3, 5, 8; fib(10) is 55; fib(20) is
6,765; fib(30) is 832,040; fib(40) is 102,334,155.


Function seqFib can be transformed
into a task class such as the following:


class Fib extends FJTask {
  static final int sequentialThreshold = 13; // for tuning
  volatile int number;                       // argument/result

  Fib(int n) { number = n; }

  int getAnswer() {
    if (!isDone())
      throw new IllegalStateException("Not yet computed");
    return number;
  }

  public void run() {
    int n = number;

    if (n <= sequentialThreshold)     // base case
      number = seqFib(n);
    else {
      Fib f1 = new Fib(n - 1);        // create subtasks
      Fib f2 = new Fib(n - 2);

      coInvoke(f1, f2);               // fork then join both

      number = f1.number + f2.number; // combine results
    }
  }

  public static void main(String[] args) { // sample driver
    try {
      int groupSize = 2; // 2 worker threads
      int num = 35;      // compute fib(35)
      FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize);
      Fib f = new Fib(num);
      group.invoke(f);
      int result = f.getAnswer();
      System.out.println("Answer: " + result);
    }
    catch (InterruptedException ex) {} // die
  }
}

Notes:



  • The class maintains a field holding the
    argument for which to compute the Fibonacci function. Also,
    we need a variable to hold the result. However, as is fairly
    typical in such classes, there is no need to keep two
    variables. For economy (bearing in mind that many millions
    of Fib objects might be constructed in the course
    of a computation), we can micro-optimize to use one
    variable, and overwrite the argument with its result after
    it is computed. (This is the first of several
    hand-optimizations that are uncomfortably petty, but are
    shown here in order to demonstrate minor tweaks that can be
    pragmatically important in constructing efficient parallel
    programs.)



  • The number field is declared as
    volatile to ensure visibility from other
    tasks/threads after it is computed (see §
    2.2.7). Here and in subsequent examples,
    volatile fields are read and/or written only once
    per task execution, and otherwise held in local variables.
    This avoids interfering with potential compiler
    optimizations that are otherwise disabled when using volatile.



  • Alternatively, we could have synchronized
    access to the number field. But there is no good
    reason to do so. The use of volatile
    fields is much more common in lightweight parallel task
    frameworks than in general-purpose concurrent programming. Tasks usually do not require other
    synchronization and control mechanics, yet often need to
    communicate results via field access. The most
    common reason for using
    synchronized instead of volatile is to
    deal with arrays. Individual array elements cannot be declared
    as volatile. Processing arrays within
    synchronized methods or blocks is the simplest way
    to ensure visibility of array updates, even in the typical
    case in which locking is not otherwise required. An
    occasionally attractive alternative is instead to create
    arrays each of whose elements is a forwarding object with
    volatile fields (see the sketch following these notes).



  • The method isDone returns true
    after the completion of a run method that has been
    executed via invoke or coInvoke. It is used as a guard in getAnswer to
    help detect programming
    errors that could occur if the ultimate consumer of an
    answer tries to access it prematurely. (There is no chance of this happening here,
    but this safeguard helps avoid unintended usages.)



  • The sequentialThreshold constant
    establishes granularity. It represents the balance point at
    which it is not worth the overhead to create tasks, also
    reflecting the goal of maintaining good sequential
    performance. For example, on one set of runs
    on a four-CPU system, setting sequentialThreshold
    to 13 resulted in a 4% performance degradation versus
    seqFib for large argument values when using a single worker
    thread. But it sped up by a factor of at least 3.8
    with four worker threads, processing several million
    Fib tasks per second.



  • Rather than wiring in a compile-time
    constant, we could have defined the threshold as a run-time
    variable and set it to a value based on the number of CPUs
    available or other platform characteristics. This is useful
    in task-based programs that do not scale linearly, as is
    likely to be true even here. As the number of CPUs increases,
    so do communication and resource management costs, which
    could be balanced by increasing the threshold.



  • The parallel analog of recursion is
    performed via a convenient method, coInvoke(FJTask t,
    FJTask u), which in turn acts as:


    t.fork(); invoke(u); t.join();


  • The fork method is a specialized
    analog of Thread.start. A forked task is always
    processed in stack-based LIFO order when it is run by the
    same underlying worker thread that spawned it, but in
    queue-based FIFO order with respect to other tasks if run by
    another worker thread running in parallel. This represents a
    cross of sorts between normal stack-based sequential calls,
    and normal queue-based thread scheduling. This policy
    (implemented via double-ended scheduling queues) is ideal
    for recursive task-based parallelism (see Further Readings),
    and more generally whenever dealing with strictly dependent
    tasks: those that are spawned either by the tasks that
    ultimately join them or by their subtasks.



  • In contrast, FJTask.start behaves
    more like Thread.start. It employs queue-based FIFO
    scheduling with respect to all worker threads. It is used,
    for example, by FJTaskRunnerGroup.invoke to start
    up execution of a new main task.



  • The join method should be used
    only for tasks initiated via fork. It exploits
    termination dependency patterns of fork/join subtasks to
    optimize execution.



  • The FJTask.invoke method runs the
    body of one task within another task, and waits out
    completion. Seen differently, it is the one-task version of
    coInvoke, an optimization of u.fork(); u.join().
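
As a sketch of the forwarding-object alternative mentioned in the
notes above (the VolatileCell name is hypothetical, not part of the
FJTask framework), each array element refers to a small holder object
whose field is declared as volatile:

class VolatileCell {      // hypothetical forwarding object
  volatile double value;  // fields, unlike array elements, can be volatile
}

// A task-shared array of cells; writes to cell.value become
// visible to other tasks without any locking.
int n = 1024;
VolatileCell[] cells = new VolatileCell[n];
for (int i = 0; i < n; ++i) cells[i] = new VolatileCell();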


Effective use of any
lightweight executable framework requires the same
understanding of support methods and their semantics as does
programming with ordinary Threads.
The FJTask framework exploits the
symbiosis between recursion and parallel decomposition, and so
encourages the divide-and-conquer programming style seen in
Fib. However, the range of programming idioms and design
patterns conforming to this general style is fairly broad, as
illustrated by the following examples.


4.4.1.5 Linking subtasks

Fork/join techniques may be applied even when
the number of forked subtasks varies dynamically. Among
several related tactics for carrying this out, you can add
link fields so that subtasks can be maintained in lists. After
spawning all tasks, an accumulate (also known
as reduction) operation can
traverse the list sequentially, joining and using the results
of each subtask.


Stretching the Fib example a bit, the
FibVL class illustrates one way to set this up. This
style of solution is not especially useful here, but is
applicable in contexts in which a dynamic number of subtasks
are created, possibly across different methods. Notice that
the subtasks here are joined in the opposite order in which
they were created. Since the processing order of results does
not matter here, we use the simplest possible linking
algorithm (prepending), which happens to reverse the order of
tasks during traversal. This strategy applies whenever the
accumulation step is commutative and associative with respect
to results, so tasks can be processed in any order. If the
order did matter, we would need to adjust list construction or
traversal accordingly.


class FibVL extends FJTask {
  volatile int number; // as before
  final FibVL next;    // embedded linked list of sibling tasks

  FibVL(int n, FibVL list) { number = n; next = list; }

  public void run() {
    int n = number;
    if (n <= sequentialThreshold)
      number = seqFib(n);
    else {
      FibVL forked = null;               // list of subtasks

      forked = new FibVL(n - 1, forked); // prepends to list
      forked.fork();

      forked = new FibVL(n - 2, forked);
      forked.fork();

      number = accumulate(forked);
    }
  }

  // Traverse list, joining each subtask and adding to result
  int accumulate(FibVL list) {
    int sum = 0;
    for (FibVL f = list; f != null; f = f.next) {
      f.join();
      sum += f.number;
    }
    return sum;
  }
}

4.4.1.6 Callbacks

Recursive task-based fork/join parallelism
may be extended to apply when other local synchronization
conditions are used instead of join. In the
FJTask framework, t.join() is implemented as
an optimized version of:


while (!t.isDone()) yield();

Method yield here allows the
underlying worker thread to process other tasks. (More
specifically, in the FJTask framework, the thread
will process at least one other
task if one exists.)


Any other condition may be used in this
construction rather than isDone, as long as you are
certain that the predicate being waited for will eventually
become true due to the actions of a subtask (or one of its
subtasks, and so on). For example, rather than relying on
join, task control can rely on counters that keep track of
task creation and completion. A counter can be incremented on
each fork and decremented when the forked task has
produced a result. This and related counter-based schemes can
be attractive choices when subtasks communicate back results
via callbacks rather than via access to result fields.
Counters of this form are small-scale, localized versions of
the barriers discussed in § 4.4.3.


Callback-based fork/join
designs are seen, for example, in problem-solving algorithms,
games, searching, and logic programming.
In many such applications, the number of subtasks that are
forked can vary dynamically, and subtask results are better
captured by method calls than by field extraction.


Callback-based approaches also permit greater
asynchrony than techniques such as the linked tasks in §
4.4.1.5. This can lead to better performance when subtasks
differ in expected duration, since the result processing of
quickly completing subtasks can sometimes overlap with
continued processing of longer ones. However, this design
gives up all result ordering guarantees, and thus is
applicable only when subtask result processing is completely
independent of the order in which results are produced.


Callback counters are used in the following
class FibVCB, which is not at all well-suited for the
problem at hand but serves to exemplify techniques. This code
illustrates a typical but delicate combination of task-local
variables, volatiles, and locking in an effort to
keep task control overhead to a minimum:


class FibVCB extends FJTask {
  // ...
  volatile int number = 0; // as before
  final FibVCB parent;     // is null for outermost call
  int callbacksExpected = 0;
  volatile int callbacksReceived = 0;

  FibVCB(int n, FibVCB p) { number = n; parent = p; }

  // Callback method invoked by subtasks upon completion
  synchronized void addToResult(int n) {
    number += n;
    ++callbacksReceived;
  }

  public void run() { // same structure as join-based version
    int n = number;
    if (n <= sequentialThreshold)
      number = seqFib(n);
    else {
      // Clear number so subtasks can fill in
      number = 0;
      // Establish number of callbacks expected
      callbacksExpected = 2;

      new FibVCB(n - 1, this).fork();
      new FibVCB(n - 2, this).fork();

      // Wait for callbacks from children
      while (callbacksReceived < callbacksExpected) yield();
    }

    // Call back parent
    if (parent != null) parent.addToResult(number);
  }
}

Notes:



  • All mutual exclusion locking is restricted
    to small code segments protecting field accesses, as must be
    true for any class in a lightweight task framework. Tasks
    are not allowed to block unless they are sure they will be
    able to continue soon. In particular, this framework
    unenforceably requires that synchronized blocks
    not span forks and
    subsequent joins or yields.



  • To help eliminate some synchronization, the
    callback count is split into two counters,
    callbacksExpected and callbacksReceived.
    The task is done when they are equal.



  • The callbacksExpected counter is
    used only by the current task, so access need not be
    synchronized, and it need not be volatile.
    In fact, since exactly two callbacks are always expected in
    the recursive case and the value is never needed outside the
    run method, this class could easily be reworked in a way
    that eliminates all need for this variable. However, such a
    variable is needed in more typical callback-based designs
    where the number of forks may vary dynamically and may be
    generated across multiple methods.



  • The addToResult callback method
    must be synchronized to avoid interference problems
    when subtasks call back at about the same time.



  • So long as both number and
    callbacksReceived are declared as
    volatile, and callbacksReceived is updated
    as the last statement of addToResult, the
    yield loop test need not involve synchronization
    because it is waiting for a latching threshold that, once
    reached, will never change (see § 3.4.2.1).



  • We could also define a reworked
    getAnswer method that uses these mechanics so that
    it returns an answer if all callbacks have been received.
    However, since this method is designed to be called by
    external (non-task) clients upon completion of the overall
    computation, there is no compelling reason to do this. The
    version from the original Fib class suffices.



  • Despite these measures, the
    overhead associated with task control in this version is
    greater than that of the original version using coInvoke. If you were to use it anyway, you would
    probably choose a slightly larger sequential threshold, and
    thus exploit slightly less parallelism.


4.4.1.7 Cancellation

In some designs, there is no need for keeping
counts of callbacks or exhaustively traversing through subtask
lists. Instead, tasks complete when any subtask (or one of its
subtasks, and so on) arrives at a suitable result. In these
cases, you can avoid wasting computation by cancelling any
subtasks in the midst of producing results that will not be
needed.


The options here are similar to those seen in
other situations involving cancellation (see §
3.1.2). For example, subtasks can regularly invoke a
method (perhaps isDone) in their parents that
indicates that an answer has already been found, and, if so,
return early. They must also set their own status, so any of
their subtasks can do the same. This can be
implemented here using FJTask.cancel, which just
prematurely sets isDone status.
This suppresses execution of tasks that have not yet been
started, but has no effect on tasks in the midst of execution
unless the tasks' run methods themselves detect updated status
and deal with it.


When an entire set of tasks is trying to
compute a single result, an even simpler strategy suffices:
Tasks may regularly check a global (static) variable
that indicates completion. However, when there are many tasks,
and many CPUs, more localized strategies may still be
preferable to one that places so much pressure on the
underlying system by generating many accesses to the same
memory location, especially if it must be accessed under
synchronization. Additionally, bear in mind that the total
overhead associated with cancellation should be less than the
cost of just letting small tasks run even if their results are
not needed.


For example, here is a class that solves the
classic N-Queens problem, searching for the placement of N
queens that do not attack each other on a chessboard of size
NxN. For simplicity of illustration, it relies on a static
Result variable. Here tasks check for cancellation
only upon entry into the method. They will continue looping
through possible extensions even if a result has already been
found. However, the generated tasks will immediately exit.
This can be slightly wasteful, but may obtain a solution more
quickly than a version that checks for completion upon every
iteration of every task.


Note also here that the tasks do not bother
joining their subtasks since there is no reason to do so. Only
the ultimate external caller (in main) needs to wait
for a solution; this is supported here by adding standard
waiting and notification methods to the Result class.
(Also, for compactness, this version does not employ any kind
of granularity threshold. It is easy to add one, for example
by directly exploring moves rather than forking subtasks when
the number of rows is close to the board size.)


class NQueens extends FJTask {
  static int boardSize; // fixed after initialization in main
  // Boards are arrays where each cell represents a row,
  // and holds the column number of the queen in that row

  static class Result {         // holder for ultimate result
    private int[] board = null; // non-null when solved

    synchronized boolean solved() { return board != null; }

    synchronized void set(int[] b) { // Support use by non-Tasks
      if (board == null) { board = b; notifyAll(); }
    }

    synchronized int[] await() throws InterruptedException {
      while (board == null) wait();
      return board;
    }
  }

  static final Result result = new Result();

  public static void main(String[] args) throws InterruptedException {
    boardSize = ...;
    FJTaskRunnerGroup tasks = new FJTaskRunnerGroup(...);
    int[] initialBoard = new int[0]; // start with empty board
    tasks.execute(new NQueens(initialBoard));
    int[] board = result.await();
    // ...
  }

  final int[] sofar; // initial configuration

  NQueens(int[] board) { this.sofar = board; }

  public void run() {
    if (!result.solved()) { // skip if already solved
      int row = sofar.length;

      if (row >= boardSize) // done
        result.set(sofar);

      else { // try all expansions

        for (int q = 0; q < boardSize; ++q) {

          // Check if queen can be placed in column q of next row
          boolean attacked = false;
          for (int i = 0; i < row; ++i) {
            int p = sofar[i];
            if (q == p || q == p - (row-i) || q == p + (row-i)) {
              attacked = true;
              break;
            }
          }

          // If not, fork to explore moves from new configuration
          if (!attacked) {
            // build extended board representation
            int[] next = new int[row+1];
            for (int k = 0; k < row; ++k) next[k] = sofar[k];
            next[row] = q;
            new NQueens(next).fork();
          }
        }
      }
    }
  }
}

4.4.2 Computation Trees



A number of computationally intensive
algorithms involve tasks of the form:


For a fixed number of steps, or until convergence, do {
  Update one section of a problem;
  Wait for other tasks to finish updating their sections;
}

Most often, such algorithms perform update
operations on partitioned arrays, matrices, or image
representations. For example, many physical dynamics problems
involve repeated local updates to the cells of a matrix. Jacobi algorithms
and related relaxation techniques repeatedly recalculate
estimated values across neighboring cells, typically using an
averaging formula such as:


void oneStep(double[][] oldMatrix, double[][] newMatrix,
             int i, int j) {
  newMatrix[i][j] = 0.25 * (oldMatrix[i-1][j] +
                            oldMatrix[i][j-1] +
                            oldMatrix[i+1][j] +
                            oldMatrix[i][j+1]);
}

Normally, to save space, two different
matrices are swapped as newMatrix and
oldMatrix across successive steps.
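
A minimal sketch of such a driver loop (the matrix names and
maxSteps are hypothetical; the boundary cells are assumed fixed, as
discussed below) alternates the roles of the two matrices by swapping
references rather than copying cells:

double[][] a = matrix0, b = matrix1;          // hypothetical matrices
for (int step = 0; step < maxSteps; ++step) {
  for (int i = 1; i < n - 1; ++i)             // interior cells only
    for (int j = 1; j < n - 1; ++j)
      oneStep(a, b, i, j);                    // read a, write b
  double[][] t = a; a = b; b = t;             // swap roles for next step
}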


Algorithms requiring that all tasks periodically
wait for all others to complete do not always
scale quite as well as more loosely coupled fork/join designs.
Even so, these algorithms are common, efficient, and amenable
to significant parallel speedups.


4.4.2.1 Building and using trees

It would be inefficient to repeatedly apply
fork/join decomposition in iterative designs in order to
update sections in parallel. Because the sections are the same
across iterations, they can be constructed just once and then
repeatedly invoked so that on each iteration, the
corresponding updates execute in the same order as would be
produced by a recursive solution.


Computation trees are explicit
representations of the tree-structured computations implicitly
arising in fork/join recursion. These trees have two kinds of
nodes, internal nodes and leaf nodes, corresponding to the
recursive and base cases of a recursive solution. They can be
constructed and used for iterative update problems via the
following steps:




  1. Create a tree of task objects representing
    the recursive partitions, where:




    • Each internal node contains references to
      subpartitions, and has an update method that performs
      fork/join processing of each of them.



    • Each leaf node represents a
      finest-granularity partition, and has an update method
      that operates directly on it.



  2. For a fixed number of steps, or until
    convergence, do:




    • Execute the task performing the root
      partition's update method.


For example, the following code
illustrates the highlights of a set of classes that perform
Jacobi iteration using the
averaging formula shown above.
In addition to updating, this version also keeps track of the
differences among computed cell values across iterations, and
stops when the maximum difference is within a constant
EPSILON. Also, like many programs of this form, this
code assumes that the matrices have been set up with extra
edge cells that are never updated, so boundary conditions
never need to be checked. (Alternatives include
recomputing edge values using special edge formulas
after each pass, and treating edges as toroidally wrapping
around the mesh.)


The recursive decomposition strategy used
here is to divide the mesh into quadrants, stopping when the
number of cells is at most leafCells, which serves as
the granularity threshold. This strategy works well so long as
the numbers of rows and columns in the matrix are
approximately equal. If they are not, additional classes and
methods could be defined to divide across only one dimension
at a time. The approach here assumes that the matrix as a
whole already exists, so rather than actually dividing up
cells, task nodes just keep track of the row and column
offsets of this matrix that each partition is working on.


The subclass-based design used here reflects
the different structure and behavior of internal versus leaf
nodes. Both are subclasses of abstract base
JTree:


abstract class JTree extends FJTask {
  volatile double maxDiff; // for convergence check
}

class Interior extends JTree {
  private final JTree[] quads;

  Interior(JTree q1, JTree q2, JTree q3, JTree q4) {
    quads = new JTree[] { q1, q2, q3, q4 };
  }

  public void run() {
    coInvoke(quads);
    double md = 0.0;
    for (int i = 0; i < quads.length; ++i) {
      md = Math.max(md, quads[i].maxDiff);
      quads[i].reset();
    }
    maxDiff = md;
  }
}

class Leaf extends JTree {
  private final double[][] A; private final double[][] B;
  private final int loRow; private final int hiRow;
  private final int loCol; private final int hiCol;
  private int steps = 0;

  Leaf(double[][] A, double[][] B,
       int loRow, int hiRow, int loCol, int hiCol) {
    this.A = A; this.B = B;
    this.loRow = loRow; this.hiRow = hiRow;
    this.loCol = loCol; this.hiCol = hiCol;
  }

  public synchronized void run() {
    boolean AtoB = (steps++ % 2) == 0;
    double[][] a = (AtoB) ? A : B;
    double[][] b = (AtoB) ? B : A;
    double md = 0.0;
    for (int i = loRow; i <= hiRow; ++i) {
      for (int j = loCol; j <= hiCol; ++j) {
        b[i][j] = 0.25 * (a[i-1][j] + a[i][j-1] +
                          a[i+1][j] + a[i][j+1]);
        md = Math.max(md, Math.abs(b[i][j] - a[i][j]));
      }
    }
    maxDiff = md;
  }
}

The driver class first builds a tree that
represents the partitioning of its argument matrix. The build
method could itself be parallelized. But because the base
actions are just node constructions, the granularity threshold
would be so high that parallelization would be worthwhile only
for huge problem sizes.


The run method repeatedly sets the
root task in motion and waits out completion. For simplicity
of illustration, it continues until convergence. Among other
changes necessary to turn this into a realistic program, you
would need to initialize the matrices and deal with possible
lack of convergence within a bounded number of iterations.
Because each iteration entails a full synchronization point
waiting for the root task to finish, it is relatively simple
to insert additional operations that maintain or report global
status between iterations.


class Jacobi extends FJTask {
  static final double EPSILON = 0.001; // convergence criterion
  final JTree root;
  final int maxSteps;

  Jacobi(double[][] A, double[][] B,
         int firstRow, int lastRow, int firstCol, int lastCol,
         int maxSteps, int leafCells) {
    this.maxSteps = maxSteps;
    root = build(A, B, firstRow, lastRow, firstCol, lastCol,
                 leafCells);
  }

  public void run() {
    for (int i = 0; i < maxSteps; ++i) {
      invoke(root);
      if (root.maxDiff < EPSILON) {
        System.out.println("Converged");
        return;
      }
      else root.reset();
    }
  }

  static JTree build(double[][] a, double[][] b,
                     int lr, int hr, int lc, int hc, int size) {
    if ((hr - lr + 1) * (hc - lc + 1) <= size)
      return new Leaf(a, b, lr, hr, lc, hc);
    int mr = (lr + hr) / 2; // midpoints
    int mc = (lc + hc) / 2;
    return new Interior(build(a, b, lr, mr, lc, mc, size),
                        build(a, b, lr, mr, mc+1, hc, size),
                        build(a, b, mr+1, hr, lc, mc, size),
                        build(a, b, mr+1, hr, mc+1, hc, size));
  }
}

4.4.3 Barriers



Recursive decomposition is a powerful and
flexible technique, but does not always fit well with the
structure of iterative problems, and usually requires adoption
of a lightweight execution framework for efficient
implementation. A more direct path to a solution of many
iterative problems is first to divide the problem into class=docEmphasis>segments, each with an associated
task performing a loop that must periodically wait for other
segments to complete. From the perspective of tree-based
approaches, these designs flatten out all the internal nodes
and just deal with the leaves.


As with recursive tasks, there are
opportunities to specialize Threads to make them more
attuned to the demands of parallel iteration (see Further
Readings). However, there is usually less to be gained by
doing so, in part because all thread construction overhead is
restricted to the start-up phase. Here we
illustrate the basic mechanics using regular Threads,
each executing a single Runnable.
When using Threads, granularity thresholds must in general be
substantially higher than when using lightweight executable
classes (although still substantially lower than those needed
in distributed parallel designs).
But the basic logic of iterative algorithms is otherwise
identical, regardless of granularity. In many
iterative problems, little potential parallelism is wasted by
using coarse granularities.
When all threads perform approximately the same actions for
approximately the same durations, creating only as many tasks
as CPUs, or perhaps a small multiple of the number of CPUs,
can work well.
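
For example, a minimal sketch of platform-based sizing
(availableProcessors exists on J2SE 1.4 and later JVMs; on earlier
releases the count would have to be supplied externally):

int nCPUs = Runtime.getRuntime().availableProcessors();
int nSegments = nCPUs; // or a small multiple, for example 2 * nCPUs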


While it is always possible to
hand-craft the necessary control mechanics using waiting and notification
constructs, it is both more convenient and less error-prone
instead to rely on standardized synchronization aids that
encapsulate these mechanics.
The synchronization device of choice in iterative designs is a
cyclic barrier. A cyclic
barrier is initialized with a fixed number of parties that
will be repeatedly synchronizing. It supports only one method,
barrier, that forces each caller to wait until all
parties have invoked the method, and then resets for the next
iteration. A basic CyclicBarrier class can be defined
as follows:


class CyclicBarrier {

  protected final int parties;
  protected int count;      // parties currently being waited for
  protected int resets = 0; // times barrier has been tripped

  CyclicBarrier(int c) { count = parties = c; }

  synchronized int barrier() throws InterruptedException {
    int index = --count;
    if (index > 0) {   // not yet tripped
      int r = resets;  // wait until next reset

      do { wait(); } while (resets == r);

    }
    else {             // trip
      count = parties; // reset count for next time
      ++resets;
      notifyAll();     // cause all other parties to resume
    }

    return index;
  }
}

(The util.concurrent version of this
class available from the online supplement deals more
responsibly with interruptions and time-outs.
Fancier versions that reduce memory contention on the lock and
on the fields may be worth constructing on systems with very
large numbers of processors.)


The CyclicBarrier.barrier method
defined here returns the number of other threads that were
still waiting when the barrier was entered, which can be
useful in some algorithms. As another by-product, the
barrier method is intrinsically
synchronized, so it also serves as a memory barrier
to ensure flushes and loads of array element values in its
most typical usage contexts (see § 2.2.7).


A barrier may also be construed as a simple
consensus operator (see §
3.6). It gathers "votes" among several threads about
whether they should all continue to the next iteration.
Release occurs when all votes have been collected and
agreement has thus been reached. (However,
unlike transaction frameworks, threads using this
CyclicBarrier class are not allowed to vote
"no".)


With barriers, many parallel iterative
algorithms become easy to express. In the simplest cases,
these programs might take the form (eliding all
problem-specific details):


class Segment implements Runnable { // Code sketch
  final CyclicBarrier bar;          // shared by all segments
  Segment(CyclicBarrier b, ...) { bar = b; ...; }

  void update() { ... }

  public void run() {
    // ...
    for (int i = 0; i < iterations; ++i) {
      update();
      bar.barrier();
    }
    // ...
  }
}

class Driver {
  // ...
  void compute(Problem problem) throws ... {
    int n = problem.size / granularity;
    CyclicBarrier barrier = new CyclicBarrier(n);
    Thread[] threads = new Thread[n];

    // create
    for (int i = 0; i < n; ++i)
      threads[i] = new Thread(new Segment(barrier, ...));

    // trigger
    for (int i = 0; i < n; ++i) threads[i].start();

    // await termination
    for (int i = 0; i < n; ++i) threads[i].join();
  }
}

This structure suffices for problems
requiring known numbers of iterations. However, many problems
require checks for convergence or some other global property
between iterations. (Conversely, in a few class=docEmphasis>chaotic relaxation algorithms you
don't even need a barrier after each iteration, but can
instead let segments free-run for a while between barriers
and/or checks.)


One way to provide convergence checks is to
rework the CyclicBarrier class to optionally run a
supplied Runnable command whenever a barrier is about
to be reset. A more classic approach, which illustrates a
technique useful in other contexts as well, is to rely on the
index returned by barrier. The caller obtaining index
zero (as an arbitrary, but always legal choice) can perform
the check while all others are quietly waiting for a second barrier.
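
A sketch of the first option, assuming it extends the CyclicBarrier
class shown above (the class name and barrierAction field are
hypothetical additions, not part of that class):

class CyclicBarrierWithAction extends CyclicBarrier {
  protected final Runnable barrierAction; // run by the tripping thread

  CyclicBarrierWithAction(int parties, Runnable action) {
    super(parties);
    barrierAction = action;
  }

  synchronized int barrier() throws InterruptedException {
    int index = --count;
    if (index > 0) {       // not yet tripped
      int r = resets;
      do { wait(); } while (resets == r);
    }
    else {                 // trip: run action before releasing others
      if (barrierAction != null)
        barrierAction.run(); // e.g., a convergence check
      count = parties;
      ++resets;
      notifyAll();
    }
    return index;
  }
}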


For example, here is a barrier-based version
of a segment class for the Jacobi problem described in §
4.4.2. Collections of JacobiSegment objects can
be initialized and run by a driver of the generic form given
above.


class JacobiSegment implements Runnable { // Incomplete
  // These are same as in Leaf class version:
  double[][] A; double[][] B;
  final int firstRow; final int lastRow;
  final int firstCol; final int lastCol;
  volatile double maxDiff;
  int steps = 0;
  void update() { /* Nearly same as Leaf.run */ }

  final CyclicBarrier bar;
  final JacobiSegment[] allSegments; // for convergence check
  volatile boolean converged = false;

  JacobiSegment(double[][] A, double[][] B,
                int firstRow, int lastRow,
                int firstCol, int lastCol,
                CyclicBarrier b, JacobiSegment[] allSegments) {
    this.A = A; this.B = B;
    this.firstRow = firstRow; this.lastRow = lastRow;
    this.firstCol = firstCol; this.lastCol = lastCol;
    this.bar = b;
    this.allSegments = allSegments;
  }

  public void run() {
    try {
      while (!converged) {
        update();
        int myIndex = bar.barrier(); // wait for all to update
        if (myIndex == 0) convergenceCheck();
        bar.barrier();               // wait for convergence check
      }
    }
    catch (Exception ex) {
      // clean up ...
    }
  }

  void convergenceCheck() {
    for (int i = 0; i < allSegments.length; ++i)
      if (allSegments[i].maxDiff > EPSILON) return;
    for (int i = 0; i < allSegments.length; ++i)
      allSegments[i].converged = true;
  }
}

4.4.4 Further Readings


For a survey of approaches to
high-performance parallel processing, see


Skillicorn, David, and Domenico Talia,
"Models and Languages for Parallel Computation",
Computing Surveys, June 1998.


Most texts on parallel programming concentrate on
algorithms designed for use on fine-grained parallel machine
architectures, but also cover design techniques and algorithms
that can be implemented using the kinds of stock
multiprocessors most amenable to supporting a JVM.
See, for example:


Foster, Ian. Designing and Building
Parallel Programs, Addison-Wesley, 1995.


Roosta, Seyed. Parallel Processing and
Parallel Algorithms, Springer-Verlag, 1999.


Wilson, Gregory. Practical
Parallel Programming, MIT Press, 1995.


Zomaya, Albert (ed.). Parallel and
Distributed Computing Handbook, McGraw-Hill, 1996.


Pattern-based accounts of
parallel programming
include:


Massingill, Berna, Timothy Mattson, and
Beverly Sanders. A Pattern Language for
Parallel Application Programming, Technical
report, University of Florida, 1999.


MacDonald, Steve, Duane Szafron, and Jonathan
Schaeffer. "Object-Oriented Pattern-Based
Parallel Programming with
Automatically Generated Frameworks", in Proceedings of the
5th USENIX Conference on Object-Oriented Tools and Systems
(COOTS), 1999.


The FJTask framework
internally relies on a work-stealing task scheduler based on
the one in Cilk, a C-based parallel programming framework.
In work-stealing schedulers, each worker thread normally runs
(in LIFO order) tasks that it constructs, but when idle steals
(in FIFO order) those constructed by other worker threads.
More details, including explanations of the senses in which
this scheme is optimal for recursive fork/join programs, may
be found in:


Frigo, Matteo, Charles Leiserson, and Keith
Randall. "The Implementation of the Cilk-5
Multithreaded Language", Proceedings of the 1998 ACM SIGPLAN
Conference on Programming
Language Design and Implementation (PLDI), 1998.


The online supplement includes more realistic
examples of the techniques discussed in this section. It also
provides links to the Cilk package and related frameworks,
including Hood (a C++ follow-on to Cilk) and Filaments (a C
package that includes a specialized framework supporting
barrier-based iterative computation).


