Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

BeamSrv2.cc

Go to the documentation of this file.
00001 /*
00002 
00003     Beamforming input server
00004     Copyright (C) 2002 Jussi Laako
00005 
00006     This program is free software; you can redistribute it and/or modify
00007     it under the terms of the GNU General Public License as published by
00008     the Free Software Foundation; either version 2 of the License, or
00009     (at your option) any later version.
00010 
00011     This program is distributed in the hope that it will be useful,
00012     but WITHOUT ANY WARRANTY; without even the implied warranty of
00013     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014     GNU General Public License for more details.
00015 
00016     You should have received a copy of the GNU General Public License
00017     along with this program; if not, write to the Free Software
00018     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00019 
00020 */
00021 
00022 
00023 #include <stdio.h>
00024 #include <stdlib.h>
00025 #include <signal.h>
00026 #include <limits.h>
00027 #include <math.h>
00028 #include <float.h>
00029 
00030 #include <mpi.h>
00031 
00032 #include <DynThreads.hh>
00033 
00034 #include "BeamSrv2.hh"
00035 
00036 
00037 volatile bool bRun = true;
00038 clDynThreads<clBeamSrv2Master> *BeamSrvMasterThreads;
00039 
00040 
00041 void sig_handler (int signo)
00042 {
00043     switch (signo)
00044     {
00045         case SIGINT:
00046         case SIGHUP:
00047         case SIGTERM:
00048             //clBeamSrv2Master::Abort();
00049             bRun = false;
00050             break;
00051     }
00052 }
00053 
00054 
00055 int main (int argc, char *argv[])
00056 {
00057     int iRetVal = 0;
00058     int iRank;
00059 
00060     if (argc > 1)
00061     {
00062         if (strcmp(argv[1], "--version") == 0)
00063         {
00064             fprintf(stderr, "%s v%i.%i.%i\n", argv[0], GLOBAL_VERSMAJ,
00065                 GLOBAL_VERSMIN, GLOBAL_VERSPL);
00066             fprintf(stderr, "Copyright (C) 2002 Jussi Laako\n");
00067             return 0;
00068         }
00069         if (strcmp(argv[1], "--help") == 0)
00070         {
00071             fprintf(stderr, "%s [--version|--help]\n\n", argv[0]);
00072             fprintf(stderr, "--version      display version information\n");
00073             fprintf(stderr, "--help         display this help\n");
00074             return 0;
00075         }
00076     }
00077 
00078     if (MPI_Init(&argc, &argv) != MPI_SUCCESS)
00079     {
00080         fprintf(stderr, "MPI_Init() failed!\n");
00081         return 1;
00082     }
00083     if (MPI_Comm_rank(MPI_COMM_WORLD, &iRank) == MPI_SUCCESS)
00084     {
00085         if (!iRank)
00086         {
00087             signal(SIGINT, sig_handler);
00088             signal(SIGHUP, sig_handler);
00089             signal(SIGTERM, sig_handler);
00090 
00091             clBeamSrv2Master *BeamSrvMaster;
00092 
00093             BeamSrvMaster = new clBeamSrv2Master;
00094             BeamSrvMasterThreads = 
00095                 new clDynThreads<clBeamSrv2Master>(*BeamSrvMaster);
00096             iRetVal = BeamSrvMaster->Main(&argc, &argv);
00097             delete BeamSrvMaster;
00098         }
00099         else
00100         {
00101             clBeamSrv2Slave *BeamSrvSlave;
00102 
00103             BeamSrvSlave = new clBeamSrv2Slave(iRank);
00104             iRetVal = BeamSrvSlave->Main(&argc, &argv);
00105             delete BeamSrvSlave;
00106         }
00107     }
00108     else fprintf(stderr, "MPI_Comm_rank() failed!\n");
00109     MPI_Finalize();
00110     
00111     return iRetVal;
00112 }
00113 
00114 
00115 // --- SHARED
00116 
00117 
00118 bool BeamCommNodeParams (stBeamNodeParams &sNodeParams)
00119 {
00120     if (MPI_Bcast(&sNodeParams.iType, 1, MPI_INT, 0, MPI_COMM_WORLD) !=
00121         MPI_SUCCESS) return false;
00122     if (MPI_Bcast(&sNodeParams.iSensors, 1, MPI_INT, 0, MPI_COMM_WORLD) !=
00123         MPI_SUCCESS) return false;
00124     if (MPI_Bcast(&sNodeParams.fSpacing, 1, MPI_FLOAT, 0, MPI_COMM_WORLD) !=
00125         MPI_SUCCESS) return false;
00126     if (MPI_Bcast(&sNodeParams.fSoundSpeed, 1, MPI_FLOAT, 0, MPI_COMM_WORLD) !=
00127         MPI_SUCCESS) return false;
00128     if (MPI_Bcast(&sNodeParams.iBeamCount, 1, MPI_INT, 0, MPI_COMM_WORLD) !=
00129         MPI_SUCCESS) return false;
00130     if (MPI_Bcast(&sNodeParams.iWindowSize, 1, MPI_INT, 0, MPI_COMM_WORLD) !=
00131         MPI_SUCCESS) return false;
00132     if (MPI_Bcast(&sNodeParams.iBlockSize, 1, MPI_INT, 0, MPI_COMM_WORLD) !=
00133         MPI_SUCCESS) return false;
00134     if (MPI_Bcast(&sNodeParams.fSampleRate, 1, MPI_FLOAT, 0, MPI_COMM_WORLD) !=
00135         MPI_SUCCESS) return false;
00136     return true;
00137 }
00138 
00139 
00140 bool BeamCommInData (GDT *fpData, int iDataCount)
00141 {
00142     if (typeid(GDT) == typeid(float))
00143     {
00144         if (MPI_Bcast(fpData, iDataCount, MPI_FLOAT, 0, MPI_COMM_WORLD) !=
00145             MPI_SUCCESS) return false;
00146     }
00147     else if (typeid(GDT) == typeid(double))
00148     {
00149         if (MPI_Bcast(fpData, iDataCount, MPI_DOUBLE, 0, MPI_COMM_WORLD) !=
00150             MPI_SUCCESS) return false;
00151     }
00152     return true;
00153 }
00154 
00155 
00156 // --- MASTER
00157 
00158 
00159 bool clBeamSrv2Master::ReadConfig ()
00160 {
00161     if (!Cfg.GetInt("Type", &sNodeParams.iType))
00162     {
00163         Log.Add('!', "Parameter \"Type\" is missing");
00164         return false;
00165     }
00166     if (!Cfg.GetInt("Sensors", &sNodeParams.iSensors))
00167     {
00168         Log.Add('!', "Parameter \"Sensors\" is missing");
00169         return false;
00170     }
00171     if (!Cfg.GetFlt("Spacing", &sNodeParams.fSpacing))
00172     {
00173         Log.Add('!', "Parameter \"Spacing\" is missing");
00174         return false;
00175     }
00176     if (!Cfg.GetFlt("SoundSpeed", &sNodeParams.fSoundSpeed))
00177     {
00178         Log.Add('!', "Parameter \"SoundSpeed\" is missing");
00179         return false;
00180     }
00181     if (!Cfg.GetInt("Decimate", &iDecimate))
00182     {
00183         Log.Add('!', "Parameter \"Decimate\" is missing");
00184         return false;
00185     }
00186     if (!Cfg.GetInt("WindowSize", &sNodeParams.iWindowSize))
00187     {
00188         Log.Add('!', "Parameter \"WindowSize\" is missing");
00189         return false;
00190     }
00191     if (!Cfg.GetInt("BlockSize", &sNodeParams.iBlockSize))
00192     {
00193         Log.Add('!', "Parameter \"BlockSize\" is missing");
00194         return false;
00195     }
00196     if (!Cfg.GetInt("Beams", &sNodeParams.iBeamCount))
00197     {
00198         Log.Add('!', "Parameter \"Beams\" is missing");
00199         return false;
00200     }
00201 
00202     return true;
00203 }
00204 
00205 
00206 bool clBeamSrv2Master::InitFilterBank ()
00207 {
00208     int iTwosExp;
00209     int iFilterCntr;
00210     float fArrayFreq;
00211 
00212     if (!Cfg.GetInt("FilterType", &iFilterType))
00213     {
00214         Log.Add('!', "Parameter \"FilterType\" is missing");
00215         Abort();
00216     }
00217 
00218     fArrayFreq = sNodeParams.fSoundSpeed / sNodeParams.fSpacing / 2.0f;
00219     if (iDecimate)
00220     {
00221         iTwosExp = (int) 
00222             floor(log((float) sInHdr.dSampleRate / 2.0f / fArrayFreq) / 
00223             log(2.0));
00224         iDecFact = (int) pow(2.0, iTwosExp);
00225     }
00226     else
00227     {
00228         iDecFact = 1;
00229     }
00230     sprintf(cpLogBuf, "Array frequency %g Hz, decimation factor %i", 
00231         fArrayFreq, iDecFact);
00232     Log.Add(' ', cpLogBuf);
00233 
00234     if (iDecFact > 1)
00235     {
00236         FilterBank = new clRecDecimator[sNodeParams.iSensors];
00237         for (iFilterCntr = 0; iFilterCntr < sNodeParams.iSensors; iFilterCntr++)
00238         {
00239             if (!FilterBank[iFilterCntr].Initialize(iDecFact, 
00240                 -sNodeParams.iWindowSize, (GDT *) NULL, (GDT) 0, iFilterType))
00241                 return false;
00242         }
00243     }
00244 
00245     sNodeParams.fSampleRate = (float) sInHdr.dSampleRate / (float) iDecFact;
00246 
00247     return true;
00248 }
00249 
00250 
00251 bool clBeamSrv2Master::InitProcessing ()
00252 {
00253     int iNodeCount;
00254 
00255     MPI_Comm_size(MPI_COMM_WORLD, &iNodeCount);
00256     if (iNodeCount <= sNodeParams.iBeamCount)
00257     {
00258         Log.Add('!', "Insufficient number of nodes available");
00259         return false;
00260     }
00261     
00262     if ((sInHdr.iChannels - iChOffset) < sNodeParams.iSensors)
00263     {
00264         Log.Add('!', "Misconfiguration (not enough channels)");
00265         return false;
00266     }
00267 
00268     return BeamCommNodeParams(sNodeParams);
00269 }
00270 
00271 
00272 bool clBeamSrv2Master::WaitReady ()
00273 {
00274     int iNodeCntr;
00275     int iNode;
00276     
00277     for (iNodeCntr = 0; iNodeCntr < sNodeParams.iBeamCount; iNodeCntr++)
00278     {
00279         if (MPI_Recv(&iNode, 1, MPI_INT, iNodeCntr + 1, BS_TAG_READY,
00280             MPI_COMM_WORLD, MPI_STATUS_IGNORE) != MPI_SUCCESS)
00281         {
00282             sprintf(cpLogBuf, "Communication error with rank %i",
00283                 iNodeCntr + 1);
00284             Log.Add('!', cpLogBuf);
00285             return false;
00286         }
00287     }
00288     return true;
00289 }
00290 
00291 
00292 void clBeamSrv2Master::ProcessLoop ()
00293 {
00294     bool bInData;
00295     int iNodeCntr;
00296     int iInDataCount;
00297     int iNodeDataCount;
00298     int iNodeResCount;
00299     clAlloc InData;
00300     clAlloc NodeData;
00301     clAlloc NodeRes;
00302     clAlloc LOutData;
00303     clDSPOp DSP;
00304 
00305     iInDataCount = sNodeParams.iBlockSize * sInHdr.iChannels;
00306     iNodeDataCount = sNodeParams.iBlockSize * sNodeParams.iSensors;
00307     iNodeResCount = sNodeParams.iBlockSize;
00308     iOutDataCount = sNodeParams.iBlockSize * sNodeParams.iBeamCount;
00309     InData.Size(iInDataCount * sizeof(GDT));
00310     NodeData.Size(iNodeDataCount * sizeof(GDT));
00311     NodeRes.Size(iNodeResCount * sizeof(GDT));
00312     LOutData.Size(iOutDataCount * sizeof(GDT));
00313     OutData.Size(iOutDataCount * sizeof(GDT));
00314     while (bRun)
00315     {
00316         SemIn.Wait();
00317         MtxIn.Wait();
00318         bInData = InBuf.Get((GDT *) InData, iInDataCount);
00319         MtxIn.Release();
00320         if (!bInData) continue;
00321 
00322         if (iDecFact <= 1)
00323         {
00324             CompactData(NodeData, InData, 
00325                 sNodeParams.iSensors, sInHdr.iChannels,
00326                 iChOffset, sNodeParams.iBlockSize);
00327         }
00328         else
00329         {
00330             if (!FilterData(NodeData, InData,
00331                 sNodeParams.iSensors, sInHdr.iChannels,
00332                 iChOffset, sNodeParams.iBlockSize))
00333                 continue;
00334         }
00335 
00336         BeamCommInData(NodeData, iNodeDataCount);
00337 
00338         for (iNodeCntr = 0; iNodeCntr < sNodeParams.iBeamCount; iNodeCntr++)
00339         {
00340             if (typeid(GDT) == typeid(float))
00341             {
00342                 if (MPI_Recv(NodeRes, iNodeResCount, MPI_FLOAT, iNodeCntr + 1,
00343                     BS_TAG_RES, MPI_COMM_WORLD, MPI_STATUS_IGNORE) != 
00344                     MPI_SUCCESS)
00345                 {
00346                     sprintf(cpLogBuf, "Communication error with rank %i",
00347                         iNodeCntr + 1);
00348                     Log.Add('!', cpLogBuf);
00349                 }
00350             }
00351             else if (typeid(GDT) == typeid(double))
00352             {
00353                 if (MPI_Recv(NodeRes, iNodeResCount, MPI_DOUBLE, iNodeCntr + 1,
00354                     BS_TAG_RES, MPI_COMM_WORLD, MPI_STATUS_IGNORE) != 
00355                     MPI_SUCCESS)
00356                 {
00357                     sprintf(cpLogBuf, "Communication error with rank %i",
00358                         iNodeCntr + 1);
00359                     Log.Add('!', cpLogBuf);
00360                 }
00361             }
00362             DSP.Pack((GDT *) LOutData, (GDT *) NodeRes, iNodeCntr, 
00363                 sNodeParams.iBeamCount, iNodeResCount);
00364         }
00365 
00366         MtxOut.Wait();
00367         OutData = LOutData;
00368         CndOut.NotifyAll();
00369         MtxOut.Release();
00370     }
00371 }
00372 
00373 
00374 void clBeamSrv2Master::CompactData (GDT *fpDest, const GDT *fpSrc,
00375     long lDestChs, long lSrcChs, long lOffset, long lCount)
00376 {
00377     long lSampleCntr;
00378     long lChCntr;
00379     long lSrcIdx;
00380     long lDestIdx;
00381     
00382     for (lSampleCntr = 0; lSampleCntr < lCount; lSampleCntr++)
00383     {
00384         lSrcIdx = lSampleCntr * lSrcChs + lOffset;
00385         lDestIdx = lSampleCntr * lDestChs;
00386         for (lChCntr = 0; lChCntr < lDestChs; lChCntr++)
00387         {
00388             fpDest[lDestIdx + lChCntr] = fpSrc[lSrcIdx + lChCntr];
00389         }
00390     }
00391 }
00392 
00393 
00394 bool clBeamSrv2Master::FilterData (GDT *fpDest, const GDT *fpSrc,
00395     long lDestChs, long lSrcChs, long lOffset, long lCount)
00396 {
00397     long lOutData = 0;
00398     long lChCntr;
00399 
00400     FiltWork.Size(lCount * sizeof(GDT));
00401 
00402 #   ifdef _OPENMP
00403 #   pragma omp parallel
00404 #   pragma omp for
00405 #   endif
00406     for (lChCntr = 0; lChCntr < lDestChs; lChCntr++)
00407     {
00408         DSP.Extract((GDT *) FiltWork, fpSrc, lOffset + lChCntr, lSrcChs, 
00409             lCount * lSrcChs);
00410         FilterBank[lChCntr].Put((GDT *) FiltWork, lCount);
00411         if (FilterBank[lChCntr].Get((GDT *) FiltWork, lCount))
00412         {
00413             DSP.Pack(fpDest, (GDT *) FiltWork, lChCntr, lDestChs, lCount);
00414 #           ifdef _OPENMP
00415 #           pragma omp atomic
00416 #           endif
00417             lOutData++;
00418         }
00419     }
00420     return ((lOutData) ? true : false);
00421 }
00422 
00423 
00424 clBeamSrv2Master::clBeamSrv2Master ()
00425 {
00426     Log.Open(BS_LOGFILE);
00427     Log.Add('*', "Starting");
00428     FilterBank = NULL;
00429 }
00430 
00431 
00432 clBeamSrv2Master::~clBeamSrv2Master ()
00433 {
00434     if (FilterBank != NULL)
00435         delete [] FilterBank;
00436     Log.Add('*', "Stopping");
00437 }
00438 
00439 
00440 int clBeamSrv2Master::Main (int *ipArgC, char ***cpppArgV)
00441 {
00442     int iPort;
00443     int iSockH;
00444     int iServerThreadH;
00445     int iInDataThreadH;
00446     char cpServSock[_POSIX_PATH_MAX + 1];
00447     stRawDataReq sReq;
00448     clSockClie SClie;
00449 
00450     Cfg.SetFileName(BS_CFGFILE);
00451 
00452     if (!Cfg.GetInt("Port", &iPort))
00453     {
00454         Log.Add('!', "Parameter \"Port\" is missing");
00455         Abort();
00456     }
00457     if (!SServ.Bind((unsigned short) iPort))
00458     {
00459         Log.Add('!', "clSockServ::Bind() failed", SServ.GetErrno());
00460         Abort();
00461     }
00462 
00463     if (!Cfg.GetStr("StreamSource", cpServSock))
00464     {
00465         Log.Add('!', "Parameter \"StreamSource\" is missing");
00466         Abort();
00467     }
00468     iSockH = SClie.Connect(cpServSock);
00469     if (iSockH < 0)
00470     {
00471         Log.Add('!', "clSockClie::Connect() failed", SClie.GetErrno());
00472         Abort();
00473     }
00474     SOpIn.SetHandle(iSockH);
00475     sReq.iChannel = -1;
00476     if (SOpIn.ReadN(&sInHdr, sizeof(sInHdr)) <= 0)
00477     {
00478         Log.Add('!', "clSockOp::Readn() failed", SOpIn.GetErrno());
00479         Abort();
00480     }
00481     if (SOpIn.WriteN(&sReq, sizeof(sReq)) <= 0)
00482     {
00483         Log.Add('!', "clSockOp::WriteN() failed", SOpIn.GetErrno());
00484         Abort();
00485     }
00486     sprintf(cpLogBuf, "Receiving data for %i channels at fs %g",
00487         sInHdr.iChannels, sInHdr.dSampleRate);
00488     Log.Add(' ', cpLogBuf);
00489 
00490     if (!Cfg.GetInt("ChOffset", &iChOffset))
00491     {
00492         Log.Add('!', "Parameter \"ChOffset\" is missing");
00493         Abort();
00494     }
00495 
00496     if (!ReadConfig())
00497     {
00498         Log.Add('!', "Reading of configuration file failed");
00499         Abort();
00500     }
00501 
00502     if (!InitFilterBank())
00503     {
00504         Log.Add('!', "Filter bank initialization failed");
00505         Abort();
00506     }
00507 
00508     if (!InitProcessing())
00509     {
00510         Log.Add('!', "Initialization of processing system failed");
00511         Abort();
00512     }
00513 
00514     if (!WaitReady())
00515     {
00516         Log.Add('!', "All nodes didn't confirm ready state");
00517         Abort();
00518     }
00519 
00520     iServerThreadH = BeamSrvMasterThreads->Create(
00521         &clBeamSrv2Master::ServerThread, NULL, false);
00522     iInDataThreadH = BeamSrvMasterThreads->Create(
00523         &clBeamSrv2Master::InDataThread, NULL, false);
00524 
00525     ProcessLoop();
00526     
00527     Abort();
00528     BeamSrvMasterThreads->Wait(iInDataThreadH);
00529     BeamSrvMasterThreads->Wait(iServerThreadH);
00530 
00531     return 0;
00532 }
00533 
00534 
00535 void clBeamSrv2Master::Abort (int iErrorCode)
00536 {
00537     MPI_Abort(MPI_COMM_WORLD, iErrorCode);
00538 }
00539 
00540 
00541 void *clBeamSrv2Master::InDataThread (void *vpParam)
00542 {
00543     int iInDataCount;
00544     clAlloc InData;
00545 
00546     iInDataCount = sNodeParams.iBlockSize * sInHdr.iChannels;
00547     InData.Size(iInDataCount * sizeof(GDT));
00548     while (bRun)
00549     {
00550         if (SOpIn.ReadN(InData, InData.GetSize()) <= 0)
00551         {
00552             Abort();
00553             break;
00554         }
00555 
00556         MtxIn.Wait();
00557         InBuf.Put((GDT *) InData, iInDataCount);
00558         MtxIn.Release();
00559         SemIn.Post();
00560     }
00561     
00562     return NULL;
00563 }
00564 
00565 
00566 void *clBeamSrv2Master::ServerThread (void *vpParam)
00567 {
00568     int iSockH;
00569 
00570     while (bRun)
00571     {
00572         iSockH = SServ.WaitForConnect(BS_ACCEPT_TO);
00573         if (iSockH >= 0)
00574         {
00575             BeamSrvMasterThreads->Create(&clBeamSrv2Master::ServeClientThread,
00576                 (void *) iSockH, true);
00577         }
00578     }
00579 
00580     return NULL;
00581 }
00582 
00583 
00584 void *clBeamSrv2Master::ServeClientThread (void *vpParam)
00585 {
00586     int iSockH = (int) vpParam;
00587     int iMsgSize;
00588     stSoundStart sOutHdr;
00589     clAlloc OutMsg;
00590     clAlloc LOutData;
00591     clSockOp SOp;
00592     clSoundMsg Msg;
00593     
00594     Log.Add('+', "Client connected");
00595     SOp.SetHandle(iSockH);
00596     iMsgSize = iOutDataCount * sizeof(GDT);
00597     OutMsg.Size(GLOBAL_HEADER_LEN + iMsgSize);
00598     LOutData.Size(iOutDataCount * sizeof(GDT));
00599 
00600     sOutHdr.iChannels = sNodeParams.iBeamCount;
00601     sOutHdr.dSampleRate = sNodeParams.fSampleRate;
00602     sOutHdr.iFragmentSize = iOutDataCount;
00603     sOutHdr.iCompress = MSG_SOUND_COMPRESS_NONE;
00604     Msg.SetStart(OutMsg, &sOutHdr);
00605     if (SOp.WriteN(OutMsg, GLOBAL_HEADER_LEN) <= 0)
00606     {
00607         Log.Add('-', "Error sending header to client, disconnecting");
00608         return NULL;
00609     }
00610 
00611     if (!SOp.DisableNagle())
00612     {
00613         Log.Add('#', "Failed to disable TCP Nagle algorithm", SOp.GetErrno());
00614     }
00615     if (!SOp.SetTypeOfService(IPTOS_LOWDELAY))
00616     {
00617         Log.Add('#', "Unable to set type-of-service flag", SOp.GetErrno());
00618     }
00619 
00620     while (bRun)
00621     {
00622         MtxOut.Wait();
00623         CndOut.Wait(MtxOut.GetPtr());
00624         LOutData = OutData;
00625         MtxOut.Release();
00626         
00627         Msg.SetData(OutMsg, (GDT *) LOutData, iOutDataCount);
00628         if (SOp.WriteN(OutMsg, iMsgSize) <= 0)
00629             break;
00630     }
00631 
00632     Log.Add('-', "Client disconnected");
00633     return NULL;
00634 }
00635 
00636 
00637 // --- SLAVE
00638 
00639 
00640 bool clBeamSrv2Slave::SendReady ()
00641 {
00642     if (MPI_Send(&iRank, 1, MPI_INT, 0, BS_TAG_READY, MPI_COMM_WORLD) !=
00643         MPI_SUCCESS)
00644         return false;
00645     return true;
00646 }
00647 
00648 
00649 bool clBeamSrv2Slave::SendRes (GDT *fpData, int iDataCount)
00650 {
00651     if (typeid(GDT) == typeid(float))
00652     {
00653         if (MPI_Send(fpData, iDataCount, MPI_FLOAT, 0, BS_TAG_RES,
00654             MPI_COMM_WORLD) != MPI_SUCCESS)
00655             return false;
00656     }
00657     else if (typeid(GDT) == typeid(double))
00658     {
00659         if (MPI_Send(fpData, iDataCount, MPI_DOUBLE, 0, BS_TAG_RES,
00660             MPI_COMM_WORLD) != MPI_SUCCESS)
00661             return false;
00662     }
00663     return true;
00664 }
00665 
00666 
00667 clBeamSrv2Slave::clBeamSrv2Slave (int iRankP)
00668 {
00669     iRank = iRankP;
00670 }
00671 
00672 
00673 clBeamSrv2Slave::~clBeamSrv2Slave ()
00674 {
00675 }
00676 
00677 
00678 int clBeamSrv2Slave::Main (int *ipArgC, char ***cpppArgV)
00679 {
00680     int iNodeCount;
00681     int iInDataCount;
00682     int iOutDataCount;
00683     float fDir;
00684     clAlloc InData;
00685     clAlloc OutData;
00686     clDSPOp DSP;
00687 
00688     MPI_Comm_size(MPI_COMM_WORLD, &iNodeCount);
00689 
00690     if (!BeamCommNodeParams(sNodeParams))
00691         return 1;
00692 
00693     fDir = acos(-1.0) / (sNodeParams.iBeamCount - 1) * (iRank - 1) - asin(1.0);
00694     iInDataCount = sNodeParams.iSensors * sNodeParams.iBlockSize;
00695     iOutDataCount = sNodeParams.iBlockSize;
00696     InData.Size(iInDataCount * sizeof(GDT));
00697     OutData.Size(iOutDataCount * sizeof(GDT));
00698 
00699     switch (sNodeParams.iType)
00700     {
00701         case BS_ARRAY_TYPE_DIPOLE:
00702             if (!FBDipole.Initialize(sNodeParams.fSpacing,
00703                 sNodeParams.iWindowSize, sNodeParams.fSampleRate))
00704                 return 2;
00705             FBDipole.SetSoundSpeed(sNodeParams.fSoundSpeed);
00706             FBDipole.SetDirection(fDir, true);
00707             break;
00708         case BS_ARRAY_TYPE_TRIANGLE:
00709             break;
00710         case BS_ARRAY_TYPE_LINE:
00711             if (!FBLine.Initialize(sNodeParams.iSensors,
00712                 sNodeParams.fSpacing, sNodeParams.iWindowSize,
00713                 sNodeParams.fSampleRate))
00714                 return 2;
00715             FBLine.SetSoundSpeed(sNodeParams.fSoundSpeed);
00716             FBLine.SetDirection(fDir, true);
00717             break;
00718         case BS_ARRAY_TYPE_PLANE:
00719             break;
00720         case BS_ARRAY_TYPE_CYLINDER:
00721             break;
00722         case BS_ARRAY_TYPE_SPHERE:
00723             break;
00724     }
00725 
00726     if (!SendReady())
00727         return 3;
00728 
00729     while (bRun)
00730     {
00731         if (!BeamCommInData(InData, iInDataCount))
00732             break;
00733 
00734         switch (sNodeParams.iType)
00735         {
00736             case BS_ARRAY_TYPE_DIPOLE:
00737                 FBDipole.Put(InData, iInDataCount, 0, sNodeParams.iSensors);
00738                 if (FBDipole.Get(OutData, iOutDataCount))
00739                 {
00740                     if (!SendRes(OutData, iOutDataCount))
00741                         return 4;
00742                 }
00743                 else
00744                 {
00745                     DSP.Zero((GDT *) OutData, iOutDataCount);
00746                     if (!SendRes(OutData, iOutDataCount))
00747                         return 4;
00748                 }
00749                 break;
00750             case BS_ARRAY_TYPE_TRIANGLE:
00751                 break;
00752             case BS_ARRAY_TYPE_LINE:
00753                 FBLine.Put(InData, iInDataCount, 0, sNodeParams.iSensors);
00754                 if (FBLine.Get(OutData, iOutDataCount))
00755                 {
00756                     if (!SendRes(OutData, iOutDataCount))
00757                         return 4;
00758                 }
00759                 else
00760                 {
00761                     DSP.Zero((GDT *) OutData, iOutDataCount);
00762                     if (!SendRes(OutData, iOutDataCount))
00763                         return 4;
00764                 }
00765                 break;
00766             case BS_ARRAY_TYPE_PLANE:
00767                 break;
00768             case BS_ARRAY_TYPE_CYLINDER:
00769                 break;
00770             case BS_ARRAY_TYPE_SPHERE:
00771                 break;
00772         }
00773     }
00774 
00775     return 0;
00776 }

Generated on Sun Oct 26 19:11:19 2003 for HASAS by doxygen 1.3.3