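Audio/Video Project: RTMP Streaming Media Server Development Guide

Project Overview

This project is a high-performance RTMP streaming media server built on the muduo network library. It supports:

  • ✅ H.264 + AAC audio/video codecs
  • ✅ Multiple concurrent publish (push) and play (pull) streams
  • ✅ GOP caching for fast first-frame display
  • ✅ Audio/video synchronization
  • ✅ Low-latency optimizations
  • ✅ Timestamp mapping to keep playback continuous

Tech Stack

  • Network framework: muduo (high-performance network library)
  • Protocol: RTMP 1.0 specification
  • Audio/video formats: H.264 + AAC
  • Language: C++17
  • Build tool: CMake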

Video walkthrough and source code: https://www.bilibili.com/video/BV1mou4zhEFX/

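Quick Start

Build and Run

# 1. Build the RTMP server
mkdir build && cd build

# Configure CMake (Debug build, convenient for debugging)
cmake -DCMAKE_BUILD_TYPE=Debug ..
# Or rely on the default (Debug mode)
cmake ..
# Or configure a Release build (for production)
cmake -DCMAKE_BUILD_TYPE=Release ..

# Compile (using all CPU cores)
make -j$(nproc)

# 2. Edit the configuration file (optional; the defaults work)
# Default config path: ../config.json relative to the build directory, i.e. the project source root

# 3. Run the program
# Option 1: run the server directly; it uses ../config.json by default
./bin/rtmp_server

# Option 2: specify a configuration file
./bin/rtmp_server --config=/path/to/your/config.json

# 4. Test (in another terminal)
# Publish:
ffmpeg -re -i test_video.mp4 -c copy -f flv rtmp://localhost:1935/live/test

# Play:
ffplay rtmp://localhost:1935/live/test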

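System Architecture

Overall Architecture Diagram

Core Module Responsibilities

Module              Responsibility                                                    Files
RtmpServer          Main server framework; manages TCP connections and configuration  rtmp_server.h/cc
RtmpConnection      Handles one RTMP connection (handshake, commands, media data)     rtmp_connection.h/cc
StreamManager       Stream management; coordinates publishers and subscribers         stream_manager.h/cc
RtmpProtocolParser  RTMP protocol parsing and message serialization                   rtmp_protocol.h/cc
GopCache            GOP cache for fast first-frame display                            gop_cache.h/cc
AmfCodec            AMF data encoding and decoding                                    rtmp_protocol.h/cc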

Core Modules in Detail

1. RtmpServer: main server framework

class RtmpServer {
    // Core functionality
    void start();                    // Start the server
    void stop();                     // Stop the server
    void onConnection();             // Handle a new connection
    void onMessage();                // Handle an incoming message
    
    // Configuration
    RtmpServerConfig config_;        // Server configuration
    
    // Connection management
    std::unordered_map<std::string, 
        std::shared_ptr<RtmpConnection>> connections_;
    
    // Stream management
    std::shared_ptr<StreamManager> streamManager_;
};

Key configuration parameters:

struct RtmpServerConfig {
    std::string listenAddress = "0.0.0.0";
    uint16_t listenPort = 1935;
    int maxConnections = 1000;
    int maxStreams = 100;
    int gopCacheMaxCount = 1;        // Number of GOPs to cache (kept small to reduce latency)
    int gopCacheMaxSizeBytes = 2 * 1024 * 1024; // GOP cache size limit: 2 MB
    int chunkSize = 60000;             // RTMP chunk size
};

2. RtmpConnection: connection handling

class RtmpConnection {
    // Connection type
    enum class RtmpConnectionType {
        kUnknown,
        kPublisher,      // Publishing (push) connection
        kSubscriber      // Playing (pull) connection
    };
    
    // Core processing flow
    void onMessage();                // Receive TCP data
    bool handleHandshake();          // Handle the RTMP handshake
    void processMessages();          // Process RTMP messages
    void processCommand();           // Process RTMP commands
    void processAudioData();         // Process audio data
    void processVideoData();         // Process video data
    void processMetaData();          // Process metadata
};

3. StreamManager: stream manager

class StreamManager {
    // Stream management
    std::shared_ptr<Stream> createStream(const std::string& name);
    std::shared_ptr<Stream> getStream(const std::string& name);
    
    // Publisher management
    std::shared_ptr<Publisher> createPublisher(const std::string& streamName,
                                              TcpConnectionPtr conn);
    
    // Subscriber management
    std::shared_ptr<Subscriber> createSubscriber(const std::string& streamName,
                                                TcpConnectionPtr conn);
    
    // Data forwarding
    void onPublisherData(const std::string& streamName, 
                        std::shared_ptr<MediaPacket> packet);
    
private:
    std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;
};

RTMP Protocol Implementation

RTMP Protocol Stack

+---------------------------+
|    Application Layer      |  <- RTMP Commands, Media Data
+---------------------------+
|      RTMP Messages        |  <- Audio/Video/Command Messages
+---------------------------+
|      RTMP Chunks          |  <- Messages split into chunks
+---------------------------+
|           TCP             |  <- Reliable transport
+---------------------------+

1. Message type definitions

enum class RtmpMessageType : uint8_t {
    kSetChunkSize = 1,           // Set Chunk Size
    kAbort = 2,                  // Abort Message
    kAck = 3,                    // Acknowledgement
    kUserControl = 4,            // User Control Message
    kWindowAckSize = 5,          // Window Acknowledgement Size
    kSetPeerBandwidth = 6,       // Set Peer Bandwidth
    kAudio = 8,                  // Audio data
    kVideo = 9,                  // Video data
    kDataAMF0 = 18,             // AMF0 data message
    kCommandAMF0 = 20,          // AMF0 command message
};
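
As an illustration of the first message type, the payload of a Set Chunk Size message is a single 4-byte big-endian integer whose top bit must be zero. The sketch below builds that payload for the default chunkSize of 60000 from the configuration above; makeSetChunkSizePayload is a hypothetical helper name, not a function from the project.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Build the 4-byte payload of a Set Chunk Size message (message type 1).
// The value is written big-endian and its most significant bit must be 0,
// so the valid range is 1..0x7FFFFFFF.
// Hypothetical helper for illustration; not part of the project's API.
std::array<uint8_t, 4> makeSetChunkSizePayload(uint32_t chunkSize) {
    assert(chunkSize >= 1 && chunkSize <= 0x7FFFFFFFu);
    std::array<uint8_t, 4> payload = {{
        static_cast<uint8_t>((chunkSize >> 24) & 0x7F),
        static_cast<uint8_t>((chunkSize >> 16) & 0xFF),
        static_cast<uint8_t>((chunkSize >> 8) & 0xFF),
        static_cast<uint8_t>(chunkSize & 0xFF)}};
    return payload;
}
```

For 60000 (0xEA60) the payload bytes are 00 00 EA 60. Until this message is sent, a peer assumes the protocol default chunk size of 128 bytes.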

2. Chunk format

enum class RtmpChunkFormat : uint8_t {
    kType0 = 0,  // Full message header (11 bytes)
    kType1 = 1,  // No message stream ID (7 bytes)
    kType2 = 2,  // Timestamp delta only (3 bytes)
    kType3 = 3   // No message header (0 bytes)
};

Chunk header structure:

Type 0: [Basic Header][Timestamp][Message Length][Message Type ID][Message Stream ID]
Type 1: [Basic Header][Timestamp Delta][Message Length][Message Type ID]
Type 2: [Basic Header][Timestamp Delta]
Type 3: [Basic Header]
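
The Basic Header itself is 1, 2, or 3 bytes long depending on the chunk stream ID. A minimal sketch of the encoding side, assuming the layout defined by the RTMP specification; encodeBasicHeader is an illustrative name that does not appear in the project.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Encode an RTMP chunk basic header for a chunk format (0-3) and a
// chunk stream ID (2-65599). IDs 0 and 1 are reserved as markers for
// the 2-byte and 3-byte forms.
// Hypothetical helper for illustration; not part of the project's API.
std::vector<uint8_t> encodeBasicHeader(uint8_t fmt, uint32_t csid) {
    assert(fmt <= 3 && csid >= 2 && csid <= 65599);
    std::vector<uint8_t> out;
    const uint8_t fmtBits = static_cast<uint8_t>(fmt << 6);
    if (csid <= 63) {
        // 1-byte form: fmt (2 bits) + csid (6 bits)
        out.push_back(static_cast<uint8_t>(fmtBits | csid));
    } else if (csid <= 319) {
        // 2-byte form: low 6 bits are 0, second byte is csid - 64
        out.push_back(fmtBits);
        out.push_back(static_cast<uint8_t>(csid - 64));
    } else {
        // 3-byte form: low 6 bits are 1, then (csid - 64) with the
        // low byte first and the high byte second
        const uint32_t v = csid - 64;
        out.push_back(static_cast<uint8_t>(fmtBits | 0x01));
        out.push_back(static_cast<uint8_t>(v & 0xFF));
        out.push_back(static_cast<uint8_t>(v >> 8));
    }
    return out;
}
```

Note that the 3-byte form stores the low byte before the high byte, which is easy to get backwards when writing the matching parser.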

3. AMF encoding and decoding

class AmfCodec {
    // Encode/decode basic types
    static void encodeAmfValue(Buffer* buffer, const AmfValue& value);
    static bool decodeAmfValue(Buffer* buffer, AmfValue* value);
    
    // Encode/decode compound types
    static void encodeAmfObject(Buffer* buffer, 
                               const std::unordered_map<std::string, AmfValue>& object);
    static bool decodeAmfObject(Buffer* buffer, 
                               std::unordered_map<std::string, AmfValue>* object);
};

AMF data types:

enum class AmfType : uint8_t {
    kNumber = 0x00,      // 64-bit double-precision floating point
    kBoolean = 0x01,     // Boolean
    kString = 0x02,      // UTF-8 string
    kObject = 0x03,      // Object
    kNull = 0x05,        // Null
    kUndefined = 0x06,   // Undefined
    kEcmaArray = 0x08,   // ECMA array
    kObjectEnd = 0x09,   // Object end marker
    kStrictArray = 0x0A, // Strict array
};

Publish Implementation in Detail

Publish Interaction Sequence Diagram

Core Publish Code Flow

1. Handling the publish command

void RtmpConnection::handlePublish(const RtmpCommand& command) {
    // Parse the stream name
    std::string streamName = command.getStringArg(0);
    std::string publishType = command.getStringArg(1); // "live"
    
    // Both are held as weak_ptr; bail out if the manager or the TCP connection is gone
    auto mgr = streamManager_.lock();
    auto conn = tcpConnection_.lock();
    if (!mgr || !conn) return;
    
    // Set the connection type
    connectionType_ = RtmpConnectionType::kPublisher;
    streamName_ = streamName;
    
    // Create the Publisher object
    publisher_ = mgr->createPublisher(streamName, conn);
    
    // Start publishing
    publisher_->startPublish();
    
    // Send the responses
    sendOnFCPublish("NetStream.Publish.Start", "Started publishing stream.");
    sendPublishStatus("NetStream.Publish.Start", "Started publishing stream.");
    
    // Update the state
    parser_.setState(RtmpConnectionState::kPublishing);
}

2. Handling media data

void RtmpConnection::processAudioData(const RtmpMessage& message) {
    if (!isPublisher() || !publisher_) return;
    
    // Create the MediaPacket
    auto packet = std::make_shared<MediaPacket>(
        MediaPacketType::kAudio,
        message.payload(),
        message.header().timestamp,
        message.header().messageStreamId
    );
    
    // Forward to the Publisher
    publisher_->onMediaData(packet);
}

void RtmpConnection::processVideoData(const RtmpMessage& message) {
    if (!isPublisher() || !publisher_) return;
    
    // Create the MediaPacket
    auto packet = std::make_shared<MediaPacket>(
        MediaPacketType::kVideo,
        message.payload(),
        message.header().timestamp,
        message.header().messageStreamId
    );
    
    // Forward to the Publisher
    publisher_->onMediaData(packet);
}

3. Publisher data forwarding

void Publisher::onMediaData(std::shared_ptr<MediaPacket> packet) {
    if (!packet || !isPublishing_) return;
    
    updateLastActiveTime();
    updateStats(packet->data().size(), 1);
    
    // Forward to the StreamManager
    if (auto mgr = manager_.lock()) {
        mgr->onPublisherData(streamName_, packet);
    }
}

Key Implementation Details

1. Sequence header detection and caching

// Detect an AVC sequence header
bool isAvcSequenceHeader(const MediaPacket& packet) {
    return packet.type() == MediaPacketType::kVideo && 
           packet.videoCodec() == VideoCodec::kAVC && 
           packet.data().size() > 1 &&
           static_cast<uint8_t>(packet.data()[1]) == 0;
}

// Detect an AAC sequence header
bool isAacSequenceHeader(const MediaPacket& packet) {
    return packet.type() == MediaPacketType::kAudio && 
           packet.audioFormat() == AudioFormat::kAAC &&
           packet.data().size() > 1 &&
           static_cast<uint8_t>(packet.data()[1]) == 0;
}

2. Key frame detection

bool MediaPacket::isKeyFrame() const {
    if (type_ != MediaPacketType::kVideo || data_.empty()) {
        return false;
    }
    
    // FLV video tag format: Frame Type (4 bits) + Codec ID (4 bits)
    uint8_t firstByte = static_cast<uint8_t>(data_[0]);
    uint8_t frameType = (firstByte >> 4) & 0x0F;
    
    return frameType == 1; // 1 = key frame
}
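
To make the bit layout concrete, the sketch below decodes the first two bytes of an FLV video tag body in one place. It assumes the standard FLV values (frame type 1 = key frame, 2 = inter frame; codec ID 7 = AVC; AVCPacketType 0 = sequence header). VideoTagInfo and inspectVideoTag are hypothetical names used only for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Fields carried by the first two bytes of an FLV video tag body.
// Hypothetical struct for illustration; not part of the project.
struct VideoTagInfo {
    uint8_t frameType;      // 1 = key frame, 2 = inter frame
    uint8_t codecId;        // 7 = AVC (H.264)
    bool isSequenceHeader;  // AVC and AVCPacketType == 0
};

VideoTagInfo inspectVideoTag(const std::string& data) {
    VideoTagInfo info{0, 0, false};
    if (data.empty()) return info;
    const uint8_t first = static_cast<uint8_t>(data[0]);
    info.frameType = static_cast<uint8_t>(first >> 4);   // high nibble
    info.codecId = static_cast<uint8_t>(first & 0x0F);   // low nibble
    // The second byte is the AVCPacketType when the codec is AVC
    info.isSequenceHeader = info.codecId == 7 && data.size() > 1 &&
                            static_cast<uint8_t>(data[1]) == 0;
    return info;
}
```

A body starting with 0x17 0x00 is therefore an AVC key frame carrying the sequence header, while 0x27 0x01 is an AVC inter frame carrying ordinary NAL units.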

Play Implementation in Detail

Play Interaction Sequence Diagram

Core Play Code Flow

1. Handling the play command

void RtmpConnection::handlePlay(const RtmpCommand& command) {
    std::string streamName = command.getStringArg(0);
    
    // Both are held as weak_ptr; bail out if the manager or the TCP connection is gone
    auto mgr = streamManager_.lock();
    auto conn = tcpConnection_.lock();
    if (!mgr || !conn) return;
    
    // Set the connection type
    connectionType_ = RtmpConnectionType::kSubscriber;
    streamName_ = streamName;
    
    // Create the Subscriber object
    subscriber_ = mgr->createSubscriber(streamName, conn);
    
    // Send the standard RTMP play response sequence
    uint32_t streamId = 1;
    
    // 1. StreamBegin user control message
    sendStreamBegin(streamId);
    
    // 2. NetStream.Play.Reset
    sendPlayStatus("NetStream.Play.Reset", "Playing and resetting stream.");
    
    // 3. NetStream.Play.Start
    sendPlayStatus("NetStream.Play.Start", "Started playing stream.");
    
    // 4. RtmpSampleAccess
    sendRtmpSampleAccess();
    
    // 5. NetStream.Data.Start
    sendDataStart();
    
    // Start playback
    subscriber_->startPlay();
    parser_.setState(RtmpConnectionState::kPlaying);
}

2. Starting playback in Subscriber

void Subscriber::startPlay() {
    if (!isPlaying_) {
        isPlaying_ = true;
        startTime_ = Timestamp::now();
        
        // Reset the timestamp mapper
        timeStampMapper_.reset();
        
        // Send the cached data
        if (auto mgr = manager_.lock()) {
            auto stream = mgr->getStream(streamName_);
            if (stream) {
                // 1. Send the sequence headers (required)
                auto sequenceHeaders = stream->getSequenceHeaders();
                if (!sequenceHeaders.empty()) {
                    sendCachedData(sequenceHeaders);
                }
                
                // 2. Send the GOP cache (optional, for fast first-frame display)
                auto gopPackets = stream->gopCache()->getLatestKeyFramePackets();
                if (!gopPackets.empty()) {
                    sendCachedData(gopPackets);
                }
            }
        }
    }
}

3. Live data broadcast

void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
    std::vector<std::shared_ptr<Subscriber>> subscribersCopy;
    
    // Copy the subscriber list (so it cannot be modified while we are sending)
    {
        MutexLockGuard lock(mutex_);
        subscribersCopy.reserve(subscribers_.size());
        
        for (const auto& pair : subscribers_) {
            if (pair.second && pair.second->isConnected()) {
                subscribersCopy.push_back(pair.second);
            }
        }
    }
    
    // Send the data
    for (auto& subscriber : subscribersCopy) {
        subscriber->sendMediaData(packet);
    }
}

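Key Implementation Details

1. Sequence header management

// The Stream class manages sequence headers separately from the GOP cache
class Stream {
private:
    std::shared_ptr<MediaPacket> avcSequenceHeader_;  // H.264 configuration
    std::shared_ptr<MediaPacket> aacSequenceHeader_;  // AAC configuration
    std::shared_ptr<MediaPacket> metadataPacket_;     // Metadata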
    
public:
    std::vector<std::shared_ptr<MediaPacket>> getSequenceHeaders() const {
        std::vector<std::shared_ptr<MediaPacket>> headers;
        MutexLockGuard lock(mutex_);
        
        // Add in order: metadata, then video config, then audio config
        if (metadataPacket_) headers.push_back(metadataPacket_);
        if (avcSequenceHeader_) headers.push_back(avcSequenceHeader_);
        if (aacSequenceHeader_) headers.push_back(aacSequenceHeader_);
        
        return headers;
    }
};

2. Play response message formats

// StreamBegin user control message
void RtmpConnection::sendStreamBegin(uint32_t streamId) {
    Buffer buffer;
    buffer.appendInt16(0x0000); // Event type: StreamBegin
    buffer.appendInt32(streamId);
    
    RtmpMessageHeader header;
    header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kUserControl);
    header.messageLength = static_cast<uint32_t>(buffer.readableBytes());
    
    RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
    sendMessage(message);
}

// onStatus message
void RtmpConnection::sendPlayStatus(const std::string& code, const std::string& description) {
    Buffer buffer;
    
    // Command name: onStatus
    AmfCodec::encodeAmfValue(&buffer, AmfValue("onStatus"));
    
    // Transaction ID: 0
    AmfCodec::encodeAmfValue(&buffer, AmfValue(0.0));
    
    // Command object: null
    AmfCodec::encodeAmfValue(&buffer, AmfValue());
    
    // Info object
    std::unordered_map<std::string, AmfValue> info;
    info["level"] = AmfValue("status");
    info["code"] = AmfValue(code);
    info["description"] = AmfValue(description);
    AmfCodec::encodeAmfObject(&buffer, info);
    
    RtmpMessageHeader header;
    header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kCommandAMF0);
    header.messageLength = static_cast<uint32_t>(buffer.readableBytes());
    header.messageStreamId = 1;
    
    RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
    sendMessage(message);
}

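Multi-Stream Handling

Multi-Stream Publish Architecture

Multi-Stream Play Architecture

Core Implementation

1. Stream manager

class StreamManager {
private:
    std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;
    
public:
    // Create the stream, or return it if it already exists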
    std::shared_ptr<Stream> getOrCreateStream(const std::string& streamName) {
        MutexLockGuard lock(mutex_);
        
        auto it = streams_.find(streamName);
        if (it != streams_.end()) {
            return it->second;
        }
        
        // Create a new stream
        auto stream = std::make_shared<Stream>(streamName);
        stream->initialize();
        streams_[streamName] = stream;
        
        return stream;
    }
    
    // Data forwarding
    void onPublisherData(const std::string& streamName, 
                        std::shared_ptr<MediaPacket> packet) {
        auto stream = getStream(streamName);
        if (stream) {
            stream->onMediaData(packet);  // Broadcast to all subscribers
        }
    }
};

2. Subscriber management

class Stream {
private:
    std::unordered_map<std::string, std::shared_ptr<Subscriber>> subscribers_;
    
public:
    void addSubscriber(std::shared_ptr<Subscriber> subscriber) {
        MutexLockGuard lock(mutex_);
        subscribers_[subscriber->id()] = subscriber;
        
        LOG_INFO << "Added subscriber: " << subscriber->id() 
                 << " to stream: " << name_ 
                 << ", total subscribers: " << subscribers_.size();
    }
    
    void removeSubscriber(const std::string& subscriberId) {
        MutexLockGuard lock(mutex_);
        auto it = subscribers_.find(subscriberId);
        if (it != subscribers_.end()) {
            subscribers_.erase(it);
            LOG_INFO << "Removed subscriber: " << subscriberId 
                     << " from stream: " << name_
                     << ", remaining subscribers: " << subscribers_.size();
        }
    }
};

3. Concurrent data broadcast

void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
    // Work on a copy so the lock is not held for the duration of the sends
    std::vector<std::shared_ptr<Subscriber>> subscribersCopy;
    
    {
        MutexLockGuard lock(mutex_);
        subscribersCopy.reserve(subscribers_.size());
        
        for (const auto& pair : subscribers_) {
            if (pair.second && pair.second->isConnected()) {
                subscribersCopy.push_back(pair.second);
            }
        }
    }
    
    // Send to each subscriber in turn; subscribers can be added or removed
    // concurrently because the lock is no longer held
    for (auto& subscriber : subscribersCopy) {
        subscriber->sendMediaData(packet);
    }
}
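
The snapshot-then-send pattern above can be tried in isolation. The following sketch swaps muduo's MutexLockGuard for std::mutex so that it compiles on its own; DemoSubscriber and DemoStream are stand-ins invented for this example, not the project's Subscriber and Stream classes.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Stand-in subscriber that only counts what it receives (illustrative).
struct DemoSubscriber {
    bool connected = true;
    int received = 0;
    void send(const std::string&) { ++received; }
};

class DemoStream {
public:
    void add(const std::string& id, std::shared_ptr<DemoSubscriber> s) {
        std::lock_guard<std::mutex> lock(mutex_);
        subscribers_[id] = std::move(s);
    }

    // Returns the number of subscribers the packet was sent to.
    std::size_t broadcast(const std::string& packet) {
        std::vector<std::shared_ptr<DemoSubscriber>> snapshot;
        {
            // Hold the lock only while copying the shared_ptrs
            std::lock_guard<std::mutex> lock(mutex_);
            snapshot.reserve(subscribers_.size());
            for (const auto& kv : subscribers_) {
                if (kv.second && kv.second->connected) {
                    snapshot.push_back(kv.second);
                }
            }
        }
        // The lock is released here, so a slow send cannot block add()
        for (auto& s : snapshot) {
            s->send(packet);
        }
        return snapshot.size();
    }

private:
    std::mutex mutex_;
    std::unordered_map<std::string, std::shared_ptr<DemoSubscriber>> subscribers_;
};
```

Because the snapshot holds shared_ptr copies, a subscriber removed from the map during the send loop stays alive until the loop finishes. The trade-off is one vector allocation per packet.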

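Performance Optimization

1. Connection pool management

class RtmpServer {
private:
    std::unordered_map<std::string, std::shared_ptr<RtmpConnection>> connections_;
    std::atomic<int> connectionCount_{0};
    
public:
    void onConnection(const TcpConnectionPtr& conn) {
        if (connectionCount_.load() >= config_.maxConnections) {
            LOG_WARN << "Max connections reached, rejecting new connection";
            conn->shutdown();
            return;
        }
        
        // Create the RTMP connection
        // (connections_ itself is not thread-safe; it needs a mutex if this
        // callback can run on more than one IO thread)
        auto rtmpConn = std::make_shared<RtmpConnection>(conn, streamManager_);
        connections_[rtmpConn->id()] = rtmpConn;
        connectionCount_.fetch_add(1);
    }
    
    void onDisconnection(const TcpConnectionPtr& conn) {
        // Caveat: a connection rejected in onConnection never incremented the
        // counter, so the decrement must be skipped for such connections
        connectionCount_.fetch_sub(1);
        // Clean up the connection
    }
};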

2. Memory optimization

// Use an object pool to reduce memory allocations
class MediaPacketPool {
private:
    std::queue<std::shared_ptr<MediaPacket>> pool_;
    std::mutex mutex_;
    
public:
    std::shared_ptr<MediaPacket> acquire() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!pool_.empty()) {
            auto packet = pool_.front();
            pool_.pop();
            return packet;
        }
        return std::make_shared<MediaPacket>();
    }
    
    void release(std::shared_ptr<MediaPacket> packet) {
        packet->reset();
        std::lock_guard<std::mutex> lock(mutex_);
        pool_.push(packet);
    }
};
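
The pool above relies on every caller remembering to call release(). One possible variation, shown here only as a sketch and not as the project's design, attaches a custom deleter to the shared_ptr so that a packet returns to the pool when its last reference is dropped. DemoPacket and DemoPacketPool are hypothetical names, and the pool must itself be owned by a std::shared_ptr for shared_from_this() to work.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct DemoPacket {
    std::string data;
    void reset() { data.clear(); }
};

class DemoPacketPool : public std::enable_shared_from_this<DemoPacketPool> {
public:
    std::shared_ptr<DemoPacket> acquire() {
        std::unique_ptr<DemoPacket> raw;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!free_.empty()) {
                raw = std::move(free_.back());
                free_.pop_back();
            }
        }
        if (!raw) raw.reset(new DemoPacket());
        std::weak_ptr<DemoPacketPool> weakPool = shared_from_this();
        // The deleter runs when the last shared_ptr copy goes away
        return std::shared_ptr<DemoPacket>(raw.release(), [weakPool](DemoPacket* p) {
            if (auto pool = weakPool.lock()) {
                pool->recycle(p);   // pool still alive: reuse the object
            } else {
                delete p;           // pool already destroyed: free it
            }
        });
    }

    std::size_t idleCount() {
        std::lock_guard<std::mutex> lock(mutex_);
        return free_.size();
    }

private:
    void recycle(DemoPacket* p) {
        std::unique_ptr<DemoPacket> owned(p);
        owned->reset();
        std::lock_guard<std::mutex> lock(mutex_);
        free_.push_back(std::move(owned));
    }

    std::mutex mutex_;
    std::vector<std::unique_ptr<DemoPacket>> free_;
};
```

This removes the risk of resetting a packet that another subscriber is still sending, since recycling only happens once no reference remains. The sketch leaves the pool unbounded; a real pool would likely cap the number of idle objects.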

GOP Cache Mechanism

GOP Cache Architecture

GOP Cache Implementation

1. Core data structures

class GopCache {
private:
    std::shared_ptr<Gop> currentGop_;              // Current GOP
    size_t maxCacheSize_;                          // Maximum cache size
    size_t currentCacheSize_;                      // Current cache size
    
    // Audio/video alignment buffer
    std::deque<std::shared_ptr<MediaPacket>> audioBuffer_;
    uint32_t lastKeyFrameTimestamp_ = 0;
    static const uint32_t AUDIO_ALIGN_WINDOW = 200;  // 200 ms alignment window
    
public:
    void addPacket(std::shared_ptr<MediaPacket> packet);
    std::vector<std::shared_ptr<MediaPacket>> getCachedPackets() const;
    std::vector<std::shared_ptr<MediaPacket>> getLatestKeyFramePackets() const;
};

2. GOP detection and switching

bool GopCache::needNewGop(std::shared_ptr<MediaPacket> packet) const {
    // There is no current GOP yet
    if (!currentGop_) {
        return true;
    }
    
    // A new key frame was detected
    if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
        return true;
    }
    
    return false;
}

void GopCache::addPacket(std::shared_ptr<MediaPacket> packet) {
    MutexLockGuard lock(mutex_);
    
    // Audio frame: maintain the audio buffer
    if (packet->type() == MediaPacketType::kAudio) {
        maintainAudioBuffer(packet);
    }
    
    // Check whether a new GOP is needed
    bool isNewGop = needNewGop(packet);
    
    if (isNewGop) {
        startNewGop();
        
        // Key frame: align buffered audio with the video
        if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
            lastKeyFrameTimestamp_ = packet->timestamp();
            alignAudioWithKeyFrame(lastKeyFrameTimestamp_);
        }
    }
    
    // Append to the current GOP
    if (!currentGop_) {
        startNewGop();
    }
    
    currentGop_->addPacket(packet);
    updateCacheSize();
}
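
Stripped of locking, size accounting, and audio alignment, the switching rule reduces to "a video key frame starts a new GOP". The sketch below shows only that rule for a single-GOP cache (comparable to gopCacheMaxCount = 1). DemoFrame and DemoGopCache are illustrative names, and unlike the code above this simplified version drops frames that arrive before the first key frame.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct DemoFrame {
    bool isVideo;
    bool isKeyFrame;
    uint32_t timestamp;  // milliseconds
};

// Keeps only the most recent GOP. Simplified stand-in for illustration.
class DemoGopCache {
public:
    void add(const DemoFrame& f) {
        if (f.isVideo && f.isKeyFrame) {
            frames_.clear();       // a new key frame starts a new GOP
            hasKeyFrame_ = true;
        }
        if (hasKeyFrame_) {
            frames_.push_back(f);  // frames before the first key frame are dropped
        }
    }

    const std::vector<DemoFrame>& frames() const { return frames_; }

private:
    std::vector<DemoFrame> frames_;
    bool hasKeyFrame_ = false;
};
```

A new subscriber that receives frames() always starts decoding from a key frame, which is what makes fast first-frame display possible.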

3. Audio/video alignment

void GopCache::alignAudioWithKeyFrame(uint32_t keyFrameTimestamp) {
    if (audioBuffer_.empty() || !currentGop_) {
        return;
    }
    
    std::vector<std::shared_ptr<MediaPacket>> alignedAudio;
    
    // Find the audio frames just before and after the key frame
    for (const auto& audioPacket : audioBuffer_) {
        uint32_t audioTimestamp = audioPacket->timestamp();
        int32_t timeDiff = static_cast<int32_t>(audioTimestamp) - 
                          static_cast<int32_t>(keyFrameTimestamp);
        
        // Collect audio frames from 200 ms before to 50 ms after the key frame
        if (timeDiff >= -static_cast<int32_t>(AUDIO_ALIGN_WINDOW) && 
            timeDiff <= 50) {
            alignedAudio.push_back(audioPacket);
        }
    }
    
    // Sort by timestamp
    std::sort(alignedAudio.begin(), alignedAudio.end(), 
              [](const std::shared_ptr<MediaPacket>& a, 
                 const std::shared_ptr<MediaPacket>& b) {
                  return a->timestamp() < b->timestamp();
              });
    
    // Add to the start of the GOP (it was just created, so it is still empty)
    for (const auto& audioPacket : alignedAudio) {
        currentGop_->addPacket(audioPacket);
    }
    
    LOG_INFO << "Aligned " << alignedAudio.size() 
             << " audio frames with key frame at " << keyFrameTimestamp;
}

4. Fast first-frame optimization

std::vector<std::shared_ptr<MediaPacket>> 
GopCache::getLatestKeyFramePackets() const {
    MutexLockGuard lock(mutex_);
    
    std::vector<std::shared_ptr<MediaPacket>> packets;
    
    if (currentGop_ && !currentGop_->empty()) {
        const auto& gopPackets = currentGop_->packets();
        
        // Find the position of the key frame
        size_t keyFrameIndex = 0;
        bool foundKeyFrame = false;
        
        for (size_t i = 0; i < gopPackets.size(); ++i) {
            if (gopPackets[i]->type() == MediaPacketType::kVideo && 
                gopPackets[i]->isKeyFrame()) {
                keyFrameIndex = i;
                foundKeyFrame = true;
                break;
            }
        }
        
        if (foundKeyFrame) {
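            // Starting at the key frame, take at most 10 packets (a short burst;
            // the duration it covers depends on the frame rate and audio packet rate)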
            size_t endIndex = std::min(keyFrameIndex + 10, gopPackets.size());
            
            for (size_t i = keyFrameIndex; i < endIndex; ++i) {
                packets.push_back(gopPackets[i]);
            }
        }
    }
    
    return packets;
}

GOP Cache Statistics

struct GopInfo {
    uint32_t audioDuration = 0;        // Audio duration (ms)
    uint32_t videoDuration = 0;        // Video duration (ms)
    size_t audioPackets = 0;           // Number of audio packets
    size_t videoPackets = 0;           // Number of video packets
    size_t totalSize = 0;              // Total size in bytes
    uint32_t firstTimestamp = 0;       // First timestamp
    uint32_t lastTimestamp = 0;        // Last timestamp
    bool hasKeyFrame = false;          // Whether the GOP contains a key frame
    uint32_t audioSampleRate = 44100;  // Audio sample rate
    double audioFrameDuration = 0.0;   // Duration of one audio frame (ms)
};

GopInfo Gop::getGopInfo() const {
    GopInfo info;
    
    if (packets_.empty()) {
        return info;
    }
    
    // Count audio/video packets and track the timestamp range
    uint32_t minTimestamp = packets_[0]->timestamp();
    uint32_t maxTimestamp = packets_[0]->timestamp();
    
    for (const auto& packet : packets_) {
        uint32_t timestamp = packet->timestamp();
        
        minTimestamp = std::min(minTimestamp, timestamp);
        maxTimestamp = std::max(maxTimestamp, timestamp);
        info.totalSize += packet->data().size();
        
        if (packet->type() == MediaPacketType::kAudio) {
            info.audioPackets++;
        } else if (packet->type() == MediaPacketType::kVideo) {
            info.videoPackets++;
            if (packet->isKeyFrame()) {
                info.hasKeyFrame = true;
            }
        }
    }
    
    info.firstTimestamp = minTimestamp;
    info.lastTimestamp = maxTimestamp;
    
    // Compute the audio frame duration (assuming a 44.1 kHz sample rate and 1024 samples per frame)
    info.audioFrameDuration = 1024.0 / 44100.0 * 1000.0;  // 23.22ms
    info.audioDuration = static_cast<uint32_t>(info.audioPackets * info.audioFrameDuration);
    
    return info;
}

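Timestamp Synchronization

Timestamp Mapper

Timestamp Mapping Implementation

1. Timestamp mapper structure

struct TimestampMapper {
    // Shared base timestamp
    uint32_t baseTimestamp = 0;
    bool initialized = false;
    
    // Audio synchronization
    uint32_t audioBaseTimestamp = 0;
    uint32_t audioFrameNum = 0;
    uint32_t audioFrameInterval = 0;        // Computed dynamically
    bool audioInitialized = false;
    
    // Video synchronization
    uint32_t videoBaseTimestamp = 0;
    uint32_t lastVideoTimestamp = 0;
    bool videoInitialized = false;
    
    // Synchronization threshold
    static const uint32_t SYNC_THRESHOLD = 300;  // 300 ms
    
    // Not shown in this excerpt but used by the functions below:
    // lastAudioTimestamp, lastRawAudioTimestamp, audioFrameIntervalSum,
    // audioFrameIntervalCount, audioFrameIntervalCalculated, FRAME_INTERVAL_SAMPLES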
    
    uint32_t mapTimestamp(uint32_t originalTimestamp, MediaPacketType type);
    void reset();
};

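2. Audio timestamp mapping

uint32_t TimestampMapper::mapAudioTimestamp(uint32_t originalTimestamp) {
    // Initialize the shared base
    initializeBase(originalTimestamp);
    
    if (originalTimestamp < baseTimestamp) {
        return lastAudioTimestamp;  // Prevent timestamps from going backwards
    }
    
    uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;
    
    // Dynamically estimate the audio frame interval
    calculateAudioFrameInterval(originalTimestamp);
    
    // Initialize the audio base
    if (!audioInitialized) {
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 1;  // this is frame 0; the next one is expected at base + 1 * interval
        audioInitialized = true;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
    
    // No stable frame interval yet: pass the timestamp through and re-anchor
    // the base on this frame, so the expected series starts right after it
    // (otherwise the first smoothed timestamp would fall back to frame 0)
    if (!audioFrameIntervalCalculated) {
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 1;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
    
    // Compute the expected audio timestamp
    uint32_t expectedTimestamp = audioBaseTimestamp + audioFrameNum * audioFrameInterval;
    
    // Compare with the actual timestamp
    int32_t timeDiff = static_cast<int32_t>(mappedTimestamp) - 
                      static_cast<int32_t>(expectedTimestamp);
    
    // Within the sync threshold: use the expected timestamp
    if (std::abs(timeDiff) <= static_cast<int32_t>(SYNC_THRESHOLD)) {
        lastAudioTimestamp = expectedTimestamp;
        audioFrameNum++;
        return expectedTimestamp;
    } else {
        // Outside the threshold: recalibrate on this frame
        audioBaseTimestamp = mappedTimestamp;
        audioFrameNum = 1;
        lastAudioTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }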
}

3. Video timestamp mapping

uint32_t TimestampMapper::mapVideoTimestamp(uint32_t originalTimestamp) {
    // Initialize the shared base
    initializeBase(originalTimestamp);
    
    if (originalTimestamp < baseTimestamp) {
        return lastVideoTimestamp;  // Prevent timestamps from going backwards
    }
    
    uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;
    
    // Initialize the video base
    if (!videoInitialized) {
        videoBaseTimestamp = mappedTimestamp;
        videoInitialized = true;
        lastVideoTimestamp = mappedTimestamp;
        return mappedTimestamp;
    }
    
    // Enforce monotonic timestamps
    if (mappedTimestamp < lastVideoTimestamp) {
        // The timestamp went backwards: reuse the previous one
        return lastVideoTimestamp;
    }
    
    // Check for timestamp jumps
    uint32_t timeDiff = mappedTimestamp - lastVideoTimestamp;
    if (timeDiff > 2000) {  // A jump of more than 2 seconds
        LOG_WARN << "Video timestamp jump detected: " << timeDiff << "ms";
        // Either recalibrate or clamp the jump; here the jump is clamped
        mappedTimestamp = lastVideoTimestamp + 33;  // Assumes 30 fps
    }
    
    lastVideoTimestamp = mappedTimestamp;
    return mappedTimestamp;
}

4. Audio frame interval calculation

void TimestampMapper::calculateAudioFrameInterval(uint32_t currentTimestamp) {
    if (!audioFrameIntervalCalculated) {
        if (lastRawAudioTimestamp != 0) {
            uint32_t interval = currentTimestamp - lastRawAudioTimestamp;
            
            // Filter outliers (a normal audio frame interval is between 10 and 50 ms)
            if (interval >= 10 && interval <= 50) {
                audioFrameIntervalSum += interval;
                audioFrameIntervalCount++;
                
                // Compute the average once enough samples have been collected
                if (audioFrameIntervalCount >= FRAME_INTERVAL_SAMPLES) {
                    audioFrameInterval = audioFrameIntervalSum / audioFrameIntervalCount;
                    audioFrameIntervalCalculated = true;
                    
                    LOG_INFO << "Audio frame interval calculated: " << audioFrameInterval << "ms";
                }
            }
        }
        lastRawAudioTimestamp = currentTimestamp;
    }
}
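
The averaging logic can be exercised on its own. The sketch below reimplements it as a small class under two stated assumptions: the number of required samples is a constructor parameter (the value of FRAME_INTERVAL_SAMPLES is not given in the text), and an explicit hasLast_ flag replaces the "last timestamp != 0" test so that a stream starting at timestamp 0 is handled. DemoIntervalEstimator is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>

// Estimates the average audio frame interval in milliseconds.
// Illustrative stand-in for calculateAudioFrameInterval().
class DemoIntervalEstimator {
public:
    explicit DemoIntervalEstimator(uint32_t samplesNeeded)
        : samplesNeeded_(samplesNeeded) {}

    // Feed one raw audio timestamp (ms). Returns true once the average is ready.
    bool feed(uint32_t ts) {
        if (ready_) return true;
        if (hasLast_) {
            const uint32_t interval = ts - last_;
            // Ignore outliers: keep only intervals between 10 and 50 ms
            if (interval >= 10 && interval <= 50) {
                sum_ += interval;
                if (++count_ >= samplesNeeded_) {
                    average_ = sum_ / count_;  // integer division, truncates
                    ready_ = true;
                }
            }
        }
        last_ = ts;
        hasLast_ = true;
        return ready_;
    }

    uint32_t average() const { return average_; }

private:
    uint32_t samplesNeeded_;
    uint32_t last_ = 0;
    uint32_t sum_ = 0;
    uint32_t count_ = 0;
    uint32_t average_ = 0;
    bool hasLast_ = false;
    bool ready_ = false;
};
```

Because the average is an integer, AAC at 44.1 kHz (23.22 ms per frame) is rounded down to 23 ms. The smoothed timeline therefore drifts slowly against the source until the sync threshold triggers a recalibration.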

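Synchronization Check

bool TimestampMapper::checkSync() const {
    if (!initialized || !audioInitialized || !videoInitialized) {
        return true;  // Initialization not finished yet, skip the check
    }
    
    // Compute the difference between the audio and video timestamps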
    int32_t avDiff = static_cast<int32_t>(lastAudioTimestamp) - 
                     static_cast<int32_t>(lastVideoTimestamp);
    
    bool inSync = abs(avDiff) <= SYNC_THRESHOLD;
    
    if (!inSync) {
        LOG_WARN << "Audio-Video sync issue detected: " << avDiff << "ms";
    }
    
    return inSync;
}

Key Technical Details

1. RTMP handshake implementation

class RtmpProtocolParser {
public:
    // Generate the C0+C1 handshake data
    std::string generateHandshakeC0C1() {
        std::string c0c1;
        c0c1.resize(1 + kRtmpHandshakeSize);
        
        // C0: version number
        c0c1[0] = 0x03;
        
        // C1: handshake data
        char* c1 = &c0c1[1];
        
        // Timestamp (4 bytes, big-endian)
        uint32_t timestamp = static_cast<uint32_t>(time(nullptr));
        c1[0] = (timestamp >> 24) & 0xFF;
        c1[1] = (timestamp >> 16) & 0xFF;
        c1[2] = (timestamp >> 8) & 0xFF;
        c1[3] = timestamp & 0xFF;
        
        // Version (4 bytes, all zero)
        c1[4] = c1[5] = c1[6] = c1[7] = 0x00;
        
        // Random data (1528 bytes)
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> dis(0, 255);
        
        for (int i = 8; i < kRtmpHandshakeSize; ++i) {
            c1[i] = static_cast<char>(dis(gen));
        }
        
        return c0c1;
    }
    
    // Generate the C2 handshake data
    std::string generateHandshakeC2(const std::string& s1) {
        // Refuse a truncated S1 instead of reading past its end
        if (s1.size() < static_cast<size_t>(kRtmpHandshakeSize)) {
            return std::string();
        }
        
        std::string c2;
        c2.resize(kRtmpHandshakeSize);
        
        // C2 is an echo of S1
        std::memcpy(&c2[0], s1.data(), kRtmpHandshakeSize);
        
        return c2;
    }
};
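
The functions above produce the client side of the handshake. For the server side, a minimal sketch of the simple (non-digest) handshake might look as follows: validate C0, generate S1 the same way C1 is generated, and build S2 as an echo of C1. buildSimpleHandshakeResponse and kDemoHandshakeSize are names invented for this example, and the S1 timestamp is left at zero as a simplification.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <random>
#include <string>

constexpr std::size_t kDemoHandshakeSize = 1536;

// Build S0+S1+S2 from a complete C0+C1 (1 + 1536 bytes). Returns an empty
// string if the input is too short or the requested version is not 3.
// Hypothetical helper for illustration; not part of the project's API.
std::string buildSimpleHandshakeResponse(const std::string& c0c1) {
    if (c0c1.size() < 1 + kDemoHandshakeSize || c0c1[0] != 0x03) {
        return std::string();
    }
    std::string out(1 + 2 * kDemoHandshakeSize, '\0');
    out[0] = 0x03;  // S0: version

    // S1: 4-byte time (zero here) + 4 zero bytes + 1528 random bytes
    char* s1 = &out[1];
    std::mt19937 gen(std::random_device{}());
    std::uniform_int_distribution<int> dis(0, 255);
    for (std::size_t i = 8; i < kDemoHandshakeSize; ++i) {
        s1[i] = static_cast<char>(dis(gen));
    }

    // S2: echo of C1
    std::memcpy(&out[1 + kDemoHandshakeSize], c0c1.data() + 1, kDemoHandshakeSize);
    return out;
}
```

The server then waits for 1536 bytes of C2 before treating the connection as ready for chunked messages.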

2. Chunk parsing

bool RtmpProtocolParser::parseChunkHeader(Buffer* buffer, RtmpChunkHeader* header) {
    if (buffer->readableBytes() < 1) {
        return false;
    }
    
    // Parse the Basic Header
    uint8_t basicHeader = static_cast<uint8_t>(buffer->peekInt8());
    header->format = static_cast<RtmpChunkFormat>((basicHeader >> 6) & 0x03);
    uint32_t chunkStreamId = basicHeader & 0x3F;
    
    // Handle extended chunk stream IDs
    size_t basicHeaderSize = 1;
    if (chunkStreamId == 0) {
        // 2-byte form: ID = second byte + 64
        if (buffer->readableBytes() < 2) return false;
        chunkStreamId = static_cast<uint8_t>(buffer->peek()[1]) + 64;
        basicHeaderSize = 2;
    } else if (chunkStreamId == 1) {
        // 3-byte form: ID = third byte * 256 + second byte + 64
        if (buffer->readableBytes() < 3) return false;
        chunkStreamId = static_cast<uint8_t>(buffer->peek()[2]) * 256 + 
                       static_cast<uint8_t>(buffer->peek()[1]) + 64;
        basicHeaderSize = 3;
    }
    
    header->chunkStreamId = chunkStreamId;
    
    // Compute the Message Header size
    size_t messageHeaderSize = 0;
    switch (header->format) {
        case RtmpChunkFormat::kType0: messageHeaderSize = 11; break;
        case RtmpChunkFormat::kType1: messageHeaderSize = 7; break;
        case RtmpChunkFormat::kType2: messageHeaderSize = 3; break;
        case RtmpChunkFormat::kType3: messageHeaderSize = 0; break;
    }
    
    // Check that enough data has arrived
    if (buffer->readableBytes() < basicHeaderSize + messageHeaderSize) {
        return false;
    }
    
    // Skip the Basic Header
    buffer->retrieve(basicHeaderSize);
    
    // Parse the Message Header
    if (messageHeaderSize > 0) {
        parseMessageHeader(buffer, header, messageHeaderSize);
    }
    
    return true;
}
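
parseMessageHeader is not shown in the text. As a rough sketch of what the Type 0 case has to do, the function below decodes the 11-byte message header from a raw byte pointer: timestamp and length are 3-byte big-endian, the type ID is one byte, and the message stream ID is 4 bytes little-endian. DemoMessageHeader and parseType0Header are hypothetical names, and extended timestamps are not handled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

struct DemoMessageHeader {
    uint32_t timestamp = 0;
    uint32_t messageLength = 0;
    uint8_t messageTypeId = 0;
    uint32_t messageStreamId = 0;
};

// Parse an 11-byte Type 0 message header. Returns false if fewer than
// 11 bytes are available. A timestamp field of 0xFFFFFF signals that a
// 4-byte extended timestamp follows; this sketch does not read it.
bool parseType0Header(const uint8_t* p, std::size_t len, DemoMessageHeader* out) {
    if (len < 11) return false;
    // Timestamp: 3 bytes, big-endian
    out->timestamp = (static_cast<uint32_t>(p[0]) << 16) |
                     (static_cast<uint32_t>(p[1]) << 8) |
                     static_cast<uint32_t>(p[2]);
    // Message length: 3 bytes, big-endian
    out->messageLength = (static_cast<uint32_t>(p[3]) << 16) |
                         (static_cast<uint32_t>(p[4]) << 8) |
                         static_cast<uint32_t>(p[5]);
    // Message type ID: 1 byte
    out->messageTypeId = p[6];
    // Message stream ID: 4 bytes, little-endian (the only such field)
    out->messageStreamId = static_cast<uint32_t>(p[7]) |
                           (static_cast<uint32_t>(p[8]) << 8) |
                           (static_cast<uint32_t>(p[9]) << 16) |
                           (static_cast<uint32_t>(p[10]) << 24);
    return true;
}
```

Type 1 and Type 2 headers reuse the first 7 and 3 bytes of this layout respectively, with the timestamp field interpreted as a delta from the previous chunk on the same chunk stream.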

3. AMF encoding and decoding implementation

// Encode an AMF Number
void AmfCodec::encodeNumber(Buffer* buffer, double number) {
    char data[8];
    uint64_t bits;
    std::memcpy(&bits, &number, 8);
    
    // Write in big-endian order
    for (int i = 0; i < 8; ++i) {
        data[i] = static_cast<char>((bits >> (56 - i * 8)) & 0xFF);
    }
    
    buffer->append(data, 8);
}

// Encode an AMF String
void AmfCodec::encodeString(Buffer* buffer, const std::string& str) {
    // String length (2 bytes, big-endian)
    buffer->appendInt16(static_cast<int16_t>(str.size()));
    
    // String content
    buffer->append(str);
}

// Encode an AMF Object
void AmfCodec::encodeAmfObject(Buffer* buffer, 
                              const std::unordered_map<std::string, AmfValue>& object) {
    // Object type marker
    buffer->appendInt8(static_cast<int8_t>(AmfType::kObject));
    
    // Encode the key/value pairs
    for (const auto& pair : object) {
        // Key (no type marker)
        buffer->appendInt16(static_cast<int16_t>(pair.first.size()));
        buffer->append(pair.first);
        
        // Value (with type marker)
        encodeAmfValue(buffer, pair.second);
    }
    
    // Object end marker
    buffer->appendInt16(0);  // Empty key
    buffer->appendInt8(static_cast<int8_t>(AmfType::kObjectEnd));
}
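
Decoding mirrors the encoding. The sketch below round-trips an AMF0 Number through std::string instead of muduo's Buffer so that it compiles standalone; it assumes IEEE-754 doubles, and encodeAmf0Number / decodeAmf0Number are names made up for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Encode a double as an AMF0 Number: type marker 0x00 followed by
// the 8 bytes of the IEEE-754 representation in big-endian order.
std::string encodeAmf0Number(double value) {
    uint64_t bits = 0;
    std::memcpy(&bits, &value, sizeof(bits));
    std::string out(1, '\0');  // type marker 0x00 = Number
    for (int shift = 56; shift >= 0; shift -= 8) {
        out.push_back(static_cast<char>((bits >> shift) & 0xFF));
    }
    return out;
}

// Decode an AMF0 Number. Returns false if the input is shorter than
// 9 bytes or does not start with the Number type marker.
bool decodeAmf0Number(const std::string& in, double* value) {
    if (in.size() < 9 || in[0] != 0x00) return false;
    uint64_t bits = 0;
    for (int i = 1; i <= 8; ++i) {
        bits = (bits << 8) | static_cast<uint8_t>(in[i]);
    }
    std::memcpy(value, &bits, sizeof(bits));
    return true;
}
```

Using memcpy rather than a pointer cast keeps the conversion between double and uint64_t free of aliasing problems.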

4. Message serialization

void RtmpProtocolParser::serializeMessage(const RtmpMessage& message, Buffer* buffer) {
    const auto& header = message.header();
    const auto& payload = message.payload();
    
    // Choose the chunk stream ID
    uint32_t chunkStreamId = selectChunkStreamId(header.messageTypeId);
    
    // Compute how many chunks are needed (rounding up)
    uint32_t chunkCount = (payload.size() + chunkSize_ - 1) / chunkSize_;
    
    for (uint32_t i = 0; i < chunkCount; ++i) {
        // Compute the payload size of the current chunk
        uint32_t chunkPayloadSize = std::min(chunkSize_, 
                                           static_cast<uint32_t>(payload.size() - i * chunkSize_));
        
        // Write the chunk header
        if (i == 0) {
            // The first chunk uses Type 0
            writeChunkHeader(buffer, RtmpChunkFormat::kType0, chunkStreamId, header);
        } else {
            // Subsequent chunks use Type 3
            writeChunkHeader(buffer, RtmpChunkFormat::kType3, chunkStreamId, header);
        }
        
        // Write the chunk payload
        buffer->append(&payload[i * chunkSize_], chunkPayloadSize);
    }
}
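
The chunk arithmetic is easy to check in isolation. The following sketch returns the payload size of each chunk for a given message size and chunk size; chunkPayloadSizes is an illustrative helper, not a function from the project.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Split a message payload of payloadSize bytes into chunk payloads of at
// most chunkSize bytes each, returning the size of every chunk in order.
// Hypothetical helper for illustration.
std::vector<std::size_t> chunkPayloadSizes(std::size_t payloadSize, std::size_t chunkSize) {
    assert(chunkSize > 0);
    std::vector<std::size_t> sizes;
    for (std::size_t offset = 0; offset < payloadSize; offset += chunkSize) {
        // Every chunk is full except possibly the last one
        sizes.push_back(std::min(chunkSize, payloadSize - offset));
    }
    return sizes;
}
```

A 300-byte message with the protocol default chunk size of 128 becomes chunks of 128, 128, and 44 bytes; with the configured chunk size of 60000, most audio and video frames fit in a single chunk, which saves the per-chunk Type 3 headers.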

5. Media data inspection

class MediaPacket {
public:
    // Determine the video frame type
    VideoFrameType videoFrameType() const {
        if (type_ != MediaPacketType::kVideo || data_.empty()) {
            return VideoFrameType::kInterFrame;
        }
        
        // FLV视频标签:Frame Type (4 bits) + Codec ID (4 bits)
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t frameType = (firstByte >> 4) & 0x0F;
        
        return static_cast<VideoFrameType>(frameType);
    }
    
    // 检测视频编码格式
    VideoCodec videoCodec() const {
        if (type_ != MediaPacketType::kVideo || data_.empty()) {
            return VideoCodec::kAVC;
        }
        
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t codecId = firstByte & 0x0F;
        
        return static_cast<VideoCodec>(codecId);
    }
    
    // 检测音频格式
    AudioFormat audioFormat() const {
        if (type_ != MediaPacketType::kAudio || data_.empty()) {
            return AudioFormat::kAAC;
        }
        
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t format = (firstByte >> 4) & 0x0F;
        
        return static_cast<AudioFormat>(format);
    }
    
    // 检测AAC序列头
    bool isAacSequenceHeader() const {
        if (type_ != MediaPacketType::kAudio || data_.size() < 2) {
            return false;
        }
        
        // AAC序列头:音频格式=AAC && AAC包类型=0
        uint8_t firstByte = static_cast<uint8_t>(data_[0]);
        uint8_t secondByte = static_cast<uint8_t>(data_[1]);
        
        uint8_t format = (firstByte >> 4) & 0x0F;
        uint8_t aacPacketType = secondByte;
        
        return (format == static_cast<uint8_t>(AudioFormat::kAAC)) && 
               (aacPacketType == 0);
    }
};
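
For reference, here is a standalone sketch of the same bit extraction on raw bytes. It uses the FLV values directly (frame type 1 = keyframe, codec ID 7 = AVC, sound format 10 = AAC, packet type 0 = sequence header). The struct and function names are hypothetical and independent of `MediaPacket`, and the AVC sequence header check is an addition for illustration that the class above does not contain.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical holder for the two nibbles of the first video tag byte.
struct VideoTagInfo {
    uint8_t frameType;  // 1 = keyframe, 2 = inter frame
    uint8_t codecId;    // 7 = AVC (H.264)
};

inline VideoTagInfo parseVideoTagHeader(const uint8_t* data, std::size_t size) {
    assert(data != nullptr && size >= 1);
    return VideoTagInfo{static_cast<uint8_t>(data[0] >> 4),
                        static_cast<uint8_t>(data[0] & 0x0F)};
}

// AVC sequence header: keyframe + AVC codec + AVCPacketType == 0.
inline bool isAvcSequenceHeaderBytes(const uint8_t* data, std::size_t size) {
    if (data == nullptr || size < 2) return false;
    const VideoTagInfo info = parseVideoTagHeader(data, size);
    return info.frameType == 1 && info.codecId == 7 && data[1] == 0;
}

// AAC sequence header: sound format == 10 (AAC) + AACPacketType == 0.
inline bool isAacSequenceHeaderBytes(const uint8_t* data, std::size_t size) {
    if (data == nullptr || size < 2) return false;
    const uint8_t soundFormat = static_cast<uint8_t>(data[0] >> 4);
    return soundFormat == 10 && data[1] == 0;
}
```

In practice a payload starting with `0x17 0x00` is an AVC sequence header, `0x27 0x01` is an H.264 inter frame, and `0xAF 0x00` is an AAC sequence header. A GOP cache typically keeps both sequence headers so late-joining players can initialize their decoders.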

Monitoring and Logging

1. Log Levels

// Set the log level
Logger::setLogLevel(Logger::LogLevel::DEBUG);

// Key log output
LOG_INFO << "RTMP Server started on " << config_.listenAddress << ":" << config_.listenPort;
LOG_DEBUG << "Received media packet: " << packet->data().size() << " bytes";
LOG_WARN << "Connection timeout: " << connectionId;
LOG_ERROR << "Failed to parse RTMP message";

2. Performance Monitoring

struct ServerStatistics {
    std::atomic<uint64_t> totalConnections{0};
    std::atomic<uint64_t> currentConnections{0};
    std::atomic<uint64_t> totalStreams{0};
    std::atomic<uint64_t> currentStreams{0};
    std::atomic<uint64_t> totalBytesReceived{0};
    std::atomic<uint64_t> totalBytesSent{0};
    std::atomic<uint64_t> totalPacketsReceived{0};
    std::atomic<uint64_t> totalPacketsSent{0};
};

// Print statistics periodically
void printStatistics() {
    // Bind by reference: the std::atomic members make ServerStatistics non-copyable
    // (this assumes getStatistics() returns a reference)
    const auto& stats = server->getStatistics();
    LOG_INFO << "=== RTMP Server Statistics ===";
    LOG_INFO << "Connections: " << stats.currentConnections.load() 
             << "/" << stats.totalConnections.load();
    LOG_INFO << "Streams: " << stats.currentStreams.load() 
             << "/" << stats.totalStreams.load();
    LOG_INFO << "Bytes: RX=" << stats.totalBytesReceived.load() 
             << " TX=" << stats.totalBytesSent.load();
    LOG_INFO << "Packets: RX=" << stats.totalPacketsReceived.load() 
             << " TX=" << stats.totalPacketsSent.load();
}
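
Because a struct of `std::atomic` members cannot be copied, one common pattern is to expose a plain snapshot type for reporting. The sketch below is an assumption about how that could look, not the project's actual API; `Counters`, `CountersSnapshot`, `recordPacket` and `takeSnapshot` are hypothetical names, and only two of the counters are shown.

```cpp
#include <atomic>
#include <cstdint>

// Live counters, updated from I/O threads.
struct Counters {
    std::atomic<uint64_t> totalBytesReceived{0};
    std::atomic<uint64_t> totalPacketsReceived{0};
};

// Plain, copyable view of the counters for logging or export.
struct CountersSnapshot {
    uint64_t totalBytesReceived;
    uint64_t totalPacketsReceived;
};

// Relaxed ordering is enough for independent statistics counters:
// each increment is atomic, and no other data is published through them.
inline void recordPacket(Counters& c, uint64_t bytes) {
    c.totalBytesReceived.fetch_add(bytes, std::memory_order_relaxed);
    c.totalPacketsReceived.fetch_add(1, std::memory_order_relaxed);
}

// Each field is read atomically, but the snapshot as a whole is not a
// single consistent cut across fields; that is usually fine for logs.
inline CountersSnapshot takeSnapshot(const Counters& c) {
    return CountersSnapshot{
        c.totalBytesReceived.load(std::memory_order_relaxed),
        c.totalPacketsReceived.load(std::memory_order_relaxed)};
}
```

A reporting timer could then call `takeSnapshot` and log the returned values without touching the atomics again.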

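Troubleshooting

1. Connection Issues

Problem: the client connection times out

ERROR: Connection timeout: conn_12345

Solutions:

  • Check the firewall settings
  • Make sure port 1935 is not already in use by another process
  • Check network connectivity

# Check that the port is listening
netstat -tlnp | grep 1935

# Test the connection
telnet localhost 1935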

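2. Handshake Failure

Problem: the RTMP handshake fails

ERROR: Invalid handshake data from: conn_12345

Solutions:

  • Check the client's RTMP version (it must be 3)
  • Verify that the handshake data is complete
  • Check that the network transport is stable

// Debug the handshake process
LOG_DEBUG << "Handshake state: " << static_cast<int>(parser_.state());
LOG_DEBUG << "Handshake data size: " << buffer->readableBytes();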
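
The first two checks in the list can be expressed as a small validation routine. This is a minimal sketch under the assumption of a plain (unencrypted) handshake, where C0 is a single version byte and C1 is 1536 bytes; the enum and function names are hypothetical and do not come from the project's parser.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr uint8_t kRtmpVersion = 3;     // version byte expected in C0
constexpr std::size_t kC0Size = 1;      // C0: 1 byte
constexpr std::size_t kC1Size = 1536;   // C1: time(4) + zero(4) + random(1528)

enum class HandshakeCheck { kNeedMoreData, kBadVersion, kOk };

// Validate the start of a client handshake (C0 followed by C1).
inline HandshakeCheck checkC0C1(const uint8_t* data, std::size_t size) {
    assert(data != nullptr || size == 0);
    if (size < kC0Size) {
        return HandshakeCheck::kNeedMoreData;
    }
    // The version can be rejected as soon as C0 arrives.
    if (data[0] != kRtmpVersion) {
        return HandshakeCheck::kBadVersion;
    }
    // Wait until the whole C1 block has been buffered.
    if (size < kC0Size + kC1Size) {
        return HandshakeCheck::kNeedMoreData;
    }
    return HandshakeCheck::kOk;
}
```

Returning `kNeedMoreData` instead of an error matters on TCP: C0 and C1 may arrive split across several reads, so a short buffer is not by itself a sign of invalid handshake data.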

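3. Audio-Video Sync Issues

Problem: audio and video are out of sync

WARN: Audio-Video sync issue detected: 500ms

Solutions:

  • Check the timestamp mapper configuration
  • Adjust the sync threshold
  • Check GOP cache alignment

// Adjust the sync threshold
static const uint32_t SYNC_THRESHOLD = 500;  // raised to 500 ms

// Inspect the audio and video timestamps
// (cast to a signed type so the difference does not wrap when audio lags video)
LOG_DEBUG << "Audio timestamp: " << audioTimestamp 
          << " Video timestamp: " << videoTimestamp
          << " Diff: " << (static_cast<int64_t>(audioTimestamp) - static_cast<int64_t>(videoTimestamp));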
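
When the timestamps are 32-bit unsigned values, as RTMP timestamps are, the drift has to be computed in a wider signed type. The sketch below shows one way to do that; `avDriftMs` and `isOutOfSync` are hypothetical names, and the code deliberately ignores 32-bit timestamp wraparound, which a long-running stream would also need to handle.

```cpp
#include <cstdint>

// Signed drift in milliseconds: positive when audio is ahead of video.
inline int64_t avDriftMs(uint32_t audioTimestamp, uint32_t videoTimestamp) {
    return static_cast<int64_t>(audioTimestamp) -
           static_cast<int64_t>(videoTimestamp);
}

// True when the absolute drift exceeds the threshold.
inline bool isOutOfSync(uint32_t audioTimestamp, uint32_t videoTimestamp,
                        uint32_t thresholdMs) {
    const int64_t drift = avDriftMs(audioTimestamp, videoTimestamp);
    const int64_t magnitude = drift < 0 ? -drift : drift;
    return magnitude > static_cast<int64_t>(thresholdMs);
}
```

For example, with audio at 1000 ms and video at 1600 ms the drift is -600 ms, which exceeds a 500 ms threshold. A plain `uint32_t` subtraction would instead report 4294966696.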

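4. Debugging Tips

1. Packet Capture Analysis

# Capture packets with tcpdump
sudo tcpdump -i lo -w rtmp.pcap port 1935

# Analyze the capture with Wireshark
wireshark rtmp.pcap

2. Performance Profiling

# Monitor CPU and memory with top
top -p $(pgrep rtmp_server)

# Find performance hotspots with perf
perf record -g ./bin/rtmp_server
perf report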

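Summary

This RTMP push/pull project implements a complete streaming media server. Its main features are:

  1. Full RTMP protocol support: handshake, command handling, and media data transfer
  2. High-performance network handling: an asynchronous network framework built on the muduo library
  3. Audio-video synchronization: a timestamp mapper keeps playback continuous
  4. GOP cache optimization: fast first-frame display and low-latency playback
  5. Multi-stream support: multiple concurrent publishers and players
  6. Robust error handling: anomaly detection and recovery mechanisms

With a solid understanding of the RTMP protocol and streaming media technology, the server can be extended further, for example to:

  • Support more codecs (H.265, Opus, etc.)
  • Add HLS/DASH output
  • Implement stream recording
  • Add user authentication and access control
  • Integrate CDN distribution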