首先你需要了解JAVA KEYSTORE
该SSL用于Avro Sink到Avro Source之间的数据传输
该场景主要用于分布式Flume之间的数据传输,从分散的各个flume agent到中心汇集节点的flume agent
下面来看下如何实现的?
在这个传输过程中,sink其实就相当于socket的client端了
flume源码中有个类NettyAvroRpcClient,该类中还有个内部类SSLCompressionChannelFactory
其中定义了如下属性:
private final boolean enableCompression;
private final int compressionLevel;
private final boolean enableSsl;
private final boolean trustAllCerts;
private final String truststore;
private final String truststorePassword;
private final String truststoreType;
private final List excludeProtocols;
1、要使用SSL进行数据传输,首先要将ssl开关打开,true
2、truststore指定生成的keystore文件
3、truststorepassword指定密码(这里注意生成的keypass和storepass一定相同,否则报错)
KeyStore keystore = null;
if (truststore != null) {
if (truststorePassword == null) {
throw new NullPointerException("truststore password is null");
}
InputStream truststoreStream = new FileInputStream(truststore);
keystore = KeyStore.getInstance(truststoreType);
keystore.load(truststoreStream, truststorePassword.toCharArray());
}
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
// null keystore is OK, with SunX509 it defaults to system CA Certs
// see http://docs.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#X509TrustManager
tmf.init(keystore);
managers = tmf.getTrustManagers();
该段代码就去加载了keystore文件
TrustManagerFactory是JDK原生的一个信任管理器工厂,每个新人管理器管理特定类型的由安全套接字使用的信任材料。信任材料是基于keystore或提供者特定源。
init方法通过证书授权源和相关的信任材料初始化此工厂
最后为此信任材料返回一个信任管理器
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, managers, null);
SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(true);
List<String> enabledProtocols = new ArrayList<String>();
for (String protocol : sslEngine.getEnabledProtocols()) {
if (!excludeProtocols.contains(protocol)) {
enabledProtocols.add(protocol);
}
}
sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
logger.info("SSLEngine protocols enabled: " +
Arrays.asList(sslEngine.getEnabledProtocols()));
// addFirst() will make SSL handling the first stage of decoding
// and the last stage of encoding this must be added after
// adding compression handling above
pipeline.addFirst("ssl", new SslHandler(sslEngine));
1、返回指定协议的SSLContext对象
TLS安全传输层协议
2、初始化此上下文,初始化参数只有信任管理器
3、初始化SSLEngine,并指定引擎在握手时使用客户端模式
最终这个安全的Socket就建立起来了
source我们可以认为是socket的server端,打开连接后,等待客户端的连接
private static final String PORT_KEY = “port”;
private static final String BIND_KEY = “bind”;
private static final String COMPRESSION_TYPE = “compression-type”;
private static final String SSL_KEY = “ssl”;
private static final String IP_FILTER_KEY = “ipFilter”;
private static final String IP_FILTER_RULES_KEY = “ipFilterRules”;
private static final String KEYSTORE_KEY = “keystore”;
private static final String KEYSTORE_PASSWORD_KEY = “keystore-password”;
private static final String KEYSTORE_TYPE_KEY = “keystore-type”;
private static final String EXCLUDE_PROTOCOLS = “exclude-protocols”;
以上Avro Source的一些配置属性
try {
KeyStore ks = KeyStore.getInstance(keystoreType);
ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
} catch (Exception ex) {
throw new FlumeException(
"Avro source configured with invalid keystore: " + keystore, ex);
}
从上面代码可以看出,source端在configure方法执行的时候就会load该keystore
if (enableSsl) {
SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
sslEngine.setUseClientMode(false);
List<String> enabledProtocols = new ArrayList<String>();
for (String protocol : sslEngine.getEnabledProtocols()) {
if (!excludeProtocols.contains(protocol)) {
enabledProtocols.add(protocol);
}
}
sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
logger.info("SSLEngine protocols enabled: " +
Arrays.asList(sslEngine.getEnabledProtocols()));
// addFirst() will make SSL handling the first stage of decoding
// and the last stage of encoding this must be added after
// adding compression handling above
pipeline.addFirst("ssl", new SslHandler(sslEngine));
}
注意这里的SSLEngine就配置了引擎在握手时使用的服务器模式
最终返回对象ChannelPipeline
以上所有内容可能理解起来比较费劲,大家不妨先来看看这篇文章
Channel与Pipeline这里写链接内容
首先准备一个keystore文件
Sink配置
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.11.177
a1.sinks.k1.port=9520
a1.sinks.k1.channel=c1
a1.sinks.k1.ssl=true
a1.sinks.k1.truststore=/home/flume/keystore/chiwei.keystore
a1.sinks.k1.truststore-type=JKS
a1.sinks.k1.truststore-password=123456
Source端配置
a1.sources.r1.type = avro
a1.sources.r1.channels=c1
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9520
a1.sources.r1.ssl=true
a1.sources.r1.keystore=/home/flume/keystore/chiwei.keystore
a1.sources.r1.keystore-password=123456
a1.sources.r1.keystore-type=JKS
a1.sources.r1.ipFilter=true
a1.sources.r1.ipFilterRules=allow:ip:192.168.11.176
望各位网友不吝指教!!
【Flume】flume中Avro Source到Avro Sink之间通过SSL传输数据的实现分析及使用
原文地址:http://blog.csdn.net/simonchi/article/details/44172909