码迷,mamicode.com
首页 > 其他好文 > 详细

Service框架浅析Google-Guava Concurrent

时间:2020-04-15 00:23:19      阅读:68      评论:0      收藏:0      [点我收藏+]

标签:ram   xtend   oop   false   protect   which   abstract   inter   发包   

Guava包里的Service接口用于封装一个服务对象的运行状态、包括start和stop等方法。例如web服务器,RPC服务器、计时器等可以实现这个接口。对此类服务的状态管理并不轻松、需要对服务的开启/关闭进行妥善管理、特别是在多线程环境下尤为复杂。Guava包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。


使用一个服务

一个服务正常生命周期有:

Service.State.NEW
Service.State.STARTING
Service.State.RUNNING
Service.State.STOPPING
Service.State.TERMINATED
服务一旦被停止就无法再重新启动了。如果服务在starting、running、stopping状态出现问题、会进入Service.State.FAILED.状态。调用 startAsync()方法可以异步开启一个服务,同时返回this对象形成方法调用链。注意:只有在当前服务的状态是NEW时才能调用startAsync()方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法stopAsync() 。但是不像startAsync(),多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。

Service也提供了一些方法用于等待服务状态转换的完成:

通过 addListener()方法异步添加监听器。此方法允许你添加一个 Service.Listener 、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加Listener(这时的状态是NEW)、否则之前已发生的状态转换事件是无法在新添加的Listener上被重新触发的。

同步使用awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出IllegalStateException异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。

Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承Guava包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。

基础实现类

AbstractIdleService

AbstractIdleService 类简单实现了Service接口、其在running状态时不会执行任何动作–因此在running时也不需要启动线程–但需要处理开启/关闭动作。要实现一个此类的服务,只需继承AbstractIdleService类,然后自己实现startUp() 和shutDown()方法就可以了。


1 protected void startUp() {
2 servlets.add(new GcStatsServlet());
3 }
4 protected void shutDown() {}
如上面的例子、由于任何请求到GcStatsServlet时已经会有现成线程处理了,所以在服务运行时就不需要做什么额外动作了。

AbstractExecutionThreadService

AbstractExecutionThreadService 通过单线程处理启动、运行、和关闭等操作。你必须重载run()方法,同时需要能响应停止服务的请求。具体的实现可以在一个循环内做处理:


1 public void run() {
2 while (isRunning()) {
3 // perform a unit of work
4 }
5 }
另外,你还可以重载triggerShutdown()方法让run()方法结束返回。

重载startUp()和shutDown()方法是可选的,不影响服务本身状态的管理


01 protected void startUp() {
02 dispatcher.listenForConnections(port, queue);
03 }
04 protected void run() {
05 Connection connection;
06 while ((connection = queue.take() != POISON)) {
07 process(connection);
08 }
09 }
10 protected void triggerShutdown() {
11 dispatcher.stopListeningForConnections(queue);
12 queue.put(POISON);
13 }
start()内部会调用startUp()方法,创建一个线程、然后在线程内调用run()方法。stop()会调用 triggerShutdown()方法并且等待线程终止。

AbstractScheduledService

AbstractScheduledService类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务,以及相应的startUp()和shutDown()方法。为了能够描述执行周期,你需要实现scheduler()方法。通常情况下,你可以使用AbstractScheduledService.Scheduler类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),类似于JDK并发包中ScheduledExecutorService类提供的两种调度方式。如要自定义schedules则可以使用 CustomScheduler类来辅助实现;具体用法见javadoc。

AbstractService

如需要自定义的线程管理、可以通过扩展 AbstractService类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承AbstractService类。

继承AbstractService方法必须实现两个方法.

doStart(): 首次调用startAsync()时会同时调用doStart(),doStart()内部需要处理所有的初始化工作、如果启动成功则调用notifyStarted()方法;启动失败则调用notifyFailed()
doStop(): 首次调用stopAsync()会同时调用doStop(),doStop()要做的事情就是停止服务,如果停止成功则调用 notifyStopped()方法;停止失败则调用 notifyFailed()方法。
doStart和doStop方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。

使用ServiceManager

除了对Service接口提供基础的实现类,Guava还提供了 ServiceManager类使得涉及到多个Service集合的操作更加容易。通过实例化ServiceManager类来创建一个Service集合,你可以通过以下方法来管理它们:

startAsync() : 将启动所有被管理的服务。如果当前服务的状态都是NEW的话、那么你只能调用该方法一次、这跟 Service#startAsync()是一样的。
stopAsync() :将停止所有被管理的服务。
addListener :会添加一个ServiceManager.Listener,在服务状态转换中会调用该Listener
awaitHealthy() :会等待所有的服务达到Running状态
awaitStopped():会等待所有服务达到终止状态
检测类的方法有:

isHealthy() :如果所有的服务处于Running状态、会返回True
servicesByState():以状态为索引返回当前所有服务的快照
startupTimes() :返回一个Map对象,记录被管理的服务启动的耗时、以毫秒为单位,同时Map默认按启动时间排序。
我们建议整个服务的生命周期都能通过ServiceManager来管理,不过即使状态转换是通过其他机制触发的、也不影响ServiceManager方法的正确执行。例如:当一个服务不是通过startAsync()、而是其他机制启动时,listeners 仍然可以被正常调用、awaitHealthy()也能够正常工作。ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于New状态。

附:TestCase、也可以作为练习Demo

ServiceTest


01 </pre>
02 /*
03 * Copyright (C) 2013 The Guava Authors
04 *
05 * Licensed under the Apache License, Version 2.0 (the "License");
06 * you may not use this file except in compliance with the License.
07 * You may obtain a copy of the License at
08 *
09 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package com.google.common.util.concurrent;
19
20 import static com.google.common.util.concurrent.Service.State.FAILED;
21 import static com.google.common.util.concurrent.Service.State.NEW;
22 import static com.google.common.util.concurrent.Service.State.RUNNING;
23 import static com.google.common.util.concurrent.Service.State.STARTING;
24 import static com.google.common.util.concurrent.Service.State.STOPPING;
25 import static com.google.common.util.concurrent.Service.State.TERMINATED;
26
27 import junit.framework.TestCase;
28
29 /**
30 * Unit tests for {@link Service}
31 */
32 public class ServiceTest extends TestCase {
33
34 /** Assert on the comparison ordering of the State enum since we guarantee it. */
35 public void testStateOrdering() {
36 // List every valid (direct) state transition.
37 assertLessThan(NEW, STARTING);
38 assertLessThan(NEW, TERMINATED);
39
40 assertLessThan(STARTING, RUNNING);
41 assertLessThan(STARTING, STOPPING);
42 assertLessThan(STARTING, FAILED);
43
44 assertLessThan(RUNNING, STOPPING);
45 assertLessThan(RUNNING, FAILED);
46
47 assertLessThan(STOPPING, FAILED);
48 assertLessThan(STOPPING, TERMINATED);
49 }
50
51 private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
52 if (a.compareTo(b) >= 0) {
53 fail(String.format("Expected %s to be less than %s", a, b));
54 }
55 }
56 }
57 <pre>
AbstractIdleServiceTest


001 /*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static org.truth0.Truth.ASSERT;
020
021 import com.google.common.collect.Lists;
022
023 import junit.framework.TestCase;
024
025 import java.util.List;
026 import java.util.concurrent.Executor;
027 import java.util.concurrent.TimeUnit;
028 import java.util.concurrent.TimeoutException;
029
030 /**
031 * Tests for {@link AbstractIdleService}.
032 *
033 * @author Chris Nokleberg
034 * @author Ben Yu
035 */
036 public class AbstractIdleServiceTest extends TestCase {
037
038 // Functional tests using real thread. We only verify publicly visible state.
039 // Interaction assertions are done by the single-threaded unit tests.
040
041 public static class FunctionalTest extends TestCase {
042
043 private static class DefaultService extends AbstractIdleService {
044 @Override protected void startUp() throws Exception {}
045 @Override protected void shutDown() throws Exception {}
046 }
047
048 public void testServiceStartStop() throws Exception {
049 AbstractIdleService service = new DefaultService();
050 service.startAsync().awaitRunning();
051 assertEquals(Service.State.RUNNING, service.state());
052 service.stopAsync().awaitTerminated();
053 assertEquals(Service.State.TERMINATED, service.state());
054 }
055
056 public void testStart_failed() throws Exception {
057 final Exception exception = new Exception("deliberate");
058 AbstractIdleService service = new DefaultService() {
059 @Override protected void startUp() throws Exception {
060 throw exception;
061 }
062 };
063 try {
064 service.startAsync().awaitRunning();
065 fail();
066 } catch (RuntimeException e) {
067 assertSame(exception, e.getCause());
068 }
069 assertEquals(Service.State.FAILED, service.state());
070 }
071
072 public void testStop_failed() throws Exception {
073 final Exception exception = new Exception("deliberate");
074 AbstractIdleService service = new DefaultService() {
075 @Override protected void shutDown() throws Exception {
076 throw exception;
077 }
078 };
079 service.startAsync().awaitRunning();
080 try {
081 service.stopAsync().awaitTerminated();
082 fail();
083 } catch (RuntimeException e) {
084 assertSame(exception, e.getCause());
085 }
086 assertEquals(Service.State.FAILED, service.state());
087 }
088 }
089
090 public void testStart() {
091 TestService service = new TestService();
092 assertEquals(0, service.startUpCalled);
093 service.startAsync().awaitRunning();
094 assertEquals(1, service.startUpCalled);
095 assertEquals(Service.State.RUNNING, service.state());
096 ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
097 }
098
099 public void testStart_failed() {
100 final Exception exception = new Exception("deliberate");
101 TestService service = new TestService() {
102 @Override protected void startUp() throws Exception {
103 super.startUp();
104 throw exception;
105 }
106 };
107 assertEquals(0, service.startUpCalled);
108 try {
109 service.startAsync().awaitRunning();
110 fail();
111 } catch (RuntimeException e) {
112 assertSame(exception, e.getCause());
113 }
114 assertEquals(1, service.startUpCalled);
115 assertEquals(Service.State.FAILED, service.state());
116 ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
117 }
118
119 public void testStop_withoutStart() {
120 TestService service = new TestService();
121 service.stopAsync().awaitTerminated();
122 assertEquals(0, service.startUpCalled);
123 assertEquals(0, service.shutDownCalled);
124 assertEquals(Service.State.TERMINATED, service.state());
125 ASSERT.that(service.transitionStates).isEmpty();
126 }
127
128 public void testStop_afterStart() {
129 TestService service = new TestService();
130 service.startAsync().awaitRunning();
131 assertEquals(1, service.startUpCalled);
132 assertEquals(0, service.shutDownCalled);
133 service.stopAsync().awaitTerminated();
134 assertEquals(1, service.startUpCalled);
135 assertEquals(1, service.shutDownCalled);
136 assertEquals(Service.State.TERMINATED, service.state());
137 ASSERT.that(service.transitionStates)
138 .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
139 }
140
141 public void testStop_failed() {
142 final Exception exception = new Exception("deliberate");
143 TestService service = new TestService() {
144 @Override protected void shutDown() throws Exception {
145 super.shutDown();
146 throw exception;
147 }
148 };
149 service.startAsync().awaitRunning();
150 assertEquals(1, service.startUpCalled);
151 assertEquals(0, service.shutDownCalled);
152 try {
153 service.stopAsync().awaitTerminated();
154 fail();
155 } catch (RuntimeException e) {
156 assertSame(exception, e.getCause());
157 }
158 assertEquals(1, service.startUpCalled);
159 assertEquals(1, service.shutDownCalled);
160 assertEquals(Service.State.FAILED, service.state());
161 ASSERT.that(service.transitionStates)
162 .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
163 }
164
165 public void testServiceToString() {
166 AbstractIdleService service = new TestService();
167 assertEquals("TestService [NEW]", service.toString());
168 service.startAsync().awaitRunning();
169 assertEquals("TestService [RUNNING]", service.toString());
170 service.stopAsync().awaitTerminated();
171 assertEquals("TestService [TERMINATED]", service.toString());
172 }
173
174 public void testTimeout() throws Exception {
175 // Create a service whose executor will never run its commands
176 Service service = new TestService() {
177 @Override protected Executor executor() {
178 return new Executor() {
179 @Override public void execute(Runnable command) {}
180 };
181 }
182 };
183 try {
184 service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
185 fail("Expected timeout");
186 } catch (TimeoutException e) {
187 ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
188 }
189 }
190
191 private static class TestService extends AbstractIdleService {
192 int startUpCalled = 0;
193 int shutDownCalled = 0;
194 final List<State> transitionStates = Lists.newArrayList();
195
196 @Override protected void startUp() throws Exception {
197 assertEquals(0, startUpCalled);
198 assertEquals(0, shutDownCalled);
199 startUpCalled++;
200 assertEquals(State.STARTING, state());
201 }
202
203 @Override protected void shutDown() throws Exception {
204 assertEquals(1, startUpCalled);
205 assertEquals(0, shutDownCalled);
206 shutDownCalled++;
207 assertEquals(State.STOPPING, state());
208 }
209
210 @Override protected Executor executor() {
211 transitionStates.add(state());
212 return MoreExecutors.sameThreadExecutor();
213 }
214 }
215 }
216
217 <pre>
AbstractScheduledServiceTest


001 </pre>
002 /*
003 * Copyright (C) 2011 The Guava Authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package com.google.common.util.concurrent;
019
020 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
021 import com.google.common.util.concurrent.Service.State;
022
023 import junit.framework.TestCase;
024
025 import java.util.concurrent.CountDownLatch;
026 import java.util.concurrent.CyclicBarrier;
027 import java.util.concurrent.ExecutionException;
028 import java.util.concurrent.Executors;
029 import java.util.concurrent.Future;
030 import java.util.concurrent.ScheduledExecutorService;
031 import java.util.concurrent.ScheduledFuture;
032 import java.util.concurrent.ScheduledThreadPoolExecutor;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.atomic.AtomicBoolean;
035 import java.util.concurrent.atomic.AtomicInteger;
036
037 /**
038 * Unit test for {@link AbstractScheduledService}.
039 *
040 * @author Luke Sandberg
041 */
042
043 public class AbstractScheduledServiceTest extends TestCase {
044
045 volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
046 volatile ScheduledFuture<?> future = null;
047
048 volatile boolean atFixedRateCalled = false;
049 volatile boolean withFixedDelayCalled = false;
050 volatile boolean scheduleCalled = false;
051
052 final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
053 @Override
054 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
055 long delay, TimeUnit unit) {
056 return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
057 }
058 };
059
060 public void testServiceStartStop() throws Exception {
061 NullService service = new NullService();
062 service.startAsync().awaitRunning();
063 assertFalse(future.isDone());
064 service.stopAsync().awaitTerminated();
065 assertTrue(future.isCancelled());
066 }
067
068 private class NullService extends AbstractScheduledService {
069 @Override protected void runOneIteration() throws Exception {}
070 @Override protected Scheduler scheduler() { return configuration; }
071 @Override protected ScheduledExecutorService executor() { return executor; }
072 }
073
074 public void testFailOnExceptionFromRun() throws Exception {
075 TestService service = new TestService();
076 service.runException = new Exception();
077 service.startAsync().awaitRunning();
078 service.runFirstBarrier.await();
079 service.runSecondBarrier.await();
080 try {
081 future.get();
082 fail();
083 } catch (ExecutionException e) {
084 // An execution exception holds a runtime exception (from throwables.propogate) that holds our
085 // original exception.
086 assertEquals(service.runException, e.getCause().getCause());
087 }
088 assertEquals(service.state(), Service.State.FAILED);
089 }
090
091 public void testFailOnExceptionFromStartUp() {
092 TestService service = new TestService();
093 service.startUpException = new Exception();
094 try {
095 service.startAsync().awaitRunning();
096 fail();
097 } catch (IllegalStateException e) {
098 assertEquals(service.startUpException, e.getCause());
099 }
100 assertEquals(0, service.numberOfTimesRunCalled.get());
101 assertEquals(Service.State.FAILED, service.state());
102 }
103
104 public void testFailOnExceptionFromShutDown() throws Exception {
105 TestService service = new TestService();
106 service.shutDownException = new Exception();
107 service.startAsync().awaitRunning();
108 service.runFirstBarrier.await();
109 service.stopAsync();
110 service.runSecondBarrier.await();
111 try {
112 service.awaitTerminated();
113 fail();
114 } catch (IllegalStateException e) {
115 assertEquals(service.shutDownException, e.getCause());
116 }
117 assertEquals(Service.State.FAILED, service.state());
118 }
119
120 public void testRunOneIterationCalledMultipleTimes() throws Exception {
121 TestService service = new TestService();
122 service.startAsync().awaitRunning();
123 for (int i = 1; i < 10; i++) {
124 service.runFirstBarrier.await();
125 assertEquals(i, service.numberOfTimesRunCalled.get());
126 service.runSecondBarrier.await();
127 }
128 service.runFirstBarrier.await();
129 service.stopAsync();
130 service.runSecondBarrier.await();
131 service.stopAsync().awaitTerminated();
132 }
133
134 public void testExecutorOnlyCalledOnce() throws Exception {
135 TestService service = new TestService();
136 service.startAsync().awaitRunning();
137 // It should be called once during startup.
138 assertEquals(1, service.numberOfTimesExecutorCalled.get());
139 for (int i = 1; i < 10; i++) {
140 service.runFirstBarrier.await();
141 assertEquals(i, service.numberOfTimesRunCalled.get());
142 service.runSecondBarrier.await();
143 }
144 service.runFirstBarrier.await();
145 service.stopAsync();
146 service.runSecondBarrier.await();
147 service.stopAsync().awaitTerminated();
148 // Only called once overall.
149 assertEquals(1, service.numberOfTimesExecutorCalled.get());
150 }
151
152 public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
153 final CountDownLatch terminationLatch = new CountDownLatch(1);
154 AbstractScheduledService service = new AbstractScheduledService() {
155 volatile ScheduledExecutorService executorService;
156 @Override protected void runOneIteration() throws Exception {}
157
158 @Override protected ScheduledExecutorService executor() {
159 if (executorService == null) {
160 executorService = super.executor();
161 // Add a listener that will be executed after the listener that shuts down the executor.
162 addListener(new Listener() {
163 @Override public void terminated(State from) {
164 terminationLatch.countDown();
165 }
166 }, MoreExecutors.sameThreadExecutor());
167 }
168 return executorService;
169 }
170
171 @Override protected Scheduler scheduler() {
172 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
173 }};
174
175 service.startAsync();
176 assertFalse(service.executor().isShutdown());
177 service.awaitRunning();
178 service.stopAsync();
179 terminationLatch.await();
180 assertTrue(service.executor().isShutdown());
181 assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
182 }
183
184 public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
185 final CountDownLatch failureLatch = new CountDownLatch(1);
186 AbstractScheduledService service = new AbstractScheduledService() {
187 volatile ScheduledExecutorService executorService;
188 @Override protected void runOneIteration() throws Exception {}
189
190 @Override protected void startUp() throws Exception {
191 throw new Exception("Failed");
192 }
193
194 @Override protected ScheduledExecutorService executor() {
195 if (executorService == null) {
196 executorService = super.executor();
197 // Add a listener that will be executed after the listener that shuts down the executor.
198 addListener(new Listener() {
199 @Override public void failed(State from, Throwable failure) {
200 failureLatch.countDown();
201 }
202 }, MoreExecutors.sameThreadExecutor());
203 }
204 return executorService;
205 }
206
207 @Override protected Scheduler scheduler() {
208 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
209 }};
210
211 try {
212 service.startAsync().awaitRunning();
213 fail("Expected service to fail during startup");
214 } catch (IllegalStateException expected) {}
215 failureLatch.await();
216 assertTrue(service.executor().isShutdown());
217 assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
218 }
219
220 public void testSchedulerOnlyCalledOnce() throws Exception {
221 TestService service = new TestService();
222 service.startAsync().awaitRunning();
223 // It should be called once during startup.
224 assertEquals(1, service.numberOfTimesSchedulerCalled.get());
225 for (int i = 1; i < 10; i++) {
226 service.runFirstBarrier.await();
227 assertEquals(i, service.numberOfTimesRunCalled.get());
228 service.runSecondBarrier.await();
229 }
230 service.runFirstBarrier.await();
231 service.stopAsync();
232 service.runSecondBarrier.await();
233 service.awaitTerminated();
234 // Only called once overall.
235 assertEquals(1, service.numberOfTimesSchedulerCalled.get());
236 }
237
238 private class TestService extends AbstractScheduledService {
239 CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
240 CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
241
242 volatile boolean startUpCalled = false;
243 volatile boolean shutDownCalled = false;
244 AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
245 AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
246 AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
247 volatile Exception runException = null;
248 volatile Exception startUpException = null;
249 volatile Exception shutDownException = null;
250
251 @Override
252 protected void runOneIteration() throws Exception {
253 assertTrue(startUpCalled);
254 assertFalse(shutDownCalled);
255 numberOfTimesRunCalled.incrementAndGet();
256 assertEquals(State.RUNNING, state());
257 runFirstBarrier.await();
258 runSecondBarrier.await();
259 if (runException != null) {
260 throw runException;
261 }
262 }
263
264 @Override
265 protected void startUp() throws Exception {
266 assertFalse(startUpCalled);
267 assertFalse(shutDownCalled);
268 startUpCalled = true;
269 assertEquals(State.STARTING, state());
270 if (startUpException != null) {
271 throw startUpException;
272 }
273 }
274
275 @Override
276 protected void shutDown() throws Exception {
277 assertTrue(startUpCalled);
278 assertFalse(shutDownCalled);
279 shutDownCalled = true;
280 if (shutDownException != null) {
281 throw shutDownException;
282 }
283 }
284
285 @Override
286 protected ScheduledExecutorService executor() {
287 numberOfTimesExecutorCalled.incrementAndGet();
288 return executor;
289 }
290
291 @Override
292 protected Scheduler scheduler() {
293 numberOfTimesSchedulerCalled.incrementAndGet();
294 return configuration;
295 }
296 }
297
298 public static class SchedulerTest extends TestCase {
299 // These constants are arbitrary and just used to make sure that the correct method is called
300 // with the correct parameters.
301 private static final int initialDelay = 10;
302 private static final int delay = 20;
303 private static final TimeUnit unit = TimeUnit.MILLISECONDS;
304
305 // Unique runnable object used for comparison.
306 final Runnable testRunnable = new Runnable() {@Override public void run() {}};
307 boolean called = false;
308
309 private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
310 long delay, TimeUnit unit) {
311 assertFalse(called); // only called once.
312 called = true;
313 assertEquals(SchedulerTest.initialDelay, initialDelay);
314 assertEquals(SchedulerTest.delay, delay);
315 assertEquals(SchedulerTest.unit, unit);
316 assertEquals(testRunnable, command);
317 }
318
319 public void testFixedRateSchedule() {
320 Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
321 schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
322 @Override
323 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
324 long period, TimeUnit unit) {
325 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
326 return null;
327 }
328 }, testRunnable);
329 assertTrue(called);
330 }
331
332 public void testFixedDelaySchedule() {
333 Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
334 schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
335 @Override
336 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
337 long delay, TimeUnit unit) {
338 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
339 return null;
340 }
341 }, testRunnable);
342 assertTrue(called);
343 }
344
345 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
346 public AtomicInteger scheduleCounter = new AtomicInteger(0);
347 @Override
348 protected Schedule getNextSchedule() throws Exception {
349 scheduleCounter.incrementAndGet();
350 return new Schedule(0, TimeUnit.SECONDS);
351 }
352 }
353
354 public void testCustomSchedule_startStop() throws Exception {
355 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
356 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
357 final AtomicBoolean shouldWait = new AtomicBoolean(true);
358 Runnable task = new Runnable() {
359 @Override public void run() {
360 try {
361 if (shouldWait.get()) {
362 firstBarrier.await();
363 secondBarrier.await();
364 }
365 } catch (Exception e) {
366 throw new RuntimeException(e);
367 }
368 }
369 };
370 TestCustomScheduler scheduler = new TestCustomScheduler();
371 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
372 firstBarrier.await();
373 assertEquals(1, scheduler.scheduleCounter.get());
374 secondBarrier.await();
375 firstBarrier.await();
376 assertEquals(2, scheduler.scheduleCounter.get());
377 shouldWait.set(false);
378 secondBarrier.await();
379 future.cancel(false);
380 }
381
382 public void testCustomSchedulerServiceStop() throws Exception {
383 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
384 service.startAsync().awaitRunning();
385 service.firstBarrier.await();
386 assertEquals(1, service.numIterations.get());
387 service.stopAsync();
388 service.secondBarrier.await();
389 service.awaitTerminated();
390 // Sleep for a while just to ensure that our task wasn‘t called again.
391 Thread.sleep(unit.toMillis(3 * delay));
392 assertEquals(1, service.numIterations.get());
393 }
394
395 public void testBig() throws Exception {
396 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
397 @Override protected Scheduler scheduler() {
398 return new AbstractScheduledService.CustomScheduler() {
399 @Override
400 protected Schedule getNextSchedule() throws Exception {
401 // Explicitly yield to increase the probability of a pathological scheduling.
402 Thread.yield();
403 return new Schedule(0, TimeUnit.SECONDS);
404 }
405 };
406 }
407 };
408 service.useBarriers = false;
409 service.startAsync().awaitRunning();
410 Thread.sleep(50);
411 service.useBarriers = true;
412 service.firstBarrier.await();
413 int numIterations = service.numIterations.get();
414 service.stopAsync();
415 service.secondBarrier.await();
416 service.awaitTerminated();
417 assertEquals(numIterations, service.numIterations.get());
418 }
419
420 private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
421 final AtomicInteger numIterations = new AtomicInteger(0);
422 volatile boolean useBarriers = true;
423 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
424 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
425
426 @Override protected void runOneIteration() throws Exception {
427 numIterations.incrementAndGet();
428 if (useBarriers) {
429 firstBarrier.await();
430 secondBarrier.await();
431 }
432 }
433
434 @Override protected ScheduledExecutorService executor() {
435 // use a bunch of threads so that weird overlapping schedules are more likely to happen.
436 return Executors.newScheduledThreadPool(10);
437 }
438
439 @Override protected void startUp() throws Exception {}
440
441 @Override protected void shutDown() throws Exception {}
442
443 @Override protected Scheduler scheduler() {
444 return new CustomScheduler() {
445 @Override
446 protected Schedule getNextSchedule() throws Exception {
447 return new Schedule(delay, unit);
448 }};
449 }
450 }
451
452 public void testCustomSchedulerFailure() throws Exception {
453 TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
454 service.startAsync().awaitRunning();
455 for (int i = 1; i < 4; i++) {
456 service.firstBarrier.await();
457 assertEquals(i, service.numIterations.get());
458 service.secondBarrier.await();
459 }
460 Thread.sleep(1000);
461 try {
462 service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
463 fail();
464 } catch (IllegalStateException e) {
465 assertEquals(State.FAILED, service.state());
466 }
467 }
468
469 private static class TestFailingCustomScheduledService extends AbstractScheduledService {
470 final AtomicInteger numIterations = new AtomicInteger(0);
471 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
472 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
473
474 @Override protected void runOneIteration() throws Exception {
475 numIterations.incrementAndGet();
476 firstBarrier.await();
477 secondBarrier.await();
478 }
479
480 @Override protected ScheduledExecutorService executor() {
481 // use a bunch of threads so that weird overlapping schedules are more likely to happen.
482 return Executors.newScheduledThreadPool(10);
483 }
484
485 @Override protected Scheduler scheduler() {
486 return new CustomScheduler() {
487 @Override
488 protected Schedule getNextSchedule() throws Exception {
489 if (numIterations.get() > 2) {
490 throw new IllegalStateException("Failed");
491 }
492 return new Schedule(delay, unit);
493 }};
494 }
495 }
496 }
497 }
498 <pre>
AbstractServiceTest


001 </pre>
002 /*
003 * Copyright (C) 2009 The Guava Authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package com.google.common.util.concurrent;
019
020 import static java.lang.Thread.currentThread;
021 import static java.util.concurrent.TimeUnit.SECONDS;
022
023 import com.google.common.collect.ImmutableList;
024 import com.google.common.collect.Iterables;
025 import com.google.common.collect.Lists;
026 import com.google.common.util.concurrent.Service.Listener;
027 import com.google.common.util.concurrent.Service.State;
028
029 import junit.framework.TestCase;
030
031 import java.lang.Thread.UncaughtExceptionHandler;
032 import java.util.List;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.atomic.AtomicInteger;
036 import java.util.concurrent.atomic.AtomicReference;
037
038 import javax.annotation.concurrent.GuardedBy;
039
040 /**
041 * Unit test for {@link AbstractService}.
042 *
043 * @author Jesse Wilson
044 */
045 public class AbstractServiceTest extends TestCase {
046
047 private Thread executionThread;
048 private Throwable thrownByExecutionThread;
049
050 public void testNoOpServiceStartStop() throws Exception {
051 NoOpService service = new NoOpService();
052 RecordingListener listener = RecordingListener.record(service);
053
054 assertEquals(State.NEW, service.state());
055 assertFalse(service.isRunning());
056 assertFalse(service.running);
057
058 service.startAsync();
059 assertEquals(State.RUNNING, service.state());
060 assertTrue(service.isRunning());
061 assertTrue(service.running);
062
063 service.stopAsync();
064 assertEquals(State.TERMINATED, service.state());
065 assertFalse(service.isRunning());
066 assertFalse(service.running);
067 assertEquals(
068 ImmutableList.of(
069 State.STARTING,
070 State.RUNNING,
071 State.STOPPING,
072 State.TERMINATED),
073 listener.getStateHistory());
074 }
075
076 public void testNoOpServiceStartAndWaitStopAndWait() throws Exception {
077 NoOpService service = new NoOpService();
078
079 service.startAsync().awaitRunning();
080 assertEquals(State.RUNNING, service.state());
081
082 service.stopAsync().awaitTerminated();
083 assertEquals(State.TERMINATED, service.state());
084 }
085
086 public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception {
087 NoOpService service = new NoOpService();
088
089 service.startAsync().awaitRunning();
090 assertEquals(State.RUNNING, service.state());
091
092 service.stopAsync().awaitTerminated();
093 assertEquals(State.TERMINATED, service.state());
094 }
095
096 public void testNoOpServiceStopIdempotence() throws Exception {
097 NoOpService service = new NoOpService();
098 RecordingListener listener = RecordingListener.record(service);
099 service.startAsync().awaitRunning();
100 assertEquals(State.RUNNING, service.state());
101
102 service.stopAsync();
103 service.stopAsync();
104 assertEquals(State.TERMINATED, service.state());
105 assertEquals(
106 ImmutableList.of(
107 State.STARTING,
108 State.RUNNING,
109 State.STOPPING,
110 State.TERMINATED),
111 listener.getStateHistory());
112 }
113
114 public void testNoOpServiceStopIdempotenceAfterWait() throws Exception {
115 NoOpService service = new NoOpService();
116
117 service.startAsync().awaitRunning();
118
119 service.stopAsync().awaitTerminated();
120 service.stopAsync();
121 assertEquals(State.TERMINATED, service.state());
122 }
123
124 public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception {
125 NoOpService service = new NoOpService();
126
127 service.startAsync().awaitRunning();
128 assertEquals(State.RUNNING, service.state());
129
130 service.stopAsync().awaitTerminated();
131 service.stopAsync().awaitTerminated();
132 assertEquals(State.TERMINATED, service.state());
133 }
134
135 public void testNoOpServiceStartStopAndWaitUninterruptible()
136 throws Exception {
137 NoOpService service = new NoOpService();
138
139 currentThread().interrupt();
140 try {
141 service.startAsync().awaitRunning();
142 assertEquals(State.RUNNING, service.state());
143
144 service.stopAsync().awaitTerminated();
145 assertEquals(State.TERMINATED, service.state());
146
147 assertTrue(currentThread().isInterrupted());
148 } finally {
149 Thread.interrupted(); // clear interrupt for future tests
150 }
151 }
152
153 private static class NoOpService extends AbstractService {
154 boolean running = false;
155
156 @Override protected void doStart() {
157 assertFalse(running);
158 running = true;
159 notifyStarted();
160 }
161
162 @Override protected void doStop() {
163 assertTrue(running);
164 running = false;
165 notifyStopped();
166 }
167 }
168
169 public void testManualServiceStartStop() throws Exception {
170 ManualSwitchedService service = new ManualSwitchedService();
171 RecordingListener listener = RecordingListener.record(service);
172
173 service.startAsync();
174 assertEquals(State.STARTING, service.state());
175 assertFalse(service.isRunning());
176 assertTrue(service.doStartCalled);
177
178 service.notifyStarted(); // usually this would be invoked by another thread
179 assertEquals(State.RUNNING, service.state());
180 assertTrue(service.isRunning());
181
182 service.stopAsync();
183 assertEquals(State.STOPPING, service.state());
184 assertFalse(service.isRunning());
185 assertTrue(service.doStopCalled);
186
187 service.notifyStopped(); // usually this would be invoked by another thread
188 assertEquals(State.TERMINATED, service.state());
189 assertFalse(service.isRunning());
190 assertEquals(
191 ImmutableList.of(
192 State.STARTING,
193 State.RUNNING,
194 State.STOPPING,
195 State.TERMINATED),
196 listener.getStateHistory());
197
198 }
199
200 public void testManualServiceNotifyStoppedWhileRunning() throws Exception {
201 ManualSwitchedService service = new ManualSwitchedService();
202 RecordingListener listener = RecordingListener.record(service);
203
204 service.startAsync();
205 service.notifyStarted();
206 service.notifyStopped();
207 assertEquals(State.TERMINATED, service.state());
208 assertFalse(service.isRunning());
209 assertFalse(service.doStopCalled);
210
211 assertEquals(
212 ImmutableList.of(
213 State.STARTING,
214 State.RUNNING,
215 State.TERMINATED),
216 listener.getStateHistory());
217 }
218
219 public void testManualServiceStopWhileStarting() throws Exception {
220 ManualSwitchedService service = new ManualSwitchedService();
221 RecordingListener listener = RecordingListener.record(service);
222
223 service.startAsync();
224 assertEquals(State.STARTING, service.state());
225 assertFalse(service.isRunning());
226 assertTrue(service.doStartCalled);
227
228 service.stopAsync();
229 assertEquals(State.STOPPING, service.state());
230 assertFalse(service.isRunning());
231 assertFalse(service.doStopCalled);
232
233 service.notifyStarted();
234 assertEquals(State.STOPPING, service.state());
235 assertFalse(service.isRunning());
236 assertTrue(service.doStopCalled);
237
238 service.notifyStopped();
239 assertEquals(State.TERMINATED, service.state());
240 assertFalse(service.isRunning());
241 assertEquals(
242 ImmutableList.of(
243 State.STARTING,
244 State.STOPPING,
245 State.TERMINATED),
246 listener.getStateHistory());
247 }
248
249 /**
250 * This tests for a bug where if {@link Service#stopAsync()} was called while the service was
251 * {@link State#STARTING} more than once, the {@link Listener#stopping(State)} callback would get
252 * called multiple times.
253 */
254 public void testManualServiceStopMultipleTimesWhileStarting() throws Exception {
255 ManualSwitchedService service = new ManualSwitchedService();
256 final AtomicInteger stopppingCount = new AtomicInteger();
257 service.addListener(new Listener() {
258 @Override public void stopping(State from) {
259 stopppingCount.incrementAndGet();
260 }
261 }, MoreExecutors.sameThreadExecutor());
262
263 service.startAsync();
264 service.stopAsync();
265 assertEquals(1, stopppingCount.get());
266 service.stopAsync();
267 assertEquals(1, stopppingCount.get());
268 }
269
270 public void testManualServiceStopWhileNew() throws Exception {
271 ManualSwitchedService service = new ManualSwitchedService();
272 RecordingListener listener = RecordingListener.record(service);
273
274 service.stopAsync();
275 assertEquals(State.TERMINATED, service.state());
276 assertFalse(service.isRunning());
277 assertFalse(service.doStartCalled);
278 assertFalse(service.doStopCalled);
279 assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory());
280 }
281
282 public void testManualServiceFailWhileStarting() throws Exception {
283 ManualSwitchedService service = new ManualSwitchedService();
284 RecordingListener listener = RecordingListener.record(service);
285 service.startAsync();
286 service.notifyFailed(EXCEPTION);
287 assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory());
288 }
289
290 public void testManualServiceFailWhileRunning() throws Exception {
291 ManualSwitchedService service = new ManualSwitchedService();
292 RecordingListener listener = RecordingListener.record(service);
293 service.startAsync();
294 service.notifyStarted();
295 service.notifyFailed(EXCEPTION);
296 assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED),
297 listener.getStateHistory());
298 }
299
300 public void testManualServiceFailWhileStopping() throws Exception {
301 ManualSwitchedService service = new ManualSwitchedService();
302 RecordingListener listener = RecordingListener.record(service);
303 service.startAsync();
304 service.notifyStarted();
305 service.stopAsync();
306 service.notifyFailed(EXCEPTION);
307 assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED),
308 listener.getStateHistory());
309 }
310
311 public void testManualServiceUnrequestedStop() {
312 ManualSwitchedService service = new ManualSwitchedService();
313
314 service.startAsync();
315
316 service.notifyStarted();
317 assertEquals(State.RUNNING, service.state());
318 assertTrue(service.isRunning());
319 assertFalse(service.doStopCalled);
320
321 service.notifyStopped();
322 assertEquals(State.TERMINATED, service.state());
323 assertFalse(service.isRunning());
324 assertFalse(service.doStopCalled);
325 }
326
327 /**
328 * The user of this service should call {@link #notifyStarted} and {@link
329 * #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}.
330 */
331 private static class ManualSwitchedService extends AbstractService {
332 boolean doStartCalled = false;
333 boolean doStopCalled = false;
334
335 @Override protected void doStart() {
336 assertFalse(doStartCalled);
337 doStartCalled = true;
338 }
339
340 @Override protected void doStop() {
341 assertFalse(doStopCalled);
342 doStopCalled = true;
343 }
344 }
345
346 public void testAwaitTerminated() throws Exception {
347 final NoOpService service = new NoOpService();
348 Thread waiter = new Thread() {
349 @Override public void run() {
350 service.awaitTerminated();
351 }
352 };
353 waiter.start();
354 service.startAsync().awaitRunning();
355 assertEquals(State.RUNNING, service.state());
356 service.stopAsync();
357 waiter.join(100); // ensure that the await in the other thread is triggered
358 assertFalse(waiter.isAlive());
359 }
360
361 public void testAwaitTerminated_FailedService() throws Exception {
362 final ManualSwitchedService service = new ManualSwitchedService();
363 final AtomicReference<Throwable> exception = Atomics.newReference();
364 Thread waiter = new Thread() {
365 @Override public void run() {
366 try {
367 service.awaitTerminated();
368 fail("Expected an IllegalStateException");
369 } catch (Throwable t) {
370 exception.set(t);
371 }
372 }
373 };
374 waiter.start();
375 service.startAsync();
376 service.notifyStarted();
377 assertEquals(State.RUNNING, service.state());
378 service.notifyFailed(EXCEPTION);
379 assertEquals(State.FAILED, service.state());
380 waiter.join(100);
381 assertFalse(waiter.isAlive());
382 assertTrue(exception.get() instanceof IllegalStateException);
383 assertEquals(EXCEPTION, exception.get().getCause());
384 }
385
386 public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable {
387 ThreadedService service = new ThreadedService();
388 RecordingListener listener = RecordingListener.record(service);
389 service.startAsync().awaitRunning();
390 assertEquals(State.RUNNING, service.state());
391
392 service.awaitRunChecks();
393
394 service.stopAsync().awaitTerminated();
395 assertEquals(State.TERMINATED, service.state());
396
397 throwIfSet(thrownByExecutionThread);
398 assertEquals(
399 ImmutableList.of(
400 State.STARTING,
401 State.RUNNING,
402 State.STOPPING,
403 State.TERMINATED),
404 listener.getStateHistory());
405 }
406
407 public void testThreadedServiceStopIdempotence() throws Throwable {
408 ThreadedService service = new ThreadedService();
409
410 service.startAsync().awaitRunning();
411 assertEquals(State.RUNNING, service.state());
412
413 service.awaitRunChecks();
414
415 service.stopAsync();
416 service.stopAsync().awaitTerminated();
417 assertEquals(State.TERMINATED, service.state());
418
419 throwIfSet(thrownByExecutionThread);
420 }
421
422 public void testThreadedServiceStopIdempotenceAfterWait()
423 throws Throwable {
424 ThreadedService service = new ThreadedService();
425
426 service.startAsync().awaitRunning();
427 assertEquals(State.RUNNING, service.state());
428
429 service.awaitRunChecks();
430
431 service.stopAsync().awaitTerminated();
432 service.stopAsync();
433 assertEquals(State.TERMINATED, service.state());
434
435 executionThread.join();
436
437 throwIfSet(thrownByExecutionThread);
438 }
439
440 public void testThreadedServiceStopIdempotenceDoubleWait()
441 throws Throwable {
442 ThreadedService service = new ThreadedService();
443
444 service.startAsync().awaitRunning();
445 assertEquals(State.RUNNING, service.state());
446
447 service.awaitRunChecks();
448
449 service.stopAsync().awaitTerminated();
450 service.stopAsync().awaitTerminated();
451 assertEquals(State.TERMINATED, service.state());
452
453 throwIfSet(thrownByExecutionThread);
454 }
455
456 public void testManualServiceFailureIdempotence() {
457 ManualSwitchedService service = new ManualSwitchedService();
458 RecordingListener.record(service);
459 service.startAsync();
460 service.notifyFailed(new Exception("1"));
461 service.notifyFailed(new Exception("2"));
462 assertEquals("1", service.failureCause().getMessage());
463 try {
464 service.awaitRunning();
465 fail();
466 } catch (IllegalStateException e) {
467 assertEquals("1", e.getCause().getMessage());
468 }
469 }
470
471 private class ThreadedService extends AbstractService {
472 final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1);
473
474 /*
475 * The main test thread tries to stop() the service shortly after
476 * confirming that it is running. Meanwhile, the service itself is trying
477 * to confirm that it is running. If the main thread‘s stop() call happens
478 * before it has the chance, the test will fail. To avoid this, the main
479 * thread calls this method, which waits until the service has performed
480 * its own "running" check.
481 */
482 void awaitRunChecks() throws InterruptedException {
483 assertTrue("Service thread hasn‘t finished its checks. "
484 + "Exception status (possibly stale): " + thrownByExecutionThread,
485 hasConfirmedIsRunning.await(10, SECONDS));
486 }
487
488 @Override protected void doStart() {
489 assertEquals(State.STARTING, state());
490 invokeOnExecutionThreadForTest(new Runnable() {
491 @Override public void run() {
492 assertEquals(State.STARTING, state());
493 notifyStarted();
494 assertEquals(State.RUNNING, state());
495 hasConfirmedIsRunning.countDown();
496 }
497 });
498 }
499
500 @Override protected void doStop() {
501 assertEquals(State.STOPPING, state());
502 invokeOnExecutionThreadForTest(new Runnable() {
503 @Override public void run() {
504 assertEquals(State.STOPPING, state());
505 notifyStopped();
506 assertEquals(State.TERMINATED, state());
507 }
508 });
509 }
510 }
511
512 private void invokeOnExecutionThreadForTest(Runnable runnable) {
513 executionThread = new Thread(runnable);
514 executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
515 @Override
516 public void uncaughtException(Thread thread, Throwable e) {
517 thrownByExecutionThread = e;
518 }
519 });
520 executionThread.start();
521 }
522
523 private static void throwIfSet(Throwable t) throws Throwable {
524 if (t != null) {
525 throw t;
526 }
527 }
528
529 public void testStopUnstartedService() throws Exception {
530 NoOpService service = new NoOpService();
531 RecordingListener listener = RecordingListener.record(service);
532
533 service.stopAsync();
534 assertEquals(State.TERMINATED, service.state());
535
536 try {
537 service.startAsync();
538 fail();
539 } catch (IllegalStateException expected) {}
540 assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory()));
541 }
542
543 public void testFailingServiceStartAndWait() throws Exception {
544 StartFailingService service = new StartFailingService();
545 RecordingListener listener = RecordingListener.record(service);
546
547 try {
548 service.startAsync().awaitRunning();
549 fail();
550 } catch (IllegalStateException e) {
551 assertEquals(EXCEPTION, service.failureCause());
552 assertEquals(EXCEPTION, e.getCause());
553 }
554 assertEquals(
555 ImmutableList.of(
556 State.STARTING,
557 State.FAILED),
558 listener.getStateHistory());
559 }
560
561 public void testFailingServiceStopAndWait_stopFailing() throws Exception {
562 StopFailingService service = new StopFailingService();
563 RecordingListener listener = RecordingListener.record(service);
564
565 service.startAsync().awaitRunning();
566 try {
567 service.stopAsync().awaitTerminated();
568 fail();
569 } catch (IllegalStateException e) {
570 assertEquals(EXCEPTION, service.failureCause());
571 assertEquals(EXCEPTION, e.getCause());
572 }
573 assertEquals(
574 ImmutableList.of(
575 State.STARTING,
576 State.RUNNING,
577 State.STOPPING,
578 State.FAILED),
579 listener.getStateHistory());
580 }
581
582 public void testFailingServiceStopAndWait_runFailing() throws Exception {
583 RunFailingService service = new RunFailingService();
584 RecordingListener listener = RecordingListener.record(service);
585
586 service.startAsync();
587 try {
588 service.awaitRunning();
589 fail();
590 } catch (IllegalStateException e) {
591 assertEquals(EXCEPTION, service.failureCause());
592 assertEquals(EXCEPTION, e.getCause());
593 }
594 assertEquals(
595 ImmutableList.of(
596 State.STARTING,
597 State.RUNNING,
598 State.FAILED),
599 listener.getStateHistory());
600 }
601
602 public void testThrowingServiceStartAndWait() throws Exception {
603 StartThrowingService service = new StartThrowingService();
604 RecordingListener listener = RecordingListener.record(service);
605
606 try {
607 service.startAsync().awaitRunning();
608 fail();
609 } catch (IllegalStateException e) {
610 assertEquals(service.exception, service.failureCause());
611 assertEquals(service.exception, e.getCause());
612 }
613 assertEquals(
614 ImmutableList.of(
615 State.STARTING,
616 State.FAILED),
617 listener.getStateHistory());
618 }
619
620 public void testThrowingServiceStopAndWait_stopThrowing() throws Exception {
621 StopThrowingService service = new StopThrowingService();
622 RecordingListener listener = RecordingListener.record(service);
623
624 service.startAsync().awaitRunning();
625 try {
626 service.stopAsync().awaitTerminated();
627 fail();
628 } catch (IllegalStateException e) {
629 assertEquals(service.exception, service.failureCause());
630 assertEquals(service.exception, e.getCause());
631 }
632 assertEquals(
633 ImmutableList.of(
634 State.STARTING,
635 State.RUNNING,
636 State.STOPPING,
637 State.FAILED),
638 listener.getStateHistory());
639 }
640
641 public void testThrowingServiceStopAndWait_runThrowing() throws Exception {
642 RunThrowingService service = new RunThrowingService();
643 RecordingListener listener = RecordingListener.record(service);
644
645 service.startAsync();
646 try {
647 service.awaitTerminated();
648 fail();
649 } catch (IllegalStateException e) {
650 assertEquals(service.exception, service.failureCause());
651 assertEquals(service.exception, e.getCause());
652 }
653 assertEquals(
654 ImmutableList.of(
655 State.STARTING,
656 State.RUNNING,
657 State.FAILED),
658 listener.getStateHistory());
659 }
660
661 public void testFailureCause_throwsIfNotFailed() {
662 StopFailingService service = new StopFailingService();
663 try {
664 service.failureCause();
665 fail();
666 } catch (IllegalStateException e) {
667 // expected
668 }
669 service.startAsync().awaitRunning();
670 try {
671 service.failureCause();
672 fail();
673 } catch (IllegalStateException e) {
674 // expected
675 }
676 try {
677 service.stopAsync().awaitTerminated();
678 fail();
679 } catch (IllegalStateException e) {
680 assertEquals(EXCEPTION, service.failureCause());
681 assertEquals(EXCEPTION, e.getCause());
682 }
683 }
684
685 public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException {
686 final StartFailingService service = new StartFailingService();
687 service.startAsync();
688 assertEquals(State.FAILED, service.state());
689 service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor());
690 Thread thread = new Thread() {
691 @Override public void run() {
692 // Internally stopAsync() grabs a lock, this could be any such method on AbstractService.
693 service.stopAsync();
694 }
695 };
696 thread.start();
697 thread.join(100);
698 assertFalse(thread + " is deadlocked", thread.isAlive());
699 }
700
701 public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception {
702 final NoOpThreadedService service = new NoOpThreadedService();
703 service.addListener(new Listener() {
704 @Override public void running() {
705 service.awaitRunning();
706 }
707 }, MoreExecutors.sameThreadExecutor());
708 service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS);
709 service.stopAsync();
710 }
711
712 public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception {
713 final NoOpThreadedService service = new NoOpThreadedService();
714 service.addListener(new Listener() {
715 @Override public void terminated(State from) {
716 service.stopAsync().awaitTerminated();
717 }
718 }, MoreExecutors.sameThreadExecutor());
719 service.startAsync().awaitRunning();
720
721 Thread thread = new Thread() {
722 @Override public void run() {
723 service.stopAsync().awaitTerminated();
724 }
725 };
726 thread.start();
727 thread.join(100);
728 assertFalse(thread + " is deadlocked", thread.isAlive());
729 }
730
731 private static class NoOpThreadedService extends AbstractExecutionThreadService {
732 final CountDownLatch latch = new CountDownLatch(1);
733 @Override protected void run() throws Exception {
734 latch.await();
735 }
736 @Override protected void triggerShutdown() {
737 latch.countDown();
738 }
739 }
740
741 private static class StartFailingService extends AbstractService {
742 @Override protected void doStart() {
743 notifyFailed(EXCEPTION);
744 }
745
746 @Override protected void doStop() {
747 fail();
748 }
749 }
750
751 private static class RunFailingService extends AbstractService {
752 @Override protected void doStart() {
753 notifyStarted();
754 notifyFailed(EXCEPTION);
755 }
756
757 @Override protected void doStop() {
758 fail();
759 }
760 }
761
762 private static class StopFailingService extends AbstractService {
763 @Override protected void doStart() {
764 notifyStarted();
765 }
766
767 @Override protected void doStop() {
768 notifyFailed(EXCEPTION);
769 }
770 }
771
772 private static class StartThrowingService extends AbstractService {
773
774 final RuntimeException exception = new RuntimeException("deliberate");
775
776 @Override protected void doStart() {
777 throw exception;
778 }
779
780 @Override protected void doStop() {
781 fail();
782 }
783 }
784
785 private static class RunThrowingService extends AbstractService {
786
787 final RuntimeException exception = new RuntimeException("deliberate");
788
789 @Override protected void doStart() {
790 notifyStarted();
791 throw exception;
792 }
793
794 @Override protected void doStop() {
795 fail();
796 }
797 }
798
799 private static class StopThrowingService extends AbstractService {
800
801 final RuntimeException exception = new RuntimeException("deliberate");
802
803 @Override protected void doStart() {
804 notifyStarted();
805 }
806
807 @Override protected void doStop() {
808 throw exception;
809 }
810 }
811
812 private static class RecordingListener extends Listener {
813 static RecordingListener record(Service service) {
814 RecordingListener listener = new RecordingListener(service);
815 service.addListener(listener, MoreExecutors.sameThreadExecutor());
816 return listener;
817 }
818
819 final Service service;
820
821 RecordingListener(Service service) {
822 this.service = service;
823 }
824
825 @GuardedBy("this")
826 final List<State> stateHistory = Lists.newArrayList();
827 final CountDownLatch completionLatch = new CountDownLatch(1);
828
829 ImmutableList<State> getStateHistory() throws Exception {
830 completionLatch.await();
831 synchronized (this) {
832 return ImmutableList.copyOf(stateHistory);
833 }
834 }
835
836 @Override public synchronized void starting() {
837 assertTrue(stateHistory.isEmpty());
838 assertNotSame(State.NEW, service.state());
839 stateHistory.add(State.STARTING);
840 }
841
842 @Override public synchronized void running() {
843 assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory));
844 stateHistory.add(State.RUNNING);
845 service.awaitRunning();
846 assertNotSame(State.STARTING, service.state());
847 }
848
849 @Override public synchronized void stopping(State from) {
850 assertEquals(from, Iterables.getLast(stateHistory));
851 stateHistory.add(State.STOPPING);
852 if (from == State.STARTING) {
853 try {
854 service.awaitRunning();
855 fail();
856 } catch (IllegalStateException expected) {
857 assertNull(expected.getCause());
858 assertTrue(expected.getMessage().equals(
859 "Expected the service to be RUNNING, but was STOPPING"));
860 }
861 }
862 assertNotSame(from, service.state());
863 }
864
865 @Override public synchronized void terminated(State from) {
866 assertEquals(from, Iterables.getLast(stateHistory, State.NEW));
867 stateHistory.add(State.TERMINATED);
868 assertEquals(State.TERMINATED, service.state());
869 if (from == State.NEW) {
870 try {
871 service.awaitRunning();
872 fail();
873 } catch (IllegalStateException expected) {
874 assertNull(expected.getCause());
875 assertTrue(expected.getMessage().equals(
876 "Expected the service to be RUNNING, but was TERMINATED"));
877 }
878 }
879 completionLatch.countDown();
880 }
881
882 @Override public synchronized void failed(State from, Throwable failure) {
883 assertEquals(from, Iterables.getLast(stateHistory));
884 stateHistory.add(State.FAILED);
885 assertEquals(State.FAILED, service.state());
886 assertEquals(failure, service.failureCause());
887 if (from == State.STARTING) {
888 try {
889 service.awaitRunning();
890 fail();
891 } catch (IllegalStateException e) {
892 assertEquals(failure, e.getCause());
893 }
894 }
895 try {
896 service.awaitTerminated();
897 fail();
898 } catch (IllegalStateException e) {
899 assertEquals(failure, e.getCause());
900 }
901 completionLatch.countDown();
902 }
903 }
904
905 public void testNotifyStartedWhenNotStarting() {
906 AbstractService service = new DefaultService();
907 try {
908 service.notifyStarted();
909 fail();
910 } catch (IllegalStateException expected) {}
911 }
912
913 public void testNotifyStoppedWhenNotRunning() {
914 AbstractService service = new DefaultService();
915 try {
916 service.notifyStopped();
917 fail();
918 } catch (IllegalStateException expected) {}
919 }
920
921 public void testNotifyFailedWhenNotStarted() {
922 AbstractService service = new DefaultService();
923 try {
924 service.notifyFailed(new Exception());
925 fail();
926 } catch (IllegalStateException expected) {}
927 }
928
929 public void testNotifyFailedWhenTerminated() {
930 NoOpService service = new NoOpService();
931 service.startAsync().awaitRunning();
932 service.stopAsync().awaitTerminated();
933 try {
934 service.notifyFailed(new Exception());
935 fail();
936 } catch (IllegalStateException expected) {}
937 }
938
939 private static class DefaultService extends AbstractService {
940 @Override protected void doStart() {}
941 @Override protected void doStop() {}
942 }
943
944 private static final Exception EXCEPTION = new Exception();
945 }
946 <pre>
ServiceManagerTest


001 </pre>
002 /*
003 * Copyright (C) 2012 The Guava Authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package com.google.common.util.concurrent;
018
019 import static java.util.Arrays.asList;
020
021 import com.google.common.collect.ImmutableMap;
022 import com.google.common.collect.ImmutableSet;
023 import com.google.common.collect.Lists;
024 import com.google.common.collect.Sets;
025 import com.google.common.testing.NullPointerTester;
026 import com.google.common.testing.TestLogHandler;
027 import com.google.common.util.concurrent.ServiceManager.Listener;
028
029 import junit.framework.TestCase;
030
031 import java.util.Arrays;
032 import java.util.Collection;
033 import java.util.List;
034 import java.util.Set;
035 import java.util.concurrent.CountDownLatch;
036 import java.util.concurrent.Executor;
037 import java.util.concurrent.TimeUnit;
038 import java.util.concurrent.TimeoutException;
039 import java.util.logging.Formatter;
040 import java.util.logging.Level;
041 import java.util.logging.LogRecord;
042 import java.util.logging.Logger;
043
044 /**
045 * Tests for {@link ServiceManager}.
046 *
047 * @author Luke Sandberg
048 * @author Chris Nokleberg
049 */
050 public class ServiceManagerTest extends TestCase {
051
052 private static class NoOpService extends AbstractService {
053 @Override protected void doStart() {
054 notifyStarted();
055 }
056
057 @Override protected void doStop() {
058 notifyStopped();
059 }
060 }
061
062 /*
063 * A NoOp service that will delay the startup and shutdown notification for a configurable amount
064 * of time.
065 */
066 private static class NoOpDelayedSerivce extends NoOpService {
067 private long delay;
068
069 public NoOpDelayedSerivce(long delay) {
070 this.delay = delay;
071 }
072
073 @Override protected void doStart() {
074 new Thread() {
075 @Override public void run() {
076 Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
077 notifyStarted();
078 }
079 }.start();
080 }
081
082 @Override protected void doStop() {
083 new Thread() {
084 @Override public void run() {
085 Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
086 notifyStopped();
087 }
088 }.start();
089 }
090 }
091
092 private static class FailStartService extends NoOpService {
093 @Override protected void doStart() {
094 notifyFailed(new IllegalStateException("failed"));
095 }
096 }
097
098 private static class FailRunService extends NoOpService {
099 @Override protected void doStart() {
100 super.doStart();
101 notifyFailed(new IllegalStateException("failed"));
102 }
103 }
104
105 private static class FailStopService extends NoOpService {
106 @Override protected void doStop() {
107 notifyFailed(new IllegalStateException("failed"));
108 }
109 }
110
111 public void testServiceStartupTimes() {
112 Service a = new NoOpDelayedSerivce(150);
113 Service b = new NoOpDelayedSerivce(353);
114 ServiceManager serviceManager = new ServiceManager(asList(a, b));
115 serviceManager.startAsync().awaitHealthy();
116 ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
117 assertEquals(2, startupTimes.size());
118 assertTrue(startupTimes.get(a) >= 150);
119 assertTrue(startupTimes.get(b) >= 353);
120 }
121
122 public void testServiceStartStop() {
123 Service a = new NoOpService();
124 Service b = new NoOpService();
125 ServiceManager manager = new ServiceManager(asList(a, b));
126 RecordingListener listener = new RecordingListener();
127 manager.addListener(listener);
128 assertState(manager, Service.State.NEW, a, b);
129 assertFalse(manager.isHealthy());
130 manager.startAsync().awaitHealthy();
131 assertState(manager, Service.State.RUNNING, a, b);
132 assertTrue(manager.isHealthy());
133 assertTrue(listener.healthyCalled);
134 assertFalse(listener.stoppedCalled);
135 assertTrue(listener.failedServices.isEmpty());
136 manager.stopAsync().awaitStopped();
137 assertState(manager, Service.State.TERMINATED, a, b);
138 assertFalse(manager.isHealthy());
139 assertTrue(listener.stoppedCalled);
140 assertTrue(listener.failedServices.isEmpty());
141 }
142
143 public void testFailStart() throws Exception {
144 Service a = new NoOpService();
145 Service b = new FailStartService();
146 Service c = new NoOpService();
147 Service d = new FailStartService();
148 Service e = new NoOpService();
149 ServiceManager manager = new ServiceManager(asList(a, b, c, d, e));
150 RecordingListener listener = new RecordingListener();
151 manager.addListener(listener);
152 assertState(manager, Service.State.NEW, a, b, c, d, e);
153 try {
154 manager.startAsync().awaitHealthy();
155 fail();
156 } catch (IllegalStateException expected) {
157 }
158 assertFalse(listener.healthyCalled);
159 assertState(manager, Service.State.RUNNING, a, c, e);
160 assertEquals(ImmutableSet.of(b, d), listener.failedServices);
161 assertState(manager, Service.State.FAILED, b, d);
162 assertFalse(manager.isHealthy());
163
164 manager.stopAsync().awaitStopped();
165 assertFalse(manager.isHealthy());
166 assertFalse(listener.healthyCalled);
167 assertTrue(listener.stoppedCalled);
168 }
169
170 public void testFailRun() throws Exception {
171 Service a = new NoOpService();
172 Service b = new FailRunService();
173 ServiceManager manager = new ServiceManager(asList(a, b));
174 RecordingListener listener = new RecordingListener();
175 manager.addListener(listener);
176 assertState(manager, Service.State.NEW, a, b);
177 try {
178 manager.startAsync().awaitHealthy();
179 fail();
180 } catch (IllegalStateException expected) {
181 }
182 assertTrue(listener.healthyCalled);
183 assertEquals(ImmutableSet.of(b), listener.failedServices);
184
185 manager.stopAsync().awaitStopped();
186 assertState(manager, Service.State.FAILED, b);
187 assertState(manager, Service.State.TERMINATED, a);
188
189 assertTrue(listener.stoppedCalled);
190 }
191
192 public void testFailStop() throws Exception {
193 Service a = new NoOpService();
194 Service b = new FailStopService();
195 Service c = new NoOpService();
196 ServiceManager manager = new ServiceManager(asList(a, b, c));
197 RecordingListener listener = new RecordingListener();
198 manager.addListener(listener);
199
200 manager.startAsync().awaitHealthy();
201 assertTrue(listener.healthyCalled);
202 assertFalse(listener.stoppedCalled);
203 manager.stopAsync().awaitStopped();
204
205 assertTrue(listener.stoppedCalled);
206 assertEquals(ImmutableSet.of(b), listener.failedServices);
207 assertState(manager, Service.State.FAILED, b);
208 assertState(manager, Service.State.TERMINATED, a, c);
209 }
210
211 public void testToString() throws Exception {
212 Service a = new NoOpService();
213 Service b = new FailStartService();
214 ServiceManager manager = new ServiceManager(asList(a, b));
215 String toString = manager.toString();
216 assertTrue(toString.contains("NoOpService"));
217 assertTrue(toString.contains("FailStartService"));
218 }
219
220 public void testTimeouts() throws Exception {
221 Service a = new NoOpDelayedSerivce(50);
222 ServiceManager manager = new ServiceManager(asList(a));
223 manager.startAsync();
224 try {
225 manager.awaitHealthy(1, TimeUnit.MILLISECONDS);
226 fail();
227 } catch (TimeoutException expected) {
228 }
229 manager.awaitHealthy(100, TimeUnit.MILLISECONDS); // no exception thrown
230
231 manager.stopAsync();
232 try {
233 manager.awaitStopped(1, TimeUnit.MILLISECONDS);
234 fail();
235 } catch (TimeoutException expected) {
236 }
237 manager.awaitStopped(100, TimeUnit.MILLISECONDS); // no exception thrown
238 }
239
240 /**
241 * This covers a case where if the last service to stop failed then the stopped callback would
242 * never be called.
243 */
244 public void testSingleFailedServiceCallsStopped() {
245 Service a = new FailStartService();
246 ServiceManager manager = new ServiceManager(asList(a));
247 RecordingListener listener = new RecordingListener();
248 manager.addListener(listener);
249 try {
250 manager.startAsync().awaitHealthy();
251 fail();
252 } catch (IllegalStateException expected) {
253 }
254 assertTrue(listener.stoppedCalled);
255 }
256
257 /**
258 * This covers a bug where listener.healthy would get called when a single service failed during
259 * startup (it occurred in more complicated cases also).
260 */
261 public void testFailStart_singleServiceCallsHealthy() {
262 Service a = new FailStartService();
263 ServiceManager manager = new ServiceManager(asList(a));
264 RecordingListener listener = new RecordingListener();
265 manager.addListener(listener);
266 try {
267 manager.startAsync().awaitHealthy();
268 fail();
269 } catch (IllegalStateException expected) {
270 }
271 assertFalse(listener.healthyCalled);
272 }
273
274 /**
275 * This covers a bug where if a listener was installed that would stop the manager if any service
276 * fails and something failed during startup before service.start was called on all the services,
277 * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to
278 * stop the timer(!).
279 */
280 public void testFailStart_stopOthers() throws TimeoutException {
281 Service a = new FailStartService();
282 Service b = new NoOpService();
283 final ServiceManager manager = new ServiceManager(asList(a, b));
284 manager.addListener(new Listener() {
285 @Override public void failure(Service service) {
286 manager.stopAsync();
287 }});
288 manager.startAsync();
289 manager.awaitStopped(10, TimeUnit.MILLISECONDS);
290 }
291
292 private static void assertState(
293 ServiceManager manager, Service.State state, Service... services) {
294 Collection<Service> managerServices = manager.servicesByState().get(state);
295 for (Service service : services) {
296 assertEquals(service.toString(), state, service.state());
297 assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING);
298 assertTrue(managerServices + " should contain " + service, managerServices.contains(service));
299 }
300 }
301
302 /**
303 * This is for covering a case where the ServiceManager would behave strangely if constructed
304 * with no service under management. Listeners would never fire because the ServiceManager was
305 * healthy and stopped at the same time. This test ensures that listeners fire and isHealthy
306 * makes sense.
307 */
308 public void testEmptyServiceManager() {
309 Logger logger = Logger.getLogger(ServiceManager.class.getName());
310 logger.setLevel(Level.FINEST);
311 TestLogHandler logHandler = new TestLogHandler();
312 logger.addHandler(logHandler);
313 ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
314 RecordingListener listener = new RecordingListener();
315 manager.addListener(listener, MoreExecutors.sameThreadExecutor());
316 manager.startAsync().awaitHealthy();
317 assertTrue(manager.isHealthy());
318 assertTrue(listener.healthyCalled);
319 assertFalse(listener.stoppedCalled);
320 assertTrue(listener.failedServices.isEmpty());
321 manager.stopAsync().awaitStopped();
322 assertFalse(manager.isHealthy());
323 assertTrue(listener.stoppedCalled);
324 assertTrue(listener.failedServices.isEmpty());
325 // check that our NoOpService is not directly observable via any of the inspection methods or
326 // via logging.
327 assertEquals("ServiceManager{services=[]}", manager.toString());
328 assertTrue(manager.servicesByState().isEmpty());
329 assertTrue(manager.startupTimes().isEmpty());
330 Formatter logFormatter = new Formatter() {
331 @Override public String format(LogRecord record) {
332 return formatMessage(record);
333 }
334 };
335 for (LogRecord record : logHandler.getStoredLogRecords()) {
336 assertFalse(logFormatter.format(record).contains("NoOpService"));
337 }
338 }

Service框架浅析Google-Guava Concurrent

标签:ram   xtend   oop   false   protect   which   abstract   inter   发包   

原文地址:https://www.cnblogs.com/525jik/p/12702413.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!