标签:算数 exce wait cep mys shm 消费 下一步 received
Spring Boot 集成 Storm:实时计算日志中的展示信息,将计算结果存储到 Redis 中,并根据 Redis 中的数量信息执行下一步操作(例如落库到 MySQL)。
1.配置redis参数,redis采用集群模式,需要配置redis集群
spring:
redis:
database: 0
password:
cluster:
nodes:
- 127.0.0.1:6380
maxRedirects: 3
pool:
max-idle: 8
min-idle: 0
max-active: 8
max-wait: -1
2.redis配置类实现
public class RedisConfUtils {
/**
* {@link org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration#}
* @param redisProperties
* @return
*/
public static RedisClusterConfiguration getClusterConfiguration(RedisProperties redisProperties) {
if (redisProperties.getCluster() == null) {
return null;
}
RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
RedisClusterConfiguration config = new RedisClusterConfiguration(
clusterProperties.getNodes());
if (clusterProperties.getMaxRedirects() != null) {
config.setMaxRedirects(clusterProperties.getMaxRedirects());
}
return config;
}
public static RedisTemplate buildRedisTemplate(byte[] redisProperties){
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(
RedisConfUtils.getClusterConfiguration(
(RedisProperties) Serializer.INSTANCE.deserialize(redisProperties)));
RedisTemplate<String, Long> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(jedisConnectionFactory);
jedisConnectionFactory.afterPropertiesSet();
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
3.storm 日志控制Builder配置redis信息,将redis信息传递到控制类中
@Getter
@Setter
@Configuration
@DependsOn("redisTemplate")
@ConfigurationProperties(prefix = "storm.bolt.logConsoleBolt")
public class LogConsoleBoltBuilder extends BoltBuilder {
@Autowired
private RedisProperties redisProperties;
private int emitFrequencyInSeconds = 60;//每60s发射一次数据
@Bean("logConsoleBolt")
public LogConsoleBolt buildBolt() {
super.setId("logConsoleBolt");
LogConsoleBolt logConsoleBolt = new LogConsoleBolt();
logConsoleBolt.setRedisProperties(Serializer.INSTANCE.serialize(redisProperties));
logConsoleBolt.setEmitFrequencyInSeconds(emitFrequencyInSeconds);
return logConsoleBolt;
}
}
4.storm 日志控制类获取实例化redis信息,将计算得到的信息存储到redis中
@Slf4j
public class LogConsoleBolt extends BaseRichBolt {
private final static String AD_LIST_SHOW_COUNT = "AD_LIST_SHOW_COUNT";
private OutputCollector collector;
private HashOperations<String, String, Long> hashOperations;
@Setter
private byte[] redisProperties;
@Setter
private int emitFrequencyInSeconds;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
this.hashOperations = RedisConfUtils.buildRedisTemplate(redisProperties).opsForHash();
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
/**
* 这里配置TickTuple的发送频率
*/
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;
}
@Override
public void execute(Tuple input) {
try {
log.info(input.toString());
//判断日志类型,不是需要的日志则不做处理
if (input.size()<5){
collector.ack(input);
}else {
String value = input.getStringByField("value").toString();
AdShowLogEntity adShowLogEntity = AdShowLogEntity.logToEntity(value);
if (adShowLogEntity != null){
AdShowLogEntity.Message msg = adShowLogEntity.getMessage().get(0);
// 输出
// collector.emit(new Values(LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix()+msg.getCreativeId(), String.valueOf(1)));
//存储信息到redis
Long cont = hashOperations.increment(AD_LIST_SHOW_COUNT, LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix()+msg.getCreativeId(), 1l);
collector.emit(new Values(Integer.parseInt(msg.getCreativeId()),System.currentTimeMillis(),0.01f));
}else {
// collector.ack(input);
}
collector.ack(input);
// System.out.println("received from kafka : "+ value);
// 必须ack,否则会重复消费kafka中的消息
}
}catch (Exception e){
e.printStackTrace();
collector.fail(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("adId","updateTime","price")); //分词定义的field为word
}
}
标签:算数 exce wait cep mys shm 消费 下一步 received
原文地址:https://www.cnblogs.com/flyyu1/p/11448039.html