标签:io os 使用 java ar for 文件 数据 sp
storm与kafka单机功能整合很顺利,但是到了storm集群环境和数据处理性能时则出现了一些问题,现将测试过程和问题简单记录如下:
性能指标:每分钟处理至少100万的信息(csv格式,100bytes左右),信息解析后持久化到DB中。
架构设计:flume读取文件缓存到kafka队列后消费到storm中
问题:
一、storm集群任务调度时出现如下问题,具体日志见下:
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-computer7-62/ip:6706... [8] 2014-09-24 16:47:38 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-computer7-62/ip:6706, [id: 0x0b596170, /ip:34836 => computer7-62/ip:6706] 2014-09-24 16:47:38 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-computer7-60/ip:6706 2014-09-24 16:47:38 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-computer7-60/ip:6706 java.nio.channels.ClosedChannelException: null at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.Channels.write(Channels.java:632) [netty-3.2.2.Final.jar:na] at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.Channels.write(Channels.java:611) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.Channels.write(Channels.java:578) [netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) [netty-3.2.2.Final.jar:na] at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24] at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24] at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na] at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na] at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24] 2014-09-24 16:47:38 b.s.m.n.Client [INFO] failed to send requests to computer7-60/ip:6706: java.nio.channels.ClosedChannelException: null at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.Channels.write(Channels.java:632) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.Channels.write(Channels.java:611) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.Channels.write(Channels.java:578) ~[netty-3.2.2.Final.jar:na] at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) ~[netty-3.2.2.Final.jar:na] at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24] at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24] at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na] at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na] at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24] 2014-09-24 16:47:38 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-computer7-60/ip:6706..., timeout: 600000ms, pendings: 0 2014-09-24 16:47:38 b.s.util [ERROR] Async loop died! java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24] Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927$fn__5928.invoke(worker.clj:322) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927.invoke(worker.clj:320) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] ... 6 common frames omitted 2014-09-24 16:47:38 b.s.m.n.Client [INFO] New Netty Client, connect to computer7-60, 6706, config: , buffer_size: 52
二、kafka性能瓶颈
kafka与storm整合时数据处理性能不是很好,未达到预期要求。一开始怀疑是kafkaspout代码问题,但是storm external中已经将其收录进来,感觉问题应该不是出在这里。后来看了一下kafkaspout实现,找到了可能的性能瓶颈点。kafka在设计时,为了增加并发访问及处理性能,在topic中加入了partitions属性,也就是将数据打散,提高并发与处理性能。由于队列信息offset是在客户端维护,kafkaspout在解决并发互斥时采用task与partitions一一对应的方式来解决互斥访问。topology在使用时,kafkaspout的并发度可以根据具体topic的partitions属性来设定。这样通过增加topic partitions和并发度(8),达到了预期的处理性能。
由此联想,之前遇到的flume缓存到kafka队列的问题也可能是partitions设定方式问题导致,后续再测试验证一下。
标签:io os 使用 java ar for 文件 数据 sp
原文地址:http://my.oschina.net/u/262605/blog/318756