fluent-logger-java is a Java library for recording events via Fluentd from a Java application: https://github.com/fluent/fluent-logger-java
While using this SDK, I found that its reconnect mechanism does not work once the TCP connection has been dropped.
2018-01-26 12:36:25,620 ERROR [org.fluentd.logger.sender.RawSocketSender] - <org.fluentd.logger.sender.RawSocketSender>
java.net.SocketException: Software caused connection abort: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.fluentd.logger.sender.RawSocketSender.flush(RawSocketSender.java:200)
    at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:188)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:158)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:140)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:135)
    at org.fluentd.logger.FluentLogger.log(FluentLogger.java:101)
    at org.fluentd.logger.FluentLogger.log(FluentLogger.java:86)
    at fluentdDemo.fluentdDemo.main(fluentdDemo.java:90)
Looking at the source, see the RawSocketSender class:
private void reconnect() throws IOException {
    if (socket == null) {
        connect();
    } else if (socket.isClosed() || (!socket.isConnected())) {
        close();
        connect();
    }
}
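The intent is to rebuild the connection when the remote end of the Socket has closed. The Socket class provides several ready-made methods for checking state, such as isClosed(), isConnected(), isInputShutdown() and isOutputShutdown().
Testing showed, however, that all of these methods report the state of the local end only; they cannot tell whether the remote end has already disconnected.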
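To illustrate that these methods only reflect local state, here is a minimal sketch against a loopback ServerSocket. The class and method names are made up for this example and are not part of fluent-logger-java. The server side accepts and immediately closes the connection, and the client then inspects its own socket.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class, for illustration only.
class LocalSocketStateDemo {

    /** Returns {isConnected, isClosed} of a client socket after the peer has closed its end. */
    static boolean[] clientStateAfterPeerClose() throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            // Accept the connection and close the server side right away.
            server.accept().close();
            // Give the peer's FIN time to reach the client.
            Thread.sleep(200);
            return new boolean[] { client.isConnected(), client.isClosed() };
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] state = clientStateAfterPeerClose();
        System.out.println("isConnected=" + state[0] + ", isClosed=" + state[1]);
    }
}
```

isConnected() stays true once a socket has been connected, and isClosed() only becomes true after the local side calls close(), so the condition in reconnect() never fires when only the remote end goes away.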
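Some people handle this kind of problem by sending a piece of test data through the OutputStream, much like a ping: if the send fails, the remote end has disconnected. The drawback is that this interferes with the normal output, because the remote end cannot tell the test data apart from the real data.
In fact this approach does work, as long as you do not send test data. Send the real data directly; as soon as a send fails, actively close the socket, create a new connection, and send the data again.
Others have thought of sending urgent data to verify the connection state (see the Socket class, below): if the send fails, close the socket and create a new connection.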
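The close, reconnect and resend idea could be sketched roughly as follows. ReconnectingSender is a hypothetical helper written for this post, not part of fluent-logger-java, and the host, port and timeout parameters are assumptions. It only recovers from failures that the write actually reports.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of fluent-logger-java.
class ReconnectingSender implements AutoCloseable {
    private final InetSocketAddress address;
    private final int connectTimeoutMillis;
    private Socket socket;

    ReconnectingSender(String host, int port, int connectTimeoutMillis) {
        this.address = new InetSocketAddress(host, port);
        this.connectTimeoutMillis = connectTimeoutMillis;
    }

    private void connect() throws IOException {
        Socket s = new Socket();
        try {
            s.connect(address, connectTimeoutMillis);
        } catch (IOException e) {
            s.close(); // do not keep a half-initialized socket around
            throw e;
        }
        socket = s;
    }

    private void write(byte[] payload) throws IOException {
        if (socket == null || socket.isClosed()) {
            connect();
        }
        OutputStream out = socket.getOutputStream();
        out.write(payload);
        out.flush();
    }

    /** Sends the real payload; on failure, drops the socket, reconnects and retries once. */
    synchronized void send(byte[] payload) throws IOException {
        try {
            write(payload);
        } catch (IOException firstFailure) {
            close();        // discard the broken socket
            write(payload); // reconnects inside write(); a second failure propagates
        }
    }

    @Override
    public synchronized void close() {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if closing a broken socket fails
            }
            socket = null;
        }
    }
}
```

A caller would create one instance per destination and call send() for every record. Retrying exactly once keeps the sketch simple; a real sender would probably want backoff and a bounded buffer for records that cannot be delivered.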
/**
 * Send one byte of urgent data on the socket. The byte to be sent is the lowest eight
 * bits of the data parameter. The urgent byte is
 * sent after any preceding writes to the socket OutputStream
 * and before any future writes to the OutputStream.
 * @param data The byte of data to send
 * @exception IOException if there is an error
 *  sending the data.
 * @since 1.4
 */
public void sendUrgentData (int data) throws IOException {
    if (!getImpl().supportsUrgentData ()) {
        throw new SocketException ("Urgent data not supported");
    }
    getImpl().sendUrgentData (data);
}
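This can be implemented as follows:
/**
 * Checks whether the connection has been dropped.
 *
 * @param socket the socket to probe
 * @return true if the connection is down, false otherwise
 */
public boolean isServerClose(Socket socket) {
    try {
        // Send one byte of urgent data. By default the server side does not
        // handle urgent data, so normal communication is not affected.
        socket.sendUrgentData(0xFF);
        return false;
    } catch (Exception se) {
        return true;
    }
}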
Precondition: the peer's socket must not have SO_OOBINLINE enabled, in which case it silently discards this byte. SO_OOBINLINE is disabled by default.
See the SocketOptions interface:
/**
 * When the OOBINLINE option is set, any TCP urgent data received on
 * the socket will be received through the socket input stream.
 * When the option is disabled (which is the default) urgent data
 * is silently discarded.
 *
 * @see Socket#setOOBInline
 * @see Socket#getOOBInline
 */
@Native public final static int SO_OOBINLINE = 0x1003;
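Of course, I think urgent data could also be sent on a timer as a heartbeat to keep a long-lived TCP connection alive; the peer does not need to respond.
Test results: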
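A timer-driven heartbeat along these lines might look like the sketch below. UrgentDataHeartbeat, its period parameter and the onFailure callback are hypothetical names introduced for illustration, and the sketch assumes the peer leaves SO_OOBINLINE disabled so that the urgent byte is discarded.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of fluent-logger-java.
class UrgentDataHeartbeat implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "urgent-heartbeat");
                t.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return t;
            });

    UrgentDataHeartbeat(Socket socket, long periodMillis, Runnable onFailure) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Assumes the peer has SO_OOBINLINE disabled and drops this byte.
                socket.sendUrgentData(0xFF);
            } catch (IOException e) {
                onFailure.run();
                // An exception thrown from a scheduled task cancels later runs.
                throw new IllegalStateException("heartbeat failed", e);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The onFailure callback is where a sender would close the socket and reconnect. Because a single probe can succeed even when the peer is gone, a dead connection may only be noticed one period later.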
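With both approaches, the first send after the connection was dropped raised no exception, yet the server did not receive the data. The broken connection was detected only on the second send.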
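A small loopback experiment can probe this observation. The sketch below (class and method names are made up for illustration) closes the server side and then writes one byte at a time from the client, reporting which write is the first to throw. One likely explanation, offered as an assumption rather than something verified on the original setup, is that a write returns as soon as the bytes are handed to the local TCP stack, so the error only surfaces after the peer has answered with a reset.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class, for illustration only.
class WriteAfterPeerCloseDemo {

    /** Returns the 1-based index of the first write that throws, or -1 if none of maxWrites fails. */
    static int firstFailingWrite(int maxWrites) throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            // The peer accepts and closes immediately, without reading anything.
            server.accept().close();
            Thread.sleep(200); // let the peer's FIN arrive
            OutputStream out = client.getOutputStream();
            for (int i = 1; i <= maxWrites; i++) {
                try {
                    out.write('x');
                    out.flush();
                } catch (IOException e) {
                    return i;
                }
                Thread.sleep(200); // let the peer's reply to this write arrive
            }
            return -1;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("first failing write: " + firstFailingWrite(5));
    }
}
```

If the behavior described above holds, the reported index should not be 1, since the first write is accepted locally even though the peer has already gone away.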
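One explanation I have seen is that when a Socket sends data through sendUrgentData() or a PrintWriter, the payload is so small that it sits in the buffer instead of being sent immediately. After setting setSendBufferSize(1), the exception was raised as expected, so a dropped connection could be detected in real time. (Material online says that sendUrgentData sends immediately and bypasses the buffer, but that does not match what I saw in my own tests; this still needs to be verified.)
I checked the source: according to the Javadoc of sendUrgentData, the urgent byte is sent after any preceding writes to the OutputStream and before any future writes to the OutputStream.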
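I tried setSendBufferSize(1) myself and the exception was raised as expected, so a dropped connection can be detected in real time this way.
Fluentd's in_forward plugin provides a UDP-based heartbeat listener. Unfortunately, fluent-logger-java does not implement a corresponding heartbeat mechanism.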
https://docs.fluentd.org/v0.12/articles/in_forward