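As a log collector, Flume is essentially a producer-consumer structure: the source is the message producer, the sink is the message consumer, and the channel in between stores the messages.
To make sure messages are consumed correctly, Flume uses a transaction mechanism. The main classes involved: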
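1) org.apache.flume.Transaction: an interface that provides transactional access to a channel (for either put or take)
It first defines an enum, TransactionState, listing the states a transaction can be in: {Started, Committed, RolledBack, Closed}
Methods declared by the interface:
    begin     // start the transaction
    commit    // complete the transaction
    rollback  // roll back
    close     // close the transaction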
Usage, for example:

    Channel ch = ...                        // construct a channel object
    Transaction tx = ch.getTransaction();   // get this channel's Transaction object
    try {
        tx.begin();                         // start the transaction
        ...
        // data operation: ch.put(event) or ch.take()
        ...
        tx.commit();                        // commit the transaction
    } catch (ChannelException ex) {
        tx.rollback();                      // roll back on exception
        ...
    } finally {
        tx.close();
    }
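To make the call order of that pattern visible without a Flume dependency, here is a minimal sketch. SimpleTx, RecordingTx and runInTransaction are hypothetical names made up for this illustration and are not part of Flume; the sketch only records which transaction methods get called on the success path and on the failure path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the four methods of a transaction (not Flume's interface).
interface SimpleTx {
    void begin();
    void commit();
    void rollback();
    void close();
}

// Records every call so the order can be inspected afterwards.
class RecordingTx implements SimpleTx {
    final List<String> calls = new ArrayList<>();
    public void begin()    { calls.add("begin"); }
    public void commit()   { calls.add("commit"); }
    public void rollback() { calls.add("rollback"); }
    public void close()    { calls.add("close"); }
}

class TxUsageDemo {
    // Same shape as the usage snippet: commit on success, roll back on failure, always close.
    // Returns true if the work was committed, false if it was rolled back.
    static boolean runInTransaction(SimpleTx tx, Runnable work) {
        try {
            tx.begin();
            work.run();
            tx.commit();
            return true;
        } catch (RuntimeException ex) {
            tx.rollback();
            return false;
        } finally {
            tx.close();
        }
    }

    public static void main(String[] args) {
        RecordingTx ok = new RecordingTx();
        runInTransaction(ok, () -> { });
        System.out.println(ok.calls);      // [begin, commit, close]

        RecordingTx failed = new RecordingTx();
        runInTransaction(failed, () -> { throw new IllegalStateException("boom"); });
        System.out.println(failed.calls);  // [begin, rollback, close]
    }
}
```

Either way close() runs last, which is why it sits in the finally block.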
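2) org.apache.flume.channel.BasicTransactionSemantics: an abstract class that implements the Transaction interface (used together with BasicChannelSemantics)
It defines an enum, State, that marks the state of the transaction: NEW (newly created), OPEN (commit or rollback may be called), COMPLETED (the state after commit or rollback), CLOSED
It declares the hook methods doBegin/doPut/doTake/doCommit/doRollback/doClose (doPut, doTake, doCommit and doRollback are abstract; in the Flume source doBegin and doClose have empty default implementations), and it also defines the put/take/begin/commit/rollback/close methods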
Each of these methods uses Preconditions.checkState to verify that the call comes from the same thread and that the transaction is in the required state, for example:

    protected BasicTransactionSemantics() {
        state = State.NEW;  // the constructor sets the state to NEW first
        initialThreadId = Thread.currentThread().getId();
    }

    public void begin() {  // the begin method
        // check that this is the same thread that created the transaction
        Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
            "begin() called from different thread than getTransaction()!");
        // check that the current state is NEW
        Preconditions.checkState(state.equals(State.NEW),
            "begin() called when transaction is " + state + "!");
        try {
            doBegin();  // call the doBegin method
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ChannelException(e.toString(), e);
        }
        state = State.OPEN;  // set the state to OPEN
    }
The order of a transaction:
begin-->put/take-->commit/rollback-->close
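The state and thread checks that enforce this order can be sketched as a small state machine. This is a simplified, hypothetical model written for this post, not Flume's BasicTransactionSemantics: SketchTransaction and NoOpTransaction are made-up names, plain IllegalStateException stands in for Guava's Preconditions, and the rule that close() is allowed from NEW or COMPLETED is my reading of the Flume source rather than something shown above.

```java
// Hypothetical sketch of the NEW -> OPEN -> COMPLETED -> CLOSED lifecycle.
abstract class SketchTransaction {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    private State state = State.NEW;
    // Remember the creating thread so later calls can be checked against it.
    private final long initialThreadId = Thread.currentThread().getId();

    // Hooks a concrete transaction would implement.
    protected abstract void doCommit();
    protected abstract void doRollback();

    // Fails unless the caller is the creating thread and the state is one of the allowed ones.
    private void check(String method, State... allowed) {
        if (Thread.currentThread().getId() != initialThreadId) {
            throw new IllegalStateException(method + "() called from a different thread");
        }
        for (State s : allowed) {
            if (state == s) {
                return;
            }
        }
        throw new IllegalStateException(method + "() called when transaction is " + state);
    }

    public void begin()    { check("begin", State.NEW);     state = State.OPEN; }
    public void commit()   { check("commit", State.OPEN);   doCommit();   state = State.COMPLETED; }
    public void rollback() { check("rollback", State.OPEN); doRollback(); state = State.COMPLETED; }

    // Assumption: close() is legal before begin() or after commit/rollback, never while OPEN.
    public void close()    { check("close", State.NEW, State.COMPLETED); state = State.CLOSED; }

    public State getState() { return state; }
}

class TransactionOrderDemo {
    // A transaction whose hooks do nothing, just to exercise the state checks.
    static class NoOpTransaction extends SketchTransaction {
        @Override protected void doCommit() { }
        @Override protected void doRollback() { }
    }

    // Calls commit() without begin() and returns the resulting error message (null if none).
    static String commitBeforeBegin() {
        SketchTransaction tx = new NoOpTransaction();
        try {
            tx.commit();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        SketchTransaction tx = new NoOpTransaction();
        tx.begin();
        tx.commit();
        tx.close();
        System.out.println(tx.getState());
        System.out.println(commitBeforeBegin());
    }
}
```

Calling the methods out of order fails fast instead of silently corrupting the channel, which is the point of doing the checks in the base class rather than in each subclass.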
3) BasicTransactionSemantics has the following subclasses, one for each concrete channel:
    FileChannel-->FileBackedTransaction
    MemoryChannel-->MemoryTransaction
    SpillableMemoryChannel-->SpillableMemoryTransaction
Channel classes:
Each concrete BasicTransactionSemantics subclass provides its own doPut/doTake/doCommit/doRollback implementation
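Now let's look at how these get called.
Taking put as the example of what happens when put/take/commit/rollback is called on a channel instance:
1) org.apache.flume.channel.ChannelProcessor calls processEvent (handles a single event) or processEventBatch (inserts a batch of events)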
For example, in processEvent:

    // Process required channels
    List<Channel> requiredChannels = selector.getRequiredChannels(event);
    for (Channel reqChannel : requiredChannels) {
        Transaction tx = reqChannel.getTransaction();
        // the transaction object must not be null
        Preconditions.checkNotNull(tx, "Transaction object must not be null");
        try {
            tx.begin();               // start the transaction
            reqChannel.put(event);    // call the channel's put method, here org.apache.flume.channel.BasicChannelSemantics
            tx.commit();              // commit the transaction; both put and take end with a commit
        } catch (Throwable t) {
            tx.rollback();
            if (t instanceof Error) {
                LOG.error("Error while writing to required channel: " + reqChannel, t);
                throw (Error) t;
            } else {
                throw new ChannelException("Unable to put event on required "
                    + "channel: " + reqChannel, t);
            }
        } finally {
            if (tx != null) {
                tx.close();
            }
        }
    }
2) This calls the put method of the parent class org.apache.flume.channel.BasicChannelSemantics:

    public void put(Event event) throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);  // call put on the concrete BasicTransactionSemantics implementation
    }
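The currentTransaction.get() call and the "No transaction exists for this thread" message suggest a per-thread lookup. A minimal sketch of that idea follows, assuming the transaction is kept in a ThreadLocal and that getTransaction() creates a new one when the thread has none or its last one is closed (my recollection of the Flume source, not shown above). SketchChannel, Tx and ThreadLocalDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each thread sees only its own current transaction.
class SketchChannel {
    static class Tx {
        boolean closed = false;
        final List<String> staged = new ArrayList<>();
    }

    private final ThreadLocal<Tx> currentTransaction = new ThreadLocal<>();

    // Reuse this thread's transaction, or create one if there is none or it is closed.
    public Tx getTransaction() {
        Tx tx = currentTransaction.get();
        if (tx == null || tx.closed) {
            tx = new Tx();
            currentTransaction.set(tx);
        }
        return tx;
    }

    // put() never takes a transaction argument; it looks up the calling thread's one.
    public void put(String event) {
        Tx tx = currentTransaction.get();
        if (tx == null) {
            throw new IllegalStateException("No transaction exists for this thread");
        }
        tx.staged.add(event);
    }
}

class ThreadLocalDemo {
    // Calls put() on a fresh thread that never called getTransaction();
    // returns "ok" or the error message seen on that thread.
    static String putFromOtherThread(SketchChannel ch) {
        final String[] result = new String[1];
        Thread t = new Thread(() -> {
            try {
                ch.put("from-other-thread");
                result[0] = "ok";
            } catch (IllegalStateException e) {
                result[0] = e.getMessage();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        SketchChannel ch = new SketchChannel();
        SketchChannel.Tx tx = ch.getTransaction();
        ch.put("a");
        System.out.println(tx.staged);
        System.out.println(putFromOtherThread(ch));
    }
}
```

This is why a source or sink has to call getTransaction() on the same thread that later calls put() or take().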
3) The put method of org.apache.flume.channel.BasicTransactionSemantics (the subclasses do not provide their own put)

    protected void put(Event event) {
        // first run some checks, e.g. whether the transaction is in the right state
        Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
            "put() called from different thread than getTransaction()!");
        Preconditions.checkState(state.equals(State.OPEN),
            "put() called when transaction is %s!", state);
        Preconditions.checkArgument(event != null,
            "put() called with null event!");
        try {
            doPut(event);  // call the doPut method
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ChannelException(e.toString(), e);
        }
    }
4) This in turn calls the doPut method of the channel's concrete transaction class
For example, doPut in MemoryTransaction, an inner class of org.apache.flume.channel.MemoryChannel:

    protected void doPut(Event event) throws InterruptedException {
        channelCounter.incrementEventPutAttemptCount();
        int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
        if (!putList.offer(event)) {
            throw new ChannelException(
                "Put queue for MemoryTransaction of capacity " + putList.size()
                + " full, consider committing more frequently, "
                + "increasing capacity or increasing thread count");
        }
        putByteCounter += eventByteSize;
    }
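Note that doPut only offers the event to the transaction's own putList; nothing reaches the channel yet. A simplified sketch of that staging idea is below. StagedPutChannel and its members are hypothetical names, the commit and rollback behaviour is an assumption modelled on how I understand MemoryChannel to work (commit moves the staged events into the channel queue, rollback drops them), and all locking and byte accounting is left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical, single-threaded model of a memory-backed channel with staged puts.
class StagedPutChannel {
    private final Queue<String> queue = new ArrayDeque<>();  // committed events
    private final int transCapacity;                          // max events per transaction

    StagedPutChannel(int transCapacity) {
        this.transCapacity = transCapacity;
    }

    class Tx {
        // Bounded staging area: offer() returns false once the capacity is reached.
        private final LinkedBlockingDeque<String> putList =
            new LinkedBlockingDeque<>(transCapacity);

        void doPut(String event) {
            if (!putList.offer(event)) {
                throw new IllegalStateException(
                    "Put queue of capacity " + putList.size() + " full");
            }
        }

        // Assumption: commit publishes the staged events to the channel queue.
        void doCommit() {
            String event;
            while ((event = putList.poll()) != null) {
                queue.add(event);
            }
        }

        // Assumption: rollback simply discards whatever was staged.
        void doRollback() {
            putList.clear();
        }
    }

    Tx newTransaction() {
        return new Tx();
    }

    int committedSize() {
        return queue.size();
    }

    public static void main(String[] args) {
        StagedPutChannel ch = new StagedPutChannel(2);

        Tx rolledBack = ch.newTransaction();
        rolledBack.doPut("a");
        rolledBack.doRollback();
        System.out.println(ch.committedSize());  // 0, the rolled-back event never became visible

        Tx committed = ch.newTransaction();
        committed.doPut("b");
        committed.doPut("c");
        committed.doCommit();
        System.out.println(ch.committedSize());  // 2
    }
}
```

The bounded putList is also where the "consider committing more frequently" error comes from: a single transaction that stages more events than its capacity fails on the offer.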
This article comes from the blog “菜光光的博客”. Please be sure to keep this attribution: http://caiguangguang.blog.51cto.com/1652935/1617017