`
m635674608
  • 浏览: 4941912 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

JedisCluster 源码分析

 
阅读更多
  BinaryJedisCluster 
  public String set(final byte[] key, final byte[] value) {
    return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value);
      } 
    }.runBinary(key);
  }

   

JedisClusterCommand
 public T runBinary(byte[] key) {
    if (key == null) {
      throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
    }

    return runWithRetries(key, this.redirections, false, false);
  }

 private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {
    if (redirections <= 0) {
      throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {

      if (asking) {
        // TODO: Pipeline asking with the original command to make it
        // faster....
        connection = askConnection.get();
        connection.asking();

        // if asking success, reset asking flag
        asking = false;
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
//获取 连接
          connection = connectionHandler.getConnectionFromSlot(
//获取槽 
JedisClusterCRC16.getSlot(key));
        }
      }

      return execute(connection);
    } catch (JedisConnectionException jce) {
      if (tryRandomNode) {
        // maybe all connection is down
        throw jce;
      }

      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      // retry with random connection
      return runWithRetries(key, redirections - 1, true, asking);
    } catch (JedisRedirectionException jre) {
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache
        // recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }

      // release current connection before recursion or renewing
      releaseConnection(connection);
      connection = null;

      if (jre instanceof JedisAskDataException) {
        asking = true;
      askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
      } else if (jre instanceof JedisMovedDataException) {
      } else {
        throw new JedisClusterException(jre);
      }

      return runWithRetries(key, redirections - 1, false, asking);
    } finally {
      releaseConnection(connection);
    }
  }

 @Override
  public Jedis getConnectionFromSlot(int slot) {
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // It can't guaranteed to get valid connection because of node
      // assignment
      return connectionPool.getResource();
    } else {
      return getConnection();
    }
  }

  

public abstract class JedisClusterConnectionHandler {
  protected final JedisClusterInfoCache cache;

  public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
                                       final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout);
    //通过slot 初始化集群信息
    initializeSlotsCache(nodes, poolConfig);
  }

  abstract Jedis getConnection();

  abstract Jedis getConnectionFromSlot(int slot);

  public Jedis getConnectionFromNode(HostAndPort node) {
    cache.setNodeIfNotExist(node);
    return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource();
  }

public class JedisClusterInfoCache {
  private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
  private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  private final Lock r = rwl.readLock();
  private final Lock w = rwl.writeLock();
  private final GenericObjectPoolConfig poolConfig;

  private int connectionTimeout;
  private int soTimeout;

  private static final int MASTER_NODE_INDEX = 2;

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {
    this(poolConfig, timeout, timeout);
  }

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,
      final int connectionTimeout, final int soTimeout) {
    this.poolConfig = poolConfig;
    this.connectionTimeout = connectionTimeout;
    this.soTimeout = soTimeout;
  }

  public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();

    try {
      this.nodes.clear();
      this.slots.clear();

      List<Object> slots = jedis.clusterSlots();

      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;

        if (slotInfo.size() <= MASTER_NODE_INDEX) {
          continue;
        }

        List<Integer> slotNums = getAssignedSlotArray(slotInfo);

        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.size() <= 0) {
            continue;
          }

          HostAndPort targetNode = generateHostAndPort(hostInfos);
          setNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }
  //初始化集群信息 
  public void discoverClusterSlots(Jedis jedis) {
    w.lock();

    try {
      this.slots.clear();
      //通过 slots 命令获取集群信息
      List<Object> slots = jedis.clusterSlots();

      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;

        if (slotInfo.size() <= 2) {
          continue;
        }

        List<Integer> slotNums = getAssignedSlotArray(slotInfo);

        // hostInfos
        List<Object> hostInfos = (List<Object>) slotInfo.get(2);
        if (hostInfos.size() <= 0) {
          continue;
        }

        // at this time, we just use master, discard slave information
        HostAndPort targetNode = generateHostAndPort(hostInfos);

        setNodeIfNotExist(targetNode);
        assignSlotsToNode(slotNums, targetNode);
      }
    } finally {
      w.unlock();
    }
  }

  private HostAndPort generateHostAndPort(List<Object> hostInfos) {
    return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
        ((Long) hostInfos.get(1)).intValue());
  }

  public void setNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
      String nodeKey = getNodeKey(node);
      if (nodes.containsKey(nodeKey)) return;

      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
              connectionTimeout, soTimeout, null, 0, null);
      nodes.put(nodeKey, nodePool);
    } finally {
      w.unlock();
    }
  }

  public void assignSlotToNode(int slot, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = nodes.get(getNodeKey(targetNode));

      if (targetPool == null) {
        setNodeIfNotExist(targetNode);
        targetPool = nodes.get(getNodeKey(targetNode));
      }
      slots.put(slot, targetPool);
    } finally {
      w.unlock();
    }
  }

  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = nodes.get(getNodeKey(targetNode));

      if (targetPool == null) {
        setNodeIfNotExist(targetNode);
        targetPool = nodes.get(getNodeKey(targetNode));
      }

      for (Integer slot : targetSlots) {
        slots.put(slot, targetPool);
      }
    } finally {
      w.unlock();
    }
  }

  public JedisPool getNode(String nodeKey) {
    r.lock();
    try {
      return nodes.get(nodeKey);
    } finally {
      r.unlock();
    }
  }

  public JedisPool getSlotPool(int slot) {
    r.lock();
    try {
      return slots.get(slot);
    } finally {
      r.unlock();
    }
  }

  public Map<String, JedisPool> getNodes() {
    r.lock();
    try {
      return new HashMap<String, JedisPool>(nodes);
    } finally {
      r.unlock();
    }
  }

  public static String getNodeKey(HostAndPort hnp) {
    return hnp.getHost() + ":" + hnp.getPort();
  }

  public static String getNodeKey(Client client) {
    return client.getHost() + ":" + client.getPort();
  }

  public static String getNodeKey(Jedis jedis) {
    return getNodeKey(jedis.getClient());
  }

  private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
        .intValue(); slot++) {
      slotNums.add(slot);
    }
    return slotNums;
  }

}

  
  public Map<String, JedisPool> getNodes() {
    return cache.getNodes();
  }

  private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {
    for (HostAndPort hostAndPort : startNodes) {
      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
      try {
        cache.discoverClusterNodesAndSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }

    for (HostAndPort node : startNodes) {
      cache.setNodeIfNotExist(node);
    }
  }

  public void renewSlotCache() {
    for (JedisPool jp : getShuffledNodesPool()) {
      Jedis jedis = null;
      try {
        jedis = jp.getResource();
        cache.discoverClusterSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
  }

  public void renewSlotCache(Jedis jedis) {
    try {
      cache.discoverClusterSlots(jedis);
    } catch (JedisConnectionException e) {
      renewSlotCache();
    }
  }

  protected List<JedisPool> getShuffledNodesPool() {
    List<JedisPool> pools = new ArrayList<JedisPool>();
    pools.addAll(cache.getNodes().values());
    Collections.shuffle(pools);
    return pools;
  }
}

   总结:

  1.JedisCluster 会初始化一个 连接获取集群信息通过 solts 命令。(JedisClusterInfoCache 构造方法初始化)

  2.get ,set 的时候。会通过key JedisClusterCRC16.getSlot(key) 定位到solt

  3. 然后根据solt获取 jedis

public Jedis getConnectionFromSlot(int slot) {
    JedisPool connectionPool = cache.getSlotPool(slot);

  4.执行操作

 

  读操作:主库,从库都会读

  写操作:主库写

分享到:
评论

相关推荐

    jedisCluster 配置文件和调用Api

    对jedischangyongApi的一些简单封装和分类,全部标有中文注释,可直接放入项目中使用,jedis集群配置可参考 https://blog.csdn.net/qq_31256487/article/details/83144088;

    jediscluster

    JedisClusterUtil.java

    jedisCluster链接池

    适用于redis集群,3.0版本以上支持集群

    jedis源码 (学习jedis)

    jedis源码 (学习jedis必备,附带测试用例)

    jedis 源码

    jedis 2.5.2 源码

    Jedis2.1.0源码与Jar包

    Jedis2.1.0源码与Jar包,实现在Java中操作Redis服务器!

    Jedis源码分析及配置说明.pdf

    分析了redis的: 1)java客户端实现源码 2)分析了连接channel,包括命令时客户端和服务端的socket连接;...3)分析了JedisPool连接池配置 4) 分析了命令get set hmset 等逻辑 5)分析了subscribe实现源码

    jedis 2.1源码

    就上一个资源 我发了两个jar包 表示歉意 这次补上源码文件 请童鞋们谅解

    JedisCluster

    JedisClusterRedisCluster集群搭建

    jedis单机版,集群版工具类

    jedis单机版,集群版工具类

    jedis-2.9.0 源码项目工程

    jedis-2.9.0 源码项目工程 导入 eclipse 可直接运行 * 基本测试: src\redis\clients\jedis\tests\JedisTest.java * 连接池测试: src\redis\clients\jedis\tests\JedisPoolTest.java * 常用命令测试: src\...

    Jedis源码-供查询redis API源码

    jedis源码,供开发者使用Java学习或者查询redis API使用,可以帮助开发者快速查找jedis源码实现。

    redis-cluster结合springboot的使用自定义序列化

    redis-cluster结合springboot的使用自定义缓存数据的序列化方式方便通过命令行查看里面的内容,里面包含一整套的代码内容,只需要将缓存地址换成自己的集群地址即可,亲测可用的代码内容.

    jedis操作源码

    jedis操作源码,本人亲测,都可以使用,学习的朋友可以下载看看。

    jedis2.7.0源码

    jedis源码包,jar包,希望下载的朋友可以用到。。。。

    SpringMVC整合Redis集群

    项目由maven构建,使用springMVC整合了Redis的集群,发布到tomcat中,访问http://localhost:8080/SpringRedisCluster/redis/hello.do测试即可,前提是配好了redis的集群。

    Jedis2.1 源码

    Jedis 2.1 的源码 找了半天终于找到了

    jedis-2.8-src

    jedis2.8.jar源码

    jedis-3.0.0源码

    Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。这是jedis-3.0.0源码,支持集群和管道,希望大家喜欢。

    jedis2.9.0的jar包及源码

    jedis2.9.0的jar包及源码,解压文件后会有jar包及文档

Global site tag (gtag.js) - Google Analytics