本文原创,转载请注明出处:
使用KafkaSpout需要子类实现Scheme,storm-kafka实现了StringScheme,KeyValueStringScheme等等,大家可以用。
这些Scheme主要负责从消息流中解析出所需要的数据。
public interface Scheme extends Serializable { public List<Object> deserialize(byte[] ser); public Fields getOutputFields(); }
public class StringScheme implements Scheme { public static final String STRING_SCHEME_KEY = "str"; public List<Object> deserialize(byte[] bytes) { return new Values(deserializeString(bytes)); } public static String deserializeString(byte[] string) { try { return new String(string, "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } public Fields getOutputFields() { return new Fields(STRING_SCHEME_KEY); } }
tuple.getStringByField("str")
public class SchemeAsMultiScheme implements MultiScheme { public final Scheme scheme; public SchemeAsMultiScheme(Scheme scheme) { this.scheme = scheme; } @Override public Iterable<List<Object>> deserialize(final byte[] ser) { List<Object> o = scheme.deserialize(ser); if(o == null) return null; else return Arrays.asList(o); } @Override public Fields getOutputFields() { return scheme.getOutputFields(); } } public interface MultiScheme extends Serializable { public Iterable<List<Object>> deserialize(byte[] ser); public Fields getOutputFields(); }
其实本身还是调用了传入的scheme方法,只不过返回结果组合成一个list而已,小弟觉得不用也可以。但是storm-kafka里面默认是需要的,在KafkaUtils解析message时调用scheme信息:
public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) { Iterable<List<Object>> tups; ByteBuffer payload = msg.payload(); if (payload == null) { return null; } ByteBuffer key = msg.key(); if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) { tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload)); } else { tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload)); } return tups; }
kafka收到的message多种多样,而且往下发射的信息页多种多样,所以很多时候我们需要自己写scheme,下面举2个例子
第一:一般默认发射一个field,但是如果我需要多发几个fields的话,该怎么办呐,现在发射2个,其实网上已有大牛,把kafka的offset加到了发射的信息中去,分析的过程如下:
//returns false if it's reached the end of current batch public EmitState next(SpoutOutputCollector collector) { if (_waitingToEmit.isEmpty()) { fill(); } while (true) { MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); if (toEmit == null) { return EmitState.NO_EMITTED; } Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); if (tups != null) { for (List<Object> tup : tups) { collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); } break; } else { ack(toEmit.offset); } } if (!_waitingToEmit.isEmpty()) { return EmitState.EMITTED_MORE_LEFT; } else { return EmitState.EMITTED_END; } }
(log-message"Opening spout " component-id ":" (keys task-datas)) (doseq[[task-id task-data]task-datas :let[^ISpout spout-obj (:objecttask-data) tasks-fn(:tasks-fntask-data) send-spout-msg (fn[out-stream-id values message-id out-task-id] (.increment emitted-count) (let[out-tasks (ifout-task-id (tasks-fnout-task-id out-stream-id values) (tasks-fnout-stream-id values)) rooted? (andmessage-id has-ackers?) root-id (ifrooted? (MessageId/generateId rand)) out-ids (fast-list-for[t out-tasks](ifrooted? (MessageId/generateId rand)))]
从这段代码可以看出,messageId是随机生成的,跟之前kafkaSpout 锚定的new KafkaMessageId(_partition, toEmit.offset)一点关系都没有,所以需要自己手动把offset加到发射的tuple中去,这就需要我们自己实现Scheme了,代码如下:
publicclass KafkaOffsetWrapperScheme implements Scheme { public static final String SCHEME_OFFSET_KEY = "offset"; private String _offsetTupleKeyName; private Scheme _localScheme; public KafkaOffsetWrapperScheme() { _localScheme = new StringScheme(); _offsetTupleKeyName = SCHEME_OFFSET_KEY; } public KafkaOffsetWrapperScheme(Scheme localScheme, String offsetTupleKeyName) { _localScheme = localScheme; _offsetTupleKeyName = offsetTupleKeyName; } public KafkaOffsetWrapperScheme(Scheme localScheme) { this(localScheme, SCHEME_OFFSET_KEY); } public List<Object> deserialize(byte[] bytes) { return_localScheme.deserialize(bytes); } publicFields getOutputFields() { List<String> outputFields = _localScheme .getOutputFields() .toList(); outputFields.add(_offsetTupleKeyName); returnnew Fields(outputFields); } }
这里的scheme输出是两个fields,一个是str,由StringScheme负责反序列化,或者自己实现其他的scheme;一个是offset,但是offset如何加到发射的tuple中呐??我们从PartitionManager中找到被发射的tuple
public EmitState next(SpoutOutputCollector collector) { if (_waitingToEmit.isEmpty()) { fill(); } while (true) { MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); if (toEmit == null) { return EmitState.NO_EMITTED; } Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); if (tups != null) { for (List<Object> tup : tups) { tup.add(toEmit.offset); collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); } break; } else { ack(toEmit.offset); } } if (!_waitingToEmit.isEmpty()) { return EmitState.EMITTED_MORE_LEFT; } else { return EmitState.EMITTED_END; } }
public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) { Iterable<List<Object>> tups; ByteBuffer payload = msg.payload(); if (payload == null) { return null; } ByteBuffer key = msg.key(); if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) { tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload)); } else { tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload)); } return tups; }
唯一要做的就是在构建SpoutConfig时,指定scheme为KafkaOffsetWrapperScheme
第二,kafka里面的存的message是其他格式的,如thrift,avro,protobuf格式,那这样就需要自己实现反序列化的过程
这里以avro scheme格式为例(这里就不对avro扫盲了,自己google一下吧)
这时kafka中存放的是avro格式的message,如果avro schema如下
{"namespace": "example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
public class AvroMessageScheme implements Scheme{ private final static Logger logger = LoggerFactory.getLogger(AvroMessageScheme.class); private GenericRecord e2; private AvroRecord avroRecord; public AvroMessageScheme() { } @Override public List<Object> deserialize(byte[] bytes) { e2 = null; avroRecord = null; try { InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("examples.avsc"); Schema schema = new Schema.Parser().parse(is); DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema); Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); e2 = datumReader.read(null, decoder); avroRecord = new AvroRecord(e2); } catch (Exception e) { e.printStackTrace(); return new Values(avroRecord); } return new Values(avroRecord); } @Override public Fields getOutputFields() { return new Fields("msg"); } }
public class AvroRecord implements Serializable { private String name; private int favorite_number; private String favorite_color; public AvroRecord(GenericRecord gr) { try { this.name = String.valueOf(gr.get("name")); this.favorite_number = Integer.parseInt(gr.get("favorite_number")); this.favorite_color = gr.get("favorite_color").toString(); } catch (Exception e) { logger.error("read AvroRecord error!"); } } @Override public String toString() { return "AvroRecord{" + "name='" + name + '\'' + ", favorite_number=" + favorite_number + ", favorite_color='" + favorite_color + '\'' + '}'; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getFavorite_color() { return favorite_color; } public void setFavorite_color(String favorite_color) { this.favorite_color = favorite_color; } public int getFavorite_number() { return favorite_number; } public void setFavorite_number(int favorite_number) { this.favorite_number = favorite_number; } }
https://blog.deck36.de/no-more-over-counting-making-counters-in-apache-storm-idempotent-using-redis-hyperloglog/
原文地址:http://blog.csdn.net/wzhg0508/article/details/40874155