标签:cluster err 最小 dsd cout 根据 public oba ===
Stream 的分组分为随机分组、字段分组、全部分组、全局分组、无分组、直接分组,自定义分组
1.Shuffle grouping:通过tuple获取任务到supout,然后再由spout将任务分发到Bolt上。这种分组是随机性的,没有规律可言,任务的多少可能会跟被分配机器性能有关。
2.Fields grouping : 根据指定字段将tuple进行分组。例如,根据“user-id”字段,相同“user-id”的tuple总是分发到task上,不同“user-id”的tuple可能分发到不同的task上。
3.All grouping : tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
4.Global grouping : 在多个bolt中对数据进行了一系列的操作,在最后一个bolt时需要对前面bolt操作的数据进行整合,这里就需要用global grouping 分组来进行整合。确切的说,是分配给ID最小的那个task执行。
5.Direct grouping : 与global grouping 实现刚好相反的作用。这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
6.local or shuffle grouping : 假如上游的组件是spout或bolt,下游是一个bolt时,假如通过shuffle或field分组恰好上游的task到下游的task时,两个work恰好是同一个work,都在一个jvm进程里面,正常情况下,我们会起多个solt点,让上游的task发送到下游的task时分配了2个jvm进程,会通过tcp/rcp的方式进行通信,但是,如果上游的task和下游的task时在同一个进程时是没必要进行通信的,所以采用了本地或随机分组方式,减少网络通信的消耗,提高storm的运算效率
7.None grouping : 你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
8.Custom grouping : 一般情况下,我们不会自定义grouping,
========================================= Topology =============================================== public class ShuffleGroupingTopology { private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingTopology.class); public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("ShuffleGroupingSpout", new ShuffleGroupingSpout(),1); builder.setBolt("ShuffleGroupingBolt",new ShuffleGroupingBolt(),2).shuffleGrouping("ShuffleGroupingSpout"); builder.setBolt("ShuffleGroupingBolt2",new ShuffleGroupingBolt2(),2).shuffleGrouping("ShuffleGroupingBolt"); Config config = new Config(); config.setNumWorkers(3); try { StormSubmitter.submitTopology("ShuffleGroupingTopology", config, builder.createTopology()); log.warn("=================================================="); log.warn("the topology {} is submitted.","ShuffleGroupingTopology"); log.warn("=================================================="); } catch (Exception e) { e.printStackTrace(); } } } ===================================== Spout =========================================
public class ShuffleGroupingSpout extends BaseRichSpout{ private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingSpout.class); private SpoutOutputCollector collector; private TopologyContext context; private AtomicInteger ai; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.context = context; this.ai = new AtomicInteger(); log.warn("ShuffleGroupingSpout ------> open:hashcode{} ---->taskId:{}",this.hashCode(),context.getThisTaskId()); } @Override public void nextTuple() { int i =this.ai.getAndIncrement(); if(i<10){ log.warn("ShuffleGroupingSpout ------> nextTuple:hashcode:{} ---->taskId:{} ----->value:{}",this.hashCode(),context.getThisTaskId(),i); collector.emit(new Values(i)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("i")); } @Override public void close() { log.warn("ShuffleGroupingSpout ------> close:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId()); } } ====================================== Bolt ============================================ public class ShuffleGroupingBolt extends BaseBasicBolt{ private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; log.warn("ShuffleGroupingBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId()); } @Override public void execute(Tuple input, BasicOutputCollector collector) { Integer i = input.getIntegerByField("i"); collector.emit(new Values(i*10)); log.warn("ShuffleGroupingBolt ------> prepare:hashcode:{} ---->taskId:{} ---->value:{}",this.hashCode(),context.getThisTaskId(),i); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("result")); } @Override public void cleanup() { log.warn("ShuffleGroupingBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId()); } } ====================================== Bolt2 ============================================ public class ShuffleGroupingBolt2 extends BaseBasicBolt{ private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt2.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; log.warn("ShuffleGroupingBolt2 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId()); } @Override public void execute(Tuple input, BasicOutputCollector collector) { Integer i = input.getIntegerByField("result"); log.warn("ShuffleGroupingBolt2 ------> prepare:hashcode:{} ---->taskId:{} ---->result:{}",this.hashCode(),context.getThisTaskId(),i); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //nothing to do } @Override public void cleanup() { log.warn("ShuffleGroupingBolt2 ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId()); } }
//====================================== Topology ============================================ public class FieldsShuffleTopology { private static final Logger log = LoggerFactory.getLogger(FieldsShuffleTopology.class); public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("FieldsShuffleSpout", new FieldsShuffleSpout(),1); builder.setBolt("FieldsShuffleUpperBolt",new FieldsShuffleUpperBolt(),2).shuffleGrouping("FieldsShuffleSpout"); builder.setBolt("FieldsShuffleFinalBolt",new FieldsShuffleFinalBolt(),2).fieldsGrouping("FieldsShuffleUpperBolt", new Fields("upperName")); Config config = new Config(); config.setNumWorkers(3); try { StormSubmitter.submitTopology("FieldsShuffleTopology", config, builder.createTopology()); log.warn("=================================================="); log.warn("the topology {} is submitted.","FieldsShuffleTopology"); log.warn("=================================================="); } catch (Exception e) { e.printStackTrace(); } } } ====================================== Spout ============================================ public class FieldsShuffleSpout extends BaseRichSpout { private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingSpout.class); private SpoutOutputCollector collector; private TopologyContext context; private List<String> list; private int index; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.context = context; this.index = 0; this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word"); log.warn("FieldsShuffleSpout open:hashcode{} taskId:{}", this.hashCode(), context.getThisTaskId()); } @Override public void nextTuple() { if (index < list.size()) { String s = list.get(index++); log.warn("FieldsShuffleSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(), context.getThisTaskId(), s); collector.emit(new Values(s)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("i")); } @Override public void close() { log.warn("FieldsShuffleSpout close:hashcode:{} taskId:{} ", this.hashCode(), context.getThisTaskId()); } } ====================================== Bolt ========================================== public class FieldsShuffleUpperBolt extends BaseBasicBolt{ private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; log.warn("FieldsShuffleUpperBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId()); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String s = input.getStringByField("i"); collector.emit(new Values(s.toUpperCase())); log.warn("FieldsShuffleUpperBolt ------> prepare:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("upperName")); } @Override public void cleanup() { log.warn("FieldsShuffleUpperBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId()); } } ====================================== Bolt2 ========================================== public class FieldsShuffleFinalBolt extends BaseBasicBolt{ private static final Logger log = LoggerFactory.getLogger(FieldsShuffleFinalBolt.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; log.warn("FieldsShuffleFinalBolt ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId()); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String s = input.getStringByField("upperName"); log.warn("FieldsShuffleFinalBolt ------> prepare:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //nothing to do } @Override public void cleanup() { log.warn("FieldsShuffleFinalBolt ------> cleanup:hashcode:{} ---->taskId:{} ",this.hashCode(),context.getThisTaskId()); } }
tuple数据将会被复制到下游的所有的bolt的任务中。这种类型需要谨慎使用。
public class AllGroupTopology { private static final Logger log = LoggerFactory.getLogger(AllGroupTopology .class); public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("AllGrouppingSpout", new AllGrouppingSpout(),1); builder.setBolt("AllBolt1",new AllBolt1(),1).shuffleGrouping("AllGrouppingSpout"); builder.setBolt("AllBolt2",new AllBolt2(),1).shuffleGrouping("AllGrouppingSpout"); Config config = new Config(); config.setNumWorkers(3); try { StormSubmitter.submitTopology("AllGroupTopology", config, builder.createTopology()); log.warn("=================================================="); log.warn("the topology {} is submitted.","AllGroupTopology"); log.warn("=================================================="); } catch (Exception e) { e.printStackTrace(); } } } ====================================== AllGrouppingSpout ========================================== public class AllGrouppingSpout extends BaseRichSpout { private static final Logger log = LoggerFactory.getLogger(AllGrouppingSpout.class); private SpoutOutputCollector collector; private TopologyContext context; private List<String> list; private int index; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.context = context; this.index = 0; this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word"); log.warn("AllGrouppingSpout open:hashcode{} taskId:{}", this.hashCode(), context.getThisTaskId()); } @Override public void nextTuple() { if (index < list.size()) { String s = list.get(index++); log.warn("AllGrouppingSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(), context.getThisTaskId(), s); collector.emit(new Values(s)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name")); } } ====================================== AllBolt1 ========================================== public class AllBolt1 extends BaseBasicBolt{ private static final Logger log = LoggerFactory.getLogger(AllBolt1.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; log.warn("AllBolt1 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId()); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String s = input.getStringByField("name"); collector.emit(new Values(s.toUpperCase())); log.warn("AllBolt1 execute:hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //nothing to do } } ====================================== AllBolt2 ========================================== public class AllBolt2 extends BaseBasicBolt{ private static final Logger log = LoggerFactory.getLogger(AllBolt2.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; log.warn("AllBolt2 ------> prepare:hashcode:{} ----> taskId:{} ",this.hashCode(),context.getThisTaskId()); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String s = input.getStringByField("name"); collector.emit(new Values(s.toUpperCase())); log.warn("AllBolt2 execute :hashcode:{} ---->taskId:{} ---->String:{}",this.hashCode(),context.getThisTaskId(),s); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //nothing to do } }
//通过查看日志得出,2个bolt都收到了spout的8条数据,也就是说,shuffle groupping的分组并没有起到作用,还是从spout中获取了8条数据,所以是将任务的副本全拷贝过来了
在多个bolt中对数据进行了一系列的操作,在最后一个bolt时需要对前面bolt操作的数据进行整合,这里就需要用global grouping 分组来进行整合。
确切的说,是分配给ID最小的那个task执行。
需求:随机将数据分发到doubleBolt上,在最后一个bolt上做整合操作
public class GlobalGroupingTopology { private final static Logger log = LoggerFactory.getLogger(GlobalGroupingTopology.class); public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(), 1); builder.setBolt("NumberDoubleBolt", new NumberDoubleBolt(), 2).shuffleGrouping("NumberGenerateSpout"); builder.setBolt("NumberPrintBolt", new NumberPrintBolt(), 2).globalGrouping("NumberDoubleBolt"); Config config = new Config(); config.setNumWorkers(4); try { StormSubmitter.submitTopology("GlobalGroupingTopology", config, builder.createTopology()); log.warn("=================================================="); log.warn("the topology {} is submitted.", "GlobalGroupingTopology"); log.warn("=================================================="); } catch (Exception e) { e.printStackTrace(); } } } // ============================== spout ======================================= public class NumberGenerateSpout extends BaseRichSpout{ private SpoutOutputCollector collector; private AtomicInteger counter; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.counter = new AtomicInteger(0); } @Override public void nextTuple() { while(counter.get()< 10){ collector.emit(new Values(counter.getAndIncrement())); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("i")); } }
// =========================== bolt ==================================== public class NumberDoubleBolt extends BaseBasicBolt{ @Override public void execute(Tuple input, BasicOutputCollector collector) { Integer value = input.getIntegerByField("i"); collector.emit(new Values(value*2,10)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("i","constant")); } } // ========================== bolt ==================================== public class NumberPrintBolt extends BaseBasicBolt{ private final static Logger logger = LoggerFactory.getLogger(NumberPrintBolt.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; logger.warn("============== perpare TaskID:{}",context.getThisTaskId()); } @Override public void execute(Tuple input, BasicOutputCollector collector) { Integer i = input.getIntegerByField("i"); Integer constant = input.getIntegerByField("constant"); logger.warn("taskID:{},instantID:{},i:{},constant:{}",context.getThisTaskId(),this,i,constant); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // nothing to do } }
与global grouping 实现刚好相反的作用,这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
public class DirectGroupingTopology { private static final Logger log = LoggerFactory.getLogger(DirectGroupingTopology.class); public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(),1); builder.setBolt("NumberDoubleBolt",new NumberDoubleBolt(),2).directGrouping("NumberGenerateSpout"); Config config = new Config(); config.setNumWorkers(3); try { StormSubmitter.submitTopology("DirectGroupingTopology", config, builder.createTopology()); log.warn("=================================================="); log.warn("the topology {} is submitted.","DirectGroupingTopology"); log.warn("=================================================="); } catch (Exception e) { e.printStackTrace(); } } }
// ============================= spout =============================== public class NumberGenerateSpout extends BaseRichSpout { private SpoutOutputCollector collector; private AtomicInteger counter; private int destTaskID; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; counter = new AtomicInteger(0); List<Integer> tasks = context.getComponentTasks("NumberDoubleBolt"); destTaskID = tasks.stream().mapToInt(Integer::intValue).max().getAsInt(); } @Override public void nextTuple() { while(counter.get() < 10){ collector.emitDirect(destTaskID,new Values(counter.getAndIncrement())); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(true,new Fields("i")); } }
// =============================== bolt ============================================== public class NumberDoubleBolt extends BaseBasicBolt{ private static final Logger log = LoggerFactory.getLogger(NumberDoubleBolt.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; log.warn(" =================== taskId:{}",context.getThisTaskId()); }; @Override public void execute(Tuple input, BasicOutputCollector collector) { Integer value = input.getIntegerByField("i"); log.warn("taskID:{},instanceID:{},value:{}",context.getThisTaskId(),this,value); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // nothing to do } }
public class LocalLogTopology { private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class); public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("LogSpout", new LogSpout(),1); builder.setBolt("LogParseBolt",new LogParseBolt(),1).localOrShuffleGrouping("LogSpout"); builder.setBolt("LogPrintBolt",new LogPrintBolt(),2).localOrShuffleGrouping("LogSpout"); Config config = new Config(); config.setDebug(false); config.setNumWorkers(4); try { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalLogTopology", config, builder.createTopology()); log.warn("=================================================="); log.warn("the topology {} is submitted.","LocalLogTopology"); log.warn("=================================================="); TimeUnit.SECONDS.sleep(120); cluster.killTopology("LocalLogToplogy"); cluster.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } // =============================== spout ============================== public class LogSpout extends BaseRichSpout{ private SpoutOutputCollector collector; private List<String> list; private int index; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.index = 0; this.list = Arrays.asList("JAVA,COLLECTION","JAVA,IO","JAVA,THREAD","JAVA,LAMBDA","BIG_DATA,STORM", "BIG_DATA,KAFKA","BIG_DATA,HADDOP","BIG_DATA,FLUME","BIG_DATA,KAFKA","C,c"); } @Override public void nextTuple() { while(index < list.size()){ collector.emit(new Values(list.get(index++))); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("entity")); } } //============================== bolt ===================================== public class LogParseBolt extends BaseBasicBolt{ @Override public void execute(Tuple input, BasicOutputCollector collector) { String entity = input.getStringByField("entity"); List<String> list = Splitter.on(",").splitToList(entity); collector.emit(new Values(list.get(0),list.get(1))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("category","item")); } }
//============================= bolt =========================================== public class LogPrintBolt extends BaseBasicBolt{ private static final Logger LOG = LoggerFactory.getLogger(LogPrintBolt.class); private TopologyContext context; @Override public void prepare(Map stormConf, TopologyContext context) { this.context = context; LOG.info("=========================================="); LOG.info("prepare taskID:{}",context.getThisTaskId()); LOG.info("=========================================="); } @Override public void execute(Tuple input, BasicOutputCollector collector) { String category = input.getStringByField("category"); String item = input.getStringByField("item"); LOG.info("=========================================="); LOG.info("execute:category:{},item:{},taskID:{}",category,item,context.getThisTaskId()); LOG.info("=========================================="); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // nothing to do } }
你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
public class LocalLogTopology { private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class); public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("LogSpout", new LogSpout(),1); builder.setBolt("LogParseBolt",new LogParseBolt(),1).localOrShuffleGrouping("LogSpout"); builder.setBolt("LogPrintBolt",new LogPrintBolt(),2).customGrouping("LogParseBolt", new HighTaskIDGrouping()); Config config = new Config(); config.setDebug(false); config.setNumWorkers(4); try { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalLogTopology", config, builder.createTopology()); log.warn("=================================================="); log.warn("the topology {} is submitted.","LocalLogTopology"); log.warn("=================================================="); TimeUnit.SECONDS.sleep(120); cluster.killTopology("LocalLogToplogy"); cluster.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } // =============================== spout ============================== public class LogSpout extends BaseRichSpout{ private SpoutOutputCollector collector; private List<String> list; private int index; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.index = 0; this.list = Arrays.asList("JAVA,COLLECTION","JAVA,IO","JAVA,THREAD","JAVA,LAMBDA","BIG_DATA,STORM", "BIG_DATA,KAFKA","BIG_DATA,HADDOP","BIG_DATA,FLUME","BIG_DATA,KAFKA","C,c"); } @Override public void nextTuple() { while(index < list.size()){ collector.emit(new Values(list.get(index++))); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("entity")); } } //============================== bolt ===================================== public class LogParseBolt extends BaseBasicBolt{ @Override public void execute(Tuple input, BasicOutputCollector collector) { String entity = input.getStringByField("entity"); List<String> list = Splitter.on(",").splitToList(entity); collector.emit(new Values(list.get(0),list.get(1))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("category","item")); } } //============================= custom grouping =========================================== public class HighTaskIDGrouping implements CustomStreamGrouping{ private int taskID; @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) { //List<Integer> targetTasks: 下游所有的tasks的集合 ArrayList<Integer> tasks = new ArrayList<>(targetTasks); Collections.sort(tasks); //从小到大排列 this.taskID = tasks.get(tasks.size() -1); } @Override public List<Integer> chooseTasks(int taskId, List<Object> values) { return Arrays.asList(taskID); } }
标签:cluster err 最小 dsd cout 根据 public oba ===
原文地址:https://www.cnblogs.com/MrRightZhao/p/11066077.html