内容简介:Storm-redis 使用 Jedis 作为 Redis 客户端。添加Maven依赖:2. 常用Bolt
Storm-redis 使用 Jedis 作为 Redis 客户端。
1. 如何使用
添加Maven依赖:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> <type>jar</type> </dependency>
2. 常用Bolt
Storm-redis 提供了基本的 Bolt 实现:RedisLookupBolt,RedisStoreBolt 以及 RedisFilterBolt。
根据名字我们就可以知道其功能,RedisLookupBolt 从 Redis 中检索指定键的值,RedisStoreBolt 将键/值存储到 Redis 上。RedisFilterBolt 过滤键或字段不在 Redis 上的元组。
一个元组匹配一个键/值对,你可以在 TupleMapper 中定义匹配模式。你还可以从 RedisDataTypeDescription 中选择你需要的数据类型。通过 RedisDataTypeDescription.RedisDataType 来查看支持哪些数据类型。一些数据类型,例如散列,有序集,还需要指定额外的键来将元组转换为元素:
public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) { this.dataType = dataType; this.additionalKey = additionalKey; if (dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET || dataType == RedisDataType.GEO) { if (additionalKey == null) { throw new IllegalArgumentException("Hash, Sorted Set and GEO should have additional key"); } } }
这些接口与 RedisLookupMapper,RedisStoreMapper 以及 RedisFilterMapper 组合使用,分别适用于 RedisLookupBolt,RedisStoreBolt 以及 RedisFilterBolt。当你实现 RedisFilterMapper 时,请确保在 declareOutputFields() 中声明与输入流相同的字段,因为 FilterBolt 只是转发存在 Redis 上输入元组。
2.1 RedisLookupBolt
实现RedisLookupMapper:
class WordCountRedisLookupMapper implements RedisLookupMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountRedisLookupMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public List<Values> toTuple(ITuple input, Object value) { String member = getKeyFromTuple(input); List<Values> values = Lists.newArrayList(); values.add(new Values(member, value)); return values; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("wordName", "count")); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return null; } }
根据如下方式使用:
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(host).setPort(port).build(); RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper(); RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
2.2 RedisStoreBolt
实现RedisStoreMapper:
class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountStoreMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return tuple.getStringByField("count"); } }
根据如下方式使用:
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(host).setPort(port).build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
2.3 RedisFilterBolt
实现RedisFilterMapper:
class BlacklistWordFilterMapper implements RedisFilterMapper { private RedisDataTypeDescription description; private final String setKey = "blacklist"; public BlacklistWordFilterMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.SET, setKey); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return null; } }
根据如下方式使用:
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(host).setPort(port).build(); RedisFilterMapper filterMapper = new BlacklistWordFilterMapper(); RedisFilterBolt filterBolt = new RedisFilterBolt(poolConfig, filterMapper);
3. 自定义Bolt
如果你的场景不适合 RedisStoreBolt,RedisLookupBolt 以及 RedisFilterBolt,那么 storm-redis 还提供了 AbstractRedisBolt,你可以自定义自己的业务逻辑。
public static class LookupWordTotalCountBolt extends AbstractRedisBolt { private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class); private static final Random RANDOM = new Random(); public LookupWordTotalCountBolt(JedisPoolConfig config) { super(config); } public LookupWordTotalCountBolt(JedisClusterConfig config) { super(config); } @Override public void execute(Tuple input) { JedisCommands jedisCommands = null; try { jedisCommands = getInstance(); String wordName = input.getStringByField("word"); String countStr = jedisCommands.get(wordName); if (countStr != null) { int count = Integer.parseInt(countStr); this.collector.emit(new Values(wordName, count)); // print lookup result with low probability if(RANDOM.nextInt(1000) > 995) { LOG.info("Lookup result - word : " + wordName + " / count : " + count); } } else { // skip LOG.warn("Word not found in Redis - word : " + wordName); } } finally { if (jedisCommands != null) { returnInstance(jedisCommands); } this.collector.ack(input); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // wordName, count declarer.declare(new Fields("wordName", "count")); } }
4. Trident State 用法
-
RedisState 和 RedisMapState,为单机 Redis 模式提供了 Jedis 接口。
-
RedisClusterState 和 RedisClusterMapState,为 Redis 集群模式提供了 JedisCluster 接口。
RedisState:
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisLookupMapper lookupMapper = new WordCountLookupMapper(); RedisState.Factory factory = new RedisState.Factory(poolConfig); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout); stream.partitionPersist(factory, fields, new RedisStateUpdater(storeMapper).withExpire(86400000), new Fields() ); TridentState state = topology.newStaticState(factory); stream = stream.stateQuery(state, new Fields("word"), new RedisStateQuerier(lookupMapper), new Fields("columnName","columnValue") );
RedisClusterState:
Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>(); for (String hostPort : redisHostPort.split(",")) { String[] host_port = hostPort.split(":"); nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1]))); } JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes).build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisLookupMapper lookupMapper = new WordCountLookupMapper(); RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout); stream.partitionPersist(factory, fields, new RedisClusterStateUpdater(storeMapper).withExpire(86400000, new Fields() ); TridentState state = topology.newStaticState(factory); stream = stream.stateQuery(state, new Fields("word"), new RedisClusterStateQuerier(lookupMapper), new Fields("columnName","columnValue") );
storm版本:2.0.0-SNAPSHOT
欢迎关注我的公众号和博客:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 持续集成:数据库集成及快速构建
- ShareSDK集成及集成后遇到的一些问题【原创】
- 持续集成与持续部署宝典Part 3:创建集成环境
- 持续集成与持续部署宝典Part 2:创建持续集成流水线
- 禅道 12.3.stable 版本发布,全面集成八种单元测试框架,打通持续集成闭环
- 持续集成将死
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
JavaScript征途
朱印宏 / 电子工业出版社 / 2009-9 / 89.00元
《JavaScript征途》是一本学习JavaScript语言的权威书籍,在遵循语言学习的特殊规律基础上精心选材,力争做到统筹、有序,在结构上体现系统性和完整性。同时还重点挖掘JavaScript基于对象的开发精髓及函数式编程两个技术核心。《JavaScript征途》内容全面,由浅入深,包括6篇21章,主要内容包括:JavaScript语言的基本特性,开发简单的JavaScript程序,JavaS......一起来看看 《JavaScript征途》 这本书的介绍吧!