标签:thread 没有 local 发布 tca port 一个 mysql 测试的
本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书。1 配置发布后的实时推送设计
配置中心最重要的一个特性就是实时推送了,正因为有这个特性,我们可以依赖配置中心做很多事情。在我自己开发的Smconf这个配置中心,Smconf是依赖于Zookeeper的Watch机制来实现实时推送。
上图简要描述了配置发布的大致过程:
ReleaseMessage消息是通过Mysql实现了一个简单的消息队列。之所有没有采用消息中间件,是为了让Apollo在部署的时候尽量简单,尽可能减少外部依赖。
上图简要描述了发送ReleaseMessage的大致过程:
通知是采用基于Http长连接实现,主要分为下面几个步骤:
Apollo推送这块代码比较多,就不在本书中详细分析了,我把推送这块的代码稍微简化了下,给大家进行讲解,这样理解起来会更容易。当然我这边会比较简单,很多细节就不做考虑了,只是为了能够让大家明白Apollo推送的核心原理。
发送ReleaseMessage的逻辑我们就写一个简单的接口,用队列存储,测试的时候就调用这个接口模拟配置有更新,发送ReleaseMessage消息。
@RestController
public
class
NotificationControllerV2
implements
ReleaseMessageListener
{
// 模拟配置更新,往里插入数据表示有更新
public
static
Queue
<
String
> queue =
new
LinkedBlockingDeque
<>();
@GetMapping
(
"/addMsg"
)
public
String
addMsg() {
queue.add(
"xxx"
);
return
"success"
;
}
}
消息发送之后,前面我们有讲过Config Service会启动一个线程定时扫描ReleaseMessage表,去查看是否有新的消息记录,然后取通知客户端,这边我们也启动一个线程去扫描:
@Component
public
class
ReleaseMessageScanner
implements
InitializingBean
{
@Autowired
private
NotificationControllerV2
configController;
@Override
public
void
afterPropertiesSet()
throws
Exception
{
// 定时任务从数据库扫描有没有新的配置发布
new
Thread
(() -> {
for
(;;) {
String
result =
NotificationControllerV2
.queue.poll();
if
(result !=
null
) {
ReleaseMessage
message =
new
ReleaseMessage
();
message.setMessage(result);
configController.handleMessage(message);
}
}
}).start();;
}
}
循环去读取NotificationControllerV2中的队列,如果有消息的话就构造一个ReleaseMessage的对象,然后调用NotificationControllerV2中的handleMessage()方法进行消息的处理。
ReleaseMessage就一个字段,模拟消息内容:
public
class
ReleaseMessage
{
private
String
message;
public
void
setMessage(
String
message) {
this
.message = message;
}
public
String
getMessage() {
return
message;
}
}
接下来,我们看handleMessage做了什么样的工作
NotificationControllerV2实现了ReleaseMessageListener接口,ReleaseMessageListener中定义了handleMessage()方法。
public
interface
ReleaseMessageListener
{
void
handleMessage(
ReleaseMessage
message);
}
handleMessage就是当配置发生变化的时候,通知的消息监听器,消息监听器得到配置发布的信息后,则会通知对应的客户端:
@RestController
public
class
NotificationControllerV2
implements
ReleaseMessageListener
{
private
final
Multimap
<
String
,
DeferredResultWrapper
> deferredResults =
Multimaps
.synchronizedSetMultimap(
HashMultimap
.create());
@Override
public
void
handleMessage(
ReleaseMessage
message) {
System
.err.println(
"handleMessage:"
+ message);
List
<
DeferredResultWrapper
> results =
Lists
.newArrayList(deferredResults.
get
(
"xxxx"
));
for
(
DeferredResultWrapper
deferredResultWrapper : results) {
List
<
ApolloConfigNotification
> list =
new
ArrayList
<>();
list.add(
new
ApolloConfigNotification
(
"application"
,
1
));
deferredResultWrapper.setResult(list);
}
}
}
Apollo的实时推送是基于Spring DeferredResult实现的,在handleMessage()方法中可以看到是通过deferredResults获取DeferredResult,deferredResults就是第一行的Multimap,Key其实就是消息内容,Value就是DeferredResult的业务包装类DeferredResultWrapper,我们来看下DeferredResultWrapper的代码:
public
class
DeferredResultWrapper
{
private
static
final
long
TIMEOUT =
60
*
1000
;
// 60 seconds
private
static
final
ResponseEntity
<
List
<
ApolloConfigNotification
>> NOT_MODIFIED_RESPONSE_LIST =
new
ResponseEntity
<>(
HttpStatus
.NOT_MODIFIED);
private
DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> result;
public
DeferredResultWrapper
() {
result =
new
DeferredResult
<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
public
void
onTimeout(
Runnable
timeoutCallback) {
result.onTimeout(timeoutCallback);
}
public
void
onCompletion(
Runnable
completionCallback) {
result.onCompletion(completionCallback);
}
public
void
setResult(
ApolloConfigNotification
notification) {
setResult(
Lists
.newArrayList(notification));
}
public
void
setResult(
List
<
ApolloConfigNotification
> notifications) {
result.setResult(
new
ResponseEntity
<>(notifications,
HttpStatus
.OK));
}
public
DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> getResult() {
return
result;
}
}
通过setResult()方法设置返回结果给客户端,以上就是当配置发生变化,然后通过消息监听器通知客户端的原理,那么客户端是在什么时候接入的呢?
@RestController
public
class
NotificationControllerV2
implements
ReleaseMessageListener
{
// 模拟配置更新,往里插入数据表示有更新
public
static
Queue
<
String
> queue =
new
LinkedBlockingDeque
<>();
private
final
Multimap
<
String
,
DeferredResultWrapper
> deferredResults =
Multimaps
.synchronizedSetMultimap(
HashMultimap
.create());
@GetMapping
(
"/getConfig"
)
public
DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> getConfig() {
DeferredResultWrapper
deferredResultWrapper =
new
DeferredResultWrapper
();
List
<
ApolloConfigNotification
> newNotifications = getApolloConfigNotifications();
if
(!
CollectionUtils
.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications);
}
else
{
deferredResultWrapper.onTimeout(() -> {
System
.err.println(
"onTimeout"
);
});
deferredResultWrapper.onCompletion(() -> {
System
.err.println(
"onCompletion"
);
});
deferredResults.put(
"xxxx"
, deferredResultWrapper);
}
return
deferredResultWrapper.getResult();
}
private
List
<
ApolloConfigNotification
> getApolloConfigNotifications() {
List
<
ApolloConfigNotification
> list =
new
ArrayList
<>();
String
result = queue.poll();
if
(result !=
null
) {
list.add(
new
ApolloConfigNotification
(
"application"
,
1
));
}
return
list;
}
}
NotificationControllerV2中提供了一个/getConfig的接口,客户端在启动的时候会调用这个接口,这个时候会执行getApolloConfigNotifications()方法去获取有没有配置的变更信息,如果有的话证明配置修改过,直接就通过deferredResultWrapper.setResult(newNotifications);返回结果给客户端了,客户端收到结果后重新拉取配置的信息进行覆盖本地的配置。
如果getApolloConfigNotifications()方法没有返回配置修改的信息,证明配置没有发生修改,就将DeferredResultWrapper对象添加到deferredResults中,等待后续配置发生变化时消息监听器进行通知。
同时这个请求就会挂起,不会立即返回,挂起是通过DeferredResultWrapper中的下面的代码实现的:
private
static
final
long
TIMEOUT =
60
*
1000
;
// 60 seconds
private
static
final
ResponseEntity
<
List
<
ApolloConfigNotification
>> NOT_MODIFIED_RESPONSE_LIST =
new
ResponseEntity
<>(
HttpStatus
.NOT_MODIFIED);
private
DeferredResult
<
ResponseEntity
<
List
<
ApolloConfigNotification
>>> result;
public
DeferredResultWrapper
() {
result =
new
DeferredResult
<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
}
在创建DeferredResult对象的时候指定了超时的时间和超时后返回的响应码,如果60秒内没有消息监听器进行通知,那么这个请求就会超时,超时后客户端就收到的响应码就是304。
整个Config Service的流程就走完了,接下来我们看客户端是怎么实现的,我们简单的写个测试类模拟客户端注册:
public
class
ClientTest
{
public
static
void
main(
String
[] args) {
reg();
}
private
static
void
reg() {
System
.err.println(
"注册"
);
String
result = request(
"http://localhost:8081/getConfig"
);
if
(result !=
null
) {
// 配置有更新,重新拉取配置
// ......
}
// 重新注册
reg();
}
private
static
String
request(
String
url) {
HttpURLConnection
connection =
null
;
BufferedReader
reader =
null
;
try
{
URL getUrl =
new
URL(url);
connection = (
HttpURLConnection
) getUrl.openConnection();
connection.setReadTimeout(
90000
);
connection.setConnectTimeout(
3000
);
connection.setRequestMethod(
"GET"
);
connection.setRequestProperty(
"Accept-Charset"
,
"utf-8"
);
connection.setRequestProperty(
"Content-Type"
,
"application/json"
);
connection.setRequestProperty(
"Charset"
,
"UTF-8"
);
System
.
out
.println(connection.getResponseCode());
if
(
200
== connection.getResponseCode()) {
reader =
new
BufferedReader
(
new
InputStreamReader
(connection.getInputStream(),
"UTF-8"
));
StringBuilder
result =
new
StringBuilder
();
String
line =
null
;
while
((line = reader.readLine()) !=
null
) {
result.append(line);
}
System
.
out
.println(
"结果 "
+ result);
return
result.toString();
}
}
catch
(
IOException
e) {
e.printStackTrace();
}
finally
{
if
(connection !=
null
) {
connection.disconnect();
}
}
return
null
;
}
}
首先启动/getConfig接口所在的服务,然后启动客户端,客户端就会发起注册请求,如果有修改直接获取到结果,进行配置的更新操作。如果无修改,请求会挂起,这边客户端设置的读取超时时间是90秒,大于服务端的60秒超时时间。
每次收到结果后,无论是有修改还是没修改,都必须重新进行注册,通过这样的方式就可以达到配置实时推送的效果。
我们可以调用之前写的/addMsg接口来模拟配置发生变化,调用之后客户端就能马上得到返回结果。
本文摘自于《Spring Cloud微服务 入门 实战与进阶》一书。
去年出版的《Spring Cloud微服务:全栈技术与案例解析》一书,得到了大家的支持以及反馈,基于大家的反馈,重新进行了更正和改进。
基于比较稳定的 Spring Cloud Finchley.SR2 版本和 Spring Boot 2.0.6.RELEASE 版本编写。
同时将示列代码进行标准的归档,之前的都在一起,不方便读者参考和运行。
同时还增加了像Apollo,Spring Cloud Gateway,生产实践经验等新的内容。
尹吉欢
我不差钱啊
喜欢作者
标签:thread 没有 local 发布 tca port 一个 mysql 测试的
原文地址:https://blog.51cto.com/14888386/2515767