Redis

首页 -  Redis  -  redis读写分离数据延迟负载均衡demo

redis读写分离数据延迟负载均衡demo

redis读写分离数据延迟负载均衡demo分为3个文件config配置文件input日志文件redis操作文件index入口文件

1.index入口文件


<?php
require_once "./config.php";
require_once "./redis.php";

// 在swoole事件中 echo 和 var_dump是输出在 控制台 不是浏览器
$http = new Swoole\Http\Server("0.0.0.0", 9501);

// 设置swoole进程个数
$http->set([
    'worker_num' => 1
]);
// 在创建的时候执行  ; 进程创建的时候触发时候
// 理解为一个构造函数,初始化
$http->on('workerStart', function ($server, $worker_id) use($config){
    global $redisMS;
    $redisMS = new RedisMS($config);
});

// 通过浏览器访问 http://本机ip :9501会执行的代码
$http->on('request', function ($request, $response) {
    global $redisMS;
    /*
    $request->get = [
        'type' => '操作类型 1写,2读' ,
        'method' => '操作的方法',
        'params' => []
    ]
     */
    $ret = 'p';
    if ($request->get['type'] == 1) {
        $ret = $redisMS->runCall($request->get['method'], explode(',', $request->get['params']));
    } else {
        // 读
        $ret = $redisMS->runCall('get', [$request->get['params']]);
    }

    $response->end($ret);
});

$http->start();

2.redis操作文件

<?php
require 'Input.php';
/**
 * redis 基于c写的
 * predis 基于php扩展
 */
class RedisMS
{
    protected $config;

    /**
     * 记录redis连接
     * [
     *     "master" => \\Redis,
     *     "slaves "=> [
     *       'slaveIP1:port' => \Redis
     *       'slaveIP2:port' => \Redis
     *       'slaveIP3:port' => \Redis
     *    ]
     * ]
     */
    protected $connections;
    /**
     * [
     *    "0" => 'slaveIP1:port' ,
     *    "1" => 'slaveIP2:port',
     *    "2" => 'slaveIP3:port',
     * ]
     */
    protected $connSlaveIndexs;

    protected $call = [
        'write' => [
            'set',
            'sadd'
            // ..
        ],
        'read' => [
            'get',
            'smembers'
            //..
        ],
    ];

    public function __construct($config)
    {
        if ($config["is_ms"]) {
            $this->connections['master'] = $this->getRedis($config['master']['host'], $config['master']['port']);

            $this->createSlave($config['slaves']);

            Input::info($this->connections, "这是获取的连接");
            Input::info($this->connSlaveIndexs, "这是连接的下标");

            $this->maintain();

        }
        $this->config = $config;
    }

    // --------------主从维护--------------------

    /**
     * 去维护从节点列表
     *
     * 重整 1台服务器,多个从节点
     */
    protected function maintain()
    {
        /*
        1. 获取主节点连接信息
        2. 获取从节点的偏移量
        3. 获取连接个数
            3.1 偏移量的计算
            3.2 维护列表
         */
         $masterRedis = $this->getMaster();
         swoole_timer_tick(2000, function ($timer_id) use($masterRedis){
              // 得到主节点的连接信息
              $replInfo = $masterRedis->info('replication');
              // Input::info($replInfo, "复制信息");
              // 得到主节点偏移量
              $masterOffset = $replInfo['master_repl_offset'];
              // 记录新增的从节点
              $slaves = [];
              for ($i=0; $i < $replInfo['connected_slaves']; $i++) {
                  // 获取slave的信息
                  $slaveInfo = $this->stringToArr($replInfo['slave'.$i]);
                  $slaveFlag = $this->redisFlag($slaveInfo['ip'], $slaveInfo['port']);
                  // 延迟检测
                  if (($masterOffset - $slaveInfo['offset']) < 100) {
                      // 是正常范围
                      // 如果之前因为网络延迟删除了节点,现在恢复了网络 -》新增
                      // 这是动态新增
                      if (!in_array($slaveFlag, $this->connSlaveIndexs)) {
                          $slaves[$slaveFlag] = [
                              'host' => $slaveInfo['ip'],
                              'port' => $slaveInfo['port']
                          ];
                          Input::info($slaveFlag, "新增从节点");
                      }
                  } else {
                      // 延迟 -> 删除节点
                      Input::info($slaveFlag, "删除节点");
                      unset($this->connections['slaves'][$slaveFlag]);
                  }
              }
              $this->createSlave($slaves);
         });
    }

    protected function stringToArr($str, $flag1 = ',', $flag2 = '=')
    {
        // "ip=192.160.1.130,port=6379,state=online,offset=72574,lag=0"
        $arr = explode($flag1, $str);
        $ret = [];
        // $key ip
        // $value 192.160.1.130
        foreach ($arr as $key => $value) {
            $arr2 = explode($flag2, $value);
            $ret[$arr2[0]] = $arr2[1];
        }
        return $ret;
    }

    // --------------创建主从连接--------------------
    /**
     * $slaves = [
     *   'slave1' => [
     *     'host' => '192.160.1.130',
     *     'port' => 6379
     *    ],
     *   'slave2' => [
     *     'host' => '192.160.1.140',
     *     'port' => 6379
     *   ]
     * ]
     * 
     */
    private function createSlave($slaves)
    {
        // var_dump($slaves);
        // 这个是用于做负载的时候选择从节点对象
        foreach ($slaves as $key => $slave) {
            $this->connections['slaves'][$this->redisFlag($slave['host'], $slave['port'])] = $this->getRedis($slave['host'], $slave['port']);
        }
        // 记录从节点的下标
        $this->connSlaveIndexs = array_keys($this->connections['slaves']);
    }

    private function redisFlag($host, $port)
    {
        return $host.":".$port;
    }

    public function getRedis($host, $port)
    {
        $redis = new \Redis();
        $redis->pconnect($host, $port);
        return $redis;
    }

    public function getConnSlaveIndexs()
    {
        return $this->connSlaveIndexs;
    }

    // --------------获取主从连接方法--------------------

    public function getMaster()
    {
        return $this->connections['master'];
    }
    public function getSlaves()
    {
        return $this->connections['slaves'];
    }

    public function oneSlave()
    {
        $indexs = $this->connSlaveIndexs;
        $i = mt_rand(0, count($indexs) - 1);
        return $this->connections['slaves'][$indexs[$i]];
        // $slaves = $this->getSlaves();
        // // 对于所有从节点  负载均衡算法
        // $i = mt_rand(0, count($slaves) - 1);
        // return $slaves[$i];
    }

    // --------------执行命令方法--------------------

    public function runCall($command, $params = [])
    {
        try {
            if ($this->config['is_ms']) {
                // 获取操作的对象(是主还是从)

                $redis = $this->getRedisCall($command);
                // var_dump($redis);
                return $redis->{$command}(...$params);
            }
        } catch (\Exception $e) {}
    }
    /**
     * 判断操作类型
     * @param  [type]  $command [description]
     * @return boolean          [description]
     */
    protected function getRedisCall($command)
    {
        if (in_array($command, $this->call['write'])) {
            return $this->getMaster();
        } else if (in_array($command, $this->call['read'])){
            return $this->oneSlave();
        } else {
            throw new \Exception("不支持");
        }
    }
}
/*
1. 主节点连接,从节点
2. 对于主节点连接和从节点
3. 写命令 -》
    3.1 -》 判断类型
    3.2 -》 主
    3.3 -》 从(从是有多个节点)
        3.3.4 -》 负载均衡(随机)
 */

3.input日志文件

<?php
class Input
{
    public static function info($message, $description = null)
    {
        echo "======>>> ".$description." start\n";
        if (\is_array($message)) {
            echo \var_export($message, true);
        } else if (\is_string($message)) {
            echo $message."\n";
        } else {
            var_dump($message);
        }
        echo  "======>>> ".$description." end\n";
    }
}

4.conf配置文件

<?php

$config = [
    // 做个判断是否开启主从
    'is_ms' => true,
    'master' => [
        'host' => '192.160.1.150',
        'port' => 6379
    ],
    'slaves' => [
        'slave1' => [
            'host' => '192.160.1.130',
            'port' => 6379
        ],
        'slave2' => [
            'host' => '192.160.1.140',
            'port' => 6379
        ]
    ],
];


(0)
分享:

本文由:xiaoshu168.com 作者:xiaoshu发表,转载请注明来源!

标签:

相关阅读