91网首页-91网页版-91网在线观看-91网站免费观看-91网站永久视频-91网站在线播放

LOGO OA教程 ERP教程 模切知識(shí)交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

詳解 WebSocket 實(shí)現(xiàn)

freeflydom
2023年5月30日 10:7 本文熱度 1127

前言

為什么寫這篇文章?

對(duì)于應(yīng)用協(xié)議的了解,相信大部分👨‍🎓始終停留在使用上,可能讀了如 《圖解HTTP》、《計(jì)算機(jī)網(wǎng)絡(luò)》 一類的書籍,了解了更深層次的理論,但理論始終是理論,我們很難有機(jī)會(huì)能在工作場(chǎng)景里面去觸碰到協(xié)議的實(shí)現(xiàn)上,對(duì)應(yīng)用層協(xié)議的理解是片面的。

WebSocket 的實(shí)現(xiàn)非常適合前端的同學(xué)學(xué)習(xí),通過了解 WebSocket Node版實(shí)現(xiàn),站在更高的維度上去看待這些應(yīng)用層協(xié)議,無(wú)論是WebRTC,還是HTTP都好,有了對(duì)一種協(xié)議實(shí)現(xiàn)的整體思路,就有了面對(duì)各種協(xié)議的技術(shù)自信。

WebSocket簡(jiǎn)介

WebSocket 是 HTML5 開始提供的一種在單個(gè) TCP 連接上進(jìn)行全雙工(full-duplex)通訊的協(xié)議。沒有了 Request 和 Response 的概念,兩者地位完全平等,連接一旦建立,就建立了真•持久性連接,雙方可以隨時(shí)向?qū)Ψ桨l(fā)送數(shù)據(jù)。

注:全雙工(Full Duplex)是一種通信方式,指通信的雙方可以同時(shí)發(fā)送和接收數(shù)據(jù),而且在同一時(shí)刻,發(fā)送和 接收是獨(dú)立進(jìn)行的。

概念老生常談了,不妨思考兩個(gè)問題:

WebSocket 為什么能進(jìn)行全雙工通信?HTTP 卻不行?

HTTP是非持久化連接,每次客戶端向服務(wù)器發(fā)送請(qǐng)求時(shí),都需要建立一個(gè)新的 TCP 連接。這個(gè)連接在響應(yīng)結(jié)束后就會(huì)被關(guān)閉,不保留在系統(tǒng)中,而 websocket 會(huì)保留,所以可以繼續(xù)保持通信。

HTTP 為什么是非持久化連接而 WebSocket 是持久化連接?

HTTP 協(xié)議的設(shè)計(jì)初衷是為了傳輸靜態(tài)文本信息,如 HTML、CSS、JS 等等,傳輸靜態(tài)文本大部分時(shí)間連接并不會(huì)被頻繁地打開和關(guān)閉,所以使用持久化連接帶來(lái)的復(fù)雜度可能會(huì)超過它的收益。而不像 websocket 更多是為了實(shí)時(shí)通信的場(chǎng)景,所以采用持久化連接。

WebSocket握手過程

我們經(jīng)常聽到一種說(shuō)法,WebSocket 基于 HTTP,實(shí)際上只有在建立握手時(shí),數(shù)據(jù)是通過 HTTP 傳輸?shù)摹5墙⒅螅谡嬲齻鬏敃r(shí)候是不需要 HTTP協(xié)議。握手具體過程如圖所示:

注:websocket 會(huì)保留 HTTP 握手后的 socket 連接,后續(xù)即可利用這個(gè) socket 進(jìn)行通訊,無(wú)需再關(guān)注 HTTP。

websocket 為什么采用 HTTP 握手?

WebSocket 是相對(duì)較新的協(xié)議,可能并不是所有的網(wǎng)絡(luò)設(shè)備和服務(wù)器都支持。因此,在瀏覽器請(qǐng)求服務(wù)器進(jìn)行 WebSocket 握手時(shí),使用基于 HTTP 協(xié)議的握手方式可以避免協(xié)議兼容性問題。

WebSocket實(shí)現(xiàn)原理

WebSocket數(shù)據(jù)幀說(shuō)明

WebSocket 以幀的形式進(jìn)行數(shù)據(jù)傳輸,幀組成包括以下幾個(gè)部分:

  1. 頭部(Header):包括了一些控制信息,如 FIN、RSV1、RSV2、RSV3、Opcode、MASK、Payload Length 以及 Masking Key 等字段,用于描述該幀的類型、長(zhǎng)度以及是否經(jīng)過掩碼操作等信息。它有 2~14 個(gè)字節(jié)不等,其中前 2 個(gè)字節(jié)是必需的。

  2. 掩碼(Masking Key):用于對(duì)載荷(Payload)進(jìn)行加密解密操作,它的長(zhǎng)度固定為 4 個(gè)字節(jié),如果 MASK 標(biāo)志被設(shè)置為 1,則該字段必須存在。

  3. 載荷(Payload):包含了應(yīng)用層發(fā)送的數(shù)據(jù),具體內(nèi)容由 Opcode 字段指定的數(shù)據(jù)類型決定。如果有 MASK 標(biāo)志,則需要對(duì)其進(jìn)行解碼。

各字段含義如表格所示:

字段含義長(zhǎng)度
FIN是否為最后一幀1 bit
RSV預(yù)留位,方便后續(xù)拓展協(xié)議3 bit
opcode解釋 payload data 的用途4 bit
MASK定義“payload data”是否被添加掩碼1 bit
payload data length數(shù)據(jù)長(zhǎng)度7 bit 7+16 bit 7+64 bit
Serial Number序列號(hào)16bit
Masking-key掩碼32bit
payload data傳輸數(shù)據(jù)payload data length

把數(shù)據(jù)幀組成搞清楚,可以說(shuō) websocket 你就了解了一大半,協(xié)議實(shí)現(xiàn)里大部分操作都是對(duì)于數(shù)據(jù)幀的處理。

構(gòu)造幀

websocket 的數(shù)據(jù)是以幀的形式傳輸,那么我們就需要了解如何構(gòu)造幀。構(gòu)造幀只是聽起來(lái)很復(fù)雜,構(gòu)造一個(gè)數(shù)據(jù)幀我們只需要遵守協(xié)議規(guī)則填寫即可,按照WebSocket的協(xié)議標(biāo)準(zhǔn),構(gòu)造一個(gè)最短數(shù)據(jù)幀我們只需要三個(gè)字節(jié)就能完成,構(gòu)成如表格所示下:

字節(jié)編號(hào)填入內(nèi)容
1FIN、RSV、opcode
2MASK、payload data length
3payload data

代碼實(shí)現(xiàn):

注:本質(zhì)就是做一些字節(jié)拼接操作,把對(duì)應(yīng)的標(biāo)識(shí)放到對(duì)應(yīng)的位置即可。

數(shù)據(jù)傳輸

了解往構(gòu)造幀的過程,那 websocket 是如何把幀發(fā)送出去的?

WebSocket 在握手過程中會(huì)保留 HTTP 握手后的 socket 連接,這在前面有提到,所以我們可以通過這個(gè) socket 連接進(jìn)行數(shù)據(jù)的傳輸。

代碼實(shí)現(xiàn)如下:

心跳機(jī)制

在連接過中,防止連接因長(zhǎng)時(shí)間無(wú)數(shù)據(jù)傳輸而被提前關(guān)閉,WebSocket 還引入了心跳機(jī)制,原理可以概括為:定期發(fā)送心跳包,以確認(rèn)客戶端與服務(wù)器的連接狀態(tài),并避免連接因長(zhǎng)時(shí)間空閑而被中斷。 代碼實(shí)現(xiàn)如下:

mini-ws

以下為一個(gè) websocket server 的簡(jiǎn)易實(shí)現(xiàn)(代碼來(lái)源):


測(cè)試代碼:

var crypto = require("crypto");

var { EventEmitter } = require("events");

var MAX_FRAME_SIZE = 1024; // 最長(zhǎng)長(zhǎng)度限制

var MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";


/**

 *  數(shù)據(jù)類型操作碼 TEXT 字符串

 *  BINARY 二進(jìn)制數(shù)據(jù) 常用來(lái)保存照片

 *  PING,PONG 用作心跳檢測(cè)

 *  CLOSE 關(guān)閉連接的數(shù)據(jù)幀 (有很多關(guān)閉連接的代碼 1001,1009,1007,1002)

 */

var OPCODES = {

    CONTINUE: 0,

    TEXT: 1,

    BINARY: 2,

    CLOSE: 8,

    PING: 9,

    PONG: 10,

};


var hashWebSocketKey = function (key) {

    var sha1 = crypto.createHash("sha1");

    sha1.update(key + MAGIC_STRING, "ascii");

    return sha1.digest("base64");

};

/**

 * 解掩碼

 * @param maskBytes 掩碼數(shù)據(jù)

 * @param data payload

 * @returns {Buffer}

 */

var unmask = function (maskBytes, data) {

    var payload = Buffer.alloc(data.length);

    for (var i = 0; i < data.length; i++) {

        payload[i] = maskBytes[i % 4] ^ data[i];

    }

    return payload;

};


/**

 * 編碼數(shù)據(jù)

 * @param opcode 操作碼

 * @param payload   數(shù)據(jù)

 * @returns {*}

 */

var encodeMessage = function (opcode, payload, isFinal = true) {

    var buf;

    var b1 = (isFinal ? 0x80 : 0x00) | opcode;

    var b2;

    var length = payload.length;

    if (length < 126) {

        buf = Buffer.alloc(payload.length + 2 + 0);

        b2 |= length;

        //buffer ,offset

        buf.writeUInt8(b1, 0); //讀前8bit

        buf.writeUInt8(b2, 1); //讀8―15bit

        payload.copy(buf, 2); //復(fù)制數(shù)據(jù),從2(第三)字節(jié)開始

    } else if (length < 1 << 16) {

        buf = Buffer.alloc(payload.length + 2 + 2);

        b2 |= 126;

        buf.writeUInt8(b1, 0);

        buf.writeUInt8(b2, 1);

        buf.writeUInt16BE(length, 2);

        payload.copy(buf, 4);

    } else {

        buf = Buffer.alloc(payload.length + 2 + 8);

        b2 |= 127;

        buf.writeUInt8(b1, 0);

        buf.writeUInt8(b2, 1);

        buf.writeUInt32BE(0, 2);

        buf.writeUInt32BE(length, 6);

        payload.copy(buf, 10);

    }


    return buf;

};


class WebSocket extends EventEmitter {

    constructor(req, socket, upgradeHead) {

        super();

        var resKey = hashWebSocketKey(req.headers["sec-websocket-key"]);


        // 構(gòu)造響應(yīng)頭

        var resHeaders = [

            "HTTP/1.1 101 Switching Protocols",

            "Upgrade: websocket",

            "Connection: Upgrade",

            "Sec-WebSocket-Accept: " + resKey,

        ]

            .concat("", "")

            .join("\r\n");


        socket.on("data", (data) => {

            this.buffer = Buffer.concat([this.buffer, data]);

            while (this._processBuffer()) {}

        });


        socket.on("close", (had_error) => {

            if (!this.closed) {

                this.emit("close", 1006);

                this.closed = true;

            }

        });


        socket.write(resHeaders);


        this.socket = socket;

        this.buffer = Buffer.alloc(0);

        this.closed = false;

        this.frames = Buffer.alloc(0);

        this.frameOpcode = 0;

        this.keepLiveTimer = null;

    }

    /*

 發(fā)送數(shù)據(jù)函數(shù)

 * */

    send(obj) {

        var opcode;

        var payload;

        // 如果是二進(jìn)制

        if (Buffer.isBuffer(obj)) {

            opcode = OPCODES.BINARY;

            payload = obj;

        } else if (typeof obj) {

            // 承載的文本內(nèi)容

            opcode = OPCODES.TEXT;

            //創(chuàng)造一個(gè)utf8的編碼,可以被編碼為字符串

            payload = Buffer.from(obj, "utf8");

        } else {

            throw new Error("cannot send object.Must be string of Buffer");

        }


        this._doSend(opcode, payload);

    }


    // 默認(rèn) 45 秒 保持發(fā)送心跳

    keepLive(timeout = 45000) {

        var self = this;

        function keepit() {

            self._doSend(OPCODES.PING, Buffer.from("ping"));

            console.log("server send ping...");

            // 在關(guān)閉連接的情況下就不再需要發(fā)送 ping 請(qǐng)求了

            if (!self.closed) {

                self.keepLiveTimer = setTimeout(keepit, timeout);

            }

        }

        keepit();

    }


    /*

 關(guān)閉連接函數(shù)

 * */

    close(code, reason) {

        var opcode = OPCODES.CLOSE;

        var buffer;

        if (code) {

            buffer = Buffer.alloc(Buffer.byteLength(reason) + 2);

            buffer.writeUInt16BE(code, 0);

            buffer.write(reason, 2);

        } else {

            buffer = Buffer.alloc(0);

        }

        this._doSend(opcode, buffer);

        this.closed = true;

    }


    _processBuffer() {

        var buf = this.buffer;

        if (buf.length < 2) {

            return;

        }

        var idx = 2;

        var byte1 = buf.readUInt8(0); // 讀取數(shù)據(jù)幀的前 8 bit

        var FIN = byte1 & 0x80; // 如果為0x80,則標(biāo)志傳輸結(jié)束,獲取高位 bit

        var opcode = byte1 & 0x0f; //截取第一個(gè)字節(jié)的后 4 位,即 opcode 碼


        // 如果是 0 的話,說(shuō)明是延續(xù)幀,需要保存好 opCode

        if (!FIN) {

            this.frameOpcode = opcode || this.frameOpcode; // 確保不為 0;

        }


        var byte2 = buf.readUInt8(1); // 讀取數(shù)據(jù)幀第二個(gè)字節(jié)

        var MASK = byte2 & 0x80; // 判斷是否有掩碼,客戶端必須要有,獲取高位 bit

        var length = byte2 & 0x7f; //獲取length屬性,也是小于126數(shù)據(jù)長(zhǎng)度的數(shù)據(jù)真實(shí)值

        if (length > 125) {

            if (buf.length < 8) {

                return; // 如果大于125,而字節(jié)數(shù)小于 8,則顯然不合規(guī)范要求

            }

        }

        if (length === 126) {

            //獲取的值為126 ,表示后兩個(gè)字節(jié)(16位)用于表示數(shù)據(jù)長(zhǎng)度

            length = buf.readUInt16BE(2); // 讀取 16bit 的值

            idx += 2; // +2

        } else if (length === 127) {

            //獲取的值為 127 ,表示后 8 個(gè)字節(jié)(64位)用于表示數(shù)據(jù)長(zhǎng)度,其中高 4 字節(jié)是 0

            var highBits = buf.readUInt32BE(2); //(1/0)1111111,切記 MSB 最高位是 0

            if (highBits != 0) {

                this.close(1009, ""); //1009 關(guān)閉代碼,說(shuō)明數(shù)據(jù)太大; 協(xié)議里是支持 63 位長(zhǎng)度,不過這里我們自己實(shí)現(xiàn)的話,只支持 32 位長(zhǎng)度,防止數(shù)據(jù)過大;

            }

            length = buf.readUInt32BE(6); // 從第 6 到第 10 個(gè)字節(jié)(32位)為真實(shí)存放的數(shù)據(jù)長(zhǎng)度

            idx += 8;

        }

        if (buf.length < idx + 4 + length) {

            //不夠長(zhǎng) 4為掩碼字節(jié)數(shù)

            return;

        }

        // 如果有 mask 標(biāo)志位,默認(rèn)都是有的

        if (MASK) {

            var maskBytes = buf.slice(idx, idx + 4); //獲取掩碼數(shù)據(jù)

            idx += 4; //指針前移到真實(shí)數(shù)據(jù)段

            var payload = buf.slice(idx, idx + length); // 數(shù)據(jù)長(zhǎng)度的單位是字節(jié)

            payload = unmask(maskBytes, payload); //解碼真實(shí)數(shù)據(jù)

        } else {

            payload = buf.slice(idx, idx + length);

        }


        this.buffer = buf.slice(idx + length); // 緩存 buffer

        // 有可能是分幀,需要拼接數(shù)據(jù)

        this.frames = Buffer.concat([this.frames, payload]); // 保存到 frames 中


        if (!FIN) {

            console.log(

                "server detect fragment, sizeof payload:",

                Buffer.byteLength(payload)

            );

        }


        if (FIN) {

            payload = this.frames.slice(0); // 獲取所有拼接完整的數(shù)據(jù)

            opcode = opcode || this.frameOpcode; // 如果是 0 ,則保持獲取之前保存的 code

            this.frames = Buffer.alloc(0); // 清空 frames

            this.frameOpcode = 0; // 清空 opcode

            this._handleFrame(opcode, payload); // 處理操作碼

        }


        return true; // 繼續(xù)處理

    }

    /**

     * 針對(duì)不同操作碼進(jìn)行不同處理

     * @param 操作碼

     * @param 數(shù)據(jù)

     */

    _handleFrame(opcode, buffer) {

        var payload;

        switch (opcode) {

            case OPCODES.TEXT:

                payload = buffer.toString("utf8"); //如果是文本需要轉(zhuǎn)化為utf8的編碼

                this.emit("data", opcode, payload); //Buffer.toString()默認(rèn)utf8 這里是故意指示的

                break;

            case OPCODES.BINARY: //二進(jìn)制文件直接交付

                payload = buffer;

                this.emit("data", opcode, payload);

                break;

            case OPCODES.PING: // 發(fā)送 pong 做響應(yīng)

                this._doSend(OPCODES.PONG, buffer);

                break;

            case OPCODES.PONG: //不做處理

                console.log("server receive pong");

                break;

            case OPCODES.CLOSE: // close有很多關(guān)閉碼

                let code, reason; // 用于獲取關(guān)閉碼和關(guān)閉原因

                if (buffer.length >= 2) {

                    code = buffer.readUInt16BE(0);

                    reason = buffer.toString("utf8", 2);

                }

                this.close(code, reason);

                this.emit("close", code, reason);

                break;

            default:

                this.close(1002, "unhandle opcode:" + opcode);

        }

    }

    // 這里可以針對(duì) payload 的長(zhǎng)度做分片

    _doSend(opcode, payload) {

        var len = Buffer.byteLength(payload);


        // 分片的距離邏輯

        var count = 0;

        while (len > MAX_FRAME_SIZE) {

            var framePayload = payload.slice(0, MAX_FRAME_SIZE);

            payload = payload.slice(MAX_FRAME_SIZE);

            this.socket.write(

                encodeMessage(

                    count > 0 ? OPCODES.CONTINUE : opcode,

                    framePayload,

                    false

                )

            ); //編碼后直接通過socket發(fā)送

            count++;

            len = Buffer.byteLength(payload);

        }


        this.socket.write(

            encodeMessage(count > 0 ? OPCODES.CONTINUE : opcode, payload)

        ); //編碼后直接通過socket發(fā)送

    }

}


module.exports = WebSocket;


運(yùn)行測(cè)試代碼,打開瀏覽器訪問:http://localhost:3000/,在控制臺(tái)輸入:

var http = require('http');

var WebSocket = require('./websocket');

// HTTP服務(wù)器部分

var server = http.createServer(function(req, res) {

  res.end('websocket test\r\n');

});


console.log('starting...');


// Upgrade請(qǐng)求處理

server.on('upgrade', callback);


function callback(req, socket, upgradeHead) {

  var ws = new WebSocket(req, socket, upgradeHead);

  // ws.keepLive(); // 保持心跳連接,否則一般經(jīng)過一定的時(shí)間沒有數(shù)據(jù)交互,瀏覽器端會(huì)主動(dòng)關(guān)閉 ws 鏈接

  ws.on('data', function(opcode, payload) {

    console.log('receive data:', opcode, payload.length);

    ws.send('good job');

  });



  ws.on('close', function(code, reason) {

    console.log('close:', code, reason);

  });


}


server.listen(3000);

建立連接后發(fā)送消息:

ws.send('hello world');

成功發(fā)送并收到回復(fù)!

也可以通過報(bào)文查看:

參考文檔




————————————————

https://juejin.cn/post/7236954203555151933


該文章在 2023/5/30 10:09:07 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點(diǎn)晴ERP是一款針對(duì)中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國(guó)內(nèi)大量中小企業(yè)的青睞。
點(diǎn)晴PMS碼頭管理系統(tǒng)主要針對(duì)港口碼頭集裝箱與散貨日常運(yùn)作、調(diào)度、堆場(chǎng)、車隊(duì)、財(cái)務(wù)費(fèi)用、相關(guān)報(bào)表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點(diǎn),圍繞調(diào)度、堆場(chǎng)作業(yè)而開發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點(diǎn)晴WMS倉(cāng)儲(chǔ)管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購(gòu)管理,倉(cāng)儲(chǔ)管理,倉(cāng)庫(kù)管理,保質(zhì)期管理,貨位管理,庫(kù)位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號(hào)管理軟件。
點(diǎn)晴免費(fèi)OA是一款軟件和通用服務(wù)都免費(fèi),不限功能、不限時(shí)間、不限用戶的免費(fèi)OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved

主站蜘蛛池模板: 97国产在线| 91人成亚洲高清 | 97视屏| 91视频一区| 九九免费精品视频 | 国产卡二卡三卡四卡 | 最新中文字幕一区 | 日本色色的视频一区 | 成人午夜资料库 | 91九色在线观看 | 91视频app下载| 国产人成精品 | 拍拍拍精品网站 | 国产在线观看福利片 | 国产精品精品国产 | 国产欧美高清视频 | 欧美亚洲欧美日韩中 | 中文字幕一区二区 | 亚洲无码 | 91精品国产自 | 国产精华 | 国产欧美在线高清 | 亚洲无码在线免费视频 | 日韩一区二区www | 国产性高清在线观看 | 福利资源在线 | 国产精品大战 | 91露脸对白| 国产手机在线观看 | 国产亚洲精品福利 | 91看片婬黄大片欧 | 日韩精品无 | 伦理片97影视网 | 欧美日韩三区 | 乱码在线观看 | 日本簧片在线观看 | 国产免费观看激情 | 日韩字幕欧美 | 91播放| 日韩成人精品 | 国产日韩一区欧美 |