【windows版】tp5中使用GatewayWorker和Workerman
发表于:2023-01-19 14:52:39浏览:210次
安装
1、安装WorkerMan
composer require workerman/workerman-for-win
composer require topthink/think-worker=1.*
2、安装GatewayWorker
composer require workerman/gateway-worker-for-win
3、安装GatewayClient
composer require workerman/gatewayclient
服务端
- start_for_win.bat(目录:与application同级别,作用:双击启动服务)
php start_register.php start_gateway.php start_businessworker.php
pause
- start_register.php(目录:与application同级别,作用:启动文件)
<?php
// [ 应用入口文件 ]
// 定义应用目录
define('APP_PATH', __DIR__ . '/application/');
//绑定
define('BIND_MODULE','push/Sregister');
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';
- start_gateway.php(目录:与application同级别,作用:启动文件)
<?php
// [ 应用入口文件 ]
// 定义应用目录
define('APP_PATH', __DIR__ . '/application/');
//绑定
define('BIND_MODULE','push/Sgateway');
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';
- start_businessworker.php(目录:与application同级别,作用:启动文件)
<?php
// [ 应用入口文件 ]
// 定义应用目录
define('APP_PATH', __DIR__ . '/application/');
//绑定
define('BIND_MODULE','push/Sbusinessworker');
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';
- 控制器:app\push\controller\Sregister.php
<?php
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use GatewayWorker\Register;
class Sregister extends Controller
{
function __construct()
{
require_once __DIR__ . '/../../../vendor/autoload.php';
// register 服务必须是text协议
$register = new Register('text://0.0.0.0:1236');
// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
}
}
- 控制器:app\push\controller\Sgateway.php
<?php
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use \GatewayWorker\Gateway;
use \Workerman\Autoloader;
class Sgateway extends Controller
{
function __construct()
{
require_once __DIR__ . '/../../../vendor/autoload.php';
// gateway 进程
$gateway = new Gateway("Websocket://0.0.0.0:7272");
// 设置名称,方便status时查看
$gateway->name = 'Gateway';
// 设置进程数,gateway进程数建议与cpu核数相同
$gateway->count = 4;
// 分布式部署时请设置成内网ip(非127.0.0.1)
$gateway->lanIp = '127.0.0.1';
// 内部通讯起始端口。假如$gateway->count=4,起始端口为2300
// 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口
$gateway->startPort = 2300;
// 心跳间隔
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 1;
// 心跳数据
$gateway->pingData = '';
// 服务注册地址
$gateway->registerAddress = '127.0.0.1:1236';
/*
// 当客户端连接上来时,设置连接的onWebSocketConnect,即在websocket握手时的回调
$gateway->onConnect = function($connection)
{
$connection->onWebSocketConnect = function($connection , $http_header)
{
// 可以在这里判断连接来源是否合法,不合法就关掉连接
// $_SERVER['HTTP_ORIGIN']标识来自哪个站点的页面发起的websocket链接
if($_SERVER['HTTP_ORIGIN'] != 'http://chat.workerman.net')
{
$connection->close();
}
// onWebSocketConnect 里面$_GET $_SERVER是可用的
// var_dump($_GET, $_SERVER);
};
};
*/
//windows 热更新
new FileMonitor($gateway);
// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
}
}
- 控制器:app\push\controller\Sbusinessworker.php
<?php
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;
class Sbusinessworker extends Controller
{
function __construct()
{
require_once __DIR__ . '/../../../vendor/autoload.php';
// bussinessWorker 进程
$worker = new BusinessWorker();
// worker名称
$worker->name = 'BusinessWorker';
// bussinessWorker进程数量
$worker->count = 4;
// 服务注册地址
$worker->registerAddress = '127.0.0.1:1236';
//设置处理业务的类,此处制定Events的命名空间
$worker->eventHandler = 'app\push\controller\Events';
//windows 热更新
new FileMonitor($worker);
// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
}
}
- 控制器:app\push\controller\FileMonitor.php(作用:windows的热更新)
<?php
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use \Workerman\Lib\Timer;
/**
* workerman-filemonitor for windows
*
* 监控文件更新并自动reload workerman
*
* 使用方法:
* require_once __DIR__ . '/FileMonitor.php';
* new FileMonitor($worker, $dir, $timer);
*/
class FileMonitor extends Controller
{
//待监听的项目目录
private $_monitor_dir = '';
//热更新间隔时间,默认3s
private $_interval = 0;
//最后一次同步时间
private $_last_time = 0;
function __construct ($worker, $dir = '../', $timer = 3)
{
// watch Applications catalogue
$this->_monitor_dir = __DIR__ .'/'. $dir;
$this->_interval = $timer;
$this->_last_time = time();
// Emitted when data received
$worker->reloadable = false;
// Emitted when data received
$worker->onWorkerStart = function()
{
// watch files only in daemon mode
if (Worker::$daemonize === false)
{
// chek mtime of files per second
Timer::add($this->_interval, [$this, 'monitor']);
}
};
}
//监听器,kill进程
public function monitor ()
{
// recursive traversal directory
$iterator = new \RecursiveDirectoryIterator($this->_monitor_dir);
$iterator = new \RecursiveIteratorIterator($iterator);
foreach ($iterator as $file)
{
// only check php files
if (pathinfo($file, PATHINFO_EXTENSION) != 'php') continue;
// check mtime
if ($this->_last_time < $file->getMTime())
{
exec('taskkill -f -pid '. getmypid());
$this->_last_time = $file->getMTime();
return true;
}
}
}
}
- 控制器:app\push\controller\Events.php(作用:业务处理)
<?php
namespace app\push\controller;
use think\Controller;
use \GatewayWorker\Lib\Gateway;
class Events extends Controller
{
/**
* 有消息时
* @param integer $client_id 连接的客户端
* @param mixed $message
* @return void
*/
public static function onMessage($client_id, $message)
{
$message = json_decode($message,true);
$action = $message['action'];
$data = $message['data'];
switch ($action) {
case 'ping'://发:{"action":"ping","data":{"online":1}}
return;
break;
case 'line'://发:{"action":"line","data":{"type":"1i"}}
$json = json_encode([
'action'=>'line',
'data'=>[],
]);
break;
default:
return;
break;
}
Gateway::sendToClient($client_id, $json);
}
/**
* 当用户连接时触发的方法
* @param integer $client_id 连接的客户端
* @return void
*/
public static function onConnect($client_id)
{
$json = json_encode([
'action'=>'ping',
'data'=>[
'online'=>1
]
]);
Gateway::sendToClient($client_id, $json);
}
/**
* 当用户断开连接时触发的方法
* @param integer $client_id 断开连接的客户端
* @return void
*/
public static function onClose($client_id)
{
}
/**
* 当进程启动时
* @param integer $businessWorker 进程实例
*/
public static function onWorkerStart($businessWorker)
{
}
/**
* 当进程关闭时
* @param integer $businessWorker 进程实例
*/
public static function onWorkerStop($businessWorker)
{
}
}
- 控制器:app\push\controller\Client.php(作用:主动推送消息给客户端)
<?php
namespace app\push\controller;
use GatewayClient\Gateway;
class Client
{
/**
* === 指定registerAddress表明与哪个GatewayWorker(集群)通讯。===
* GatewayWorker里用Register服务来区分集群,即一个GatewayWorker(集群)只有一个Register服务,
* GatewayClient要与之通讯必须知道这个Register服务地址才能通讯,这个地址格式为 ip:端口 ,
* 其中ip为Register服务运行的ip(如果GatewayWorker是单机部署则ip就是运行GatewayWorker的服务器ip),
* 端口是对应ip的服务器上start_register.php文件中监听的端口,也就是GatewayWorker启动时看到的Register的端口。
* GatewayClient要想推送数据给客户端,必须知道客户端位于哪个GatewayWorker(集群),
* 然后去连这个GatewayWorker(集群)Register服务的 ip:端口,才能与对应GatewayWorker(集群)通讯。
* 这个 ip:端口 在GatewayClient一侧使用 Gateway::$registerAddress 来指定。
*
* === 如果GatewayClient和GatewayWorker不在同一台服务器需要以下步骤 ===
* 1、需要设置start_gateway.php中的lanIp为实际的本机内网ip(如不在一个局域网也可以设置成外网ip),设置完后要重启GatewayWorker
* 2、GatewayClient这里的Gateway::$registerAddress的ip填写填写上面步骤1lanIp所指定的ip,端口
* 3、需要开启GatewayWorker所在服务器的防火墙,让以下端口可以被GatewayClient所在服务器访问,
* 端口包括Rgister服务的端口以及start_gateway.php中lanIp与startPort指定的几个端口
*
* === 如果GatewayClient和GatewayWorker在同一台服务器 ===
* GatewayClient和Register服务都在一台服务器上,ip填写127.0.0.1及即可,无需其它设置。
**/
public function __construct()
{
Gateway::$registerAddress = '127.0.0.1:1236';
}
// GatewayClient支持GatewayWorker中的所有接口(Gateway::closeCurrentClient Gateway::sendToCurrentClient除外)
//Gateway::sendToAll($data);
//Gateway::sendToClient($client_id, $data);
//Gateway::closeClient($client_id);
//Gateway::isOnline($client_id);
//Gateway::bindUid($client_id, $uid);
//Gateway::isUidOnline($uid);
//Gateway::getClientIdByUid($client_id);
//Gateway::unbindUid($client_id, $uid);
//Gateway::sendToUid($uid, $dat);
//Gateway::joinGroup($client_id, $group);
//Gateway::sendToGroup($group, $data);
//Gateway::leaveGroup($client_id, $group);
//Gateway::getClientCountByGroup($group);
//Gateway::getClientSessionsByGroup($group);
//Gateway::getAllClientCount();
//Gateway::getAllClientSessions();
//Gateway::setSession($client_id, $session);
//Gateway::updateSession($client_id, $session);
//Gateway::getSession($client_id);
//////////////////// 业务 //////////////////////
}
栏目分类全部>