Main Page | Namespace List | Class Hierarchy | Alphabetical List | Compound List | File List | Compound Members | File Members

StreamDist.cc

Go to the documentation of this file.
00001 /*
00002 
00003     Audio stream distributor
00004     Copyright (C) 2000-2002 Jussi Laako
00005 
00006     This program is free software; you can redistribute it and/or modify
00007     it under the terms of the GNU General Public License as published by
00008     the Free Software Foundation; either version 2 of the License, or
00009     (at your option) any later version.
00010 
00011     This program is distributed in the hope that it will be useful,
00012     but WITHOUT ANY WARRANTY; without even the implied warranty of
00013     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014     GNU General Public License for more details.
00015 
00016     You should have received a copy of the GNU General Public License
00017     along with this program; if not, write to the Free Software
00018     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00019 
00020 */
00021 
00022 
#include <pthread.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

#include "StreamDist.hh"
00034 
00035 
00036 static bool bDaemon;
00037 clStreamDist StreamDist;
00038 
00039 
00040 int main (int argc, char *argv[])
00041 {
00042     return StreamDist.Main(argc, argv);
00043 }
00044 
00045 
00046 void SigHandler (int iSigNum)
00047 {
00048     switch (iSigNum)
00049     {
00050         case SIGINT:
00051             StreamDist.Log.Add('#', "Received SIGINT, terminating...");
00052             StreamDist.Stop();
00053             break;
00054         case SIGHUP:
00055             StreamDist.Log.Add('#', "Received SIGHUP, terminating...");
00056             StreamDist.Stop();
00057             break;
00058         case SIGTERM:
00059             StreamDist.Log.Add('#', "Received SIGTERM, terminating...");
00060             StreamDist.Stop();
00061             break;
00062         default:
00063             StreamDist.Log.Add('!', 
00064                 "Received unknown signal, terminating...");
00065             StreamDist.Stop();
00066     }
00067 }
00068 
00069 
00070 void *WrapAudioInThread (void *vpParam)
00071 {
00072     return StreamDist.AudioInThread(vpParam);
00073 }
00074 
00075 
00076 void *WrapServeClientThread (void *vpParam)
00077 {
00078     return StreamDist.ServeClientThread(vpParam);
00079 }
00080 
00081 
00082 #ifdef USE_FLAC
00083 
00084 FLAC__StreamDecoderReadStatus WrapFLACRead (
00085     const FLAC__StreamDecoder *spFLACDec, 
00086     FLAC__byte cpBuffer[], unsigned *uipBytes, void *vpDataPtr)
00087 {
00088     clStreamDist *StreamDistInst = (clStreamDist *) vpDataPtr;
00089     
00090     return StreamDistInst->FLACRead(spFLACDec, cpBuffer, uipBytes);
00091 }
00092 
00093 
00094 FLAC__StreamDecoderWriteStatus WrapFLACWrite (
00095     const FLAC__StreamDecoder *spFLACDec, 
00096     const FLAC__Frame *spFrame, const FLAC__int32 * const ippBuffer[], 
00097     void *vpDataPtr)
00098 {
00099     clStreamDist *StreamDistInst = (clStreamDist *) vpDataPtr;
00100 
00101     return StreamDistInst->FLACWrite(spFLACDec, spFrame, ippBuffer);
00102 }
00103 
00104 
00105 void WrapFLACMetaData (
00106     const FLAC__StreamDecoder *spFLACDec, 
00107     const FLAC__StreamMetadata *spMetaData, void *vpDataPtr)
00108 {
00109     clStreamDist *StreamDistInst = (clStreamDist *) vpDataPtr;
00110 
00111     StreamDistInst->FLACMetaData(spFLACDec, spMetaData);    
00112 }
00113 
00114 
00115 void WrapFLACError (
00116     const FLAC__StreamDecoder *spFLACDec, 
00117     FLAC__StreamDecoderErrorStatus iErrorStatus, void *vpDataPtr)
00118 {
00119     clStreamDist *StreamDistInst = (clStreamDist *) vpDataPtr;
00120 
00121     StreamDistInst->FLACError(spFLACDec, iErrorStatus);    
00122 }
00123 
00124 #endif
00125 
00126 
00127 inline void clStreamDist::CopyChannel (GDT *fpDest, const GDT *fpSrc, 
00128     int iChannel)
00129 {
00130     int iSampleCntr;
00131     int iSampleCount;
00132 
00133     iSampleCount = iFragmentSize / sHdr.iChannels;
00134     for (iSampleCntr = 0; iSampleCntr < iSampleCount; iSampleCntr++)
00135     {
00136         fpDest[iSampleCntr] = fpSrc[iSampleCntr * sHdr.iChannels + iChannel];
00137     }
00138 }
00139 
00140 
00141 bool clStreamDist::InitCompress (int iCompress)
00142 {
00143     if (iCompress == MSG_SOUND_COMPRESS_FLAC)
00144     {
00145         #ifdef USE_FLAC
00146         FLAC__StreamDecoderState iDecState;
00147         
00148         spFLACDec = FLAC__stream_decoder_new();
00149         if (spFLACDec == NULL)
00150         {
00151             Log.Add('!', "FLAC constructor failed");
00152             return false;
00153         }
00154         FLAC__stream_decoder_set_read_callback(spFLACDec,
00155             WrapFLACRead);
00156         FLAC__stream_decoder_set_write_callback(spFLACDec,
00157             WrapFLACWrite);
00158         FLAC__stream_decoder_set_metadata_callback(spFLACDec,
00159             WrapFLACMetaData);
00160         FLAC__stream_decoder_set_error_callback(spFLACDec,
00161             WrapFLACError);
00162         FLAC__stream_decoder_set_client_data(spFLACDec,
00163             (void *) this);
00164         
00165         iDecState = FLAC__stream_decoder_init(spFLACDec);
00166         if (iDecState != FLAC__STREAM_DECODER_SEARCH_FOR_METADATA)
00167         {
00168             Log.Add('!', FLAC__StreamDecoderStateString[iDecState]);
00169             return false;
00170         }
00171         #endif
00172     }
00173     else
00174     {
00175         return false;
00176     }
00177     
00178     return true;
00179 }
00180 
00181 
00182 clStreamDist::clStreamDist ()
00183 {
00184     bRun = true;
00185     iAudioBufSize = 0;
00186     iBlockCntr = 0;
00187     #ifdef USE_FLAC
00188     spFLACDec = NULL;
00189     #endif
00190     Log.Open(SD_LOGFILE);
00191     Log.Add('*', "Starting");
00192     Cfg.SetFileName(SD_CFGFILE);
00193 }
00194 
00195 
00196 clStreamDist::~clStreamDist ()
00197 {
00198     #ifdef USE_FLAC
00199     if (spFLACDec != NULL)
00200     {
00201         FLAC__stream_decoder_delete(spFLACDec);
00202     }
00203     #endif
00204     Log.Add('*', "Ending");
00205 }
00206 
00207 
00208 int clStreamDist::Main (int iArgC, char **cpArgV)
00209 {
00210     int iSockH;
00211     void *vpAudioInRes;
00212     char cpSocket[_POSIX_PATH_MAX + 1];
00213     pthread_t ptidAudioIn;
00214     pthread_t ptidServeClient;
00215     clSockServ SServ;
00216 
00217     signal(SIGINT, SigHandler);
00218     signal(SIGHUP, SigHandler);
00219     signal(SIGTERM, SigHandler);
00220     signal(SIGPIPE, SIG_IGN);
00221     signal(SIGFPE, SIG_IGN);
00222     if (iArgC >= 2)
00223     {
00224         if (strcmp(cpArgV[1], "-D"))
00225         {
00226             bDaemon = true;
00227         }
00228         else if (strcmp(cpArgV[1], "--version"))
00229         {
00230             printf("%s v%i.%i.%i\n", cpArgV[0], GLOBAL_VERSMAJ,
00231                 GLOBAL_VERSMIN, GLOBAL_VERSPL);
00232             printf("Copyright (C) 2000-2001 Jussi Laako\n");
00233             return 0;
00234         }
00235         else if (strcmp(cpArgV[1], "--help"))
00236         {
00237             printf("%s [-D|--version|--help]\n\n", cpArgV[0]);
00238             printf("-D         start as daemon\n");
00239             printf("--version  display version information\n");
00240             printf("--help     display this help\n");
00241             return 0;
00242         }
00243     }
00244     if (bDaemon)
00245     {
00246         if (fork() != 0)
00247         {
00248             exit(0);
00249         }
00250         setsid();
00251         freopen("/dev/null", "r+", stderr);
00252         freopen("/dev/null", "r+", stdin);
00253         freopen("/dev/null", "r+", stdout);
00254     }
00255     if (!Cfg.GetStr("LocalSocket", cpSocket))
00256     {
00257         Log.Add('!', "\"LocalSocket\" not found from configuration file");
00258         return 1;
00259     }
00260     if (!SServ.Bind(cpSocket))
00261     {
00262         Log.Add('!', strerror(SServ.GetErrno()));
00263         return 2;
00264     }
00265     pthread_create(&ptidAudioIn, NULL, WrapAudioInThread, NULL);
00266     while (bRun)
00267     {
00268         if (access(SD_SHUTDOWNFILE, F_OK) == 0)
00269         {
00270             unlink(SD_SHUTDOWNFILE);
00271             Stop();
00272             break;
00273         }
00274         iSockH = SServ.WaitForConnect(SD_CONNECT_TIMEOUT);
00275         if (iSockH >= 0)
00276         {
00277             pthread_create(&ptidServeClient, NULL, WrapServeClientThread,
00278                 (void *) iSockH);
00279             pthread_detach(ptidServeClient);
00280         }
00281     }
00282     pthread_join(ptidAudioIn, &vpAudioInRes);
00283     if ((int) vpAudioInRes != 0) return ((int) vpAudioInRes);
00284     return 0;
00285 }
00286 
00287 
00288 void *clStreamDist::AudioInThread (void *vpParam)
00289 {
00290     int iSockH;
00291     int iHostPort;
00292     char cpHostAddr[SD_HOSTADDR_MAX];
00293     char cpHdrMsg[GLOBAL_HEADER_LEN];
00294     char cpLogEntry[SD_LOGENTRY_SIZE];
00295     stSoundStart sInHdr;
00296     sigset_t sigsetThis;
00297     #ifndef BSDSYS
00298     uid_t uidCurrent;
00299     struct sched_param sSchedParam;
00300     #endif
00301     clAlloc InBuf;
00302     clSockClie SClie;
00303     clSockOp SOp;
00304     clSoundMsg Msg;
00305 
00306     sigemptyset(&sigsetThis);
00307     sigaddset(&sigsetThis, SIGPIPE);
00308     sigaddset(&sigsetThis, SIGINT);
00309     sigaddset(&sigsetThis, SIGHUP);
00310     pthread_sigmask(SIG_BLOCK, &sigsetThis, NULL);
00311 
00312     if (!Cfg.GetStr("Server", cpHostAddr))
00313     {
00314         Log.Add('!', "\"Server\" not found from configuration file");
00315         return ((void *) 2);
00316     }
00317     if (!Cfg.GetInt("ServerPort", &iHostPort))
00318     {
00319         Log.Add('!', "\"ServerPort\" not found from configuration file");
00320         return ((void *) 2);
00321     }
00322     iSockH = SClie.Connect(cpHostAddr, NULL, iHostPort);
00323     if (iSockH < 0)
00324     {
00325         Log.Add('!', "Failed to connect to specified server", 
00326             SClie.GetErrno());
00327         return ((void *) 2);
00328     }
00329     SOp.SetHandle(iSockH);
00330     if (SOp.ReadN(cpHdrMsg, GLOBAL_HEADER_LEN) < GLOBAL_HEADER_LEN)
00331     {
00332         Log.Add('!', "Failed to receive data header", SOp.GetErrno());
00333         return ((void *) 2);
00334     }
00335     Msg.GetStart(cpHdrMsg, &sInHdr);
00336     sprintf(cpLogEntry, "Receiving data: ch %i fs %g frag %i samples",
00337         sInHdr.iChannels, sInHdr.dSampleRate, sInHdr.iFragmentSize);
00338     Log.Add(' ', cpLogEntry);
00339 
00340     if (sInHdr.iCompress)
00341     {
00342         if (!InitCompress(sInHdr.iCompress))
00343             return ((void *) 2);
00344     }
00345 
00346     iFragmentSize = sInHdr.iFragmentSize;
00347     iAudioBufSize = sInHdr.iFragmentSize * sizeof(GDT);
00348     sHdr.iChannels = sInHdr.iChannels;
00349     sHdr.dSampleRate = sInHdr.dSampleRate;
00350     InBuf.Size(iAudioBufSize);
00351     InBuf.Lock();
00352     AudioBuf.Size(iAudioBufSize);
00353     AudioBuf.Lock();
00354     Log.Add(' ', "AudioIn thread running");
00355 
00356     #ifndef BSDSYS
00357     uidCurrent = getuid();
00358     setuid(0);
00359     sSchedParam.sched_priority = sched_get_priority_min(SCHED_FIFO) + 
00360         SD_INTHREAD_PRIORITY;
00361     if (pthread_setschedparam(pthread_self(), SCHED_FIFO, &sSchedParam) != 0)
00362         Log.Add('#', "Warning: Unable to set scheduling parameters");
00363     setuid(uidCurrent);
00364     #endif
00365 
00366     if (sInHdr.iCompress == MSG_SOUND_COMPRESS_FLAC)
00367     {
00368         #ifdef USE_FLAC
00369         InSOp = &SOp;
00370         FLAC__stream_decoder_process_until_end_of_stream(spFLACDec);
00371         #endif
00372     }
00373 
00374     while (bRun)
00375     {
00376         if (sInHdr.iCompress == MSG_SOUND_COMPRESS_FLAC)
00377         {
00378             sleep(1);
00379         }
00380         else
00381         {
00382             if (SOp.ReadN(InBuf, iAudioBufSize) < iAudioBufSize)
00383             {
00384                 Log.Add('!', "Read error on input stream", SOp.GetErrno());
00385                 return ((void *) 2);
00386             }
00387             #ifndef USE_RWLOCK
00388             MtxAudio.Wait();
00389             #else
00390             RWLAudio.WaitWrite();
00391             #endif
00392             Msg.GetData(InBuf, (GDT *) AudioBuf, sInHdr.iFragmentSize);
00393             iBlockCntr++;
00394             CndAudio.NotifyAll();
00395             #ifndef USE_RWLOCK
00396             MtxAudio.Release();
00397             #else
00398             RWLAudio.Release();
00399             #endif
00400         }
00401     }
00402 
00403     if (sInHdr.iCompress == MSG_SOUND_COMPRESS_FLAC)
00404     {
00405         #ifdef USE_FLAC
00406         FLAC__stream_decoder_finish(spFLACDec);
00407         #endif
00408     }
00409 
00410     Log.Add(' ', "AudioIn thread ending");
00411     return ((void *) 0);
00412 }
00413 
00414 
00415 void *clStreamDist::ServeClientThread (void *vpParam)
00416 {
00417     bool bConnected = true;
00418     int iOutSize;
00419     int iLocalBlockCntr;
00420     stRawDataReq sReq;
00421     sigset_t sigsetThis;
00422     #ifndef BSDSYS
00423     uid_t uidCurrent;
00424     struct sched_param sSchedParam;
00425     #endif
00426     clAlloc OutBuf(iAudioBufSize);
00427     clSockOp SOp((int) vpParam);
00428 
00429     sigemptyset(&sigsetThis);
00430     sigaddset(&sigsetThis, SIGPIPE);
00431     sigaddset(&sigsetThis, SIGINT);
00432     sigaddset(&sigsetThis, SIGHUP);
00433     pthread_sigmask(SIG_BLOCK, &sigsetThis, NULL);
00434     Log.Add('+', "Client connected");
00435     if (SOp.WriteN(&sHdr, sizeof(sHdr)) < (int) sizeof(sHdr))
00436         bConnected = false;
00437     if (SOp.ReadN(&sReq, sizeof(sReq)) < (int) sizeof(sReq))
00438         bConnected = false;
00439     if (sReq.iChannel >= sHdr.iChannels) 
00440     {
00441         bConnected = false;
00442         Log.Add('!', "Client requested non-existing channel");
00443     }
00444     if (sReq.iChannel < 0)
00445         iOutSize = iAudioBufSize;
00446     else
00447         iOutSize = iAudioBufSize / sHdr.iChannels;
00448 
00449     #ifndef BSDSYS
00450     uidCurrent = getuid();
00451     setuid(0);
00452     sSchedParam.sched_priority = sched_get_priority_min(SCHED_FIFO) + 
00453         SD_OUTTHREAD_PRIORITY;
00454     if (pthread_setschedparam(pthread_self(), SCHED_FIFO, &sSchedParam) != 0)
00455         Log.Add('#', "Warning: Unable to set scheduling parameters");
00456     setuid(uidCurrent);
00457     #endif
00458 
00459     iLocalBlockCntr = iBlockCntr;
00460     while (bRun && bConnected)
00461     {
00462         MtxAudio.Wait();
00463         CndAudio.Wait(MtxAudio.GetPtr());
00464         #ifdef USE_RWLOCK
00465         MtxAudio.Release();
00466         RWLAudio.WaitRead();
00467         #endif
00468         if (sReq.iChannel < 0)
00469         {
00470             memcpy(OutBuf, AudioBuf, iAudioBufSize);
00471         }
00472         else
00473         {
00474             CopyChannel(OutBuf, AudioBuf, sReq.iChannel);
00475         }
00476         iLocalBlockCntr++;
00477         if (iLocalBlockCntr != iBlockCntr)
00478         {
00479             printf("streamdist: %i blocks lost\n", 
00480                 iBlockCntr - iLocalBlockCntr);
00481             iLocalBlockCntr = iBlockCntr;
00482         }
00483         #ifndef USE_RWLOCK
00484         MtxAudio.Release();
00485         #else
00486         RWLAudio.Release();
00487         #endif
00488         if (SOp.WriteN(OutBuf, iOutSize) < iOutSize)
00489             bConnected = false;
00490     }
00491     Log.Add('-', "Client disconnected");
00492     return NULL;
00493 }
00494 
00495 
00496 #ifdef USE_FLAC
00497 
00498 FLAC__StreamDecoderReadStatus clStreamDist::FLACRead (
00499     const FLAC__StreamDecoder *spFLACDec, 
00500     FLAC__byte *cpBuffer, unsigned *uipBytes)
00501 {
00502     int iReadRes;
00503 
00504     iReadRes = InSOp->Read(cpBuffer, (int) *uipBytes);
00505     if (iReadRes < 0)
00506     {
00507         Log.Add('!', "Read error on input stream", InSOp->GetErrno());
00508         *uipBytes = 0;
00509         return FLAC__STREAM_DECODER_READ_STATUS_ABORT;
00510     }
00511     if (iReadRes == 0 || !bRun)
00512     {
00513         *uipBytes = 0;
00514         return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM;
00515     }
00516     *uipBytes = (unsigned) iReadRes;
00517     return FLAC__STREAM_DECODER_READ_STATUS_CONTINUE;
00518 }
00519 
00520 
00521 FLAC__StreamDecoderWriteStatus clStreamDist::FLACWrite (
00522     const FLAC__StreamDecoder *spFLACDec, 
00523     const FLAC__Frame *spFrame, const FLAC__int32 * const *ippBuffer)
00524 {
00525     unsigned uiChannels;
00526     unsigned uiBits;
00527     unsigned uiBlockSize;
00528     unsigned uiSamples;
00529     unsigned uiChCntr;
00530     unsigned uiSampleCntr;
00531     GDT fScale;
00532     GDT *fpAudioBuf;
00533 
00534     uiChannels = FLAC__stream_decoder_get_channels(spFLACDec);
00535     uiBits = FLAC__stream_decoder_get_bits_per_sample(spFLACDec);
00536     uiBlockSize = FLAC__stream_decoder_get_blocksize(spFLACDec);
00537     uiSamples = uiBlockSize * uiChannels;
00538 
00539     #ifndef USE_RWLOCK
00540     MtxAudio.Wait();
00541     #else
00542     RWLAudio.WaitWrite();
00543     #endif
00544 
00545     if ((int) uiSamples != iFragmentSize)
00546     {
00547         iFragmentSize = uiSamples;
00548         iAudioBufSize = iFragmentSize * sizeof(GDT);
00549         AudioBuf.UnLock();
00550         AudioBuf.Size(iAudioBufSize);
00551         AudioBuf.Lock();
00552     }
00553 
00554     switch (uiBits)
00555     {
00556         case 8:
00557             fScale = 1.0f / 0x7f;
00558             break;
00559         case 16:
00560             fScale = 1.0f / 0x7fff;
00561             break;
00562         case 24:
00563             fScale =  1.0f / 0x7fffff;
00564             break;
00565         default:
00566             Log.Add('!', "Unsupported sample length");
00567             return FLAC__STREAM_DECODER_WRITE_STATUS_ABORT;
00568     }
00569     fpAudioBuf = AudioBuf;
00570     for (uiChCntr = 0; uiChCntr < uiChannels; uiChCntr++)
00571     {
00572         for (uiSampleCntr = 0; uiSampleCntr < uiBlockSize; uiSampleCntr++)
00573         {
00574             fpAudioBuf[uiSampleCntr * uiChannels + uiChCntr] =
00575                 (GDT) ippBuffer[uiChCntr][uiSampleCntr] * fScale;
00576         }
00577     }
00578 
00579     iBlockCntr++;
00580 
00581     CndAudio.NotifyAll();
00582     #ifndef USE_RWLOCK
00583     MtxAudio.Release();
00584     #else
00585     RWLAudio.Release();
00586     #endif
00587 
00588     return FLAC__STREAM_DECODER_WRITE_STATUS_CONTINUE;
00589 }
00590 
00591 
00592 void clStreamDist::FLACMetaData (
00593     const FLAC__StreamDecoder *spFLACDec, 
00594     const FLAC__StreamMetadata *spMetadata)
00595 {
00596 }
00597 
00598 
00599 void clStreamDist::FLACError (
00600     const FLAC__StreamDecoder *spFLACDec, 
00601     FLAC__StreamDecoderErrorStatus iErrorStatus)
00602 {
00603     Log.Add('!', FLAC__StreamDecoderErrorStatusString[iErrorStatus]);
00604 }
00605 
00606 #endif

Generated on Sun Oct 26 19:11:23 2003 for HASAS by doxygen 1.3.3