Karp 的技术博客

看到有同学在 Webscocket pong 包 php 踩坑实例 文章中留言 要我给出Demo
满足这位同学的要求:

下面Demo使用的 Saber 人性化的协程HTTP客户端封装库
运行环境 swoole4.2+ php7.1+ 需要使用协程 go();

demo 并不全 但坑都是踩过的 getSendMessage 创建订阅数据json格式这个就不提供了, 很简单没坑, 但最好控制好订阅交易对数量.

币安的webscocket 主要坑在于 ping包 非常正规的使用opcode是9, 并且收包中的data是空包, 回pongopcode传10.

/**
 * 币安现货ws 订阅
 * @param array $symbolList 订阅交易对列表
 * @param $subType 订阅类型
 * @return bool
 */
public function binanceSubscribe(array $symbolList, $subType = self::SUB_SPOT_DEPTH_TYPE)
{
    // 订阅地址
    $this->stream = 'wss://stream.binance.com:443/stream?streams=';

    $startTime = time();

    try {
        // 判断连接是否为空
        if (!empty($this->websocket)) {
            // 关闭连接
            @$this->websocket->close();
        }

        if (empty($this->websocket)) {

            $this->websocket = SaberGM::websocket($this->stream);

            // 创建订阅数据 json 数据
            $sendMessage = $this->getSendMessage($symbolList, $subType);

            // 推送订阅内容
            $this->websocket->push($sendMessage);
        }

        // 容错
        if ($this->websocket == null) {
            return false;
        }

        $errCount = 0;
        do {
            // 诶币安比较乖 莫名其妙就断包了 , 还是10分钟一重连比较好
            if (($startTime + 600) < time()) {

                Logger::wss(['超时10分钟' => $startTime, 'send' => $sendMessage]);
                return false;
            }

            // 超时 1秒抛出空包
            $data = $this->websocket->recv(1);  // timeout

            // 收包数据结构对象类型 想要存储仅能通过 serialize
            Logger::recv(['s' => serialize($data)]);

            $errCount++;

            // 空包容错 5次 (空包或超时过多最好重新连接)
            if ($errCount > 5) return false;

            // 币安对于opcode 还是有要求的
            if (empty($data->opcode)) {

                Logger::wss(['超时' => serialize($data)]);

                continue;
            } else if ($data->opcode == 9) {

                // pong 币安的ping包 opcode=9 且是data是空 我们需要 回 pong 且opcode=10
                Logger::pong(['s' => serialize($data)]);
                $this->websocket->push('pong', 10, true);

                continue;
            }

            // 空包
            if (empty($data->data)) continue;

            $responseData = $data->data;
            $finish = $data->finish;

            if (!$finish || !$responseData) continue;

            Logger::wsDepthJson(['opcode' => $data->opcode, 'fd' => $data->fd, 's' => serialize($data)]);

            //同步处理
            $responseData = json_decode($responseData, true);
            // todo  ... 代码逻辑 根据订阅类型分别存储于不同redis 有序集合Key中 , 获取时永远提取最新1秒数据 取不到在通过 api 拿, 币安api延迟太狠了.

            //  计数器归零
            $errCount = 0;

        } while (true);

    } catch (\Swlib\Http\Exception\ConnectException $e) {

        echo 'ConnectException ', $e->getMessage(),PHP_EOL;
        return false;
    } catch (\Throwable $e) {

        echo 'Throwable ', $e->getMessage(),PHP_EOL;

        return false;
    }

    return true;
}

OKEX 的webscocket订阅重要在于 解包 使用 gzinflate 函数实现解包

/**
 * okex现货ws 订阅
 * @param array $symbolList
 * @param $subType
 * @return bool
 */
public function okexSubscribe(array $symbolList, $subType)
{
    $this->stream = 'wss://real.okex.com:8443/ws/v3';

    try {

        if (!empty($this->websocket))
        {
            // 关闭连接
            @$this->websocket->close();
        }

        if (empty($this->websocket))
        {
            // Saber\WebSocket
            $this->websocket = SaberGM::websocket($this->stream);
            $sendMessage = self::getSendMessage($symbolList, $subType);

            $this->websocket->push($sendMessage);
        }

        if ($this->websocket == null)
        {
            Logger::errmsg(['platform' => "ok", 'stream' => $this->stream]);
            return false;
        }

        $errCount = 0;
        do {
            $data = $this->websocket->recv(1);// timeout
            $errCount++;

            if(empty($data->data)) return false;
            $responseData = gzinflate($data->data);

            if ($errCount > 5) return false;
            if (empty($responseData)) return false;

            $finish = $data->finish;
            if (!$finish || !$responseData) return false;

            //同步处理
            $responseData = json_decode($responseData, true);
           
           // todo ... code

            //  计数器归零
            $errCount = 0;

        } while (true);

    } catch (\Swlib\Http\Exception\ConnectException $e) {

        $this->websocket = null;
    } catch (\Exception $e) {

        $this->websocket = null;
    }

    return true;
}

火币坑位也是在于解包 gzip解压, 感谢开发同学的钻研, 为我大PHP在币圈量化提供轮子.

/**
 * 火币现货ws 订阅
 * @param array $symbolList
 * @param string $subType 订阅类型
 * @return bool
 */
public function huobiSubscribe(array $symbolList, $subType)
{
    $this->stream = 'wss://api.huobi.pro:443/ws';

    try {

        if (!empty($this->websocket)) {
            // 关闭连接
            @$this->websocket->close();
        }

        if (empty($this->websocket)) {
            // Saber\WebSocket
            $this->websocket = SaberGM::websocket($this->stream);

            foreach ($symbolList as $sym) {
                $huoBiSymbol = strtolower(implode('', explode('_', $sym)));
                $sendMessage = $this->getSendMessage($huoBiSymbol, $subType, $sym);
                $this->websocket->push($sendMessage);
            }
        }

        if ($this->websocket == null) {
            Logger::errmsg(['platform' => "huobi", 'stream' => $this->stream]);
            return false;
        }

        $errCount = 0;
        do {

            $data = $this->websocket->recv(1);// timeout'

            $errCount++;

            //容错5次
            if ($errCount > 5) return false;

            if (empty($data->finish)) continue;

            $responseData = json_decode($this->gzdecode($data), true);

            //接受到ping 恢复pong  {"ping": 1492420473027}   {"pong": 1492420473027}
            if (isset($responseData['ping'])) {
                $this->websocket->push($this->getPong($responseData['ping']));
            }

            if (!$responseData) return false;

            // todo ...code

            // 计数器归零
            $errCount = 0;

        } while (true);

    } catch (\Swlib\Http\Exception\ConnectException $e) {

        $this->websocket = null;
    } catch (\Throwable $e) {

        $this->websocket = null;
    }

    return true;
}

/**
 * gzip解压
 * @param $data
 * @return false|string
 */
public function gzdecode($data)
{
    $flags = ord(substr($data, 3, 1));
    $headerlen = 10;
    if ($flags & 4) {
        $extralen = unpack('v', substr($data, 10, 2));
        $extralen = $extralen[1];
        $headerlen += 2 + $extralen;
    }
    
    if ($flags & 8) // Filename
        $headerlen = strpos($data, chr(0), $headerlen) + 1;
    if ($flags & 16) // Comment
        $headerlen = strpos($data, chr(0), $headerlen) + 1;
    if ($flags & 2) // CRC at end of file
        $headerlen += 2;
    $unpacked = @gzinflate(substr($data, $headerlen));
    if ($unpacked === FALSE) $unpacked = $data;
    
    return $unpacked;
}

/**
 * 发送pong信息
 * @param $pingInt
 * @return false|string
 */
public static function getPong($pingInt)
{
    return json_encode(["pong" => $pingInt], JSON_UNESCAPED_UNICODE);
}

好了 这篇文章 demo 就写着点, 部分内容没全写进去, 如果你不是自己踩坑就想直接使用,我建议你还是不要用我的demo了.
伸手党的同学也不要发评论要demo. 本文仅帮助广大PHP同行 不重复入坑.

php websocket

版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2024年10月18日 06:45
7

目录

来自 《PHP 连接币安,okex,火币,websoket的示例代码》