
   
Bank Queue

The following program illustrates a rather important parallel programming technique, often referred to as the job queue or bank queue paradigm. The idea is as follows: you have a number of jobs to attend to, none of which depend on the others. The master process maintains the job queue and sends jobs to slave processes. The slaves labour on the jobs and return the results to the master. As soon as a slave has finished its current assignment and delivered the result, the master sends it a new job. That way all processes of an MPI farm are kept busy.

The paradigm can be enriched. For example, the slave processes can return to the master not only answers but also new jobs. The master process may assess those jobs, perhaps comparing them against a list of jobs already done, which it may keep in a lightweight database. If a job is indeed new, it is added to the queue; if an answer to the job is already available, the answer is passed back to the slave together with the information that this class of problems has already been solved. This way the queue may grow and shrink dynamically as the computation proceeds.
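
Here is a minimal, self-contained sketch of the bookkeeping such an enriched master might perform, assuming jobs can be identified by integer keys. The names submit_job and job_seen are made up for this illustration and do not appear in the program discussed below; the linear scan stands in for the lightweight database, for which a real program would use a hash table.

#include <stdio.h>

#define MAX_JOBS 1024

static int queue[MAX_JOBS];           /* pending job keys, oldest first */
static int q_head = 0, q_tail = 0;
static int seen[MAX_JOBS];            /* "database" of keys already handled */
static int n_seen = 0;

/* Return 1 if this job key has been queued or solved before. */
static int job_seen(int key)
{
   int i;
   for (i = 0; i < n_seen; i++)
      if (seen[i] == key) return 1;
   return 0;
}

/* Add a job to the queue unless it is a duplicate.  At this point a
   real master would send the cached answer back to the slave that
   submitted the duplicate. */
static void submit_job(int key)
{
   if (job_seen(key) || q_tail == MAX_JOBS) return;
   seen[n_seen++] = key;
   queue[q_tail++] = key;
}

int main(void)
{
   submit_job(7);
   submit_job(7);                     /* a duplicate: silently dropped */
   submit_job(3);
   while (q_head < q_tail)
      printf("dispatch job %d\n", queue[q_head++]);
   return 0;
}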

This is a very flexible paradigm and it is often used to traverse dynamically growing trees, especially in artificial reasoning programs.

The master itself may take part in the work, beyond just maintaining the queue and communicating with the slaves. But if there is a lot of work and a lot of slaves, the master process may be too busy attending to the queue and communicating with the slaves to carry out any computation of its own.

When you write a program based on this paradigm you must remember that communication is expensive. Consequently, the jobs sent to the slaves must be substantial enough to occupy them for much longer than the exchanges with the master take. It is very easy to overload the master process if the task granularity is too small. In that case the communication, and the inability of the master process to respond quickly enough to the slaves' requests, may easily become a bottleneck.
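
To put a rough, purely illustrative number on this: let $\lambda$ be the time the master needs to service one result, i.e., one receive followed by one send, and let $t$ be the time a single job keeps a slave busy. A pool of $p$ slaves then delivers, on average, one result every $t/p$ seconds, and the master keeps up only as long as $t/p > \lambda$, that is, as long as $p < t/\lambda$. The smaller the jobs, the fewer slaves the master can usefully employ.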

Here's the program itself.

In this program we are simply going to multiply matrix A by vector b in parallel. Every slave process will have its own copy of b, and the master is going to send them rows of A to multiply by their own copies of b. The result of the computation by a slave is going to be a corresponding entry in the vector $\boldsymbol{c} = \boldsymbol{A}\cdot\boldsymbol{b}$.

A slave process is going to deliver the calculated entry of vector c back to the master process, which will then place it in the appropriate slot of c and, at the same time, pass a new row to the slave process if there is any more work still to be done. If there is no more work, the master sacks the slave process by sending it a termination message.

/*
 * %Id: bank.c,v 1.1 2003/10/05 19:52:15 gustav Exp %
 *
 * %Log: bank.c,v %
 * Revision 1.1  2003/10/05 19:52:15  gustav
 * Initial revision
 *
 *
 */

#include <stdio.h>     /* [fs]printf, fopen and fclose defined here */
#include <stdlib.h>    /* exit defined here */
#include <sys/types.h> /* chmod defined here */
#include <sys/stat.h>  /* chmod defined here */
#include <mpi.h>

#define COLS 100
#define ROWS 100
#define TRUE 1
#define FALSE 0
#define MASTER_RANK 0

int main ( int argc, char **argv )
{
   int pool_size, my_rank, destination;
   int i_am_the_master = FALSE; 
   int a[ROWS][COLS], b[COLS], c[ROWS], i, j;
   int int_buffer[BUFSIZ];
   char my_logfile_name[BUFSIZ];
   FILE *my_logfile;
   MPI_Status status;
   

   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &pool_size);
   MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

   if (my_rank == MASTER_RANK) i_am_the_master = TRUE;

   sprintf(my_logfile_name, "/N/B/gustav/tmp/bank.%d", my_rank);
   my_logfile = fopen(my_logfile_name, "w");
   (void) chmod(my_logfile_name, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);

   if (i_am_the_master) {

      int row, count, sender;

      for (j = 0; j < COLS; j++) {
         b[j] = 1;
         for (i = 0; i < ROWS; i++) a[i][j] = i;
      }

      MPI_Bcast(b, COLS, MPI_INT, MASTER_RANK, MPI_COMM_WORLD);

      count = 0;
      for (destination = 0; destination < pool_size; destination++) {
         if (destination != my_rank) {
            for (j = 0; j < COLS; j++) int_buffer[j] = a[count][j];
            MPI_Send(int_buffer, COLS, MPI_INT, destination, count,
                     MPI_COMM_WORLD);
            fprintf(my_logfile, "sent row %d to %d\n", count, destination);
            count = count + 1;
         }
      }

      for (i = 0; i < ROWS; i++) {
         MPI_Recv (int_buffer, BUFSIZ, MPI_INT, MPI_ANY_SOURCE, 
                   MPI_ANY_TAG, MPI_COMM_WORLD, &status);
         sender = status.MPI_SOURCE;
         row = status.MPI_TAG;
         c[row] = int_buffer[0];
         fprintf(my_logfile, "\treceived row %d from %d\n", row, sender);
         if (count < ROWS) {
            for (j = 0; j < COLS; j++) int_buffer[j] = a[count][j];
            MPI_Send(int_buffer, COLS, MPI_INT, sender, count,
                     MPI_COMM_WORLD);
            fprintf(my_logfile, "sent row %d to %d\n", count, sender);
            count = count + 1;
         }
         else {
            MPI_Send(NULL, 0, MPI_INT, sender, ROWS, MPI_COMM_WORLD);
            fprintf(my_logfile, "terminated process %d with tag %d\n", sender, ROWS);
         }
      }
      for (row = 0; row < ROWS; row++) printf("%d ", c[row]);
      printf("\n");

   }
   else { /* I am not the master */

      int sum, row;

      MPI_Bcast(b, COLS, MPI_INT, MASTER_RANK, MPI_COMM_WORLD);
      fprintf(my_logfile, "received broadcast from %d\n", MASTER_RANK);
      MPI_Recv(int_buffer, COLS, MPI_INT, MASTER_RANK, MPI_ANY_TAG,
                    MPI_COMM_WORLD, &status);
      fprintf(my_logfile, "received a message from %d, tag %d\n",
                   status.MPI_SOURCE, status.MPI_TAG);
      while (status.MPI_TAG != ROWS) { /* The job is not finished */
         row = status.MPI_TAG; sum = 0;
         for (i = 0; i < COLS; i++) sum = sum + int_buffer[i] * b[i];
         int_buffer[0] = sum;
         MPI_Send (int_buffer, 1, MPI_INT, MASTER_RANK, row, MPI_COMM_WORLD);
         fprintf(my_logfile, "sent row %d to %d\n", row, MASTER_RANK);
         MPI_Recv (int_buffer, COLS, MPI_INT, MASTER_RANK, MPI_ANY_TAG,
                   MPI_COMM_WORLD, &status);
         fprintf(my_logfile, "received a message from %d, tag %d\n",
                 status.MPI_SOURCE, status.MPI_TAG);
      }
      fprintf(my_logfile, "exiting on  tag %d\n", status.MPI_TAG);
   }

   fclose (my_logfile);

   MPI_Finalize ();

   exit (0);
}

The code is compiled and installed with a very simple Makefile:

# 
# %Id: Makefile,v 1.2 2003/10/05 19:56:13 gustav Exp %
# 
# %Log: Makefile,v %
# Revision 1.2  2003/10/05 19:56:13  gustav
# Fixed DESTDIR
#
# Revision 1.1  2003/10/05 19:54:55  gustav
# Initial revision
#
#
DESTDIR = /N/B/gustav/bin

all: bank

bank: bank.c
        mpicc -o bank bank.c

install: all
        install bank $(DESTDIR)

clean:
        rm -f bank.o bank

clobber: clean
        rcsclean

And it all works as follows:

gustav@bh1 $ pwd
/N/B/gustav/src/I590/bank
gustav@bh1 $ make install
co  RCS/Makefile,v Makefile
RCS/Makefile,v  -->  Makefile
revision 1.2
done
co  RCS/bank.c,v bank.c
RCS/bank.c,v  -->  bank.c
revision 1.1
done
mpicc -o bank bank.c
install bank /N/B/gustav/bin
gustav@bh1 $ cd
gustav@bh1 $ [ -d tmp ] || mkdir tmp
gustav@bh1 $ rm -f tmp/*
gustav@bh1 $ mpdtrace
bh1
bc34
bc35
bc37
bc36
bc38
bc40
bc39
gustav@bh1 $ mpiexec -n 8 bank
0 100 200 300 400 500 600 700 800 900 1000 1100 1200 1300 1400 1500 1600 \
1700 1800 1900 2000 2100 2200 2300 2400 2500 2600 2700 2800 2900 3000 3100 \
3200 3300 3400 3500 3600 3700 3800 3900 4000 4100 4200 4300 4400 4500 4600 \
4700 4800 4900 5000 5100 5200 5300 5400 5500 5600 5700 5800 5900 6000 6100 \
6200 6300 6400 6500 6600 6700 6800 6900 7000 7100 7200 7300 7400 7500 7600 \
7700 7800 7900 8000 8100 8200 8300 8400 8500 8600 8700 8800 8900 9000 9100 \
9200 9300 9400 9500 9600 9700 9800 9900 
gustav@bh1 $ ls tmp
bank.0  bank.1  bank.2  bank.3  bank.4  bank.5  bank.6  bank.7
gustav@bh1 $

Files bank.0 through bank.7 contain logs from slaves and from the master. We'll have a look at those files later.

But first let us analyze this program in detail.

Matrix A is $100\times100$. Its dimensions are fixed by

#define COLS 100
#define ROWS 100

The master process is going to be the process of rank 0:

#define MASTER_RANK 0

All processes begin their existence with the usual MPI incantations:

   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &pool_size);
   MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

through which they learn their rank numbers and the size of the pool. Then the master process learns about its masterhood:

   if (my_rank == MASTER_RANK) i_am_the_master = TRUE;

The variable i_am_the_master is FALSE by default and it remains so for all other processes.

Then all processes open their own respective log files:

   sprintf(my_logfile_name, "/N/B/gustav/tmp/bank.%d", my_rank);
   my_logfile = fopen(my_logfile_name, "w");
   (void) chmod(my_logfile_name, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
Observe that all processes are going to write their log files to an NFS-mounted directory in my $HOME. The files that correspond to the processes are numbered by their ranks, which are appended to the string bank.. The name of the log file is hardwired in this program, so you will have to change it if you want to recompile the program and run it from your own account. Furthermore, the program does not check for the existence of the directory /N/B/gustav/tmp, so this directory must be created before the program is run. This is a typical example of sloppy programming: you would not put anything like this in a commercial application. But if you are in the process of developing a parallel program, and something doesn't work, and you have to debug every process and all the communications that take place, then this is a legitimate way of doing things. It is a parallel equivalent of debugging a sequential program with printf statements.
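
If you do want to run the program from your own account, one simple variant, my suggestion rather than anything in bank.c, is to derive the log directory from the HOME environment variable instead of hardwiring it:

#include <stdio.h>    /* snprintf defined here */
#include <stdlib.h>   /* getenv defined here */

/* Build a per-rank log file name under $HOME/tmp instead of the
   hardwired /N/B/gustav/tmp.  The tmp directory must still exist. */
static void make_logfile_name(char *name, size_t size, int rank)
{
   const char *home = getenv("HOME");
   snprintf(name, size, "%s/tmp/bank.%d", home ? home : ".", rank);
}

A call to make_logfile_name(my_logfile_name, BUFSIZ, my_rank) would then replace the sprintf line above.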

Function fopen creates the file with the default mode of 0666, modified by the process's umask, which may leave it with rather liberal permissions (e.g., the file may end up writable by all users on the system). To make sure that the permissions are what you expect them to be, e.g., 644, you can use the chmod system call. Here I set the permissions to -rw-r--r--. The constants used to express the permissions, and the function chmod itself, are defined in /usr/include/sys/stat.h.
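
For reference, the symbolic constants used above add up to the octal mode 0644, so the same call could be written more tersely; the snippet below is a stand-alone illustration rather than a fragment of bank.c:

#include <sys/stat.h>  /* chmod and the S_I* constants defined here */

int main(void)
{
   /* S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH == 0400|0200|0040|0004
      == 0644, i.e., -rw-r--r-- */
   return chmod("/N/B/gustav/tmp/bank.0", 0644) ? 1 : 0;
}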

Now the program splits into two subprograms. There is a separate subprogram for the master and a separate subprogram for the slaves:

...

   if (i_am_the_master) {
      blah... blah... blah...
   }
   else { /* I am not the master */
      blah... blah... blah...
   }

   fclose (my_logfile);
   MPI_Finalize();
   exit(0);
}

We are going to discuss these subprograms in separate sections.



 
Zdzislaw Meglicki
2004-04-29