Darwin-Streaming-Server/APIModules/QTSSReflectorModule/ReflectorStream.h

514 lines
20 KiB
C
Raw Permalink Normal View History

/*
*
* @APPLE_LICENSE_HEADER_START@
*
* Copyright (c) 1999-2008 Apple Inc. All Rights Reserved.
*
* This file contains Original Code and/or Modifications of Original Code
* as defined in and that are subject to the Apple Public Source License
* Version 2.0 (the 'License'). You may not use this file except in
* compliance with the License. Please obtain a copy of the License at
* http://www.opensource.apple.com/apsl/ and read it before using this
* file.
*
* The Original Code and all software distributed under the License are
* distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
* Please see the License for the specific language governing rights and
* limitations under the License.
*
* @APPLE_LICENSE_HEADER_END@
*
*/
/*
File: ReflectorStream.h
Contains: This object supports reflecting an RTP multicast stream to N
RTPStreams. It spaces out the packet send times in order to
maximize the randomness of the sending pattern and smooth
the stream.
*/
#ifndef _REFLECTOR_STREAM_H_
#define _REFLECTOR_STREAM_H_
#include "QTSS.h"
#include "IdleTask.h"
#include "SourceInfo.h"
#include "UDPSocket.h"
#include "UDPSocketPool.h"
#include "UDPDemuxer.h"
#include "EventContext.h"
#include "SequenceNumberMap.h"
#include "OSMutex.h"
#include "OSQueue.h"
#include "OSRef.h"
#include "RTCPSRPacket.h"
#include "ReflectorOutput.h"
#include "atomic.h"
//This will add some printfs that are useful for checking the thinning
#define REFLECTOR_THINNING_DEBUGGING 0
//Define to use new potential workaround for NAT problems
#define NAT_WORKAROUND 1
class ReflectorPacket;
class ReflectorSender;
class ReflectorStream;
class RTPSessionOutput;
class ReflectorPacket
{
public:
ReflectorPacket() : fQueueElem() { fQueueElem.SetEnclosingObject(this); this->Reset();}
void Reset() { // make packet ready to reuse fQueueElem is always in use
fBucketsSeenThisPacket = 0;
fTimeArrived = 0;
//fQueueElem -- should be set to this
fPacketPtr.Set(fPacketData, 0);
fIsRTCP = false;
fStreamCountID = 0;
fNeededByOutput = false;
}
~ReflectorPacket() {}
void SetPacketData(char *data, UInt32 len) { Assert(kMaxReflectorPacketSize > len); if (len > 0) memcpy(this->fPacketPtr.Ptr,data,len); this->fPacketPtr.Len = len;}
Bool16 IsRTCP() { return fIsRTCP; }
inline UInt32 GetPacketRTPTime();
inline UInt16 GetPacketRTPSeqNum();
inline UInt32 GetSSRC(Bool16 isRTCP);
inline SInt64 GetPacketNTPTime();
private:
enum
{
kMaxReflectorPacketSize = 2060 //jm 5/02 increased from 2048 by 12 bytes for test bytes appended to packets
};
UInt32 fBucketsSeenThisPacket;
SInt64 fTimeArrived;
OSQueueElem fQueueElem;
char fPacketData[kMaxReflectorPacketSize];
StrPtrLen fPacketPtr;
Bool16 fIsRTCP;
Bool16 fNeededByOutput; // is this packet still needed for output?
UInt64 fStreamCountID;
friend class ReflectorSender;
friend class ReflectorSocket;
friend class RTPSessionOutput;
};
UInt32 ReflectorPacket::GetSSRC(Bool16 isRTCP)
{
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 8)
return 0;
UInt32* theSsrcPtr = (UInt32*)fPacketPtr.Ptr;
if (isRTCP)// RTCP
return ntohl(theSsrcPtr[1]);
if (fPacketPtr.Len < 12)
return 0;
return ntohl(theSsrcPtr[2]); // RTP SSRC
}
UInt32 ReflectorPacket::GetPacketRTPTime()
{
UInt32 timestamp = 0;
if (!fIsRTCP)
{
//The RTP timestamp number is the second long of the packet
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 8)
return 0;
timestamp = ntohl( ((UInt32*)fPacketPtr.Ptr)[1]);
}
else
{
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 20)
return 0;
timestamp = ntohl( ((UInt32*)fPacketPtr.Ptr)[4]);
}
return timestamp;
}
UInt16 ReflectorPacket::GetPacketRTPSeqNum()
{
Assert(!fIsRTCP); // not a supported type
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 4 || fIsRTCP)
return 0;
UInt16 sequence = ntohs( ((UInt16*)fPacketPtr.Ptr)[1]); //The RTP sequenc number is the second short of the packet
return sequence;
}
SInt64 ReflectorPacket::GetPacketNTPTime()
{
Assert(fIsRTCP); // not a supported type
if (fPacketPtr.Ptr == NULL || fPacketPtr.Len < 16 || !fIsRTCP)
return 0;
UInt32* theReport = (UInt32*)fPacketPtr.Ptr;
theReport +=2;
SInt64 ntp = 0;
::memcpy(&ntp, theReport, sizeof(SInt64));
return OS::Time1900Fixed64Secs_To_TimeMilli(OS::NetworkToHostSInt64(ntp));
}
//Custom UDP socket classes for doing reflector packet retrieval, socket management
class ReflectorSocket : public IdleTask, public UDPSocket
{
public:
ReflectorSocket();
virtual ~ReflectorSocket();
void AddBroadcasterSession(QTSS_ClientSessionObject inSession) { OSMutexLocker locker(this->GetDemuxer()->GetMutex()); fBroadcasterClientSession = inSession; }
void RemoveBroadcasterSession(QTSS_ClientSessionObject inSession){ OSMutexLocker locker(this->GetDemuxer()->GetMutex()); if (inSession == fBroadcasterClientSession) fBroadcasterClientSession = NULL; }
void AddSender(ReflectorSender* inSender);
void RemoveSender(ReflectorSender* inStreamElem);
Bool16 HasSender() { return (this->GetDemuxer()->GetHashTable()->GetNumEntries() > 0); }
Bool16 ProcessPacket(const SInt64& inMilliseconds,ReflectorPacket* thePacket,UInt32 theRemoteAddr,UInt16 theRemotePort);
ReflectorPacket* GetPacket();
virtual SInt64 Run();
void SetSSRCFilter(Bool16 state, UInt32 timeoutSecs) { fFilterSSRCs = state; fTimeoutSecs = timeoutSecs;}
private:
//virtual SInt64 Run();
void GetIncomingData(const SInt64& inMilliseconds);
void FilterInvalidSSRCs(ReflectorPacket* thePacket,Bool16 isRTCP);
//Number of packets to allocate when the socket is first created
enum
{
kNumPreallocatedPackets = 20, //UInt32
kRefreshBroadcastSessionIntervalMilliSecs = 10000,
kSSRCTimeOut = 30000 // milliseconds before clearing the SSRC if no new ssrcs have come in
};
QTSS_ClientSessionObject fBroadcasterClientSession;
SInt64 fLastBroadcasterTimeOutRefresh;
// Queue of available ReflectorPackets
OSQueue fFreeQueue;
// Queue of senders
OSQueue fSenderQueue;
SInt64 fSleepTime;
UInt32 fValidSSRC;
SInt64 fLastValidSSRCTime;
Bool16 fFilterSSRCs;
UInt32 fTimeoutSecs;
Bool16 fHasReceiveTime;
UInt64 fFirstReceiveTime;
SInt64 fFirstArrivalTime;
UInt32 fCurrentSSRC;
};
class ReflectorSocketPool : public UDPSocketPool
{
public:
ReflectorSocketPool() {}
virtual ~ReflectorSocketPool() {}
virtual UDPSocketPair* ConstructUDPSocketPair();
virtual void DestructUDPSocketPair(UDPSocketPair *inPair);
virtual void SetUDPSocketOptions(UDPSocketPair* inPair);
void DestructUDPSocket( ReflectorSocket* socket);
};
class ReflectorSender : public UDPDemuxerTask
{
public:
ReflectorSender(ReflectorStream* inStream, UInt32 inWriteFlag);
virtual ~ReflectorSender();
// Queue of senders
OSQueue fSenderQueue;
SInt64 fSleepTime;
//Used for adjusting sequence numbers in light of thinning
UInt16 GetPacketSeqNumber(const StrPtrLen& inPacket);
void SetPacketSeqNumber(const StrPtrLen& inPacket, UInt16 inSeqNumber);
Bool16 PacketShouldBeThinned(QTSS_RTPStreamObject inStream, const StrPtrLen& inPacket);
//We want to make sure that ReflectPackets only gets invoked when there
//is actually work to do, because it is an expensive function
Bool16 ShouldReflectNow(const SInt64& inCurrentTime, SInt64* ioWakeupTime);
//This function gets data from the multicast source and reflects.
//Returns the time at which it next needs to be invoked
void ReflectPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue);
//this is the old way of doing reflect packets. It is only here until the relay code can be cleaned up.
void ReflectRelayPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue);
OSQueueElem* SendPacketsToOutput(ReflectorOutput* theOutput, OSQueueElem* currentPacket, SInt64 currentTime, SInt64 bucketDelay, Bool16 firstPacket);
UInt32 GetOldestPacketRTPTime(Bool16 *foundPtr);
UInt16 GetFirstPacketRTPSeqNum(Bool16 *foundPtr);
Bool16 GetFirstPacketInfo(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr);
OSQueueElem*GetClientBufferNextPacketTime(UInt32 inRTPTime);
Bool16 GetFirstRTPTimePacket(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr);
void RemoveOldPackets(OSQueue* inFreeQueue);
OSQueueElem* GetClientBufferStartPacketOffset(SInt64 offsetMsec);
OSQueueElem* GetClientBufferStartPacket() { return this->GetClientBufferStartPacketOffset(0); };
ReflectorStream* fStream;
UInt32 fWriteFlag;
OSQueue fPacketQueue;
OSQueueElem* fFirstNewPacketInQueue;
OSQueueElem* fFirstPacketInQueueForNewOutput;
//these serve as an optimization, keeping track of when this
//sender needs to run so it doesn't run unnecessarily
inline void SetNextTimeToRun(SInt64 nextTime) { fNextTimeToRun = nextTime;
//qtss_printf("SetNextTimeToRun =%"_64BITARG_"d\n", fNextTimeToRun);
}
Bool16 fHasNewPackets;
SInt64 fNextTimeToRun;
//how often to send RRs to the source
enum
{
kRRInterval = 5000 //SInt64 (every 5 seconds)
};
SInt64 fLastRRTime;
OSQueueElem fSocketQueueElem;
friend class ReflectorSocket;
friend class ReflectorStream;
};
class ReflectorStream
{
public:
enum
{
// A ReflectorStream is uniquely identified by the
// destination IP address & destination port of the broadcast.
// This ID simply contains that information.
//
// A unicast broadcast can also be identified by source IP address. If
// you are attempting to demux by source IP, this ID will not guarentee
// uniqueness and special care should be used.
kStreamIDSize = sizeof(UInt32) + sizeof(UInt16)
};
// Uses a StreamInfo to generate a unique ID
static void GenerateSourceID(SourceInfo::StreamInfo* inInfo, char* ioBuffer);
ReflectorStream(SourceInfo::StreamInfo* inInfo);
~ReflectorStream();
//
// SETUP
//
// Call Register from the Register role, as this object has some QTSS API
// attributes to setup
static void Register();
static void Initialize(QTSS_ModulePrefsObject inPrefs);
//
// MODIFIERS
// Call this to initialize the reflector sockets. Uses the QTSS_RTSPRequestObject
// if provided to report any errors that occur
// Passes the QTSS_ClientSessionObject to the socket so the socket can update the session if needed.
QTSS_Error BindSockets(QTSS_StandardRTSP_Params* inParams, UInt32 inReflectorSessionFlags, Bool16 filterState, UInt32 timeout);
// This stream reflects packets from the broadcast to specific ReflectorOutputs.
// You attach outputs to ReflectorStreams this way. You can force the ReflectorStream
// to put this output into a certain bucket by passing in a certain bucket index.
// Pass in -1 if you don't care. AddOutput returns the bucket index this output was
// placed into, or -1 on an error.
SInt32 AddOutput(ReflectorOutput* inOutput, SInt32 putInThisBucket);
// Removes the specified output from this ReflectorStream.
void RemoveOutput(ReflectorOutput* inOutput); // Removes this output from all tracks
void TearDownAllOutputs(); // causes a tear down and then a remove
// If the incoming data is RTSP interleaved, packets for this stream are identified
// by channel numbers
void SetRTPChannelNum(SInt16 inChannel) { fRTPChannel = inChannel; }
void SetRTCPChannelNum(SInt16 inChannel) { fRTCPChannel = inChannel; }
void PushPacket(char *packet, UInt32 packetLen, Bool16 isRTCP);
//
// ACCESSORS
OSRef* GetRef() { return &fRef; }
UInt32 GetBitRate() { return fCurrentBitRate; }
SourceInfo::StreamInfo* GetStreamInfo() { return &fStreamInfo; }
OSMutex* GetMutex() { return &fBucketMutex; }
void* GetStreamCookie() { return this; }
SInt16 GetRTPChannel() { return fRTPChannel; }
SInt16 GetRTCPChannel() { return fRTCPChannel; }
UDPSocketPair* GetSocketPair() { return fSockets;}
ReflectorSender* GetRTPSender() { return &fRTPSender; }
ReflectorSender* GetRTCPSender() { return &fRTCPSender; }
void SetHasFirstRTCP(Bool16 hasPacket) { fHasFirstRTCPPacket = hasPacket; }
Bool16 HasFirstRTCP() { return fHasFirstRTCPPacket; }
void SetFirst_RTCP_RTP_Time(UInt32 time) { fFirst_RTCP_RTP_Time = time; }
UInt32 GetFirst_RTCP_RTP_Time() { return fFirst_RTCP_RTP_Time; }
void SetFirst_RTCP_Arrival_Time(SInt64 time) { fFirst_RTCP_Arrival_Time = time; }
SInt64 GetFirst_RTCP_Arrival_Time() { return fFirst_RTCP_Arrival_Time; }
void SetHasFirstRTP(Bool16 hasPacket) { fHasFirstRTPPacket = hasPacket; }
Bool16 HasFirstRTP() { return fHasFirstRTPPacket; }
UInt32 GetBufferDelay() { return ReflectorStream::sOverBufferInMsec; }
UInt32 GetTimeScale() { return fStreamInfo.fTimeScale; }
UInt64 fPacketCount;
void SetEnableBuffer(Bool16 enableBuffer) { fEnableBuffer = enableBuffer; }
Bool16 BufferEnabled() { return fEnableBuffer; }
inline void UpdateBitRate(SInt64 currentTime);
static UInt32 sOverBufferInMsec;
void IncEyeCount() { OSMutexLocker locker(&fBucketMutex); fEyeCount ++; }
void DecEyeCount() { OSMutexLocker locker(&fBucketMutex); fEyeCount --; }
UInt32 GetEyeCount() { OSMutexLocker locker(&fBucketMutex); return fEyeCount; }
private:
//Sends an RTCP receiver report to the broadcast source
void SendReceiverReport();
void AllocateBucketArray(UInt32 inNumBuckets);
SInt32 FindBucket();
// Unique ID & OSRef. ReflectorStreams can be mapped & shared
OSRef fRef;
char fSourceIDBuf[kStreamIDSize];
// Reflector sockets, retrieved from the socket pool
UDPSocketPair* fSockets;
ReflectorSender fRTPSender;
ReflectorSender fRTCPSender;
SequenceNumberMap fSequenceNumberMap; //for removing duplicate packets
// All the necessary info about this stream
SourceInfo::StreamInfo fStreamInfo;
enum
{
kReceiverReportSize = 16, //UInt32
kAppSize = 36, //UInt32
kMinNumBuckets = 16, //UInt32
kBitRateAvgIntervalInMilSecs = 30000 // time between bitrate averages
};
// BUCKET ARRAY
//ReflectorOutputs are kept in a 2-dimensional array, "Buckets"
typedef ReflectorOutput** Bucket;
Bucket* fOutputArray;
UInt32 fNumBuckets; //Number of buckets currently
UInt32 fNumElements; //Number of reflector outputs in the array
//Bucket array can't be modified while we are sending packets.
OSMutex fBucketMutex;
// RTCP RR information
char fReceiverReportBuffer[kReceiverReportSize + kAppSize +
RTCPSRPacket::kMaxCNameLen];
UInt32* fEyeLocation;//place in the buffer to write the eye information
UInt32 fReceiverReportSize;
// This is the destination address & port for RTCP
// receiver reports.
UInt32 fDestRTCPAddr;
UInt16 fDestRTCPPort;
// Used for calculating average bit rate
UInt32 fCurrentBitRate;
SInt64 fLastBitRateSample;
unsigned int fBytesSentInThisInterval;// unsigned int because we need to atomic_add
// If incoming data is RTSP interleaved
SInt16 fRTPChannel; //These will be -1 if not set to anything
SInt16 fRTCPChannel;
Bool16 fHasFirstRTCPPacket;
Bool16 fHasFirstRTPPacket;
Bool16 fEnableBuffer;
UInt32 fEyeCount;
UInt32 fFirst_RTCP_RTP_Time;
SInt64 fFirst_RTCP_Arrival_Time;
static UInt32 sBucketSize;
static UInt32 sMaxPacketAgeMSec;
static UInt32 sMaxFuturePacketSec;
static UInt32 sMaxFuturePacketMSec;
static UInt32 sOverBufferInSec;
static UInt32 sBucketDelayInMsec;
static Bool16 sUsePacketReceiveTime;
static UInt32 sFirstPacketOffsetMsec;
friend class ReflectorSocket;
friend class ReflectorSender;
};
void ReflectorStream::UpdateBitRate(SInt64 currentTime)
{
if ((fLastBitRateSample + ReflectorStream::kBitRateAvgIntervalInMilSecs) < currentTime)
{
unsigned int intervalBytes = fBytesSentInThisInterval;
(void)atomic_sub(&fBytesSentInThisInterval, intervalBytes);
// Multiply by 1000 to convert from milliseconds to seconds, and by 8 to convert from bytes to bits
Float32 bps = (Float32)(intervalBytes * 8) / (Float32)(currentTime - fLastBitRateSample);
bps *= 1000;
fCurrentBitRate = (UInt32)bps;
// Don't check again for awhile!
fLastBitRateSample = currentTime;
}
}
#endif //_REFLECTOR_SESSION_H_