标签:ble apache use find abs rate 参数 ddd builder
本文参考原文-http://bjbsair.com/2020-03-22/tech-info/5102/本文主要研究一下skywalking的TraceSegmentServiceClient
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContextListener.java
public interface TracingContextListener {
? void afterFinished(TraceSegment traceSegment);
}
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegment.java
public class TraceSegment {
?
? private ID traceSegmentId;
?
? private List<TraceSegmentRef> refs;
?
? private List<AbstractTracingSpan> spans;
?
? private DistributedTraceIds relatedGlobalTraces;
?
? private boolean ignore = false;
?
? private boolean isSizeLimited = false;
?
? private final long createTime;
?
? public TraceSegment() {
? ? ? this.traceSegmentId = GlobalIdGenerator.generate();
? ? ? this.spans = new LinkedList<AbstractTracingSpan>();
? ? ? this.relatedGlobalTraces = new DistributedTraceIds();
? ? ? this.relatedGlobalTraces.append(new NewDistributedTraceId());
? ? ? this.createTime = System.currentTimeMillis();
? }
?
? public void ref(TraceSegmentRef refSegment) {
? ? ? if (refs == null) {
? ? ? ? ? refs = new LinkedList<TraceSegmentRef>();
? ? ? }
? ? ? if (!refs.contains(refSegment)) {
? ? ? ? ? refs.add(refSegment);
? ? ? }
? }
?
? public void relatedGlobalTraces(DistributedTraceId distributedTraceId) {
? ? ? relatedGlobalTraces.append(distributedTraceId);
? }
?
? public void archive(AbstractTracingSpan finishedSpan) {
? ? ? spans.add(finishedSpan);
? }
?
? public TraceSegment finish(boolean isSizeLimited) {
? ? ? this.isSizeLimited = isSizeLimited;
? ? ? return this;
? }
?
? public ID getTraceSegmentId() {
? ? ? return traceSegmentId;
? }
?
? public int getServiceId() {
? ? ? return RemoteDownstreamConfig.Agent.SERVICE_ID;
? }
?
? public boolean hasRef() {
? ? ? return !(refs == null || refs.size() == 0);
? }
?
? public List<TraceSegmentRef> getRefs() {
? ? ? return refs;
? }
?
? public List<DistributedTraceId> getRelatedGlobalTraces() {
? ? ? return relatedGlobalTraces.getRelatedGlobalTraces();
? }
?
? public boolean isSingleSpanSegment() {
? ? ? return this.spans != null && this.spans.size() == 1;
? }
?
? public boolean isIgnore() {
? ? ? return ignore;
? }
?
? public void setIgnore(boolean ignore) {
? ? ? this.ignore = ignore;
? }
?
? public UpstreamSegment transform() {
? ? ? UpstreamSegment.Builder upstreamBuilder = UpstreamSegment.newBuilder();
? ? ? for (DistributedTraceId distributedTraceId : getRelatedGlobalTraces()) {
? ? ? ? ? upstreamBuilder = upstreamBuilder.addGlobalTraceIds(distributedTraceId.toUniqueId());
? ? ? }
? ? ? SegmentObject.Builder traceSegmentBuilder = SegmentObject.newBuilder();
? ? ? /**
? ? ? ? * Trace Segment
? ? ? ? */
? ? ? traceSegmentBuilder.setTraceSegmentId(this.traceSegmentId.transform());
? ? ? // Don‘t serialize TraceSegmentReference
?
? ? ? // SpanObject
? ? ? for (AbstractTracingSpan span : this.spans) {
? ? ? ? ? traceSegmentBuilder.addSpans(span.transform());
? ? ? }
? ? ? traceSegmentBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID);
? ? ? traceSegmentBuilder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
? ? ? traceSegmentBuilder.setIsSizeLimited(this.isSizeLimited);
?
? ? ? upstreamBuilder.setSegment(traceSegmentBuilder.build().toByteString());
? ? ? return upstreamBuilder.build();
? }
?
? @Override
? public String toString() {
? ? ? return "TraceSegment{" +
? ? ? ? ? "traceSegmentId=‘" + traceSegmentId + ‘\‘‘ +
? ? ? ? ? ", refs=" + refs +
? ? ? ? ? ", spans=" + spans +
? ? ? ? ? ", relatedGlobalTraces=" + relatedGlobalTraces +
? ? ? ? ? ‘}‘;
? }
?
? public int getApplicationInstanceId() {
? ? ? return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;
? }
?
? public long createTime() {
? ? ? return this.createTime;
? }
}
skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IConsumer.java
public interface IConsumer<T> {
? void init();
?
? void consume(List<T> data);
?
? void onError(List<T> data, Throwable t);
?
? void onExit();
}
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, GRPCChannelListener {
? private static final ILog logger = LogManager.getLogger(TraceSegmentServiceClient.class);
? private static final int TIMEOUT = 30 * 1000;
?
? private long lastLogTime;
? private long segmentUplinkedCounter;
? private long segmentAbandonedCounter;
? private volatile DataCarrier<TraceSegment> carrier;
? private volatile TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub serviceStub;
? private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
?
? @Override
? public void prepare() throws Throwable {
? ? ? ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
? }
?
? @Override
? public void boot() throws Throwable {
? ? ? lastLogTime = System.currentTimeMillis();
? ? ? segmentUplinkedCounter = 0;
? ? ? segmentAbandonedCounter = 0;
? ? ? carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);
? ? ? carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
? ? ? carrier.consume(this, 1);
? }
?
? @Override
? public void onComplete() throws Throwable {
? ? ? TracingContext.ListenerManager.add(this);
? }
?
? @Override
? public void shutdown() throws Throwable {
? ? ? TracingContext.ListenerManager.remove(this);
? ? ? carrier.shutdownConsumers();
? }
?
? @Override
? public void init() {
?
? }
?
? @Override
? public void consume(List<TraceSegment> data) {
? ? ? if (CONNECTED.equals(status)) {
? ? ? ? ? final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
? ? ? ? ? StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? public void onNext(Commands commands) {
? ? ? ? ? ? ? ? ? ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
? ? ? ? ? ? ? }
?
? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? public void onError(Throwable throwable) {
? ? ? ? ? ? ? ? ? status.finished();
? ? ? ? ? ? ? ? ? if (logger.isErrorEnable()) {
? ? ? ? ? ? ? ? ? ? ? logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
? ? ? ? ? ? ? }
?
? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? public void onCompleted() {
? ? ? ? ? ? ? ? ? status.finished();
? ? ? ? ? ? ? }
? ? ? ? ? });
?
? ? ? ? ? try {
? ? ? ? ? ? ? for (TraceSegment segment : data) {
? ? ? ? ? ? ? ? ? UpstreamSegment upstreamSegment = segment.transform();
? ? ? ? ? ? ? ? ? upstreamSegmentStreamObserver.onNext(upstreamSegment);
? ? ? ? ? ? ? }
? ? ? ? ? } catch (Throwable t) {
? ? ? ? ? ? ? logger.error(t, "Transform and send UpstreamSegment to collector fail.");
? ? ? ? ? }
?
? ? ? ? ? upstreamSegmentStreamObserver.onCompleted();
?
? ? ? ? ? status.wait4Finish();
? ? ? ? ? segmentUplinkedCounter += data.size();
? ? ? } else {
? ? ? ? ? segmentAbandonedCounter += data.size();
? ? ? }
?
? ? ? printUplinkStatus();
? }
?
? private void printUplinkStatus() {
? ? ? long currentTimeMillis = System.currentTimeMillis();
? ? ? if (currentTimeMillis - lastLogTime > 30 * 1000) {
? ? ? ? ? lastLogTime = currentTimeMillis;
? ? ? ? ? if (segmentUplinkedCounter > 0) {
? ? ? ? ? ? ? logger.debug("{} trace segments have been sent to collector.", segmentUplinkedCounter);
? ? ? ? ? ? ? segmentUplinkedCounter = 0;
? ? ? ? ? }
? ? ? ? ? if (segmentAbandonedCounter > 0) {
? ? ? ? ? ? ? logger.debug("{} trace segments have been abandoned, cause by no available channel.", segmentAbandonedCounter);
? ? ? ? ? ? ? segmentAbandonedCounter = 0;
? ? ? ? ? }
? ? ? }
? }
?
? @Override
? public void onError(List<TraceSegment> data, Throwable t) {
? ? ? logger.error(t, "Try to send {} trace segments to collector, with unexpected exception.", data.size());
? }
?
? @Override
? public void onExit() {
?
? }
?
? @Override
? public void afterFinished(TraceSegment traceSegment) {
? ? ? if (traceSegment.isIgnore()) {
? ? ? ? ? return;
? ? ? }
? ? ? if (!carrier.produce(traceSegment)) {
? ? ? ? ? if (logger.isDebugEnable()) {
? ? ? ? ? ? ? logger.debug("One trace segment has been abandoned, cause by buffer is full.");
? ? ? ? ? }
? ? ? }
? }
?
? @Override
? public void statusChanged(GRPCChannelStatus status) {
? ? ? if (CONNECTED.equals(status)) {
? ? ? ? ? Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
? ? ? ? ? serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);
? ? ? }
? ? ? this.status = status;
? }
}
skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
public class ConsumerThread<T> extends Thread {
? private volatile boolean running;
? private IConsumer<T> consumer;
? private List<DataSource> dataSources;
? private long consumeCycle;
?
? ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
? ? ? super(threadName);
? ? ? this.consumer = consumer;
? ? ? running = false;
? ? ? dataSources = new ArrayList<DataSource>(1);
? ? ? this.consumeCycle = consumeCycle;
? }
?
? /**
? ? * add whole buffer to consume
? ? *
? ? * @param sourceBuffer
? ? */
? void addDataSource(QueueBuffer<T> sourceBuffer) {
? ? ? this.dataSources.add(new DataSource(sourceBuffer));
? }
?
? @Override
? public void run() {
? ? ? running = true;
?
? ? ? final List<T> consumeList = new ArrayList<T>(1500);
? ? ? while (running) {
? ? ? ? ? if (!consume(consumeList)) {
? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? Thread.sleep(consumeCycle);
? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? }
?
? ? ? // consumer thread is going to stop
? ? ? // consume the last time
? ? ? consume(consumeList);
?
? ? ? consumer.onExit();
? }
?
? private boolean consume(List<T> consumeList) {
? ? ? for (DataSource dataSource : dataSources) {
? ? ? ? ? dataSource.obtain(consumeList);
? ? ? }
?
? ? ? if (!consumeList.isEmpty()) {
? ? ? ? ? try {
? ? ? ? ? ? ? consumer.consume(consumeList);
? ? ? ? ? } catch (Throwable t) {
? ? ? ? ? ? ? consumer.onError(consumeList, t);
? ? ? ? ? } finally {
? ? ? ? ? ? ? consumeList.clear();
? ? ? ? ? }
? ? ? ? ? return true;
? ? ? }
? ? ? return false;
? }
?
? void shutdown() {
? ? ? running = false;
? }
?
? /**
? ? * DataSource is a refer to {@link Buffer}.
? ? */
? class DataSource {
? ? ? private QueueBuffer<T> sourceBuffer;
?
? ? ? DataSource(QueueBuffer<T> sourceBuffer) {
? ? ? ? ? this.sourceBuffer = sourceBuffer;
? ? ? }
?
? ? ? void obtain(List<T> consumeList) {
? ? ? ? ? sourceBuffer.obtain(consumeList);
? ? ? }
? }
}
skywalking-6.6.0/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
public class ConsumeDriver<T> implements IDriver {
? private boolean running;
? private ConsumerThread[] consumerThreads;
? private Channels<T> channels;
? private ReentrantLock lock;
?
? public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
? ? ? long consumeCycle) {
? ? ? this(channels, num);
? ? ? for (int i = 0; i < num; i++) {
? ? ? ? ? consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
? ? ? ? ? consumerThreads[i].setDaemon(true);
? ? ? }
? }
?
? public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
? ? ? this(channels, num);
? ? ? prototype.init();
? ? ? for (int i = 0; i < num; i++) {
? ? ? ? ? consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
? ? ? ? ? consumerThreads[i].setDaemon(true);
? ? ? }
?
? }
?
? private ConsumeDriver(Channels<T> channels, int num) {
? ? ? running = false;
? ? ? this.channels = channels;
? ? ? consumerThreads = new ConsumerThread[num];
? ? ? lock = new ReentrantLock();
? }
?
? private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
? ? ? try {
? ? ? ? ? IConsumer<T> inst = consumerClass.newInstance();
? ? ? ? ? inst.init();
? ? ? ? ? return inst;
? ? ? } catch (InstantiationException e) {
? ? ? ? ? throw new ConsumerCannotBeCreatedException(e);
? ? ? } catch (IllegalAccessException e) {
? ? ? ? ? throw new ConsumerCannotBeCreatedException(e);
? ? ? }
? }
?
? @Override
? public void begin(Channels channels) {
? ? ? if (running) {
? ? ? ? ? return;
? ? ? }
? ? ? try {
? ? ? ? ? lock.lock();
? ? ? ? ? this.allocateBuffer2Thread();
? ? ? ? ? for (ConsumerThread consumerThread : consumerThreads) {
? ? ? ? ? ? ? consumerThread.start();
? ? ? ? ? }
? ? ? ? ? running = true;
? ? ? } finally {
? ? ? ? ? lock.unlock();
? ? ? }
? }
?
? @Override
? public boolean isRunning(Channels channels) {
? ? ? return running;
? }
?
? private void allocateBuffer2Thread() {
? ? ? int channelSize = this.channels.getChannelSize();
? ? ? /**
? ? ? ? * if consumerThreads.length < channelSize
? ? ? ? * each consumer will process several channels.
? ? ? ? *
? ? ? ? * if consumerThreads.length == channelSize
? ? ? ? * each consumer will process one channel.
? ? ? ? *
? ? ? ? * if consumerThreads.length > channelSize
? ? ? ? * there will be some threads do nothing.
? ? ? ? */
? ? ? for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
? ? ? ? ? int consumerIndex = channelIndex % consumerThreads.length;
? ? ? ? ? consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
? ? ? }
?
? }
?
? @Override
? public void close(Channels channels) {
? ? ? try {
? ? ? ? ? lock.lock();
? ? ? ? ? this.running = false;
? ? ? ? ? for (ConsumerThread consumerThread : consumerThreads) {
? ? ? ? ? ? ? consumerThread.shutdown();
? ? ? ? ? }
? ? ? } finally {
? ? ? ? ? lock.unlock();
? ? ? }
? }
}
TraceSegmentServiceClient实现了BootService、IConsumer、TracingContextListener、GRPCChannelListener接口;其prepare方法往GRPCChannelManager注册自身的channelListener;其boot方法设置lastLogTime,实例化DataCarrier,并设置其consumer为自身;其onComplete方法执行TracingContext.ListenerManager.add(this);其shutdown方法执行TracingContext.ListenerManager.remove(this)以及carrier.shutdownConsumers();其consume方法在status为CONNECTED的时候执行upstreamSegmentStreamObserver.onNext(upstreamSegment)、upstreamSegmentStreamObserver.onCompleted()以及status.wait4Finish();其afterFinished方法执行carrier.produce(traceSegment);其statusChanged设置serviceStub及status
浅谈skywalking的TraceSegmentServiceClient
标签:ble apache use find abs rate 参数 ddd builder
原文地址:https://blog.51cto.com/14744108/2481181