00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <pthread.h>
00024 #include <stdio.h>
00025 #include <stdlib.h>
00026 #include <unistd.h>
00027 #include <limits.h>
00028 #include <string.h>
00029 #include <signal.h>
00030 #include <errno.h>
00031 #include <math.h>
00032 #include <float.h>
00033 #include <time.h>
00034 #include <sys/time.h>
00035
00036 #include "Direction.hh"
00037
00038
00039 static bool bDebug = false;
00040 static bool bDaemon = false;
00041 static const char *cpaArrayTypes[] = { "Dipole", "Triangle", "Line",
00042 "Plane", "Cylinder", "Sphere" };
00043 static clDirection *Dir;
00044
00045
00046 void SigHandler (int iSigNum)
00047 {
00048 switch (iSigNum)
00049 {
00050 case SIGSEGV:
00051 if (bDebug)
00052 {
00053 printf("Oops, received SIGSEGV, crash...\n");
00054 abort();
00055 }
00056 else exit(-1);
00057 break;
00058 }
00059 }
00060
00061
00062 int main (int argc, char *argv[])
00063 {
00064 int iRetVal;
00065
00066 signal(SIGPIPE, SIG_IGN);
00067 signal(SIGFPE, SIG_IGN);
00068 signal(SIGSEGV, SigHandler);
00069 if (argc >= 2)
00070 {
00071 if (strcmp(argv[1], "--debug") == 0)
00072 {
00073 bDebug = true;
00074 }
00075 if (strcmp(argv[1], "-D") == 0)
00076 {
00077 bDaemon = true;
00078 }
00079 }
00080 if (bDebug) printf("direction: Started\n");
00081 Dir = new clDirection(3, 4);
00082 iRetVal = Dir->Exec();
00083 if (bDebug) printf("direction: Exit = %i\n", iRetVal);
00084 return iRetVal;
00085 }
00086
00087
00088 void *ThrReceiveData (void *vpParam)
00089 {
00090 return Dir->ReceiveData(vpParam);
00091 }
00092
00093
00094 void *ThrProcessData (void *vpParam)
00095 {
00096 return Dir->ProcessData(vpParam);
00097 }
00098
00099
00100 void *ThrSendResults (void *vpParam)
00101 {
00102 return Dir->SendResults(vpParam);
00103 }
00104
00105
00106 bool clDirection::GetRequest ()
00107 {
00108 char cpMsgBuf[GLOBAL_HEADER_LEN];
00109
00110 if (!SOpRequest->ReadSelect(DIR_REQ_TIMEOUT))
00111 {
00112 if (bDebug) printf("direction: clDirection::GetRequest() timeout\n");
00113 return false;
00114 }
00115 if (SOpRequest->ReadN(cpMsgBuf, GLOBAL_HEADER_LEN) != GLOBAL_HEADER_LEN)
00116 {
00117 if (bDebug) printf("direction: clSockOp::ReadN errno %i\n",
00118 SOpRequest->GetErrno());
00119 return false;
00120 }
00121 DirMsg.GetRequest(cpMsgBuf, &sDirRequest);
00122 return true;
00123 }
00124
00125
00126 bool clDirection::GetFirstMsg ()
00127 {
00128 stRawDataReq sDataReq;
00129
00130 if (!SOpData->ReadSelect(DIR_RAW1ST_TIMEOUT))
00131 {
00132 if (bDebug) printf("direction: clDirection::GetFirstMsg() timeout\n");
00133 return false;
00134 }
00135 if (SOpData->ReadN(&sDataFirst, sizeof(stRawDataFirst)) !=
00136 sizeof(stRawDataFirst))
00137 {
00138 if (bDebug) printf("direction: clSockOp::ReadN errno %i\n",
00139 SOpData->GetErrno());
00140 return false;
00141 }
00142 sDataReq.iChannel = -1;
00143 if (SOpData->WriteN(&sDataReq, sizeof(sDataReq)) !=
00144 sizeof(sDataReq))
00145 {
00146 if (bDebug) printf("direction: clSockOp::WriteN errno %i\n",
00147 SOpData->GetErrno());
00148 }
00149 return true;
00150 }
00151
00152
00153 bool clDirection::InitArray ()
00154 {
00155 int iProcCntr;
00156 double dTemp;
00157
00158 if (CfgDirection->GetInt("ArrayType", &iArrayType))
00159 {
00160 if (bDebug) printf("direction: Array type %s\n",
00161 cpaArrayTypes[iArrayType]);
00162 }
00163 else
00164 {
00165 if (!bDaemon) printf("direction: Array type not specified\n");
00166 return false;
00167 }
00168 if (iArrayType != DIR_ARRAY_TYPE_DIPOLE &&
00169 iArrayType != DIR_ARRAY_TYPE_TRIANGLE)
00170 {
00171 CfgDirection->GetInt("ShadingType", &iShadingType);
00172
00173
00174 CfgDirection->GetInt("HorizontalSensors", &iHorizSensors);
00175 if (bDebug) printf("Horizontal sensor count %i\n", iHorizSensors);
00176 if (CfgDirection->GetFltArray("HorizontalSpacings",
00177 fpHorizSpacing) < (iHorizSensors - 1))
00178 {
00179 if (!bDaemon) printf("direction: Not enough sensor spacings\n");
00180 return false;
00181 }
00182 if (iArrayType != DIR_ARRAY_TYPE_LINE)
00183 {
00184 CfgDirection->GetInt("VerticalSensors", &iVertSensors);
00185 if (bDebug) printf("Vertical sensor count %i\n", iVertSensors);
00186 if (CfgDirection->GetFltArray("VerticalSpacings",
00187 fpVertSpacing) < (iVertSensors - 1))
00188 {
00189 if (!bDaemon)
00190 printf("direction: Not enough sensor spacings\n");
00191 return false;
00192 }
00193 }
00194 }
00195 else
00196 {
00197 if (!CfgDirection->GetFlt("SensorSpacing", &dTemp))
00198 {
00199 if (!bDaemon) printf("direction: Sensor spacing not specified\n");
00200 return false;
00201 }
00202 fSensorSpacing = (GDT) dTemp;
00203 }
00204 switch (iArrayType)
00205 {
00206 case DIR_ARRAY_TYPE_DIPOLE:
00207 iRawBufSize = lWindowSize * sizeof(GDT) * sDataFirst.iChannels;
00208 ArDipole.Initialize(fSensorSpacing, sDirRequest.fSoundSpeed,
00209 (int) (sDataFirst.dSampleRate + 0.5), lWindowSize,
00210 sDirRequest.fLowFreqLimit, bDebug);
00211 if ((sDirRequest.iAlgorithm & MSG_DIR_ALG_BEAM) ==
00212 MSG_DIR_ALG_BEAM)
00213 {
00214 if (bDebug) printf("direction: Creating beamformers\n");
00215 for (iProcCntr = 0; iProcCntr < iProcessorCount; iProcCntr++)
00216 {
00217 BeDipole[iProcCntr] = new clBeamDipole(&ArDipole,
00218 sDirRequest.fIntegrationTime,
00219 (int) (sDataFirst.dSampleRate + 0.5),
00220 lWindowSize, bDebug);
00221 }
00222 sDirResHdr.fIntegrationTime =
00223 BeDipole[0]->GetIntegrationTime();
00224 }
00225 if ((sDirRequest.iAlgorithm & MSG_DIR_ALG_CORR) ==
00226 MSG_DIR_ALG_CORR)
00227 {
00228 if (bDebug) printf("direction: Creating correlators\n");
00229 for (iProcCntr = 0; iProcCntr < iProcessorCount; iProcCntr++)
00230 {
00231 CoDipole[iProcCntr] = new clCorrDipole(&ArDipole,
00232 sDirRequest.fIntegrationTime,
00233 (int) (sDataFirst.dSampleRate + 0.5),
00234 lWindowSize, sDirRequest.bDisableFilter, bDebug);
00235 }
00236 sDirResHdr.fIntegrationTime =
00237 CoDipole[0]->GetIntegrationTime();
00238 }
00239 sDirResHdr.fHighFreqLimit = ArDipole.GetArrayFreq();
00240 sDirResHdr.b3DArray = false;
00241 break;
00242 case DIR_ARRAY_TYPE_TRIANGLE:
00243 case DIR_ARRAY_TYPE_LINE:
00244 case DIR_ARRAY_TYPE_PLANE:
00245 case DIR_ARRAY_TYPE_CYLINDER:
00246 case DIR_ARRAY_TYPE_SPHERE:
00247 if (!bDaemon)
00248 printf("direction: This array type is not supported yet\n");
00249 return false;
00250 break;
00251 default:
00252 if (!bDaemon) printf("direction: Unknown array type!\n");
00253 return false;
00254 }
00255 sDirResHdr.lSectorCount = sDirRequest.lSectorCount;
00256 return true;
00257 }
00258
00259
00260 void clDirection::Stop ()
00261 {
00262 MtxClassData.Wait();
00263 bRun = false;
00264 MtxClassData.Release();
00265 }
00266
00267
00268 inline void clDirection::Scale (GDT *fpValue)
00269 {
00270 GDT fLogMult;
00271
00272 switch (sDirRequest.iScaling)
00273 {
00274 case MSG_DIR_SCAL_LOG:
00275 fLogMult = (GDT) (1.0 / log10(2.0));
00276 *fpValue = (GDT) log10(*fpValue + 1.0) * fLogMult;
00277 break;
00278 case MSG_DIR_SCAL_EXP:
00279 *fpValue = (GDT) pow(*fpValue, sDirRequest.fScalingExp);
00280 break;
00281 case MSG_DIR_SCAL_SIN:
00282
00283 *fpValue = (GDT) sin(*fpValue * asin(1));
00284 break;
00285 case MSG_DIR_SCAL_LIN:
00286 default:
00287
00288 break;
00289 }
00290 }
00291
00292
00293 void clDirection::CalcDipoleBeams (int iThreadIdx, long lStartBeam,
00294 long lStopBeam, bool bMultiply)
00295 {
00296 int iDebug = 0;
00297 long lBeamCntr;
00298 GDT fBeamStep;
00299 GDT fBeamDir;
00300 GDT fTempRes;
00301 GDT *fpDirResData = DirResData;
00302
00303 fBeamStep = (sDirRequest.fRightDir - sDirRequest.fLeftDir) /
00304 (GDT) sDirRequest.lSectorCount;
00305 for (lBeamCntr = lStartBeam; lBeamCntr < lStopBeam; lBeamCntr++)
00306 {
00307 fBeamDir = fBeamStep * (GDT) lBeamCntr + sDirRequest.fLeftDir;
00308 fTempRes = BeDipole[iThreadIdx]->Process(fBeamDir);
00309 Scale(&fTempRes);
00310 MtxClassData.Wait();
00311 if (!bMultiply)
00312 fpDirResData[lBeamCntr] = fTempRes;
00313 else
00314 fpDirResData[lBeamCntr] *= fTempRes;
00315 MtxClassData.Release();
00316 iDebug++;
00317 }
00318 BeDipole[iThreadIdx]->SetHistory();
00319 }
00320
00321
00322 void clDirection::CalcDipoleCorrs (int iThreadIdx, long lStartSect,
00323 long lStopSect, bool bMultiply)
00324 {
00325 long lSectCntr;
00326 GDT fSectStep;
00327 GDT fSectDir;
00328 GDT fTempRes;
00329 GDT *fpDirResData = DirResData;
00330
00331 fSectStep = (sDirRequest.fRightDir - sDirRequest.fLeftDir) /
00332 (GDT) sDirRequest.lSectorCount;
00333 for (lSectCntr = lStartSect; lSectCntr < lStopSect; lSectCntr++)
00334 {
00335 fSectDir = fSectStep * (GDT) lSectCntr + sDirRequest.fLeftDir;
00336 fTempRes = CoDipole[iThreadIdx]->Process(fSectDir);
00337 fTempRes = (fTempRes + (GDT) 1.0) * (GDT) 0.5;
00338 Scale(&fTempRes);
00339 MtxClassData.Wait();
00340 if (!bMultiply)
00341 fpDirResData[lSectCntr] = fTempRes;
00342 else
00343 fpDirResData[lSectCntr] *= fTempRes;
00344 MtxClassData.Release();
00345 }
00346 CoDipole[iThreadIdx]->SetHistory();
00347 }
00348
00349
00350 void clDirection::StartTiming ()
00351 {
00352 struct timeval sTimingReal;
00353
00354 ctTiming = clock();
00355 gettimeofday(&sTimingReal, NULL);
00356 dTimingReal = (double) sTimingReal.tv_sec + (double) sTimingReal.tv_usec /
00357 1000000.0;
00358 }
00359
00360
00361 void clDirection::StopTiming ()
00362 {
00363 clock_t ctDiff;
00364 double dDiffTime;
00365 double dRealEnd;
00366 double dRealDiff;
00367 struct timeval sTimingReal;
00368
00369 ctDiff = clock() - ctTiming;
00370 gettimeofday(&sTimingReal, NULL);
00371 dDiffTime = (double) ctDiff / (double) CLOCKS_PER_SEC;
00372 dRealEnd = (double) sTimingReal.tv_sec + (double) sTimingReal.tv_usec /
00373 1000000.0;
00374 dRealDiff = dRealEnd - dTimingReal;
00375 if (dDiffTime > sDirResHdr.fIntegrationTime)
00376 {
00377 if (bDebug)
00378 printf("direction: Underpowered CPU for this task (%f / %f)\n",
00379 sDirResHdr.fIntegrationTime, dDiffTime);
00380 }
00381 else if (dRealDiff > sDirResHdr.fIntegrationTime)
00382 {
00383 if (bDebug)
00384 printf("direction: CPU load too high for this task (%f / %f)\n",
00385 sDirResHdr.fIntegrationTime, dRealDiff);
00386 }
00387 }
00388
00389
00390 clDirection::clDirection (int iInHandle, int iOutHandle)
00391 {
00392 int iProcCntr;
00393
00394 bRun = true;
00395 bConnected = true;
00396 iProcessorCount = 1;
00397 iStartChannel = 0;
00398 iResultsRefCount = 0;
00399 lWindowSize = DIR_DEF_WIN_SIZE;
00400 SOpRequest = new clSockOp(iInHandle);
00401 SOpResult = new clSockOp(iOutHandle);
00402 SOpData = NULL;
00403 CfgStreamDist = new clCfgFile(SD_CFGFILE);
00404 CfgDirection = new clCfgFile(DIR_CFGFILE);
00405 for (iProcCntr = 0; iProcCntr < DIR_MAX_CPUS; iProcCntr++)
00406 {
00407 BeDipole[iProcCntr] = NULL;
00408 CoDipole[iProcCntr] = NULL;
00409 }
00410 }
00411
00412
00413 clDirection::~clDirection ()
00414 {
00415 int iProcCntr;
00416
00417 for (iProcCntr = 0; iProcCntr < iProcessorCount; iProcCntr++)
00418 {
00419 if (BeDipole[iProcCntr] != NULL) delete BeDipole[iProcCntr];
00420 if (CoDipole[iProcCntr] != NULL) delete CoDipole[iProcCntr];
00421 }
00422 delete CfgStreamDist;
00423 delete CfgDirection;
00424 delete SOpRequest;
00425 if (SOpData != NULL) delete SOpData;
00426 delete SOpResult;
00427 }
00428
00429
00430 int clDirection::Exec ()
00431 {
00432 int iDataH;
00433 int iProcCntr;
00434 int iTempWinSize;
00435 char cpIDSockName[_POSIX_PATH_MAX + 1];
00436
00437 if (CfgDirection->GetInt("ProcessorCount", &iProcessorCount))
00438 {
00439 if (bDebug) printf("direction: Using %i CPUs\n", iProcessorCount);
00440 }
00441 if (CfgDirection->GetInt("StartChannel", &iStartChannel))
00442 {
00443 if (bDebug) printf("direction: Data starts at channel %i\n",
00444 iStartChannel);
00445 }
00446 if (bDebug) printf("direction: Get request message\n");
00447 if (!GetRequest()) return 1;
00448 if (CfgStreamDist->GetStr("LocalSocket", cpIDSockName))
00449 {
00450 iDataH = SClient.Connect(cpIDSockName);
00451 if (iDataH >= 0)
00452 {
00453 SOpData = new clSockOp(iDataH);
00454 if (bDebug) printf("direction: Get first data message\n");
00455 if (!GetFirstMsg()) return 1;
00456 if (CfgDirection->GetInt("WindowSize", &iTempWinSize))
00457 lWindowSize = iTempWinSize;
00458 if (bDebug) printf("direction: Initializing array\n");
00459 if (!InitArray()) return 1;
00460 if (bDebug)
00461 {
00462 printf("direction: Background noise estimation and removal: ");
00463 switch (sDirRequest.iRemoveNoise)
00464 {
00465 case MSG_DIR_BNER_TPSW:
00466 printf("Two-Pass Split-Window\n");
00467 break;
00468 case MSG_DIR_BNER_OTA:
00469 printf("Order-Truncate-Average\n");
00470 break;
00471 default:
00472 printf("None\n");
00473 break;
00474 }
00475 }
00476 RawData.Size(iRawBufSize);
00477 DirResData = (sDirRequest.lSectorCount * sizeof(GDT));
00478 if (bDebug) printf("direction: Starting threads\n");
00479 pthread_create(&tidReceiveData, NULL, ThrReceiveData, NULL);
00480 MtxClassData.Wait();
00481 for (iProcCntr = 0; iProcCntr < iProcessorCount; iProcCntr++)
00482 {
00483 pthread_create(&tidpProcessData[iProcCntr], NULL,
00484 ThrProcessData, NULL);
00485 }
00486 MtxClassData.Release();
00487 pthread_create(&tidSendResults, NULL, ThrSendResults, NULL);
00488 if (bDebug) printf("direction: Running...\n");
00489 pthread_join(tidSendResults, NULL);
00490 for (iProcCntr = 0; iProcCntr < iProcessorCount; iProcCntr++)
00491 {
00492 pthread_join(tidpProcessData[iProcCntr], NULL);
00493 }
00494 pthread_join(tidReceiveData, NULL);
00495 if (bDebug) printf("direction: Threads ended\n");
00496 }
00497 else
00498 {
00499 if (!bDaemon) printf("clSockClie::Connect() failed\n");
00500 return 1;
00501 }
00502 }
00503 else
00504 {
00505 if (!bDaemon) printf("clCfgFile::GetStr() failed\n");
00506 return 1;
00507 }
00508 return 0;
00509 }
00510
00511
00512 void *clDirection::ReceiveData (void *vpParam)
00513 {
00514 bool bLocalRun = true;
00515 int iDataCount = iRawBufSize;
00516 sigset_t sigsetThis;
00517
00518 sigemptyset(&sigsetThis);
00519 sigaddset(&sigsetThis, SIGPIPE);
00520 pthread_sigmask(SIG_BLOCK, &sigsetThis, NULL);
00521 if (bDebug) printf("direction: ReceiveData thread running\n");
00522 while (bLocalRun)
00523 {
00524 if (SOpData->ReadSelect(DIR_TIMEOUT))
00525 {
00526 if (SOpData->ReadN(RawData, iDataCount) != iDataCount)
00527 {
00528 if (bDebug) printf("direction: SockOp::ReadN errno %i (%s~%i)\n",
00529 SOpData->GetErrno(), __FILE__, __LINE__);
00530 Stop();
00531 pthread_exit(NULL);
00532 }
00533 switch (iArrayType)
00534 {
00535 case DIR_ARRAY_TYPE_DIPOLE:
00536 MtxDataReady.Wait();
00537 ArDipole.AddData(RawData, iStartChannel,
00538 sDataFirst.iChannels);
00539 CndDataReady.NotifyAll();
00540 MtxDataReady.Release();
00541 break;
00542 case DIR_ARRAY_TYPE_TRIANGLE:
00543 case DIR_ARRAY_TYPE_LINE:
00544 case DIR_ARRAY_TYPE_PLANE:
00545 case DIR_ARRAY_TYPE_CYLINDER:
00546 case DIR_ARRAY_TYPE_SPHERE:
00547 break;
00548 }
00549 }
00550 MtxClassData.Wait();
00551 bLocalRun = bRun;
00552 MtxClassData.Release();
00553 }
00554 if (bDebug) printf("direction: ReceiveData thread ending\n");
00555 return NULL;
00556 }
00557
00558
00559 void *clDirection::ProcessData (void *vpParam)
00560 {
00561 bool bLocalRun = true;
00562 int iProcCntr;
00563 int iThreadIdx = 0;
00564 int iAddDataRes = 0;
00565 int iResultsReady = 0;
00566 long lStartSect;
00567 long lStopSect;
00568 pthread_t tidThis;
00569 sigset_t sigsetThis;
00570
00571 tidThis = pthread_self();
00572 sigemptyset(&sigsetThis);
00573 sigaddset(&sigsetThis, SIGFPE);
00574 pthread_sigmask(SIG_BLOCK, &sigsetThis, NULL);
00575 MtxClassData.Wait();
00576 for (iProcCntr = 0; iProcCntr < iProcessorCount; iProcCntr++)
00577 {
00578 if (tidpProcessData[iProcCntr] == tidThis)
00579 {
00580 iThreadIdx = iProcCntr;
00581 break;
00582 }
00583 }
00584 MtxClassData.Release();
00585 lStartSect = sDirRequest.lSectorCount / iProcessorCount * iThreadIdx;
00586 lStopSect = sDirRequest.lSectorCount / iProcessorCount * (iThreadIdx + 1);
00587 if (bDebug)
00588 printf("direction: ProcessData[%i] thread running (%li - %li)\n",
00589 iThreadIdx, lStartSect, lStopSect);
00590 while (bLocalRun)
00591 {
00592 MtxDataReady.Wait();
00593 if (CndDataReady.Wait(MtxDataReady.GetPtr()))
00594 {
00595 switch (iArrayType)
00596 {
00597 case DIR_ARRAY_TYPE_DIPOLE:
00598 if ((sDirRequest.iAlgorithm & MSG_DIR_ALG_BEAM) ==
00599 MSG_DIR_ALG_BEAM)
00600 {
00601 if (BeDipole[iThreadIdx]->AddData())
00602 iAddDataRes |= MSG_DIR_ALG_BEAM;
00603 }
00604 if ((sDirRequest.iAlgorithm & MSG_DIR_ALG_CORR) ==
00605 MSG_DIR_ALG_CORR)
00606 {
00607 if (CoDipole[iThreadIdx]->AddData())
00608 iAddDataRes |= MSG_DIR_ALG_CORR;
00609 }
00610 MtxDataReady.Release();
00611 StartTiming();
00612 if ((iAddDataRes & MSG_DIR_ALG_BEAM) == MSG_DIR_ALG_BEAM)
00613 {
00614 CalcDipoleBeams(iThreadIdx, lStartSect, lStopSect,
00615 false);
00616 iResultsReady |= MSG_DIR_ALG_BEAM;
00617 }
00618 if ((iAddDataRes & MSG_DIR_ALG_CORR) == MSG_DIR_ALG_CORR)
00619 {
00620 if ((iAddDataRes & MSG_DIR_ALG_BEAM) ==
00621 MSG_DIR_ALG_BEAM)
00622 {
00623 CalcDipoleCorrs(iThreadIdx, lStartSect, lStopSect,
00624 true);
00625 }
00626 else
00627 {
00628 CalcDipoleCorrs(iThreadIdx, lStartSect, lStopSect,
00629 false);
00630 }
00631 iResultsReady |= MSG_DIR_ALG_CORR;
00632 }
00633 StopTiming();
00634 if (iResultsReady == sDirRequest.iAlgorithm)
00635 {
00636 if (iThreadIdx == 0)
00637 {
00638 if ((iResultsReady & MSG_DIR_ALG_BEAM) ==
00639 MSG_DIR_ALG_BEAM)
00640 {
00641 sDirResHdr.fPeakLevel =
00642 BeDipole[iThreadIdx]->GetPeakLevel();
00643 }
00644 else if ((iResultsReady & MSG_DIR_ALG_CORR) ==
00645 MSG_DIR_ALG_CORR)
00646 {
00647 sDirResHdr.fPeakLevel =
00648 CoDipole[iThreadIdx]->GetPeakLevel();
00649 }
00650 }
00651 MtxResultsReady.Wait();
00652 iResultsRefCount++;
00653 CndResultsReady.Notify();
00654 MtxResultsReady.Release();
00655 iAddDataRes = 0;
00656 iResultsReady = 0;
00657 }
00658 break;
00659 case DIR_ARRAY_TYPE_TRIANGLE:
00660 case DIR_ARRAY_TYPE_LINE:
00661 case DIR_ARRAY_TYPE_PLANE:
00662 case DIR_ARRAY_TYPE_CYLINDER:
00663 case DIR_ARRAY_TYPE_SPHERE:
00664 break;
00665 }
00666 }
00667 else
00668 {
00669 MtxDataReady.Release();
00670 }
00671 MtxClassData.Wait();
00672 bLocalRun = bRun;
00673 MtxClassData.Release();
00674 }
00675 if (bDebug) printf("direction: ProcessData[%i] thread ending\n",
00676 iThreadIdx);
00677 return NULL;
00678 }
00679
00680
00681 void *clDirection::SendResults (void *vpParam)
00682 {
00683 bool bLocalRun = true;
00684 int iMsgSize;
00685 GDT fResMin;
00686 GDT fResMax;
00687 GDT *fpDirResData = DirResData;
00688 sigset_t sigsetThis;
00689 clAlloc MsgBuf;
00690
00691 sigemptyset(&sigsetThis);
00692 sigaddset(&sigsetThis, SIGPIPE);
00693 sigaddset(&sigsetThis, SIGFPE);
00694 pthread_sigmask(SIG_BLOCK, &sigsetThis, NULL);
00695 iMsgSize = GLOBAL_HEADER_LEN + sDirRequest.lSectorCount * sizeof(GDT);
00696 MsgBuf.Size(iMsgSize);
00697 if (bDebug) printf("direction: SendResults thread running\n");
00698 while (bLocalRun)
00699 {
00700 MtxResultsReady.Wait();
00701 if (CndResultsReady.Wait(MtxResultsReady.GetPtr()))
00702 {
00703 if (iResultsRefCount == iProcessorCount)
00704 {
00705 MtxClassData.Wait();
00706 switch (sDirRequest.iRemoveNoise)
00707 {
00708 case MSG_DIR_BNER_TPSW:
00709 NoiseEstRem.TPSW(fpDirResData,
00710 (GDT) sDirRequest.fAlpha, sDirRequest.lMeanLength,
00711 sDirRequest.lGapLength, sDirRequest.lSectorCount);
00712 break;
00713 case MSG_DIR_BNER_OTA:
00714 NoiseEstRem.OTA(fpDirResData,
00715 (GDT) sDirRequest.fAlpha, sDirRequest.lMeanLength,
00716 sDirRequest.lSectorCount);
00717 break;
00718 default:
00719
00720 break;
00721 }
00722 if (sDirRequest.bNormalize)
00723 {
00724 DSP.MinMax(&fResMin, &fResMax, fpDirResData,
00725 sDirRequest.lSectorCount);
00726 DSP.Sub(fpDirResData, fResMin, sDirRequest.lSectorCount);
00727 DSP.Mul(fpDirResData, (GDT) 1.0 / (fResMax - fResMin),
00728 sDirRequest.lSectorCount);
00729 }
00730 gettimeofday(&sDirResHdr.sTimeStamp, NULL);
00731 DirMsg.SetResult(MsgBuf, &sDirResHdr, fpDirResData);
00732 MtxClassData.Release();
00733 iResultsRefCount = 0;
00734 MtxResultsReady.Release();
00735 if (SOpResult->WriteSelect(DIR_TIMEOUT))
00736 {
00737 if (SOpResult->WriteN(MsgBuf, iMsgSize) < iMsgSize)
00738 {
00739 if (bDebug)
00740 printf("direction: clSockOp::WriteN errno %i\n",
00741 SOpResult->GetErrno());
00742 Stop();
00743 pthread_exit(NULL);
00744 }
00745 }
00746 else
00747 {
00748 if (bDebug)
00749 printf("direction: clSockOp::WriteSelect() timeout\n");
00750 }
00751 }
00752 else
00753 {
00754 MtxResultsReady.Release();
00755 }
00756 }
00757 else
00758 {
00759 MtxResultsReady.Release();
00760 }
00761 MtxClassData.Wait();
00762 bLocalRun = bRun;
00763 MtxClassData.Release();
00764 }
00765 if (bDebug) printf("direction: SendResults thread ending\n");
00766 return NULL;
00767 }
00768