码迷,mamicode.com
首页 > 其他好文 > 详细

NATS学习 -- 概念学习之消息(Message)与发布订阅(Publish Subscribe)

时间:2016-04-16 19:16:43      阅读:1099      评论:0      收藏:0      [点我收藏+]

标签:

1 理论篇

1.1 来自官方的介绍

NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always on ‘dial-tone’.

NATS在分布式系统中扮演一个中央神经系统,这些分布式系统包括移动设备,物联网,企业微服务和原生云基础设施。不同于传统的企业级消息系统,NATS提供永远在线服务。

NATS was created by Derek Collison, Founder and CEO at Apcera. Derek has spent over twenty years designing, building and using publish-subscribe messaging systems. Learn why traditional enterprise messaging systems don’t work for today’s distributed infrastructure, in this talk from Derek at SCALE on designing a new cloud-native messaging framework.

1.2 NATS 的消息(Message)

NATS messaging involves the electronic exchange of data among computer applications.

NATS provides a layer between the application and the underlying physical network. Application data is encoded as a message and sent by the publisher. The message is received, decoded, and processed by one or more subscribers. A subscriber can process a NATS message asynchronously or synchronously.

NATS的消息是指在不同计算机应用程序之间的消息交互。

NATS在应用与底层网络之间提供了一个抽象层。应用数据被包装为消息(message)并由发布者发送。消息被一个或多个订阅者接收,解码并且处理。订阅者可以异步或同步地处理消息。

技术分享

1.2.1 Asynchronous processing

Asynchronous processing uses a callback message handler to process messages. When a message arrives, the registered callback handler receives control to process the message. The client or consuming application is not blocked from performing other work while it is waiting for a message. Asynchronous processing lets you create multi-threaded dispatching designs.

异步消息处理

异步消息处理使用一个消息回调处理器来处理消息,当消息到达的时候,已经注册的回调处理器来处理该消息。客户端或消费者程序不会从其他事件中阻塞在等待消息的时候。异步消息处理可以让程序员创建多线程的分发设计。

1.2.2 Synchronous processing

Synchronous processing requires that application code explicitly call a method to process an incoming message. Typically an explicit call is a blocking call that suspends processing until a message becomes available. If no message is available, the period for which the message processing call blocks is set by the client. Synchronous processing is typically used by a server whose purpose is to wait for and process incoming request messages, and to send replies to the requesting application.

同步消息处理

同步消息处理需要应用程序代码显示地调用函数来处理到达的消息。通常显示的函数调用是一个阻塞的调用,它会等待一知道消息可用。如果没有可用的消息,客户端在调用消息的时候就会一直处于阻塞状态。同步消息处理通常是由一个服务器端充当,它的职责就是等待并处理到达的消息,并且给发送消息的一方发送回复内容。

1.3 NATS的发布订阅(Publish Subscribe)

NATS implements a publish subscribe messaging model. NATS publish subscribe is a one-to-many communication. A publisher sends a message on a subject. Any active subscriber listening on that subject receives the message. Subscribers can register interest in wildcard subjects.

NATS实现了一个基于发布订阅的消息模型。NATS的发布订阅模型是一个一对多的通信模型。消息的发布者发送消息到一个主体(subject),任何的活动的订阅者坚挺这个主题并收到消息。订阅者可以使用通配符注册感兴趣的主题。

技术分享

If a subscriber is not listening on the subject (no subject match), or is not active when the message is sent, the message is not received. NATS is a fire-and-forget messaging system. If you need higher levels of service, you build it into the client.

如果订阅者没有监听的主题(或者没有匹配的主题),或者不在线,当消息发送的时候它将不会收到消息。NATS是一个一劳永逸的消息系统。如果需要更高的服务,将它内置在客户端。

In an asynchronous exchange, messages are delivered to the subscriber’s message handler. If there is no handler, the subscription is synchronous and the client may be blocked until it can process the message.

在异步消息处理中,消息将会被发送到订阅者的消息回调处理器上,如果没有消息回调处理器,这个订阅者就是同步的,客户端将会阻塞,知道它处理了消息。

2 实践篇

首先确保NATS服务器程序已经搭建好了。

参考

http://blog.csdn.net/frankcheng5143/article/details/51141804

2.1 启动服务

在命令行输入

gnastd

技术分享

2.2 客户端程序的编写

依赖的jar包

<dependency>
  <groupId>io.nats</groupId>
  <artifactId>jnats</artifactId>
  <version>0.4.1</version>
</dependency>

其他依赖

没有日志它会报错,这里我用的是log4j

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

log4j的配置

log4j.properties

#---------------------------------------------------------
# Log4J Settings for log4j 1.2.x (via jakarta-commons-logging)
#
# The five logging levels used by Log are (in order):
#
#   1. DEBUG (the least serious)
#   2. INFO
#   3. WARN
#   4. ERROR
#   5. FATAL (the most serious)
#
#---------------------------------------------------------
# Root-Categroy
#---------------------------------------------------------
log4j.rootCategory=INFO, stdout, file

#---------------------------------------------------------
# stdout
#---------------------------------------------------------
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n

#---------------------------------------------------------
# file (log)
#---------------------------------------------------------
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File=/home/gwcheng/info.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.Append=true
log4j.appender.file.DatePattern=‘.‘yyyy-MM-dd  
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss:SSS} %p [%M] %c %m%n

#---------------------------------------------------------
# file (html)
#---------------------------------------------------------
#log4j.appender.html=org.apache.log4j.FileAppender
#log4j.appender.html.File=### THE PATH HERE ###
#log4j.appender.html.layout=org.apache.log4j.HTMLLayout

#---------------------------------------------------------
# customer
#---------------------------------------------------------

# my definition 
io.netty=debug

好的,开始写代码。

这里模拟一个发布者,三个订阅者,拓扑图和上面的图一样。

2.2.1 发布者代码

Publish.java

package com.gwc.nats.nats.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Message;

/**
 * Hello world!
 * @author gwcheng
 */
public class Publish {
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
                ConnectionFactory cf = new ConnectionFactory("nats://127.0.0.1:4222");
                Connection nc = cf.createConnection();
                // 消息
                Message msg = new Message();
                // 设置主题
                msg.setSubject("foo");            
                @SuppressWarnings("resource")
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入字符串:");
                while (true) {
                        String line = scanner.nextLine();
                        msg.setData(line.getBytes());
                        // 发布消息
                        nc.publish(msg);
                }
        }
}

2.2.2 发布者代码

Subscribe.java

package com.gwc.nats.nats.test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Message;
import io.nats.client.MessageHandler;

public class Subscribe {

        public static void main(String[] args) throws IOException, TimeoutException {
                ConnectionFactory cf = new ConnectionFactory("nats://127.0.0.1:4222");
                Connection nc = cf.createConnection();
                // Lambda 表达式写法
                /*
                 * nc.subscribe("foo", m -> { System.out.printf(
                 * "收到的消息:%s\n", new String(m.getData())); });
                 */
                nc.subscribe("foo", new MessageHandler() {
                        @Override
                        public void onMessage(Message msg) {
                                System.out.println("收到的消息:" + new String(msg.getData()));
                        }
                });
        }
}

写上三个一模一样的Subscribe,除了类名不一样。

2.3 测试

先运行Publish,并输入一些字符,发布到foo这个主题上,这里没有订阅者,一会儿有了订阅者也不会收到刚才发布的字符。

技术分享

运行三个Subscribe

技术分享

然后在Publish的console输入一些字符,这个时候三个订阅者都将会收到该字符

Publish
技术分享

Subscribe
技术分享

Subscribe1
技术分享

我们让Subscribe2 订阅的主题不是foo,看一下运行结果

Subscribe
技术分享

Subscribe1
技术分享

Subscribe2
技术分享

说明Subscribe2并没有收到消息,因为它没有关注foo主题。

参考文献

http://nats.io/

http://nats.io/documentation/concepts/nats-messaging/

http://nats.io/documentation/concepts/nats-pub-sub/

https://github.com/nats-io/jnats

NATS学习 -- 概念学习之消息(Message)与发布订阅(Publish Subscribe)

标签:

原文地址:http://blog.csdn.net/frankcheng5143/article/details/51161206

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!