RTMP Streaming Server Development Guide
Project Overview
This project is a high-performance RTMP streaming server built on the muduo network library. It supports:
- ✅ H.264 + AAC audio/video codecs
- ✅ Multiple concurrent publish (push) and play (pull) streams
- ✅ GOP caching for fast first-frame rendering
- ✅ Audio/video synchronization
- ✅ Low-latency optimizations
- ✅ Timestamp mapping for playback continuity
Tech Stack
- Network framework: muduo (high-performance network library)
- Protocol: RTMP 1.0 specification
- Media formats: H.264 + AAC
- Language: C++17
- Build tool: CMake
Video walkthrough and source code: https://www.bilibili.com/video/BV1mou4zhEFX/
Quick Start
Build and Run
# 1. Build the RTMP server
mkdir build && cd build
# Configure CMake (Debug build, easier to debug)
cmake -DCMAKE_BUILD_TYPE=Debug ..
# or (Debug is the default)
cmake ..
# or build a Release version (for production)
cmake -DCMAKE_BUILD_TYPE=Release ..
# Compile (use all CPU cores)
make -j$(nproc)

# 2. Edit the config file (optional; the defaults work)
# Default config path: ../config.json relative to the build directory,
# i.e. the project source root

# 3. Run the server
# Option 1: run directly (uses ../config.json by default)
./bin/rtmp_server
# Option 2: specify a config file
./bin/rtmp_server --config=/path/to/your/config.json

# 4. Test (in another terminal)
# Publish:
ffmpeg -re -i test_video.mp4 -c copy -f flv rtmp://localhost:1935/live/test
# Play:
ffplay rtmp://localhost:1935/live/test
System Architecture
Overall Architecture Diagram
Core Module Responsibilities
| Module | Responsibility | Files |
| --- | --- | --- |
| RtmpServer | Server framework; manages TCP connections and configuration | rtmp_server.h/cc |
| RtmpConnection | Handles a single RTMP connection (handshake, commands, media data) | rtmp_connection.h/cc |
| StreamManager | Stream management; coordinates publishers and subscribers | stream_manager.h/cc |
| RtmpProtocolParser | RTMP protocol parsing and message serialization | rtmp_protocol.h/cc |
| GopCache | GOP cache for fast first-frame rendering | gop_cache.h/cc |
| AmfCodec | AMF encoding/decoding | rtmp_protocol.h/cc |
Core Modules in Detail
1. RtmpServer: Server Framework
class RtmpServer {
    // Core operations
    void start();              // Start the server
    void stop();               // Stop the server
    void onConnection();       // Handle a new connection
    void onMessage();          // Handle incoming data

    // Configuration
    RtmpServerConfig config_;  // Server configuration

    // Connection management
    std::unordered_map<std::string, std::shared_ptr<RtmpConnection>> connections_;

    // Stream management
    std::shared_ptr<StreamManager> streamManager_;
};
Key configuration parameters:
struct RtmpServerConfig {
    std::string listenAddress = "0.0.0.0";
    uint16_t listenPort = 1935;
    int maxConnections = 1000;
    int maxStreams = 100;
    int gopCacheMaxCount = 1;                    // Number of cached GOPs (latency optimization)
    int gopCacheMaxSizeBytes = 2 * 1024 * 1024;  // GOP cache size limit (2 MB)
    int chunkSize = 60000;                       // RTMP chunk size
};
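A config.json would mirror these parameters. The sketch below is hypothetical: the field names are assumptions inferred from RtmpServerConfig, so check the project's actual schema before relying on it.

```json
{
    "listenAddress": "0.0.0.0",
    "listenPort": 1935,
    "maxConnections": 1000,
    "maxStreams": 100,
    "gopCacheMaxCount": 1,
    "gopCacheMaxSizeBytes": 2097152,
    "chunkSize": 60000
}
```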
2. RtmpConnection: Connection Handling
class RtmpConnection {
    // Connection role
    enum class RtmpConnectionType {
        kUnknown,
        kPublisher,   // Publishing (push) connection
        kSubscriber   // Playing (pull) connection
    };

    // Core processing pipeline
    void onMessage();         // Receive TCP data
    bool handleHandshake();   // RTMP handshake
    void processMessages();   // Process RTMP messages
    void processCommand();    // Process RTMP commands
    void processAudioData();  // Process audio data
    void processVideoData();  // Process video data
    void processMetaData();   // Process metadata
};
3. StreamManager: Stream Manager
class StreamManager {
    // Stream management
    std::shared_ptr<Stream> createStream(const std::string& name);
    std::shared_ptr<Stream> getStream(const std::string& name);

    // Publisher management
    std::shared_ptr<Publisher> createPublisher(const std::string& streamName, TcpConnectionPtr conn);

    // Subscriber management
    std::shared_ptr<Subscriber> createSubscriber(const std::string& streamName, TcpConnectionPtr conn);

    // Data forwarding
    void onPublisherData(const std::string& streamName, std::shared_ptr<MediaPacket> packet);

private:
    std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;
};
RTMP Protocol Implementation
RTMP Protocol Stack
+---------------------------+
|     Application Layer     |  <- RTMP commands, media data
+---------------------------+
|       RTMP Messages       |  <- audio/video/command messages
+---------------------------+
|        RTMP Chunks        |  <- chunked transport
+---------------------------+
|            TCP            |  <- reliable transport
+---------------------------+
1. Message Type Definitions
enum class RtmpMessageType : uint8_t {
    kSetChunkSize = 1,      // Set chunk size
    kAbort = 2,             // Abort message
    kAck = 3,               // Acknowledgement
    kUserControl = 4,       // User control message
    kWindowAckSize = 5,     // Window acknowledgement size
    kSetPeerBandwidth = 6,  // Set peer bandwidth
    kAudio = 8,             // Audio data
    kVideo = 9,             // Video data
    kDataAMF0 = 18,         // AMF0 data message
    kCommandAMF0 = 20,      // AMF0 command message
};
2. Chunk Format
enum class RtmpChunkFormat : uint8_t {
    kType0 = 0,  // Full message header (11 bytes)
    kType1 = 1,  // No message stream ID (7 bytes)
    kType2 = 2,  // Timestamp delta only (3 bytes)
    kType3 = 3   // No header, data only (0 bytes)
};
Chunk header layout:
Type 0: [Basic Header][Timestamp][Message Length][Message Type ID][Message Stream ID]
Type 1: [Basic Header][Timestamp Delta][Message Length][Message Type ID]
Type 2: [Basic Header][Timestamp Delta]
Type 3: [Basic Header]
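The format-to-size mapping above can be captured directly in code. A minimal sketch (helper names are illustrative, not the project's actual API): `messageHeaderSize` returns the message-header length implied by the 2-bit `fmt` field, and `makeBasicHeader` builds the one-byte basic header for chunk stream IDs 2-63.

```cpp
#include <cstddef>
#include <cstdint>

// Message-header size implied by each chunk format type (the 2-bit fmt field).
inline std::size_t messageHeaderSize(uint8_t fmt) {
    switch (fmt) {
        case 0: return 11;  // timestamp + length + type id + stream id
        case 1: return 7;   // timestamp delta + length + type id
        case 2: return 3;   // timestamp delta only
        default: return 0;  // type 3: header fully inherited from the previous chunk
    }
}

// One-byte basic header: fmt in the top 2 bits,
// chunk stream id (2..63) in the low 6 bits.
inline uint8_t makeBasicHeader(uint8_t fmt, uint8_t csid) {
    return static_cast<uint8_t>((fmt << 6) | (csid & 0x3F));
}
```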
3. AMF Encoding/Decoding
class AmfCodec {
    // Scalar types
    static void encodeAmfValue(Buffer* buffer, const AmfValue& value);
    static bool decodeAmfValue(Buffer* buffer, AmfValue* value);

    // Composite types
    static void encodeAmfObject(Buffer* buffer, const std::unordered_map<std::string, AmfValue>& object);
    static bool decodeAmfObject(Buffer* buffer, std::unordered_map<std::string, AmfValue>* object);
};
AMF data types:
enum class AmfType : uint8_t {
    kNumber = 0x00,       // 64-bit double
    kBoolean = 0x01,      // Boolean
    kString = 0x02,       // UTF-8 string
    kObject = 0x03,       // Object
    kNull = 0x05,         // Null
    kUndefined = 0x06,    // Undefined
    kEcmaArray = 0x08,    // ECMA array
    kObjectEnd = 0x09,    // Object end marker
    kStrictArray = 0x0A,  // Strict array
};
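As a standalone illustration of the Number type (marker 0x00 followed by an IEEE-754 double in big-endian byte order), here is a minimal encode/decode roundtrip. These free functions are illustrative sketches, not the project's AmfCodec API.

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Encode an AMF0 Number: type marker 0x00 followed by the
// IEEE-754 double in network (big-endian) byte order.
inline std::vector<uint8_t> encodeAmf0Number(double v) {
    std::vector<uint8_t> out;
    out.push_back(0x00);  // AmfType::kNumber
    uint64_t bits;
    std::memcpy(&bits, &v, sizeof(bits));
    for (int i = 7; i >= 0; --i)
        out.push_back(static_cast<uint8_t>((bits >> (i * 8)) & 0xFF));
    return out;
}

// Decode the 8 payload bytes back into a double.
inline double decodeAmf0Number(const std::vector<uint8_t>& buf) {
    uint64_t bits = 0;
    for (int i = 0; i < 8; ++i)
        bits = (bits << 8) | buf[1 + i];  // skip the type marker
    double v;
    std::memcpy(&v, &bits, sizeof(v));
    return v;
}
```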
Publishing in Detail
Publish Interaction Sequence Diagram
Publish Code Path
1. Handling the publish command
void RtmpConnection::handlePublish(const RtmpCommand& command) {
    // Parse the stream name
    std::string streamName = command.getStringArg(0);
    std::string publishType = command.getStringArg(1);  // "live"

    // Set the connection role
    connectionType_ = RtmpConnectionType::kPublisher;
    streamName_ = streamName;

    // Create the Publisher object
    auto mgr = streamManager_.lock();
    auto conn = tcpConnection_.lock();
    publisher_ = mgr->createPublisher(streamName, conn);

    // Start publishing
    publisher_->startPublish();

    // Send responses
    sendOnFCPublish("NetStream.Publish.Start", "Started publishing stream.");
    sendPublishStatus("NetStream.Publish.Start", "Started publishing stream.");

    // Update state
    parser_.setState(RtmpConnectionState::kPublishing);
}
2. Handling media data
void RtmpConnection::processAudioData(const RtmpMessage& message) {
    if (!isPublisher() || !publisher_) return;

    // Build a MediaPacket
    auto packet = std::make_shared<MediaPacket>(
        MediaPacketType::kAudio,
        message.payload(),
        message.header().timestamp,
        message.header().messageStreamId
    );

    // Forward to the Publisher
    publisher_->onMediaData(packet);
}

void RtmpConnection::processVideoData(const RtmpMessage& message) {
    if (!isPublisher() || !publisher_) return;

    // Build a MediaPacket
    auto packet = std::make_shared<MediaPacket>(
        MediaPacketType::kVideo,
        message.payload(),
        message.header().timestamp,
        message.header().messageStreamId
    );

    // Forward to the Publisher
    publisher_->onMediaData(packet);
}
3. Publisher data forwarding
void Publisher::onMediaData(std::shared_ptr<MediaPacket> packet) {
    if (!packet || !isPublishing_) return;

    updateLastActiveTime();
    updateStats(packet->data().size(), 1);

    // Forward to the StreamManager
    if (auto mgr = manager_.lock()) {
        mgr->onPublisherData(streamName_, packet);
    }
}
Key Implementation Details
1. Sequence header detection and caching
// Detect an AVC sequence header
bool isAvcSequenceHeader(const MediaPacket& packet) {
    return packet.type() == MediaPacketType::kVideo &&
           packet.videoCodec() == VideoCodec::kAVC &&
           packet.data().size() > 1 &&
           static_cast<uint8_t>(packet.data()[1]) == 0;
}

// Detect an AAC sequence header
bool isAacSequenceHeader(const MediaPacket& packet) {
    return packet.type() == MediaPacketType::kAudio &&
           packet.audioFormat() == AudioFormat::kAAC &&
           packet.data().size() > 1 &&
           static_cast<uint8_t>(packet.data()[1]) == 0;
}
2. Key frame detection
bool MediaPacket::isKeyFrame() const {
    if (type_ != MediaPacketType::kVideo || data_.empty()) {
        return false;
    }
    // FLV video tag format: Frame Type (4 bits) + Codec ID (4 bits)
    uint8_t firstByte = static_cast<uint8_t>(data_[0]);
    uint8_t frameType = (firstByte >> 4) & 0x0F;
    return frameType == 1;  // 1 = key frame
}
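The same first-byte bit layout also carries the codec id in the low nibble. A standalone sketch of parsing the FLV video-tag byte (illustrative helpers, not the project's MediaPacket API): for H.264 a key frame starts with 0x17 (frame type 1, codec 7 = AVC) and an inter frame with 0x27.

```cpp
#include <cstdint>

// FLV video tag first byte: frame type in the high nibble,
// codec id in the low nibble.
inline uint8_t flvFrameType(uint8_t firstByte) { return (firstByte >> 4) & 0x0F; }
inline uint8_t flvCodecId(uint8_t firstByte)  { return firstByte & 0x0F; }
inline bool flvIsKeyFrame(uint8_t firstByte)  { return flvFrameType(firstByte) == 1; }
```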
Playback in Detail
Play Interaction Sequence Diagram
Play Code Path
1. Handling the play command
void RtmpConnection::handlePlay(const RtmpCommand& command) {
    std::string streamName = command.getStringArg(0);

    // Set the connection role
    connectionType_ = RtmpConnectionType::kSubscriber;
    streamName_ = streamName;

    // Create the Subscriber object
    auto mgr = streamManager_.lock();
    auto conn = tcpConnection_.lock();
    subscriber_ = mgr->createSubscriber(streamName, conn);

    // Send the standard RTMP play response sequence
    uint32_t streamId = 1;
    // 1. StreamBegin user control message
    sendStreamBegin(streamId);
    // 2. NetStream.Play.Reset
    sendPlayStatus("NetStream.Play.Reset", "Playing and resetting stream.");
    // 3. NetStream.Play.Start
    sendPlayStatus("NetStream.Play.Start", "Started playing stream.");
    // 4. RtmpSampleAccess
    sendRtmpSampleAccess();
    // 5. NetStream.Data.Start
    sendDataStart();

    // Start playback
    subscriber_->startPlay();
    parser_.setState(RtmpConnectionState::kPlaying);
}
2. Subscriber playback start
void Subscriber::startPlay() {
    if (!isPlaying_) {
        isPlaying_ = true;
        startTime_ = Timestamp::now();

        // Reset the timestamp mapper
        timeStampMapper_.reset();

        // Send cached data
        if (auto mgr = manager_.lock()) {
            auto stream = mgr->getStream(streamName_);
            if (stream) {
                // 1. Send sequence headers (mandatory)
                auto sequenceHeaders = stream->getSequenceHeaders();
                if (!sequenceHeaders.empty()) {
                    sendCachedData(sequenceHeaders);
                }
                // 2. Send the GOP cache (optional; fast first frame)
                auto gopPackets = stream->gopCache()->getLatestKeyFramePackets();
                if (!gopPackets.empty()) {
                    sendCachedData(gopPackets);
                }
            }
        }
    }
}
3. Broadcasting live data
void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
    std::vector<std::shared_ptr<Subscriber>> subscribersCopy;

    // Copy the subscriber list (avoid mutation while sending)
    {
        MutexLockGuard lock(mutex_);
        subscribersCopy.reserve(subscribers_.size());
        for (const auto& pair : subscribers_) {
            if (pair.second && pair.second->isConnected()) {
                subscribersCopy.push_back(pair.second);
            }
        }
    }

    // Send the data
    for (auto& subscriber : subscribersCopy) {
        subscriber->sendMediaData(packet);
    }
}
Key Implementation Details
1. Sequence header management
// The Stream class owns the sequence headers
class Stream {
private:
    std::shared_ptr<MediaPacket> avcSequenceHeader_;  // H.264 configuration
    std::shared_ptr<MediaPacket> aacSequenceHeader_;  // AAC configuration
    std::shared_ptr<MediaPacket> metadataPacket_;     // Metadata

public:
    std::vector<std::shared_ptr<MediaPacket>> getSequenceHeaders() const {
        std::vector<std::shared_ptr<MediaPacket>> headers;
        MutexLockGuard lock(mutex_);
        // Add in order
        if (metadataPacket_) headers.push_back(metadataPacket_);
        if (avcSequenceHeader_) headers.push_back(avcSequenceHeader_);
        if (aacSequenceHeader_) headers.push_back(aacSequenceHeader_);
        return headers;
    }
};
2. Play response message format
// StreamBegin user control message
void RtmpConnection::sendStreamBegin(uint32_t streamId) {
    Buffer buffer;
    buffer.appendInt16(0x0000);  // Event type: StreamBegin
    buffer.appendInt32(streamId);

    RtmpMessageHeader header;
    header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kUserControl);
    header.messageLength = static_cast<uint32_t>(buffer.readableBytes());

    RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
    sendMessage(message);
}

// onStatus message
void RtmpConnection::sendPlayStatus(const std::string& code, const std::string& description) {
    Buffer buffer;
    // Command name: onStatus
    AmfCodec::encodeAmfValue(&buffer, AmfValue("onStatus"));
    // Transaction ID: 0
    AmfCodec::encodeAmfValue(&buffer, AmfValue(0.0));
    // Command object: null
    AmfCodec::encodeAmfValue(&buffer, AmfValue());
    // Info object
    std::unordered_map<std::string, AmfValue> info;
    info["level"] = AmfValue("status");
    info["code"] = AmfValue(code);
    info["description"] = AmfValue(description);
    AmfCodec::encodeAmfObject(&buffer, info);

    RtmpMessageHeader header;
    header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kCommandAMF0);
    header.messageLength = static_cast<uint32_t>(buffer.readableBytes());
    header.messageStreamId = 1;

    RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
    sendMessage(message);
}
Multi-Stream Handling
Multi-Publisher Architecture
Multi-Subscriber Architecture
Core Implementation
1. Stream manager
class StreamManager {
private:
    std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;

public:
    // Create or fetch a stream
    std::shared_ptr<Stream> getOrCreateStream(const std::string& streamName) {
        MutexLockGuard lock(mutex_);
        auto it = streams_.find(streamName);
        if (it != streams_.end()) {
            return it->second;
        }
        // Create a new stream
        auto stream = std::make_shared<Stream>(streamName);
        stream->initialize();
        streams_[streamName] = stream;
        return stream;
    }

    // Data forwarding
    void onPublisherData(const std::string& streamName, std::shared_ptr<MediaPacket> packet) {
        auto stream = getStream(streamName);
        if (stream) {
            stream->onMediaData(packet);  // Broadcast to all subscribers
        }
    }
};
2. Subscriber management
class Stream {
private:
    std::unordered_map<std::string, std::shared_ptr<Subscriber>> subscribers_;

public:
    void addSubscriber(std::shared_ptr<Subscriber> subscriber) {
        MutexLockGuard lock(mutex_);
        subscribers_[subscriber->id()] = subscriber;
        LOG_INFO << "Added subscriber: " << subscriber->id()
                 << " to stream: " << name_
                 << ", total subscribers: " << subscribers_.size();
    }

    void removeSubscriber(const std::string& subscriberId) {
        MutexLockGuard lock(mutex_);
        auto it = subscribers_.find(subscriberId);
        if (it != subscribers_.end()) {
            subscribers_.erase(it);
            LOG_INFO << "Removed subscriber: " << subscriberId
                     << " from stream: " << name_
                     << ", remaining subscribers: " << subscribers_.size();
        }
    }
};
3. Concurrent data broadcast
void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
    // Work on a copy to avoid holding the lock while sending
    std::vector<std::shared_ptr<Subscriber>> subscribersCopy;
    {
        MutexLockGuard lock(mutex_);
        subscribersCopy.reserve(subscribers_.size());
        for (const auto& pair : subscribers_) {
            if (pair.second && pair.second->isConnected()) {
                subscribersCopy.push_back(pair.second);
            }
        }
    }

    // Send to each subscriber
    for (auto& subscriber : subscribersCopy) {
        subscriber->sendMediaData(packet);
    }
}
Performance Optimization
1. Connection pool management
class RtmpServer {
private:
    std::unordered_map<std::string, std::shared_ptr<RtmpConnection>> connections_;
    std::atomic<int> connectionCount_{0};

public:
    void onConnection(const TcpConnectionPtr& conn) {
        if (connectionCount_.load() >= config_.maxConnections) {
            LOG_WARN << "Max connections reached, rejecting new connection";
            conn->shutdown();
            return;
        }
        // Create an RTMP connection
        auto rtmpConn = std::make_shared<RtmpConnection>(conn, streamManager_);
        connections_[rtmpConn->id()] = rtmpConn;
        connectionCount_.fetch_add(1);
    }

    void onDisconnection(const TcpConnectionPtr& conn) {
        connectionCount_.fetch_sub(1);
        // Clean up the connection
    }
};
2. Memory optimization
// Use an object pool to reduce allocations
class MediaPacketPool {
private:
    std::queue<std::shared_ptr<MediaPacket>> pool_;
    std::mutex mutex_;

public:
    std::shared_ptr<MediaPacket> acquire() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!pool_.empty()) {
            auto packet = pool_.front();
            pool_.pop();
            return packet;
        }
        return std::make_shared<MediaPacket>();
    }

    void release(std::shared_ptr<MediaPacket> packet) {
        packet->reset();
        std::lock_guard<std::mutex> lock(mutex_);
        pool_.push(packet);
    }
};
GOP Cache Mechanism
GOP Cache Architecture
GOP Cache Implementation
1. Core data structure
class GopCache {
private:
    std::shared_ptr<Gop> currentGop_;  // Current GOP
    size_t maxCacheSize_;              // Maximum cache size
    size_t currentCacheSize_;          // Current cache size

    // Audio/video alignment buffer
    std::deque<std::shared_ptr<MediaPacket>> audioBuffer_;
    uint32_t lastKeyFrameTimestamp_ = 0;
    static const uint32_t AUDIO_ALIGN_WINDOW = 200;  // 200 ms alignment window

public:
    void addPacket(std::shared_ptr<MediaPacket> packet);
    std::vector<std::shared_ptr<MediaPacket>> getCachedPackets() const;
    std::vector<std::shared_ptr<MediaPacket>> getLatestKeyFramePackets() const;
};
2. GOP detection and rollover
bool GopCache::needNewGop(std::shared_ptr<MediaPacket> packet) const {
    // No current GOP yet
    if (!currentGop_) {
        return true;
    }
    // A new key frame starts a new GOP
    if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
        return true;
    }
    return false;
}

void GopCache::addPacket(std::shared_ptr<MediaPacket> packet) {
    MutexLockGuard lock(mutex_);

    // Audio frames: maintain the audio buffer
    if (packet->type() == MediaPacketType::kAudio) {
        maintainAudioBuffer(packet);
    }

    // Check whether a new GOP is needed
    bool isNewGop = needNewGop(packet);
    if (isNewGop) {
        startNewGop();
        // Key frame: align audio with it
        if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
            lastKeyFrameTimestamp_ = packet->timestamp();
            alignAudioWithKeyFrame(lastKeyFrameTimestamp_);
        }
    }

    // Append to the current GOP
    if (!currentGop_) {
        startNewGop();
    }
    currentGop_->addPacket(packet);
    updateCacheSize();
}
3. Audio/video alignment
void GopCache::alignAudioWithKeyFrame(uint32_t keyFrameTimestamp) {
    if (audioBuffer_.empty() || !currentGop_) {
        return;
    }

    std::vector<std::shared_ptr<MediaPacket>> alignedAudio;

    // Find audio frames around the key frame
    for (const auto& audioPacket : audioBuffer_) {
        uint32_t audioTimestamp = audioPacket->timestamp();
        int32_t timeDiff = static_cast<int32_t>(audioTimestamp) -
                           static_cast<int32_t>(keyFrameTimestamp);
        // Collect audio from 200 ms before to 50 ms after the key frame
        if (timeDiff >= -static_cast<int32_t>(AUDIO_ALIGN_WINDOW) && timeDiff <= 50) {
            alignedAudio.push_back(audioPacket);
        }
    }

    // Sort by timestamp
    std::sort(alignedAudio.begin(), alignedAudio.end(),
              [](const std::shared_ptr<MediaPacket>& a,
                 const std::shared_ptr<MediaPacket>& b) {
                  return a->timestamp() < b->timestamp();
              });

    // Prepend to the GOP
    for (const auto& audioPacket : alignedAudio) {
        currentGop_->addPacket(audioPacket);
    }

    LOG_INFO << "Aligned " << alignedAudio.size()
             << " audio frames with key frame at " << keyFrameTimestamp;
}
4. Fast first-frame optimization
std::vector<std::shared_ptr<MediaPacket>> GopCache::getLatestKeyFramePackets() const {
    MutexLockGuard lock(mutex_);
    std::vector<std::shared_ptr<MediaPacket>> packets;

    if (currentGop_ && !currentGop_->empty()) {
        const auto& gopPackets = currentGop_->packets();

        // Locate the key frame
        size_t keyFrameIndex = 0;
        bool foundKeyFrame = false;
        for (size_t i = 0; i < gopPackets.size(); ++i) {
            if (gopPackets[i]->type() == MediaPacketType::kVideo &&
                gopPackets[i]->isKeyFrame()) {
                keyFrameIndex = i;
                foundKeyFrame = true;
                break;
            }
        }

        if (foundKeyFrame) {
            // Starting at the key frame, take at most 10 packets (~1 s of content)
            size_t endIndex = std::min(keyFrameIndex + 10, gopPackets.size());
            for (size_t i = keyFrameIndex; i < endIndex; ++i) {
                packets.push_back(gopPackets[i]);
            }
        }
    }
    return packets;
}
GOP Cache Statistics
struct GopInfo {
    uint32_t audioDuration = 0;        // Audio duration
    uint32_t videoDuration = 0;        // Video duration
    size_t audioPackets = 0;           // Audio packet count
    size_t videoPackets = 0;           // Video packet count
    size_t totalSize = 0;              // Total size
    uint32_t firstTimestamp = 0;       // First timestamp
    uint32_t lastTimestamp = 0;        // Last timestamp
    bool hasKeyFrame = false;          // Whether a key frame is present
    uint32_t audioSampleRate = 44100;  // Audio sample rate
    double audioFrameDuration = 0.0;   // Per-frame audio duration
};

GopInfo Gop::getGopInfo() const {
    GopInfo info;
    if (packets_.empty()) {
        return info;
    }

    // Count audio/video packets and compute the timestamp span
    uint32_t minTimestamp = packets_[0]->timestamp();
    uint32_t maxTimestamp = packets_[0]->timestamp();

    for (const auto& packet : packets_) {
        uint32_t timestamp = packet->timestamp();
        minTimestamp = std::min(minTimestamp, timestamp);
        maxTimestamp = std::max(maxTimestamp, timestamp);
        info.totalSize += packet->data().size();

        if (packet->type() == MediaPacketType::kAudio) {
            info.audioPackets++;
        } else if (packet->type() == MediaPacketType::kVideo) {
            info.videoPackets++;
            if (packet->isKeyFrame()) {
                info.hasKeyFrame = true;
            }
        }
    }

    info.firstTimestamp = minTimestamp;
    info.lastTimestamp = maxTimestamp;

    // Audio frame duration (44.1 kHz sample rate, 1024 samples per frame)
    info.audioFrameDuration = 1024.0 / 44100.0 * 1000.0;  // ~23.22 ms
    info.audioDuration = static_cast<uint32_t>(info.audioPackets * info.audioFrameDuration);
    return info;
}
Timestamp Synchronization
Timestamp Mapper
Timestamp Mapping Implementation
1. Timestamp mapper structure
struct TimestampMapper {
    // Shared base timestamp
    uint32_t baseTimestamp = 0;
    bool initialized = false;

    // Audio synchronization
    uint32_t audioBaseTimestamp = 0;
    uint32_t audioFrameNum = 0;
    uint32_t audioFrameInterval = 0;  // Computed dynamically
    bool audioInitialized = false;
    uint32_t lastAudioTimestamp = 0;

    // Audio frame interval estimation state
    uint32_t lastRawAudioTimestamp = 0;
    uint32_t audioFrameIntervalSum = 0;
    uint32_t audioFrameIntervalCount = 0;
    bool audioFrameIntervalCalculated = false;

    // Video synchronization
    uint32_t videoBaseTimestamp = 0;
    uint32_t lastVideoTimestamp = 0;
    bool videoInitialized = false;

    // Sync threshold
    static const uint32_t SYNC_THRESHOLD = 300;  // 300 ms

    uint32_t mapTimestamp(uint32_t originalTimestamp, MediaPacketType type);
    void reset();
};
2. Audio timestamp mapping
uint32_t TimestampMapper::mapAudioTimestamp(uint32_t originalTimestamp) {
    // Initialize the shared base
    initializeBase(originalTimestamp);

    if (originalTimestamp < baseTimestamp) {
        return lastAudioTimestamp;  // Guard against timestamp regression
    }
    uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;

    // Estimate the audio frame interval dynamically
    calculateAudioFrameInterval(originalTimestamp);

    // Initialize the audio base
    if (!audioInitialized) {
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 0;
        audioInitialized = true;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }

    // No stable frame interval yet: pass through
    if (!audioFrameIntervalCalculated) {
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }

    // Expected audio timestamp
    uint32_t expectedTimestamp = audioBaseTimestamp + audioFrameNum * audioFrameInterval;

    // Compare against the actual timestamp
    int32_t timeDiff = static_cast<int32_t>(mappedTimestamp) -
                       static_cast<int32_t>(expectedTimestamp);

    if (abs(timeDiff) <= SYNC_THRESHOLD) {
        // Within the sync threshold: use the expected timestamp
        lastAudioTimestamp = expectedTimestamp;
        audioFrameNum++;
        return expectedTimestamp;
    } else {
        // Beyond the threshold: re-baseline
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 0;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
}
3. Video timestamp mapping
uint32_t TimestampMapper::mapVideoTimestamp(uint32_t originalTimestamp) {
    // Initialize the shared base
    initializeBase(originalTimestamp);

    if (originalTimestamp < baseTimestamp) {
        return lastVideoTimestamp;  // Guard against timestamp regression
    }
    uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;

    // Initialize the video base
    if (!videoInitialized) {
        videoBaseTimestamp = mappedTimestamp;
        videoInitialized = true;
        lastVideoTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }

    // Enforce monotonicity
    if (mappedTimestamp < lastVideoTimestamp) {
        // Timestamp went backwards: reuse the previous one
        return lastVideoTimestamp;
    }

    // Detect timestamp jumps
    uint32_t timeDiff = mappedTimestamp - lastVideoTimestamp;
    if (timeDiff > 2000) {  // Jump larger than 2 seconds
        LOG_WARN << "Video timestamp jump detected: " << timeDiff << "ms";
        // Either re-baseline or clamp the jump
        mappedTimestamp = lastVideoTimestamp + 33;  // Assume 30 fps
    }

    lastVideoTimestamp = mappedTimestamp;
    return mappedTimestamp;
}
4. Audio frame interval estimation
void TimestampMapper::calculateAudioFrameInterval(uint32_t currentTimestamp) {
    if (!audioFrameIntervalCalculated) {
        if (lastRawAudioTimestamp != 0) {
            uint32_t interval = currentTimestamp - lastRawAudioTimestamp;
            // Filter outliers (a normal audio frame interval is 10-50 ms)
            if (interval >= 10 && interval <= 50) {
                audioFrameIntervalSum += interval;
                audioFrameIntervalCount++;
                // Average once enough samples are collected
                if (audioFrameIntervalCount >= FRAME_INTERVAL_SAMPLES) {
                    audioFrameInterval = audioFrameIntervalSum / audioFrameIntervalCount;
                    audioFrameIntervalCalculated = true;
                    LOG_INFO << "Audio frame interval calculated: "
                             << audioFrameInterval << "ms";
                }
            }
        }
        lastRawAudioTimestamp = currentTimestamp;
    }
}
Sync Check
bool TimestampMapper::checkSync() const {
    if (!initialized || !audioInitialized || !videoInitialized) {
        return true;  // Not fully initialized yet; skip the check
    }

    // Audio/video timestamp difference
    int32_t avDiff = static_cast<int32_t>(lastAudioTimestamp) -
                     static_cast<int32_t>(lastVideoTimestamp);

    bool inSync = abs(avDiff) <= SYNC_THRESHOLD;
    if (!inSync) {
        LOG_WARN << "Audio-Video sync issue detected: " << avDiff << "ms";
    }
    return inSync;
}
Key Technical Details
1. RTMP handshake
class RtmpProtocolParser {
public:
    // Build the C0+C1 handshake payload
    std::string generateHandshakeC0C1() {
        std::string c0c1;
        c0c1.resize(1 + kRtmpHandshakeSize);

        // C0: protocol version
        c0c1[0] = 0x03;

        // C1: handshake data
        char* c1 = &c0c1[1];
        // Timestamp (4 bytes)
        uint32_t timestamp = static_cast<uint32_t>(time(nullptr));
        c1[0] = (timestamp >> 24) & 0xFF;
        c1[1] = (timestamp >> 16) & 0xFF;
        c1[2] = (timestamp >> 8) & 0xFF;
        c1[3] = timestamp & 0xFF;
        // Version field (4 zero bytes)
        c1[4] = c1[5] = c1[6] = c1[7] = 0x00;
        // Random data (1528 bytes)
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> dis(0, 255);
        for (int i = 8; i < kRtmpHandshakeSize; ++i) {
            c1[i] = static_cast<char>(dis(gen));
        }
        return c0c1;
    }

    // Build the C2 handshake payload
    std::string generateHandshakeC2(const std::string& s1) {
        std::string c2;
        c2.resize(kRtmpHandshakeSize);
        // C2 echoes S1
        std::memcpy(&c2[0], s1.data(), kRtmpHandshakeSize);
        return c2;
    }
};
2. Chunk parsing
bool RtmpProtocolParser::parseChunkHeader(Buffer* buffer, RtmpChunkHeader* header) {
    if (buffer->readableBytes() < 1) {
        return false;
    }

    // Parse the basic header
    uint8_t basicHeader = static_cast<uint8_t>(buffer->peekInt8());
    header->format = static_cast<RtmpChunkFormat>((basicHeader >> 6) & 0x03);
    uint32_t chunkStreamId = basicHeader & 0x3F;

    // Handle extended chunk stream IDs
    size_t basicHeaderSize = 1;
    if (chunkStreamId == 0) {
        // 2-byte form: ID = second byte + 64
        if (buffer->readableBytes() < 2) return false;
        chunkStreamId = static_cast<uint8_t>(buffer->peek()[1]) + 64;
        basicHeaderSize = 2;
    } else if (chunkStreamId == 1) {
        // 3-byte form: ID = third byte * 256 + second byte + 64 (per the RTMP spec)
        if (buffer->readableBytes() < 3) return false;
        chunkStreamId = static_cast<uint8_t>(buffer->peek()[2]) * 256 +
                        static_cast<uint8_t>(buffer->peek()[1]) + 64;
        basicHeaderSize = 3;
    }
    header->chunkStreamId = chunkStreamId;

    // Message header size for this format
    size_t messageHeaderSize = 0;
    switch (header->format) {
        case RtmpChunkFormat::kType0: messageHeaderSize = 11; break;
        case RtmpChunkFormat::kType1: messageHeaderSize = 7; break;
        case RtmpChunkFormat::kType2: messageHeaderSize = 3; break;
        case RtmpChunkFormat::kType3: messageHeaderSize = 0; break;
    }

    // Make sure enough data is buffered
    if (buffer->readableBytes() < basicHeaderSize + messageHeaderSize) {
        return false;
    }

    // Skip the basic header
    buffer->retrieve(basicHeaderSize);

    // Parse the message header
    if (messageHeaderSize > 0) {
        parseMessageHeader(buffer, header, messageHeaderSize);
    }
    return true;
}
3. AMF codec implementation
// AMF Number encoding
void AmfCodec::encodeNumber(Buffer* buffer, double number) {
    char data[8];
    uint64_t bits;
    std::memcpy(&bits, &number, 8);
    // Write in big-endian order
    for (int i = 0; i < 8; ++i) {
        data[i] = static_cast<char>((bits >> (56 - i * 8)) & 0xFF);
    }
    buffer->append(data, 8);
}

// AMF String encoding
void AmfCodec::encodeString(Buffer* buffer, const std::string& str) {
    // String length (2 bytes, big-endian)
    buffer->appendInt16(static_cast<int16_t>(str.size()));
    // String content
    buffer->append(str);
}

// AMF Object encoding
void AmfCodec::encodeAmfObject(Buffer* buffer,
                               const std::unordered_map<std::string, AmfValue>& object) {
    // Object type marker
    buffer->appendInt8(static_cast<int8_t>(AmfType::kObject));

    // Encode the key/value pairs
    for (const auto& pair : object) {
        // Key (no type marker)
        buffer->appendInt16(static_cast<int16_t>(pair.first.size()));
        buffer->append(pair.first);
        // Value (with type marker)
        encodeAmfValue(buffer, pair.second);
    }

    // Object end marker
    buffer->appendInt16(0);  // Empty key
    buffer->appendInt8(static_cast<int8_t>(AmfType::kObjectEnd));
}
4. Message serialization
void RtmpProtocolParser::serializeMessage(const RtmpMessage& message, Buffer* buffer) {
    const auto& header = message.header();
    const auto& payload = message.payload();

    // Pick a chunk stream ID
    uint32_t chunkStreamId = selectChunkStreamId(header.messageTypeId);

    // Number of chunks needed
    uint32_t chunkCount = (payload.size() + chunkSize_ - 1) / chunkSize_;

    for (uint32_t i = 0; i < chunkCount; ++i) {
        // Payload size of this chunk
        uint32_t chunkPayloadSize = std::min(chunkSize_,
            static_cast<uint32_t>(payload.size() - i * chunkSize_));

        // Write the chunk header
        if (i == 0) {
            // First chunk uses Type 0
            writeChunkHeader(buffer, RtmpChunkFormat::kType0, chunkStreamId, header);
        } else {
            // Subsequent chunks use Type 3
            writeChunkHeader(buffer, RtmpChunkFormat::kType3, chunkStreamId, header);
        }

        // Write the chunk payload
        buffer->append(&payload[i * chunkSize_], chunkPayloadSize);
    }
}
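The chunk-count computation above is a ceiling division: every chunk carries the negotiated chunk size except the last, which carries the remainder. A standalone sketch of that split (the helper is illustrative, not part of the project):

```cpp
#include <cstdint>
#include <vector>

// Split a payload of `total` bytes into per-chunk payload sizes,
// given the negotiated chunk size. Ceiling division; the last chunk
// may be shorter than chunkSize.
inline std::vector<uint32_t> chunkSizes(uint32_t total, uint32_t chunkSize) {
    std::vector<uint32_t> sizes;
    uint32_t count = (total + chunkSize - 1) / chunkSize;
    for (uint32_t i = 0; i < count; ++i) {
        uint32_t remaining = total - i * chunkSize;
        sizes.push_back(remaining < chunkSize ? remaining : chunkSize);
    }
    return sizes;
}
```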
5. Media data inspection
class MediaPacket {
public:
    // Video frame type
    VideoFrameType videoFrameType() const {
        if (type_ != MediaPacketType::kVideo || data_.empty()) {
            return VideoFrameType::kInterFrame;
        }
        // FLV video tag: Frame Type (4 bits) + Codec ID (4 bits)
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t frameType = (firstByte >> 4) & 0x0F;
        return static_cast<VideoFrameType>(frameType);
    }

    // Video codec
    VideoCodec videoCodec() const {
        if (type_ != MediaPacketType::kVideo || data_.empty()) {
            return VideoCodec::kAVC;
        }
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t codecId = firstByte & 0x0F;
        return static_cast<VideoCodec>(codecId);
    }

    // Audio format
    AudioFormat audioFormat() const {
        if (type_ != MediaPacketType::kAudio || data_.empty()) {
            return AudioFormat::kAAC;
        }
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t format = (firstByte >> 4) & 0x0F;
        return static_cast<AudioFormat>(format);
    }

    // AAC sequence header detection
    bool isAacSequenceHeader() const {
        if (type_ != MediaPacketType::kAudio || data_.size() < 2) {
            return false;
        }
        // AAC sequence header: audio format == AAC && AAC packet type == 0
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t secondByte = static_cast<uint8_t>(data_[1]);
        uint8_t format = (firstByte >> 4) & 0x0F;
        uint8_t aacPacketType = secondByte;
        return (format == static_cast<uint8_t>(AudioFormat::kAAC)) &&
               (aacPacketType == 0);
    }
};
Monitoring and Logging
1. Log levels
// Set the log level
Logger::setLogLevel(Logger::LogLevel::DEBUG);

// Key log output
LOG_INFO << "RTMP Server started on " << config_.listenAddress << ":" << config_.listenPort;
LOG_DEBUG << "Received media packet: " << packet->data().size() << " bytes";
LOG_WARN << "Connection timeout: " << connectionId;
LOG_ERROR << "Failed to parse RTMP message";
2. Performance monitoring
struct ServerStatistics {
    std::atomic<uint64_t> totalConnections{0};
    std::atomic<uint64_t> currentConnections{0};
    std::atomic<uint64_t> totalStreams{0};
    std::atomic<uint64_t> currentStreams{0};
    std::atomic<uint64_t> totalBytesReceived{0};
    std::atomic<uint64_t> totalBytesSent{0};
    std::atomic<uint64_t> totalPacketsReceived{0};
    std::atomic<uint64_t> totalPacketsSent{0};
};

// Print statistics periodically
void printStatistics() {
    // Take a reference: std::atomic members make the struct non-copyable
    const auto& stats = server->getStatistics();
    LOG_INFO << "=== RTMP Server Statistics ===";
    LOG_INFO << "Connections: " << stats.currentConnections.load()
             << "/" << stats.totalConnections.load();
    LOG_INFO << "Streams: " << stats.currentStreams.load()
             << "/" << stats.totalStreams.load();
    LOG_INFO << "Bytes: RX=" << stats.totalBytesReceived.load()
             << " TX=" << stats.totalBytesSent.load();
    LOG_INFO << "Packets: RX=" << stats.totalPacketsReceived.load()
             << " TX=" << stats.totalPacketsSent.load();
}
Troubleshooting
1. Connection problems
Symptom: client connection times out
ERROR: Connection timeout: conn_12345
Fixes:
- Check firewall settings
- Make sure port 1935 is not already in use
- Verify network connectivity
# Check the listening port
netstat -tlnp | grep 1935
# Test connectivity
telnet localhost 1935
2. Handshake failures
Symptom: the RTMP handshake fails
ERROR: Invalid handshake data from: conn_12345
Fixes:
- Check the client's RTMP version (must be 3)
- Verify the handshake data is complete
- Check that the network transport is stable
// Debug the handshake
LOG_DEBUG << "Handshake state: " << static_cast<int>(parser_.state());
LOG_DEBUG << "Handshake data size: " << buffer->readableBytes();
3. Audio/video sync problems
Symptom: audio and video drift apart
WARN: Audio-Video sync issue detected: 500ms
Fixes:
- Check the timestamp mapper configuration
- Adjust the sync threshold
- Check GOP cache alignment
// Raise the sync threshold
static const uint32_t SYNC_THRESHOLD = 500;  // Increase to 500 ms

// Inspect audio/video timestamps
LOG_DEBUG << "Audio timestamp: " << audioTimestamp
          << " Video timestamp: " << videoTimestamp
          << " Diff: " << (audioTimestamp - videoTimestamp);
4. Debugging tips
1. Packet capture
# Capture with tcpdump
sudo tcpdump -i lo -w rtmp.pcap port 1935
# Analyze with Wireshark
wireshark rtmp.pcap
2. Performance profiling
# Monitor CPU and memory with top
top -p $(pgrep rtmp_server)
# Find hot spots with perf
perf record -g ./bin/rtmp_server
perf report
Summary
This project implements a complete RTMP streaming server. Its main features:
- Full RTMP protocol support: handshake, command handling, media data transport
- High-performance networking: asynchronous framework built on muduo
- Audio/video synchronization: a timestamp mapper keeps playback continuous
- GOP cache optimization: fast first frame and low-latency playback
- Multi-stream support: concurrent publishers and subscribers
- Robust error handling: anomaly detection and recovery
With a solid grasp of the RTMP protocol and streaming fundamentals, the server can be extended further, for example:
- Support for more codecs (H.265, Opus, etc.)
- HLS/DASH output
- Stream recording
- User authentication and access control
- CDN distribution integration