码迷,mamicode.com
首页 > 其他好文 > 详细

浅谈skywalking的TraceSegmentServiceClient

时间:2020-03-24 13:19:14      阅读:93      评论:0      收藏:0      [点我收藏+]

标签:ble   apache   use   find   abs   rate   参数   ddd   builder   

本文参考原文-http://bjbsair.com/2020-03-22/tech-info/5102/

本文主要研究一下skywalking的TraceSegmentServiceClient

技术图片

TracingContextListener

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java

public interface TracingContextListener {  
 ?  void afterFinished(TraceSegment traceSegment);  
}
  • TracingContextListener定义了afterFinished方法,其参数为TraceSegment

TraceSegment

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java

public class TraceSegment {  
?  
 ?  private ID traceSegmentId;  
?  
 ?  private List<TraceSegmentRef> refs;  
?  
 ?  private List<AbstractTracingSpan> spans;  
?  
 ?  private DistributedTraceIds relatedGlobalTraces;  
?  
 ?  private boolean ignore = false;  
?  
 ?  private boolean isSizeLimited = false;  
?  
 ?  private final long createTime;  
?  
 ?  public TraceSegment() {  
 ? ? ?  this.traceSegmentId = GlobalIdGenerator.generate();  
 ? ? ?  this.spans = new LinkedList<AbstractTracingSpan>();  
 ? ? ?  this.relatedGlobalTraces = new DistributedTraceIds();  
 ? ? ?  this.relatedGlobalTraces.append(new NewDistributedTraceId());  
 ? ? ?  this.createTime = System.currentTimeMillis();  
 ?  }  
?  
 ?  public void ref(TraceSegmentRef refSegment) {  
 ? ? ?  if (refs == null) {  
 ? ? ? ? ?  refs = new LinkedList<TraceSegmentRef>();  
 ? ? ?  }  
 ? ? ?  if (!refs.contains(refSegment)) {  
 ? ? ? ? ?  refs.add(refSegment);  
 ? ? ?  }  
 ?  }  
?  
 ?  public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {  
 ? ? ?  relatedGlobalTraces.append(distributedTraceId);  
 ?  }  
?  
 ?  public void archive(AbstractTracingSpan finishedSpan) {  
 ? ? ?  spans.add(finishedSpan);  
 ?  }  
?  
 ?  public TraceSegment finish(boolean isSizeLimited) {  
 ? ? ?  this.isSizeLimited = isSizeLimited;  
 ? ? ?  return this;  
 ?  }  
?  
 ?  public ID getTraceSegmentId() {  
 ? ? ?  return traceSegmentId;  
 ?  }  
?  
 ?  public int getServiceId() {  
 ? ? ?  return RemoteDownstreamConfig.Agent.SERVICE_ID;  
 ?  }  
?  
 ?  public boolean hasRef() {  
 ? ? ?  return !(refs == null || refs.size() == 0);  
 ?  }  
?  
 ?  public List<TraceSegmentRef> getRefs() {  
 ? ? ?  return refs;  
 ?  }  
?  
 ?  public List<DistributedTraceId> getRelatedGlobalTraces() {  
 ? ? ?  return relatedGlobalTraces.getRelatedGlobalTraces();  
 ?  }  
?  
 ?  public boolean isSingleSpanSegment() {  
 ? ? ?  return this.spans != null && this.spans.size() == 1;  
 ?  }  
?  
 ?  public boolean isIgnore() {  
 ? ? ?  return ignore;  
 ?  }  
?  
 ?  public void setIgnore(boolean ignore) {  
 ? ? ?  this.ignore = ignore;  
 ?  }  
?  
 ?  public UpstreamSegment transform() {  
 ? ? ?  UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();  
 ? ? ?  for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {  
 ? ? ? ? ?  upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());  
 ? ? ?  }  
 ? ? ?  SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();  
 ? ? ?  /**  
 ? ? ? ? * Trace Segment  
 ? ? ? ? */  
 ? ? ?  traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());  
 ? ? ?  // Don‘t serialize TraceSegmentReference  
?  
 ? ? ?  // SpanObject  
 ? ? ?  for (AbstractTracingSpan span : this.spans) {  
 ? ? ? ? ?  traceSegmentBuilder.addSpans(span.transform());  
 ? ? ?  }  
 ? ? ?  traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);  
 ? ? ?  traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);  
 ? ? ?  traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);  
?  
 ? ? ?  upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());  
 ? ? ?  return upstreamBuilder.build();  
 ?  }  
?  
 ?  @Override  
 ?  public String toString() {  
 ? ? ?  return "TraceSegment{" +  
 ? ? ? ? ?  "traceSegmentId=‘" + traceSegmentId + ‘\‘‘ +  
 ? ? ? ? ?  ", refs=" + refs +  
 ? ? ? ? ?  ", spans=" + spans +  
 ? ? ? ? ?  ", relatedGlobalTraces=" + relatedGlobalTraces +  
 ? ? ? ? ?  ‘}‘;  
 ?  }  
?  
 ?  public int getApplicationInstanceId() {  
 ? ? ?  return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;  
 ?  }  
?  
 ?  public long createTime() {  
 ? ? ?  return this.createTime;  
 ?  }  
}
  • TraceSegment定义了traceSegmentId、refs、spans、relatedGlobalTraces、ignore、isSizeLimited、createTime等属性;它提供了ref、relatedGlobalTraces、archive、finish、transform等方法,其中transform方法会把当前segment转换为UpstreamSegment

IConsumer

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java

/**
 * Consumer callbacks for batches of buffered data.
 *
 * @param <T> type of the consumed elements
 */
public interface IConsumer<T> {
    /**
     * Initialization hook; {@code ConsumeDriver} calls it once per consumer instance before the
     * consumer threads are created.
     */
    void init();

    /**
     * Handles one batch of data. {@code ConsumerThread} only calls this with a non-empty list and
     * clears that list afterwards, so implementations should not keep a reference to it.
     *
     * @param data the batch to process
     */
    void consume(List<T> data);

    /**
     * Called by {@code ConsumerThread} when {@link #consume(List)} throws.
     *
     * @param data the batch that was being processed
     * @param t    the throwable raised by {@link #consume(List)}
     */
    void onError(List<T> data, Throwable t);

    /**
     * Called by {@code ConsumerThread} after its loop has ended and the last batch was consumed.
     */
    void onExit();
}
  • IConsumer定义了init、consume、onError、onExit方法

TraceSegmentServiceClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java

@DefaultImplementor  
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {  
 ?  private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);  
 ?  private static final int TIMEOUT = 30 * 1000;  
?  
 ?  private long lastLogTime;  
 ?  private long segmentUplinkedCounter;  
 ?  private long segmentAbandonedCounter;  
 ?  private volatile DataCarrier<TraceSegment> carrier;  
 ?  private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;  
 ?  private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;  
?  
 ?  @Override  
 ?  public void prepare() throws Throwable {  
 ? ? ?  ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);  
 ?  }  
?  
 ?  @Override  
 ?  public void boot() throws Throwable {  
 ? ? ?  lastLogTime = System.currentTimeMillis();  
 ? ? ?  segmentUplinkedCounter = 0;  
 ? ? ?  segmentAbandonedCounter = 0;  
 ? ? ?  carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);  
 ? ? ?  carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);  
 ? ? ?  carrier.consume(this, 1);  
 ?  }  
?  
 ?  @Override  
 ?  public void onComplete() throws Throwable {  
 ? ? ?  TracingContext.ListenerManager.add(this);  
 ?  }  
?  
 ?  @Override  
 ?  public void shutdown() throws Throwable {  
 ? ? ?  TracingContext.ListenerManager.remove(this);  
 ? ? ?  carrier.shutdownConsumers();  
 ?  }  
?  
 ?  @Override  
 ?  public void init() {  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void consume(List<TraceSegment> data) {  
 ? ? ?  if (CONNECTED.equals(status)) {  
 ? ? ? ? ?  final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);  
 ? ? ? ? ?  StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public void onNext(Commands commands) {  
 ? ? ? ? ? ? ? ? ?  ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public void onError(Throwable throwable) {  
 ? ? ? ? ? ? ? ? ?  status.finished();  
 ? ? ? ? ? ? ? ? ?  if (logger.isErrorEnable()) {  
 ? ? ? ? ? ? ? ? ? ? ?  logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");  
 ? ? ? ? ? ? ? ? ?  }  
 ? ? ? ? ? ? ? ? ?  ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);  
 ? ? ? ? ? ? ?  }  
?  
 ? ? ? ? ? ? ?  @Override  
 ? ? ? ? ? ? ?  public void onCompleted() {  
 ? ? ? ? ? ? ? ? ?  status.finished();  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  });  
?  
 ? ? ? ? ?  try {  
 ? ? ? ? ? ? ?  for (TraceSegment segment : data) {  
 ? ? ? ? ? ? ? ? ?  UpstreamSegment upstreamSegment = segment.transform();  
 ? ? ? ? ? ? ? ? ?  upstreamSegmentStreamObserver.onNext(upstreamSegment);  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  } catch (Throwable t) {  
 ? ? ? ? ? ? ?  logger.error(t, "Transform and send UpstreamSegment to collector fail.");  
 ? ? ? ? ?  }  
?  
 ? ? ? ? ?  upstreamSegmentStreamObserver.onCompleted();  
?  
 ? ? ? ? ?  status.wait4Finish();  
 ? ? ? ? ?  segmentUplinkedCounter += data.size();  
 ? ? ?  } else {  
 ? ? ? ? ?  segmentAbandonedCounter += data.size();  
 ? ? ?  }  
?  
 ? ? ?  printUplinkStatus();  
 ?  }  
?  
 ?  private void printUplinkStatus() {  
 ? ? ?  long currentTimeMillis = System.currentTimeMillis();  
 ? ? ?  if (currentTimeMillis - lastLogTime > 30 * 1000) {  
 ? ? ? ? ?  lastLogTime = currentTimeMillis;  
 ? ? ? ? ?  if (segmentUplinkedCounter > 0) {  
 ? ? ? ? ? ? ?  logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);  
 ? ? ? ? ? ? ?  segmentUplinkedCounter = 0;  
 ? ? ? ? ?  }  
 ? ? ? ? ?  if (segmentAbandonedCounter > 0) {  
 ? ? ? ? ? ? ?  logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);  
 ? ? ? ? ? ? ?  segmentAbandonedCounter = 0;  
 ? ? ? ? ?  }  
 ? ? ?  }  
 ?  }  
?  
 ?  @Override  
 ?  public void onError(List<TraceSegment> data, Throwable t) {  
 ? ? ?  logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());  
 ?  }  
?  
 ?  @Override  
 ?  public void onExit() {  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void afterFinished(TraceSegment traceSegment) {  
 ? ? ?  if (traceSegment.isIgnore()) {  
 ? ? ? ? ?  return;  
 ? ? ?  }  
 ? ? ?  if (!carrier.produce(traceSegment)) {  
 ? ? ? ? ?  if (logger.isDebugEnable()) {  
 ? ? ? ? ? ? ?  logger.debug("One trace segment has been abandoned, cause by buffer is full.");  
 ? ? ? ? ?  }  
 ? ? ?  }  
 ?  }  
?  
 ?  @Override  
 ?  public void statusChanged(GRPCChannelStatus status) {  
 ? ? ?  if (CONNECTED.equals(status)) {  
 ? ? ? ? ?  Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();  
 ? ? ? ? ?  serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);  
 ? ? ?  }  
 ? ? ?  this.status = status;  
 ?  }  
}
  • TraceSegmentServiceClient实现了BootService、IConsumer、TracingContextListener、GRPCChannelListener接口;其prepare方法将自身注册为GRPCChannelManager的channelListener;其boot方法设置lastLogTime,实例化DataCarrier,并设置其consumer为自身;其onComplete方法执行TracingContext.ListenerManager.add(this);其shutdown方法执行TracingContext.ListenerManager.remove(this)以及carrier.shutdownConsumers();其consume方法在status为CONNECTED的时候执行upstreamSegmentStreamObserver.onNext(upstreamSegment)、upstreamSegmentStreamObserver.onCompleted()以及status.wait4Finish(),否则只累加segmentAbandonedCounter;其afterFinished方法在traceSegment未被标记为ignore时执行carrier.produce(traceSegment);其statusChanged方法设置serviceStub及status

ConsumerThread

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java

public class ConsumerThread<T> extends Thread {  
 ?  private volatile boolean running;  
 ?  private IConsumer<T> consumer;  
 ?  private List<DataSource> dataSources;  
 ?  private long consumeCycle;  
?  
 ?  ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {  
 ? ? ?  super(threadName);  
 ? ? ?  this.consumer = consumer;  
 ? ? ?  running = false;  
 ? ? ?  dataSources = new ArrayList<DataSource>(1);  
 ? ? ?  this.consumeCycle = consumeCycle;  
 ?  }  
?  
 ?  /**  
 ? ? * add whole buffer to consume  
 ? ? *  
 ? ? * @param sourceBuffer  
 ? ? */  
 ?  void addDataSource(QueueBuffer<T> sourceBuffer) {  
 ? ? ?  this.dataSources.add(new DataSource(sourceBuffer));  
 ?  }  
?  
 ?  @Override  
 ?  public void run() {  
 ? ? ?  running = true;  
?  
 ? ? ?  final List<T> consumeList = new ArrayList<T>(1500);  
 ? ? ?  while (running) {  
 ? ? ? ? ?  if (!consume(consumeList)) {  
 ? ? ? ? ? ? ?  try {  
 ? ? ? ? ? ? ? ? ?  Thread.sleep(consumeCycle);  
 ? ? ? ? ? ? ?  } catch (InterruptedException e) {  
 ? ? ? ? ? ? ?  }  
 ? ? ? ? ?  }  
 ? ? ?  }  
?  
 ? ? ?  // consumer thread is going to stop  
 ? ? ?  // consume the last time  
 ? ? ?  consume(consumeList);  
?  
 ? ? ?  consumer.onExit();  
 ?  }  
?  
 ?  private boolean consume(List<T> consumeList) {  
 ? ? ?  for (DataSource dataSource : dataSources) {  
 ? ? ? ? ?  dataSource.obtain(consumeList);  
 ? ? ?  }  
?  
 ? ? ?  if (!consumeList.isEmpty()) {  
 ? ? ? ? ?  try {  
 ? ? ? ? ? ? ?  consumer.consume(consumeList);  
 ? ? ? ? ?  } catch (Throwable t) {  
 ? ? ? ? ? ? ?  consumer.onError(consumeList, t);  
 ? ? ? ? ?  } finally {  
 ? ? ? ? ? ? ?  consumeList.clear();  
 ? ? ? ? ?  }  
 ? ? ? ? ?  return true;  
 ? ? ?  }  
 ? ? ?  return false;  
 ?  }  
?  
 ?  void shutdown() {  
 ? ? ?  running = false;  
 ?  }  
?  
 ?  /**  
 ? ? * DataSource is a refer to {@link Buffer}.  
 ? ? */  
 ?  class DataSource {  
 ? ? ?  private QueueBuffer<T> sourceBuffer;  
?  
 ? ? ?  DataSource(QueueBuffer<T> sourceBuffer) {  
 ? ? ? ? ?  this.sourceBuffer = sourceBuffer;  
 ? ? ?  }  
?  
 ? ? ?  void obtain(List<T> consumeList) {  
 ? ? ? ? ?  sourceBuffer.obtain(consumeList);  
 ? ? ?  }  
 ?  }  
}
  • ConsumerThread继承了Thread,其run方法会循环执行consume(consumeList),跳出循环时会再次执行consume(consumeList),最后执行consumer.onExit();consume方法会遍历dataSources,执行其dataSource.obtain(consumeList),然后在consumeList不为空的时候执行consumer.consume(consumeList)方法

ConsumeDriver

skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

public class ConsumeDriver<T> implements IDriver {  
 ?  private boolean running;  
 ?  private ConsumerThread[] consumerThreads;  
 ?  private Channels<T> channels;  
 ?  private ReentrantLock lock;  
?  
 ?  public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,  
 ? ? ?  long consumeCycle) {  
 ? ? ?  this(channels, num);  
 ? ? ?  for (int i = 0; i < num; i++) {  
 ? ? ? ? ?  consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);  
 ? ? ? ? ?  consumerThreads[i].setDaemon(true);  
 ? ? ?  }  
 ?  }  
?  
 ?  public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {  
 ? ? ?  this(channels, num);  
 ? ? ?  prototype.init();  
 ? ? ?  for (int i = 0; i < num; i++) {  
 ? ? ? ? ?  consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);  
 ? ? ? ? ?  consumerThreads[i].setDaemon(true);  
 ? ? ?  }  
?  
 ?  }  
?  
 ?  private ConsumeDriver(Channels<T> channels, int num) {  
 ? ? ?  running = false;  
 ? ? ?  this.channels = channels;  
 ? ? ?  consumerThreads = new ConsumerThread[num];  
 ? ? ?  lock = new ReentrantLock();  
 ?  }  
?  
 ?  private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {  
 ? ? ?  try {  
 ? ? ? ? ?  IConsumer<T> inst = consumerClass.newInstance();  
 ? ? ? ? ?  inst.init();  
 ? ? ? ? ?  return inst;  
 ? ? ?  } catch (InstantiationException e) {  
 ? ? ? ? ?  throw new ConsumerCannotBeCreatedException(e);  
 ? ? ?  } catch (IllegalAccessException e) {  
 ? ? ? ? ?  throw new ConsumerCannotBeCreatedException(e);  
 ? ? ?  }  
 ?  }  
?  
 ?  @Override  
 ?  public void begin(Channels channels) {  
 ? ? ?  if (running) {  
 ? ? ? ? ?  return;  
 ? ? ?  }  
 ? ? ?  try {  
 ? ? ? ? ?  lock.lock();  
 ? ? ? ? ?  this.allocateBuffer2Thread();  
 ? ? ? ? ?  for (ConsumerThread consumerThread : consumerThreads) {  
 ? ? ? ? ? ? ?  consumerThread.start();  
 ? ? ? ? ?  }  
 ? ? ? ? ?  running = true;  
 ? ? ?  } finally {  
 ? ? ? ? ?  lock.unlock();  
 ? ? ?  }  
 ?  }  
?  
 ?  @Override  
 ?  public boolean isRunning(Channels channels) {  
 ? ? ?  return running;  
 ?  }  
?  
 ?  private void allocateBuffer2Thread() {  
 ? ? ?  int channelSize = this.channels.getChannelSize();  
 ? ? ?  /**  
 ? ? ? ? * if consumerThreads.length < channelSize  
 ? ? ? ? * each consumer will process several channels.  
 ? ? ? ? *  
 ? ? ? ? * if consumerThreads.length == channelSize  
 ? ? ? ? * each consumer will process one channel.  
 ? ? ? ? *  
 ? ? ? ? * if consumerThreads.length > channelSize  
 ? ? ? ? * there will be some threads do nothing.  
 ? ? ? ? */  
 ? ? ?  for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
 ? ? ? ? ?  int consumerIndex = channelIndex % consumerThreads.length;  
 ? ? ? ? ?  consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
 ? ? ?  }  
?  
 ?  }  
?  
 ?  @Override  
 ?  public void close(Channels channels) {  
 ? ? ?  try {  
 ? ? ? ? ?  lock.lock();  
 ? ? ? ? ?  this.running = false;  
 ? ? ? ? ?  for (ConsumerThread consumerThread : consumerThreads) {  
 ? ? ? ? ? ? ?  consumerThread.shutdown();  
 ? ? ? ? ?  }  
 ? ? ?  } finally {  
 ? ? ? ? ?  lock.unlock();  
 ? ? ?  }  
 ?  }  
}
  • ConsumeDriver实现了IDriver接口,其构造器会创建num个ConsumerThread,并将它们设置为daemon线程;其begin方法会执行allocateBuffer2Thread,按channelIndex对consumerThreads.length取模的方式把每个channel的buffer作为dataSource分配给对应的consumerThread,然后执行consumerThread.start();其close方法会执行consumerThread.shutdown()

小结

TraceSegmentServiceClient实现了BootService、IConsumer、TracingContextListener、GRPCChannelListener接口;其prepare方法将自身注册为GRPCChannelManager的channelListener;其boot方法设置lastLogTime,实例化DataCarrier,并设置其consumer为自身;其onComplete方法执行TracingContext.ListenerManager.add(this);其shutdown方法执行TracingContext.ListenerManager.remove(this)以及carrier.shutdownConsumers();其consume方法在status为CONNECTED的时候执行upstreamSegmentStreamObserver.onNext(upstreamSegment)、upstreamSegmentStreamObserver.onCompleted()以及status.wait4Finish(),否则只累加segmentAbandonedCounter;其afterFinished方法在traceSegment未被标记为ignore时执行carrier.produce(traceSegment);其statusChanged方法设置serviceStub及status

doc

  • TraceSegmentServiceClient

浅谈skywalking的TraceSegmentServiceClient

标签:ble   apache   use   find   abs   rate   参数   ddd   builder   

原文地址:https://blog.51cto.com/14744108/2481181

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!