
Learning Netty: Integrating a UDP Server with Spring (2)

Posted: 2017-08-31 10:58:18


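The previous article walked through an example of a UDP server built with Netty. This article integrates that UDP server with Spring Boot, uses RedisTemplate to access Redis and JPA to connect to MySQL, and touches on multithreading and asynchronous execution along the way.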

I use IntelliJ IDEA 2017.1, installed from the .exe (download link: http://pan.baidu.com/s/1pLODHm7, password: dlx7). I recommend STS or IDEA for learning Spring.

 

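1) Project directory structure

The directory structure of the whole project is as follows:

[Image: project directory structure]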

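2) JAR dependencies

The contents of pom.xml are shown below.

Only netty-all and commons-lang3 were added by hand; the remaining dependencies were pulled in automatically by the components selected when the Spring Boot project was created.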
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>udplearning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>udplearning</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <commons-lang3.version>3.4</commons-lang3.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.49.Final</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web-services</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

3) Configuration file application.properties

Contents of application.properties:

spring.profiles.active=test

spring.messages.encoding=utf-8

logging.config=classpath:logback.xml
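"spring.profiles.active" is how Spring Boot selects among several runtime environments. Here the test profile is active, so the settings in application-test.properties are loaded on top of application.properties;
"spring.messages.encoding=utf-8" sets the encoding of the message bundles to UTF-8;
"logging.config=classpath:logback.xml" gives the location of the logging configuration file.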
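
To make the profile override concrete, here is a minimal plain-JDK sketch. It is only a simplified model of "profile-specific values win over the base file", not Spring Boot's actual loading logic. The class name ProfileOverlayDemo and the server.port key are assumptions added purely for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Simplified model of profile-specific overriding; not Spring Boot's real implementation. */
class ProfileOverlayDemo {

    /** Parses properties text; stands in for reading a file from the classpath. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the base settings with the active profile's settings layered on top. */
    static Properties resolve(Properties base, Properties profile) {
        Properties merged = new Properties();
        merged.putAll(base);
        merged.putAll(profile); // profile-specific values win on conflict
        return merged;
    }

    public static void main(String[] args) {
        // Stand-ins for application.properties and application-test.properties.
        Properties base = parse("spring.profiles.active=test\nserver.port=8080\n");
        Properties test = parse("server.port=9090\nsysfig.udpReceivePort=7686\n");

        Properties effective = resolve(base, test);
        System.out.println(effective.getProperty("server.port"));           // 9090
        System.out.println(effective.getProperty("sysfig.udpReceivePort")); // 7686
    }
}
```

Keys that appear only in the base file stay visible, which is why shared settings such as logging.config can live in application.properties.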

Contents of application-test.properties:
context.listener.classes=com.example.demo.init.StartupEvent

#mysql
spring.jpa.show-sql=true
spring.jpa.database=mysql
#spring.jpa.hibernate.ddl-auto=update
spring.datasource.url=jdbc:mysql://127.0.0.1/test
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)

spring.session.store-type=none

# (RedisProperties)
spring.redis.database=3
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=123456
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.timeout=0

# Port on which UDP messages are received
sysfig.udpReceivePort = 7686

# Thread pool
spring.task.pool.corePoolSize = 5
spring.task.pool.maxPoolSize = 100
spring.task.pool.keepAliveSeconds = 100
spring.task.pool.queueCapacity = 100

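The line context.listener.classes=com.example.demo.init.StartupEvent registers the StartupEvent class as an application listener, so its code runs once Spring Boot has started.

The file also configures MySQL, Redis, and a few custom properties. Adjust them to suit your own project.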
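
The SysConfig class that reads the custom sysfig.udpReceivePort property is not listed in this article. The following is only a guess at its shape: a plain bean whose setter would be used for binding. In the Spring Boot project it would presumably also carry @Component and @ConfigurationProperties(prefix = "sysfig"); those annotations appear only in the comment here so that the sketch compiles without Spring.

```java
/**
 * Hypothetical sketch of the SysConfig bean used by StartupEvent (not taken from the project).
 * In the real project it would presumably be annotated with
 * @Component and @ConfigurationProperties(prefix = "sysfig").
 */
class SysConfig {

    // Would be bound from sysfig.udpReceivePort (7686 in application-test.properties).
    private int udpReceivePort;

    public int getUdpReceivePort() {
        return udpReceivePort;
    }

    // Property binding of this kind relies on a setter matching the property name.
    public void setUdpReceivePort(int udpReceivePort) {
        this.udpReceivePort = udpReceivePort;
    }
}
```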



4) Logging configuration file logback.xml
Contents of logback.xml:
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback
               http://ch.qos.logback/xml/ns/logback/logback.xsd
               http://ch.qos.logback/xml/ns/logback ">
    <property name="APP_Name" value="udplearning" />
    <timestamp key="bySecond" datePattern="yyyyMMdd'T'HHmmss" />
    <contextName>${APP_Name}</contextName>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyyMMddHHmmss}|%-5level| %logger{0}.%M | %msg | %thread %n</pattern>
        </encoder>
    </appender>

    <appender name="FILELOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${catalina.home}/logs/app.%d{yyyyMMdd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="RUNLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${catalina.home}/logs/run.%d{yyyyMMdd}.log</fileNamePattern>
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="com.example.demo" level="debug" additivity="false">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILELOG" />
    </logger>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

The root logger is set to info, while the com.example.demo package logs at debug. Adjust the levels to suit your own project.

 

5) StartupEvent.java

package com.example.demo.init;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

/**
 * Starts the UDP server once the Spring context has been refreshed.
 * Created by wj on 2017/8/28.
 */
public class StartupEvent implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(StartupEvent.class);

    // Held in a static field so that objects not managed by Spring can look up beans.
    private static ApplicationContext context;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            context = contextRefreshedEvent.getApplicationContext();

            SysConfig sysConfig = context.getBean(SysConfig.class);

            // Receive UDP messages and save them to Redis
            UdpServer udpServer = StartupEvent.getBean(UdpServer.class);
            udpServer.run(sysConfig.getUdpReceivePort());

            // More threads can be started here to run other tasks.
            // That part is work-related and cannot be published.
        } catch (Exception e) {
            log.error("Exception", e);
        }
    }

    // Generic lookup by type; returns null if the context has not been set yet.
    public static <T> T getBean(Class<T> beanClass) {
        return context != null ? context.getBean(beanClass) : null;
    }
}

 

6) UdpServer.java

package com.example.demo.init;

import com.example.demo.handle.UdpServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * UDP server
 * Created by wj on 2017/8/30.
 */
@Component
public class UdpServer {

    private static final Logger log = LoggerFactory.getLogger(UdpServer.class);

//    private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));

    @Async("myTaskAsyncPool")
    public void run(int udpReceivePort) {

        EventLoopGroup group = new NioEventLoopGroup();
        log.info("Server start!  Udp Receive msg Port:" + udpReceivePort);

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new UdpServerHandler());

            // Bind the port, then block this pool thread until the channel is closed
            b.bind(udpReceivePort).sync().channel().closeFuture().await();
        } catch (InterruptedException e) {
            log.error("UDP server interrupted", e);
            // Restore the interrupt flag so the pool can see the interruption
            Thread.currentThread().interrupt();
        } finally {
            group.shutdownGracefully();
        }
    }

}

NioDatagramChannel.class receives UDP messages in non-blocking mode. If only a small number of UDP messages is expected, a blocking channel can be used instead.
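
What "non-blocking" means can be seen with the JDK alone. Netty's NioDatagramChannel is built on java.nio's DatagramChannel, and the sketch below uses that JDK class directly rather than Netty. The class and method names are made up for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

/** Illustration only: a non-blocking receive on an idle UDP channel. */
class NonBlockingReceiveDemo {

    /** Returns true if receive() came back at once with no datagram available. */
    static boolean receiveReturnsImmediatelyWhenIdle() {
        try (DatagramChannel channel = DatagramChannel.open()) {
            // Port 0 lets the OS pick any free port on the loopback interface.
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            channel.configureBlocking(false);

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // In non-blocking mode receive() returns null right away when nothing has arrived.
            // In blocking mode the same call would wait here until a datagram shows up.
            SocketAddress sender = channel.receive(buffer);
            return sender == null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("returned without waiting: " + receiveReturnsImmediatelyWhenIdle());
    }
}
```

An event loop can therefore poll many channels from one thread, whereas a blocking receive ties up a thread per socket, which is acceptable when traffic is light.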

UdpServer.run() is annotated with @Async, which makes the method asynchronous; myTaskAsyncPool is the custom thread pool it runs on.
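
As a rough plain-JDK analogy for what the asynchronous call buys us (this is not how Spring implements @Async, and the names below are invented for the sketch): the blocking server loop is handed to a pool thread, and the caller gets control back immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK analogy for invoking a blocking method asynchronously. */
class AsyncRunDemo {

    /** Reports whether the caller regained control while the "server" was still running. */
    static boolean callerReturnsBeforeServerStops() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch stop = new CountDownLatch(1);
        try {
            // Stand-in for udpServer.run(port), which blocks until the channel closes.
            pool.execute(() -> {
                started.countDown();
                try {
                    stop.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // This line is reached at once, although the task above is still blocked.
            return started.await(5, TimeUnit.SECONDS) && stop.getCount() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            stop.countDown(); // let the simulated server exit
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller continued while server ran: " + callerReturnsBeforeServerStops());
    }
}
```

Without this, the blocking bind-and-wait call in run() would hold up the listener thread that called it, and startup would never finish.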

 

7) UdpServerHandler.java

package com.example.demo.handle;

import com.example.demo.init.StartupEvent;
import com.example.demo.repository.redis.RedisRepository;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receives UDP messages and saves them to a Redis list
 * Created by wj on 2017/8/30.
 *
 */
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private static final Logger log = LoggerFactory.getLogger(UdpServerHandler.class);

    // Counts how many UDP messages the server has received
    private static int count = 0;

    @Override
    public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {

        String receiveMsg = packet.content().toString(CharsetUtil.UTF_8);

        log.info("Received UDP Msg:" + receiveMsg);

        // Check whether the received UDP message is valid (only the empty check is implemented)
        if (StringUtils.isNotEmpty(receiveMsg)) {
            // Count the received UDP messages
            count++;

            // Look up the RedisRepository bean
            RedisRepository redisRepository = (RedisRepository) StartupEvent.getBean(RedisRepository.class);
            // Save the received UDP message to the Redis list
            redisRepository.lpush("udp:msg", receiveMsg);
            redisRepository.setKey("UDPMsgNumber", String.valueOf(count));

            // A reply can be sent here to tell the sender that the message arrived.
            // Since this is UDP, the reply is optional and can be commented out.
            ctx.write(new DatagramPacket(
                    Unpooled.copiedBuffer("QOTM: " + "Got UDP Message!", CharsetUtil.UTF_8), packet.sender()));

        } else {
            log.error("Received Error UDP Message:" + receiveMsg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Exception in UDP handler", cause);
        // We don't close the channel because we can keep serving requests.
    }

}

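Without going through ApplicationContext.getBean, the handler has no way to obtain the RedisRepository object.

Note: @Autowired cannot be used here to obtain the redisTemplate object. UdpServerHandler is created with new in UdpServer, so it is not a Spring-managed bean and Spring never injects its fields.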
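
One alternative to the static lookup, shown here only as an option and not what the project does, is to hand the dependency to the handler through its constructor when it is created. The sketch leaves Netty and Redis out so that it runs on the JDK alone; MessageStore, InjectedHandler, and InjectionDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical storage abstraction; stands in for RedisRepository. */
interface MessageStore {
    void lpush(String key, String value);
}

/** Hypothetical handler that receives its dependency through the constructor. */
class InjectedHandler {
    private final MessageStore store;

    InjectedHandler(MessageStore store) {
        this.store = store;
    }

    /** Mirrors the non-empty check in channelRead0; returns true if the message was stored. */
    boolean handle(String message) {
        if (message == null || message.isEmpty()) {
            return false;
        }
        store.lpush("udp:msg", message);
        return true;
    }
}

class InjectionDemo {
    public static void main(String[] args) {
        List<String> saved = new ArrayList<>();
        // Whoever constructs the handler (UdpServer in this article) passes the dependency in.
        InjectedHandler handler = new InjectedHandler((key, value) -> saved.add(key + "=" + value));
        handler.handle("hello");
        System.out.println(saved); // [udp:msg=hello]
    }
}
```

In the Spring project this would mean that UdpServer, which is a bean, has RedisRepository injected and passes it on with something like new UdpServerHandler(redisRepository). That constructor does not exist in the published code.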

8) RedisRepository.java

package com.example.demo.repository.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * Connects to Redis
 * Implements list lpush and rpop
 * Created by wj on 2017/8/30.
 */
@Service
public class RedisRepository {
    private static final Logger log = LoggerFactory.getLogger(RedisRepository.class);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    //----------------String-----------------------
    public void setKey(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
    }


    //----------------list----------------------
    public Long lpush(String key, String val) throws Exception {
        log.info("Saving UDP msg to Redis, key:" + key + ",val:" + val);
        return redisTemplate.opsForList().leftPush(key, val);
    }

    public String rpop(String key) throws Exception {
        return redisTemplate.opsForList().rightPop(key);
    }

}

The Spring Framework's RedisTemplate class is used to connect to Redis. Each received UDP message is pushed onto the left end of a list (lpush) and later popped from the right end (rpop).
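
Pushing on the left and popping on the right turns the list into a first-in, first-out queue. The following in-memory model (it does not talk to Redis, and ListQueueDemo is a name invented for the sketch) shows the ordering:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory model of a Redis list used as a queue. */
class ListQueueDemo {
    private final Deque<String> list = new ArrayDeque<>();

    /** Like LPUSH: insert at the head (left) and return the new length. */
    long lpush(String value) {
        list.addFirst(value);
        return list.size();
    }

    /** Like RPOP: remove from the tail (right); null when the list is empty. */
    String rpop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        ListQueueDemo queue = new ListQueueDemo();
        queue.lpush("msg-1");
        queue.lpush("msg-2");
        queue.lpush("msg-3");
        System.out.println(queue.rpop()); // msg-1: the oldest message comes out first
        System.out.println(queue.rpop()); // msg-2
    }
}
```

So a consumer that calls rpop sees the UDP messages in the order in which the server received them.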

 

9) Thread pool

TaskExecutePool.java
package com.example.demo.thread;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by wj on 2017/8/29.
 *
 * Thread pool configuration
 */
@Configuration
@EnableAsync
public class TaskExecutePool {

    @Autowired
    private TaskThreadPoolConfig config;

    @Bean
    public Executor myTaskAsyncPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getCorePoolSize());
        executor.setMaxPoolSize(config.getMaxPoolSize());
        executor.setQueueCapacity(config.getQueueCapacity());
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        executor.setThreadNamePrefix("MyExecutor-");

        // Rejection policy: what to do with a new task when the pool is at max size and the queue is full
        // CALLER_RUNS: run the task on the caller's thread instead of a new thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

TaskThreadPoolConfig.java
package com.example.demo.thread;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;

/**
 * Created by wj on 2017/8/29.
 */
@Component
@ComponentScan("com.example.demo.init")
@ConfigurationProperties(prefix = "spring.task.pool") // the annotation's locations attribute is no longer supported; any matching property in the environment is bound
public class TaskThreadPoolConfig {
    private int corePoolSize;

    private int maxPoolSize;

    private int keepAliveSeconds;

    private int queueCapacity;

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getKeepAliveSeconds() {
        return keepAliveSeconds;
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }

    public int getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
}
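
The effect of the CallerRunsPolicy set in TaskExecutePool is easiest to see on a deliberately tiny pool. This sketch uses the JDK's ThreadPoolExecutor directly instead of Spring's ThreadPoolTaskExecutor, and the sizes (one thread, a queue of one) are chosen only to force a rejection; they are not the project's settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Illustration only: which thread runs a task that a saturated pool rejects. */
class CallerRunsDemo {

    /** Returns the name of the thread that ran the rejected task. */
    static String threadThatRunsRejectedTask() {
        // core = max = 1 and a queue of 1, so a third task cannot be accepted.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> runner = new AtomicReference<>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // waits in the queue
            // Pool and queue are full: CallerRunsPolicy runs this on the submitting thread.
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
            return runner.get();
        } finally {
            release.countDown(); // unblock the worker so the pool can drain
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println("ran on caller thread: " + caller.equals(threadThatRunsRejectedTask()));
    }
}
```

The policy never drops a task, but it slows the submitter down: while the caller is busy running the rejected task it cannot submit more work, which acts as simple back-pressure.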

 

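10) The test code for sending UDP messages is borrowed directly from the official example. It was covered in detail in the previous article, so it is not repeated here.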

 

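11) Summary

The core code for sending and receiving UDP messages is actually very simple; Netty just wraps it.

Sending a UDP message with the JDK's java.net classes: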

byte[] buffer = ...
InetAddress address = InetAddress.getByName("localhost");

DatagramPacket packet = new DatagramPacket(
    buffer, buffer.length, address, 9999);
DatagramSocket datagramSocket = new DatagramSocket();
datagramSocket.send(packet);

 

Receiving a UDP message:

DatagramSocket datagramSocket = new DatagramSocket(9999);

byte[] buffer = ...
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

datagramSocket.receive(packet);
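
The two fragments can be combined into one runnable round trip on the loopback interface. This is a sketch under a few assumptions of mine: the OS picks a free port (port 0) instead of 9999, the receive buffer is 1024 bytes, a two-second timeout is set, and the class name UdpLoopbackDemo is made up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Sends one datagram to a local socket and reads it back, using only java.net. */
class UdpLoopbackDemo {

    static String roundTrip(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port, which avoids clashes with a fixed port.
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000); // fail instead of blocking forever if the packet is lost

            byte[] out = message.getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(out, out.length, loopback, receiver.getLocalPort()));

            byte[] in = new byte[1024];
            DatagramPacket packet = new DatagramPacket(in, in.length);
            receiver.receive(packet); // blocks until a datagram arrives or the timeout expires
            // Only the first getLength() bytes of the buffer belong to this datagram.
            return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                    StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello udp"));
    }
}
```

Note that these DatagramPacket and DatagramSocket classes come from java.net; the DatagramPacket used in UdpServerHandler is Netty's own class of the same name.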

 

Looks simple, doesn't it?

 

 

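12) Source code download

https://github.com/wj302763621/udplearning.git

Only a skeleton is published here; many other parts involve work-related content and cannot be made public.

Anyone who needs it is welcome to download the code and modify it.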

 


Original article: http://www.cnblogs.com/wj0816/p/7454081.html
