第二十五章 消息数据传递机制

消息机制

在建立P2P连接之后,前后端,信令等都要进行一定的行为交互,每个行为一般的我们会指定一个指令ID或者字符串,然后跟随对应的数据内容。

心跳机制,就是其中一个典型的使用例子。

主要接口

为了满足消息机制的规范化,我们在网络通信机制层面抽象了一个ClientSink类对象,专门处理对应的行为。比较典型的是以下的源代码中作用的函数。分别是:

  1. OnConnnected, 联通后
  2. OnConnectionError,连接错误时
  3. OnClosed, 连接关系时
  4. OnMessage, 多端有消息收到时。这个多端可以是Web前端,信令,后端渲染服务器等。我们目前的传递数据结构为Json
  5. OnData,接收到数据时。这个OnData并不是数据通道DataChannel的回调数据,请注意区别。而是我们的ClientSink定义的。用于扩展传输二进制数据用的。
                // override QLClientSink
                void OnConnected(int sessionid);
                void OnConnectionError(int sessionid, const std::string& error);
                void OnClosed(int sessionid, int statuscode, const std::string& reason, bool bwasclean);
                void OnMessage(int sessionid, const std::string& msg);
                void OnData(int sessionid, const unsigned char* pdata, int len);

主要的代码如下

        /// <summary>
        /// 這是Websocket的消息回調。並不是DataChannel的,在下面
        /// 当连接的WEbSocket收到任何的讯息时,做解析
        /// QLSession中调用
        /// </summary>
        /// <param name="sessionid"></param>
        /// <param name="s_msg">Json格式,Key是消息ID,value是具体SDP消息</param>
        void QLSignalConnection::OnMessage(int sessionid, const std::string& s_msg)
        {
                std::string content = "WebSocket Received : " + s_msg;
                QL_LOG(content.c_str());

                Json::Value msg = TO_JSON(s_msg);
                if (msg.isNull())
                {
                        std::cout << "QLSignalConnection::OnMessage:invalid message" << std::endl;
                        return;
                }

                std::string msg_type = JSON_STR(msg, MessageKeyName::type);
                if (msg_type == MessageTypeName::pong)
                {
                        ProcessPongAction();
                        return;
                }
                else if (msg_type == MessageTypeName::config)
                {
                        OnConfig(msg);
                        return;
                }
                else if (msg_type == MessageTypeName::identify)
                {
                        OnIdRequested();
                }

                else if (msg_type == MessageTypeName::offer)
                {
                        //这种是客户端主动发送offer
                        OnSessionDescriptionWhenGetOffer(msg);
                }
                else if (msg_type == MessageTypeName::answer)
                {
                        //理论上不会到这里。
                        //只有当我们Streamer作为后端发送Offer,前端回复Anwer的时候,这里的函数才会收到内容。做处理。目前留空

                }
                else if (msg_type == MessageTypeName::icecandidate)
                {
                        int playerid = JSON_INT(msg, MessageKeyName::playerid);
                        QL_LOG("OnMessage get:icecandidate");

                        //The json structure should depend on detailed request data.
                        //Different structure could be different codes
                        Json::Value msg = TO_JSON(s_msg);
                        Json::Value dataMsg = msg["candidate"];
                        std::string candidate = JSON_STR(dataMsg, "candidate");
                        std::string sdpMid = JSON_STR(dataMsg, "sdpMid");
                        int sdpMLineIndex = JSON_INT(dataMsg, "sdpMLineIndex");


                        if (playerid > 0)
                        {
                                OnPlayerIceCandidate(msg);
                        }
                        else
                        {
                                OnStreamerIceCandidate(msg);
                        }


                        //Start to preapre a IceCandidate object ,add to 
                        webrtc::SdpParseError error;
                        std::unique_ptr<webrtc::IceCandidateInterface> myCandidateInterface(webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error));
                        //if failed
                        if (!myCandidateInterface.get())
                        {
                                std::string content = "Error : Can't parse received candidate message. SdpParseError was: " + candidate;
                                QL_LOG(content.c_str());
                                return;
                        }
                        if (!mPeerConnection->AddIceCandidate(myCandidateInterface.get()))
                        {
                                std::string content = "Error : Failed to apply the received candidate " + candidate;
                                QL_LOG(content.c_str());
                                std::string content2 = "Error : Failed candidate Error: Line:" + error.line + " Desc: " + error.description;
                                QL_LOG(content2.c_str());
                                return;
                        }
                        else
                        {
                                std::string content = "Successed to apply the received candidate " + candidate;
                                QL_LOG(content.c_str());
                                return;
                        }
                }
                else if (msg_type == MessageTypeName::playercount)
                {
                        OnPlayerCount(msg);
                }
                else if (msg_type == MessageTypeName::playerdisconnected)
                {
                        OnPlayerDisconnected(msg);
                }

                else
                {
                        //invalid command
                        std::cout << "QLSignalConnection::OnMessage: invalid message type:" << msg_type << std::endl;
                }
        }

主要内容解释:

上述代码主要是在收到OnMessage回调函数后,解析内容。

重点如下:

  1. 我们传递的消息格式是Json,所以用了一个Cpp Json的库解析。
  2. 每个指令双方都协商好后,完全对等定义在两方。 比如这种
        /// <summary>
        /// 传递的消息类型
        /// </summary>
        namespace MessageTypeName
        {
                const std::string identify = "identify";
                const std::string config = "config";
                const std::string offer = "offer";
                const std::string answer = "answer";
                const std::string icecandidate = "iceCandidate";
                const std::string playercount = "playerCount";
                const std::string playerdisconnected = "playerDisconnected";
                const std::string ping = "ping";
                const std::string pong = "pong";
                const std::string endpointid = "endpointId";
        }
  1. 要注意的是这里一般我们接受信令的消息通知,从而进行某些行为动作。比如Offer或者Answer等。
  2. 实际上我们也可以看到,这里主要是渲染端和信令的交互,因为信令服务器还和Web前端有交互连接。很多数据,行为操作是从Web客户端请求的一个转发。同样的,我们也需要信令服务器来转发我们回复Web前端的数据和指令。

RA/SD 衍生者AI训练营。发布者:chris,转载请注明出处:https://www.shxcj.com/archives/6574

(0)
上一篇 4天前
下一篇 4天前

相关推荐

发表回复

登录后才能评论
本文授权以下站点有原版访问授权 https://www.shxcj.com https://www.2img.ai https://www.2video.cn