00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <stdio.h>
00024 #include <stdlib.h>
00025 #include <string.h>
00026 #include <signal.h>
00027 #include <limits.h>
00028 #include <math.h>
00029 #include <float.h>
00030 #include <unistd.h>
00031 #include <errno.h>
00032 #include <sched.h>
00033 #include <sys/types.h>
00034 #include <sys/socket.h>
00035 #include <sys/wait.h>
00036
00037 #include <vector>
00038
00039 #include <DynThreads.hh>
00040
00041 #include "BeamSrv.hh"
00042
00043
00044 volatile bool bRun = true;
00045 clDynThreads<clBeamSrvMaster> *BeamSrvMasterThreads;
00046 BS_PROCINFOV_T vProcInfo;
00047
00048
00049 void sig_handler_m (int signo)
00050 {
00051 switch (signo)
00052 {
00053 case SIGINT:
00054 case SIGHUP:
00055 case SIGTERM:
00056 bRun = false;
00057 break;
00058 }
00059 }
00060
00061
/* Slave signal handler: SIGHUP and SIGTERM terminate the slave process
   immediately with status 0; any other signal is ignored.

   _exit() is used instead of exit() because exit() is not
   async-signal-safe: it runs atexit handlers and flushes stdio, either of
   which may deadlock or corrupt state when entered from a signal handler. */
void sig_handler_s (int signo)
{
    switch (signo)
    {
        case SIGHUP:
        case SIGTERM:
            _exit(0);
            break;
    }
}
00073
00074
00075 int main (int argc, char *argv[])
00076 {
00077 int iRetVal = 0;
00078 int iProcCntr;
00079 int iProcCount;
00080 int ipSockPair[2];
00081 pid_t pidFork;
00082 BS_PROCINFOV_T::iterator iterProcs;
00083
00084 if (argc < 2)
00085 {
00086 fprintf(stderr, "process count not defined\n");
00087 return 1;
00088 }
00089 if (strcmp(argv[1], "--version") == 0)
00090 {
00091 fprintf(stderr, "%s v%i.%i.%i\n", argv[0], GLOBAL_VERSMAJ,
00092 GLOBAL_VERSMIN, GLOBAL_VERSPL);
00093 fprintf(stderr, "Copyright (C) 2002 Jussi Laako\n");
00094 return 0;
00095 }
00096 if (strcmp(argv[1], "--help") == 0)
00097 {
00098 fprintf(stderr, "%s [--version|--help] <process count>\n\n", argv[0]);
00099 fprintf(stderr, "--version display version information\n");
00100 fprintf(stderr, "--help display this help\n");
00101 return 0;
00102 }
00103 iProcCount = atoi(argv[1]);
00104 for (iProcCntr = 0; iProcCntr < iProcCount; iProcCntr++)
00105 {
00106 if (socketpair(AF_LOCAL, SOCK_STREAM, 0, ipSockPair) < 0)
00107 {
00108 perror("socketpair()");
00109 return 2;
00110 }
00111 pidFork = fork();
00112 if (pidFork == 0)
00113 {
00114 signal(SIGHUP, sig_handler_s);
00115 signal(SIGTERM, sig_handler_s);
00116 signal(SIGPIPE, SIG_IGN);
00117
00118 close(ipSockPair[1]);
00119
00120 clBeamSrvSlave *BeamSrvSlave;
00121
00122 BeamSrvSlave = new clBeamSrvSlave(iProcCntr + 1, ipSockPair[0]);
00123 iRetVal = BeamSrvSlave->Main(&argc, &argv);
00124 delete BeamSrvSlave;
00125 return iRetVal;
00126 }
00127 else
00128 {
00129 close(ipSockPair[0]);
00130
00131 stBeamProcInfo sProcInfo;
00132
00133 sProcInfo.pidProc = pidFork;
00134 sProcInfo.iSockH = ipSockPair[1];
00135 vProcInfo.push_back(sProcInfo);
00136 }
00137 }
00138
00139 signal(SIGINT, sig_handler_m);
00140 signal(SIGHUP, sig_handler_m);
00141 signal(SIGTERM, sig_handler_m);
00142 signal(SIGPIPE, SIG_IGN);
00143
00144 clBeamSrvMaster *BeamSrvMaster;
00145
00146 BeamSrvMaster = new clBeamSrvMaster;
00147 BeamSrvMasterThreads =
00148 new clDynThreads<clBeamSrvMaster>(*BeamSrvMaster);
00149 iRetVal = BeamSrvMaster->Main(&argc, &argv);
00150 delete BeamSrvMaster;
00151
00152 iterProcs = vProcInfo.begin();
00153 while (iterProcs != vProcInfo.end())
00154 {
00155 kill((*iterProcs).pidProc, SIGHUP);
00156 close((*iterProcs).iSockH);
00157 waitpid((*iterProcs).pidProc, NULL, 0);
00158 iterProcs++;
00159 }
00160
00161 return iRetVal;
00162 }
00163
00164
00165
00166
00167
00168 bool clBeamSrvMaster::ReadConfig ()
00169 {
00170 if (!Cfg.GetInt("Type", &sNodeParams.iType))
00171 {
00172 Log.Add('!', "Parameter \"Type\" is missing");
00173 return false;
00174 }
00175 if (!Cfg.GetInt("Sensors", &sNodeParams.iSensors))
00176 {
00177 Log.Add('!', "Parameter \"Sensors\" is missing");
00178 return false;
00179 }
00180 if (!Cfg.GetFlt("Spacing", &sNodeParams.fSpacing))
00181 {
00182 Log.Add('!', "Parameter \"Spacing\" is missing");
00183 return false;
00184 }
00185 if (!Cfg.GetFlt("SoundSpeed", &sNodeParams.fSoundSpeed))
00186 {
00187 Log.Add('!', "Parameter \"SoundSpeed\" is missing");
00188 return false;
00189 }
00190 if (!Cfg.GetInt("Decimate", &iDecimate))
00191 {
00192 Log.Add('!', "Parameter \"Decimate\" is missing");
00193 return false;
00194 }
00195 if (!Cfg.GetInt("WindowSize", &sNodeParams.iWindowSize))
00196 {
00197 Log.Add('!', "Parameter \"WindowSize\" is missing");
00198 return false;
00199 }
00200 if (!Cfg.GetInt("BlockSize", &sNodeParams.iBlockSize))
00201 {
00202 Log.Add('!', "Parameter \"BlockSize\" is missing");
00203 return false;
00204 }
00205 if (!Cfg.GetInt("Beams", &sNodeParams.iBeamCount))
00206 {
00207 Log.Add('!', "Parameter \"Beams\" is missing");
00208 return false;
00209 }
00210
00211 return true;
00212 }
00213
00214
00215 bool clBeamSrvMaster::InitFilterBank ()
00216 {
00217 int iTwosExp;
00218 int iFilterCntr;
00219 float fArrayFreq;
00220
00221 if (!Cfg.GetInt("FilterType", &iFilterType))
00222 {
00223 Log.Add('!', "Parameter \"FilterType\" is missing");
00224 Abort();
00225 }
00226
00227 fArrayFreq = sNodeParams.fSoundSpeed / sNodeParams.fSpacing / 2.0f;
00228 if (iDecimate)
00229 {
00230 iTwosExp = (int)
00231 floor(log((float) sInHdr.dSampleRate / 2.0f / fArrayFreq) /
00232 log(2.0));
00233 iDecFact = (int) pow(2.0, iTwosExp);
00234 }
00235 else
00236 {
00237 iDecFact = 1;
00238 }
00239 sprintf(cpLogBuf, "Array frequency %g Hz, decimation factor %i",
00240 fArrayFreq, iDecFact);
00241 Log.Add(' ', cpLogBuf);
00242
00243 if (iDecFact > 1)
00244 {
00245 fprintf(stderr, "beamsrv(master): Initializing filters...\n");
00246 FilterBank = new clRecDecimator[sNodeParams.iSensors];
00247 for (iFilterCntr = 0; iFilterCntr < sNodeParams.iSensors; iFilterCntr++)
00248 {
00249 if (!FilterBank[iFilterCntr].Initialize(iDecFact,
00250 -sNodeParams.iWindowSize, (GDT *) NULL, (GDT) 0, iFilterType))
00251 return false;
00252 }
00253 fprintf(stderr, "beamsrv(master): Filter initialization complete\n");
00254 }
00255
00256 sNodeParams.fSampleRate = (float) sInHdr.dSampleRate / (float) iDecFact;
00257
00258 return true;
00259 }
00260
00261
00262 bool clBeamSrvMaster::InitProcessing ()
00263 {
00264 if ((int) vProcInfo.size() < sNodeParams.iBeamCount)
00265 {
00266 Log.Add('!', "Insufficient number of processes available");
00267 return false;
00268 }
00269
00270 if ((sInHdr.iChannels - iChOffset) < sNodeParams.iSensors)
00271 {
00272 Log.Add('!', "Misconfiguration (not enough channels)");
00273 return false;
00274 }
00275
00276 return SendNodeParams();
00277 }
00278
00279
00280 void clBeamSrvMaster::ProcessLoop ()
00281 {
00282 bool bInData;
00283 int iNodeCntr;
00284 int iInDataCount;
00285 int iNodeDataCount;
00286 int iNodeResCount;
00287 int iNodeResSize;
00288 clAlloc InData;
00289 clAlloc NodeData;
00290 clAlloc NodeRes;
00291 clAlloc LOutData;
00292 BS_PROCINFOV_T::iterator iterProcs;
00293
00294 iInDataCount = sNodeParams.iBlockSize * sInHdr.iChannels;
00295 iNodeDataCount = sNodeParams.iBlockSize * sNodeParams.iSensors;
00296 iNodeResCount = sNodeParams.iBlockSize;
00297 iNodeResSize = iNodeResCount * sizeof(GDT);
00298 iOutDataCount = sNodeParams.iBlockSize * sNodeParams.iBeamCount;
00299 InData.Size(iInDataCount * sizeof(GDT));
00300 NodeData.Size(iNodeDataCount * sizeof(GDT));
00301 NodeRes.Size(iNodeResCount * sizeof(GDT));
00302 LOutData.Size(iOutDataCount * sizeof(GDT));
00303 OutData.Size(iOutDataCount * sizeof(GDT));
00304
00305 fprintf(stderr, "beamsrv(master): Entering process loop...\n");
00306 while (bRun)
00307 {
00308 SemIn.Wait();
00309 MtxIn.Wait();
00310 bInData = InBuf.Get((GDT *) InData, iInDataCount);
00311 MtxIn.Release();
00312 if (!bInData) continue;
00313
00314 if (!bRun) break;
00315
00316 if (iDecFact <= 1)
00317 {
00318 CompactData(NodeData, InData,
00319 sNodeParams.iSensors, sInHdr.iChannels,
00320 iChOffset, sNodeParams.iBlockSize);
00321 }
00322 else
00323 {
00324 if (!FilterData(NodeData, InData,
00325 sNodeParams.iSensors, sInHdr.iChannels,
00326 iChOffset, sNodeParams.iBlockSize))
00327 continue;
00328 }
00329
00330 SendInData(NodeData, iNodeDataCount);
00331
00332 iterProcs = vProcInfo.begin();
00333 iNodeCntr = 0;
00334 while (iterProcs != vProcInfo.end())
00335 {
00336 # ifndef LINUXSYS
00337 if (recv((*iterProcs).iSockH, NodeRes, iNodeResSize,
00338 MSG_WAITALL) <= 0)
00339 # else
00340 if (recv((*iterProcs).iSockH, NodeRes, iNodeResSize,
00341 MSG_WAITALL|MSG_NOSIGNAL) <= 0)
00342 # endif
00343 {
00344 sprintf(cpLogBuf, "recv() from process %u failed",
00345 (*iterProcs).pidProc);
00346 Log.Add('!', cpLogBuf, errno);
00347 }
00348 DSP.Pack((GDT *) LOutData, (GDT *) NodeRes, iNodeCntr,
00349 sNodeParams.iBeamCount, iNodeResCount);
00350 iterProcs++;
00351 iNodeCntr++;
00352 }
00353
00354 MtxOut.Wait();
00355 OutData = LOutData;
00356 iBlockCntr++;
00357 CndOut.NotifyAll();
00358 MtxOut.Release();
00359 }
00360 }
00361
00362
00363 bool clBeamSrvMaster::SendNodeParams ()
00364 {
00365 BS_PROCINFOV_T::iterator iterProcs;
00366
00367 iterProcs = vProcInfo.begin();
00368 fprintf(stderr, "beamsrv(master): Sending node parameters...\n");
00369 while (iterProcs != vProcInfo.end())
00370 {
00371 # ifndef LINUXSYS
00372 if (send((*iterProcs).iSockH, &sNodeParams, sizeof(sNodeParams), 0) <
00373 (int) sizeof(sNodeParams))
00374 # else
00375 if (send((*iterProcs).iSockH, &sNodeParams, sizeof(sNodeParams),
00376 MSG_NOSIGNAL) < (int) sizeof(sNodeParams))
00377 # endif
00378 {
00379 sprintf(cpLogBuf, "send() to process %u failed",
00380 (*iterProcs).pidProc);
00381 Log.Add('!', cpLogBuf, errno);
00382 return false;
00383 }
00384 iterProcs++;
00385 }
00386 fprintf(stderr, "beamsrv(master): Node parameters sent\n");
00387 return true;
00388 }
00389
00390
00391 bool clBeamSrvMaster::WaitReady ()
00392 {
00393 int iProcess;
00394 BS_PROCINFOV_T::iterator iterProcs;
00395
00396 iterProcs = vProcInfo.begin();
00397 fprintf(stderr,
00398 "beamsrv(master): Waiting for ready messages... (may take a while)\n");
00399 while (iterProcs != vProcInfo.end())
00400 {
00401 # ifndef LINUXSYS
00402 if (recv((*iterProcs).iSockH, &iProcess, sizeof(int), MSG_WAITALL) <= 0)
00403 # else
00404 if (recv((*iterProcs).iSockH, &iProcess, sizeof(int),
00405 MSG_WAITALL|MSG_NOSIGNAL) <= 0)
00406 # endif
00407 {
00408 sprintf(cpLogBuf, "recv() from process %u failed",
00409 (*iterProcs).pidProc);
00410 Log.Add('!', cpLogBuf, errno);
00411 return false;
00412 }
00413 fprintf(stderr, "beamsrv(master): slave%i (%u) ready\n",
00414 iProcess, (*iterProcs).pidProc);
00415 iterProcs++;
00416 }
00417 fprintf(stderr, "beamsrv(master): All nodes are ready\n");
00418 return true;
00419 }
00420
00421
00422 bool clBeamSrvMaster::SendInData (const GDT *fpData, int iDataCount)
00423 {
00424 int iDataSize;
00425 BS_PROCINFOV_T::iterator iterProcs;
00426
00427 iDataSize = iDataCount * sizeof(GDT);
00428 iterProcs = vProcInfo.begin();
00429 while (iterProcs != vProcInfo.end())
00430 {
00431 # ifndef LINUXSYS
00432 if (send((*iterProcs).iSockH, fpData, iDataSize, 0) < iDataSize)
00433 # else
00434 if (send((*iterProcs).iSockH, fpData, iDataSize, MSG_NOSIGNAL) <
00435 iDataSize)
00436 # endif
00437 {
00438 sprintf(cpLogBuf, "send() to process %u failed",
00439 (*iterProcs).pidProc);
00440 Log.Add('!', cpLogBuf, errno);
00441 return false;
00442 }
00443 iterProcs++;
00444 }
00445 return true;
00446 }
00447
00448
00449 void clBeamSrvMaster::CompactData (GDT *fpDest, const GDT *fpSrc,
00450 long lDestChs, long lSrcChs, long lOffset, long lCount)
00451 {
00452 long lSampleCntr;
00453 long lChCntr;
00454 long lSrcIdx;
00455 long lDestIdx;
00456
00457 for (lSampleCntr = 0; lSampleCntr < lCount; lSampleCntr++)
00458 {
00459 lSrcIdx = lSampleCntr * lSrcChs + lOffset;
00460 lDestIdx = lSampleCntr * lDestChs;
00461 for (lChCntr = 0; lChCntr < lDestChs; lChCntr++)
00462 {
00463 fpDest[lDestIdx + lChCntr] = fpSrc[lSrcIdx + lChCntr];
00464 }
00465 }
00466 }
00467
00468
00469 bool clBeamSrvMaster::FilterData (GDT *fpDest, const GDT *fpSrc,
00470 long lDestChs, long lSrcChs, long lOffset, long lCount)
00471 {
00472 long lOutData = 0;
00473 long lChCntr;
00474
00475 FiltWork.Size(lCount * sizeof(GDT));
00476
00477 # ifdef _OPENMP
00478 # pragma omp parallel
00479 # pragma omp for
00480 # endif
00481 for (lChCntr = 0; lChCntr < lDestChs; lChCntr++)
00482 {
00483 DSP.Extract((GDT *) FiltWork, fpSrc, lOffset + lChCntr, lSrcChs,
00484 lCount * lSrcChs);
00485
00486
00487 FilterBank[lChCntr].Put((GDT *) FiltWork, lCount);
00488 if (FilterBank[lChCntr].Get((GDT *) FiltWork, lCount))
00489 {
00490 DSP.Pack(fpDest, (GDT *) FiltWork, lChCntr, lDestChs, lCount);
00491 # ifdef _OPENMP
00492 # pragma omp atomic
00493 # endif
00494 lOutData++;
00495 }
00496 }
00497 return ((lOutData) ? true : false);
00498 }
00499
00500
00501 clBeamSrvMaster::clBeamSrvMaster ()
00502 {
00503 Log.Open(BS_LOGFILE);
00504 Log.Add('*', "Starting");
00505 iBlockCntr = 0;
00506 FilterBank = NULL;
00507 }
00508
00509
00510 clBeamSrvMaster::~clBeamSrvMaster ()
00511 {
00512 if (FilterBank != NULL)
00513 delete [] FilterBank;
00514 Log.Add('*', "Stopping");
00515 }
00516
00517
00518 int clBeamSrvMaster::Main (int *ipArgC, char ***cpppArgV)
00519 {
00520 int iServerThreadH;
00521 int iInDataThreadH;
00522 int iPort;
00523 int iSockH;
00524 char cpServSock[_POSIX_PATH_MAX + 1];
00525 stRawDataReq sReq;
00526 clSockClie SClie;
00527
00528 Cfg.SetFileName(BS_CFGFILE);
00529
00530 if (!Cfg.GetInt("Port", &iPort))
00531 {
00532 Log.Add('!', "Parameter \"Port\" is missing");
00533 Abort();
00534 }
00535 if (!SServ.Bind((unsigned short) iPort))
00536 {
00537 Log.Add('!', "clSockServ::Bind() failed", SServ.GetErrno());
00538 Abort();
00539 }
00540
00541 if (!Cfg.GetStr("StreamSource", cpServSock))
00542 {
00543 Log.Add('!', "Parameter \"StreamSource\" is missing");
00544 Abort();
00545 }
00546 iSockH = SClie.Connect(cpServSock);
00547 if (iSockH < 0)
00548 {
00549 Log.Add('!', "clSockClie::Connect() failed", SClie.GetErrno());
00550 Abort();
00551 }
00552 SOpIn.SetHandle(iSockH);
00553 sReq.iChannel = -1;
00554 if (SOpIn.ReadN(&sInHdr, sizeof(sInHdr)) <= 0)
00555 {
00556 Log.Add('!', "clSockOp::Readn() failed", SOpIn.GetErrno());
00557 Abort();
00558 }
00559 if (SOpIn.WriteN(&sReq, sizeof(sReq)) <= 0)
00560 {
00561 Log.Add('!', "clSockOp::WriteN() failed", SOpIn.GetErrno());
00562 Abort();
00563 }
00564 sprintf(cpLogBuf, "Receiving data for %i channels at fs %g",
00565 sInHdr.iChannels, sInHdr.dSampleRate);
00566 Log.Add(' ', cpLogBuf);
00567
00568 if (!Cfg.GetInt("ChOffset", &iChOffset))
00569 {
00570 Log.Add('!', "Parameter \"ChOffset\" is missing");
00571 Abort();
00572 }
00573
00574 if (!ReadConfig())
00575 {
00576 Log.Add('!', "Reading of configuration file failed");
00577 Abort();
00578 }
00579
00580 if (!InitFilterBank())
00581 {
00582 Log.Add('!', "Filter bank initialization failed");
00583 Abort();
00584 }
00585
00586 if (!InitProcessing())
00587 {
00588 Log.Add('!', "Initialization of processing system failed");
00589 Abort();
00590 }
00591
00592 if (!WaitReady())
00593 {
00594 Log.Add('!', "All nodes didn't confirm ready state");
00595 Abort();
00596 }
00597
00598 iServerThreadH = BeamSrvMasterThreads->Create(
00599 &clBeamSrvMaster::ServerThread, NULL, false);
00600 iInDataThreadH = BeamSrvMasterThreads->Create(
00601 &clBeamSrvMaster::InDataThread, NULL, false);
00602
00603 ProcessLoop();
00604
00605 Abort();
00606 BeamSrvMasterThreads->Wait(iInDataThreadH);
00607 BeamSrvMasterThreads->Wait(iServerThreadH);
00608
00609 return 0;
00610 }
00611
00612
00613 void clBeamSrvMaster::Abort ()
00614 {
00615 bRun = false;
00616 }
00617
00618
00619 void *clBeamSrvMaster::InDataThread (void *vpParam)
00620 {
00621 int iInDataCount;
00622 clAlloc InData;
00623
00624 BeamSrvMasterThreads->SetSched(BeamSrvMasterThreads->Self(),
00625 SCHED_FIFO, BS_INTHREAD_PRIORITY);
00626
00627 iInDataCount = sNodeParams.iBlockSize * sInHdr.iChannels;
00628 InData.Size(iInDataCount * sizeof(GDT));
00629 while (bRun)
00630 {
00631 if (SOpIn.ReadN(InData, InData.GetSize()) <= 0)
00632 {
00633 Abort();
00634 break;
00635 }
00636
00637 MtxIn.Wait();
00638 InBuf.Put((GDT *) InData, iInDataCount);
00639 MtxIn.Release();
00640 SemIn.Post();
00641 }
00642
00643 return NULL;
00644 }
00645
00646
00647 void *clBeamSrvMaster::ServerThread (void *vpParam)
00648 {
00649 int iSockH;
00650
00651 while (bRun)
00652 {
00653 iSockH = SServ.WaitForConnect(BS_ACCEPT_TO);
00654 if (iSockH >= 0)
00655 {
00656 BeamSrvMasterThreads->Create(&clBeamSrvMaster::ServeClientThread,
00657 (void *) iSockH, true);
00658 }
00659 }
00660
00661 return NULL;
00662 }
00663
00664
00665 void *clBeamSrvMaster::ServeClientThread (void *vpParam)
00666 {
00667 int iSockH = (int) vpParam;
00668 int iMsgSize;
00669 int iLBlockCntr;
00670 stSoundStart sOutHdr;
00671 clAlloc OutMsg;
00672 clAlloc LOutData;
00673 clSockOp SOp;
00674 clSoundMsg Msg;
00675
00676 BeamSrvMasterThreads->SetSched(BeamSrvMasterThreads->Self(),
00677 SCHED_FIFO, BS_OUTTHREAD_PRIORITY);
00678
00679 Log.Add('+', "Client connected");
00680 SOp.SetHandle(iSockH);
00681 iMsgSize = iOutDataCount * sizeof(GDT);
00682 OutMsg.Size(GLOBAL_HEADER_LEN + iMsgSize);
00683 LOutData.Size(iOutDataCount * sizeof(GDT));
00684
00685 sOutHdr.iChannels = sNodeParams.iBeamCount;
00686 sOutHdr.dSampleRate = sNodeParams.fSampleRate;
00687 sOutHdr.iFragmentSize = iOutDataCount;
00688 sOutHdr.iCompress = MSG_SOUND_COMPRESS_NONE;
00689 Msg.SetStart(OutMsg, &sOutHdr);
00690 if (SOp.WriteN(OutMsg, GLOBAL_HEADER_LEN) <= 0)
00691 {
00692 Log.Add('-', "Error sending header to client, disconnecting");
00693 return NULL;
00694 }
00695
00696 if (!SOp.DisableNagle())
00697 {
00698 Log.Add('#', "Failed to disable TCP Nagle algorithm", SOp.GetErrno());
00699 }
00700 if (!SOp.SetTypeOfService(IPTOS_LOWDELAY))
00701 {
00702 Log.Add('#', "Unable to set type-of-service flag", SOp.GetErrno());
00703 }
00704
00705 MtxOut.Wait();
00706 iLBlockCntr = iBlockCntr;
00707 MtxOut.Release();
00708
00709 while (bRun)
00710 {
00711 MtxOut.Wait();
00712 CndOut.Wait(MtxOut.GetPtr());
00713 LOutData = OutData;
00714 iLBlockCntr++;
00715 if (iLBlockCntr != iBlockCntr)
00716 {
00717 printf("beamsrv: %i blocks lost\n",
00718 iBlockCntr - iLBlockCntr);
00719 iLBlockCntr = iBlockCntr;
00720 }
00721 MtxOut.Release();
00722
00723 Msg.SetData(OutMsg, (GDT *) LOutData, iOutDataCount);
00724 if (SOp.WriteN(OutMsg, iMsgSize) <= 0)
00725 break;
00726 }
00727
00728 Log.Add('-', "Client disconnected");
00729 return NULL;
00730 }
00731
00732
00733
00734
00735
00736 bool clBeamSrvSlave::RecvParams ()
00737 {
00738 # ifndef LINUXSYS
00739 if (recv(iSockH, &sNodeParams, sizeof(sNodeParams),
00740 MSG_WAITALL) < (int) sizeof(sNodeParams))
00741 # else
00742 if (recv(iSockH, &sNodeParams, sizeof(sNodeParams),
00743 MSG_WAITALL|MSG_NOSIGNAL) < (int) sizeof(sNodeParams))
00744 # endif
00745 return false;
00746 return true;
00747 }
00748
00749
00750 bool clBeamSrvSlave::RecvInData (GDT *fpData, int iDataCount)
00751 {
00752 int iDataSize;
00753
00754 iDataSize = iDataCount * sizeof(GDT);
00755 # ifndef LINUXSYS
00756 if (recv(iSockH, fpData, iDataSize, MSG_WAITALL) < iDataSize)
00757 # else
00758 if (recv(iSockH, fpData, iDataSize, MSG_WAITALL|MSG_NOSIGNAL) < iDataSize)
00759 # endif
00760 return false;
00761 return true;
00762 }
00763
00764
00765 bool clBeamSrvSlave::SendReady ()
00766 {
00767 # ifndef LINUXSYS
00768 if (send(iSockH, &iProcess, sizeof(int), 0) < (int) sizeof(int))
00769 # else
00770 if (send(iSockH, &iProcess, sizeof(int), MSG_NOSIGNAL) < (int) sizeof(int))
00771 # endif
00772 return false;
00773 return true;
00774 }
00775
00776
00777 bool clBeamSrvSlave::SendRes (const GDT *fpData, int iDataCount)
00778 {
00779 int iDataSize;
00780
00781 iDataSize = iDataCount * sizeof(GDT);
00782 # ifndef LINUXSYS
00783 if (send(iSockH, fpData, iDataSize, 0) < iDataSize)
00784 # else
00785 if (send(iSockH, fpData, iDataSize, MSG_NOSIGNAL) < iDataSize)
00786 # endif
00787 return false;
00788 return true;
00789 }
00790
00791
00792 clBeamSrvSlave::clBeamSrvSlave (int iProcessP, int iSockHP)
00793 {
00794 iProcess = iProcessP;
00795 iSockH = iSockHP;
00796 }
00797
00798
00799 clBeamSrvSlave::~clBeamSrvSlave ()
00800 {
00801 close(iSockH);
00802 }
00803
00804
00805 int clBeamSrvSlave::Main (int *ipArgC, char ***cpppArgV)
00806 {
00807 int iInDataCount;
00808 int iOutDataCount;
00809 float fDir;
00810 clAlloc InData;
00811 clAlloc OutData;
00812 clDSPOp DSP;
00813
00814 fprintf(stderr, "beamsrv(slave%i): Receiving node parameters...\n",
00815 iProcess);
00816 if (!RecvParams())
00817 return 1;
00818
00819 fDir = acos(-1.0) / (sNodeParams.iBeamCount - 1) * (iProcess - 1) -
00820 asin(1.0);
00821 fprintf(stderr, "beamsrv(slave%i): Beam direction %.1f\n",
00822 iProcess, 180.0 / acos(-1.0) * fDir);
00823 iInDataCount = sNodeParams.iSensors * sNodeParams.iBlockSize;
00824 iOutDataCount = sNodeParams.iBlockSize;
00825 InData.Size(iInDataCount * sizeof(GDT));
00826 OutData.Size(iOutDataCount * sizeof(GDT));
00827
00828 fprintf(stderr,
00829 "beamsrv(slave%i): Node parameters received, initializing beamformer...\n",
00830 iProcess);
00831
00832 switch (sNodeParams.iType)
00833 {
00834 case BS_ARRAY_TYPE_DIPOLE:
00835 if (!FBDipole.Initialize(sNodeParams.fSpacing,
00836 sNodeParams.iWindowSize, sNodeParams.fSampleRate))
00837 return 2;
00838 FBDipole.SetSoundSpeed(sNodeParams.fSoundSpeed);
00839 FBDipole.SetDirection(fDir, true);
00840 break;
00841 case BS_ARRAY_TYPE_TRIANGLE:
00842 break;
00843 case BS_ARRAY_TYPE_LINE:
00844 if (!FBLine.Initialize(sNodeParams.iSensors,
00845 sNodeParams.fSpacing, sNodeParams.iWindowSize,
00846 sNodeParams.fSampleRate))
00847 return 2;
00848 FBLine.SetSoundSpeed(sNodeParams.fSoundSpeed);
00849 FBLine.SetDirection(fDir, true);
00850 break;
00851 case BS_ARRAY_TYPE_PLANE:
00852 break;
00853 case BS_ARRAY_TYPE_CYLINDER:
00854 break;
00855 case BS_ARRAY_TYPE_SPHERE:
00856 break;
00857 }
00858
00859 fprintf(stderr,
00860 "beamsrv(slave%i): Beamformer initialized, entering process loop...\n",
00861 iProcess);
00862 if (!SendReady())
00863 return 3;
00864
00865 while (bRun)
00866 {
00867 if (!RecvInData(InData, iInDataCount))
00868 break;
00869
00870 switch (sNodeParams.iType)
00871 {
00872 case BS_ARRAY_TYPE_DIPOLE:
00873 FBDipole.Put(InData, iInDataCount, 0, sNodeParams.iSensors);
00874 if (FBDipole.Get(OutData, iOutDataCount))
00875 {
00876 if (!SendRes(OutData, iOutDataCount))
00877 return 4;
00878 }
00879 else
00880 {
00881 DSP.Zero((GDT *) OutData, iOutDataCount);
00882 if (!SendRes(OutData, iOutDataCount))
00883 return 4;
00884 }
00885 break;
00886 case BS_ARRAY_TYPE_TRIANGLE:
00887 break;
00888 case BS_ARRAY_TYPE_LINE:
00889 FBLine.Put(InData, iInDataCount, 0, sNodeParams.iSensors);
00890 if (FBLine.Get(OutData, iOutDataCount))
00891 {
00892 if (!SendRes(OutData, iOutDataCount))
00893 return 4;
00894 }
00895 else
00896 {
00897 DSP.Zero((GDT *) OutData, iOutDataCount);
00898 if (!SendRes(OutData, iOutDataCount))
00899 return 4;
00900 }
00901 break;
00902 case BS_ARRAY_TYPE_PLANE:
00903 break;
00904 case BS_ARRAY_TYPE_CYLINDER:
00905 break;
00906 case BS_ARRAY_TYPE_SPHERE:
00907 break;
00908 }
00909 }
00910
00911 return 0;
00912 }