看到有同学在 Webscocket pong 包 php 踩坑实例 文章中留言 要我给出Demo
满足这位同学的要求:
下面Demo使用的 Saber 人性化的协程HTTP客户端封装库
运行环境 swoole4.2+ php7.1+
需要使用协程 go()
;
demo
并不全 但坑都是踩过的 getSendMessage
创建订阅数据json格式这个就不提供了, 很简单没坑, 但最好控制好订阅交易对数量.
币安的webscocket 主要坑在于 ping
包 非常正规的使用opcode
是9, 并且收包中的data是空包, 回pong
包 opcode
传10.
/**
* 币安现货ws 订阅
* @param array $symbolList 订阅交易对列表
* @param $subType 订阅类型
* @return bool
*/
public function binanceSubscribe(array $symbolList, $subType = self::SUB_SPOT_DEPTH_TYPE)
{
// 订阅地址
$this->stream = 'wss://stream.binance.com:443/stream?streams=';
$startTime = time();
try {
// 判断连接是否为空
if (!empty($this->websocket)) {
// 关闭连接
@$this->websocket->close();
}
if (empty($this->websocket)) {
$this->websocket = SaberGM::websocket($this->stream);
// 创建订阅数据 json 数据
$sendMessage = $this->getSendMessage($symbolList, $subType);
// 推送订阅内容
$this->websocket->push($sendMessage);
}
// 容错
if ($this->websocket == null) {
return false;
}
$errCount = 0;
do {
// 诶币安比较乖 莫名其妙就断包了 , 还是10分钟一重连比较好
if (($startTime + 600) < time()) {
Logger::wss(['超时10分钟' => $startTime, 'send' => $sendMessage]);
return false;
}
// 超时 1秒抛出空包
$data = $this->websocket->recv(1); // timeout
// 收包数据结构对象类型 想要存储仅能通过 serialize
Logger::recv(['s' => serialize($data)]);
$errCount++;
// 空包容错 5次 (空包或超时过多最好重新连接)
if ($errCount > 5) return false;
// 币安对于opcode 还是有要求的
if (empty($data->opcode)) {
Logger::wss(['超时' => serialize($data)]);
continue;
} else if ($data->opcode == 9) {
// pong 币安的ping包 opcode=9 且是data是空 我们需要 回 pong 且opcode=10
Logger::pong(['s' => serialize($data)]);
$this->websocket->push('pong', 10, true);
continue;
}
// 空包
if (empty($data->data)) continue;
$responseData = $data->data;
$finish = $data->finish;
if (!$finish || !$responseData) continue;
Logger::wsDepthJson(['opcode' => $data->opcode, 'fd' => $data->fd, 's' => serialize($data)]);
//同步处理
$responseData = json_decode($responseData, true);
// todo ... 代码逻辑 根据订阅类型分别存储于不同redis 有序集合Key中 , 获取时永远提取最新1秒数据 取不到在通过 api 拿, 币安api延迟太狠了.
// 计数器归零
$errCount = 0;
} while (true);
} catch (\Swlib\Http\Exception\ConnectException $e) {
echo 'ConnectException ', $e->getMessage(),PHP_EOL;
return false;
} catch (\Throwable $e) {
echo 'Throwable ', $e->getMessage(),PHP_EOL;
return false;
}
return true;
}
OKEX 的webscocket订阅重要在于 解包 使用 gzinflate
函数实现解包
/**
* okex现货ws 订阅
* @param array $symbolList
* @param $subType
* @return bool
*/
public function okexSubscribe(array $symbolList, $subType)
{
$this->stream = 'wss://real.okex.com:8443/ws/v3';
try {
if (!empty($this->websocket))
{
// 关闭连接
@$this->websocket->close();
}
if (empty($this->websocket))
{
// Saber\WebSocket
$this->websocket = SaberGM::websocket($this->stream);
$sendMessage = self::getSendMessage($symbolList, $subType);
$this->websocket->push($sendMessage);
}
if ($this->websocket == null)
{
Logger::errmsg(['platform' => "ok", 'stream' => $this->stream]);
return false;
}
$errCount = 0;
do {
$data = $this->websocket->recv(1);// timeout
$errCount++;
if(empty($data->data)) return false;
$responseData = gzinflate($data->data);
if ($errCount > 5) return false;
if (empty($responseData)) return false;
$finish = $data->finish;
if (!$finish || !$responseData) return false;
//同步处理
$responseData = json_decode($responseData, true);
// todo ... code
// 计数器归零
$errCount = 0;
} while (true);
} catch (\Swlib\Http\Exception\ConnectException $e) {
$this->websocket = null;
} catch (\Exception $e) {
$this->websocket = null;
}
return true;
}
火币坑位也是在于解包 gzip解压
, 感谢开发同学的钻研, 为我大PHP在币圈量化提供轮子.
/**
* 火币现货ws 订阅
* @param array $symbolList
* @param string $subType 订阅类型
* @return bool
*/
public function huobiSubscribe(array $symbolList, $subType)
{
$this->stream = 'wss://api.huobi.pro:443/ws';
try {
if (!empty($this->websocket)) {
// 关闭连接
@$this->websocket->close();
}
if (empty($this->websocket)) {
// Saber\WebSocket
$this->websocket = SaberGM::websocket($this->stream);
foreach ($symbolList as $sym) {
$huoBiSymbol = strtolower(implode('', explode('_', $sym)));
$sendMessage = $this->getSendMessage($huoBiSymbol, $subType, $sym);
$this->websocket->push($sendMessage);
}
}
if ($this->websocket == null) {
Logger::errmsg(['platform' => "huobi", 'stream' => $this->stream]);
return false;
}
$errCount = 0;
do {
$data = $this->websocket->recv(1);// timeout'
$errCount++;
//容错5次
if ($errCount > 5) return false;
if (empty($data->finish)) continue;
$responseData = json_decode($this->gzdecode($data), true);
//接受到ping 恢复pong {"ping": 1492420473027} {"pong": 1492420473027}
if (isset($responseData['ping'])) {
$this->websocket->push($this->getPong($responseData['ping']));
}
if (!$responseData) return false;
// todo ...code
// 计数器归零
$errCount = 0;
} while (true);
} catch (\Swlib\Http\Exception\ConnectException $e) {
$this->websocket = null;
} catch (\Throwable $e) {
$this->websocket = null;
}
return true;
}
/**
* gzip解压
* @param $data
* @return false|string
*/
public function gzdecode($data)
{
$flags = ord(substr($data, 3, 1));
$headerlen = 10;
if ($flags & 4) {
$extralen = unpack('v', substr($data, 10, 2));
$extralen = $extralen[1];
$headerlen += 2 + $extralen;
}
if ($flags & 8) // Filename
$headerlen = strpos($data, chr(0), $headerlen) + 1;
if ($flags & 16) // Comment
$headerlen = strpos($data, chr(0), $headerlen) + 1;
if ($flags & 2) // CRC at end of file
$headerlen += 2;
$unpacked = @gzinflate(substr($data, $headerlen));
if ($unpacked === FALSE) $unpacked = $data;
return $unpacked;
}
/**
* 发送pong信息
* @param $pingInt
* @return false|string
*/
public static function getPong($pingInt)
{
return json_encode(["pong" => $pingInt], JSON_UNESCAPED_UNICODE);
}
好了 这篇文章 demo
就写着点, 部分内容没全写进去, 如果你不是自己踩坑就想直接使用,我建议你还是不要用我的demo了.
伸手党的同学也不要发评论要demo. 本文仅帮助广大PHP同行 不重复入坑.