513 lines
20 KiB
C++
513 lines
20 KiB
C++
/*
|
|
*
|
|
* @APPLE_LICENSE_HEADER_START@
|
|
*
|
|
* Copyright (c) 1999-2008 Apple Inc. All Rights Reserved.
|
|
*
|
|
* This file contains Original Code and/or Modifications of Original Code
|
|
* as defined in and that are subject to the Apple Public Source License
|
|
* Version 2.0 (the 'License'). You may not use this file except in
|
|
* compliance with the License. Please obtain a copy of the License at
|
|
* http://www.opensource.apple.com/apsl/ and read it before using this
|
|
* file.
|
|
*
|
|
* The Original Code and all software distributed under the License are
|
|
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
|
|
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
|
|
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
|
|
* Please see the License for the specific language governing rights and
|
|
* limitations under the License.
|
|
*
|
|
* @APPLE_LICENSE_HEADER_END@
|
|
*
|
|
*/
|
|
/*
|
|
File: ReflectorStream.h
|
|
|
|
Contains: This object supports reflecting an RTP multicast stream to N
|
|
RTPStreams. It spaces out the packet send times in order to
|
|
maximize the randomness of the sending pattern and smooth
|
|
the stream.
|
|
|
|
|
|
|
|
*/
|
|
|
|
#ifndef _REFLECTOR_STREAM_H_
|
|
#define _REFLECTOR_STREAM_H_
|
|
|
|
#include "QTSS.h"
|
|
|
|
#include "IdleTask.h"
|
|
#include "SourceInfo.h"
|
|
|
|
#include "UDPSocket.h"
|
|
#include "UDPSocketPool.h"
|
|
#include "UDPDemuxer.h"
|
|
#include "EventContext.h"
|
|
#include "SequenceNumberMap.h"
|
|
|
|
#include "OSMutex.h"
|
|
#include "OSQueue.h"
|
|
#include "OSRef.h"
|
|
|
|
#include "RTCPSRPacket.h"
|
|
#include "ReflectorOutput.h"
|
|
#include "atomic.h"
|
|
|
|
//This will add some printfs that are useful for checking the thinning
|
|
#define REFLECTOR_THINNING_DEBUGGING 0
|
|
|
|
//Define to use new potential workaround for NAT problems
|
|
#define NAT_WORKAROUND 1
|
|
|
|
class ReflectorPacket;
|
|
class ReflectorSender;
|
|
class ReflectorStream;
|
|
class RTPSessionOutput;
|
|
|
|
class ReflectorPacket
|
|
{
|
|
public:
|
|
|
|
ReflectorPacket() : fQueueElem() { fQueueElem.SetEnclosingObject(this); this->Reset();}
|
|
void Reset() { // make packet ready to reuse fQueueElem is always in use
|
|
fBucketsSeenThisPacket = 0;
|
|
fTimeArrived = 0;
|
|
//fQueueElem -- should be set to this
|
|
fPacketPtr.Set(fPacketData, 0);
|
|
fIsRTCP = false;
|
|
fStreamCountID = 0;
|
|
fNeededByOutput = false;
|
|
}
|
|
|
|
~ReflectorPacket() {}
|
|
|
|
void SetPacketData(char *data, UInt32 len) { Assert(kMaxReflectorPacketSize > len); if (len > 0) memcpy(this->fPacketPtr.Ptr,data,len); this->fPacketPtr.Len = len;}
|
|
Bool16 IsRTCP() { return fIsRTCP; }
|
|
inline UInt32 GetPacketRTPTime();
|
|
inline UInt16 GetPacketRTPSeqNum();
|
|
inline UInt32 GetSSRC(Bool16 isRTCP);
|
|
inline SInt64 GetPacketNTPTime();
|
|
|
|
private:
|
|
|
|
enum
|
|
{
|
|
kMaxReflectorPacketSize = 2060 //jm 5/02 increased from 2048 by 12 bytes for test bytes appended to packets
|
|
};
|
|
|
|
UInt32 fBucketsSeenThisPacket;
|
|
SInt64 fTimeArrived;
|
|
OSQueueElem fQueueElem;
|
|
char fPacketData[kMaxReflectorPacketSize];
|
|
StrPtrLen fPacketPtr;
|
|
Bool16 fIsRTCP;
|
|
Bool16 fNeededByOutput; // is this packet still needed for output?
|
|
UInt64 fStreamCountID;
|
|
|
|
friend class ReflectorSender;
|
|
friend class ReflectorSocket;
|
|
friend class RTPSessionOutput;
|
|
|
|
|
|
};
|
|
|
|
UInt32 ReflectorPacket::GetSSRC(Bool16 isRTCP)
|
|
{
|
|
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 8)
|
|
return 0;
|
|
|
|
UInt32* theSsrcPtr = (UInt32*)fPacketPtr.Ptr;
|
|
if (isRTCP)// RTCP
|
|
return ntohl(theSsrcPtr[1]);
|
|
|
|
if (fPacketPtr.Len < 12)
|
|
return 0;
|
|
|
|
return ntohl(theSsrcPtr[2]); // RTP SSRC
|
|
}
|
|
|
|
UInt32 ReflectorPacket::GetPacketRTPTime()
|
|
{
|
|
|
|
UInt32 timestamp = 0;
|
|
if (!fIsRTCP)
|
|
{
|
|
//The RTP timestamp number is the second long of the packet
|
|
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 8)
|
|
return 0;
|
|
timestamp = ntohl( ((UInt32*)fPacketPtr.Ptr)[1]);
|
|
}
|
|
else
|
|
{
|
|
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 20)
|
|
return 0;
|
|
timestamp = ntohl( ((UInt32*)fPacketPtr.Ptr)[4]);
|
|
}
|
|
return timestamp;
|
|
}
|
|
|
|
UInt16 ReflectorPacket::GetPacketRTPSeqNum()
|
|
{
|
|
Assert(!fIsRTCP); // not a supported type
|
|
|
|
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 4 || fIsRTCP)
|
|
return 0;
|
|
|
|
UInt16 sequence = ntohs( ((UInt16*)fPacketPtr.Ptr)[1]); //The RTP sequenc number is the second short of the packet
|
|
return sequence;
|
|
}
|
|
|
|
|
|
SInt64 ReflectorPacket::GetPacketNTPTime()
|
|
{
|
|
Assert(fIsRTCP); // not a supported type
|
|
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 16 || !fIsRTCP)
|
|
return 0;
|
|
|
|
UInt32* theReport = (UInt32*)fPacketPtr.Ptr;
|
|
theReport +=2;
|
|
SInt64 ntp = 0;
|
|
::memcpy(&ntp, theReport, sizeof(SInt64));
|
|
|
|
return OS::Time1900Fixed64Secs_To_TimeMilli(OS::NetworkToHostSInt64(ntp));
|
|
|
|
|
|
}
|
|
|
|
|
|
//Custom UDP socket classes for doing reflector packet retrieval, socket management
|
|
class ReflectorSocket : public IdleTask, public UDPSocket
|
|
{
|
|
public:
|
|
|
|
ReflectorSocket();
|
|
virtual ~ReflectorSocket();
|
|
void AddBroadcasterSession(QTSS_ClientSessionObject inSession) { OSMutexLocker locker(this->GetDemuxer()->GetMutex()); fBroadcasterClientSession = inSession; }
|
|
void RemoveBroadcasterSession(QTSS_ClientSessionObject inSession){ OSMutexLocker locker(this->GetDemuxer()->GetMutex()); if (inSession == fBroadcasterClientSession) fBroadcasterClientSession = NULL; }
|
|
void AddSender(ReflectorSender* inSender);
|
|
void RemoveSender(ReflectorSender* inStreamElem);
|
|
Bool16 HasSender() { return (this->GetDemuxer()->GetHashTable()->GetNumEntries() > 0); }
|
|
Bool16 ProcessPacket(const SInt64& inMilliseconds,ReflectorPacket* thePacket,UInt32 theRemoteAddr,UInt16 theRemotePort);
|
|
ReflectorPacket* GetPacket();
|
|
virtual SInt64 Run();
|
|
void SetSSRCFilter(Bool16 state, UInt32 timeoutSecs) { fFilterSSRCs = state; fTimeoutSecs = timeoutSecs;}
|
|
private:
|
|
|
|
//virtual SInt64 Run();
|
|
void GetIncomingData(const SInt64& inMilliseconds);
|
|
void FilterInvalidSSRCs(ReflectorPacket* thePacket,Bool16 isRTCP);
|
|
|
|
//Number of packets to allocate when the socket is first created
|
|
enum
|
|
{
|
|
kNumPreallocatedPackets = 20, //UInt32
|
|
kRefreshBroadcastSessionIntervalMilliSecs = 10000,
|
|
kSSRCTimeOut = 30000 // milliseconds before clearing the SSRC if no new ssrcs have come in
|
|
};
|
|
QTSS_ClientSessionObject fBroadcasterClientSession;
|
|
SInt64 fLastBroadcasterTimeOutRefresh;
|
|
// Queue of available ReflectorPackets
|
|
OSQueue fFreeQueue;
|
|
// Queue of senders
|
|
OSQueue fSenderQueue;
|
|
SInt64 fSleepTime;
|
|
|
|
UInt32 fValidSSRC;
|
|
SInt64 fLastValidSSRCTime;
|
|
Bool16 fFilterSSRCs;
|
|
UInt32 fTimeoutSecs;
|
|
|
|
Bool16 fHasReceiveTime;
|
|
UInt64 fFirstReceiveTime;
|
|
SInt64 fFirstArrivalTime;
|
|
UInt32 fCurrentSSRC;
|
|
|
|
};
|
|
|
|
|
|
class ReflectorSocketPool : public UDPSocketPool
|
|
{
|
|
public:
|
|
|
|
ReflectorSocketPool() {}
|
|
virtual ~ReflectorSocketPool() {}
|
|
|
|
virtual UDPSocketPair* ConstructUDPSocketPair();
|
|
virtual void DestructUDPSocketPair(UDPSocketPair *inPair);
|
|
virtual void SetUDPSocketOptions(UDPSocketPair* inPair);
|
|
void DestructUDPSocket( ReflectorSocket* socket);
|
|
|
|
|
|
};
|
|
|
|
class ReflectorSender : public UDPDemuxerTask
|
|
{
|
|
public:
|
|
ReflectorSender(ReflectorStream* inStream, UInt32 inWriteFlag);
|
|
virtual ~ReflectorSender();
|
|
// Queue of senders
|
|
OSQueue fSenderQueue;
|
|
SInt64 fSleepTime;
|
|
|
|
//Used for adjusting sequence numbers in light of thinning
|
|
UInt16 GetPacketSeqNumber(const StrPtrLen& inPacket);
|
|
void SetPacketSeqNumber(const StrPtrLen& inPacket, UInt16 inSeqNumber);
|
|
Bool16 PacketShouldBeThinned(QTSS_RTPStreamObject inStream, const StrPtrLen& inPacket);
|
|
|
|
//We want to make sure that ReflectPackets only gets invoked when there
|
|
//is actually work to do, because it is an expensive function
|
|
Bool16 ShouldReflectNow(const SInt64& inCurrentTime, SInt64* ioWakeupTime);
|
|
|
|
//This function gets data from the multicast source and reflects.
|
|
//Returns the time at which it next needs to be invoked
|
|
void ReflectPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue);
|
|
|
|
//this is the old way of doing reflect packets. It is only here until the relay code can be cleaned up.
|
|
void ReflectRelayPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue);
|
|
|
|
OSQueueElem* SendPacketsToOutput(ReflectorOutput* theOutput, OSQueueElem* currentPacket, SInt64 currentTime, SInt64 bucketDelay, Bool16 firstPacket);
|
|
|
|
UInt32 GetOldestPacketRTPTime(Bool16 *foundPtr);
|
|
UInt16 GetFirstPacketRTPSeqNum(Bool16 *foundPtr);
|
|
Bool16 GetFirstPacketInfo(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr);
|
|
|
|
OSQueueElem*GetClientBufferNextPacketTime(UInt32 inRTPTime);
|
|
Bool16 GetFirstRTPTimePacket(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr);
|
|
|
|
void RemoveOldPackets(OSQueue* inFreeQueue);
|
|
OSQueueElem* GetClientBufferStartPacketOffset(SInt64 offsetMsec);
|
|
OSQueueElem* GetClientBufferStartPacket() { return this->GetClientBufferStartPacketOffset(0); };
|
|
|
|
ReflectorStream* fStream;
|
|
UInt32 fWriteFlag;
|
|
|
|
OSQueue fPacketQueue;
|
|
OSQueueElem* fFirstNewPacketInQueue;
|
|
OSQueueElem* fFirstPacketInQueueForNewOutput;
|
|
|
|
//these serve as an optimization, keeping track of when this
|
|
//sender needs to run so it doesn't run unnecessarily
|
|
|
|
inline void SetNextTimeToRun(SInt64 nextTime) { fNextTimeToRun = nextTime;
|
|
//qtss_printf("SetNextTimeToRun =%"_64BITARG_"d\n", fNextTimeToRun);
|
|
}
|
|
|
|
Bool16 fHasNewPackets;
|
|
SInt64 fNextTimeToRun;
|
|
|
|
//how often to send RRs to the source
|
|
enum
|
|
{
|
|
kRRInterval = 5000 //SInt64 (every 5 seconds)
|
|
};
|
|
|
|
SInt64 fLastRRTime;
|
|
OSQueueElem fSocketQueueElem;
|
|
|
|
friend class ReflectorSocket;
|
|
friend class ReflectorStream;
|
|
};
|
|
|
|
class ReflectorStream
|
|
{
|
|
public:
|
|
|
|
enum
|
|
{
|
|
// A ReflectorStream is uniquely identified by the
|
|
// destination IP address & destination port of the broadcast.
|
|
// This ID simply contains that information.
|
|
//
|
|
// A unicast broadcast can also be identified by source IP address. If
|
|
// you are attempting to demux by source IP, this ID will not guarentee
|
|
// uniqueness and special care should be used.
|
|
kStreamIDSize = sizeof(UInt32) + sizeof(UInt16)
|
|
};
|
|
|
|
// Uses a StreamInfo to generate a unique ID
|
|
static void GenerateSourceID(SourceInfo::StreamInfo* inInfo, char* ioBuffer);
|
|
|
|
ReflectorStream(SourceInfo::StreamInfo* inInfo);
|
|
~ReflectorStream();
|
|
|
|
//
|
|
// SETUP
|
|
//
|
|
// Call Register from the Register role, as this object has some QTSS API
|
|
// attributes to setup
|
|
static void Register();
|
|
static void Initialize(QTSS_ModulePrefsObject inPrefs);
|
|
|
|
//
|
|
// MODIFIERS
|
|
|
|
// Call this to initialize the reflector sockets. Uses the QTSS_RTSPRequestObject
|
|
// if provided to report any errors that occur
|
|
// Passes the QTSS_ClientSessionObject to the socket so the socket can update the session if needed.
|
|
QTSS_Error BindSockets(QTSS_StandardRTSP_Params* inParams, UInt32 inReflectorSessionFlags, Bool16 filterState, UInt32 timeout);
|
|
|
|
// This stream reflects packets from the broadcast to specific ReflectorOutputs.
|
|
// You attach outputs to ReflectorStreams this way. You can force the ReflectorStream
|
|
// to put this output into a certain bucket by passing in a certain bucket index.
|
|
// Pass in -1 if you don't care. AddOutput returns the bucket index this output was
|
|
// placed into, or -1 on an error.
|
|
|
|
SInt32 AddOutput(ReflectorOutput* inOutput, SInt32 putInThisBucket);
|
|
|
|
// Removes the specified output from this ReflectorStream.
|
|
void RemoveOutput(ReflectorOutput* inOutput); // Removes this output from all tracks
|
|
|
|
void TearDownAllOutputs(); // causes a tear down and then a remove
|
|
|
|
// If the incoming data is RTSP interleaved, packets for this stream are identified
|
|
// by channel numbers
|
|
void SetRTPChannelNum(SInt16 inChannel) { fRTPChannel = inChannel; }
|
|
void SetRTCPChannelNum(SInt16 inChannel) { fRTCPChannel = inChannel; }
|
|
void PushPacket(char *packet, UInt32 packetLen, Bool16 isRTCP);
|
|
|
|
//
|
|
// ACCESSORS
|
|
|
|
OSRef* GetRef() { return &fRef; }
|
|
UInt32 GetBitRate() { return fCurrentBitRate; }
|
|
SourceInfo::StreamInfo* GetStreamInfo() { return &fStreamInfo; }
|
|
OSMutex* GetMutex() { return &fBucketMutex; }
|
|
void* GetStreamCookie() { return this; }
|
|
SInt16 GetRTPChannel() { return fRTPChannel; }
|
|
SInt16 GetRTCPChannel() { return fRTCPChannel; }
|
|
UDPSocketPair* GetSocketPair() { return fSockets;}
|
|
ReflectorSender* GetRTPSender() { return &fRTPSender; }
|
|
ReflectorSender* GetRTCPSender() { return &fRTCPSender; }
|
|
|
|
void SetHasFirstRTCP(Bool16 hasPacket) { fHasFirstRTCPPacket = hasPacket; }
|
|
Bool16 HasFirstRTCP() { return fHasFirstRTCPPacket; }
|
|
|
|
void SetFirst_RTCP_RTP_Time(UInt32 time) { fFirst_RTCP_RTP_Time = time; }
|
|
UInt32 GetFirst_RTCP_RTP_Time() { return fFirst_RTCP_RTP_Time; }
|
|
|
|
void SetFirst_RTCP_Arrival_Time(SInt64 time) { fFirst_RTCP_Arrival_Time = time; }
|
|
SInt64 GetFirst_RTCP_Arrival_Time() { return fFirst_RTCP_Arrival_Time; }
|
|
|
|
|
|
void SetHasFirstRTP(Bool16 hasPacket) { fHasFirstRTPPacket = hasPacket; }
|
|
Bool16 HasFirstRTP() { return fHasFirstRTPPacket; }
|
|
|
|
UInt32 GetBufferDelay() { return ReflectorStream::sOverBufferInMsec; }
|
|
UInt32 GetTimeScale() { return fStreamInfo.fTimeScale; }
|
|
UInt64 fPacketCount;
|
|
|
|
void SetEnableBuffer(Bool16 enableBuffer) { fEnableBuffer = enableBuffer; }
|
|
Bool16 BufferEnabled() { return fEnableBuffer; }
|
|
inline void UpdateBitRate(SInt64 currentTime);
|
|
static UInt32 sOverBufferInMsec;
|
|
|
|
void IncEyeCount() { OSMutexLocker locker(&fBucketMutex); fEyeCount ++; }
|
|
void DecEyeCount() { OSMutexLocker locker(&fBucketMutex); fEyeCount --; }
|
|
UInt32 GetEyeCount() { OSMutexLocker locker(&fBucketMutex); return fEyeCount; }
|
|
|
|
private:
|
|
|
|
//Sends an RTCP receiver report to the broadcast source
|
|
void SendReceiverReport();
|
|
void AllocateBucketArray(UInt32 inNumBuckets);
|
|
SInt32 FindBucket();
|
|
// Unique ID & OSRef. ReflectorStreams can be mapped & shared
|
|
OSRef fRef;
|
|
char fSourceIDBuf[kStreamIDSize];
|
|
|
|
// Reflector sockets, retrieved from the socket pool
|
|
UDPSocketPair* fSockets;
|
|
|
|
ReflectorSender fRTPSender;
|
|
ReflectorSender fRTCPSender;
|
|
SequenceNumberMap fSequenceNumberMap; //for removing duplicate packets
|
|
|
|
// All the necessary info about this stream
|
|
SourceInfo::StreamInfo fStreamInfo;
|
|
|
|
enum
|
|
{
|
|
kReceiverReportSize = 16, //UInt32
|
|
kAppSize = 36, //UInt32
|
|
kMinNumBuckets = 16, //UInt32
|
|
kBitRateAvgIntervalInMilSecs = 30000 // time between bitrate averages
|
|
};
|
|
|
|
// BUCKET ARRAY
|
|
|
|
//ReflectorOutputs are kept in a 2-dimensional array, "Buckets"
|
|
typedef ReflectorOutput** Bucket;
|
|
Bucket* fOutputArray;
|
|
|
|
UInt32 fNumBuckets; //Number of buckets currently
|
|
UInt32 fNumElements; //Number of reflector outputs in the array
|
|
|
|
//Bucket array can't be modified while we are sending packets.
|
|
OSMutex fBucketMutex;
|
|
|
|
// RTCP RR information
|
|
|
|
char fReceiverReportBuffer[kReceiverReportSize + kAppSize +
|
|
RTCPSRPacket::kMaxCNameLen];
|
|
UInt32* fEyeLocation;//place in the buffer to write the eye information
|
|
UInt32 fReceiverReportSize;
|
|
|
|
// This is the destination address & port for RTCP
|
|
// receiver reports.
|
|
UInt32 fDestRTCPAddr;
|
|
UInt16 fDestRTCPPort;
|
|
|
|
// Used for calculating average bit rate
|
|
UInt32 fCurrentBitRate;
|
|
SInt64 fLastBitRateSample;
|
|
unsigned int fBytesSentInThisInterval;// unsigned int because we need to atomic_add
|
|
|
|
// If incoming data is RTSP interleaved
|
|
SInt16 fRTPChannel; //These will be -1 if not set to anything
|
|
SInt16 fRTCPChannel;
|
|
|
|
Bool16 fHasFirstRTCPPacket;
|
|
Bool16 fHasFirstRTPPacket;
|
|
|
|
Bool16 fEnableBuffer;
|
|
UInt32 fEyeCount;
|
|
|
|
UInt32 fFirst_RTCP_RTP_Time;
|
|
SInt64 fFirst_RTCP_Arrival_Time;
|
|
|
|
static UInt32 sBucketSize;
|
|
static UInt32 sMaxPacketAgeMSec;
|
|
static UInt32 sMaxFuturePacketSec;
|
|
|
|
static UInt32 sMaxFuturePacketMSec;
|
|
static UInt32 sOverBufferInSec;
|
|
static UInt32 sBucketDelayInMsec;
|
|
static Bool16 sUsePacketReceiveTime;
|
|
static UInt32 sFirstPacketOffsetMsec;
|
|
|
|
friend class ReflectorSocket;
|
|
friend class ReflectorSender;
|
|
};
|
|
|
|
|
|
void ReflectorStream::UpdateBitRate(SInt64 currentTime)
|
|
{
|
|
if ((fLastBitRateSample + ReflectorStream::kBitRateAvgIntervalInMilSecs) < currentTime)
|
|
{
|
|
unsigned int intervalBytes = fBytesSentInThisInterval;
|
|
(void)atomic_sub(&fBytesSentInThisInterval, intervalBytes);
|
|
|
|
// Multiply by 1000 to convert from milliseconds to seconds, and by 8 to convert from bytes to bits
|
|
Float32 bps = (Float32)(intervalBytes * 8) / (Float32)(currentTime - fLastBitRateSample);
|
|
bps *= 1000;
|
|
fCurrentBitRate = (UInt32)bps;
|
|
|
|
// Don't check again for awhile!
|
|
fLastBitRateSample = currentTime;
|
|
}
|
|
}
|
|
#endif //_REFLECTOR_SESSION_H_
|
|
|