Discussion:
[OMPI users] Parallel MPI broadcasts (parameterized)
Konstantinos Konstantinidis
2017-11-01 03:42:04 UTC
Assume that we have K=q*k nodes (slaves), where q,k are positive integers >= 2.

Based on the scheme that I am currently using I create [q^(k-1)]*(q-1)
groups (along with their communicators). Each group consists of k nodes and
within each group exactly k broadcasts take place (each node broadcasts
something to the rest of them). So in total [q^(k-1)]*(q-1)*k MPI
broadcasts take place. Let me skip the details of the above scheme.

Now, theoretically, I figured out that there are q-1 groups that can
communicate in parallel at the same time, i.e. groups that have no common
nodes, and I would like to utilize that to speed up the shuffling. I have
seen here
https://stackoverflow.com/questions/11372012/mpi-several-broadcast-at-the-same-time
that this is possible in MPI.

In my case it's more complicated, since q,k are parameters of the problem
and change between different experiments. If I follow the idea of the 2nd
method proposed there and assume that we have only 3 groups within which
some communication takes place, one can simply do:

if my rank belongs to group 1 {
    comm1.Bcast(..., ..., ..., rootId);
} else if my rank belongs to group 2 {
    comm2.Bcast(..., ..., ..., rootId);
} else if my rank belongs to group 3 {
    comm3.Bcast(..., ..., ..., rootId);
}

where comm1, comm2, comm3 are the corresponding sub-communicators that
contain only the members of each group.

But how can I generalize the above idea to an arbitrary number of groups,
or perhaps do something else?
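The only generalization I could sketch myself is to keep, on every rank, one
sub-communicator per group (as far as I understand, MPI::Intracomm::Create gives
back MPI::COMM_NULL on the ranks that are not in the group) and loop over them,
roughly like this (comms, num_groups, buf, count and root_in_group are just
placeholder names for this sketch, not my actual code):

    // comms[g] is the sub-communicator of group g on this rank; it is
    // MPI::COMM_NULL on the ranks that are not members of group g.
    // buf[g]/count[g]/root_in_group[g]: payload, its length and the
    // broadcast root for group g (placeholders).
    for (int g = 0; g < num_groups; g++) {
        if (comms[g] == MPI::COMM_NULL)
            continue;                      // I am not in this group, skip it
        comms[g].Bcast(buf[g], count[g], MPI::UNSIGNED_CHAR, root_in_group[g]);
    }

but I am not sure this is the right way to make the disjoint groups actually
overlap in time.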

The code is in C++ and the MPI installation is described in the attached file.

Regards,
Kostas
Konstantinos Konstantinidis
2017-11-01 03:50:42 UTC
Let me clarify one thing,

When I said "there are q-1 groups that can communicate in parallel at the
same time" I meant that this is possible at any particular time. So at the
beginning we have q-1 groups that could communicate in parallel, then
another set of q-1 groups and so on until we exhaust all groups. My hope is
that the speedup can be such that the total number of broadcasts i.e.
[q^(k-1)]*(q-1)*k
to be executed in time equivalent to only [q^(k-1)]*k broadcasts.
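For instance (just to make the numbers concrete), with q=3 and k=2 (K=6 slaves)
there are [q^(k-1)]*(q-1)=6 groups and 12 broadcasts in total; if those 6 groups
can indeed be scheduled as 3 rounds of 2 disjoint groups each, the shuffle should
take roughly the time of [q^(k-1)]*k=6 broadcasts, i.e. a factor of q-1=2 less.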

Cheers,
Kostas.

George Bosilca
2017-11-01 04:11:49 UTC
It really depends on what you are trying to achieve. If the question is
rhetorical, "can I write a code that does broadcasts in parallel on
independent groups of processes?", then the answer is yes, this is
certainly possible. If however you add a hint of practicality to your
question, "can I write an efficient parallel broadcast between independent
groups of processes?", then I'm afraid the answer will be a negative one.

Let's not look at how you can write the multiple bcast code, as the answer
on stackoverflow is correct, but instead look at what resources these
collective operations are using. In general you can assume that nodes are
connected by a network able to move data at a rate B in both directions
(full duplex). Assuming the implementation of the bcast algorithm is not
entirely moronic, the bcast can saturate the network with a single process
per node. Now, if you have multiple processes per node (P), then either you
schedule them sequentially (so that each one has the full bandwidth B) or
you let them progress in parallel, in which case each participating process
can claim a lower bandwidth B/P (as it is shared between all processes on
the node).

So even if you are able to expose enough parallelism, physical resources
will impose the real hard limit.

That being said, I have the impression you are trying to implement an
MPI_Allgather(v) using a series of MPI_Bcast. Is that true?

George.

PS: A few other constraints: the cost of creating the [q^(k-1)]*(q-1)
communicators might be prohibitive, and the MPI library might support a
limited number of communicators.


Konstantinos Konstantinidis
2017-11-06 03:23:19 UTC
Hi George,

First, let me note that the cost of the [q^(k-1)]*(q-1) communicators was fine
for the values of the parameters q,k I am working with. Also, the whole point
of speeding up the shuffling phase is to try to reduce this number even
more (compared to already known implementations), which is a major concern
of my project. But thanks for pointing that out. Btw, do you know what the
maximum such number is in MPI?

Now to the main part of the question: let me clarify that I have 1 process
per machine. I don't know if this is important here, but my way of thinking
is that we have a big text file and each process will have to work on some
chunks of it (like chapters of a book). But each process resides on a
machine with some RAM which is able to handle a specific amount of work, so
if you generate many processes per machine you must have fewer book
chapters per process than before. Thus, I wanted to think at the machine
level rather than the process level, because of the RAM limitations.

Now to the actual shuffling, here is what I am currently doing (Option 1):

Let's denote the data that slave s has to send to the slaves in group G as
D(s,G).

for each slave s in 1,2,...,K {

    for each group G that s participates in {

        if (my rank is s) {
            MPI_Bcast(send data D(s,G))
        } else if (my rank is in group G) {
            MPI_Bcast(get data D(s,G))
        } else {
            Do nothing
        }

    }

    MPI::COMM_WORLD.Barrier();

}
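Concretely, inside one group G each "MPI_Bcast(send/get data D(s,G))" above is
really a pair of broadcasts in my code, first the size and then the payload,
roughly like this (commG, rootG, myRankInG, mySendBytes and sendData are
placeholder names for this sketch only):

    if (commG != MPI::COMM_NULL) {                 // I am a member of G
        unsigned long long nbytes = mySendBytes;   // meaningful only at the root s
        commG.Bcast(&nbytes, 1, MPI::UNSIGNED_LONG_LONG, rootG);
        unsigned char *buf = (myRankInG == rootG) ? sendData
                                                  : new unsigned char[nbytes];
        commG.Bcast(buf, (int)nbytes, MPI::UNSIGNED_CHAR, rootG);
        // every non-root member of G now holds D(s,G) in buf
    }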

What I suggested before to speed up things (Option 2) is:

for each set {G(1),G(2),...,G(q-1)} of q-1 disjoint groups {

    for each slave s in G(1) {
        if (my rank is s) {
            MPI_Bcast(send data D(s,G(1)))
        } else if (my rank is in group G(1)) {
            MPI_Bcast(get data D(s,G(1)))
        } else {
            Do nothing
        }
    }

    for each slave s in G(2) {
        if (my rank is s) {
            MPI_Bcast(send data D(s,G(2)))
        } else if (my rank is in G(2)) {
            MPI_Bcast(get data D(s,G(2)))
        } else {
            Do nothing
        }
    }

    ...

    for each slave s in G(q-1) {
        if (my rank is s) {
            MPI_Bcast(send data D(s,G(q-1)))
        } else if (my rank is in G(q-1)) {
            MPI_Bcast(get data D(s,G(q-1)))
        } else {
            Do nothing
        }
    }

    MPI::COMM_WORLD.Barrier();

}

My hope was that I could implement Option 2 (in some way that avoids copying
and pasting the same code q-1 times every time I change q) and that this
could bring a speedup of q-1 compared to Option 1 by having these groups
communicate in parallel. Right now, I am trying to find a way to identify
these sets of groups based on my implementation, which involves some
abstract algebra, but for now let's assume that I can find them in an
efficient manner.
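In other words, what I would like to end up with is a single loop over the
"rounds" of disjoint groups, something like the sketch below, where numRounds,
roundComm() and bcastFrom() are placeholders (roundComm(r) would return the
communicator of my group in round r, or MPI_COMM_NULL if I do not participate
in that round, and bcastFrom() stands for the size-then-data pair of broadcasts
shown under Option 1):

    for (int r = 0; r < numRounds; r++) {
        MPI_Comm mcComm = roundComm(r);      // my group's communicator in round r,
                                             // or MPI_COMM_NULL if I sit this round out
        if (mcComm != MPI_COMM_NULL) {
            int gsize;
            MPI_Comm_size(mcComm, &gsize);
            for (int root = 0; root < gsize; root++)
                bcastFrom(root, mcComm);     // the size-then-data pair of broadcasts
                                             // from the Option 1 sketch above
        }
        MPI_Barrier(MPI_COMM_WORLD);         // disjoint groups of round r overlap;
                                             // the barrier only separates the rounds
    }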

Let me emphasize that each broadcast sends different actual data. There are
no two broadcasts that send the same D(s,G).

Finally, let's go to MPI_Allgather(): I am really confused since I have
never used this call but I have this image in my mind:


[attached image: the usual MPI_Allgather diagram, where each rank's colored block is gathered by every member of the communicator]
I am not sure what you meant but now I am thinking of this (let commG be
the intra-communicator of group G):

for each possible group G {

    if (my rank is in G) {
        commG.Allgather(send data D(rank,G))
    } else {
        Do nothing
    }

    MPI::COMM_WORLD.Barrier();

}

I am not sure whether this makes sense, since I am confused about the
correspondence between the data transmitted with Allgather() and the
notation D(s,G) I am currently using.

Thanks.
George Bosilca
2017-11-06 14:58:10 UTC
On Sun, Nov 5, 2017 at 10:23 PM, Konstantinos Konstantinidis wrote:
Post by Konstantinos Konstantinidis
Btw, do you know what the maximum such number is in MPI?
Last time I ran into such troubles the limits were: 2k for MVAPICH, 16k
for MPICH and 2^30-1 for OMPI (all positive signed integers). It might have
changed meanwhile.
Post by Konstantinos Konstantinidis
Finally, let's go to MPI_Allgather(): I am really confused since I have never used this call but I have this image in my mind:
If every member of a group does a bcast to all other members of the same
group, then this operation is better realized by an allgather. The picture
you attached clearly exposes the data movement pattern, where each color box
gets distributed to all members of the same communicator. You could also
see this operation as a loop of bcast where the iterator goes over all
members of the communicator and uses each of them as the root.
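Schematically, for a group where every rank contributes the same count of bytes
(sendbuf, recvbuf and count are generic names here; recvbuf is an unsigned char
buffer of gsize*count bytes):

    int gsize, grank;
    MPI_Comm_size(comm, &gsize);
    MPI_Comm_rank(comm, &grank);
    for (int root = 0; root < gsize; root++) {
        if (grank == root)                     /* put my own contribution in place */
            memcpy(recvbuf + root * count, sendbuf, count);
        MPI_Bcast(recvbuf + root * count, count, MPI_UNSIGNED_CHAR, root, comm);
    }
    /* a single MPI_Allgather(sendbuf, count, MPI_UNSIGNED_CHAR,
                              recvbuf, count, MPI_UNSIGNED_CHAR, comm);
       produces the same result, and lets the library pick a better algorithm. */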
Post by Konstantinos Konstantinidis
I am not sure what you meant but now I am thinking of this (let commG be the intra-communicator of group G): for each possible group G, if my rank is in G then commG.Allgather(send data D(rank,G)), followed by MPI::COMM_WORLD.Barrier().
This is indeed what I was thinking about, with the condition that you make
sure the list of communicators in G is ordered in the same way on all
processes.

That being said, this communication pattern 1) generates a large barrier in
your code; 2) as all processes will potentially be involved in many
collective communications, you will be hammering the network in a
significant way (so you will have to take network congestion into account);
and 3) all processes need to have all the receive memory allocated for the
buffers. Thus, even by implementing a nice communication scheme you might
encounter some performance issues.

Another way to do this is, instead of conglomerating all communications
in a single temporal location, to spread them out across time by imposing
your own communication logic. This basically translates a set of blocking
collectives (bcast is a perfect target) into a pipelined mix. Instead of
describing such a scheme here, I suggest you read the algorithmic
description of the SUMMA and/or PUMMA distributed matrix multiplication.

George.


Konstantinos Konstantinidis
2017-11-07 09:23:28 UTC
OK, I started implementing the above Allgather() idea without success
(segmentation fault), so I will post the problematic lines here:

comm.Allgather(&(endata.size), 1, MPI::UNSIGNED_LONG_LONG,
               &(endata_rcv.size), 1, MPI::UNSIGNED_LONG_LONG);
endata_rcv.data = new unsigned char[endata_rcv.size*lineSize];
comm.Allgather(&(endata.data), endata.size*lineSize, MPI::UNSIGNED_CHAR,
               &(endata_rcv.data), endata_rcv.size*lineSize, MPI::UNSIGNED_CHAR);
delete [] endata.data;

The idea (as it was also for the broadcasts) is first to transmit the data
size as an unsigned long long integer, so that the receivers can reserve
the required memory for the actual data to be transmitted after that. To my
understanding, the problem is that each piece of broadcast data, say D(s,G),
as I explained in the previous email, is not only different but also has a
different size (in general). I say that because if I replace the 3rd line with

comm.Allgather(&(endata.data), 1, MPI::UNSIGNED_CHAR,
               &(endata_rcv.data), 1, MPI::UNSIGNED_CHAR);

it seems to work without a seg fault, but this is pointless for me since I
don't want only 1 char to be transmitted. So if we look at the previous image
I posted, imagine that the red, green and blue squares are different in size.
Can Allgather() even work then? If not, do you suggest anything else, or am I
stuck with the MPI_Bcast() approach shown in Option 1?
George Bosilca
2017-11-07 15:34:14 UTC
If each process sends a different amount of data, then the operation should
be an allgatherv. This also requires that you know the amount each process
will send, so you will need an allgather first. Schematically, the code
should look like the following:

int bytes_send_count = (int)(endata.size * lineSize);  // the amount of data (in bytes) sent by this process
int* recv_counts = (int*)malloc(comm_size * sizeof(int));  // buffer to receive the amounts from all peers
int* displs = (int*)malloc(comm_size * sizeof(int));  // buffer to compute the displacements for each peer
MPI_Allgather(&bytes_send_count, 1, MPI_INT, recv_counts, 1, MPI_INT, comm);  // exchange the amount of sent data
int total = 0;  // we need the total amount of data to be received
for (int i = 0; i < comm_size; i++) {
    displs[i] = total;        // update the displacements
    total += recv_counts[i];  // and the total count
}
unsigned char* recv_buf = (unsigned char*)malloc(total);  // prepare the buffer for the allgatherv
MPI_Allgatherv(endata.data, bytes_send_count, MPI_UNSIGNED_CHAR,
               recv_buf, recv_counts, displs, MPI_UNSIGNED_CHAR, comm);
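Don't forget to free recv_counts, displs and recv_buf once you have consumed the
data. Also keep in mind that the counts and displacements MPI_Allgatherv takes
are plain ints, so each contribution (and, through the displacements, the total
received size) has to fit in an int.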

George.



Konstantinos Konstantinidis
2017-11-07 23:09:15 UTC
OK, I will try to explain a few more things about the shuffling and I have
attached only specific excerpts of the code to avoid confusion. I have
added many comments.

First, let me note that this project is an implementation of the Terasort
benchmark with a master node which assigns jobs to the slaves and
communicates with them after each phase to get measurements.

The file shuffle_before.cc shows how I have been doing the shuffling up to now,
and shuffle_after.cc shows the progress I have made so far in switching to
Allgatherv().

I have also included the code that measures time and data size since it's
crucial for me to check if I have rate speedup.

Some questions I have are:
1. At shuffle_after.cc:61, why do we reserve comm.Get_size() entries for
recv_counts and not comm.Get_size()-1? For example, if I am rank k, what is
the point of recv_counts[k-1]? I guess that rank k also receives data from
itself, but we can ignore it, right?

2. My next concern is about the structure of the buffer recv_buf[]. The
documentation says that the data is stored there ordered. So I assume that
it's stored as segments of char* ordered by rank, and the way to distinguish
them is to chop the whole data based on recv_counts[]. So let G = {g1,
g2, ..., gN} be a group that exchanges data, and let's take slave g2: then
the segment recv_buf[0 until recv_counts[0]-1] is what g2 received from g1,
recv_buf[recv_counts[0] until recv_counts[1]-1] is what g2 received from
itself (ignore it), and so on... Is this idea correct?

So I have written a sketch of the code in shuffle_after.cc, in which I also
try to explain how the master will compute the rate, but for now I just want
to make it work.

I know that this discussion is getting long but if you have some free time
can you take a look at it?

Thanks,
Kostas
George Bosilca
2017-11-08 04:37:24 UTC
On Tue, Nov 7, 2017 at 6:09 PM, Konstantinos Konstantinidis wrote:
Post by Konstantinos Konstantinidis
1. At shuffle_after.cc:61, why do we reserve comm.Get_size() entries for recv_counts and not comm.Get_size()-1? I guess that rank k also receives data from itself, but we can ignore it, right?
No, you can't simply ignore it ;) allgather copies the same amount of data
to all processes in the communicator ... including itself. If you want to
argue about this, reach out to the MPI standardization body ;)
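For what it's worth, a rank does not have to re-send its own block at all: MPI_Allgatherv accepts MPI_IN_PLACE as the send buffer, in which case each process's own contribution is taken from (and left in) its slot of the receive buffer. A minimal sketch, assuming recv_buf, recv_counts and displs are set up as in the schematic code quoted further down:

// Each rank has already placed its own bytes at recv_buf + displs[my_rank];
// with MPI_IN_PLACE the send count and send type arguments are ignored.
MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
               recv_buf, recv_counts, displs, MPI_UNSIGNED_CHAR, comm);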
Post by Konstantinos Konstantinidis
2. My next concern is about the structure of the buffer recv_buf[]. The
documentation says that the data is stored there ordered. So I assume that
it's stored as segments of char* ordered by rank and the way to distinguish
them is to chop the whole data based on recv_counts[]. So let G = {g1,
g2, ..., gN} be a group that exchanges data. Let's take slave g2: then segment
recv_buf[0 until recv_counts[0]-1] is what g2 received from g1,
recv_buf[recv_counts[0] until recv_counts[1]-1] is what g2 received from
himself (ignore it), and so on... Is this idea correct?
I don't know what documentation says "ordered", there is no such wording in
the MPI standard. By carefully playing with the receive datatype I can do
anything I want, including interleaving data from the different peers. But
this is not what you are trying to do here.

The placement in memory you describe is true if you use the displacement
array as crafted in my example. The entry i in the displacement array
specifies the displacement (relative to recvbuf) at which to place the
incoming data from process i, so where you receive data has nothing to do
with the amount you receive but with what you have in the displacement
array.
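To make the placement concrete, here is a minimal unpacking sketch (an illustration, not code from the attached files): it walks recv_buf with the same recv_counts/displs arrays and simply skips the rank's own block.

int my_rank;
MPI_Comm_rank(comm, &my_rank);
for (int i = 0; i < comm_size; i++) {
    if (i == my_rank) continue;                          // my own contribution, nothing to do
    unsigned char* block = (unsigned char*)recv_buf + displs[i];
    int block_len = recv_counts[i];                      // number of bytes received from rank i
    // ... deserialize block[0 .. block_len-1], which came from rank i ...
}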
Post by Konstantinos Konstantinidis
So I have written a sketch of the code at shuffle_after.cc in which I also
try to explain how the master will compute the rate, but at least I want to
make it work.
This code looks OK to me. I would however:

1. Remove the barriers on the workerComm. If the order of the communicators
in the multicastGroupMap is identical on all processes (including the
communicators they do not belong to) then the barriers are superfluous.
However, if you are trying to protect your processes from starting the
allgather collective too early, then you can replace the barrier on
workerComm with one on mcComm.

2. The check "ns.find(rank) != ns.end()" should be equivalent to "mcComm ==
MPI_COMM_NULL" if I understand your code correctly.

3. This is an optimization. Move all time exchanges outside the main
loop. Instead of sending them one-by-one, keep them in an array and send
the entire array once per CodedWorker::execShuffle, possibly via an
MPI_Allgatherv toward the master process in MPI_COMM_WORLD (in this case
you can convert the "long long" into a double to facilitate the collective).

George.
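A minimal sketch of that third point (illustrative only; groupTimes, the rank-0 master and the use of std::vector are assumptions layered on top of the thread's CodedWorker::execShuffle layout): accumulate one value per group locally, then ship the whole vector to the master with a single gather after the loop.

// Filled inside the shuffle loop: one double per group this worker took part in.
std::vector<double> groupTimes;
// ...
int nLocal = (int)groupTimes.size();                     // the master contributes 0 values
int worldRank, worldSize;
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
std::vector<int> counts, displs;
if (worldRank == 0) counts.resize(worldSize);
MPI_Gather(&nLocal, 1, MPI_INT, counts.data(), 1, MPI_INT, 0, MPI_COMM_WORLD);
std::vector<double> allTimes;
if (worldRank == 0) {
    displs.resize(worldSize);
    int total = 0;
    for (int i = 0; i < worldSize; ++i) { displs[i] = total; total += counts[i]; }
    allTimes.resize(total);
}
MPI_Gatherv(groupTimes.data(), nLocal, MPI_DOUBLE,
            allTimes.data(), counts.data(), displs.data(), MPI_DOUBLE, 0, MPI_COMM_WORLD);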
Post by Konstantinos Konstantinidis
I know that this discussion is getting long but if you have some free time
can you take a look at it?
Thanks,
Kostas
Post by George Bosilca
If each process sends a different amount of data, then the operation
should be an allgatherv. This also requires that you know the amount each
process will send, so you will need an allgather first. Schematically, the code looks like:
int my_count = (int)(endata.size * sizeof(char));            // number of bytes sent by this process
int* recv_counts = (int*)malloc(comm_size * sizeof(int));    // buffer to receive the amounts from all peers
int* displs = (int*)malloc(comm_size * sizeof(int));         // buffer to compute the displacements for each peer
MPI_Allgather(&my_count, 1, MPI_INT, recv_counts, 1, MPI_INT, comm);  // exchange the amount of sent data
int total = 0;                                               // we need the total amount of data to be received
for (int i = 0; i < comm_size; i++) {
    displs[i] = total;                                       // update the displacements
    total += recv_counts[i];                                 // and the total count
}
char* recv_buf = (char*)malloc(total * sizeof(char));        // prepare buffer for the allgatherv
MPI_Allgatherv(endata.data, my_count, MPI_UNSIGNED_CHAR,
               recv_buf, recv_counts, displs, MPI_UNSIGNED_CHAR, comm);
George.
On Tue, Nov 7, 2017 at 4:23 AM, Konstantinos Konstantinidis <
Post by Konstantinos Konstantinidis
OK, I started implementing the above Allgather() idea without success:

comm.Allgather(&(endata.size), 1, MPI::UNSIGNED_LONG_LONG,
               &(endata_rcv.size), 1, MPI::UNSIGNED_LONG_LONG);
endata_rcv.data = new unsigned char[endata_rcv.size*lineSize];
comm.Allgather(&(endata.data), endata.size*lineSize, MPI::UNSIGNED_CHAR,
               &(endata_rcv.data), endata_rcv.size*lineSize, MPI::UNSIGNED_CHAR);
delete [] endata.data;
The idea (as it was also for the broadcasts) is first to transmit the
data size as an unsigned long long integer, so that the receivers will
reserve the required memory for the actual data to be transmitted after
that. To my understanding, the problem is that each broadcast's data, call it
D(s,G) as I explained in the previous email, is not only different but also
has a different size (in general). Indeed, if I replace the second Allgather()
call with
comm.Allgather(&(endata.data), 1, MPI::UNSIGNED_CHAR,
               &(endata_rcv.data), 1, MPI::UNSIGNED_CHAR);
it seems to work without a seg fault, but this is pointless for me since I
don't want only 1 char to be transmitted. So, looking at the previous image I
posted, imagine that the red, green and blue squares are different in size.
Can Allgather() even work then? If not, do you suggest anything else, or am I
trapped into using MPI_Bcast() as shown in Option 1?
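One thing worth flagging in the snippet above (an observation on the quoted code, assuming endata.data and endata_rcv.data are unsigned char* members, as the new[] suggests): &(endata.data) passes the address of the pointer itself rather than the bytes it points to, so MPI reads and writes the pointer variable instead of the buffer. A corrected sketch of the second call would be:

// Pass the buffers themselves, not the addresses of the pointer members.
comm.Allgather(endata.data,     endata.size*lineSize,     MPI::UNSIGNED_CHAR,
               endata_rcv.data, endata_rcv.size*lineSize, MPI::UNSIGNED_CHAR);
// Even then, a plain Allgather still assumes every rank contributes the same number
// of elements; for per-rank sizes the Allgatherv pattern discussed above is needed.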
Post by George Bosilca
On Sun, Nov 5, 2017 at 10:23 PM, Konstantinos Konstantinidis <
Post by Konstantinos Konstantinidis
Hi George,
First, let me note that the cost of [q^(k-1)]*(q-1) communicators was
fine for the values of parameters q,k I am working with. Also, the whole
point of speeding up the shuffling phase is trying to reduce this number
even more (compared to already known implementations) which is a major
concern of my project. But thanks for pointing that out. Btw, do you know
what is the maximum such number in MPI?
Last time I ran into such troubles these limits were: 2k for MVAPICH,
16k for MPICH and 2^30-1 for OMPI (all positive signed 32-bit integers).
It might have changed in the meantime.
Post by Konstantinos Konstantinidis
Now to the main part of the question, let me clarify that I have 1
process per machine. I don't know if this is important here but my way of
thinking is that we have a big text file and each process will have to work
on some chunks of it (like chapters of a book). But each process resides on
a machine with some RAM which is able to handle a specific amount of work,
so if you generate many processes per machine you must have fewer book
chapters per process than before. Thus, I wanted to think at the machine
level rather than the process level, because of the RAM limitations.
Let's denote the data that slave s has to send to the slaves in group
G as D(s,G).

Option 1:

for each slave s in 1,2,...,K{
    for each group G that s participates in{
        if (my rank is s){
            MPI_Bcast(send data D(s,G))
        }else if (my rank is in group G){
            MPI_Bcast(get data D(s,G))
        }else{
            Do nothing
        }
    }
    MPI::COMM_WORLD.Barrier();
}
Option 2:

for each set {G(1),G(2),...,G(q-1)} of q-1 disjoint groups{
    for each slave s in G(1){
        if (my rank is s){
            MPI_Bcast(send data D(s,G(1)))
        }else if (my rank is in group G(1)){
            MPI_Bcast(get data D(s,G(1)))
        }else{
            Do nothing
        }
    }
    for each slave s in G(2){
        if (my rank is s){
            MPI_Bcast(send data D(s,G(2)))
        }else if (my rank is in G(2)){
            MPI_Bcast(get data D(s,G(2)))
        }else{
            Do nothing
        }
    }
    ...
    for each slave s in G(q-1){
        if (my rank is s){
            MPI_Bcast(send data D(s,G(q-1)))
        }else if (my rank is in G(q-1)){
            MPI_Bcast(get data D(s,G(q-1)))
        }else{
            Do nothing
        }
    }
    MPI::COMM_WORLD.Barrier();
}
My hope was that I could implement Option 2 (in some way without
copying and pasting the same code q-1 times every time I change q) and that
this could bring a speedup of q-1 compared to Option 1 by having these
groups communicate in parallel. Right now, I am trying to find a way to
identify these sets of groups based on my implementation, which involves
some abstract algebra but for now let's assume that I can find them in an
efficient manner.
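For what it's worth, Option 2 can be written once for arbitrary q (a sketch only; roundsOfDisjointGroups, groupComm, bufFor and countOf are hypothetical stand-ins for whatever the abstract-algebra scheme produces): since the q-1 groups of a round are disjoint, each rank belongs to at most one of them and therefore only ever issues broadcasts on that one sub-communicator.

for (const auto& round : roundsOfDisjointGroups) {          // each round holds q-1 disjoint groups
    for (const auto& G : round) {
        MPI::Intracomm comm = groupComm[G];                  // MPI::COMM_NULL if I am not a member of G
        if (comm == MPI::COMM_NULL) continue;                // at most one group per round survives this test
        for (int root = 0; root < comm.Get_size(); root++) { // the k members of G take turns as root
            // bufFor/countOf: hypothetical accessors for D(root,G) and its (pre-agreed) size;
            // on the root the buffer holds the data to send, elsewhere it receives it.
            comm.Bcast(bufFor(G, root), countOf(G, root), MPI::UNSIGNED_CHAR, root);
        }
    }
    MPI::COMM_WORLD.Barrier();                               // optional: keeps successive rounds apart in time
}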
Let me emphasize that each broadcast sends different actual data.
There are no two broadcasts that send the same D(s,G).
Finally, let's go to MPI_Allgather(): I am really confused since I
If every member of a group does a bcast to all other members of the
same group, then this operation is better realized by an allgather. The
picture you attached clearly exposes the data movement pattern where each
color box gets distributed to all members of the same communicator. You
could also see this operation as a loop of bcast where the iterator goes
over all members of the communicator and uses each one as the root.
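Just to make that equivalence concrete (a sketch, not code from the thread; equal-size blocks of blk bytes are assumed, with each rank's own block in mine, the assembled result in all, and memcpy coming from <cstring>):

// Variant A: a loop of broadcasts, each member taking a turn as root.
memcpy(all + my_rank * blk, mine, blk);
for (int root = 0; root < comm_size; ++root)
    MPI_Bcast(all + root * blk, blk, MPI_UNSIGNED_CHAR, root, comm);

// Variant B: exactly the same data movement in a single call.
MPI_Allgather(mine, blk, MPI_UNSIGNED_CHAR, all, blk, MPI_UNSIGNED_CHAR, comm);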
Post by Konstantinos Konstantinidis
[inline image omitted]
I am not sure what you meant, but now I am thinking of this (let commG be the
sub-communicator of group G):

for each possible group G{
    if (my rank is in G){
        commG.MPI_AllGather(send data D(rank,G))
    }else{
        Do nothing
    }
    MPI::COMM_WORLD.Barrier();
}
This is indeed what I was thinking about, with the condition that you
make sure the list of communicators in G is ordered in the same way on all
processes.
That being said, this communication pattern 1) generates a large
barrier in your code; 2) as all processes will potentially be involved in
many collective communications, you will be hammering the network in a
significant way (so you will have to take network congestion into
account); and 3) all processes need to have all the receive memory for the
buffers allocated. Thus, even by implementing a nice communication
scheme you might encounter some performance issues.
Another way to do this is, instead of conglomerating all
communications in a single temporal location, to spread them out across
time by imposing your own communication logic. This basically translates a
set of blocking collectives (bcast is a perfect target) into a pipelined
mix. Instead of describing such a scheme here, I suggest you read the
algorithmic description of the SUMMA and/or PUMMA distributed matrix
multiplication algorithms.
George.
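One much simpler flavor of "spreading things out" is sketched below purely as an illustration (it is not the SUMMA/PUMMA scheme, and the Group type together with groupsContainingMe, commOf, groupSize, bufferFor and countFor are hypothetical helpers): post the per-group broadcasts as nonblocking collectives (MPI_Ibcast, available since MPI-3) and complete them together, instead of fencing every group with a barrier.

std::vector<MPI_Request> reqs;
for (const Group& G : groupsContainingMe) {            // only the groups this rank belongs to
    MPI_Comm c = commOf(G);                             // the group's communicator (C handle)
    for (int root = 0; root < groupSize(G); ++root) {   // same root order on every member of G
        MPI_Request r;
        MPI_Ibcast(bufferFor(G, root), countFor(G, root), MPI_UNSIGNED_CHAR, root, c, &r);
        reqs.push_back(r);
    }
}
// All receive buffers must already be allocated (George's point 3 above); the library is
// then free to progress and pipeline the different groups while we wait once at the end.
MPI_Waitall((int)reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);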
Post by Konstantinos Konstantinidis
I am not sure whether this makes sense since I am confused about the
correspondence of the data transmitted with Allgather() compared to the
notation D(s,G) I am currently using.
Thanks.
Konstantinos Konstantinidis
2017-12-05 07:38:38 UTC
Permalink
Coming back to this discussion after a long time, let me clarify a few
issues that you have addressed.

1. Yes, the list of communicators in G is ordered in the same way on all
processes.

2. I am now using "mcComm != MPI_COMM_NULL" for participation check. I have
not seen much improvement but it's more elegant.

3. I have tried many things with the barriers.
a) For the Bcast() case, first I tried this:

for (unsigned int activeId = 1; activeId <= conf->getNumReducer(); activeId++){
    //...
    for (auto nsit = vset.begin(); nsit != vset.end(); nsit++){
        //...
        MPI::Intracomm mcComm = multicastGroupMap[nsid];
        if (mcComm == MPI::COMM_NULL){
            continue;
        }
        mcComm.Barrier();

        if (rank == activeId){
            //...
        }else{
            //...
        }
        mcComm.Barrier();
    }
}
replacing the barriers on workerComm (i.e. the barriers that affect all
workers) with barriers on mcComm within the inner loop. For some reason this
led to worse performance: specifically, it increased the Shuffle time but
also increased the rate (???) (I will explain at the next point how the rate
is computed). Then I tried removing all barriers and the situation was even
worse (both the time increased and the rate decreased). It seems that the
barriers on workerComm help a lot even though they look useless.

b) For the Allgatherv() case, first I tried this:

//Get all shuffling groups of the scheme
vector< NodeSet >& allSGs = cg->getShufflingGroups();
//...

workerComm.Barrier();
//For each group, execute all the shuffling...
for (auto nsit = allSGs.begin(); nsit != allSGs.end(); nsit++){
    //...
    MPI::Intracomm mcComm = multicastGroupMap[nsid];

    //If I am in the current group
    if (mcComm != MPI::COMM_NULL){
        //...
    }
}
workerComm.Barrier();
Then, I tried this:

vector< NodeSet >& allSGs = cg->getShufflingGroups();
//...
for (auto nsit = allSGs.begin(); nsit != allSGs.end(); nsit++){
    //...
    MPI::Intracomm mcComm = multicastGroupMap[nsid];
    if (mcComm != MPI::COMM_NULL){
        mcComm.Barrier();

        //...
        mcComm.Barrier();
    }
}
Finally, I tried this:

vector< NodeSet >& allSGs = cg->getShufflingGroups();
//...
for (auto nsit = allSGs.begin(); nsit != allSGs.end(); nsit++){
    workerComm.Barrier();
    //...
    MPI::Intracomm mcComm = multicastGroupMap[nsid];

    //If I am in the current group
    if (mcComm != MPI::COMM_NULL){
        //...
    }
    workerComm.Barrier();
}
All three of these methods seem to perform like the Bcast() case, maybe a
bit faster. I cannot be sure about this since, as I will explain in the next
point, I am making some mistake when measuring time, so I measured manually
with my phone!

4. This is my main issue: I have implemented the Allgatherv() idea and it
works, but I am trying to measure the time of the Shuffling phase as well as
the transmission rate. The way I am currently doing this can be seen in the
updated shuffle_before.cc I have attached. This is correct, since I have set
the rate limit to 100Mbps with the Linux "tc" command and I have seen that
the computed rate is actually 100Mbps (of course, only for small values of
[q^(k-1)]*(q-1), i.e. for a small number of required broadcasts; otherwise it
is lower than 100, which is essentially what I am trying to improve).

Now the transmission time and/or the rate I compute for Allgatherv() is
wrong: even though I can see that it takes pretty much the same time as in
the case of Bcast(), it prints completely unrealistic numbers for the total
Shuffling time (very high) and the rate (very low). I have attached a second
file which includes the main parts of the code with the Allgatherv() idea.
The point is that each slave initializes a total time counter "time", a
transmission time counter "txTime" and a total size counter in bytes
"tolSize" to zero, and then iterates through all groups that it belongs to,
adding the total time and the transmission time it took for the send-receive
function to complete (the only difference is that I subtract the
deserialization time from both counters, since I don't want it counted, in
order to have a valid comparison with the previous implementation). It also
adds the total size of data and metadata it transmitted to the group. When
all slaves are done they return the total Shuffle time and the rate in
Megabits/sec to the Master. The Master (code omitted) just computes the
average of these values (time and rate) and prints them on the terminal. I am
pretty sure that I am missing something here and getting wrong measurements.

Thanks for your time:)
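For reference, one common measurement pattern, sketched here only as an illustration and not as the attached code: time the whole shuffle with MPI_Wtime (which returns elapsed wall-clock seconds, unlike clock(), which counts processor time and can be far off from the real duration), then let the master take the maximum over the workers, since the phase only ends when the slowest worker finishes.

double t0 = MPI_Wtime();
// ... the whole shuffling loop over all groups goes here ...
double myElapsed = MPI_Wtime() - t0;                  // seconds spent by this worker

double phaseTime = 0.0;                               // meaningful on the master (rank 0) only
MPI_Reduce(&myElapsed, &phaseTime, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);

// Master side: rate in Mbit/s = bits shipped by all workers / time of the slowest worker,
// with the total bit count reduced or gathered from the per-worker tolSize counters.
// double rateMbps = (totalBits / 1.0e6) / phaseTime;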
George Bosilca
2017-12-06 21:43:09 UTC
Permalink
Your code looks correct. There are a few things I would change to improve it:

- There are too many calls to clock(). I would move the operations on
"time" (the variable) outside the outer loop.
- Replace the 2 non-scalable constructs that gather the 2 times on the root
with either an MPI_Reduce(+) (if you don't care about the individual values)
or an MPI_Gather. You can even do both values in the same operation (see the
sketch after this list).
- I would remove the barrier at the end of execShuffle (the processes
will remain blocked on the initial barrier of the next call to execShuffle
anyway).
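A minimal sketch of that second point (illustrative only; the master is assumed to be rank 0 of MPI_COMM_WORLD and to call the same reduction while contributing zeros): both accumulated counters travel in a single reduction.

double vals[2] = { (double)time, (double)txTime };   // the two "long long" counters, as doubles
double sums[2] = { 0.0, 0.0 };                       // significant on the master only
MPI_Reduce(vals, sums, 2, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
// Master: divide by the number of workers to get the averages it currently prints.
// double avgTime = sums[0] / numWorkers;  double avgTxTime = sums[1] / numWorkers;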

Regarding the need to have a lot of barriers, it really depends on your code.
If your processes get out of sync they might start communicating for the
next collective operation too early. This might force the receiving processes
to allocate too many internal buffers (to cope with the large number of
unexpected messages). It is difficult to assess the overhead.

Good luck,
George.



