1617 lines
62 KiB
C++
1617 lines
62 KiB
C++
/*
|
|
*
|
|
* @APPLE_LICENSE_HEADER_START@
|
|
*
|
|
* Copyright (c) 1999-2008 Apple Inc. All Rights Reserved.
|
|
*
|
|
* This file contains Original Code and/or Modifications of Original Code
|
|
* as defined in and that are subject to the Apple Public Source License
|
|
* Version 2.0 (the 'License'). You may not use this file except in
|
|
* compliance with the License. Please obtain a copy of the License at
|
|
* http://www.opensource.apple.com/apsl/ and read it before using this
|
|
* file.
|
|
*
|
|
* The Original Code and all software distributed under the License are
|
|
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
|
|
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
|
|
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
|
|
* Please see the License for the specific language governing rights and
|
|
* limitations under the License.
|
|
*
|
|
* @APPLE_LICENSE_HEADER_END@
|
|
*
|
|
*/
|
|
/*
|
|
File: ReflectorStream.cpp
|
|
|
|
Contains: Implementation of object defined in ReflectorStream.h.
|
|
|
|
|
|
|
|
*/
|
|
|
|
#include "ReflectorStream.h"
|
|
#include "QTSSModuleUtils.h"
|
|
#include "OSMemory.h"
|
|
#include "SocketUtils.h"
|
|
#include "atomic.h"
|
|
#include "RTCPPacket.h"
|
|
#include "ReflectorSession.h"
|
|
|
|
|
|
#if DEBUG
|
|
#define REFLECTOR_STREAM_DEBUGGING 0
|
|
#else
|
|
#define REFLECTOR_STREAM_DEBUGGING 0
|
|
#endif
|
|
|
|
|
|
static ReflectorSocketPool sSocketPool;
|
|
|
|
// ATTRIBUTES
|
|
|
|
static QTSS_AttributeID sCantBindReflectorSocketErr = qtssIllegalAttrID;
|
|
static QTSS_AttributeID sCantJoinMulticastGroupErr = qtssIllegalAttrID;
|
|
|
|
// PREFS
|
|
|
|
static UInt32 sDefaultOverBufferInSec = 10;
|
|
static UInt32 sDefaultBucketDelayInMsec = 73;
|
|
static Bool16 sDefaultUsePacketReceiveTime = false;
|
|
static UInt32 sDefaultMaxFuturePacketTimeSec = 60;
|
|
static UInt32 sDefaultFirstPacketOffsetMsec = 500;
|
|
|
|
UInt32 ReflectorStream::sBucketSize = 16;
|
|
UInt32 ReflectorStream::sOverBufferInMsec = 10000; // more or less what the client over buffer will be
|
|
UInt32 ReflectorStream::sMaxFuturePacketMSec = 60000; // max packet future time
|
|
UInt32 ReflectorStream::sMaxPacketAgeMSec = 10000;
|
|
|
|
UInt32 ReflectorStream::sMaxFuturePacketSec = 60; // max packet future time
|
|
UInt32 ReflectorStream::sOverBufferInSec = 10;
|
|
UInt32 ReflectorStream::sBucketDelayInMsec = 73;
|
|
Bool16 ReflectorStream::sUsePacketReceiveTime = false;
|
|
UInt32 ReflectorStream::sFirstPacketOffsetMsec = 500;
|
|
|
|
void ReflectorStream::Register()
|
|
{
|
|
// Add text messages attributes
|
|
static char* sCantBindReflectorSocket= "QTSSReflectorModuleCantBindReflectorSocket";
|
|
static char* sCantJoinMulticastGroup = "QTSSReflectorModuleCantJoinMulticastGroup";
|
|
|
|
(void)QTSS_AddStaticAttribute(qtssTextMessagesObjectType, sCantBindReflectorSocket, NULL, qtssAttrDataTypeCharArray);
|
|
(void)QTSS_IDForAttr(qtssTextMessagesObjectType, sCantBindReflectorSocket, &sCantBindReflectorSocketErr);
|
|
|
|
(void)QTSS_AddStaticAttribute(qtssTextMessagesObjectType, sCantJoinMulticastGroup, NULL, qtssAttrDataTypeCharArray);
|
|
(void)QTSS_IDForAttr(qtssTextMessagesObjectType, sCantJoinMulticastGroup, &sCantJoinMulticastGroupErr);
|
|
}
|
|
|
|
void ReflectorStream::Initialize(QTSS_ModulePrefsObject inPrefs)
|
|
{
|
|
|
|
QTSSModuleUtils::GetAttribute(inPrefs, "reflector_bucket_offset_delay_msec", qtssAttrDataTypeUInt32,
|
|
&ReflectorStream::sBucketDelayInMsec, &sDefaultBucketDelayInMsec, sizeof(sBucketDelayInMsec));
|
|
|
|
QTSSModuleUtils::GetAttribute(inPrefs, "reflector_buffer_size_sec", qtssAttrDataTypeUInt32,
|
|
&ReflectorStream::sOverBufferInSec, &sDefaultOverBufferInSec, sizeof(sDefaultOverBufferInSec));
|
|
|
|
QTSSModuleUtils::GetAttribute(inPrefs, "reflector_use_in_packet_receive_time", qtssAttrDataTypeBool16,
|
|
&ReflectorStream::sUsePacketReceiveTime, &sDefaultUsePacketReceiveTime, sizeof(sDefaultUsePacketReceiveTime));
|
|
|
|
QTSSModuleUtils::GetAttribute(inPrefs, "reflector_in_packet_max_receive_sec", qtssAttrDataTypeUInt32,
|
|
&ReflectorStream::sMaxFuturePacketSec, &sDefaultMaxFuturePacketTimeSec, sizeof(sDefaultMaxFuturePacketTimeSec));
|
|
|
|
QTSSModuleUtils::GetAttribute(inPrefs, "reflector_rtp_info_offset_msec", qtssAttrDataTypeUInt32,
|
|
&ReflectorStream::sFirstPacketOffsetMsec, &sDefaultFirstPacketOffsetMsec, sizeof(sDefaultFirstPacketOffsetMsec));
|
|
|
|
ReflectorStream::sOverBufferInMsec = sOverBufferInSec * 1000;
|
|
ReflectorStream::sMaxFuturePacketMSec = sMaxFuturePacketSec * 1000;
|
|
ReflectorStream::sMaxPacketAgeMSec = (UInt32) (sOverBufferInMsec * 1.5); //allow a little time before deleting.
|
|
|
|
}
|
|
|
|
void ReflectorStream::GenerateSourceID(SourceInfo::StreamInfo* inInfo, char* ioBuffer)
|
|
{
|
|
|
|
::memcpy(ioBuffer, &inInfo->fSrcIPAddr, sizeof(inInfo->fSrcIPAddr));
|
|
::memcpy(&ioBuffer[sizeof(inInfo->fSrcIPAddr)], &inInfo->fPort, sizeof(inInfo->fPort));
|
|
}
|
|
|
|
|
|
ReflectorStream::ReflectorStream(SourceInfo::StreamInfo* inInfo)
|
|
: fPacketCount(0),
|
|
fSockets(NULL),
|
|
fRTPSender(NULL, qtssWriteFlagsIsRTP),
|
|
fRTCPSender(NULL, qtssWriteFlagsIsRTCP),
|
|
fOutputArray(NULL),
|
|
fNumBuckets(kMinNumBuckets),
|
|
fNumElements(0),
|
|
fBucketMutex(),
|
|
|
|
fDestRTCPAddr(0),
|
|
fDestRTCPPort(0),
|
|
|
|
fCurrentBitRate(0),
|
|
fLastBitRateSample(OS::Milliseconds()), // don't calculate our first bit rate until kBitRateAvgIntervalInMilSecs has passed!
|
|
fBytesSentInThisInterval(0),
|
|
|
|
fRTPChannel(-1),
|
|
fRTCPChannel(-1),
|
|
fHasFirstRTCPPacket(false),
|
|
fHasFirstRTPPacket(false),
|
|
fEnableBuffer(false),
|
|
fEyeCount(0),
|
|
fFirst_RTCP_RTP_Time(0),
|
|
fFirst_RTCP_Arrival_Time(0)
|
|
{
|
|
|
|
fRTPSender.fStream = this;
|
|
fRTCPSender.fStream = this;
|
|
|
|
fStreamInfo.Copy(*inInfo);
|
|
|
|
// ALLOCATE BUCKET ARRAY
|
|
this->AllocateBucketArray(fNumBuckets);
|
|
|
|
// WRITE RTCP PACKET
|
|
|
|
//write as much of the RTCP RR as is possible right now (most of it never changes)
|
|
UInt32 theSsrc = (UInt32)::rand();
|
|
char theTempCName[RTCPSRPacket::kMaxCNameLen];
|
|
UInt32 cNameLen = RTCPSRPacket::GetACName(theTempCName);
|
|
|
|
//write the RR (just header + ssrc)
|
|
UInt32* theRRWriter = (UInt32*)&fReceiverReportBuffer[0];
|
|
*theRRWriter = htonl(0x80c90001);
|
|
theRRWriter++;
|
|
*theRRWriter = htonl(theSsrc);
|
|
theRRWriter++;
|
|
|
|
//SDES length is the length of the CName, plus 2 32bit words, minus 1
|
|
*theRRWriter = htonl(0x81ca0000 + (cNameLen >> 2) + 1);
|
|
theRRWriter++;
|
|
*theRRWriter = htonl(theSsrc);
|
|
theRRWriter++;
|
|
::memcpy(theRRWriter, theTempCName, cNameLen);
|
|
theRRWriter += cNameLen >> 2;
|
|
|
|
//APP packet format, QTSS specific stuff
|
|
*theRRWriter = htonl(0x80cc0008);
|
|
theRRWriter++;
|
|
*theRRWriter = htonl(theSsrc);
|
|
theRRWriter++;
|
|
*theRRWriter = htonl(FOUR_CHARS_TO_INT('Q','T','S','S'));
|
|
theRRWriter++;
|
|
*theRRWriter = htonl(0);
|
|
theRRWriter++;
|
|
*theRRWriter = htonl(0x00000004);
|
|
theRRWriter++;
|
|
*theRRWriter = htonl(0x6579000c);
|
|
theRRWriter++;
|
|
|
|
fEyeLocation = theRRWriter;
|
|
fReceiverReportSize = kReceiverReportSize + kAppSize + cNameLen;
|
|
|
|
// If the source is a multicast, we should send our receiver reports
|
|
// to the multicast address
|
|
if (SocketUtils::IsMulticastIPAddr(fStreamInfo.fDestIPAddr))
|
|
{
|
|
fDestRTCPAddr = fStreamInfo.fDestIPAddr;
|
|
fDestRTCPPort = fStreamInfo.fPort + 1;
|
|
}
|
|
}
|
|
|
|
|
|
ReflectorStream::~ReflectorStream()
|
|
{
|
|
Assert(fNumElements == 0);
|
|
|
|
if (fSockets != NULL)
|
|
{
|
|
//first things first, let's take this stream off the socket's queue
|
|
//of streams. This will basically ensure that no reflecting activity
|
|
//can happen on this stream.
|
|
((ReflectorSocket*)fSockets->GetSocketA())->RemoveSender(&fRTPSender);
|
|
((ReflectorSocket*)fSockets->GetSocketB())->RemoveSender(&fRTCPSender);
|
|
|
|
//leave the multicast group. Because this socket is shared amongst several
|
|
//potential multicasts, we don't want to remain a member of a stale multicast
|
|
if (SocketUtils::IsMulticastIPAddr(fStreamInfo.fDestIPAddr))
|
|
{
|
|
fSockets->GetSocketA()->LeaveMulticast(fStreamInfo.fDestIPAddr);
|
|
fSockets->GetSocketB()->LeaveMulticast(fStreamInfo.fDestIPAddr);
|
|
}
|
|
//now release the socket pair
|
|
sSocketPool.ReleaseUDPSocketPair(fSockets);
|
|
}
|
|
|
|
//qtss_printf("Deleting stream %x\n", this);
|
|
|
|
//delete every client Bucket
|
|
for (UInt32 y = 0; y < fNumBuckets; y++)
|
|
delete [] fOutputArray[y];
|
|
delete [] fOutputArray;
|
|
}
|
|
|
|
void ReflectorStream::AllocateBucketArray(UInt32 inNumBuckets)
|
|
{
|
|
Bucket* oldArray = fOutputArray;
|
|
//allocate the 2-dimensional array
|
|
fOutputArray = NEW Bucket[inNumBuckets];
|
|
for (UInt32 x = 0; x < inNumBuckets; x++)
|
|
{
|
|
fOutputArray[x] = NEW ReflectorOutput*[sBucketSize];
|
|
::memset(fOutputArray[x], 0, sizeof(ReflectorOutput*) * sBucketSize);
|
|
}
|
|
|
|
//copy over the old information if there was an old array
|
|
if (oldArray != NULL)
|
|
{
|
|
Assert(inNumBuckets > fNumBuckets);
|
|
for (UInt32 y = 0; y < fNumBuckets; y++)
|
|
{
|
|
::memcpy(fOutputArray[y],oldArray[y], sBucketSize * sizeof(ReflectorOutput*));
|
|
delete [] oldArray[y];
|
|
}
|
|
delete [] oldArray;
|
|
}
|
|
fNumBuckets = inNumBuckets;
|
|
}
|
|
|
|
|
|
SInt32 ReflectorStream::AddOutput(ReflectorOutput* inOutput, SInt32 putInThisBucket)
|
|
{
|
|
OSMutexLocker locker(&fBucketMutex);
|
|
|
|
#if DEBUG
|
|
// We should never be adding an output twice to a stream
|
|
for (UInt32 dOne = 0; dOne < fNumBuckets; dOne++)
|
|
for (UInt32 dTwo = 0; dTwo < sBucketSize; dTwo++)
|
|
Assert(fOutputArray[dOne][dTwo] != inOutput);
|
|
#endif
|
|
|
|
// If caller didn't specify a bucket, find a bucket
|
|
if (putInThisBucket < 0)
|
|
putInThisBucket = this->FindBucket();
|
|
|
|
Assert(putInThisBucket >= 0);
|
|
|
|
if (fNumBuckets <= (UInt32)putInThisBucket)
|
|
this->AllocateBucketArray(putInThisBucket * 2);
|
|
|
|
for(UInt32 y = 0; y < sBucketSize; y++)
|
|
{
|
|
if (fOutputArray[putInThisBucket][y] == NULL)
|
|
{
|
|
fOutputArray[putInThisBucket][y] = inOutput;
|
|
#if REFLECTOR_STREAM_DEBUGGING
|
|
qtss_printf("Adding new output (0x%lx) to bucket %"_S32BITARG_", index %"_S32BITARG_",\nnum buckets %li bucketSize: %li \n",(SInt32)inOutput, putInThisBucket, y, (SInt32)fNumBuckets, (SInt32)sBucketSize);
|
|
#endif
|
|
fNumElements++;
|
|
return putInThisBucket;
|
|
}
|
|
}
|
|
// There was no empty spot in the specified bucket. Return an error
|
|
return -1;
|
|
}
|
|
|
|
SInt32 ReflectorStream::FindBucket()
|
|
{
|
|
// If we need more buckets, allocate them.
|
|
if (fNumElements == (sBucketSize * fNumBuckets))
|
|
this->AllocateBucketArray(fNumBuckets * 2);
|
|
|
|
//find the first open spot in the array
|
|
for (SInt32 putInThisBucket = 0; (UInt32)putInThisBucket < fNumBuckets; putInThisBucket++)
|
|
{
|
|
for(UInt32 y = 0; y < sBucketSize; y++)
|
|
if (fOutputArray[putInThisBucket][y] == NULL)
|
|
return putInThisBucket;
|
|
}
|
|
Assert(0);
|
|
return 0;
|
|
}
|
|
|
|
void ReflectorStream::RemoveOutput(ReflectorOutput* inOutput)
|
|
{
|
|
OSMutexLocker locker(&fBucketMutex);
|
|
Assert(fNumElements > 0);
|
|
|
|
//look at all the indexes in the array
|
|
for (UInt32 x = 0; x < fNumBuckets; x++)
|
|
{
|
|
for (UInt32 y = 0; y < sBucketSize; y++)
|
|
{
|
|
//The array may have blank spaces!
|
|
if (fOutputArray[x][y] == inOutput)
|
|
{
|
|
fOutputArray[x][y] = NULL;//just clear out the pointer
|
|
|
|
#if REFLECTOR_STREAM_DEBUGGING
|
|
qtss_printf("Removing output %x from bucket %"_S32BITARG_", index %"_S32BITARG_"\n",inOutput,x,y);
|
|
#endif
|
|
fNumElements--;
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
Assert(0);
|
|
}
|
|
|
|
void ReflectorStream::TearDownAllOutputs()
|
|
{
|
|
|
|
OSMutexLocker locker(&fBucketMutex);
|
|
|
|
//look at all the indexes in the array
|
|
for (UInt32 x = 0; x < fNumBuckets; x++)
|
|
{
|
|
for (UInt32 y = 0; y < sBucketSize; y++)
|
|
{ ReflectorOutput* theOutputPtr= fOutputArray[x][y];
|
|
//The array may have blank spaces!
|
|
if (theOutputPtr != NULL)
|
|
{ theOutputPtr->TearDown();
|
|
#if REFLECTOR_STREAM_DEBUGGING
|
|
qtss_printf("TearDownAllOutputs Removing output from bucket %"_S32BITARG_", index %"_S32BITARG_"\n",x,y);
|
|
#endif
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
QTSS_Error ReflectorStream::BindSockets(QTSS_StandardRTSP_Params* inParams, UInt32 inReflectorSessionFlags, Bool16 filterState, UInt32 timeout)
|
|
{
|
|
// If the incoming data is RTSP interleaved, we don't need to do anything here
|
|
if (inReflectorSessionFlags & ReflectorSession::kIsPushSession)
|
|
fStreamInfo.fSetupToReceive = true;
|
|
|
|
QTSS_RTSPRequestObject inRequest = NULL;
|
|
if (inParams != NULL)
|
|
inRequest = inParams->inRTSPRequest;
|
|
|
|
// Set the transport Type a Broadcaster
|
|
QTSS_RTPTransportType transportType = qtssRTPTransportTypeUDP;
|
|
if (inParams != NULL)
|
|
{ UInt32 theLen = sizeof(transportType);
|
|
(void) QTSS_GetValue(inParams->inRTSPRequest, qtssRTSPReqTransportType, 0, (void*)&transportType, &theLen);
|
|
}
|
|
|
|
// get a pair of sockets. The socket must be bound on INADDR_ANY because we don't know
|
|
// which interface has access to this broadcast. If there is a source IP address
|
|
// specified by the source info, we can use that to demultiplex separate broadcasts on
|
|
// the same port. If the src IP addr is 0, we cannot do this and must dedicate 1 port per
|
|
// broadcast
|
|
|
|
// changing INADDR_ANY to fStreamInfo.fDestIPAddr to deal with NATs (need to track this change though)
|
|
// change submitted by denis@berlin.ccc.de
|
|
Bool16 isMulticastDest = (SocketUtils::IsMulticastIPAddr(fStreamInfo.fDestIPAddr));
|
|
if (isMulticastDest) {
|
|
fSockets = sSocketPool.GetUDPSocketPair(INADDR_ANY, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
|
|
} else {
|
|
fSockets = sSocketPool.GetUDPSocketPair(fStreamInfo.fDestIPAddr, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
|
|
}
|
|
|
|
if ((fSockets == NULL) && fStreamInfo.fSetupToReceive)
|
|
{
|
|
fStreamInfo.fPort = 0;
|
|
if (isMulticastDest) {
|
|
fSockets = sSocketPool.GetUDPSocketPair(INADDR_ANY, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
|
|
} else {
|
|
fSockets = sSocketPool.GetUDPSocketPair(fStreamInfo.fDestIPAddr, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
|
|
}
|
|
}
|
|
if (fSockets == NULL)
|
|
return QTSSModuleUtils::SendErrorResponse(inRequest, qtssServerInternal,
|
|
sCantBindReflectorSocketErr);
|
|
|
|
// If we know the source IP address of this broadcast, we can demux incoming traffic
|
|
// on the same port by that source IP address. If we don't know the source IP addr,
|
|
// it is impossible for us to demux, and therefore we shouldn't allow multiple
|
|
// broadcasts on the same port.
|
|
if (((ReflectorSocket*)fSockets->GetSocketA())->HasSender() && (fStreamInfo.fSrcIPAddr == 0))
|
|
return QTSSModuleUtils::SendErrorResponse(inRequest, qtssServerInternal,
|
|
sCantBindReflectorSocketErr);
|
|
|
|
//also put this stream onto the socket's queue of streams
|
|
((ReflectorSocket*)fSockets->GetSocketA())->AddSender(&fRTPSender);
|
|
((ReflectorSocket*)fSockets->GetSocketB())->AddSender(&fRTCPSender);
|
|
|
|
// A broadcaster is setting up a UDP session so let the sockets update the session
|
|
if (fStreamInfo.fSetupToReceive && qtssRTPTransportTypeUDP == transportType && inParams != NULL)
|
|
{ ((ReflectorSocket*)fSockets->GetSocketA())->AddBroadcasterSession(inParams->inClientSession);
|
|
((ReflectorSocket*)fSockets->GetSocketB())->AddBroadcasterSession(inParams->inClientSession);
|
|
}
|
|
|
|
((ReflectorSocket*)fSockets->GetSocketA())->SetSSRCFilter(filterState, timeout);
|
|
((ReflectorSocket*)fSockets->GetSocketB())->SetSSRCFilter(filterState, timeout);
|
|
|
|
#if 1
|
|
// Always set the Rcv buf size for the sockets. This is important because the
|
|
// server is going to be getting many packets on these sockets.
|
|
fSockets->GetSocketA()->SetSocketRcvBufSize(512 * 1024);
|
|
fSockets->GetSocketB()->SetSocketRcvBufSize(512 * 1024);
|
|
#endif
|
|
|
|
//If the broadcaster is sending RTP directly to us, we don't
|
|
//need to join a multicast group because we're not using multicast
|
|
if (isMulticastDest)
|
|
{
|
|
QTSS_Error err = fSockets->GetSocketA()->JoinMulticast(fStreamInfo.fDestIPAddr);
|
|
if (err == QTSS_NoErr)
|
|
err = fSockets->GetSocketB()->JoinMulticast(fStreamInfo.fDestIPAddr);
|
|
// If we get an error when setting the TTL, this isn't too important (TTL on
|
|
// these sockets is only useful for RTCP RRs.
|
|
if (err == QTSS_NoErr)
|
|
(void)fSockets->GetSocketA()->SetTtl(fStreamInfo.fTimeToLive);
|
|
if (err == QTSS_NoErr)
|
|
(void)fSockets->GetSocketB()->SetTtl(fStreamInfo.fTimeToLive);
|
|
if (err != QTSS_NoErr)
|
|
return QTSSModuleUtils::SendErrorResponse(inRequest, qtssServerInternal,
|
|
sCantJoinMulticastGroupErr);
|
|
}
|
|
|
|
// If the port is 0, update the port to be the actual port value
|
|
fStreamInfo.fPort = fSockets->GetSocketA()->GetLocalPort();
|
|
|
|
//finally, register these sockets for events
|
|
fSockets->GetSocketA()->RequestEvent(EV_RE);
|
|
fSockets->GetSocketB()->RequestEvent(EV_RE);
|
|
|
|
// Copy the source ID and setup the ref
|
|
StrPtrLen theSourceID(fSourceIDBuf, kStreamIDSize);
|
|
ReflectorStream::GenerateSourceID(&fStreamInfo, fSourceIDBuf);
|
|
fRef.Set(theSourceID, this);
|
|
return QTSS_NoErr;
|
|
}
|
|
|
|
void ReflectorStream::SendReceiverReport()
|
|
{
|
|
// Check to see if our destination RTCP addr & port are setup. They may
|
|
// not be if the source is unicast and we haven't gotten any incoming packets yet
|
|
if (fDestRTCPAddr == 0)
|
|
return;
|
|
|
|
UInt32 theEyeCount = this->GetEyeCount();
|
|
UInt32* theEyeWriter = fEyeLocation;
|
|
*theEyeWriter = htonl(theEyeCount) & 0x7fffffff;//no idea why we do this!
|
|
theEyeWriter++;
|
|
*theEyeWriter = htonl(theEyeCount) & 0x7fffffff;
|
|
theEyeWriter++;
|
|
*theEyeWriter = htonl(0) & 0x7fffffff;
|
|
|
|
//send the packet to the multicast RTCP addr & port for this stream
|
|
(void)fSockets->GetSocketB()->SendTo(fDestRTCPAddr, fDestRTCPPort, fReceiverReportBuffer, fReceiverReportSize);
|
|
}
|
|
|
|
void ReflectorStream::PushPacket(char *packet, UInt32 packetLen, Bool16 isRTCP)
|
|
{
|
|
|
|
if (packetLen > 0)
|
|
{
|
|
ReflectorPacket* thePacket = NULL;
|
|
if (isRTCP)
|
|
{ //qtss_printf("ReflectorStream::PushPacket RTCP packetlen = %"_U32BITARG_"\n",packetLen);
|
|
thePacket = ((ReflectorSocket*)fSockets->GetSocketB())->GetPacket();
|
|
if (thePacket == NULL)
|
|
{ //qtss_printf("ReflectorStream::PushPacket RTCP GetPacket() is NULL\n");
|
|
return;
|
|
}
|
|
|
|
OSMutexLocker locker( ((ReflectorSocket*)(fSockets->GetSocketB()) )->GetDemuxer()->GetMutex());
|
|
thePacket->SetPacketData(packet, packetLen);
|
|
((ReflectorSocket*)fSockets->GetSocketB())->ProcessPacket(OS::Milliseconds(),thePacket,0,0);
|
|
((ReflectorSocket*)fSockets->GetSocketB())->Signal(Task::kIdleEvent);
|
|
}
|
|
else
|
|
{ //qtss_printf("ReflectorStream::PushPacket RTP packetlen = %"_U32BITARG_"\n",packetLen);
|
|
thePacket = ((ReflectorSocket*)fSockets->GetSocketA())->GetPacket();
|
|
if (thePacket == NULL)
|
|
{ //qtss_printf("ReflectorStream::PushPacket GetPacket() is NULL\n");
|
|
return;
|
|
}
|
|
|
|
OSMutexLocker locker(((ReflectorSocket*)(fSockets->GetSocketA()))->GetDemuxer()->GetMutex());
|
|
thePacket->SetPacketData(packet, packetLen);
|
|
((ReflectorSocket*)fSockets->GetSocketA())->ProcessPacket(OS::Milliseconds(),thePacket,0,0);
|
|
((ReflectorSocket*)fSockets->GetSocketA())->Signal(Task::kIdleEvent);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
ReflectorSender::ReflectorSender(ReflectorStream* inStream, UInt32 inWriteFlag)
|
|
: fStream(inStream),
|
|
fWriteFlag(inWriteFlag),
|
|
fFirstNewPacketInQueue(NULL),
|
|
fFirstPacketInQueueForNewOutput(NULL),
|
|
fHasNewPackets(false),
|
|
fNextTimeToRun(0),
|
|
fLastRRTime(0),
|
|
fSocketQueueElem()
|
|
{
|
|
fSocketQueueElem.SetEnclosingObject(this);
|
|
}
|
|
|
|
ReflectorSender::~ReflectorSender()
|
|
{
|
|
//dequeue and delete every buffer
|
|
while (fPacketQueue.GetLength() > 0)
|
|
{
|
|
ReflectorPacket* packet = (ReflectorPacket*)fPacketQueue.DeQueue()->GetEnclosingObject();
|
|
delete packet;
|
|
}
|
|
}
|
|
|
|
|
|
Bool16 ReflectorSender::ShouldReflectNow(const SInt64& inCurrentTime, SInt64* ioWakeupTime)
|
|
{
|
|
Assert(ioWakeupTime != NULL);
|
|
//check to make sure there actually is work to do for this stream.
|
|
if ((!fHasNewPackets) && ((fNextTimeToRun == 0) || (inCurrentTime < fNextTimeToRun)))
|
|
{
|
|
//We don't need to do work right now, but
|
|
//this stream must still communicate when it needs to be woken up next
|
|
SInt64 theWakeupTime = fNextTimeToRun + inCurrentTime;
|
|
//qtss_printf("ReflectorSender::ShouldReflectNow theWakeupTime=%qd newWakeUpTime=%qd ioWakepTime=%qd\n", theWakeupTime, fNextTimeToRun + inCurrentTime,*ioWakeupTime);
|
|
if ((fNextTimeToRun > 0) && (theWakeupTime < *ioWakeupTime))
|
|
*ioWakeupTime = theWakeupTime;
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
UInt32 ReflectorSender::GetOldestPacketRTPTime(Bool16 *foundPtr)
|
|
{
|
|
if (foundPtr != NULL)
|
|
*foundPtr = false;
|
|
OSMutexLocker locker(&fStream->fBucketMutex);
|
|
OSQueueElem* packetElem = this->GetClientBufferStartPacket();
|
|
if (packetElem == NULL)
|
|
return 0;
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
if (thePacket == NULL)
|
|
return 0;
|
|
|
|
if (foundPtr != NULL)
|
|
*foundPtr = true;
|
|
|
|
return thePacket->GetPacketRTPTime();
|
|
}
|
|
|
|
UInt16 ReflectorSender::GetFirstPacketRTPSeqNum(Bool16 *foundPtr)
|
|
{
|
|
if (foundPtr != NULL)
|
|
*foundPtr = false;
|
|
|
|
UInt16 resultSeqNum = 0;
|
|
OSMutexLocker locker(&fStream->fBucketMutex);
|
|
OSQueueElem* packetElem = this->GetClientBufferStartPacket();
|
|
|
|
if (packetElem == NULL)
|
|
return 0;
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
if (thePacket == NULL)
|
|
return 0;
|
|
|
|
if (foundPtr != NULL)
|
|
*foundPtr = true;
|
|
|
|
resultSeqNum = thePacket->GetPacketRTPSeqNum();
|
|
|
|
return resultSeqNum;
|
|
}
|
|
|
|
OSQueueElem* ReflectorSender::GetClientBufferNextPacketTime(UInt32 inRTPTime)
|
|
{
|
|
|
|
OSQueueIter qIter(&fPacketQueue);// start at oldest packet in q
|
|
OSQueueElem* requestedPacket = NULL;
|
|
OSQueueElem* elem = NULL;
|
|
|
|
while ( !qIter.IsDone() ) // start at oldest packet in q
|
|
{
|
|
elem = qIter.GetCurrent();
|
|
|
|
if (requestedPacket == NULL)
|
|
requestedPacket = elem;
|
|
|
|
if (requestedPacket == NULL)
|
|
break;
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject();
|
|
Assert( thePacket );
|
|
|
|
if (thePacket->GetPacketRTPTime() > inRTPTime)
|
|
{
|
|
requestedPacket = elem; // return the first packet we have that has a later time
|
|
break; // found the packet we need: done processing
|
|
}
|
|
qIter.Next();
|
|
|
|
|
|
}
|
|
|
|
return requestedPacket;
|
|
}
|
|
|
|
Bool16 ReflectorSender::GetFirstRTPTimePacket(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr)
|
|
{
|
|
OSMutexLocker locker(&fStream->fBucketMutex);
|
|
OSQueueElem* packetElem = this->GetClientBufferStartPacketOffset(ReflectorStream::sFirstPacketOffsetMsec);
|
|
|
|
if (packetElem == NULL)
|
|
return false;
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
if (thePacket == NULL)
|
|
return false;
|
|
|
|
packetElem = GetClientBufferNextPacketTime(thePacket->GetPacketRTPTime());
|
|
if (packetElem == NULL)
|
|
return false;
|
|
|
|
thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
if (thePacket == NULL)
|
|
return false;
|
|
|
|
if (outSeqNumPtr)
|
|
*outSeqNumPtr = thePacket->GetPacketRTPSeqNum();
|
|
|
|
if (outRTPTimePtr)
|
|
*outRTPTimePtr = thePacket->GetPacketRTPTime();
|
|
|
|
if (outArrivalTimePtr)
|
|
*outArrivalTimePtr = thePacket->fTimeArrived;
|
|
|
|
return true;
|
|
}
|
|
|
|
Bool16 ReflectorSender::GetFirstPacketInfo(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr)
|
|
{
|
|
OSMutexLocker locker(&fStream->fBucketMutex);
|
|
OSQueueElem* packetElem = this->GetClientBufferStartPacketOffset(ReflectorStream::sFirstPacketOffsetMsec);
|
|
// OSQueueElem* packetElem = this->GetClientBufferStartPacket();
|
|
|
|
if (packetElem == NULL)
|
|
return false;
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
if (thePacket == NULL)
|
|
return false;
|
|
|
|
if (outSeqNumPtr)
|
|
*outSeqNumPtr = thePacket->GetPacketRTPSeqNum();
|
|
|
|
if (outRTPTimePtr)
|
|
*outRTPTimePtr = thePacket->GetPacketRTPTime();
|
|
|
|
if (outArrivalTimePtr)
|
|
*outArrivalTimePtr = thePacket->fTimeArrived;
|
|
|
|
thePacket->fNeededByOutput = true;
|
|
|
|
return true;
|
|
}
|
|
|
|
|
|
#if REFLECTOR_STREAM_DEBUGGING
|
|
static UInt16 DGetPacketSeqNumber(StrPtrLen* inPacket)
|
|
{
|
|
if (inPacket->Len < 4)
|
|
return 0;
|
|
|
|
//The RTP seq number is the second short of the packet
|
|
UInt16* seqNumPtr = (UInt16*)inPacket->Ptr;
|
|
return ntohs(seqNumPtr[1]);
|
|
}
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
void ReflectorSender::ReflectRelayPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue)
|
|
{
|
|
//Most of this code is useless i.e. buckets and bookmarks. This code will get cleaned up eventually
|
|
|
|
//printf("ReflectorSender::ReflectPackets %qd %qd\n",*ioWakeupTime,fNextTimeToRun);
|
|
#if DEBUG
|
|
Assert(ioWakeupTime != NULL);
|
|
#endif
|
|
#if REFLECTOR_STREAM_DEBUGGING > 2
|
|
Bool16 printQueueLenOnExit = false;
|
|
#endif
|
|
|
|
SInt64 currentTime = OS::Milliseconds();
|
|
|
|
//make sure to reset these state variables
|
|
fHasNewPackets = false;
|
|
fNextTimeToRun = 1000; // init to 1 secs
|
|
|
|
//determine if we need to send a receiver report to the multicast source
|
|
if ((fWriteFlag == qtssWriteFlagsIsRTCP) && (currentTime > (fLastRRTime + kRRInterval)))
|
|
{
|
|
fLastRRTime = currentTime;
|
|
fStream->SendReceiverReport();
|
|
#if REFLECTOR_STREAM_DEBUGGING > 2
|
|
printQueueLenOnExit = true;
|
|
printf( "fPacketQueue len %li\n", (SInt32)fPacketQueue.GetLength() );
|
|
#endif
|
|
}
|
|
|
|
//the rest of this function must be atomic wrt the ReflectorSession, because
|
|
//it involves iterating through the RTPSession array, which isn't thread safe
|
|
OSMutexLocker locker(&fStream->fBucketMutex);
|
|
|
|
// Check to see if we should update the session's bitrate average
|
|
if ((fStream->fLastBitRateSample + ReflectorStream::kBitRateAvgIntervalInMilSecs) < currentTime)
|
|
{
|
|
unsigned int intervalBytes = fStream->fBytesSentInThisInterval;
|
|
(void)atomic_sub(&fStream->fBytesSentInThisInterval, intervalBytes);
|
|
|
|
// Multiply by 1000 to convert from milliseconds to seconds, and by 8 to convert from bytes to bits
|
|
Float32 bps = (Float32)(intervalBytes * 8) / (Float32)(currentTime - fStream->fLastBitRateSample);
|
|
bps *= 1000;
|
|
fStream->fCurrentBitRate = (UInt32)bps;
|
|
|
|
// Don't check again for awhile!
|
|
fStream->fLastBitRateSample = currentTime;
|
|
}
|
|
|
|
for (UInt32 bucketIndex = 0; bucketIndex < fStream->fNumBuckets; bucketIndex++)
|
|
{
|
|
for (UInt32 bucketMemberIndex = 0; bucketMemberIndex < fStream->sBucketSize; bucketMemberIndex++)
|
|
{
|
|
ReflectorOutput* theOutput = fStream->fOutputArray[bucketIndex][bucketMemberIndex];
|
|
|
|
|
|
if (theOutput != NULL)
|
|
{
|
|
SInt32 availBookmarksPosition = -1; // -1 == invalid position
|
|
OSQueueElem* packetElem = NULL;
|
|
UInt32 curBookmark = 0;
|
|
|
|
Assert( curBookmark < theOutput->fNumBookmarks );
|
|
|
|
// see if we've bookmarked a held packet for this Sender in this Output
|
|
while ( curBookmark < theOutput->fNumBookmarks )
|
|
{
|
|
OSQueueElem* bookmarkedElem = theOutput->fBookmarkedPacketsElemsArray[curBookmark];
|
|
|
|
if ( bookmarkedElem ) // there may be holes in this array
|
|
{
|
|
if ( bookmarkedElem->IsMember( fPacketQueue ) )
|
|
{
|
|
// this packet was previously bookmarked for this specific queue
|
|
// remove if from the bookmark list and use it
|
|
// to jump ahead into the Sender's over all packet queue
|
|
theOutput->fBookmarkedPacketsElemsArray[curBookmark] = NULL;
|
|
availBookmarksPosition = curBookmark;
|
|
packetElem = bookmarkedElem;
|
|
break;
|
|
}
|
|
|
|
}
|
|
else
|
|
{
|
|
availBookmarksPosition = curBookmark;
|
|
}
|
|
|
|
curBookmark++;
|
|
|
|
}
|
|
|
|
Assert( availBookmarksPosition != -1 );
|
|
|
|
#if REFLECTOR_STREAM_DEBUGGING > 1
|
|
if ( packetElem ) // show 'em what we got johnny
|
|
{ ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
printf("Bookmarked packet time: %li, packetSeq %i\n", (SInt32)thePacket->fTimeArrived, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
|
|
}
|
|
#endif
|
|
|
|
// the output did not have a bookmarked packet if it's own
|
|
// so show it the first new packet we have in this sender.
|
|
// ( since TCP flow control may delay the sending of packets, this may not
|
|
// be the same as the first packet in the queue
|
|
if ( packetElem == NULL )
|
|
{
|
|
packetElem = fFirstNewPacketInQueue;
|
|
|
|
#if REFLECTOR_STREAM_DEBUGGING > 1
|
|
if ( packetElem ) // show 'em what we got johnny
|
|
{
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
printf("1st NEW packet from Sender sess 0x%lx time: %li, packetSeq %i\n", (SInt32)theOutput, (SInt32)thePacket->fTimeArrived, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
|
|
}
|
|
else
|
|
printf("no new packets\n" );
|
|
#endif
|
|
}
|
|
|
|
OSQueueIter qIter(&fPacketQueue, packetElem); // starts from beginning if packetElem == NULL, else from packetElem
|
|
|
|
Bool16 dodBookmarkPacket = false;
|
|
|
|
while ( !qIter.IsDone() )
|
|
{
|
|
packetElem = qIter.GetCurrent();
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
QTSS_Error err = QTSS_NoErr;
|
|
|
|
#if REFLECTOR_STREAM_DEBUGGING > 2
|
|
printf("packet time: %li, packetSeq %i\n", (SInt32)thePacket->fTimeArrived, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
|
|
#endif
|
|
|
|
// once we see a packet we cant' send, we need to stop trying
|
|
// during this pass mark remaining as still needed
|
|
if ( !dodBookmarkPacket )
|
|
{
|
|
SInt64 packetLateness = currentTime - thePacket->fTimeArrived - (ReflectorStream::sBucketDelayInMsec * (SInt64)bucketIndex);
|
|
// packetLateness measures how late this packet it after being corrected for the bucket delay
|
|
|
|
#if REFLECTOR_STREAM_DEBUGGING > 2
|
|
printf("packetLateness %li, seq# %li\n", (SInt32)packetLateness, (SInt32) DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
|
|
#endif
|
|
|
|
SInt64 timeToSendPacket = -1;
|
|
err = theOutput->WritePacket(&thePacket->fPacketPtr, fStream, fWriteFlag, packetLateness, &timeToSendPacket, NULL, NULL, false);
|
|
|
|
if ( err == QTSS_WouldBlock )
|
|
{
|
|
#if REFLECTOR_STREAM_DEBUGGING > 2
|
|
printf("EAGAIN bookmark: %li, packetSeq %i\n", (SInt32)packetLateness, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
|
|
#endif
|
|
// tag it and bookmark it
|
|
thePacket->fNeededByOutput = true;
|
|
|
|
Assert( availBookmarksPosition != -1 );
|
|
if ( availBookmarksPosition != -1 )
|
|
theOutput->fBookmarkedPacketsElemsArray[availBookmarksPosition] = packetElem;
|
|
|
|
dodBookmarkPacket = true;
|
|
|
|
// call us again in # ms to retry on an EAGAIN
|
|
if ((timeToSendPacket > 0) && (fNextTimeToRun > timeToSendPacket ))
|
|
fNextTimeToRun = timeToSendPacket;
|
|
if ( timeToSendPacket == -1 )
|
|
this->SetNextTimeToRun(5); // keep in synch with delay on would block for on-demand lower is better for high-bit rate movies.
|
|
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if ( thePacket->fNeededByOutput ) // optimization: if the packet is already marked, another Output has been through this already
|
|
break;
|
|
thePacket->fNeededByOutput = true;
|
|
}
|
|
|
|
qIter.Next();
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// reset our first new packet bookmark
|
|
fFirstNewPacketInQueue = NULL;
|
|
|
|
// iterate one more through the senders queue to clear out
|
|
// the unneeded packets
|
|
OSQueueIter removeIter(&fPacketQueue);
|
|
while ( !removeIter.IsDone() )
|
|
{
|
|
OSQueueElem* elem = removeIter.GetCurrent();
|
|
Assert( elem );
|
|
|
|
//at this point, move onto the next queue element, because we may be altering
|
|
//the queue itself in the code below
|
|
removeIter.Next();
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject();
|
|
Assert( thePacket );
|
|
|
|
if ( thePacket->fNeededByOutput == false )
|
|
{
|
|
thePacket->fNeededByOutput = true;
|
|
fPacketQueue.Remove( elem );
|
|
inFreeQueue->EnQueue( elem );
|
|
|
|
}
|
|
else // reset for next call to ReflectPackets
|
|
{
|
|
thePacket->fNeededByOutput = false;
|
|
}
|
|
}
|
|
|
|
//Don't forget that the caller also wants to know when we next want to run
|
|
if (*ioWakeupTime == 0)
|
|
*ioWakeupTime = fNextTimeToRun;
|
|
else if ((fNextTimeToRun > 0) && (*ioWakeupTime > fNextTimeToRun))
|
|
*ioWakeupTime = fNextTimeToRun;
|
|
// exit with fNextTimeToRun in real time, not relative time.
|
|
fNextTimeToRun += currentTime;
|
|
|
|
#if REFLECTOR_STREAM_DEBUGGING > 2
|
|
if ( printQueueLenOnExit )
|
|
printf( "EXIT fPacketQueue len %li\n", (SInt32)fPacketQueue.GetLength() );
|
|
#endif
|
|
}
|
|
|
|
/***********************************************************************************************
|
|
/ ReflectorSender::ReflectPackets
|
|
/
|
|
/ There are n ReflectorSender's for n output streams per presentation.
|
|
/
|
|
/ Each sender is associated with an array of ReflectorOutput's. Each
|
|
/ output represents a client connection. Each output has # RTPStream's.
|
|
/
|
|
/ When we write a packet to the ReflectorOutput he matches it's payload
|
|
/ to one of his streams and sends it there.
|
|
/
|
|
/ To smooth the bandwitdth (server, not user) requirements of the reflected streams, the Sender
|
|
/ groups the ReflectorOutput's into buckets. The input streams are reflected to
|
|
/ each bucket progressively later in time. So rather than send a single packet
|
|
/ to say 1000 clients all at once, we send it to just the first 16, then then next 16
|
|
/ 100 ms later and so on.
|
|
/
|
|
/
|
|
/ intputs ioWakeupTime - relative time to call us again in MSec
|
|
/ inFreeQueue - queue of free packets.
|
|
*/
|
|
|
|
void ReflectorSender::ReflectPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue)
|
|
{
|
|
if (!fStream->BufferEnabled()) // Call old routine for relays; they don't want buffering.
|
|
{
|
|
this->ReflectRelayPackets(ioWakeupTime,inFreeQueue);
|
|
return;
|
|
}
|
|
|
|
SInt64 currentTime = OS::Milliseconds();
|
|
|
|
//make sure to reset these state variables
|
|
fHasNewPackets = false;
|
|
|
|
fNextTimeToRun = 1000; // init to 1 secs
|
|
|
|
if (fWriteFlag == qtssWriteFlagsIsRTCP)
|
|
fNextTimeToRun = 1000;
|
|
|
|
//determine if we need to send a receiver report to the multicast source
|
|
if ((fWriteFlag == qtssWriteFlagsIsRTCP) && (currentTime > (fLastRRTime + kRRInterval)))
|
|
{
|
|
fLastRRTime = currentTime;
|
|
fStream->SendReceiverReport();
|
|
}
|
|
|
|
//the rest of this function must be atomic wrt the ReflectorSession, because
|
|
//it involves iterating through the RTPSession array, which isn't thread safe
|
|
OSMutexLocker locker(&fStream->fBucketMutex);
|
|
|
|
// Check to see if we should update the session's bitrate average
|
|
fStream->UpdateBitRate(currentTime);
|
|
|
|
// where to start new clients in the q
|
|
fFirstPacketInQueueForNewOutput = this->GetClientBufferStartPacketOffset(0);
|
|
|
|
#if (0) //test code
|
|
if (NULL != fFirstPacketInQueueForNewOutput)
|
|
printf("ReflectorSender::ReflectPackets SET first packet fFirstPacketInQueueForNewOutput %d \n", DGetPacketSeqNumber( &( (ReflectorPacket*) ( fFirstPacketInQueueForNewOutput->GetEnclosingObject()))->fPacketPtr ));
|
|
|
|
ReflectorPacket* thePacket = NULL;
|
|
if (fFirstPacketInQueueForNewOutput != NULL)
|
|
thePacket = (ReflectorPacket*) fFirstPacketInQueueForNewOutput->GetEnclosingObject();
|
|
if (thePacket == NULL)
|
|
{ printf("fFirstPacketInQueueForNewOutput is NULL \n");
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
Bool16 firstPacket =false;
|
|
|
|
for (UInt32 bucketIndex = 0; bucketIndex < fStream->fNumBuckets; bucketIndex++)
|
|
{
|
|
for (UInt32 bucketMemberIndex = 0; bucketMemberIndex < fStream->sBucketSize; bucketMemberIndex++)
|
|
{
|
|
ReflectorOutput* theOutput = fStream->fOutputArray[bucketIndex][bucketMemberIndex];
|
|
if (theOutput != NULL)
|
|
{
|
|
if ( false == theOutput->IsPlaying() )
|
|
continue;
|
|
|
|
OSQueueElem* packetElem = theOutput->GetBookMarkedPacket(&fPacketQueue);
|
|
if ( packetElem == NULL ) // should only be a new output
|
|
{
|
|
packetElem = fFirstPacketInQueueForNewOutput; // everybody starts at the oldest packet in the buffer delay or uses a bookmark
|
|
firstPacket = true;
|
|
theOutput->fNewOutput = false;
|
|
//if (packetElem) printf("ReflectorSender::ReflectPackets Sending first packet in Queue packetElem=fFirstPacketInQueueForNewOutput %d \n", ( (ReflectorPacket*) (packetElem->GetEnclosingObject() ) )->GetPacketRTPSeqNum());
|
|
|
|
}
|
|
|
|
SInt64 bucketDelay = ReflectorStream::sBucketDelayInMsec * (SInt64)bucketIndex;
|
|
packetElem = this->SendPacketsToOutput(theOutput, packetElem,currentTime, bucketDelay, firstPacket);
|
|
if (packetElem)
|
|
{
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
|
|
thePacket->fNeededByOutput = true; // flag to prevent removal in RemoveOldPackets
|
|
(void) theOutput->SetBookMarkPacket(packetElem); // store a reference to the packet
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
this->RemoveOldPackets(inFreeQueue);
|
|
fFirstNewPacketInQueue = NULL;
|
|
|
|
//Don't forget that the caller also wants to know when we next want to run
|
|
if (*ioWakeupTime == 0)
|
|
*ioWakeupTime = fNextTimeToRun;
|
|
else if ((fNextTimeToRun > 0) && (*ioWakeupTime > fNextTimeToRun))
|
|
*ioWakeupTime = fNextTimeToRun;
|
|
// exit with fNextTimeToRun in real time, not relative time.
|
|
fNextTimeToRun += currentTime;
|
|
|
|
// qtss_printf("SetNextTimeToRun fNextTimeToRun=%qd + currentTime=%qd\n", fNextTimeToRun, currentTime);
|
|
// qtss_printf("ReflectorSender::ReflectPackets *ioWakeupTime = %qd\n", *ioWakeupTime);
|
|
|
|
}
|
|
|
|
OSQueueElem* ReflectorSender::SendPacketsToOutput(ReflectorOutput* theOutput, OSQueueElem* currentPacket, SInt64 currentTime, SInt64 bucketDelay, Bool16 firstPacket)
|
|
{
|
|
OSQueueElem* lastPacket = currentPacket;
|
|
OSQueueIter qIter(&fPacketQueue, currentPacket); // starts from beginning if currentPacket == NULL, else from currentPacket
|
|
|
|
UInt32 count = 0;
|
|
QTSS_Error err = QTSS_NoErr;
|
|
while ( !qIter.IsDone() )
|
|
{
|
|
currentPacket = qIter.GetCurrent();
|
|
lastPacket = currentPacket;
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)currentPacket->GetEnclosingObject();
|
|
SInt64 packetLateness = bucketDelay;
|
|
SInt64 timeToSendPacket = -1;
|
|
|
|
//printf("packetLateness %qd, seq# %li\n", packetLateness, (SInt32) DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
|
|
|
|
err = theOutput->WritePacket(&thePacket->fPacketPtr, fStream, fWriteFlag, packetLateness, &timeToSendPacket,&thePacket->fStreamCountID,&thePacket->fTimeArrived, firstPacket );
|
|
|
|
if (err == QTSS_WouldBlock)
|
|
{ // call us again in # ms to retry on an EAGAIN
|
|
|
|
if ((timeToSendPacket > 0) && ( (fNextTimeToRun + currentTime) > timeToSendPacket )) // blocked but we are scheduled to wake up later
|
|
fNextTimeToRun = timeToSendPacket - currentTime;
|
|
|
|
if (theOutput->fLastIntervalMilliSec < 5 )
|
|
theOutput->fLastIntervalMilliSec = 5;
|
|
|
|
if ( timeToSendPacket < 0 ) // blocked and we are behind
|
|
{ //qtss_printf("fNextTimeToRun = theOutput->fLastIntervalMilliSec=%qd;\n", theOutput->fLastIntervalMilliSec); // Use the last packet interval
|
|
this->SetNextTimeToRun(theOutput->fLastIntervalMilliSec);
|
|
}
|
|
|
|
if (fNextTimeToRun > 100) //don't wait that long
|
|
{ //qtss_printf("fNextTimeToRun = %qd now 100;\n", fNextTimeToRun);
|
|
this->SetNextTimeToRun(100);
|
|
}
|
|
|
|
if (fNextTimeToRun < 5) //wait longer
|
|
{ //qtss_printf("fNextTimeToRun = 5;\n");
|
|
this->SetNextTimeToRun(5);
|
|
}
|
|
|
|
if (theOutput->fLastIntervalMilliSec >= 100) // allow up to 1 second max -- allow some time for the socket to clear and don't go into a tight loop if the client is gone.
|
|
theOutput->fLastIntervalMilliSec = 100;
|
|
else
|
|
theOutput->fLastIntervalMilliSec *= 2; // scale upwards over time
|
|
|
|
//qtss_printf ( "Blocked ReflectorSender::SendPacketsToOutput timeToSendPacket=%qd fLastIntervalMilliSec=%qd fNextTimeToRun=%qd \n", timeToSendPacket, theOutput->fLastIntervalMilliSec, fNextTimeToRun);
|
|
|
|
break;
|
|
}
|
|
|
|
count++;
|
|
qIter.Next();
|
|
|
|
}
|
|
|
|
return lastPacket;
|
|
}
|
|
|
|
OSQueueElem* ReflectorSender::GetClientBufferStartPacketOffset(SInt64 offsetMsec)
|
|
{
|
|
|
|
OSQueueIter qIter(&fPacketQueue);// start at oldest packet in q
|
|
SInt64 theCurrentTime = OS::Milliseconds();
|
|
SInt64 packetDelay = 0;
|
|
OSQueueElem* oldestPacketInClientBufferTime = NULL;
|
|
|
|
|
|
while ( !qIter.IsDone() ) // start at oldest packet in q
|
|
{
|
|
OSQueueElem* elem = qIter.GetCurrent();
|
|
Assert( elem );
|
|
qIter.Next();
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject();
|
|
Assert( thePacket );
|
|
|
|
packetDelay = theCurrentTime - thePacket->fTimeArrived;
|
|
if (offsetMsec > ReflectorStream::sOverBufferInMsec)
|
|
offsetMsec = ReflectorStream::sOverBufferInMsec;
|
|
|
|
if ( packetDelay <= (ReflectorStream::sOverBufferInMsec - offsetMsec) )
|
|
{
|
|
oldestPacketInClientBufferTime = &thePacket->fQueueElem;
|
|
break; // found the packet we need: done processing
|
|
}
|
|
|
|
}
|
|
|
|
return oldestPacketInClientBufferTime;
|
|
}
|
|
|
|
void ReflectorSender::RemoveOldPackets(OSQueue* inFreeQueue)
|
|
{
|
|
|
|
// Iterate through the senders queue to clear out packets
|
|
// Start at the oldest packet and walk forward to the newest packet
|
|
//
|
|
OSQueueIter removeIter(&fPacketQueue);
|
|
SInt64 theCurrentTime = OS::Milliseconds();
|
|
SInt64 packetDelay = 0;
|
|
SInt64 currentMaxPacketDelay = ReflectorStream::sMaxPacketAgeMSec;
|
|
|
|
|
|
while ( !removeIter.IsDone() )
|
|
{
|
|
OSQueueElem* elem = removeIter.GetCurrent();
|
|
Assert( elem );
|
|
|
|
//at this point, move onto the next queue element, because we may be altering
|
|
//the queue itself in the code below
|
|
removeIter.Next();
|
|
|
|
ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject();
|
|
Assert( thePacket );
|
|
//printf("ReflectorSender::RemoveOldPackets Packet %d in queue is %qd milliseconds old\n", DGetPacketSeqNumber( &thePacket->fPacketPtr ) ,theCurrentTime - thePacket->fTimeArrived);
|
|
|
|
|
|
packetDelay = theCurrentTime - thePacket->fTimeArrived;
|
|
|
|
// walk q and remove packets that are too old
|
|
if ( !thePacket->fNeededByOutput && packetDelay > currentMaxPacketDelay) // delete based on late tolerance and whether a client is blocked on the packet
|
|
{ // not needed and older than our required buffer
|
|
thePacket->Reset();
|
|
fPacketQueue.Remove( elem );
|
|
inFreeQueue->EnQueue( elem );
|
|
}
|
|
else
|
|
{ // we want to keep all of these but we should reset the ones that should be aged out unless marked
|
|
// as need the next time through reflect packets.
|
|
|
|
thePacket->fNeededByOutput = false; //mark not needed.. will be set next time through reflect packets
|
|
if (packetDelay <= currentMaxPacketDelay) // this packet is going to be kept around as well as the ones that follow.
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
|
|
void ReflectorSocketPool::SetUDPSocketOptions(UDPSocketPair* inPair)
|
|
{
|
|
// Fix add ReuseAddr for compatibility with MPEG4IP broadcaster which likes to use the same
|
|
//sockets.
|
|
|
|
//Make sure this works with PlaylistBroadcaster
|
|
//inPair->GetSocketA()->ReuseAddr();
|
|
//inPair->GetSocketA()->ReuseAddr();
|
|
|
|
}
|
|
|
|
|
|
UDPSocketPair* ReflectorSocketPool::ConstructUDPSocketPair()
|
|
{
|
|
return NEW UDPSocketPair
|
|
(NEW ReflectorSocket(), NEW ReflectorSocket());
|
|
}
|
|
|
|
void ReflectorSocketPool::DestructUDPSocket(ReflectorSocket* socket)
|
|
{
|
|
if (socket) // allocated
|
|
{
|
|
if (socket->GetLocalPort() > 0) // bound and active
|
|
{ //The socket's run function may be executing RIGHT NOW! So we can't
|
|
//just delete the thing, we need to send the sockets kill events.
|
|
//qtss_printf("ReflectorSocketPool::DestructUDPSocketPair Signal kKillEvent socket=%p\n",socket);
|
|
socket->Signal(Task::kKillEvent);
|
|
}
|
|
else // not bound ok to delete
|
|
{ //qtss_printf("ReflectorSocketPool::DestructUDPSocketPair delete socket=%p\n",socket);
|
|
delete socket;
|
|
}
|
|
}
|
|
}
|
|
|
|
void ReflectorSocketPool::DestructUDPSocketPair(UDPSocketPair *inPair)
|
|
{
|
|
//qtss_printf("ReflectorSocketPool::DestructUDPSocketPair inPair=%p socketA=%p\n", inPair,(ReflectorSocket*)inPair->GetSocketA());
|
|
this->DestructUDPSocket((ReflectorSocket*)inPair->GetSocketA());
|
|
|
|
//qtss_printf("ReflectorSocketPool::DestructUDPSocketPair inPair=%p socketB=%p\n", inPair,(ReflectorSocket*)inPair->GetSocketB());
|
|
this->DestructUDPSocket((ReflectorSocket*)inPair->GetSocketB());
|
|
|
|
delete inPair;
|
|
}
|
|
|
|
ReflectorSocket::ReflectorSocket()
|
|
: IdleTask(),
|
|
UDPSocket(NULL, Socket::kNonBlockingSocketType | UDPSocket::kWantsDemuxer),
|
|
fBroadcasterClientSession(NULL),
|
|
fLastBroadcasterTimeOutRefresh(0),
|
|
fSleepTime(0),
|
|
fValidSSRC(0),
|
|
fLastValidSSRCTime(0),
|
|
fFilterSSRCs(true),
|
|
fTimeoutSecs(30),
|
|
fHasReceiveTime(false),
|
|
fFirstReceiveTime(0),
|
|
fFirstArrivalTime(0),
|
|
fCurrentSSRC(0)
|
|
|
|
{
|
|
//construct all the preallocated packets
|
|
this->SetTaskName("ReflectorSocket");
|
|
this->SetTask(this);
|
|
|
|
for (UInt32 numPackets = 0; numPackets < kNumPreallocatedPackets; numPackets++)
|
|
{
|
|
//If the local port # of this socket is odd, then all the packets
|
|
//used for this socket are rtcp packets.
|
|
ReflectorPacket* packet = NEW ReflectorPacket();
|
|
fFreeQueue.EnQueue(&packet->fQueueElem);//put this packet onto the free queue
|
|
}
|
|
}
|
|
|
|
ReflectorSocket::~ReflectorSocket()
|
|
{
|
|
//printf("ReflectorSocket::~ReflectorSocket\n");
|
|
while (fFreeQueue.GetLength() > 0)
|
|
{
|
|
ReflectorPacket* packet = (ReflectorPacket*)fFreeQueue.DeQueue()->GetEnclosingObject();
|
|
delete packet;
|
|
}
|
|
}
|
|
|
|
void ReflectorSocket::AddSender(ReflectorSender* inSender)
|
|
{
|
|
OSMutexLocker locker(this->GetDemuxer()->GetMutex());
|
|
QTSS_Error err = this->GetDemuxer()->RegisterTask(inSender->fStream->fStreamInfo.fSrcIPAddr, 0, inSender);
|
|
Assert(err == QTSS_NoErr);
|
|
fSenderQueue.EnQueue(&inSender->fSocketQueueElem);
|
|
}
|
|
|
|
void ReflectorSocket::RemoveSender(ReflectorSender* inSender)
|
|
{
|
|
OSMutexLocker locker(this->GetDemuxer()->GetMutex());
|
|
fSenderQueue.Remove(&inSender->fSocketQueueElem);
|
|
QTSS_Error err = this->GetDemuxer()->UnregisterTask(inSender->fStream->fStreamInfo.fSrcIPAddr, 0, inSender);
|
|
Assert(err == QTSS_NoErr);
|
|
}
|
|
|
|
SInt64 ReflectorSocket::Run()
|
|
{
|
|
//We want to make sure we can't get idle events WHILE we are inside
|
|
//this function. That will cause us to run the queues unnecessarily
|
|
//and just get all confused.
|
|
this->CancelTimeout();
|
|
|
|
Task::EventFlags theEvents = this->GetEvents();
|
|
//if we have been told to delete ourselves, do so.
|
|
if (theEvents & Task::kKillEvent)
|
|
return -1;
|
|
|
|
OSMutexLocker locker(this->GetDemuxer()->GetMutex());
|
|
SInt64 theMilliseconds = OS::Milliseconds();
|
|
|
|
//Only check for data on the socket if we've actually been notified to that effect
|
|
if (theEvents & Task::kReadEvent)
|
|
this->GetIncomingData(theMilliseconds);
|
|
|
|
#if DEBUG
|
|
//make sure that we haven't gotten here prematurely! This wouldn't mess
|
|
//anything up, but it would waste CPU.
|
|
if (theEvents & Task::kIdleEvent)
|
|
{
|
|
SInt32 temp = (SInt32)(fSleepTime - theMilliseconds);
|
|
char tempBuf[20];
|
|
qtss_sprintf(tempBuf,"%"_S32BITARG_"",temp);
|
|
WarnV(fSleepTime <= theMilliseconds, tempBuf);
|
|
}
|
|
#endif
|
|
|
|
fSleepTime = 0;
|
|
//Now that we've gotten all available packets, have the streams reflect
|
|
for (OSQueueIter iter2(&fSenderQueue); !iter2.IsDone(); iter2.Next())
|
|
{
|
|
ReflectorSender* theSender2 = (ReflectorSender*)iter2.GetCurrent()->GetEnclosingObject();
|
|
if (theSender2 != NULL && theSender2->ShouldReflectNow(theMilliseconds, &fSleepTime))
|
|
theSender2->ReflectPackets(&fSleepTime, &fFreeQueue);
|
|
}
|
|
|
|
#if DEBUG
|
|
theMilliseconds = OS::Milliseconds();
|
|
#endif
|
|
|
|
//For smoothing purposes, the streams can mark when they want to wakeup.
|
|
if (fSleepTime > 0)
|
|
this->SetIdleTimer(fSleepTime);
|
|
#if DEBUG
|
|
//The debugging check above expects real time.
|
|
fSleepTime += theMilliseconds;
|
|
#endif
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
void ReflectorSocket::FilterInvalidSSRCs(ReflectorPacket* thePacket,Bool16 isRTCP)
|
|
{ // assume the first SSRC we see is valid and all others are to be ignored.
|
|
if ( thePacket->fPacketPtr.Len > 0) do
|
|
{
|
|
SInt64 currentTime = OS::Milliseconds() / 1000;
|
|
if (0 == fValidSSRC)
|
|
{ fValidSSRC = thePacket->GetSSRC(isRTCP); // SSRC of 0 is allowed
|
|
fLastValidSSRCTime = currentTime;
|
|
//qtss_printf("socket=%"_U32BITARG_" FIRST PACKET fValidSSRC=%"_U32BITARG_" \n", (UInt32) this,fValidSSRC);
|
|
break;
|
|
}
|
|
|
|
UInt32 packetSSRC = thePacket->GetSSRC(isRTCP);
|
|
if (packetSSRC != 0)
|
|
{
|
|
if (packetSSRC == fValidSSRC)
|
|
{ fLastValidSSRCTime = currentTime;
|
|
//qtss_printf("socket=%"_U32BITARG_" good packet\n", (UInt32) this );
|
|
break;
|
|
}
|
|
|
|
//qtss_printf("socket=%"_U32BITARG_" bad packet packetSSRC= %"_U32BITARG_" fValidSSRC=%"_U32BITARG_" \n", (UInt32) this,packetSSRC,fValidSSRC);
|
|
thePacket->fPacketPtr.Len = 0; // ignore this packet wrong SSRC
|
|
}
|
|
|
|
// this executes whenever an invalid SSRC is found -- maybe the original stream ended and a new one is now active
|
|
if ( (fLastValidSSRCTime + fTimeoutSecs) < currentTime) // fValidSSRC timed out --no packets with this SSRC seen for awhile
|
|
{ fValidSSRC = 0; // reset the valid SSRC with the next packet's SSRC
|
|
//qtss_printf("RESET fValidSSRC\n");
|
|
}
|
|
|
|
}while (false);
|
|
}
|
|
|
|
Bool16 ReflectorSocket::ProcessPacket(const SInt64& inMilliseconds,ReflectorPacket* thePacket,UInt32 theRemoteAddr,UInt16 theRemotePort)
|
|
{
|
|
Bool16 done = false; // stop when result is true
|
|
if (thePacket != NULL) do
|
|
{
|
|
if (GetLocalPort() & 1)
|
|
thePacket->fIsRTCP = true;
|
|
else
|
|
thePacket->fIsRTCP = false;
|
|
|
|
if (fBroadcasterClientSession != NULL) // alway refresh timeout even if we are filtering.
|
|
{ if ( (inMilliseconds - fLastBroadcasterTimeOutRefresh) > kRefreshBroadcastSessionIntervalMilliSecs)
|
|
{ QTSS_RefreshTimeOut(fBroadcasterClientSession);
|
|
fLastBroadcasterTimeOutRefresh = inMilliseconds;
|
|
}
|
|
}
|
|
|
|
|
|
if (thePacket->fPacketPtr.Len == 0)
|
|
{
|
|
//put the packet back on the free queue, because we didn't actually
|
|
//get any data here.
|
|
fFreeQueue.EnQueue(&thePacket->fQueueElem);
|
|
this->RequestEvent(EV_RE);
|
|
done = true;
|
|
//qtss_printf("ReflectorSocket::ProcessPacket no more packets on this socket!\n");
|
|
break;//no more packets on this socket!
|
|
}
|
|
|
|
if (thePacket->IsRTCP())
|
|
{
|
|
//if this is a new RTCP packet, check to see if it is a sender report.
|
|
//We should only reflect sender reports. Because RTCP packets can't have both
|
|
//an SR & an RR, and because the SR & the RR must be the first packet in a
|
|
//compound RTCP packet, all we have to do to determine this is look at the
|
|
//packet type of the first packet in the compound packet.
|
|
RTCPPacket theRTCPPacket;
|
|
if ((!theRTCPPacket.ParsePacket((UInt8*)thePacket->fPacketPtr.Ptr, thePacket->fPacketPtr.Len)) ||
|
|
(theRTCPPacket.GetPacketType() != RTCPSRPacket::kSRPacketType))
|
|
{
|
|
//pretend as if we never got this packet
|
|
fFreeQueue.EnQueue(&thePacket->fQueueElem);
|
|
done = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Only reflect one SSRC stream at a time.
|
|
// Pass the packet and whether it is an RTCP or RTP packet based on the port number.
|
|
if (fFilterSSRCs)
|
|
this->FilterInvalidSSRCs(thePacket,GetLocalPort() & 1);// thePacket->fPacketPtr.Len is set to 0 for invalid SSRCs.
|
|
|
|
// Find the appropriate ReflectorSender for this packet.
|
|
ReflectorSender* theSender = (ReflectorSender*)this->GetDemuxer()->GetTask(theRemoteAddr, 0);
|
|
// If there is a generic sender for this socket, use it.
|
|
if (theSender == NULL)
|
|
theSender = (ReflectorSender*)this->GetDemuxer()->GetTask(0, 0);
|
|
|
|
if (theSender == NULL)
|
|
{
|
|
//UInt16* theSeqNumberP = (UInt16*)thePacket->fPacketPtr.Ptr;
|
|
//qtss_printf("ReflectorSocket::ProcessPacket no sender found for packet! sequence number=%d\n",ntohs(theSeqNumberP[1]));
|
|
fFreeQueue.EnQueue(&thePacket->fQueueElem); // don't process the packet
|
|
done = true;
|
|
break;
|
|
}
|
|
|
|
Assert(theSender != NULL); // at this point we have a sender
|
|
|
|
const UInt32 maxQSize = 4000;
|
|
|
|
// Check to see if we need to set the remote RTCP address
|
|
// for this stream. This will be necessary if the source is unicast.
|
|
#ifdef NAT_WORKAROUND
|
|
if ((theRemoteAddr != 0) && ((theSender->fStream->fDestRTCPAddr == 0) || (thePacket->IsRTCP()))) // Submitted fix from denis@berlin.ccc.de
|
|
{
|
|
Assert(!SocketUtils::IsMulticastIPAddr(theSender->fStream->fStreamInfo.fDestIPAddr));
|
|
Assert(theRemotePort != 0);
|
|
theSender->fStream->fDestRTCPAddr = theRemoteAddr;
|
|
theSender->fStream->fDestRTCPPort = theRemotePort;
|
|
|
|
// RTCPs are always on odd ports, so check to see if this port is an
|
|
// RTP port, and if so, just add 1.
|
|
if (!(thePacket->IsRTCP()) && !(theRemotePort & 1))
|
|
theSender->fStream->fDestRTCPPort++;
|
|
}
|
|
#else
|
|
if ((theRemoteAddr != 0) && (theSender->fStream->fDestRTCPAddr == 0))
|
|
{
|
|
// If the source is multicast, this shouldn't be necessary
|
|
Assert(!SocketUtils::IsMulticastIPAddr(theSender->fStream->fStreamInfo.fDestIPAddr));
|
|
Assert(theRemotePort != 0);
|
|
theSender->fStream->fDestRTCPAddr = theRemoteAddr;
|
|
theSender->fStream->fDestRTCPPort = theRemotePort;
|
|
|
|
// RTCPs are always on odd ports, so check to see if this port is an
|
|
// RTP port, and if so, just add 1.
|
|
if (!(theRemotePort & 1))
|
|
theSender->fStream->fDestRTCPPort++;
|
|
}
|
|
#endif //NAT_WORKAROUND
|
|
thePacket->fStreamCountID = ++(theSender->fStream->fPacketCount);
|
|
thePacket->fBucketsSeenThisPacket = 0;
|
|
thePacket->fTimeArrived = inMilliseconds;
|
|
theSender->fPacketQueue.EnQueue(&thePacket->fQueueElem);
|
|
if ( theSender->fFirstNewPacketInQueue == NULL )
|
|
theSender->fFirstNewPacketInQueue = &thePacket->fQueueElem;
|
|
theSender->fHasNewPackets = true;
|
|
|
|
|
|
if (!(thePacket->IsRTCP()))
|
|
{
|
|
// don't check for duplicate packets, they may be needed to keep in sync.
|
|
// Because this is an RTP packet make sure to atomic add this because
|
|
// multiple sockets can be adding to this variable simultaneously
|
|
(void)atomic_add(&theSender->fStream->fBytesSentInThisInterval, thePacket->fPacketPtr.Len);
|
|
//printf("ReflectorSocket::ProcessPacket received RTP id=%qu\n", thePacket->fStreamCountID);
|
|
theSender->fStream->SetHasFirstRTP(true);
|
|
}
|
|
else
|
|
{
|
|
//printf("ReflectorSocket::ProcessPacket received RTCP id=%qu\n", thePacket->fStreamCountID);
|
|
theSender->fStream->SetHasFirstRTCP(true);
|
|
theSender->fStream->SetFirst_RTCP_RTP_Time(thePacket->GetPacketRTPTime());
|
|
theSender->fStream->SetFirst_RTCP_Arrival_Time(thePacket->fTimeArrived);
|
|
}
|
|
|
|
|
|
if (ReflectorStream::sUsePacketReceiveTime && thePacket->fPacketPtr.Len > 12)
|
|
{
|
|
UInt32 offset = thePacket->fPacketPtr.Len;
|
|
char* theTag = ((char*) thePacket->fPacketPtr.Ptr + offset) - 12;
|
|
UInt64* theValue = (UInt64*) ((char*) ( (char*) thePacket->fPacketPtr.Ptr + offset) - 8);
|
|
|
|
if (0 == ::strncmp(theTag,"aktt",4))
|
|
{
|
|
UInt64 theReceiveTime = OS::NetworkToHostSInt64(*theValue);
|
|
UInt32 theSSRC = thePacket->GetSSRC(theRemotePort & 1); // use to check if broadcast has restarted so we can reset
|
|
|
|
if ( !this->fHasReceiveTime || (this->fCurrentSSRC != theSSRC) )
|
|
{
|
|
this->fCurrentSSRC = theSSRC;
|
|
this->fFirstArrivalTime = thePacket->fTimeArrived;
|
|
this->fFirstReceiveTime = theReceiveTime;
|
|
this->fHasReceiveTime = true;
|
|
}
|
|
|
|
|
|
SInt64 packetOffsetFromStart = theReceiveTime - this->fFirstReceiveTime; // packets arrive at time 0 and fill forward into the future
|
|
thePacket->fTimeArrived = this->fFirstArrivalTime + packetOffsetFromStart; // offset starts negative by over buffer amount
|
|
thePacket->fPacketPtr.Len -= 12;
|
|
|
|
SInt64 arrivalTimeOffset = thePacket->fTimeArrived - inMilliseconds;
|
|
if ( arrivalTimeOffset > ReflectorStream::sMaxFuturePacketMSec ) // way out in the future.
|
|
thePacket->fTimeArrived = inMilliseconds + ReflectorStream::sMaxFuturePacketMSec; //keep it but only for sMaxFuturePacketMSec = (sMaxPacketAgeMSec <-- current --> sMaxFuturePacketMSec)
|
|
|
|
// if it was in the past we leave it alone because it will be deleted after processing.
|
|
|
|
|
|
//printf("ReflectorSocket::ProcessPacket packetOffsetFromStart=%f\n", (Float32) packetOffsetFromStart / 1000);
|
|
}
|
|
|
|
}
|
|
|
|
//printf("ReflectorSocket::GetIncomingData has packet from time=%qd src addr=%"_U32BITARG_" src port=%u packetlen=%"_U32BITARG_"\n",inMilliseconds, theRemoteAddr,theRemotePort,thePacket->fPacketPtr.Len);
|
|
if (0) //turn on / off buffer size checking -- pref can go here if we find we need to adjust this
|
|
if (theSender->fPacketQueue.GetLength() > maxQSize) //don't grow memory too big
|
|
{
|
|
char outMessage[256];
|
|
sprintf(outMessage,"Packet Queue for port=%d qsize = %"_S32BITARG_" hit max qSize=%"_U32BITARG_"", theRemotePort,theSender->fPacketQueue.GetLength(), maxQSize);
|
|
WarnV(false, outMessage);
|
|
}
|
|
|
|
|
|
|
|
} while(false);
|
|
|
|
return done;
|
|
}
|
|
|
|
|
|
void ReflectorSocket::GetIncomingData(const SInt64& inMilliseconds)
|
|
{
|
|
OSMutexLocker locker(this->GetDemuxer()->GetMutex());
|
|
UInt32 theRemoteAddr = 0;
|
|
UInt16 theRemotePort = 0;
|
|
//get all the outstanding packets for this socket
|
|
while (true)
|
|
{
|
|
//get a packet off the free queue.
|
|
ReflectorPacket* thePacket = this->GetPacket();
|
|
|
|
thePacket->fPacketPtr.Len = 0;
|
|
(void)this->RecvFrom(&theRemoteAddr, &theRemotePort, thePacket->fPacketPtr.Ptr,
|
|
ReflectorPacket::kMaxReflectorPacketSize, &thePacket->fPacketPtr.Len);
|
|
|
|
if (this->ProcessPacket(inMilliseconds,thePacket,theRemoteAddr, theRemotePort))
|
|
break;
|
|
|
|
//printf("ReflectorSocket::GetIncomingData \n");
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ReflectorPacket* ReflectorSocket::GetPacket()
|
|
{
|
|
OSMutexLocker locker(this->GetDemuxer()->GetMutex());
|
|
if (fFreeQueue.GetLength() == 0)
|
|
//if the port number of this socket is odd, this packet is an RTCP packet.
|
|
return NEW ReflectorPacket();
|
|
else
|
|
return (ReflectorPacket*)fFreeQueue.DeQueue()->GetEnclosingObject();
|
|
}
|