00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
#include <pthread.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <float.h>
#include <signal.h>
#include <unistd.h>
00030
00031 #ifdef USE_TRILLIUM
00032 #include <tstdio.h>
00033 #endif
00034
00035 #include "Locate.hh"
00036
00037
00038 static bool bDebug = false;
00039 static int iNodeCount;
00040 static int iRank;
00041 clMPIProc MPIProc;
00042 clLocate *Locate;
00043 clSubLocate *SubLocate;
00044
00045
00046 void SigHandler (int iSigNum)
00047 {
00048 switch (iSigNum)
00049 {
00050 case SIGSEGV:
00051 if (bDebug)
00052 {
00053 printf("Oops, received SIGSEGV, crash...\n");
00054 abort();
00055 }
00056 else exit(-1);
00057 break;
00058 }
00059 }
00060
00061
00062 int main (int argc, char *argv[])
00063 {
00064 int iRetVal;
00065
00066 signal(SIGPIPE, SIG_IGN);
00067 signal(SIGFPE, SIG_IGN);
00068 signal(SIGSEGV, SigHandler);
00069 MPIProc.Initialize(&argc, &argv);
00070 MPIProc.GetNodeCount(&iNodeCount);
00071 MPIProc.GetRank(&iRank);
00072 if (argc >= 2)
00073 {
00074 if (strcmp(argv[1], "--debug") == 0)
00075 {
00076 bDebug = true;
00077 }
00078 if (strcmp(argv[1], "--version") == 0)
00079 {
00080 if (iRank == 0)
00081 {
00082 printf("%s v%i.%i.%i\n", argv[0], GLOBAL_VERSMAJ,
00083 GLOBAL_VERSMIN, GLOBAL_VERSPL);
00084 printf("Copyright (C) 2000-2001 Jussi Laako\n");
00085 }
00086 MPIProc.Finalize();
00087 return 0;
00088 }
00089 }
00090 if (iRank == 0)
00091 {
00092 Locate = new clLocate;
00093 iRetVal = Locate->Main(argc, argv);
00094 delete Locate;
00095 }
00096 else
00097 {
00098 SubLocate = new clSubLocate;
00099 iRetVal = SubLocate->Main(argc, argv);
00100 delete SubLocate;
00101 }
00102 MPIProc.Finalize();
00103 return iRetVal;
00104 }
00105
00106
00107
00108
00109
00110 void MasterSigHandler (int iSigNum)
00111 {
00112 switch (iSigNum)
00113 {
00114 case SIGINT:
00115 fprintf(stderr, "Received SIGINT, terminating...\n");
00116 Locate->Stop();
00117 break;
00118 default:
00119 fprintf(stderr, "Received unknown signal, terminating...\n");
00120 Locate->Stop();
00121 }
00122 }
00123
00124
00125 void *WrapProcessThread (void *vpParam)
00126 {
00127 return Locate->ProcessThread(vpParam);
00128 }
00129
00130
00131 void *WrapServeClientThread (void *vpParam)
00132 {
00133 return Locate->ServeClientThread(vpParam);
00134 }
00135
00136
00137 bool clLocate::Initialize ()
00138 {
00139 int iReadRes;
00140 int iHostPort;
00141 long lPosX;
00142 long lPosY;
00143 float fAzimuthDeg;
00144 float fAzimuth;
00145 char cpHostName[LOCATE_HOSTNAME_MAXLEN];
00146 FILE *fileSensors;
00147
00148 Cfg.SetFileName(LOCATE_CFGFILE);
00149 if (!Cfg.GetInt("Width", &lWidth)) return false;
00150 if (!Cfg.GetInt("Height", &lHeight)) return false;
00151 if (!Cfg.GetFlt("Weight", &fWeight)) return false;
00152 if (!Cfg.GetInt("Port", &iPort)) return false;
00153
00154 if (!Cfg.GetInt("WindowSize", &sDirCfg.lWindowSize))
00155 sDirCfg.lWindowSize = LOCATE_DEF_WINDOWSIZE;
00156 if (!Cfg.GetFlt("SoundSpeed", &sDirCfg.fSoundSpeed))
00157 sDirCfg.fSoundSpeed = LOCATE_DEF_SOUNDSPEED;
00158 if (!Cfg.GetFlt("LowFrequency", &sDirCfg.fLowFrequency))
00159 sDirCfg.fLowFrequency = LOCATE_DEF_LOWFREQ;
00160 if (!Cfg.GetFlt("IntegrationTime", &sDirCfg.fIntegrationTime))
00161 sDirCfg.fIntegrationTime = LOCATE_DEF_INTTIME;
00162 if (!Cfg.GetInt("Scaling", &sDirCfg.iScaling))
00163 sDirCfg.iScaling = LOCATE_DEF_SCALING;
00164 if (!Cfg.GetFlt("ScalingExp", &sDirCfg.fScalingExp))
00165 sDirCfg.fScalingExp = LOCATE_DEF_SCALINGEXP;
00166 if (!Cfg.GetInt("RemoveNoise", &sDirCfg.iRemoveNoise))
00167 sDirCfg.iRemoveNoise = LOCATE_DEF_REMOVENOISE;
00168 if (!Cfg.GetFlt("Alpha", &sDirCfg.fAlpha))
00169 sDirCfg.fAlpha = LOCATE_DEF_ALPHA;
00170 if (!Cfg.GetInt("MeanLength", &sDirCfg.lMeanLength))
00171 sDirCfg.lMeanLength = LOCATE_DEF_MEANLENGTH;
00172 if (!Cfg.GetInt("GapLength", &sDirCfg.lGapLength))
00173 sDirCfg.lGapLength = LOCATE_DEF_GAPLENGTH;
00174
00175 fileSensors = fopen(LOCATE_SENSOR_LIST, "rt");
00176 if (fileSensors == NULL) return false;
00177 do
00178 {
00179 iReadRes = fscanf(fileSensors, "%s %i %li %li %f",
00180 cpHostName, &iHostPort,
00181 &lPosX, &lPosY, &fAzimuthDeg);
00182 fAzimuth = DSP.DegToRad(fAzimuthDeg);
00183 if (iReadRes == EOF) break;
00184 if (iSensorCount < (iNodeCount - 1))
00185 {
00186 SendParams(iSensorCount, cpHostName, iHostPort, lPosX, lPosY,
00187 fAzimuth);
00188 iSensorCount++;
00189 }
00190 else fprintf(stderr, "Ran out of nodes!\n");
00191 } while (!feof(fileSensors));
00192 fclose(fileSensors);
00193 fpLocMatrix = (GDT *) LocMatrix.Size(lWidth * lHeight * sizeof(GDT));
00194 iMsgSize = GLOBAL_HEADER_LEN + lWidth * lHeight * sizeof(GDT);
00195 ResMsg.Size(iMsgSize);
00196 return true;
00197 }
00198
00199
00200 void clLocate::SendParams (int iSensor,
00201 const char *cpHostName, int iHostPort,
00202 long lPosX, long lPosY, float fAzimuth)
00203 {
00204 int iDestRank = iSensor + 1;
00205
00206 MPICParam.Send(iDestRank, (char *) cpHostName, LOCATE_HOSTNAME_MAXLEN);
00207 MPICParam.Send(iDestRank, &iHostPort, 1);
00208
00209 MPICParam.Send(iDestRank, &lWidth, 1);
00210 MPICParam.Send(iDestRank, &lHeight, 1);
00211
00212 MPICParam.Send(iDestRank, &lPosX, 1);
00213 MPICParam.Send(iDestRank, &lPosY, 1);
00214 MPICParam.Send(iDestRank, &fAzimuth, 1);
00215
00216 MPICParam.Send(iDestRank, (void *) &sDirCfg, sizeof(stDirCfg));
00217 }
00218
00219
00220 clLocate::clLocate ()
00221 {
00222 bRun = true;
00223 iSensorCount = 0;
00224 Log.Open(LOCATE_LOGFILE);
00225 MPICCtrl.SetTag(LOCATE_CONTROL_TAG);
00226 MPICParam.SetTag(LOCATE_PARAM_TAG);
00227 MPICNormal.SetTag(LOCATE_NORMAL_TAG);
00228 Log.Add('*', "Starting");
00229 }
00230
00231
00232 clLocate::~clLocate ()
00233 {
00234 Log.Add('*', "Stopping");
00235 }
00236
00237
00238 int clLocate::Main (int iArgC, char **cpArgV)
00239 {
00240 int iSockH;
00241 pthread_t ptidProcess;
00242 pthread_t ptidServeClient;
00243 clSockServ SServer;
00244
00245 signal(SIGINT, MasterSigHandler);
00246 if (bDebug) printf("clLocate::Main -> Initialize\n");
00247 if (!Initialize ()) return 1;
00248 if (!SServer.Bind(NULL, iPort)) return 2;
00249 pthread_create(&ptidProcess, NULL, WrapProcessThread, NULL);
00250 if (bDebug) printf("clLocate::Main ... waiting connections ...\n");
00251 while (bRun)
00252 {
00253 iSockH = SServer.WaitForConnect(LOCATE_TIMEOUT);
00254 if (iSockH >= 0)
00255 {
00256 pthread_create(&ptidServeClient, NULL, WrapServeClientThread,
00257 (void *) iSockH);
00258 pthread_detach(ptidServeClient);
00259 }
00260 }
00261 pthread_join(ptidProcess, NULL);
00262 if (bDebug) printf("clLocate::Main END\n");
00263 return 0;
00264 }
00265
00266
00267 void clLocate::Stop ()
00268 {
00269 int iNodeCntr;
00270 int iCtrlCmd = LOCATE_CTRL_STOP;
00271
00272 bRun = false;
00273 for (iNodeCntr = 1; iNodeCntr <= iNodeCount; iNodeCntr++)
00274 MPICCtrl.Send(iNodeCntr, &iCtrlCmd, 1);
00275 }
00276
00277
00278 void *clLocate::ProcessThread (void *vpParam)
00279 {
00280 int iNodeCntr;
00281 GDT *fpSubRes;
00282 stLocateRes sResHdr;
00283 sigset_t sigsetThis;
00284 clAlloc SubRes;
00285 clLocateSystem LocateSystem;
00286
00287 sigemptyset(&sigsetThis);
00288 sigaddset(&sigsetThis, SIGFPE);
00289 sigaddset(&sigsetThis, SIGINT);
00290 pthread_sigmask(SIG_BLOCK, &sigsetThis, NULL);
00291 fpSubRes = (GDT *) SubRes.Size(lWidth * lHeight * sizeof(GDT));
00292 LocateSystem.Initialize(lWidth, lHeight, fWeight);
00293 while (bRun)
00294 {
00295 iNodeCntr = 1;
00296 do
00297 {
00298 if (bDebug) printf("clLocate::ProcessThread ... receiving ...\n");
00299 if (MPICNormal.Recv(MPI_ANY_SOURCE, fpSubRes, lWidth * lHeight))
00300 {
00301 if (bDebug) printf("clLocate::ProcessThread ... adding ...\n");
00302 LocateSystem.Add(fpSubRes);
00303 iNodeCntr++;
00304 }
00305 } while (iNodeCntr < iNodeCount);
00306 if (bDebug) printf("clLocate::ProcessThread ... finalizing ...\n");
00307 LocateSystem.Process();
00308 LocateSystem.GetResults(fpLocMatrix);
00309 sResHdr.lPointCount = lWidth * lHeight;
00310 MtxResMsg.Wait();
00311 Msg.SetResult(ResMsg, &sResHdr, fpLocMatrix);
00312 CndResMsg.NotifyAll();
00313 MtxResMsg.Release();
00314 }
00315 return NULL;
00316 }
00317
00318
00319 void *clLocate::ServeClientThread (void *vpParam)
00320 {
00321 bool bConnected = true;
00322 char cpHdrBuf[GLOBAL_HEADER_LEN];
00323 stLocateHdr sHdr;
00324 sigset_t sigsetThis;
00325 clAlloc LocalMsg;
00326 clSockOp SOp((int) vpParam);
00327 clLocateMsg Msg;
00328
00329 sigemptyset(&sigsetThis);
00330 sigaddset(&sigsetThis, SIGFPE);
00331 sigaddset(&sigsetThis, SIGINT);
00332 sigaddset(&sigsetThis, SIGPIPE);
00333 pthread_sigmask(SIG_BLOCK, &sigsetThis, NULL);
00334 LocalMsg.Size(iMsgSize);
00335 sHdr.iWidth = lWidth;
00336 sHdr.iHeight = lHeight;
00337 Msg.SetHeader(cpHdrBuf, &sHdr);
00338 if (SOp.WriteN(cpHdrBuf, GLOBAL_HEADER_LEN) < GLOBAL_HEADER_LEN)
00339 bConnected = false;
00340 while (bConnected && bRun)
00341 {
00342 MtxResMsg.Wait();
00343 CndResMsg.Wait(MtxResMsg.GetPtr());
00344 memcpy(LocalMsg, ResMsg, iMsgSize);
00345 MtxResMsg.Release();
00346 if (SOp.WriteN(LocalMsg, iMsgSize) < iMsgSize)
00347 bConnected = false;
00348 }
00349 return NULL;
00350 }
00351
00352
00353
00354
00355
00356 bool clSubLocate::RecvParams ()
00357 {
00358 if (!MPICParam.ProbeAny(0)) return false;
00359 if (MPICParam.GetSenderTag() != LOCATE_PARAM_TAG) return false;
00360
00361 MPICParam.Recv(0, cpHostName, LOCATE_HOSTNAME_MAXLEN);
00362 MPICParam.Recv(0, &iHostPort, 1);
00363
00364 MPICParam.Recv(0, &lWidth, 1);
00365 MPICParam.Recv(0, &lHeight, 1);
00366
00367 MPICParam.Recv(0, &lPosX, 1);
00368 MPICParam.Recv(0, &lPosY, 1);
00369 MPICParam.Recv(0, &fAzimuth, 1);
00370
00371 MPICParam.Recv(0, (void *) &sDirCfg, sizeof(stDirCfg));
00372 return true;
00373 }
00374
00375
00376 bool clSubLocate::Initialize ()
00377 {
00378 long lSpectSize = sDirCfg.lWindowSize / 2 + 1;
00379
00380 if (!LocSens.Initialize(lWidth, lHeight, lPosX, lPosY, fAzimuth, false))
00381 return false;
00382 iDirMsgSize = GLOBAL_HEADER_LEN + lSpectSize * sizeof(GDT) * 2;
00383 DirMsg.Size(iDirMsgSize);
00384 fpLevRes = (GDT *) LevRes.Size(sDirCfg.lWindowSize * sizeof(GDT));
00385 fpDirRes = (GDT *) DirRes.Size(sDirCfg.lWindowSize * sizeof(GDT));
00386 return true;
00387 }
00388
00389
00390 bool clSubLocate::ConnectDir ()
00391 {
00392 int iSockH;
00393 char cpReqMsg[GLOBAL_HEADER_LEN];
00394 stDirReq2 sReq;
00395 clSockClie SClient;
00396
00397 sReq.lWindowSize = sDirCfg.lWindowSize;
00398 sReq.fSoundSpeed = sDirCfg.fSoundSpeed;
00399 sReq.fLowFreqLimit = sDirCfg.fLowFrequency;
00400 sReq.fIntegrationTime = sDirCfg.fIntegrationTime;
00401 sReq.iScaling = sDirCfg.iScaling;
00402 sReq.fScalingExp = sDirCfg.fScalingExp;
00403 sReq.iRemoveNoise = sDirCfg.iRemoveNoise;
00404 sReq.fAlpha = sDirCfg.fAlpha;
00405 sReq.lMeanLength = sDirCfg.lMeanLength;
00406 sReq.lGapLength = sDirCfg.lGapLength;
00407 iSockH = SClient.Connect(cpHostName, NULL, iHostPort);
00408 if (iSockH < 0) return false;
00409 SOp.SetHandle(iSockH);
00410 strcpy(cpReqMsg, LOCATE_DIR_PROC);
00411 if (SOp.WriteN(cpReqMsg, GLOBAL_HEADER_LEN) < GLOBAL_HEADER_LEN)
00412 return false;
00413 Msg.SetRequest(cpReqMsg, &sReq);
00414 if (SOp.WriteN(cpReqMsg, GLOBAL_HEADER_LEN) < GLOBAL_HEADER_LEN)
00415 return false;
00416 return true;
00417 }
00418
00419
00420 clSubLocate::clSubLocate ()
00421 {
00422 MPICCtrl.SetTag(LOCATE_CONTROL_TAG);
00423 MPICParam.SetTag(LOCATE_PARAM_TAG);
00424 MPICNormal.SetTag(LOCATE_NORMAL_TAG);
00425 }
00426
00427
00428 clSubLocate::~clSubLocate ()
00429 {
00430 }
00431
00432
00433 int clSubLocate::Main (int iArgC, char **cpArgV)
00434 {
00435 int iCtrlCmd;
00436 stDirRes2 sHdr;
00437
00438 # ifdef USE_TRILLIUM
00439 if (bDebug) tprintf("clSubLocate::Main -> RecvParams\n");
00440 # endif
00441 if (!RecvParams()) return 1;
00442 # ifdef USE_TRILLIUM
00443 if (bDebug) tprintf("clSubLocate::Main -> Initialize\n");
00444 # endif
00445 if (!Initialize()) return 2;
00446 # ifdef USE_TRILLIUM
00447 if (bDebug) tprintf("clSubLocate::Main -> ConnectDir\n");
00448 # endif
00449 if (!ConnectDir()) return 3;
00450 SOp.SetRecvBufSize(iDirMsgSize * 2);
00451 while (!MPICCtrl.ProbeNB(0))
00452 {
00453 # ifdef USE_TRILLIUM
00454 if (bDebug) tprintf("clSubLocate::Main ... receiving ...\n");
00455 # endif
00456 if (SOp.ReadN(DirMsg, iDirMsgSize) < iDirMsgSize) return 4;
00457 Msg.GetResult(DirMsg, &sHdr, fpLevRes, fpDirRes);
00458 # ifdef USE_TRILLIUM
00459 if (bDebug) tprintf("clSubLocate::Main ... working ...\n");
00460 # endif
00461 LocSens.SetDirectionValues(fpLevRes, fpDirRes, sHdr.lResultCount,
00462 sHdr.lMinBin, sHdr.lMaxBin, sHdr.fFreqResolution);
00463 # ifdef USE_TRILLIUM
00464 if (bDebug) tprintf("clSubLocate::Main ... sending ...\n");
00465 # endif
00466 if (!MPICNormal.Send(0, LocSens.GetResultMatrix(), lWidth * lHeight))
00467 return 5;
00468 }
00469 MPICCtrl.Recv(0, &iCtrlCmd, 1);
00470 # ifdef USE_TRILLIUM
00471 if (bDebug) tprintf("clSubLocate::Main -> OK, I'm out of here (%i)\n",
00472 iCtrlCmd);
00473 # endif
00474
00475
00476 return 0;
00477 }
00478