Darwin-Streaming-Server/Server.tproj/QTSServerInterface.cpp
Darren VanBuren 849723c9cf Add even more of the source
This should be about everything needed to build so far?
2017-03-07 17:14:16 -08:00

636 lines
29 KiB
C++

/*
*
* @APPLE_LICENSE_HEADER_START@
*
* Copyright (c) 1999-2008 Apple Inc. All Rights Reserved.
*
* This file contains Original Code and/or Modifications of Original Code
* as defined in and that are subject to the Apple Public Source License
* Version 2.0 (the 'License'). You may not use this file except in
* compliance with the License. Please obtain a copy of the License at
* http://www.opensource.apple.com/apsl/ and read it before using this
* file.
*
* The Original Code and all software distributed under the License are
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
* Please see the License for the specific language governing rights and
* limitations under the License.
*
* @APPLE_LICENSE_HEADER_END@
*
*/
/*
File: QTSServerInterface.cpp
Contains: Implementation of object defined in QTSServerInterface.h.
*/
//INCLUDES:
#ifndef kVersionString
#include "../revision.h"
#endif
#include "QTSServerInterface.h"
#include "RTPSessionInterface.h"
#include "OSRef.h"
#include "UDPSocketPool.h"
#include "RTSPProtocol.h"
#include "RTPPacketResender.h"
#ifndef __MacOSX__
#include "../revision.h"
#endif
// STATIC DATA
UInt32 QTSServerInterface::sServerAPIVersion = QTSS_API_VERSION;
QTSServerInterface* QTSServerInterface::sServer = NULL;
#if __MacOSX__
StrPtrLen QTSServerInterface::sServerNameStr("QTSS");
#else
StrPtrLen QTSServerInterface::sServerNameStr("DSS");
#endif
// kVersionString from revision.h, include with -i at project level
StrPtrLen QTSServerInterface::sServerVersionStr(kVersionString);
StrPtrLen QTSServerInterface::sServerBuildStr(kBuildString);
StrPtrLen QTSServerInterface::sServerCommentStr(kCommentString);
StrPtrLen QTSServerInterface::sServerPlatformStr(kPlatformNameString);
StrPtrLen QTSServerInterface::sServerBuildDateStr(__DATE__ ", "__TIME__);
char QTSServerInterface::sServerHeader[kMaxServerHeaderLen];
StrPtrLen QTSServerInterface::sServerHeaderPtr(sServerHeader, kMaxServerHeaderLen);
ResizeableStringFormatter QTSServerInterface::sPublicHeaderFormatter(NULL, 0);
StrPtrLen QTSServerInterface::sPublicHeaderStr;
QTSSModule** QTSServerInterface::sModuleArray[QTSSModule::kNumRoles];
UInt32 QTSServerInterface::sNumModulesInRole[QTSSModule::kNumRoles];
OSQueue QTSServerInterface::sModuleQueue;
QTSSErrorLogStream QTSServerInterface::sErrorLogStream;
QTSSAttrInfoDict::AttrInfo QTSServerInterface::sConnectedUserAttributes[] =
{ /*fields: fAttrName, fFuncPtr, fAttrDataType, fAttrPermission */
/* 0 */ { "qtssConnectionType", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 1 */ { "qtssConnectionCreateTimeInMsec", NULL, qtssAttrDataTypeTimeVal, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 2 */ { "qtssConnectionTimeConnectedInMsec", TimeConnected, qtssAttrDataTypeTimeVal, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 3 */ { "qtssConnectionBytesSent", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 4 */ { "qtssConnectionMountPoint", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 5 */ { "qtssConnectionHostName", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe } ,
/* 6 */ { "qtssConnectionSessRemoteAddrStr", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 7 */ { "qtssConnectionSessLocalAddrStr", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 8 */ { "qtssConnectionCurrentBitRate", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 9 */ { "qtssConnectionPacketLossPercent", NULL, qtssAttrDataTypeFloat32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
// this last parameter is a workaround for the current dictionary implementation. For qtssConnectionTimeConnectedInMsec above we have a param
// retrieval function. This needs storage to keep the value returned, but if it sets its own param then the function no longer gets called.
/* 10 */ { "qtssConnectionTimeStorage", NULL, qtssAttrDataTypeTimeVal, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
};
QTSSAttrInfoDict::AttrInfo QTSServerInterface::sAttributes[] =
{ /*fields: fAttrName, fFuncPtr, fAttrDataType, fAttrPermission */
/* 0 */ { "qtssServerAPIVersion", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 1 */ { "qtssSvrDefaultDNSName", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead },
/* 2 */ { "qtssSvrDefaultIPAddr", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 3 */ { "qtssSvrServerName", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 4 */ { "qtssRTSPSvrServerVersion", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 5 */ { "qtssRTSPSvrServerBuildDate", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 6 */ { "qtssSvrRTSPPorts", NULL, qtssAttrDataTypeUInt16, qtssAttrModeRead },
/* 7 */ { "qtssSvrRTSPServerHeader", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 8 */ { "qtssSvrState", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite },
/* 9 */ { "qtssSvrIsOutOfDescriptors", IsOutOfDescriptors, qtssAttrDataTypeBool16, qtssAttrModeRead },
/* 10 */ { "qtssRTSPCurrentSessionCount", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 11 */ { "qtssRTSPHTTPCurrentSessionCount",NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 12 */ { "qtssRTPSvrNumUDPSockets", GetTotalUDPSockets, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 13 */ { "qtssRTPSvrCurConn", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 14 */ { "qtssRTPSvrTotalConn", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 15 */ { "qtssRTPSvrCurBandwidth", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 16 */ { "qtssRTPSvrTotalBytes", NULL, qtssAttrDataTypeUInt64, qtssAttrModeRead },
/* 17 */ { "qtssRTPSvrAvgBandwidth", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 18 */ { "qtssRTPSvrCurPackets", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 19 */ { "qtssRTPSvrTotalPackets", NULL, qtssAttrDataTypeUInt64, qtssAttrModeRead },
/* 20 */ { "qtssSvrHandledMethods", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 21 */ { "qtssSvrModuleObjects", NULL, qtssAttrDataTypeQTSS_Object,qtssAttrModeRead | qtssAttrModePreempSafe },
/* 22 */ { "qtssSvrStartupTime", NULL, qtssAttrDataTypeTimeVal, qtssAttrModeRead },
/* 23 */ { "qtssSvrGMTOffsetInHrs", NULL, qtssAttrDataTypeSInt32, qtssAttrModeRead },
/* 24 */ { "qtssSvrDefaultIPAddrStr", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead },
/* 25 */ { "qtssSvrPreferences", NULL, qtssAttrDataTypeQTSS_Object,qtssAttrModeRead | qtssAttrModeInstanceAttrAllowed},
/* 26 */ { "qtssSvrMessages", NULL, qtssAttrDataTypeQTSS_Object,qtssAttrModeRead },
/* 27 */ { "qtssSvrClientSessions", NULL, qtssAttrDataTypeQTSS_Object,qtssAttrModeRead },
/* 28 */ { "qtssSvrCurrentTimeMilliseconds",CurrentUnixTimeMilli, qtssAttrDataTypeTimeVal,qtssAttrModeRead},
/* 29 */ { "qtssSvrCPULoadPercent", NULL, qtssAttrDataTypeFloat32, qtssAttrModeRead},
/* 30 */ { "qtssSvrNumReliableUDPBuffers", GetNumUDPBuffers, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 31 */ { "qtssSvrReliableUDPWastageInBytes",GetNumWastedBytes, qtssAttrDataTypeUInt32, qtssAttrModeRead },
/* 32 */ { "qtssSvrConnectedUsers", NULL, qtssAttrDataTypeQTSS_Object, qtssAttrModeRead | qtssAttrModeWrite },
/* 33 */ { "qtssMP3SvrCurConn", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 34 */ { "qtssMP3SvrTotalConn", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 35 */ { "qtssMP3SvrCurBandwidth", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 36 */ { "qtssMP3SvrTotalBytes", NULL, qtssAttrDataTypeUInt64, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 37 */ { "qtssMP3SvrAvgBandwidth", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModeWrite | qtssAttrModePreempSafe },
/* 38 */ { "qtssSvrServerBuild", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 39 */ { "qtssSvrServerPlatform", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 40 */ { "qtssSvrRTSPServerComment", NULL, qtssAttrDataTypeCharArray, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 41 */ { "qtssSvrNumThinned", NULL, qtssAttrDataTypeSInt32, qtssAttrModeRead | qtssAttrModePreempSafe },
/* 42 */ { "qtssSvrNumThreads", NULL, qtssAttrDataTypeUInt32, qtssAttrModeRead | qtssAttrModePreempSafe }
};
void QTSServerInterface::Initialize()
{
for (UInt32 x = 0; x < qtssSvrNumParams; x++)
QTSSDictionaryMap::GetMap(QTSSDictionaryMap::kServerDictIndex)->
SetAttribute(x, sAttributes[x].fAttrName, sAttributes[x].fFuncPtr,
sAttributes[x].fAttrDataType, sAttributes[x].fAttrPermission);
for (UInt32 y = 0; y < qtssConnectionNumParams; y++)
QTSSDictionaryMap::GetMap(QTSSDictionaryMap::kQTSSConnectedUserDictIndex)->
SetAttribute(y, sConnectedUserAttributes[y].fAttrName, sConnectedUserAttributes[y].fFuncPtr,
sConnectedUserAttributes[y].fAttrDataType, sConnectedUserAttributes[y].fAttrPermission);
//Write out a premade server header
StringFormatter serverFormatter(sServerHeaderPtr.Ptr, kMaxServerHeaderLen);
serverFormatter.Put(RTSPProtocol::GetHeaderString(qtssServerHeader));
serverFormatter.Put(": ");
serverFormatter.Put(sServerNameStr);
serverFormatter.PutChar('/');
serverFormatter.Put(sServerVersionStr);
serverFormatter.PutChar(' ');
serverFormatter.PutChar('(');
serverFormatter.Put("Build/");
serverFormatter.Put(sServerBuildStr);
serverFormatter.Put("; ");
serverFormatter.Put("Platform/");
serverFormatter.Put(sServerPlatformStr);
serverFormatter.PutChar(';');
if (sServerCommentStr.Len > 0)
{
serverFormatter.PutChar(' ');
serverFormatter.Put(sServerCommentStr);
}
serverFormatter.PutChar(')');
sServerHeaderPtr.Len = serverFormatter.GetCurrentOffset();
Assert(sServerHeaderPtr.Len < kMaxServerHeaderLen);
}
QTSServerInterface::QTSServerInterface()
: QTSSDictionary(QTSSDictionaryMap::GetMap(QTSSDictionaryMap::kServerDictIndex), &fMutex),
fSocketPool(NULL),
fRTPMap(NULL),
fSrvrPrefs(NULL),
fSrvrMessages(NULL),
fServerState(qtssStartingUpState),
fDefaultIPAddr(0),
fListeners(NULL),
fNumListeners(0),
fStartupTime_UnixMilli(0),
fGMTOffset(0),
fNumRTSPSessions(0),
fNumRTSPHTTPSessions(0),
fNumRTPSessions(0),
fNumRTPPlayingSessions(0),
fTotalRTPSessions(0),
fTotalRTPBytes(0),
fTotalRTPPackets(0),
fTotalRTPPacketsLost(0),
fPeriodicRTPBytes(0),
fPeriodicRTPPacketsLost(0),
fPeriodicRTPPackets(0),
fCurrentRTPBandwidthInBits(0),
fAvgRTPBandwidthInBits(0),
fRTPPacketsPerSecond(0),
fCPUPercent(0),
fCPUTimeUsedInSec(0),
fUDPWastageInBytes(0),
fNumUDPBuffers(0),
fNumMP3Sessions(0),
fTotalMP3Sessions(0),
fCurrentMP3BandwidthInBits(0),
fTotalMP3Bytes(0),
fAvgMP3BandwidthInBits(0),
fSigInt(false),
fSigTerm(false),
fDebugLevel(0),
fDebugOptions(0),
fMaxLate(0),
fTotalLate(0),
fCurrentMaxLate(0),
fTotalQuality(0),
fNumThinned(0),
fNumThreads(0)
{
for (UInt32 y = 0; y < QTSSModule::kNumRoles; y++)
{
sModuleArray[y] = NULL;
sNumModulesInRole[y] = 0;
}
this->SetVal(qtssSvrState, &fServerState, sizeof(fServerState));
this->SetVal(qtssServerAPIVersion, &sServerAPIVersion, sizeof(sServerAPIVersion));
this->SetVal(qtssSvrDefaultIPAddr, &fDefaultIPAddr, sizeof(fDefaultIPAddr));
this->SetVal(qtssSvrServerName, sServerNameStr.Ptr, sServerNameStr.Len);
this->SetVal(qtssSvrServerVersion, sServerVersionStr.Ptr, sServerVersionStr.Len);
this->SetVal(qtssSvrServerBuildDate, sServerBuildDateStr.Ptr, sServerBuildDateStr.Len);
this->SetVal(qtssSvrRTSPServerHeader, sServerHeaderPtr.Ptr, sServerHeaderPtr.Len);
this->SetVal(qtssRTSPCurrentSessionCount, &fNumRTSPSessions, sizeof(fNumRTSPSessions));
this->SetVal(qtssRTSPHTTPCurrentSessionCount, &fNumRTSPHTTPSessions,sizeof(fNumRTSPHTTPSessions));
this->SetVal(qtssRTPSvrCurConn, &fNumRTPSessions, sizeof(fNumRTPSessions));
this->SetVal(qtssRTPSvrTotalConn, &fTotalRTPSessions, sizeof(fTotalRTPSessions));
this->SetVal(qtssRTPSvrCurBandwidth, &fCurrentRTPBandwidthInBits,sizeof(fCurrentRTPBandwidthInBits));
this->SetVal(qtssRTPSvrTotalBytes, &fTotalRTPBytes, sizeof(fTotalRTPBytes));
this->SetVal(qtssRTPSvrAvgBandwidth, &fAvgRTPBandwidthInBits, sizeof(fAvgRTPBandwidthInBits));
this->SetVal(qtssRTPSvrCurPackets, &fRTPPacketsPerSecond, sizeof(fRTPPacketsPerSecond));
this->SetVal(qtssRTPSvrTotalPackets, &fTotalRTPPackets, sizeof(fTotalRTPPackets));
this->SetVal(qtssSvrStartupTime, &fStartupTime_UnixMilli, sizeof(fStartupTime_UnixMilli));
this->SetVal(qtssSvrGMTOffsetInHrs, &fGMTOffset, sizeof(fGMTOffset));
this->SetVal(qtssSvrCPULoadPercent, &fCPUPercent, sizeof(fCPUPercent));
this->SetVal(qtssMP3SvrCurConn, &fNumMP3Sessions, sizeof(fNumMP3Sessions));
this->SetVal(qtssMP3SvrTotalConn, &fTotalMP3Sessions, sizeof(fTotalMP3Sessions));
this->SetVal(qtssMP3SvrCurBandwidth, &fCurrentMP3BandwidthInBits,sizeof(fCurrentMP3BandwidthInBits));
this->SetVal(qtssMP3SvrTotalBytes, &fTotalMP3Bytes, sizeof(fTotalMP3Bytes));
this->SetVal(qtssMP3SvrAvgBandwidth, &fAvgMP3BandwidthInBits, sizeof(fAvgMP3BandwidthInBits));
this->SetVal(qtssSvrServerBuild, sServerBuildStr.Ptr, sServerBuildStr.Len);
this->SetVal(qtssSvrRTSPServerComment, sServerCommentStr.Ptr, sServerCommentStr.Len);
this->SetVal(qtssSvrServerPlatform, sServerPlatformStr.Ptr, sServerPlatformStr.Len);
this->SetVal(qtssSvrNumThinned, &fNumThinned, sizeof(fNumThinned));
this->SetVal(qtssSvrNumThreads, &fNumThreads, sizeof(fNumThreads));
sServer = this;
}
void QTSServerInterface::LogError(QTSS_ErrorVerbosity inVerbosity, char* inBuffer)
{
QTSS_RoleParams theParams;
theParams.errorParams.inVerbosity = inVerbosity;
theParams.errorParams.inBuffer = inBuffer;
for (UInt32 x = 0; x < QTSServerInterface::GetNumModulesInRole(QTSSModule::kErrorLogRole); x++)
(void)QTSServerInterface::GetModule(QTSSModule::kErrorLogRole, x)->CallDispatch(QTSS_ErrorLog_Role, &theParams);
// If this is a fatal error, set the proper attribute in the RTSPServer dictionary
if ((inVerbosity == qtssFatalVerbosity) && (sServer != NULL))
{
QTSS_ServerState theState = qtssFatalErrorState;
(void)sServer->SetValue(qtssSvrState, 0, &theState, sizeof(theState));
}
}
void QTSServerInterface::KillAllRTPSessions()
{
OSMutexLocker locker(fRTPMap->GetMutex());
for (OSRefHashTableIter theIter(fRTPMap->GetHashTable()); !theIter.IsDone(); theIter.Next())
{
OSRef* theRef = theIter.GetCurrent();
RTPSessionInterface* theSession = (RTPSessionInterface*)theRef->GetObject();
theSession->Signal(Task::kKillEvent);
}
}
void QTSServerInterface::SetValueComplete(UInt32 inAttrIndex, QTSSDictionaryMap* inMap,
UInt32 inValueIndex, void* inNewValue, UInt32 inNewValueLen)
{
if (inAttrIndex == qtssSvrState)
{
Assert(inNewValueLen == sizeof(QTSS_ServerState));
//
// Invoke the server state change role
QTSS_RoleParams theParams;
theParams.stateChangeParams.inNewState = *(QTSS_ServerState*)inNewValue;
static QTSS_ModuleState sStateChangeState = { NULL, 0, NULL, false };
if (OSThread::GetCurrent() == NULL)
OSThread::SetMainThreadData(&sStateChangeState);
else
OSThread::GetCurrent()->SetThreadData(&sStateChangeState);
UInt32 numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kStateChangeRole);
{
for (UInt32 theCurrentModule = 0; theCurrentModule < numModules; theCurrentModule++)
{
QTSSModule* theModule = QTSServerInterface::GetModule(QTSSModule::kStateChangeRole, theCurrentModule);
(void)theModule->CallDispatch(QTSS_StateChange_Role, &theParams);
}
}
//
// Make sure to clear out the thread data
if (OSThread::GetCurrent() == NULL)
OSThread::SetMainThreadData(NULL);
else
OSThread::GetCurrent()->SetThreadData(NULL);
}
}
RTPStatsUpdaterTask::RTPStatsUpdaterTask()
: Task(), fLastBandwidthTime(0), fLastBandwidthAvg(0), fLastBytesSent(0), fLastTotalMP3Bytes(0)
{
this->SetTaskName("RTPStatsUpdaterTask");
this->Signal(Task::kStartEvent);
}
Float32 RTPStatsUpdaterTask::GetCPUTimeInSeconds()
{
// This function returns the total number of seconds that the
// process running RTPStatsUpdaterTask() has been executing as
// a user process.
Float32 cpuTimeInSec = 0.0;
#ifdef __Win32__
// The Win32 way of getting the time for this process
HANDLE hProcess = GetCurrentProcess();
SInt64 createTime, exitTime, kernelTime, userTime;
if(GetProcessTimes(hProcess, (LPFILETIME)&createTime, (LPFILETIME)&exitTime, (LPFILETIME)&kernelTime, (LPFILETIME)&userTime))
{
// userTime is in 10**-7 seconds since Jan.1, 1607.
// (What type of computers did they use in 1607?)
cpuTimeInSec = (Float32) (userTime/10000000.0);
}
else
{
// This should never happen!!!
Assert(0);
cpuTimeInSec = 0.0;
}
#else
// The UNIX way of getting the time for this process
clock_t cpuTime = clock();
cpuTimeInSec = (Float32) cpuTime / CLOCKS_PER_SEC;
#endif
return cpuTimeInSec;
}
SInt64 RTPStatsUpdaterTask::Run()
{
QTSServerInterface* theServer = QTSServerInterface::sServer;
// All of this must happen atomically wrt dictionary values we are manipulating
OSMutexLocker locker(&theServer->fMutex);
//First update total bytes. This must be done because total bytes is a 64 bit number,
//so no atomic functions can apply.
//
// NOTE: The line below is not thread safe on non-PowerPC platforms. This is
// because the fPeriodicRTPBytes variable is being manipulated from within an
// atomic_add. On PowerPC, assignments are atomic, so the assignment below is ok.
// On a non-PowerPC platform, the following would be thread safe:
//unsigned int periodicBytes = atomic_add(&theServer->fPeriodicRTPBytes, 0);
unsigned int periodicBytes = theServer->fPeriodicRTPBytes;
(void)atomic_sub(&theServer->fPeriodicRTPBytes, periodicBytes);
theServer->fTotalRTPBytes += periodicBytes;
// Same deal for packet totals
unsigned int periodicPackets = theServer->fPeriodicRTPPackets;
(void)atomic_sub(&theServer->fPeriodicRTPPackets, periodicPackets);
theServer->fTotalRTPPackets += periodicPackets;
// ..and for lost packet totals
unsigned int periodicPacketsLost = theServer->fPeriodicRTPPacketsLost;
(void)atomic_sub(&theServer->fPeriodicRTPPacketsLost, periodicPacketsLost);
theServer->fTotalRTPPacketsLost += periodicPacketsLost;
SInt64 curTime = OS::Milliseconds();
//for cpu percent
Float32 cpuTimeInSec = GetCPUTimeInSeconds();
//also update current bandwidth statistic
if (fLastBandwidthTime != 0)
{
Assert(curTime > fLastBandwidthTime);
UInt32 delta = (UInt32)(curTime - fLastBandwidthTime);
// Prevent divide by zero errror
if (delta < 1000) {
WarnV(delta >= 1000, "delta < 1000");
(void)this->GetEvents();//we must clear the event mask!
return theServer->GetPrefs()->GetTotalBytesUpdateTimeInSecs() * 1000;
}
UInt32 packetsPerSecond = periodicPackets;
UInt32 theTime = delta / 1000;
packetsPerSecond /= theTime;
Assert(packetsPerSecond >= 0);
theServer->fRTPPacketsPerSecond = packetsPerSecond;
UInt32 additionalBytes = 28 * packetsPerSecond; // IP headers = 20 + UDP headers = 8
UInt32 headerBits = 8 * additionalBytes;
headerBits /= theTime;
Float32 bits = periodicBytes * 8;
bits /= theTime;
theServer->fCurrentRTPBandwidthInBits = (UInt32) (bits + headerBits);
// okay let's do it for MP3 bytes now
bits = (Float32)(((SInt64)theServer->fTotalMP3Bytes - fLastTotalMP3Bytes) * 8);
bits /= theTime;
theServer->fCurrentMP3BandwidthInBits = (UInt32)bits;
//do the computation for cpu percent
Float32 diffTime = cpuTimeInSec - theServer->fCPUTimeUsedInSec;
theServer->fCPUPercent = (diffTime/theTime) * 100;
UInt32 numProcessors = OS::GetNumProcessors();
if (numProcessors > 1)
theServer->fCPUPercent /= numProcessors;
}
fLastTotalMP3Bytes = (SInt64)theServer->fTotalMP3Bytes;
fLastBandwidthTime = curTime;
// We use a running average for avg. bandwidth calculations
theServer->fAvgMP3BandwidthInBits = (theServer->fAvgMP3BandwidthInBits
+ theServer->fCurrentMP3BandwidthInBits)/2;
//for cpu percent
theServer->fCPUTimeUsedInSec = cpuTimeInSec;
//also compute average bandwidth, a much more smooth value. This is done with
//the fLastBandwidthAvg, a timestamp of the last time we did an average, and
//fLastBytesSent, the number of bytes sent when we last did an average.
if ((fLastBandwidthAvg != 0) && (curTime > (fLastBandwidthAvg +
(theServer->GetPrefs()->GetAvgBandwidthUpdateTimeInSecs() * 1000))))
{
UInt32 delta = (UInt32)(curTime - fLastBandwidthAvg);
SInt64 bytesSent = theServer->fTotalRTPBytes - fLastBytesSent;
Assert(bytesSent >= 0);
//do the bandwidth computation using floating point divides
//for accuracy and speed.
Float32 bits = (Float32)(bytesSent * 8);
Float32 theAvgTime = (Float32)delta;
theAvgTime /= 1000;
bits /= theAvgTime;
Assert(bits >= 0);
theServer->fAvgRTPBandwidthInBits = (UInt32)bits;
fLastBandwidthAvg = curTime;
fLastBytesSent = theServer->fTotalRTPBytes;
//if the bandwidth is above the bandwidth setting, disconnect 1 user by sending them
//a BYE RTCP packet.
SInt32 maxKBits = theServer->GetPrefs()->GetMaxKBitsBandwidth();
if ((maxKBits > -1) && (theServer->fAvgRTPBandwidthInBits > ((UInt32)maxKBits * 1024)))
{
//we need to make sure that all of this happens atomically wrt the session map
OSMutexLocker locker(theServer->GetRTPSessionMap()->GetMutex());
RTPSessionInterface* theSession = this->GetNewestSession(theServer->fRTPMap);
if (theSession != NULL)
if ((curTime - theSession->GetSessionCreateTime()) <
theServer->GetPrefs()->GetSafePlayDurationInSecs() * 1000)
theSession->Signal(Task::kKillEvent);
}
}
else if (fLastBandwidthAvg == 0)
{
fLastBandwidthAvg = curTime;
fLastBytesSent = theServer->fTotalRTPBytes;
}
(void)this->GetEvents();//we must clear the event mask!
return theServer->GetPrefs()->GetTotalBytesUpdateTimeInSecs() * 1000;
}
RTPSessionInterface* RTPStatsUpdaterTask::GetNewestSession(OSRefTable* inRTPSessionMap)
{
//Caller must lock down the RTP session map
SInt64 theNewestPlayTime = 0;
RTPSessionInterface* theNewestSession = NULL;
//use the session map to iterate through all the sessions, finding the most
//recently connected client
for (OSRefHashTableIter theIter(inRTPSessionMap->GetHashTable()); !theIter.IsDone(); theIter.Next())
{
OSRef* theRef = theIter.GetCurrent();
RTPSessionInterface* theSession = (RTPSessionInterface*)theRef->GetObject();
Assert(theSession->GetSessionCreateTime() > 0);
if (theSession->GetSessionCreateTime() > theNewestPlayTime)
{
theNewestPlayTime = theSession->GetSessionCreateTime();
theNewestSession = theSession;
}
}
return theNewestSession;
}
void* QTSServerInterface::CurrentUnixTimeMilli(QTSSDictionary* inServer, UInt32* outLen)
{
QTSServerInterface* theServer = (QTSServerInterface*)inServer;
theServer->fCurrentTime_UnixMilli = OS::TimeMilli_To_UnixTimeMilli(OS::Milliseconds());
// Return the result
*outLen = sizeof(theServer->fCurrentTime_UnixMilli);
return &theServer->fCurrentTime_UnixMilli;
}
void* QTSServerInterface::GetTotalUDPSockets(QTSSDictionary* inServer, UInt32* outLen)
{
QTSServerInterface* theServer = (QTSServerInterface*)inServer;
// Multiply by 2 because this is returning the number of socket *pairs*
theServer->fTotalUDPSockets = theServer->fSocketPool->GetSocketQueue()->GetLength() * 2;
// Return the result
*outLen = sizeof(theServer->fTotalUDPSockets);
return &theServer->fTotalUDPSockets;
}
void* QTSServerInterface::IsOutOfDescriptors(QTSSDictionary* inServer, UInt32* outLen)
{
QTSServerInterface* theServer = (QTSServerInterface*)inServer;
theServer->fIsOutOfDescriptors = false;
for (UInt32 x = 0; x < theServer->fNumListeners; x++)
{
if (theServer->fListeners[x]->IsOutOfDescriptors())
{
theServer->fIsOutOfDescriptors = true;
break;
}
}
// Return the result
*outLen = sizeof(theServer->fIsOutOfDescriptors);
return &theServer->fIsOutOfDescriptors;
}
void* QTSServerInterface::GetNumUDPBuffers(QTSSDictionary* inServer, UInt32* outLen)
{
// This param retrieval function must be invoked each time it is called,
// because whether we are out of descriptors or not is continually changing
QTSServerInterface* theServer = (QTSServerInterface*)inServer;
theServer->fNumUDPBuffers = RTPPacketResender::GetNumRetransmitBuffers();
// Return the result
*outLen = sizeof(theServer->fNumUDPBuffers);
return &theServer->fNumUDPBuffers;
}
void* QTSServerInterface::GetNumWastedBytes(QTSSDictionary* inServer, UInt32* outLen)
{
// This param retrieval function must be invoked each time it is called,
// because whether we are out of descriptors or not is continually changing
QTSServerInterface* theServer = (QTSServerInterface*)inServer;
theServer->fUDPWastageInBytes = RTPPacketResender::GetWastedBufferBytes();
// Return the result
*outLen = sizeof(theServer->fUDPWastageInBytes);
return &theServer->fUDPWastageInBytes;
}
void* QTSServerInterface::TimeConnected(QTSSDictionary* inConnection, UInt32* outLen)
{
SInt64 connectTime;
void* result;
UInt32 len = sizeof(connectTime);
inConnection->GetValue(qtssConnectionCreateTimeInMsec, 0, &connectTime, &len);
SInt64 timeConnected = OS::Milliseconds() - connectTime;
*outLen = sizeof(timeConnected);
inConnection->SetValue(qtssConnectionTimeStorage, 0, &timeConnected, sizeof(connectTime));
inConnection->GetValuePtr(qtssConnectionTimeStorage, 0, &result, outLen);
// Return the result
return result;
}
QTSS_Error QTSSErrorLogStream::Write(void* inBuffer, UInt32 inLen, UInt32* outLenWritten, UInt32 inFlags)
{
// For the error log stream, the flags are considered to be the verbosity
// of the error.
if (inFlags >= qtssIllegalVerbosity)
inFlags = qtssMessageVerbosity;
QTSServerInterface::LogError(inFlags, (char*)inBuffer);
if (outLenWritten != NULL)
*outLenWritten = inLen;
return QTSS_NoErr;
}
void QTSSErrorLogStream::LogAssert(char* inMessage)
{
QTSServerInterface::LogError(qtssAssertVerbosity, inMessage);
}