Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

BeamSrv.cc

Go to the documentation of this file.
00001 /*
00002 
00003     Beamforming input server
00004     Copyright (C) 2002 Jussi Laako
00005 
00006     This program is free software; you can redistribute it and/or modify
00007     it under the terms of the GNU General Public License as published by
00008     the Free Software Foundation; either version 2 of the License, or
00009     (at your option) any later version.
00010 
00011     This program is distributed in the hope that it will be useful,
00012     but WITHOUT ANY WARRANTY; without even the implied warranty of
00013     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014     GNU General Public License for more details.
00015 
00016     You should have received a copy of the GNU General Public License
00017     along with this program; if not, write to the Free Software
00018     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00019 
00020 */
00021 
00022 
00023 #include <stdio.h>
00024 #include <stdlib.h>
00025 #include <string.h>
00026 #include <signal.h>
00027 #include <limits.h>
00028 #include <math.h>
00029 #include <float.h>
00030 #include <unistd.h>
00031 #include <errno.h>
00032 #include <sched.h>
00033 #include <sys/types.h>
00034 #include <sys/socket.h>
00035 #include <sys/wait.h>
00036 
00037 #include <vector>
00038 
00039 #include <DynThreads.hh>
00040 
00041 #include "BeamSrv.hh"
00042 
00043 
00044 volatile bool bRun = true;
00045 clDynThreads<clBeamSrvMaster> *BeamSrvMasterThreads;
00046 BS_PROCINFOV_T vProcInfo;
00047 
00048 
00049 void sig_handler_m (int signo)
00050 {
00051     switch (signo)
00052     {
00053         case SIGINT:
00054         case SIGHUP:
00055         case SIGTERM:
00056             bRun = false;
00057             break;
00058     }
00059 }
00060 
00061 
00062 void sig_handler_s (int signo)
00063 {
00064     switch (signo)
00065     {
00066         case SIGHUP:
00067         case SIGTERM:
00068             //bRun = false;
00069             exit(0);
00070             break;
00071     }
00072 }
00073 
00074 
00075 int main (int argc, char *argv[])
00076 {
00077     int iRetVal = 0;
00078     int iProcCntr;
00079     int iProcCount;
00080     int ipSockPair[2];
00081     pid_t pidFork;
00082     BS_PROCINFOV_T::iterator iterProcs;
00083 
00084     if (argc < 2)
00085     {
00086         fprintf(stderr, "process count not defined\n");
00087         return 1;
00088     }
00089     if (strcmp(argv[1], "--version") == 0)
00090     {
00091         fprintf(stderr, "%s v%i.%i.%i\n", argv[0], GLOBAL_VERSMAJ,
00092             GLOBAL_VERSMIN, GLOBAL_VERSPL);
00093         fprintf(stderr, "Copyright (C) 2002 Jussi Laako\n");
00094         return 0;
00095     }
00096     if (strcmp(argv[1], "--help") == 0)
00097     {
00098         fprintf(stderr, "%s [--version|--help] <process count>\n\n", argv[0]);
00099         fprintf(stderr, "--version      display version information\n");
00100         fprintf(stderr, "--help         display this help\n");
00101         return 0;
00102     }
00103     iProcCount = atoi(argv[1]);
00104     for (iProcCntr = 0; iProcCntr < iProcCount; iProcCntr++)
00105     {
00106         if (socketpair(AF_LOCAL, SOCK_STREAM, 0, ipSockPair) < 0)
00107         {
00108             perror("socketpair()");
00109             return 2;
00110         }
00111         pidFork = fork();
00112         if (pidFork == 0)
00113         {
00114             signal(SIGHUP, sig_handler_s);
00115             signal(SIGTERM, sig_handler_s);
00116             signal(SIGPIPE, SIG_IGN);
00117     
00118             close(ipSockPair[1]);
00119 
00120             clBeamSrvSlave *BeamSrvSlave;
00121 
00122             BeamSrvSlave = new clBeamSrvSlave(iProcCntr + 1, ipSockPair[0]);
00123             iRetVal = BeamSrvSlave->Main(&argc, &argv);
00124             delete BeamSrvSlave;
00125             return iRetVal;
00126         }
00127         else
00128         {
00129             close(ipSockPair[0]);
00130 
00131             stBeamProcInfo sProcInfo;
00132             
00133             sProcInfo.pidProc = pidFork;
00134             sProcInfo.iSockH = ipSockPair[1];
00135             vProcInfo.push_back(sProcInfo);
00136         }
00137     }
00138 
00139     signal(SIGINT, sig_handler_m);
00140     signal(SIGHUP, sig_handler_m);
00141     signal(SIGTERM, sig_handler_m);
00142     signal(SIGPIPE, SIG_IGN);
00143 
00144     clBeamSrvMaster *BeamSrvMaster;
00145 
00146     BeamSrvMaster = new clBeamSrvMaster;
00147     BeamSrvMasterThreads = 
00148         new clDynThreads<clBeamSrvMaster>(*BeamSrvMaster);
00149     iRetVal = BeamSrvMaster->Main(&argc, &argv);
00150     delete BeamSrvMaster;
00151     
00152     iterProcs = vProcInfo.begin();
00153     while (iterProcs != vProcInfo.end())
00154     {
00155         kill((*iterProcs).pidProc, SIGHUP);
00156         close((*iterProcs).iSockH);
00157         waitpid((*iterProcs).pidProc, NULL, 0);
00158         iterProcs++;
00159     }    
00160     
00161     return iRetVal;
00162 }
00163 
00164 
00165 // --- MASTER
00166 
00167 
00168 bool clBeamSrvMaster::ReadConfig ()
00169 {
00170     if (!Cfg.GetInt("Type", &sNodeParams.iType))
00171     {
00172         Log.Add('!', "Parameter \"Type\" is missing");
00173         return false;
00174     }
00175     if (!Cfg.GetInt("Sensors", &sNodeParams.iSensors))
00176     {
00177         Log.Add('!', "Parameter \"Sensors\" is missing");
00178         return false;
00179     }
00180     if (!Cfg.GetFlt("Spacing", &sNodeParams.fSpacing))
00181     {
00182         Log.Add('!', "Parameter \"Spacing\" is missing");
00183         return false;
00184     }
00185     if (!Cfg.GetFlt("SoundSpeed", &sNodeParams.fSoundSpeed))
00186     {
00187         Log.Add('!', "Parameter \"SoundSpeed\" is missing");
00188         return false;
00189     }
00190     if (!Cfg.GetInt("Decimate", &iDecimate))
00191     {
00192         Log.Add('!', "Parameter \"Decimate\" is missing");
00193         return false;
00194     }
00195     if (!Cfg.GetInt("WindowSize", &sNodeParams.iWindowSize))
00196     {
00197         Log.Add('!', "Parameter \"WindowSize\" is missing");
00198         return false;
00199     }
00200     if (!Cfg.GetInt("BlockSize", &sNodeParams.iBlockSize))
00201     {
00202         Log.Add('!', "Parameter \"BlockSize\" is missing");
00203         return false;
00204     }
00205     if (!Cfg.GetInt("Beams", &sNodeParams.iBeamCount))
00206     {
00207         Log.Add('!', "Parameter \"Beams\" is missing");
00208         return false;
00209     }
00210 
00211     return true;
00212 }
00213 
00214 
00215 bool clBeamSrvMaster::InitFilterBank ()
00216 {
00217     int iTwosExp;
00218     int iFilterCntr;
00219     float fArrayFreq;
00220 
00221     if (!Cfg.GetInt("FilterType", &iFilterType))
00222     {
00223         Log.Add('!', "Parameter \"FilterType\" is missing");
00224         Abort();
00225     }
00226 
00227     fArrayFreq = sNodeParams.fSoundSpeed / sNodeParams.fSpacing / 2.0f;
00228     if (iDecimate)
00229     {
00230         iTwosExp = (int) 
00231             floor(log((float) sInHdr.dSampleRate / 2.0f / fArrayFreq) / 
00232             log(2.0));
00233         iDecFact = (int) pow(2.0, iTwosExp);
00234     }
00235     else
00236     {
00237         iDecFact = 1;
00238     }
00239     sprintf(cpLogBuf, "Array frequency %g Hz, decimation factor %i", 
00240         fArrayFreq, iDecFact);
00241     Log.Add(' ', cpLogBuf);
00242 
00243     if (iDecFact > 1)
00244     {
00245         fprintf(stderr, "beamsrv(master): Initializing filters...\n");
00246         FilterBank = new clRecDecimator[sNodeParams.iSensors];
00247         for (iFilterCntr = 0; iFilterCntr < sNodeParams.iSensors; iFilterCntr++)
00248         {
00249             if (!FilterBank[iFilterCntr].Initialize(iDecFact, 
00250                 -sNodeParams.iWindowSize, (GDT *) NULL, (GDT) 0, iFilterType))
00251                 return false;
00252         }
00253         fprintf(stderr, "beamsrv(master): Filter initialization complete\n");
00254     }
00255 
00256     sNodeParams.fSampleRate = (float) sInHdr.dSampleRate / (float) iDecFact;
00257 
00258     return true;
00259 }
00260 
00261 
00262 bool clBeamSrvMaster::InitProcessing ()
00263 {
00264     if ((int) vProcInfo.size() < sNodeParams.iBeamCount)
00265     {
00266         Log.Add('!', "Insufficient number of processes available");
00267         return false;
00268     }
00269     
00270     if ((sInHdr.iChannels - iChOffset) < sNodeParams.iSensors)
00271     {
00272         Log.Add('!', "Misconfiguration (not enough channels)");
00273         return false;
00274     }
00275 
00276     return SendNodeParams();
00277 }
00278 
00279 
00280 void clBeamSrvMaster::ProcessLoop ()
00281 {
00282     bool bInData;
00283     int iNodeCntr;
00284     int iInDataCount;
00285     int iNodeDataCount;
00286     int iNodeResCount;
00287     int iNodeResSize;
00288     clAlloc InData;
00289     clAlloc NodeData;
00290     clAlloc NodeRes;
00291     clAlloc LOutData;
00292     BS_PROCINFOV_T::iterator iterProcs;
00293 
00294     iInDataCount = sNodeParams.iBlockSize * sInHdr.iChannels;
00295     iNodeDataCount = sNodeParams.iBlockSize * sNodeParams.iSensors;
00296     iNodeResCount = sNodeParams.iBlockSize;
00297     iNodeResSize = iNodeResCount * sizeof(GDT);
00298     iOutDataCount = sNodeParams.iBlockSize * sNodeParams.iBeamCount;
00299     InData.Size(iInDataCount * sizeof(GDT));
00300     NodeData.Size(iNodeDataCount * sizeof(GDT));
00301     NodeRes.Size(iNodeResCount * sizeof(GDT));
00302     LOutData.Size(iOutDataCount * sizeof(GDT));
00303     OutData.Size(iOutDataCount * sizeof(GDT));
00304 
00305     fprintf(stderr, "beamsrv(master): Entering process loop...\n");
00306     while (bRun)
00307     {
00308         SemIn.Wait();
00309         MtxIn.Wait();
00310         bInData = InBuf.Get((GDT *) InData, iInDataCount);
00311         MtxIn.Release();
00312         if (!bInData) continue;
00313 
00314         if (!bRun) break;
00315 
00316         if (iDecFact <= 1)
00317         {
00318             CompactData(NodeData, InData, 
00319                 sNodeParams.iSensors, sInHdr.iChannels,
00320                 iChOffset, sNodeParams.iBlockSize);
00321         }
00322         else
00323         {
00324             if (!FilterData(NodeData, InData,
00325                 sNodeParams.iSensors, sInHdr.iChannels,
00326                 iChOffset, sNodeParams.iBlockSize))
00327                 continue;
00328         }
00329 
00330         SendInData(NodeData, iNodeDataCount);
00331 
00332         iterProcs = vProcInfo.begin();
00333         iNodeCntr = 0;
00334         while (iterProcs != vProcInfo.end())
00335         {
00336 #           ifndef LINUXSYS
00337             if (recv((*iterProcs).iSockH, NodeRes, iNodeResSize, 
00338                 MSG_WAITALL) <= 0)
00339 #           else
00340             if (recv((*iterProcs).iSockH, NodeRes, iNodeResSize, 
00341                 MSG_WAITALL|MSG_NOSIGNAL) <= 0)
00342 #           endif
00343             {
00344                 sprintf(cpLogBuf, "recv() from process %u failed",
00345                     (*iterProcs).pidProc);
00346                 Log.Add('!', cpLogBuf, errno);
00347             }
00348             DSP.Pack((GDT *) LOutData, (GDT *) NodeRes, iNodeCntr, 
00349                 sNodeParams.iBeamCount, iNodeResCount);
00350             iterProcs++;
00351             iNodeCntr++;
00352         }
00353 
00354         MtxOut.Wait();
00355         OutData = LOutData;
00356         iBlockCntr++;
00357         CndOut.NotifyAll();
00358         MtxOut.Release();
00359     }
00360 }
00361 
00362 
00363 bool clBeamSrvMaster::SendNodeParams ()
00364 {
00365     BS_PROCINFOV_T::iterator iterProcs;
00366 
00367     iterProcs = vProcInfo.begin();
00368     fprintf(stderr, "beamsrv(master): Sending node parameters...\n");
00369     while (iterProcs != vProcInfo.end())
00370     {
00371 #       ifndef LINUXSYS
00372         if (send((*iterProcs).iSockH, &sNodeParams, sizeof(sNodeParams), 0) <
00373             (int) sizeof(sNodeParams))
00374 #       else
00375         if (send((*iterProcs).iSockH, &sNodeParams, sizeof(sNodeParams), 
00376             MSG_NOSIGNAL) < (int) sizeof(sNodeParams))
00377 #       endif
00378         {
00379             sprintf(cpLogBuf, "send() to process %u failed",
00380                 (*iterProcs).pidProc);
00381             Log.Add('!', cpLogBuf, errno);
00382             return false;
00383         }
00384         iterProcs++;
00385     }
00386     fprintf(stderr, "beamsrv(master): Node parameters sent\n");
00387     return true;
00388 }
00389 
00390 
00391 bool clBeamSrvMaster::WaitReady ()
00392 {
00393     int iProcess;
00394     BS_PROCINFOV_T::iterator iterProcs;
00395 
00396     iterProcs = vProcInfo.begin();
00397     fprintf(stderr, 
00398         "beamsrv(master): Waiting for ready messages... (may take a while)\n");
00399     while (iterProcs != vProcInfo.end())
00400     {
00401 #       ifndef LINUXSYS
00402         if (recv((*iterProcs).iSockH, &iProcess, sizeof(int), MSG_WAITALL) <= 0)
00403 #       else
00404         if (recv((*iterProcs).iSockH, &iProcess, sizeof(int), 
00405             MSG_WAITALL|MSG_NOSIGNAL) <= 0)
00406 #       endif
00407         {
00408             sprintf(cpLogBuf, "recv() from process %u failed",
00409                 (*iterProcs).pidProc);
00410             Log.Add('!', cpLogBuf, errno);
00411             return false;
00412         }
00413         fprintf(stderr, "beamsrv(master): slave%i (%u) ready\n",
00414             iProcess, (*iterProcs).pidProc);
00415         iterProcs++;
00416     }
00417     fprintf(stderr, "beamsrv(master): All nodes are ready\n");
00418     return true;
00419 }
00420 
00421 
00422 bool clBeamSrvMaster::SendInData (const GDT *fpData, int iDataCount)
00423 {
00424     int iDataSize;
00425     BS_PROCINFOV_T::iterator iterProcs;
00426 
00427     iDataSize = iDataCount * sizeof(GDT);
00428     iterProcs = vProcInfo.begin();
00429     while (iterProcs != vProcInfo.end())
00430     {
00431 #       ifndef LINUXSYS
00432         if (send((*iterProcs).iSockH, fpData, iDataSize, 0) < iDataSize)
00433 #       else
00434         if (send((*iterProcs).iSockH, fpData, iDataSize, MSG_NOSIGNAL) < 
00435             iDataSize)
00436 #       endif
00437         {
00438             sprintf(cpLogBuf, "send() to process %u failed",
00439                 (*iterProcs).pidProc);
00440             Log.Add('!', cpLogBuf, errno);
00441             return false;
00442         }
00443         iterProcs++;
00444     }
00445     return true;
00446 }
00447 
00448 
00449 void clBeamSrvMaster::CompactData (GDT *fpDest, const GDT *fpSrc,
00450     long lDestChs, long lSrcChs, long lOffset, long lCount)
00451 {
00452     long lSampleCntr;
00453     long lChCntr;
00454     long lSrcIdx;
00455     long lDestIdx;
00456     
00457     for (lSampleCntr = 0; lSampleCntr < lCount; lSampleCntr++)
00458     {
00459         lSrcIdx = lSampleCntr * lSrcChs + lOffset;
00460         lDestIdx = lSampleCntr * lDestChs;
00461         for (lChCntr = 0; lChCntr < lDestChs; lChCntr++)
00462         {
00463             fpDest[lDestIdx + lChCntr] = fpSrc[lSrcIdx + lChCntr];
00464         }
00465     }
00466 }
00467 
00468 
00469 bool clBeamSrvMaster::FilterData (GDT *fpDest, const GDT *fpSrc,
00470     long lDestChs, long lSrcChs, long lOffset, long lCount)
00471 {
00472     long lOutData = 0;
00473     long lChCntr;
00474 
00475     FiltWork.Size(lCount * sizeof(GDT));
00476 
00477 #   ifdef _OPENMP
00478 #   pragma omp parallel
00479 #   pragma omp for
00480 #   endif
00481     for (lChCntr = 0; lChCntr < lDestChs; lChCntr++)
00482     {
00483         DSP.Extract((GDT *) FiltWork, fpSrc, lOffset + lChCntr, lSrcChs, 
00484             lCount * lSrcChs);
00485         /*DSP.Extract((GDT *) FiltWork, fpSrc, lOffset, lSrcChs, 
00486             lCount * lSrcChs);*/
00487         FilterBank[lChCntr].Put((GDT *) FiltWork, lCount);
00488         if (FilterBank[lChCntr].Get((GDT *) FiltWork, lCount))
00489         {
00490             DSP.Pack(fpDest, (GDT *) FiltWork, lChCntr, lDestChs, lCount);
00491 #           ifdef _OPENMP
00492 #           pragma omp atomic
00493 #           endif
00494             lOutData++;
00495         }
00496     }
00497     return ((lOutData) ? true : false);
00498 }
00499 
00500 
00501 clBeamSrvMaster::clBeamSrvMaster ()
00502 {
00503     Log.Open(BS_LOGFILE);
00504     Log.Add('*', "Starting");
00505     iBlockCntr = 0;
00506     FilterBank = NULL;
00507 }
00508 
00509 
00510 clBeamSrvMaster::~clBeamSrvMaster ()
00511 {
00512     if (FilterBank != NULL)
00513         delete [] FilterBank;
00514     Log.Add('*', "Stopping");
00515 }
00516 
00517 
00518 int clBeamSrvMaster::Main (int *ipArgC, char ***cpppArgV)
00519 {
00520     int iServerThreadH;
00521     int iInDataThreadH;
00522     int iPort;
00523     int iSockH;
00524     char cpServSock[_POSIX_PATH_MAX + 1];
00525     stRawDataReq sReq;
00526     clSockClie SClie;
00527 
00528     Cfg.SetFileName(BS_CFGFILE);
00529 
00530     if (!Cfg.GetInt("Port", &iPort))
00531     {
00532         Log.Add('!', "Parameter \"Port\" is missing");
00533         Abort();
00534     }
00535     if (!SServ.Bind((unsigned short) iPort))
00536     {
00537         Log.Add('!', "clSockServ::Bind() failed", SServ.GetErrno());
00538         Abort();
00539     }
00540 
00541     if (!Cfg.GetStr("StreamSource", cpServSock))
00542     {
00543         Log.Add('!', "Parameter \"StreamSource\" is missing");
00544         Abort();
00545     }
00546     iSockH = SClie.Connect(cpServSock);
00547     if (iSockH < 0)
00548     {
00549         Log.Add('!', "clSockClie::Connect() failed", SClie.GetErrno());
00550         Abort();
00551     }
00552     SOpIn.SetHandle(iSockH);
00553     sReq.iChannel = -1;
00554     if (SOpIn.ReadN(&sInHdr, sizeof(sInHdr)) <= 0)
00555     {
00556         Log.Add('!', "clSockOp::Readn() failed", SOpIn.GetErrno());
00557         Abort();
00558     }
00559     if (SOpIn.WriteN(&sReq, sizeof(sReq)) <= 0)
00560     {
00561         Log.Add('!', "clSockOp::WriteN() failed", SOpIn.GetErrno());
00562         Abort();
00563     }
00564     sprintf(cpLogBuf, "Receiving data for %i channels at fs %g",
00565         sInHdr.iChannels, sInHdr.dSampleRate);
00566     Log.Add(' ', cpLogBuf);
00567 
00568     if (!Cfg.GetInt("ChOffset", &iChOffset))
00569     {
00570         Log.Add('!', "Parameter \"ChOffset\" is missing");
00571         Abort();
00572     }
00573 
00574     if (!ReadConfig())
00575     {
00576         Log.Add('!', "Reading of configuration file failed");
00577         Abort();
00578     }
00579 
00580     if (!InitFilterBank())
00581     {
00582         Log.Add('!', "Filter bank initialization failed");
00583         Abort();
00584     }
00585 
00586     if (!InitProcessing())
00587     {
00588         Log.Add('!', "Initialization of processing system failed");
00589         Abort();
00590     }
00591 
00592     if (!WaitReady())
00593     {
00594         Log.Add('!', "All nodes didn't confirm ready state");
00595         Abort();
00596     }
00597 
00598     iServerThreadH = BeamSrvMasterThreads->Create(
00599         &clBeamSrvMaster::ServerThread, NULL, false);
00600     iInDataThreadH = BeamSrvMasterThreads->Create(
00601         &clBeamSrvMaster::InDataThread, NULL, false);
00602 
00603     ProcessLoop();
00604     
00605     Abort();
00606     BeamSrvMasterThreads->Wait(iInDataThreadH);
00607     BeamSrvMasterThreads->Wait(iServerThreadH);
00608     
00609     return 0;
00610 }
00611 
00612 
00613 void clBeamSrvMaster::Abort ()
00614 {
00615     bRun = false;
00616 }
00617 
00618 
00619 void *clBeamSrvMaster::InDataThread (void *vpParam)
00620 {
00621     int iInDataCount;
00622     clAlloc InData;
00623 
00624     BeamSrvMasterThreads->SetSched(BeamSrvMasterThreads->Self(),
00625         SCHED_FIFO, BS_INTHREAD_PRIORITY);
00626 
00627     iInDataCount = sNodeParams.iBlockSize * sInHdr.iChannels;
00628     InData.Size(iInDataCount * sizeof(GDT));
00629     while (bRun)
00630     {
00631         if (SOpIn.ReadN(InData, InData.GetSize()) <= 0)
00632         {
00633             Abort();
00634             break;
00635         }
00636 
00637         MtxIn.Wait();
00638         InBuf.Put((GDT *) InData, iInDataCount);
00639         MtxIn.Release();
00640         SemIn.Post();
00641     }
00642     
00643     return NULL;
00644 }
00645 
00646 
00647 void *clBeamSrvMaster::ServerThread (void *vpParam)
00648 {
00649     int iSockH;
00650 
00651     while (bRun)
00652     {
00653         iSockH = SServ.WaitForConnect(BS_ACCEPT_TO);
00654         if (iSockH >= 0)
00655         {
00656             BeamSrvMasterThreads->Create(&clBeamSrvMaster::ServeClientThread,
00657                 (void *) iSockH, true);
00658         }
00659     }
00660 
00661     return NULL;
00662 }
00663 
00664 
00665 void *clBeamSrvMaster::ServeClientThread (void *vpParam)
00666 {
00667     int iSockH = (int) vpParam;
00668     int iMsgSize;
00669     int iLBlockCntr;
00670     stSoundStart sOutHdr;
00671     clAlloc OutMsg;
00672     clAlloc LOutData;
00673     clSockOp SOp;
00674     clSoundMsg Msg;
00675     
00676     BeamSrvMasterThreads->SetSched(BeamSrvMasterThreads->Self(),
00677         SCHED_FIFO, BS_OUTTHREAD_PRIORITY);
00678 
00679     Log.Add('+', "Client connected");
00680     SOp.SetHandle(iSockH);
00681     iMsgSize = iOutDataCount * sizeof(GDT);
00682     OutMsg.Size(GLOBAL_HEADER_LEN + iMsgSize);
00683     LOutData.Size(iOutDataCount * sizeof(GDT));
00684 
00685     sOutHdr.iChannels = sNodeParams.iBeamCount;
00686     sOutHdr.dSampleRate = sNodeParams.fSampleRate;
00687     sOutHdr.iFragmentSize = iOutDataCount;
00688     sOutHdr.iCompress = MSG_SOUND_COMPRESS_NONE;
00689     Msg.SetStart(OutMsg, &sOutHdr);
00690     if (SOp.WriteN(OutMsg, GLOBAL_HEADER_LEN) <= 0)
00691     {
00692         Log.Add('-', "Error sending header to client, disconnecting");
00693         return NULL;
00694     }
00695 
00696     if (!SOp.DisableNagle())
00697     {
00698         Log.Add('#', "Failed to disable TCP Nagle algorithm", SOp.GetErrno());
00699     }
00700     if (!SOp.SetTypeOfService(IPTOS_LOWDELAY))
00701     {
00702         Log.Add('#', "Unable to set type-of-service flag", SOp.GetErrno());
00703     }
00704 
00705     MtxOut.Wait();
00706     iLBlockCntr = iBlockCntr;
00707     MtxOut.Release();
00708 
00709     while (bRun)
00710     {
00711         MtxOut.Wait();
00712         CndOut.Wait(MtxOut.GetPtr());
00713         LOutData = OutData;
00714         iLBlockCntr++;
00715         if (iLBlockCntr != iBlockCntr)
00716         {
00717             printf("beamsrv: %i blocks lost\n",
00718                 iBlockCntr - iLBlockCntr);
00719             iLBlockCntr = iBlockCntr;
00720         }
00721         MtxOut.Release();
00722         
00723         Msg.SetData(OutMsg, (GDT *) LOutData, iOutDataCount);
00724         if (SOp.WriteN(OutMsg, iMsgSize) <= 0)
00725             break;
00726     }
00727 
00728     Log.Add('-', "Client disconnected");
00729     return NULL;
00730 }
00731 
00732 
00733 // --- SLAVE
00734 
00735 
00736 bool clBeamSrvSlave::RecvParams ()
00737 {
00738 #   ifndef LINUXSYS
00739     if (recv(iSockH, &sNodeParams, sizeof(sNodeParams), 
00740         MSG_WAITALL) < (int) sizeof(sNodeParams))
00741 #   else
00742     if (recv(iSockH, &sNodeParams, sizeof(sNodeParams), 
00743         MSG_WAITALL|MSG_NOSIGNAL) < (int) sizeof(sNodeParams))
00744 #   endif
00745         return false;
00746     return true;
00747 }
00748 
00749 
00750 bool clBeamSrvSlave::RecvInData (GDT *fpData, int iDataCount)
00751 {
00752     int iDataSize;
00753     
00754     iDataSize = iDataCount * sizeof(GDT);
00755 #   ifndef LINUXSYS
00756     if (recv(iSockH, fpData, iDataSize, MSG_WAITALL) < iDataSize)
00757 #   else
00758     if (recv(iSockH, fpData, iDataSize, MSG_WAITALL|MSG_NOSIGNAL) < iDataSize)
00759 #   endif
00760         return false;
00761     return true;
00762 }
00763 
00764 
00765 bool clBeamSrvSlave::SendReady ()
00766 {
00767 #   ifndef LINUXSYS
00768     if (send(iSockH, &iProcess, sizeof(int), 0) < (int) sizeof(int))
00769 #   else
00770     if (send(iSockH, &iProcess, sizeof(int), MSG_NOSIGNAL) < (int) sizeof(int))
00771 #   endif
00772         return false;
00773     return true;
00774 }
00775 
00776 
00777 bool clBeamSrvSlave::SendRes (const GDT *fpData, int iDataCount)
00778 {
00779     int iDataSize;
00780 
00781     iDataSize = iDataCount * sizeof(GDT);
00782 #   ifndef LINUXSYS
00783     if (send(iSockH, fpData, iDataSize, 0) < iDataSize)
00784 #   else
00785     if (send(iSockH, fpData, iDataSize, MSG_NOSIGNAL) < iDataSize)
00786 #   endif
00787         return false;
00788     return true;
00789 }
00790 
00791 
00792 clBeamSrvSlave::clBeamSrvSlave (int iProcessP, int iSockHP)
00793 {
00794     iProcess = iProcessP;
00795     iSockH = iSockHP;
00796 }
00797 
00798 
00799 clBeamSrvSlave::~clBeamSrvSlave ()
00800 {
00801     close(iSockH);
00802 }
00803 
00804 
00805 int clBeamSrvSlave::Main (int *ipArgC, char ***cpppArgV)
00806 {
00807     int iInDataCount;
00808     int iOutDataCount;
00809     float fDir;
00810     clAlloc InData;
00811     clAlloc OutData;
00812     clDSPOp DSP;
00813 
00814     fprintf(stderr, "beamsrv(slave%i): Receiving node parameters...\n",
00815         iProcess);
00816     if (!RecvParams())
00817         return 1;
00818 
00819     fDir = acos(-1.0) / (sNodeParams.iBeamCount - 1) * (iProcess - 1) - 
00820         asin(1.0);
00821     fprintf(stderr, "beamsrv(slave%i): Beam direction %.1f\n",
00822         iProcess, 180.0 / acos(-1.0) * fDir);
00823     iInDataCount = sNodeParams.iSensors * sNodeParams.iBlockSize;
00824     iOutDataCount = sNodeParams.iBlockSize;
00825     InData.Size(iInDataCount * sizeof(GDT));
00826     OutData.Size(iOutDataCount * sizeof(GDT));
00827 
00828     fprintf(stderr, 
00829         "beamsrv(slave%i): Node parameters received, initializing beamformer...\n", 
00830         iProcess);
00831 
00832     switch (sNodeParams.iType)
00833     {
00834         case BS_ARRAY_TYPE_DIPOLE:
00835             if (!FBDipole.Initialize(sNodeParams.fSpacing,
00836                 sNodeParams.iWindowSize, sNodeParams.fSampleRate))
00837                 return 2;
00838             FBDipole.SetSoundSpeed(sNodeParams.fSoundSpeed);
00839             FBDipole.SetDirection(fDir, true);
00840             break;
00841         case BS_ARRAY_TYPE_TRIANGLE:
00842             break;
00843         case BS_ARRAY_TYPE_LINE:
00844             if (!FBLine.Initialize(sNodeParams.iSensors,
00845                 sNodeParams.fSpacing, sNodeParams.iWindowSize,
00846                 sNodeParams.fSampleRate))
00847                 return 2;
00848             FBLine.SetSoundSpeed(sNodeParams.fSoundSpeed);
00849             FBLine.SetDirection(fDir, true);
00850             break;
00851         case BS_ARRAY_TYPE_PLANE:
00852             break;
00853         case BS_ARRAY_TYPE_CYLINDER:
00854             break;
00855         case BS_ARRAY_TYPE_SPHERE:
00856             break;
00857     }
00858 
00859     fprintf(stderr, 
00860         "beamsrv(slave%i): Beamformer initialized, entering process loop...\n", 
00861         iProcess);
00862     if (!SendReady())
00863         return 3;
00864 
00865     while (bRun)
00866     {
00867         if (!RecvInData(InData, iInDataCount))
00868             break;
00869 
00870         switch (sNodeParams.iType)
00871         {
00872             case BS_ARRAY_TYPE_DIPOLE:
00873                 FBDipole.Put(InData, iInDataCount, 0, sNodeParams.iSensors);
00874                 if (FBDipole.Get(OutData, iOutDataCount))
00875                 {
00876                     if (!SendRes(OutData, iOutDataCount))
00877                         return 4;
00878                 }
00879                 else
00880                 {
00881                     DSP.Zero((GDT *) OutData, iOutDataCount);
00882                     if (!SendRes(OutData, iOutDataCount))
00883                         return 4;
00884                 }
00885                 break;
00886             case BS_ARRAY_TYPE_TRIANGLE:
00887                 break;
00888             case BS_ARRAY_TYPE_LINE:
00889                 FBLine.Put(InData, iInDataCount, 0, sNodeParams.iSensors);
00890                 if (FBLine.Get(OutData, iOutDataCount))
00891                 {
00892                     if (!SendRes(OutData, iOutDataCount))
00893                         return 4;
00894                 }
00895                 else
00896                 {
00897                     DSP.Zero((GDT *) OutData, iOutDataCount);
00898                     if (!SendRes(OutData, iOutDataCount))
00899                         return 4;
00900                 }
00901                 break;
00902             case BS_ARRAY_TYPE_PLANE:
00903                 break;
00904             case BS_ARRAY_TYPE_CYLINDER:
00905                 break;
00906             case BS_ARRAY_TYPE_SPHERE:
00907                 break;
00908         }
00909     }
00910 
00911     return 0;
00912 }

Generated on Sun Oct 26 19:11:19 2003 for HASAS by doxygen 1.3.3