Note that there are some explanatory texts on larger screens.

plurals
  1. Need suggestion to avoid deadlocks with higher iteration steps
    text
    copied!<p>My MPI program consists of a number of processes that send/receive zero or more messages from other processes. The processes check periodically if messages are available to be processed. Code runs fine up-to 3000 iteration steps. After that there are deadlocks and program freezes. Please feel free to toss any suggestion. Below is my pseudo-code..let me know if you have any question.</p> <p>N is number of processing nodes:</p> <pre><code>do{ if(numberIterations&gt;1) -- Receive Data { getdata: MPI_Iprobe() while(flagprobe !=0) { If(TAG=StausUpdate) Update status of processor; If(TAG=Data) Process Data; MPI_Iprobe() } } if( numberIterations&lt; MaxIterations ) -- Send Data { for(i=0;i&lt;N;i++) MPI_Bsend_init(request[i]) for(i=0;i&lt;N;i++) MPI_Start(request[i]) numberIterations++; } if(numberIterations == MaxIterations) -- Update Processor Status { for(i=0;i&lt;N;i++) MPI_Isend(request1[i]) -- with TAG = StatusUpdate goto getdata; set endloopflag = 1 } if(numberIterations == MaxIterations &amp;&amp; endloopflag ==1) --Final Check { for(i=0;i&lt;N;i++) MPI_Test(request1[i],flagtest); if(!flagtest) goto getdata; } } while(numberIterations &lt; MaxIterations); for(i=0;i&lt;N;i++) --Free request { MPI_Request_free(&amp;request[i]); } </code></pre> <p>---Updated pseudo code as per Mark</p> <pre><code>#include &lt;string.h&gt; #include &lt;math.h&gt; #include &lt;stdio.h&gt; #include &lt;stdlib.h&gt; #include &lt;time.h&gt; #include &lt;iostream&gt; #include &lt;fstream&gt; #include "mpi.h" #define N 9 //# of nodes #define M 10 //samples number #define n 2 //demension of weight vector #define TAU 0.15 #define DISTANCE 0.1 //measuremeant for two nodes #define A 0.2 //learning rate #define ITERATION_STEPS 1000 // Program goes for ITERATION_STEPS - 1 #define SAMPLE_STEP 1 //Number of current iteration #define BT1 0.17 #define BT2 0.02 #define A0 0.9 //initial learning rate #define AC 0.05 //middle learning rate #define AF 0.001 //final learning rate #define TC 
4500 //first period of iteration #define TF 5000 //second period of iteration #define BUFSIZE 400000 using namespace std; void printtime(double comm_time,double update_time,string filename,int rank); int checkack(int ack[],int status[]); int checkstatus(int status[],int procid); int noof_activeproc(int status[],int myrank); void printresult(double w[][n],string filename,int rank); void plot(double w[][n], char* fileName); void update(double w[][n], double x[], int t,int rank, int g[][9]); double norm(double a[], double b[]); double p(double sample[], double w[], int t); void OneToTwo (int index, int *row, int *col); int g(int b, int j); int main(int argc, char *argv[]) { int rank,size;; MPI_Init(&amp;argc,&amp;argv ); MPI_Comm_size(MPI_COMM_WORLD, &amp;size); MPI_Comm_rank(MPI_COMM_WORLD, &amp;rank); MPI_Status statusprobe,status,status1[N],status2[N]; MPI_Request request[N],request1[N],request2[N],request3[N]; //N request for N process per iteration double buf[BUFSIZE]; // buffer for the outgoing message; int procstatus[N],ack[N]; //store the process status and ack double temp[n]; double tempsend[n]; ifstream in1, in2, in3; ofstream out,outtime; int i, j,k,z,req = 0,req1; int checklocation=0; //bookmark for MPI_Test int checklocation1=0; //bookmark for MPI_Test int numberIterations; double samples[M][n]; //for all samples double w[N][n]; //for all node weight double x[n]; //one sample int g[9][9]; int count=n; int flagprobe=0; int flagtest=1; int flagrecv=0; int datareadflag; // flag that checks wheter the data is read or not double dataincount; int checktestflag; int requestfreeflag=0; int flag=0; //test flag int flagtest1=0; // check for the request to update the processor status int endloopflag=0; int *bptr, bl; double start_time,end_time,tupdate_start,tupdate_end,t_temp1,t_temp2; double comm_time; double update_time=0; for(i=0;i&lt;N;i++) { procstatus[i]=1; // all the processor are on ack[i]=0; } // read sample data in1.open("samples.dat"); if(!in1) { 
cout&lt;&lt;"100:File openning error. \n"; exit(100); } in2.open("initialMap.dat"); if(!in2) { cout&lt;&lt;"200:File openning error. \n"; exit(200); } in3.open("gij.dat"); if(!in3) { cout&lt;&lt;"200:File openning error. \n"; exit(200); } for(i=0; i&lt;M; i++) for(j=0; j&lt;n; j++){ in1&gt;&gt;samples[i][j]; //cout&lt;&lt;samples[i][j]&lt;&lt;"="&lt;&lt;i&lt;&lt;","&lt;&lt;j&lt;&lt;" "; } //read initial weights for(i=0; i&lt;N; i++) for(j=0; j&lt;n; j++) { in2&gt;&gt;w[i][j]; //cout&lt;&lt;w[i][j]&lt;&lt;"="&lt;&lt;i&lt;&lt;","&lt;&lt;j&lt;&lt;" "; } //read Gij for(i=0; i&lt;9; i++) for(j=0; j&lt;9; j++) { in3&gt;&gt;g[i][j]; //cout&lt;&lt;w[i][j]&lt;&lt;"="&lt;&lt;i&lt;&lt;","&lt;&lt;j&lt;&lt;" "; } //Print W to file out.open("w.dot"); out&lt;&lt;"graph G {"&lt;&lt;endl; out&lt;&lt;"size=\"10,10\";"&lt;&lt;endl; out&lt;&lt;"ratio=expand;"&lt;&lt;endl; out&lt;&lt;"node [shape=circle];"&lt;&lt;endl; //out&lt;&lt;"node [shape=point];"&lt;&lt;endl; for(i=0; i&lt;9; i++) { for(j=0; j&lt;n; j++) { if(j == 0) out&lt;&lt;i+1&lt;&lt;"[pos = \""; out&lt;&lt;w[i][j]; if(j == 0) out&lt;&lt;","; if(j == 1) out&lt;&lt;"!\"]"&lt;&lt;endl; } } for(i=0; i&lt;9; i++) for(j=0; j&lt;i+1; j++) { if(g[i][j] == 1 &amp;&amp; i != j) out&lt;&lt;i+1&lt;&lt;" -- "&lt;&lt;j+1&lt;&lt;";"&lt;&lt;endl; } out&lt;&lt;"}"&lt;&lt;endl; MPI_Barrier(MPI_COMM_WORLD); MPI_Buffer_attach( buf, BUFSIZE ); k = 0; numberIterations = 1; dataincount=N; //for the first time , all process or has N data in from file. 
datareadflag=1; checktestflag=1; int tagno=1; int prevtag; // start_time=MPI_Wtime(); time_t start,start1,end1,end; time(&amp;start); do{ if(numberIterations%SAMPLE_STEP==0) { t_temp1=MPI_Wtime(); if(k&gt;=M) k=0; for(j=0; j&lt;n; j++) { x[j]=samples[k][j]; } k++; t_temp2=MPI_Wtime(); } if(numberIterations&gt;1) { getdata: MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &amp;flagprobe, &amp;statusprobe); //tag = numberIterations, while(flagprobe != 0) { if(statusprobe.MPI_TAG==0) // tag=0 means status update of the processor { int rtemp[1]; MPI_Recv(rtemp,1,MPI_INT,statusprobe.MPI_SOURCE,0, MPI_COMM_WORLD, &amp;status ); procstatus[status.MPI_SOURCE]=rtemp[0]; } else { datareadflag=1; dataincount++; MPI_Recv(temp,count,MPI_DOUBLE,statusprobe.MPI_SOURCE,statusprobe.MPI_TAG, MPI_COMM_WORLD, &amp;status ); for(j=0;j&lt;n;j++) w[status.MPI_SOURCE][j]=temp[j]; } MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG, MPI_COMM_WORLD,&amp;flagprobe, &amp;statusprobe); } //end while } //end if (no of iteration &gt;1) if( numberIterations&lt; ITERATION_STEPS ) // do not send on last iterations. { tupdate_start=MPI_Wtime(); update(w,x,k,rank,g); tupdate_end=MPI_Wtime(); update_time=update_time+tupdate_end-tupdate_start; if(req==0) { for(i=0;i&lt;N;i++) { int c=0; if((i!=rank)&amp;&amp;(checkstatus(procstatus,i)==1)) // send if only the process is active { MPI_Bsend_init(w[rank], count, MPI_DOUBLE, i ,tagno, MPI_COMM_WORLD,&amp;request[i]); MPI_Bsend_init(&amp;c,1, MPI_INT,i,0, MPI_COMM_WORLD,&amp;request1[i]); } } //end for req=1; } for(i=0;i&lt;N;i++) { if((i!=rank)&amp;&amp;(checkstatus(procstatus,i)==1)) // send if only the process is active { MPI_Start(&amp;request[i]); //actual message send. } } tagno++; requestfreeflag==1; checktestflag=0; dataincount=0; checklocation=0; numberIterations++; datareadflag=0; cout&lt;&lt;numberIterations&lt;&lt;"-th iterations for . 
"&lt;&lt;rank&lt;&lt;endl; } //end if( numberIterations&lt; ITERATION_STEPS ) /* Before exiting notify all the active process */ if((numberIterations == ITERATION_STEPS) &amp;&amp; (endloopflag==0)) //endloop flag prevent sending twice { // status value (initially all 1); req1=0; for(i=0;i&lt;N;i++) { if((i!=rank)&amp;&amp;(checkstatus(procstatus,i)==1)) // check if only the process is active { MPI_Start(&amp;request1[i]); } } endloopflag=1; goto getdata; } //end if if(numberIterations == ITERATION_STEPS &amp;&amp; endloopflag==1) { for(i=1;i&lt;N;i++) { if((i!=rank)&amp;&amp;(checkstatus(procstatus,i)==1)) // check if only the process is active { MPI_Test(&amp;request[i], &amp;flagtest, &amp;status); MPI_Test(&amp;request1[i], &amp;flagtest1, &amp;status); if(!flagtest || !flagtest1) { checklocation1=i; //for next check continue from i; cout&lt;&lt;"getdata called by" &lt;&lt;rank&lt;&lt;endl; goto getdata; } } //end if }//end for } //end if } while(numberIterations &lt; ITERATION_STEPS); for(i=0;i&lt;N;i++) { if(i!=rank &amp;&amp; request[i]!=MPI_REQUEST_NULL) { MPI_Request_free(&amp;request[i]); MPI_Request_free(&amp;request1[i]); } } if(numberIterations == ITERATION_STEPS) { char pno[2]; sprintf(pno,"%d",rank); string filename; filename=filename+pno; filename=filename+".dot"; char *file=strdup(filename.c_str()); ofstream out; out.open(file); //plot(w, "final_map_25.dat",rank); out&lt;&lt;"graph G {"&lt;&lt;endl; out&lt;&lt;"size=\"10,10\";"&lt;&lt;endl; out&lt;&lt;"ratio=expand;"&lt;&lt;endl; out&lt;&lt;"node [shape=point];"&lt;&lt;endl; //out&lt;&lt;"node [shape=point];"&lt;&lt;endl; for(i=0; i&lt;9; i++) { for(j=0; j&lt;n; j++) { if(j == 0) out&lt;&lt;i+1&lt;&lt;"[pos = \""; out&lt;&lt;w[i][j]; if(j == 0) out&lt;&lt;","; if(j == 1) out&lt;&lt;"!\"]"&lt;&lt;endl; } } for(i=0; i&lt;9; i++) for(j=0; j&lt;i+1; j++) { if(g[i][j] == 1 &amp;&amp; i != j) out&lt;&lt;i+1&lt;&lt;" -- "&lt;&lt;j+1&lt;&lt;";"&lt;&lt;endl; } out&lt;&lt;"}"&lt;&lt;endl; } 
MPI_Buffer_detach( &amp;bptr, &amp;bl ); MPI_Finalize(); return 0; } // End main Program </code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload