首页 » SEO优化 » php流媒体开辟实战技巧_Golang流媒体实战之七hls拉流做事源码阅读

php流媒体开辟实战技巧_Golang流媒体实战之七hls拉流做事源码阅读

访客 2024-11-11 0

扫一扫用手机浏览

文章目录 [+]

《Golang流媒体实战》系列的链接体验开源项目lal回源转推和录制lalserver的启动源码阅读Golang流媒体实战之五:lal推流做事源码阅读Golang流媒体实战之六:lal拉流做事源码阅读Golang流媒体实战之七:hls拉流做事源码阅读](https://xinchen.blog.csdn.net/article/details/130165581)推流,初始阶段首先看推流处理,关于rtmp推流的源码,实在已在 《Golang流媒体实战之五:lal推流做事源码阅读》有详细剖析,以是这里就不从头提及了,只挑出hls有关代码来看处理推流时,publish命令由server_session.go#doPublish方法卖力处理,调用栈如下

server_session.go#doCommandMessage->doPublish->server.go#OnNewRtmpPubSession->server_manager__.go#OnNewRtmpPubSession->group__in.go#AddRtmpPubSession->addIn把稳这个addIn方法中有下面这么一段代码

group.rtmp2MpegtsRemuxer = remux.NewRtmp2MpegtsRemuxer(group)也便是说,推流阶段,该流对应的group工具,其成员变量rtmp2MpegtsRemuxer是有值的,看名字,这个rtmp2MpegtsRemuxer变量卖力的是将rtmp协议内的数据转为mpeg格式的韶光分片文件记住这个group.rtmp2MpegtsRemuxer,稍后立时就会用到推流,处理媒体数据阶段在《Golang流媒体实战之五:lal推流做事源码阅读》一文中咱们已经看过,lal收到媒体数据后,详细的处理逻辑是group__core_streaming.go#broadcastByRtmpMsg方法,里面有这么一段

// # mpegts remuxerif group.rtmp2MpegtsRemuxer != nil {group.rtmp2MpegtsRemuxer.FeedRtmpMessage(msg)}展开上述FeedRtmpMessage方法的堆栈有点深,这里简化一下

rtmp2mpegts.go#FeedRtmpMessage->rtmp2mpegts_filter_.go#Push->rtmp2mpegts.go#onPop->feedVideo (这段代码比较繁芜,值得细看)->onFrame->muxer.go#OnTsPackets->FeedMpegts->fragment.go#WriteFile上面这繁芜的调用栈,重点是rtmp2mpegts_filter_.go的逻辑,先从入口Push方法看起,此方法的功能是从中取得音频和视频的codecID,用于确定ts文件所需的pat表和pmt表的内容

func (q rtmp2MpegtsFilter) Push(msg base.RtmpMsg) {// q.done是个标志,一旦即是true,今后收到的都直接给不雅观察者,// 但是即是true之前,收到的都放在切片中缓存起来,// 如果从中成功取得音频和视频的codecID,就在drain方法中把标准设置为trueif q.done {q.observer.onPop(msg)return}// 将数据缓存到q.dataq.data = append(q.data, msg.Clone())// 如果是音频或者视频,就可以得到对应的codecIDswitch msg.Header.MsgTypeId {case base.RtmpTypeIdAudio:q.audioCodecId = int(msg.Payload[0] >> 4)case base.RtmpTypeIdVideo:q.videoCodecId = int(msg.Payload[0] & 0xF)}// 一旦音频和视频的codecID都搜集到了,就实行drain,if q.videoCodecId != -1 && q.audioCodecId != -1 {q.drain()return}// 缓存存不下的时候也会实行drainif len(q.data) >= q.maxMsgSize {q.drain()return}}func (q rtmp2MpegtsFilter) drain() {// 根据当前视频的codecId,确定ts文件的PAT,PMT格式switch q.videoCodecId {case int(base.RtmpCodecIdAvc):q.observer.onPatPmt(mpegts.FixedFragmentHeader)case int(base.RtmpCodecIdHevc):q.observer.onPatPmt(mpegts.FixedFragmentHeaderHevc)default:// TODO(chef) 精确处理只有音频或只有视频的情形 #56q.observer.onPatPmt(mpegts.FixedFragmentHeader)}// 将缓存的所有输出给不雅观察者for i := range q.data {q.observer.onPop(q.data[i])}q.data = nilq.done = true}从上述代码可见,随着根据CodecId的不同,pat、pmt包也有差别,详细定义在mpegts.go中,

php流媒体开辟实战技巧_Golang流媒体实战之七hls拉流做事源码阅读

上面的onPatPmt方法,对应的是lal/pkg/logic/group__core_streaming.go#OnPatPmt,展开看看,紧张是group.hlsMuxer.FeedPatPmt方法被实行了,也便是PAT和PMT被存入group.hlsMuxer工具,至于后面的group.recordMpegts.Write,那个和录制有关,这里暂不关注

func (group Group) OnPatPmt(b []byte) {group.patpmt = bif group.hlsMuxer != nil {group.hlsMuxer.FeedPatPmt(b)}if group.recordMpegts != nil {if err := group.recordMpegts.Write(b); err != nil {Log.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)}}}回到主线,一旦PAT和PMT确定后,rtmp2MpegtsFilter的浸染就非常纯挚了:每当新到来,只调用不雅观察者的onPop方法

func (s Rtmp2MpegtsRemuxer) onPop(msg base.RtmpMsg) {switch msg.Header.MsgTypeId {case base.RtmpTypeIdAudio:s.feedAudio(msg)case base.RtmpTypeIdVideo:s.feedVideo(msg)}}上述代码中的feedVideo方法,代码太长就不贴出了,紧张功能是:先做合法性检讨,再从一个中取出多个nalu逐个处理,紧张是在关键帧前面放入SPS(Sequence Parameter Set)、PPS(Picture Parameter Sets),待这些都准备好之后就能组装好frame工具,然后调用rtmp2mpegts.go#onFrameonFrame的浸染:先调用frame.Pack方法做格式转换,得到ts格式的数据,再调用不雅观察者的OnTsPackets方法

func (s Rtmp2MpegtsRemuxer) onFrame(frame mpegts.Frame) {s.adjustDtsPts(frame)//Log.Debugf("Rtmp2MpegtsRemuxer::onFrame, frame=%s", frame.DebugString())var boundary boolif frame.Sid == mpegts.StreamIdAudio {// 为了考虑没有视频的情形也能切片,以是这里判断spspps为空时,也建议天生fragmentboundary = !s.videoSeqHeaderCached()} else {// 收到视频,可能触发建立fragment的条件是:// 关键帧数据 &&// (// (没有收到过音频seq header) || 解释 只有视频// (收到过音频seq header && fragment没有打开) || 解释 音视频都有,且都已ready// (收到过音频seq header && fragment已经打开 && 音频缓存数据不为空) 解释 为什么音频缓存需不为空?// )boundary = frame.Key && (!s.audioSeqHeaderCached() || !s.opened || !s.audioCacheEmpty())}if boundary {s.opened = true}packets := frame.Pack()s.observer.OnTsPackets(packets, frame, boundary)}更新切片文件,将音视频数据写入切片文件接下来进入本篇的核心代码:天生新切片文件,关闭旧切片文件,将音视频数据写入新切片文件OnTsPackets对应的是muxer.go#FeedMpegts:先用updateFragment方法实行关闭旧切片开启新切片的操作,再调用WriteFile把数据写入当前切片

func (m Muxer) FeedMpegts(tsPackets []byte, frame mpegts.Frame, boundary bool) {//Log.Debugf("> FeedMpegts. boundary=%v, frame=%p, sid=%d", boundary, frame, frame.Sid)if frame.Sid == mpegts.StreamIdAudio {// TODO(chef): 为什么音频用pts,视频用dtsif err := m.updateFragment(frame.Pts, boundary, frame); err != nil {Log.Errorf("[%s] update fragment error. err=%+v", m.UniqueKey, err)return}if !m.opened {Log.Warnf("[%s] FeedMpegts A not opened. boundary=%t", m.UniqueKey, boundary)return}//Log.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))} else {if err := m.updateFragment(frame.Dts, boundary, frame); err != nil {Log.Errorf("[%s] update fragment error. err=%+v", m.UniqueKey, err)return}if !m.opened {// 走到这,可能是第一个包并且boundary为falseLog.Warnf("[%s] FeedMpegts V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key)return}//Log.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw))}if err := m.fragment.WriteFile(tsPackets); err != nil {Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err)return}}展开updateFragment去探寻核心代码,如下可见,一旦剖断有必要启用新文件,就先调用closeFragment将当前TS文件关闭掉,再调用openFragment新建一个TS文件,剖断条件有两个:当前TS文件存储内容是否超过一定长度,以及是否到达边界(boundary入参,如果是新的关键帧,此标志可能为true)

func (m Muxer) updateFragment(ts uint64, boundary bool, frame mpegts.Frame) error {discont := true// 如果已经有TS切片,检讨是否须要逼迫开启新的切片,以及切片是否发生跳跃// 把稳,音频和视频是在一起检讨的if m.opened {f := m.getCurrFrag()// 以下情形,逼迫开启新的分片:// 1. 当前韶光戳 - 当前分片的初始韶光戳 > 配置中单个ts分片时长的10倍// 缘故原由可能是:// 1. 当前包的韶光戳发生了大的跳跃// 2. 一贯没有I帧导致没有得当的韶光重新切片,堆积的包达到阈值// 2. 往回跳跃超过了阈值//maxfraglen := uint64(m.config.FragmentDurationMs 90 10)if (ts > m.fragTs && ts-m.fragTs > maxfraglen) || (m.fragTs > ts && m.fragTs-ts > negMaxfraglen) {Log.Warnf("[%s] force fragment split. fragTs=%d, ts=%d, frame=%s", m.UniqueKey, m.fragTs, ts, frame.DebugString())if err := m.closeFragment(false); err != nil {return err}if err := m.openFragment(ts, true); err != nil {return err}}// 更新当前分片的韶光长度//// TODO chef:// f.duration(也即写入m3u8中记录分片韶光长度)的做法我以为有问题// 此处用最新收到的数据更新f.duration// 但是假设fragment翻滚,数据可能是写入下一个分片中// 是否就导致了f.duration和实际分片韶光长度不一致if ts > m.fragTs {duration := float64(ts-m.fragTs) / 90000if duration > f.duration {f.duration = duration}}discont = false// 已经有TS切片,切片时长没有达到设置的阈值,则不开启新的切片if f.duration < float64(m.config.FragmentDurationMs)/1000 {return nil}}// 开启新的fragment// 此时的情形是,上层认为是得当的开启分片的机遇(比如是I帧),并且// 1. 当前是第一个分片// 2. 当前不是第一个分片,但是上一个分片已经达到配置时长if boundary {if err := m.closeFragment(false); err != nil {return err}if err := m.openFragment(ts, discont); err != nil {return err}}return nil}在closeFragment的代码中,还有个主要操作:调用writePlaylist方法天生m3u8文件

func (m Muxer) writePlaylist(isLast bool) {// 找出时长最长的fragmentmaxFrag := float64(m.config.FragmentDurationMs) / 1000m.iterateFragsInPlaylist(func(frag fragmentInfo) {if frag.duration > maxFrag {maxFrag = frag.duration + 0.5}})// TODO chef 优化这块buffer的布局var buf bytes.Bufferbuf.WriteString("#EXTM3U\n")buf.WriteString("#EXT-X-VERSION:3\n")buf.WriteString("#EXT-X-ALLOW-CACHE:NO\n")buf.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(maxFrag)))buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n\n", m.extXMediaSeq()))m.iterateFragsInPlaylist(func(frag fragmentInfo) {if frag.discont {buf.WriteString("#EXT-X-DISCONTINUITY\n")}buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, frag.filename))})if isLast {buf.WriteString("#EXT-X-ENDLIST\n")}if err := writeM3u8File(buf.Bytes(), m.playlistFilename, m.playlistFilenameBak); err != nil {Log.Errorf("[%s] write live m3u8 file error. err=%+v", m.UniqueKey, err)}}还有个比较主要的地方,便是openFragment方法,里面是打开一个新的TS文件的操作:天生TS文件名,将准备好的PAT和PMT信息写入文件,调用不雅观察者的回调接口

func (m Muxer) openFragment(ts uint64, discont bool) error {if m.opened {return nazaerrors.Wrap(base.ErrHls)}id := m.getFragmentId()filename := PathStrategy.GetTsFileName(m.streamName, id, int(Clock.Now().UnixNano()/1e6))filenameWithPath := PathStrategy.GetTsFileNameWithPath(m.outPath, filename)if err := m.fragment.OpenFile(filenameWithPath); err != nil {return err}if err := m.fragment.WriteFile(m.patpmt); err != nil {return err}m.opened = truefrag := m.getCurrFrag()frag.discont = discontfrag.id = idfrag.filename = filenamefrag.duration = 0m.fragTs = ts// nrm said: start fragment with audio to make iPhone happym.observer.OnFragmentOpen()m.observer.OnHlsMakeTs(base.HlsMakeTsInfo{Event: "open",StreamName: m.streamName,Cwd: base.GetWd(),TsFile: filenameWithPath,LiveM3u8File: m.playlistFilename,RecordM3u8File: m.recordPlayListFilename,Id: id,Duration: frag.duration,})return nil}TS文件名的天生逻辑很大略,用流名+韶光戳+TS序号拼接

func (DefaultPathStrategy) GetTsFileName(streamName string, index int, timestamp int) string {return fmt.Sprintf("%s-%d-%d.ts", streamName, timestamp, index)}至此,天生逻辑的代码算是看过了,接下来要看播放逻辑拉流播放要相应客户真个拉流要求,首先要准备好server做事,咱们就从server初始化看起hls的server工具,是main方法中创建的,调用栈如下:

main()->logic.go#NewLalServer->server_manager__.go#NewServerManager->hls/server_handler.go#NewServerHandler上述代码创建了hlsServerHandler工具,存入sm.hlsServerHandler,接下来便是server_manager__.go#RunLoop启动hls做事,代码如下

if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.serveHls, "hls"); err != nil {return err}也便是说,下面这个方法卖力相应hls要求

func (sm ServerManager) serveHls(writer http.ResponseWriter, req http.Request) {urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80)if err != nil {Log.Errorf("parse url. err=%+v", err)return}if urlCtx.GetFileType() == "m3u8" {// TODO(chef): [refactor] 须要整理,这里利用 hls.PathStrategy 不太好 202207streamName := hls.PathStrategy.GetRequestInfo(urlCtx, sm.config.HlsConfig.OutPath).StreamNameif err = sm.option.Authentication.OnHls(streamName, urlCtx.RawQuery); err != nil {Log.Errorf("simple auth failed. err=%+v", err)return}}sm.hlsServerHandler.ServeHTTP(writer, req)}对付hls的要求,处理逻辑的调用链

server_handler.go#ServeHTTP->ServeHTTPWithUrlCtx相应hls要求的关键是ServeHTTPWithUrlCtx,来看它的关键代码,实在很大略,便是根据要求到达文件名找到文件,读取内容并返回,把稳代码表明中有详细解释

// 根据要求信息天生读取TS或者M3U8文件的关键参数,例如流名和文件路径ri := PathStrategy.GetRequestInfo(urlCtx, s.outPath)//Log.Debugf("%+v", ri)// 合法性检讨if filename == "" || (filetype != "m3u8" && filetype != "ts") || ri.StreamName == "" || ri.FileNameWithPath == "" {err = errors.New(fmt.Sprintf("invalid hls request. url=%+v, request=%+v", urlCtx, ri))Log.Warnf(err.Error())resp.WriteHeader(http.StatusFound)return}// 抽象过的读取文件操作,放入二进制切片,// 详细的读取操作有两种:从磁盘读取或者从内存读取,这取决于配置的是写入磁盘还是内存content, _err := ReadFile(ri.FileNameWithPath)if _err != nil {err = errors.New(fmt.Sprintf("read hls file failed. request=%+v, err=%+v", ri, _err))Log.Warnf(err.Error())resp.WriteHeader(http.StatusNotFound)return}// 根据文件类型不同,设置不同的相应headerswitch filetype {case "m3u8":resp.Header().Add("Content-Type", "application/x-mpegurl")resp.Header().Add("Server", base.LalHlsM3u8Server)// 给ts文件都携带上session_id字段if sessionIdHash != "" {content = bytes.ReplaceAll(content, []byte(".ts"), []byte(".ts?session_id="+sessionIdHash))}case "ts":resp.Header().Add("Content-Type", "video/mp2t")resp.Header().Add("Server", base.LalHlsTsServer)}resp.Header().Add("Cache-Control", "no-cache")resp.Header().Add("Access-Control-Allow-Origin", "")if sessionIdHash != "" {session := s.getSubSession(sessionIdHash)if session != nil {session.AddWroteBytesSum(uint64(len(content)))}}// 相应_, _ = resp.Write(content)return至此,hls拉流做事的源码阅读已经完成,大略来说,便是一起RTMP的推流会在处理每个音视频的时候,实时天生m3u8文件,以及多个TS文件,这样每当hls拉流要求到达时,就可以根据指定的文件名返回已经天生的内容了大略清晰的逻辑,满满的知识点Get,再一次感谢lal的作者欢迎关注头条号:程序员欣宸学习路上,你不孤单,欣宸原创一起相伴...

php流媒体开辟实战技巧_Golang流媒体实战之七hls拉流做事源码阅读
(图片来自网络侵删)
标签:

相关文章

谷歌云ss防火墙规则,防火墙规则设置

越来越多的企业选择将业务迁移至云端。云环境下的安全问题也日益凸显。为了保障云上业务的安全,谷歌云推出了SS防火墙(Security...

SEO优化 2025-04-02 阅读0 评论0

谷歌pixel3的超级算法,google,pixel3

智能手机行业日新月异。作为全球领先的科技巨头,谷歌近年来在智能手机领域也取得了显著的成就。其中,谷歌Pixel 3凭借其独特的超级...

SEO优化 2025-04-02 阅读0 评论0

和田谷歌SEO公司助力企业拓展全球市场

越来越多的企业开始关注网络营销,尤其是谷歌SEO优化。在我国新疆和田地区,也有越来越多的企业开始意识到谷歌SEO的重要性,希望通过...

SEO优化 2025-03-31 阅读1 评论0