next up previous index
Next: Manipulating Communicators Up: Interacting Particles Previous: The Code

The Discussion

The program begins in the usual way: participating processes find out the size of the process pool and their own place (rank) within that pool, and each checks whether it is the master:

   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &pool_size);
   MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
   if (my_rank == MASTER_RANK) i_am_the_master = TRUE;

Then every process finds out about the number of particles that it is going to look after:

   particle_number = MAX_PARTICLES / pool_size;   
   if (i_am_the_master)
      printf ("%d particles per processor\n", particle_number);

Particles are described in terms of structures that comprise information about each particle's mass and its position:

   typedef struct {
      double x, y, z;
      double mass;
   } Particle;
     Particle  particles[MAX_PARTICLES];  /* Particles on all nodes */

Because we are going to send particles between processes, we should define a corresponding MPI datatype, so that we can operate on particles as single entities, instead of having to split each of them into four real numbers and send those numbers separately or as an array.

In this case we rely on the fact that most C compilers lay out a structure such as Particle as four contiguous double-precision numbers. And MPI has a special way of handling such objects:

   MPI_Type_contiguous ( 4, MPI_DOUBLE, &particle_type );
   MPI_Type_commit ( &particle_type );
New MPI types, once defined, have to be committed. That operation allows MPI, and possibly the hardware, to rearrange its internal communication buffers and links so as to handle the new type more efficiently. We have already encountered this operation in our previous example.

The next operation:

   MPI_Allgather ( &particle_number, 1, MPI_INT, counts, 1, MPI_INT,
                   MPI_COMM_WORLD );
gathers information about the number of particles every process is going to look after into the array counts. Because this is an all-gather operation, every process ends up with the data in its own copy of counts. The semantics of this operation is as follows. Every process puts an integer into its own variable particle_number; the address of that variable, &particle_number, points to the beginning of the send buffer, and there is only 1 item of type MPI_INT in that buffer. Once the operation completes, the contribution from process 0 goes into counts[0], the contribution from process 1 goes into counts[1], and so on: every process gets its own array counts filled with one number of type MPI_INT from each process. The send and receive specifications do not have to be textually identical, either: MPI requires only that their type signatures match, so, for example, data sent as 2 MPI_INT per process could be received as 2 MPI_INT per slot of counts, provided every slot had enough space for 2 integers.

Now every process evaluates the displacements, i.e., the positions within the array of particles where particles belonging to a given process begin:

   displacements[0] = 0;
   for (i = 1; i < pool_size; i++)
      displacements[i] = displacements[i-1] + counts[i-1];
   total_particles = displacements[pool_size - 1]
                     + counts[pool_size - 1];
Every process evaluates the total number of particles in the system too.

Observe that although we have given an equal number of particles to each process, the whole logic of the program is such that we can handle different numbers of particles per process.

It is now up to the master process to announce the total number of particles:

   if (i_am_the_master)
      printf ("total number of particles = %d\n", total_particles);

And once this is done, every process evaluates the place in the array of particles where its own particles begin:

   my_offset = displacements[my_rank];

This information is then communicated to the master process, which prints the offsets corresponding to every process on standard output:

   MPI_Gather ( &my_offset, 1, MPI_INT, offsets, 1, MPI_INT, MASTER_RANK,
                MPI_COMM_WORLD );
   if (i_am_the_master) {
      printf ("offsets: ");
      for (i = 0; i < pool_size; i++)
         printf ("%d ", offsets[i]);
      printf ("\n");
   }
The operation MPI_Gather works like MPI_Allgather, the difference being that only one process, called the root process, does the collecting and receives data from all the other processes this time. The root process here is the MASTER_RANK process.

Now every process initializes the positions of its own particles, and only its own, to some random numbers, and sets their mass to 1:

   srand48((long) my_rank);
   for (i = 0; i < particle_number; i++) {
      particles[my_offset + i].x = drand48();
      particles[my_offset + i].y = drand48();
      particles[my_offset + i].z = drand48();
      particles[my_offset + i].mass = 1.0;
   }

And here all processes exchange their particle data with other processes:

   start_time = MPI_Wtime();
   MPI_Allgatherv ( particles + my_offset, particle_number, particle_type,
                    particles, counts, displacements, particle_type,
                    MPI_COMM_WORLD );

   end_time = MPI_Wtime();
Function MPI_Wtime() returns the wall-clock time in seconds. This is a very useful MPI function, handy for timing MPI programs. Remember that to a user the only thing that really matters is the wall-clock time, i.e., the time she is going to wait for her program to finish execution. CPU time is for clerks who administer supercomputer centres.

Now let us have a closer look at the MPI_Allgatherv operation. It works much like MPI_Allgather, but it allows for contributions of different lengths from different processes. The send buffer of a given process begins at particles + my_offset, and every process sends particle_number objects of type particle_type. The data is collected into particles: counts[i] items are received from process i and written into particles beginning at position displacements[i] within that array. The received items are again of type particle_type. Observe once more that MPI allows the send and receive datatypes to differ while this operation is under way, as long as their type signatures match.

After this operation completes, the master process writes on standard output a message saying how long the communication took. It also writes the x coordinates of the particles located at the beginning of the sectors corresponding to different processes, just so that you can see that there really are some particles there.

Now we start our timer again and perform the computation on particles:

   start_time = MPI_Wtime();

   count = 0;
   for (i = 0; i < particle_number; i++) {

      vector[my_offset + i].vx = 0.0;
      vector[my_offset + i].vy = 0.0;
      vector[my_offset + i].vz = 0.0;
      for (j = 0; j < total_particles; j++) {
         if (j != i) {

            double dx, dy, dz, r2, r, mimj_by_r3;

            dx = particles[my_offset + i].x - particles[j].x;
            dy = particles[my_offset + i].y - particles[j].y;
            dz = particles[my_offset + i].z - particles[j].z;

            r2 = dx * dx + dy * dy + dz * dz;
            r = sqrt(r2);

            if (r2 < EPSILON)
               mimj_by_r3 = 0.0;
            else
               mimj_by_r3 = particles[my_offset + i].mass 
                            * particles[j].mass / (r2 * r);

            vector[my_offset + i].vx = vector[my_offset + i].vx +
                                       mimj_by_r3 * dx;
            vector[my_offset + i].vy = vector[my_offset + i].vy +
                                       mimj_by_r3 * dy;
            vector[my_offset + i].vz = vector[my_offset + i].vz +
                                       mimj_by_r3 * dz;
            count = count + 1;
         }
      }
   }
For every particle i within each process's region of responsibility, the velocity of that particle, $\boldsymbol{v}_i$, is first set to 0. Then for that particle we evaluate the forces with which all other particles j in the entire system act on it:

\begin{eqnarray*}
\boldsymbol{F}_{ij} & = & \frac{m_i m_j}{\left|\boldsymbol{r}_i - \boldsymbol{r}_j\right|^3}
                          \left(\boldsymbol{r}_i - \boldsymbol{r}_j\right) \\
\boldsymbol{v}_i & \leftarrow & \boldsymbol{v}_i + \boldsymbol{F}_{ij}
\end{eqnarray*}

and then the effect that those forces have on particle i's velocity.

The computation here is just a fake; its purpose is merely to demonstrate that something gets computed. Our laws of dynamics are a little strange: it is not entirely clear whether mi is really a mass or a charge, $\Delta t$ is set to 1, and if mi is a mass, then we quite unnecessarily multiply mi by mj, because then we should really divide it all back by mi. But who cares. For the sake of this program the most important thing is to measure how long this computation takes and compare that with the time used by the communication operations.

As soon as the master process has finished its own work, it writes on standard output how long it spent computing forces and advancing velocities.

   end_time = MPI_Wtime();
   if (i_am_the_master)
      printf ("done my job in %8.5f seconds, waiting for slow \
processes...\n", end_time - start_time);

Some processes may have taken longer to do that than others, so we set up a barrier.

   MPI_Barrier (MPI_COMM_WORLD);
This is a synchronization device. The function returns only when all processes within the MPI_COMM_WORLD communicator, in this case, have joined the barrier. So now the master can measure time again, and tell us how long it took for the slowest process to finish the job:
   end_time = MPI_Wtime();

   if (i_am_the_master)
          printf ("evaluated %d 3D interactions in %8.5f seconds\n",
                  count * pool_size, end_time - start_time);

And this is it.

Let us now have a look again at the output produced by the program:

100 particles per processor
total number of particles = 1000
offsets: 0 100 200 300 400 500 600 700 800 900 
Communicating =  0.00802 seconds
particles[offsets[i]].x:  0.17083  0.04163  0.91243  0.78323  0.65404  0.52484  0.39564  0.26644  0.13725  0.00805 
done my job in  0.09365 seconds, waiting for slow processes...
evaluated 999000 3D interactions in  0.09555 seconds
The most important thing to observe is that in this example it took only 0.008 seconds to exchange information using MPI_Allgatherv, whereas it took 0.095 seconds to perform the computation. Our program spends only about 8% of its wall-clock time communicating, which is quite good.

Will this work just as well if we increase the number of particles to, say, 10,000? It will work even better:

gustav@sp20:../MPI 21:21:17 !575 $ cat particles.out
1111 particles per processor
total number of particles = 9999
offsets: 0 1111 2222 3333 4444 5555 6666 7777 8888 
Communicating =  0.03104 seconds
particles[offsets[i]].x:  0.17083  0.04163  0.91243  0.78323  0.65404  0.52484  0.39564  0.26644  0.13725 
done my job in 10.61837 seconds, waiting for slow processes...
evaluated 99970002 3D interactions in 20.73818 seconds
gustav@sp20:../MPI 21:21:22 !576 $
This time communication took only about 0.15% of the computation time.

The moral of the story is that, in this case at least, the larger the problem size per node, the more efficient the parallelisation, because the cost of communication is, in proportion, that much smaller. This state of affairs is quite common: in general, the larger the problem, the more effective parallelisation may be. For small problems, on the other hand, the cost of parallelisation may sometimes be prohibitive: a parallelised version of a small problem may take much longer to execute than a sequential one.

Zdzislaw Meglicki