Bank Queue

The following program illustrates a rather important parallel programming technique, often referred to as the job queue or bank queue paradigm. The idea is as follows: you have a number of jobs to attend to, and they do not depend on one another. The master process maintains the job queue and sends jobs to slave processes. The slaves labour on the jobs and return the results to the master. As soon as a slave has finished its current assignment and returned the result to the master, a new job is sent to it. That way all processes of an MPI farm are kept busy.
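
Stripped of any particular application, the bare skeleton of the paradigm looks roughly as follows. This is only a schematic sketch, not the program discussed in this section: the job payload is reduced to a single double, the work itself is hidden in a stand-in do_work function, and it is assumed that there are at least as many jobs as there are slaves.

#include <mpi.h>

#define N_JOBS   1000           /* assumed number of independent jobs    */
#define STOP_TAG N_JOBS         /* any tag below N_JOBS is a job number  */

double do_work(double job)      /* stands in for the real computation    */
{
   return 2.0 * job;
}

int main(int argc, char **argv)
{
   int pool_size, my_rank, worker, next = 0, done;
   double job = 0.0, result;    /* in a real program the master would    */
                                /* fill job with the next piece of work  */
   MPI_Status status;

   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &pool_size);
   MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

   if (my_rank == 0) {          /* the master maintains the queue        */

      /* prime every slave with its first job; the tag is the job number */
      for (worker = 1; worker < pool_size; worker++)
         MPI_Send(&job, 1, MPI_DOUBLE, worker, next++, MPI_COMM_WORLD);

      /* answer every returned result with a new job or a termination    */
      for (done = 0; done < N_JOBS; done++) {
         MPI_Recv(&result, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG,
                  MPI_COMM_WORLD, &status);
         if (next < N_JOBS)
            MPI_Send(&job, 1, MPI_DOUBLE, status.MPI_SOURCE, next++,
                     MPI_COMM_WORLD);
         else
            MPI_Send(NULL, 0, MPI_DOUBLE, status.MPI_SOURCE, STOP_TAG,
                     MPI_COMM_WORLD);
      }
   }
   else {                       /* the slaves labour until they are sacked */
      MPI_Recv(&job, 1, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      while (status.MPI_TAG != STOP_TAG) {
         result = do_work(job);
         MPI_Send(&result, 1, MPI_DOUBLE, 0, status.MPI_TAG, MPI_COMM_WORLD);
         MPI_Recv(&job, 1, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
                  &status);
      }
   }

   MPI_Finalize();
   return 0;
}

The program listed later in this section follows exactly this shape, with a job being one row of a matrix and the result being one entry of the product vector.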

The paradigm can be enriched. For example, slave processes can return to the master not only answers but also new jobs. The master process may assess those jobs, perhaps comparing them against a list of jobs already done that it keeps in a lightweight database. If a job is indeed new, it is added to the queue, whereas if there is already an answer available for that job, the answer is passed back to the slave together with the information that this class of problems has already been solved.
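
To give a flavour of the master's side of that bookkeeping, here is a small sketch in which the "lightweight database" is reduced to a plain array of cached answers indexed by an integer job key. The names, the integer encoding of jobs and answers, and the fixed-size table are all made up for illustration; a real program would more likely use a hash table or an external store.

#define MAX_KEYS 10000
#define NOT_SEEN (-1)

static int answer_for[MAX_KEYS];   /* the "database": cached answers,   */
                                   /* indexed by job key                */

void initialise_data_base(void)
{
   int k;
   for (k = 0; k < MAX_KEYS; k++) answer_for[k] = NOT_SEEN;
}

/* Called by the master when a slave proposes a new job: returns 1 and
   fills *answer if this class of problems has been solved already,
   returns 0 if the job is genuinely new and should be queued. */
int already_solved(int job_key, int *answer)
{
   if (answer_for[job_key] != NOT_SEEN) {
      *answer = answer_for[job_key];
      return 1;
   }
   return 0;
}

/* Called by the master whenever a slave returns an answer. */
void record_answer(int job_key, int answer)
{
   answer_for[job_key] = answer;
}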

That way the queue can grow and shrink dynamically as the computation proceeds.

This is a very dynamic paradigm and it is often used to traverse dynamically growing trees, especially in artificial reasoning programs.

The master itself may take part in the work too, rather than just maintaining the queue and communicating with the slaves. But if there is a lot of work and a lot of slaves, the master process may be too busy to carry out any computation of its own.

When you write programs like this you must also remember that communication is expensive. Consequently, the jobs sent to the slaves must be substantial enough to keep them occupied for a good while. It is very easy to overload the master process if the task granularity is too small. In that case communication, and the inability of the master process to respond quickly enough to the slaves' requests, can become a bottleneck.
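
To put rough, assumed numbers on this: if shipping a job to a slave and collecting its answer costs a communication time $t_{\rm comm}$ of, say, the order of $100\,\mu\textrm{s}$, then any job whose computation time $t_{\rm comp}$ is much below a millisecond makes the slave spend a good part of its life waiting, and with many slaves the master, which handles every one of those messages itself, saturates long before they do. The rule of thumb is simply $t_{\rm comp} \gg t_{\rm comm}$ for every job handed out. The example below breaks this rule on purpose: multiplying a single 100-element row by a vector is nowhere near enough work per message, but it keeps the illustration of the mechanics simple.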

Here's the program itself. In this program we are simply going to multiply matrix A by vector b in parallel. Every slave process will have its own copy of b, and the master is going to send it rows of A to multiply by that copy of b. The result is the corresponding entry of vector $\boldsymbol{c} = \boldsymbol{A}\cdot\boldsymbol{b}$. A slave process delivers the entry to the master process, which places it in the appropriate slot of c and, at the same time, passes a new row to the slave process if there is any more work still to be done. If there is no more work, the master sacks the slave process by sending it a termination message.

So here is the code:

gustav@sp20:../MPI 18:19:12 !522 $ cat bank.c
#include <stdio.h>
#include <mpi.h>
#define COLS 100
#define ROWS 100
#define TRUE 1
#define FALSE 0
#define MASTER_RANK 0

int main ( int argc, char **argv )
{
   int pool_size, my_rank, destination;
   int i_am_the_master = FALSE; 
   int a[ROWS][COLS], b[ROWS], c[ROWS], i, j;
   int int_buffer[BUFSIZ];
   MPI_Status status;
   

   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &pool_size);
   MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

   if (my_rank == MASTER_RANK) i_am_the_master = TRUE;

   if (i_am_the_master) {

      int row, count, sender;

      for (i = 0; i < COLS; i++) {
         b[i] = 1;
         for (j = 0; j < ROWS; j++) a[i][j] = i;
      }

      MPI_Bcast(b, ROWS, MPI_INT, MASTER_RANK, MPI_COMM_WORLD);

      count = 0;
      for (destination = 0; destination < pool_size; destination++) {
         if (destination != my_rank) {
            for (j = 0; j < COLS; j++) int_buffer[j] = a[count][j];
            MPI_Send(int_buffer, COLS, MPI_INT, destination, count,
                     MPI_COMM_WORLD);
            printf("sent row %d to %d\n", count, destination);
            count = count + 1;
         }
      }

      for (i = 0; i < ROWS; i++) {
         MPI_Recv (int_buffer, BUFSIZ, MPI_INT, MPI_ANY_SOURCE, 
                   MPI_ANY_TAG, MPI_COMM_WORLD, &status);
         sender = status.MPI_SOURCE;
         row = status.MPI_TAG;
         c[row] = int_buffer[0];
         printf("\treceived row %d from %d\n", row, sender);
         if (count < ROWS) {
            for (j = 0; j < COLS; j++) int_buffer[j] = a[count][j];
            MPI_Send(int_buffer, COLS, MPI_INT, sender, count,
                     MPI_COMM_WORLD);
            printf("sent row %d to %d\n", count, sender);
            count = count + 1;
         }
         else {
            MPI_Send(NULL, 0, MPI_INT, sender, ROWS, MPI_COMM_WORLD);
            printf("terminated process %d with tag %d\n", sender, ROWS);
         }
      }
   }
   else { /* I am not the master */

      int sum, row;
      FILE *log_file;

      log_file = fopen ("/tmp/gustav_log", "w");

      MPI_Bcast(b, COLS, MPI_INT, MASTER_RANK, MPI_COMM_WORLD);
      fprintf(log_file, "received broadcast from %d\n", MASTER_RANK);
      fflush(log_file);
      MPI_Recv(int_buffer, COLS, MPI_INT, MASTER_RANK, MPI_ANY_TAG,
                    MPI_COMM_WORLD, &status);
      fprintf(log_file, "received a message from %d, tag %d\n",
                   status.MPI_SOURCE, status.MPI_TAG);
      fflush(log_file);
      while (status.MPI_TAG != ROWS) { /* The job is not finished */
         row = status.MPI_TAG; sum = 0;
         for (i = 0; i < COLS; i++) sum = sum + int_buffer[i] * b[i];
         int_buffer[0] = sum;
         MPI_Send (int_buffer, 1, MPI_INT, MASTER_RANK, row, MPI_COMM_WORLD);
         fprintf(log_file, "sent row %d to %d\n", row, MASTER_RANK);
         fflush(log_file);
         MPI_Recv (int_buffer, COLS, MPI_INT, MASTER_RANK, MPI_ANY_TAG,
                   MPI_COMM_WORLD, &status);
         fprintf(log_file, "received a message from %d, tag %d\n",
                 status.MPI_SOURCE, status.MPI_TAG);
         fflush(log_file);
      }
      fprintf(log_file, "exiting on  tag %d\n", status.MPI_TAG);
      fflush(log_file);
   }

   MPI_Finalize ();
}

gustav@sp20:../MPI 18:19:16 !523 $

And here is how this code compiles and runs:

gustav@sp20:../MPI 18:20:01 !524 $ mpcc -o bank bank.c
gustav@sp20:../MPI 18:20:10 !525 $ cat bank.ll
# @ job_type = parallel
# @ environment = COPY_ALL; MP_EUILIB=ip; MP_INFOLEVEL=2
# @ requirements = (Adapter == "hps_ip") && (Machine != "sp20") \
                   && (Machine != "sp18")
# @ min_processors = 4
# @ max_processors = 8
# @ class = test
# @ notification = never
# @ executable = /usr/bin/poe
# @ arguments = bank
# @ output = bank.out
# @ error = bank.err
# @ queue
gustav@sp20:../MPI 18:20:15 !526 $ llsubmit bank.ll
submit: The job "sp20.98" has been submitted.
gustav@sp20:../MPI 18:20:18 !527 $

The results are returned in bank.out, which looks as follows:

gustav@sp20:../MPI 18:21:10 !529 $ cat bank.out
sent row 0 to 1
sent row 1 to 2
sent row 2 to 3
sent row 3 to 4
sent row 4 to 5
sent row 5 to 6
sent row 6 to 7
        received row 1 from 2
sent row 7 to 2
        received row 2 from 3
sent row 8 to 3
        received row 7 from 2
sent row 9 to 2
        received row 8 from 3
sent row 10 to 3
        received row 9 from 2
sent row 11 to 2
        received row 10 from 3
sent row 12 to 3
        received row 11 from 2
sent row 13 to 2
        received row 12 from 3
sent row 14 to 3
        received row 13 from 2
sent row 15 to 2
        received row 14 from 3
sent row 16 to 3
        received row 15 from 2
sent row 17 to 2
        received row 16 from 3
sent row 18 to 3
        received row 17 from 2
sent row 19 to 2
        received row 18 from 3
sent row 20 to 3
        received row 19 from 2
sent row 21 to 2
        received row 20 from 3
sent row 22 to 3
        received row 21 from 2
sent row 23 to 2
        received row 22 from 3
sent row 24 to 3
        received row 23 from 2
sent row 25 to 2
        received row 24 from 3
sent row 26 to 3
        received row 25 from 2
sent row 27 to 2
        received row 26 from 3
sent row 28 to 3
        received row 27 from 2
sent row 29 to 2
        received row 28 from 3
sent row 30 to 3
        received row 29 from 2
sent row 31 to 2
        received row 30 from 3
sent row 32 to 3
        received row 31 from 2
sent row 33 to 2
        received row 32 from 3
sent row 34 to 3
        received row 33 from 2
sent row 35 to 2
        received row 34 from 3
sent row 36 to 3
        received row 35 from 2
sent row 37 to 2
        received row 36 from 3
sent row 38 to 3
        received row 3 from 4
sent row 39 to 4
        received row 4 from 5
sent row 40 to 5
        received row 37 from 2
sent row 41 to 2
        received row 38 from 3
sent row 42 to 3
        received row 5 from 6
sent row 43 to 6
        received row 6 from 7
sent row 44 to 7
        received row 39 from 4
sent row 45 to 4
        received row 40 from 5
sent row 46 to 5
        received row 41 from 2
sent row 47 to 2
        received row 42 from 3
sent row 48 to 3
        received row 43 from 6
sent row 49 to 6
        received row 44 from 7
sent row 50 to 7
        received row 45 from 4
sent row 51 to 4
        received row 46 from 5
sent row 52 to 5
        received row 47 from 2
sent row 53 to 2
        received row 48 from 3
sent row 54 to 3
        received row 49 from 6
sent row 55 to 6
        received row 50 from 7
sent row 56 to 7
        received row 51 from 4
sent row 57 to 4
        received row 52 from 5
sent row 58 to 5
        received row 53 from 2
sent row 59 to 2
        received row 54 from 3
sent row 60 to 3
        received row 55 from 6
sent row 61 to 6
        received row 56 from 7
sent row 62 to 7
        received row 57 from 4
sent row 63 to 4
        received row 58 from 5
sent row 64 to 5
        received row 59 from 2
sent row 65 to 2
        received row 60 from 3
sent row 66 to 3
        received row 61 from 6
sent row 67 to 6
        received row 62 from 7
sent row 68 to 7
        received row 63 from 4
sent row 69 to 4
        received row 64 from 5
sent row 70 to 5
        received row 65 from 2
sent row 71 to 2
        received row 66 from 3
sent row 72 to 3
        received row 67 from 6
sent row 73 to 6
        received row 68 from 7
sent row 74 to 7
        received row 69 from 4
sent row 75 to 4
        received row 70 from 5
sent row 76 to 5
        received row 71 from 2
sent row 77 to 2
        received row 72 from 3
sent row 78 to 3
        received row 73 from 6
sent row 79 to 6
        received row 75 from 4
sent row 80 to 4
        received row 74 from 7
sent row 81 to 7
        received row 76 from 5
sent row 82 to 5
        received row 77 from 2
sent row 83 to 2
        received row 78 from 3
sent row 84 to 3
        received row 79 from 6
sent row 85 to 6
        received row 80 from 4
sent row 86 to 4
        received row 81 from 7
sent row 87 to 7
        received row 82 from 5
sent row 88 to 5
        received row 83 from 2
sent row 89 to 2
        received row 84 from 3
sent row 90 to 3
        received row 85 from 6
sent row 91 to 6
        received row 86 from 4
sent row 92 to 4
        received row 87 from 7
sent row 93 to 7
        received row 88 from 5
sent row 94 to 5
        received row 89 from 2
sent row 95 to 2
        received row 90 from 3
sent row 96 to 3
        received row 91 from 6
sent row 97 to 6
        received row 92 from 4
sent row 98 to 4
        received row 93 from 7
sent row 99 to 7
        received row 94 from 5
terminated process 5 with tag 100
        received row 95 from 2
terminated process 2 with tag 100
        received row 96 from 3
terminated process 3 with tag 100
        received row 97 from 6
terminated process 6 with tag 100
        received row 98 from 4
terminated process 4 with tag 100
        received row 99 from 7
terminated process 7 with tag 100
        received row 0 from 1
terminated process 1 with tag 100
gustav@sp20:../MPI 18:21:16 !530 $

Let me now explain in detail how this program works.

The matrix is $100\times100$, which is fixed by

#define COLS 100
#define ROWS 100
and the master process is going to be the process of rank 0:
#define MASTER_RANK 0

After the initial incantations:

   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &pool_size);
   MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
all processes know the size of the process pool and their own rank within it, including the master process, which establishes its identity by setting:
    if (my_rank == MASTER_RANK) i_am_the_master = TRUE;

Now we enter an interesting part of the code. There is a large if statement there, which looks as follows:

   if (i_am_the_master) {
      blah... blah... blah..
   }
   else { /* I am not the master */
      blah... blah... blah...
   }

   MPI_Finalize ();

As you can see, this if statement divides the code into two subprograms: the master process executes the first branch of the if, and the slave processes execute the else branch.

The two subprograms are really quite different and they don't merge at all until the final MPI_Finalize().



 
Zdzislaw Meglicki
2001-02-26