转载请注明出处哈:http://carlosfu.iteye.com/blog/2240426
通过jedis来连接操作redis总体来说比较简单,按照redis单机、redis-sentinel、redis-cluster略有不同。
一、Jedis相关依赖
1. jedis依赖(选择最新的稳定版本,支持redis-cluster)
<jedis.version>2.7.2</jedis.version> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>${jedis.version}</version> </dependency>
2. logback和junit依赖
<logback.version>1.0.13</logback.version> <junit.version>4.11</junit.version> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency>
二、Jedis单机测试:
1. Jedis-简单Kv
Jedis jedis = new Jedis("127.0.0.1"); jedis.set("foo", "bar"); String value = jedis.get("foo");
建议所有的jedis都放在try catch finally(jedis.close操作)中
package com.sohu.tv.test.jedis; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; /** * 第一个jedis测试 * * @author leifu * @Date 2015年8月24日 * @Time 下午1:35:26 */ public class JedisFirstTest { private Logger logger = LoggerFactory.getLogger(JedisFirstTest.class); /** * redis单机host */ private final static String JEDIS_HOST = "127.0.0.1"; /** * redis单机port */ private final static int JEDIS_PORT = 6379; /** * 超时时间(毫秒) */ private final static int JEDIS_TIME_OUT = 300; @Test public void testJedis() { Jedis jedis = null; try { jedis = new Jedis(JEDIS_HOST, JEDIS_PORT, JEDIS_TIME_OUT); String key = "sohuKey"; jedis.set(key, "sohuValue"); String value = jedis.get(key); logger.info("get key {} from redis, value is {}", key, value); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { if (jedis != null) { jedis.close(); } } } }
2. Jedis-序列化Kv:
我们使用protostuff(Protostuff是基于大名鼎鼎的Google protobuff技术的Java版本)作为序列化工具:
(1)pom依赖:
<protostuff.version>1.0.8</protostuff.version> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>${protostuff.version}</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>${protostuff.version}</version> </dependency>
(2)Club实体类:
package com.sohu.tv.bean; import java.io.Serializable; import java.util.Date; /** * 俱乐部 * * @author leifu * @Date 2015年7月28日 * @Time 下午1:43:53 */ public class Club implements Serializable { /** * 俱乐部id */ private int id; /** * 俱乐部名 */ private String clubName; /** * 俱乐部描述 */ private String clubInfo; /** * 创建日期 */ private Date createDate; /** * 排名 */ private int rank; public Club(int id, String clubName, String clubInfo, Date createDate, int rank) { super(); this.id = id; this.clubName = clubName; this.clubInfo = clubInfo; this.createDate = createDate; this.rank = rank; } public Club() { super(); } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getClubName() { return clubName; } public void setClubName(String clubName) { this.clubName = clubName; } public String getClubInfo() { return clubInfo; } public void setClubInfo(String clubInfo) { this.clubInfo = clubInfo; } public Date getCreateDate() { return createDate; } public void setCreateDate(Date createDate) { this.createDate = createDate; } public int getRank() { return rank; } public void setRank(int rank) { this.rank = rank; } @Override public String toString() { return "Club [id=" + id + ", clubName=" + clubName + ", clubInfo=" + clubInfo + ", createDate=" + createDate + ", rank=" + rank + "]"; } }
(3)序列化工具:
package com.sohu.tv.serializer; import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.Schema; import com.dyuproject.protostuff.runtime.RuntimeSchema; import java.util.concurrent.ConcurrentHashMap; /** * protostuff序列化工具 * * @author leifu * @Date 2015-8-22 * @Time 上午10:05:20 */ public class ProtostuffSerializer { private static ConcurrentHashMap<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>(); public <T> byte[] serialize(final T source) { VO<T> vo = new VO<T>(source); final LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { final Schema<VO> schema = getSchema(VO.class); return serializeInternal(vo, schema, buffer); } catch (final Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } public <T> T deserialize(final byte[] bytes) { try { Schema<VO> schema = getSchema(VO.class); VO vo = deserializeInternal(bytes, schema.newMessage(), schema); if (vo != null && vo.getValue() != null) { return (T) vo.getValue(); } } catch (final Exception e) { throw new IllegalStateException(e.getMessage(), e); } return null; } private <T> byte[] serializeInternal(final T source, final Schema<T> schema, final LinkedBuffer buffer) { return ProtostuffIOUtil.toByteArray(source, schema, buffer); } private <T> T deserializeInternal(final byte[] bytes, final T result, final Schema<T> schema) { ProtostuffIOUtil.mergeFrom(bytes, result, schema); return result; } private static <T> Schema<T> getSchema(Class<T> clazz) { @SuppressWarnings("unchecked") Schema<T> schema = (Schema<T>) cachedSchema.get(clazz); if (schema == null) { schema = RuntimeSchema.createFrom(clazz); cachedSchema.put(clazz, schema); } return schema; } }
package com.sohu.tv.serializer; import java.io.Serializable; /** * @author leifu * @Date 2015-8-22 * @Time 上午10:05:44 * @param <T> */ public class VO<T> implements Serializable { private T value; public VO(T value) { this.value = value; } public VO() { } public T getValue() { return value; } @Override public String toString() { return "VO{" + "value=" + value + '}'; } }
(4)测试代码:
@Test public void testJedisSerializable() { ProtostuffSerializer protostuffSerializer = new ProtostuffSerializer(); Jedis jedis = null; try { jedis = new Jedis(JEDIS_HOST, JEDIS_PORT, JEDIS_TIME_OUT); String key = "sohuKeySerializable"; // 序列化 Club club = new Club(1, "AC", "米兰", new Date(), 1); byte[] clubBtyes = protostuffSerializer.serialize(club); jedis.set(key.getBytes(), clubBtyes); // 反序列化 byte[] resultBtyes = jedis.get(key.getBytes()); Club resultClub = protostuffSerializer.deserialize(resultBtyes); logger.info("get key {} from redis, value is {}", key, resultClub); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { if (jedis != null) { jedis.close(); } } }
(5)测试结果:
3. 连接池(推荐使用方式):一般线上系统的连接资源都是通过资源池的形式进行管理的。
package com.sohu.tv.test.jedis; import java.util.Date; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sohu.tv.serializer.ProtostuffSerializer; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * 第一个jedisPool测试 * * @author leifu * @Date 2015年8月24日 * @Time 下午1:35:26 */ public class JedisPoolTest { private Logger logger = LoggerFactory.getLogger(JedisPoolTest.class); private static JedisPool jedisPool; /** * redis单机host */ private final static String REDIS_HOST = "127.0.0.1"; /** * redis单机port */ private final static int REDIS_PORT = 6379; /** * 超时时间(毫秒) */ private final static int JEDIS_POOL_TIME_OUT = 1000; @BeforeClass public static void testBeforeClass() { GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMaxTotal(GenericObjectPoolConfig.DEFAULT_MAX_TOTAL * 5); poolConfig.setMaxIdle(GenericObjectPoolConfig.DEFAULT_MAX_IDLE * 3); poolConfig.setMinIdle(GenericObjectPoolConfig.DEFAULT_MIN_IDLE * 2); poolConfig.setJmxEnabled(true); poolConfig.setMaxWaitMillis(3000); jedisPool = new JedisPool(poolConfig, REDIS_HOST, REDIS_PORT, JEDIS_POOL_TIME_OUT); } @AfterClass public static void testAfterClass() { if (jedisPool != null) { jedisPool.destroy(); } } @Test public void testJedisPool() { Jedis jedis = null; try { jedis = jedisPool.getResource(); String key = "sohuKeyPool"; jedis.set(key, "sohuValue"); String value = jedis.get(key); logger.info("get key {} from redis, value is {}", key, value); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { if (jedis != null) { // 如果使用JedisPool,close操作不是关闭连接,代表归还资源池 jedis.close(); } } } }
(3.1) 注意:jedis.close的实现:
(2) dataSource=null代表直连,jedis.close代表关闭连接
(3) jedis.close放到finally里面做
Jedis源码中:
@Override public void close() { if (dataSource != null) { if (client.isBroken()) { this.dataSource.returnBrokenResource(this); } else { this.dataSource.returnResource(this); } } else { client.close(); } }
(3.2) GenericObjectPoolConfig参数说明如下:
- maxActive: 链接池中最大连接数,默认为8. (并非越大越好,具体原因可以参考GenericObjectPool的实现)
- maxIdle: 链接池中最大空闲的连接数,默认为8.
- minIdle: 连接池中最少空闲的连接数,默认为0.
- maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时.
- jmxEnabled: 当设置为true, 且服务开启的jmx服务时,使用jconsole, jvisualvm等工具将看到如下关于连接池的很全面的统计,这些统计结果有助于优化自己的配置。
其余配置如下:
- minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除。默认-1
#借资源时候是否要验证,比如jedis对象验证是 ip:port是否发生改变,且执行一个ping命令
#还资源时候是否要验证,同上。
6. timeBetweenEvictionRunsMillis: “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1.
-> 0 : 抛出异常,
-> 1 : 阻塞,直到有可用链接资源
-> 2 : 强制创建新的链接资源
四、Redis-Sentinel
JedisSentinelPool sentinelPool = new JedisSentinelPool(masterName, sentinelSet, poolConfig, timeout); //获取jedis的方法和JedisPool一样的,不在赘述
sentinelSet: sentinel实例列表
poolConfig: common-pool包中的GenericObjectPoolConfig
timeout: 超时
有一点需要注意的是:sentinelSet: sentinel实例列表,而不是具体的redis实例列表,这是因为为了实现高可用,jedis屏蔽了redis实例信息,所有实例信息(主从信息)都是通过sentinel获取。
五、Redis-Cluster
Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>(); jedisClusterNodes.add(new HostAndPort("127.0.0.1", 7379)); .... PipelineCluster pipelineCluster = new PipelineCluster(jedisPoolConfig, nodeList, timeout); //获取jedis的方法和JedisPool一样的,不在赘述
jedisPoolConfig: common-pool包中的GenericObjectPoolConfig
timeout: 超时时间
有一点需要注意的是:nodeList尽可能写入所有的redis实例信息(虽然jedis可以从任一redis实例获取到集群的信息。)
有兴趣的可以一下jedis源码中JedisClusterConnectionHandler这个类的initializeSlotsCache方法:
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } for (HostAndPort node : startNodes) { cache.setNodeIfNotExist(node); } }附一个redis-cluster工厂类:
package com.sohu.tv.common; import java.util.HashSet; import java.util.Set; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.PipelineCluster; /** * redis-cluster java客户端工具类(单例) * * @author leifu * @Date 2015-8-30 * @Time 上午10:08:12 */ public class RedisClusterComponent { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private static RedisClusterComponent redisClusterComponent = new RedisClusterComponent(); private static final String HOST = "127.0.0.1"; /** * redisCluster客户端 */ private PipelineCluster redisCluster; /** * 客户端超时时间 */ private final static int TIME_OUT = 5; private RedisClusterComponent() { // common-pool配置 GenericObjectPoolConfig poolConfig = getCommonPoolConfig(); try { // redis节点信息 Set<HostAndPort> nodeList = new HashSet<HostAndPort>(); nodeList.add(new HostAndPort(HOST, 8000)); nodeList.add(new HostAndPort(HOST, 8001)); nodeList.add(new HostAndPort(HOST, 8002)); nodeList.add(new HostAndPort(HOST, 8003)); nodeList.add(new HostAndPort(HOST, 8004)); nodeList.add(new HostAndPort(HOST, 8005)); redisCluster = new PipelineCluster(poolConfig, nodeList, TIME_OUT); } catch (Exception e) { logger.error(e.getMessage(), e); } } /** * 生成默认的common-pool配置 * * @return */ public static GenericObjectPoolConfig getCommonPoolConfig() { GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMaxTotal(GenericObjectPoolConfig.DEFAULT_MAX_TOTAL * 10); poolConfig.setMaxIdle(GenericObjectPoolConfig.DEFAULT_MAX_IDLE * 5); poolConfig.setMinIdle(GenericObjectPoolConfig.DEFAULT_MAX_IDLE * 2); // JedisPool.borrowObject最大等待时间 poolConfig.setMaxWaitMillis(1000L); // 开启jmx poolConfig.setJmxEnabled(true); return poolConfig; } public static RedisClusterComponent getInstance() { return redisClusterComponent; } public void destroy() { if (redisCluster != null) { redisCluster.close(); } } public PipelineCluster getRedisCluster() { return redisCluster; } public static void main(String[] args) { System.out.println(RedisClusterComponent.getInstance().getRedisCluster().set("testa", "b")); } }
六、总结和经验:
总体来说,通过jedis来连接操作redis是比较简单,只是按照redis单机、redis-sentinel、redis-cluster略有不同,但是有几点还是共通和需要注意的:
(1) 无论哪种类型的redis最终获取数据都是通过jedis从一个redis实例获取的。
(2) try catch finally操作是有必要的。(做异常梳理,关闭资源的)
(3) jedis.close()的实现方式针对不同类型的redis有很大不同(上面已经提过了)
(4) jedis依赖了common-pool,有关common-pool的参数需要根据不同的使用场景,各不相同,需要具体问题具体分析。
(5) jedis没有提供序列化功能(xmemcached, ehcache都有), 开发者可以根据自己的需求选取合适的序列化方式(附赠一篇序列化不错的文章:http://www.infoq.com/cn/articles/serialization-and-deserialization)