您的当前位置:首页>全部文章>文章详情

【windows版】tp5中使用GatewayWorker和Workerman

发表于:2023-01-19 14:52:39浏览:210次TAG: #ThinkPHP #Workerman #windows

安装

1、安装WorkerMan

composer require workerman/workerman-for-win
composer require topthink/think-worker=1.*

2、安装GatewayWorker

composer require workerman/gateway-worker-for-win

3、安装GatewayClient

composer require workerman/gatewayclient

服务端

  • start_for_win.bat(目录:与application同级别,作用:双击启动服务)
php start_register.php start_gateway.php start_businessworker.php
pause
  • start_register.php(目录:与application同级别,作用:启动文件)
<?php
// [ 应用入口文件 ]
// 定义应用目录
define('APP_PATH', __DIR__ . '/application/');
//绑定
define('BIND_MODULE','push/Sregister');
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';
  • start_gateway.php(目录:与application同级别,作用:启动文件)
<?php
// [ 应用入口文件 ]
// 定义应用目录
define('APP_PATH', __DIR__ . '/application/');
//绑定
define('BIND_MODULE','push/Sgateway');
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';
  • start_businessworker.php(目录:与application同级别,作用:启动文件)
<?php
// [ 应用入口文件 ]
// 定义应用目录
define('APP_PATH', __DIR__ . '/application/');
//绑定
define('BIND_MODULE','push/Sbusinessworker');
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';
  • 控制器:app\push\controller\Sregister.php
<?php 
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use GatewayWorker\Register;
class Sregister extends Controller
{
    function __construct()
    {
        require_once __DIR__ . '/../../../vendor/autoload.php';
        // register 服务必须是text协议
        $register = new Register('text://0.0.0.0:1236');

        // 如果不是在根目录启动,则运行runAll方法
        if(!defined('GLOBAL_START'))
        {
            Worker::runAll();
        }
    }
}
  • 控制器:app\push\controller\Sgateway.php
<?php 
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use \GatewayWorker\Gateway;
use \Workerman\Autoloader;
class Sgateway extends Controller
{
    function __construct()
    {
        require_once __DIR__ . '/../../../vendor/autoload.php';
        // gateway 进程
        $gateway = new Gateway("Websocket://0.0.0.0:7272");
        // 设置名称,方便status时查看
        $gateway->name = 'Gateway';
        // 设置进程数,gateway进程数建议与cpu核数相同
        $gateway->count = 4;
        // 分布式部署时请设置成内网ip(非127.0.0.1)
        $gateway->lanIp = '127.0.0.1';
        // 内部通讯起始端口。假如$gateway->count=4,起始端口为2300
        // 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口 
        $gateway->startPort = 2300;
        // 心跳间隔
        $gateway->pingInterval = 30;
        $gateway->pingNotResponseLimit = 1;
        // 心跳数据
        $gateway->pingData = '';
        // 服务注册地址
        $gateway->registerAddress = '127.0.0.1:1236';
        /* 
        // 当客户端连接上来时,设置连接的onWebSocketConnect,即在websocket握手时的回调
        $gateway->onConnect = function($connection)
        {
            $connection->onWebSocketConnect = function($connection , $http_header)
            {
                // 可以在这里判断连接来源是否合法,不合法就关掉连接
                // $_SERVER['HTTP_ORIGIN']标识来自哪个站点的页面发起的websocket链接
                if($_SERVER['HTTP_ORIGIN'] != 'http://chat.workerman.net')
                {
                    $connection->close();
                }
                // onWebSocketConnect 里面$_GET $_SERVER是可用的
                // var_dump($_GET, $_SERVER);
            };
        }; 
        */
        //windows 热更新
        new FileMonitor($gateway);
        // 如果不是在根目录启动,则运行runAll方法
        if(!defined('GLOBAL_START'))
        {
            Worker::runAll();
        }
    }
}
  • 控制器:app\push\controller\Sbusinessworker.php
<?php
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;
class Sbusinessworker extends Controller
{
    function __construct()
    {
        require_once __DIR__ . '/../../../vendor/autoload.php';
        // bussinessWorker 进程
        $worker = new BusinessWorker();
        // worker名称
        $worker->name = 'BusinessWorker';
        // bussinessWorker进程数量
        $worker->count = 4;
        // 服务注册地址
        $worker->registerAddress = '127.0.0.1:1236';
        //设置处理业务的类,此处制定Events的命名空间
        $worker->eventHandler = 'app\push\controller\Events';
        //windows 热更新
        new FileMonitor($worker);
        // 如果不是在根目录启动,则运行runAll方法
        if(!defined('GLOBAL_START'))
        {
            Worker::runAll();
        }
    }
}
  • 控制器:app\push\controller\FileMonitor.php(作用:windows的热更新)
<?php 
namespace app\push\controller;
use think\Controller;
use \Workerman\Worker;
use \Workerman\Lib\Timer;
/**
 * workerman-filemonitor for windows
 *
 * 监控文件更新并自动reload workerman
 *
 * 使用方法:
 * require_once __DIR__ . '/FileMonitor.php';
 * new FileMonitor($worker, $dir, $timer);
 */
class FileMonitor extends Controller
{
    //待监听的项目目录
    private $_monitor_dir = '';
    //热更新间隔时间,默认3s
    private $_interval = 0;
    //最后一次同步时间
    private $_last_time = 0;
    function __construct ($worker, $dir = '../', $timer = 3)
    {
        // watch Applications catalogue
        $this->_monitor_dir = __DIR__ .'/'. $dir;
        $this->_interval = $timer;
        $this->_last_time = time();
        // Emitted when data received
        $worker->reloadable = false;
        // Emitted when data received
        $worker->onWorkerStart = function()
        {
            // watch files only in daemon mode
            if (Worker::$daemonize === false)
            {
                // chek mtime of files per second 
                Timer::add($this->_interval, [$this, 'monitor']);
            }
        };
    }
    //监听器,kill进程
    public function monitor ()
    {
        // recursive traversal directory
        $iterator = new \RecursiveDirectoryIterator($this->_monitor_dir);
        $iterator = new \RecursiveIteratorIterator($iterator);
        foreach ($iterator as $file)
        {
            // only check php files
            if (pathinfo($file, PATHINFO_EXTENSION) != 'php') continue;
            // check mtime
            if ($this->_last_time < $file->getMTime())
            {
                exec('taskkill -f -pid '. getmypid());
                $this->_last_time = $file->getMTime();
                return true;
            }
        }
    }
}
  • 控制器:app\push\controller\Events.php(作用:业务处理)
<?php
namespace app\push\controller;
use think\Controller;
use \GatewayWorker\Lib\Gateway;
class Events extends Controller
{
    /**
     * 有消息时
     * @param integer $client_id 连接的客户端
     * @param mixed $message
     * @return void
     */
    public static function onMessage($client_id, $message)
    {

        $message = json_decode($message,true);
        $action = $message['action'];
        $data = $message['data'];
        switch ($action) {
            case 'ping'://发:{"action":"ping","data":{"online":1}}
                return;
                break;
            case 'line'://发:{"action":"line","data":{"type":"1i"}}
                $json = json_encode([
                    'action'=>'line',
                    'data'=>[],
                ]);
                break;
            default:
                return;
                break;
        }
        Gateway::sendToClient($client_id, $json);
    }

    /**
     * 当用户连接时触发的方法
     * @param integer $client_id 连接的客户端
     * @return void
     */
    public static function onConnect($client_id)
    {
        $json = json_encode([
                'action'=>'ping',
                'data'=>[
                    'online'=>1
                ]
            ]);
        Gateway::sendToClient($client_id, $json);
    }

    /**
     * 当用户断开连接时触发的方法
     * @param integer $client_id 断开连接的客户端
     * @return void
     */
    public static function onClose($client_id)
    {

    }

    /**
     * 当进程启动时
     * @param integer $businessWorker 进程实例
     */
    public static function onWorkerStart($businessWorker)
    {

    }

    /**
     * 当进程关闭时
     * @param integer $businessWorker 进程实例
     */
    public static function onWorkerStop($businessWorker)
    {

    }
}
  • 控制器:app\push\controller\Client.php(作用:主动推送消息给客户端)
<?php
namespace app\push\controller;
use GatewayClient\Gateway;
class Client
{
    /**
     * === 指定registerAddress表明与哪个GatewayWorker(集群)通讯。===
     * GatewayWorker里用Register服务来区分集群,即一个GatewayWorker(集群)只有一个Register服务,
     * GatewayClient要与之通讯必须知道这个Register服务地址才能通讯,这个地址格式为 ip:端口 ,
     * 其中ip为Register服务运行的ip(如果GatewayWorker是单机部署则ip就是运行GatewayWorker的服务器ip),
     * 端口是对应ip的服务器上start_register.php文件中监听的端口,也就是GatewayWorker启动时看到的Register的端口。
     * GatewayClient要想推送数据给客户端,必须知道客户端位于哪个GatewayWorker(集群),
     * 然后去连这个GatewayWorker(集群)Register服务的 ip:端口,才能与对应GatewayWorker(集群)通讯。
     * 这个 ip:端口 在GatewayClient一侧使用 Gateway::$registerAddress 来指定。
     * 
     * === 如果GatewayClient和GatewayWorker不在同一台服务器需要以下步骤 ===
     * 1、需要设置start_gateway.php中的lanIp为实际的本机内网ip(如不在一个局域网也可以设置成外网ip),设置完后要重启GatewayWorker
     * 2、GatewayClient这里的Gateway::$registerAddress的ip填写填写上面步骤1lanIp所指定的ip,端口
     * 3、需要开启GatewayWorker所在服务器的防火墙,让以下端口可以被GatewayClient所在服务器访问,
     *    端口包括Rgister服务的端口以及start_gateway.php中lanIp与startPort指定的几个端口
     *
     * === 如果GatewayClient和GatewayWorker在同一台服务器 ===
     * GatewayClient和Register服务都在一台服务器上,ip填写127.0.0.1及即可,无需其它设置。
     **/
    public function __construct()
    {
        Gateway::$registerAddress = '127.0.0.1:1236';
    }
    // GatewayClient支持GatewayWorker中的所有接口(Gateway::closeCurrentClient Gateway::sendToCurrentClient除外)
    //Gateway::sendToAll($data);
    //Gateway::sendToClient($client_id, $data);
    //Gateway::closeClient($client_id);
    //Gateway::isOnline($client_id);
    //Gateway::bindUid($client_id, $uid);
    //Gateway::isUidOnline($uid);
    //Gateway::getClientIdByUid($client_id);
    //Gateway::unbindUid($client_id, $uid);
    //Gateway::sendToUid($uid, $dat);
    //Gateway::joinGroup($client_id, $group);
    //Gateway::sendToGroup($group, $data);
    //Gateway::leaveGroup($client_id, $group);
    //Gateway::getClientCountByGroup($group);
    //Gateway::getClientSessionsByGroup($group);
    //Gateway::getAllClientCount();
    //Gateway::getAllClientSessions();
    //Gateway::setSession($client_id, $session);
    //Gateway::updateSession($client_id, $session);
    //Gateway::getSession($client_id);
    //////////////////// 业务 //////////////////////
}