标签:selection nsf mit mapped 转型 时间 多路复用 输出 读写
本来是打算直接学习网络框架Netty的,但是先补充了一下自己对Java 几种IO模型的学习和理解。分别是 BIO、NIO、AIO三种IO模型。
缺点:
如果有很多个Client,则会产生很多个线程。压力主要是在服务器端。客户端的压力并不大。
另外建立连接之后,并不是在时时刻刻的使用。会有空间时间。
会阻塞。
特点:
目前还未得到广泛运用。异步非阻塞。先了解就可以。
package com.dawa.netty.bio;
import com.sun.org.apache.xpath.internal.operations.String;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 功能需求
* 1. 使用BIO模型编写一个服务器,监听6666端口,当有客户连接的时候,就启动一个客户端线程与之连接
* 2. 要求使用县城连接机制,可以连接过个客户端
* 3. 服务器端可以接受客户端发送的数据(TeInet方法即可)
*/
public class TestBIO {
public static void main(String[] args) throws Exception {
//1. 创建一个线程池. 这里 借助 Executors 这个工具类
ExecutorService pool = Executors.newCachedThreadPool();
//2. 建立一个监听服务,用来监听客户端连接
ServerSocket serverSocket = new ServerSocket(6666);
while (true) {
final Socket socket = serverSocket.accept();
System.out.println("一个客户端连接");
//就创建一个线程与之通信
pool.execute(new Runnable() {
public void run() {
//编写一个处理方法.
handler(socket);
}
});
}
}
public static void handler(Socket socket) {
byte[] bytes = new byte[1024];
try (InputStream inputStream = socket.getInputStream()) {
while (true) {
int read = inputStream.read(bytes);
if (read != -1) {
//注意这里,不能用String转换了.因为String已经不支持有参数的构造方法.
System.out.println(Arrays.toString(bytes));
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}finally {
System.out.println("关闭连接");
}
}
}
这没有BooleanBuffer,另外StringBuffer继承自StringBuilder.
一个简单的Buffer子类的使用案例如下
package com.dawa.netty.bio;
import java.nio.IntBuffer;
public class TestNIO {
public static void main(String[] args) {
IntBuffer intBuffer = IntBuffer.allocate(5);
for (int i = 0; i < intBuffer.capacity(); i++) {
System.out.println(intBuffer.put(i*5));
}
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
}
Selector、Channel和Buffer的关系图
- 每个Channel都会对应一个Buffer。
- Selector对应一个线程,一个线程对应多个Channel(连接)。
- 该图反应了有三个Channel注册到该Selector
- 程序切换到哪个Channel是由事件决定的。Event是一个重要概念。
- Select会根据不同的事件,在各个通道上切换。
- Buffer就是一个内存块、底层是由一个数组
- 数据的读取写入是通过Buffer,这个和BIO、BIO中要么是输入流、或者是输出流,不能双向,但是NIO的Buffer是可以读也可以写,需要flip()方法来进行切换。
- Channel也是双向的,可以返回底层操作系统的情况,比如Linux,底层的操作系统通道就是双向的。
容器对象(函数组),如何理解?从源码中可以看到。Int,Float等,每一个子类Buffer对象,都是[]数组。
private int mark = -1; private int position = 0; private int limit; private int capacity;
缓冲区的容量是它所包含的元素数量。 缓冲区的容量从不为负,从来没有改变。
缓冲区的限制是不应读取或写入的第一个元素的索引。 缓冲区的限制是从不为负,并且永远不会比它更大的容量。
缓冲区的位置要被读出或写入的下一个元素的索引。 缓冲区的位置永远不会为负,并且永远不会比它的极限。
// Invariants: mark <= position <= limit <= capacity
如,通过设置position的值,来读取指定位置的值。也可以修改limit的值等。
ByteBuffer,是最常用的。二进制数据。
实例代码如下:
package com.dawa.netty.nio;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class TestNIOFileChannel01 {
public static void main(String[] args) throws Exception{
//准备字符串
String string = "dawa,大娃,Bigbaby";
//准备输出流.指定输出的文件地址
FileOutputStream fileOutputStream = new FileOutputStream("dawa.txt");
//准备Channel管道. 对输出流进行封装,封装为一个channel管道.
FileChannel fileChannel = fileOutputStream.getChannel();
//准备一个byte数组, 也就是一个 Buffer数组,来缓存数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取数据
byteBuffer.put(string.getBytes());
//这里第一次没有反转,文件里面乱码
byteBuffer.flip();
//完成写的操作
fileChannel.write(byteBuffer);
//关闭流
fileOutputStream.close();
}
}
注意:
代码案例如下
package com.dawa.netty.nio;
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
//本地 读文件
public class TestNIOFileChannel02 {
public static void main(String[] args) throws Exception {
//读到文件
File file = new File("dawa.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//fileInputStream 包装为 Channel
FileChannel fileChannel = fileInputStream.getChannel();
//借助Buffer byte[]缓冲数
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//将Channel的数据读入到byteBuffer
fileChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
}
类似于拷贝的操作,使用文件Channel+Buffer完成
代码案例如下
package com.dawa.netty.nio;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
//使用一个Channel完成文件的读写
public class TestNIOFileChannel03 {
public static void main(String[] args) throws Exception{
FileInputStream fileInputStream = new FileInputStream("dawa.txt");
FileChannel channel01 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("erwa.txt");
FileChannel channel02 = fileOutputStream.getChannel();
//Buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (true) {
//这里注意使用Clear操作,不然会进入死循环
/**
* public Buffer clear() {
* position = 0;
* limit = capacity;
* mark = -1;
* return this;
* }
*/
byteBuffer.clear();
int read = channel01.read(byteBuffer);
if (read == -1) {
break;
}
//反转,切换流
byteBuffer.flip();
channel02.write(byteBuffer);
}
fileInputStream.close();
fileOutputStream.close();
}
}
这里需要注意的是使用clear操作,重置缓冲区基本参数
public Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
transferFrom方法
public long transferFrom(ReadableByteChannel src,
long position, long count)
throws IOException
{
ensureOpen();
if (!src.isOpen())
throw new ClosedChannelException();
if (!writable)
throw new NonWritableChannelException();
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
if (position > size())
return 0;
if (src instanceof FileChannelImpl)
return transferFromFileChannel((FileChannelImpl)src,
position, count);
return transferFromArbitraryChannel(src, position, count);
}
案例如下
package com.dawa.netty.nio;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
//使用一个Channel完成文件的读写
public class TestNIOFileChannel03 {
public static void main(String[] args) throws Exception{
FileInputStream fileInputStream = new FileInputStream("dawa.txt");
FileOutputStream fileOutputStream = new FileOutputStream("erwa.txt");
FileChannel sourceCH = fileInputStream.getChannel();
FileChannel destCH = fileOutputStream.getChannel();
//直接通过通道,完成拷贝
destCH.transferFrom(sourceCH, 0, sourceCH.size());
fileInputStream.close();
fileOutputStream.close();
}
}
存取类型需要保持一致(存取顺序一致)
Buffer可以转为只读Buffer
byteBuffer.asReadOnlyBuffer();
![image-20200325061059067](https://tva1.sinaimg.cn/large/00831rSTly1gd5rck2zicj31ja0tyh7e.jpg)
3. **MappedBuffer 可以让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次**
![image-20200325061421909](https://tva1.sinaimg.cn/large/00831rSTly1gd5rge5zuaj30oi0b4tad.jpg)
MappedByteBuffer是抽象类,实际能够操作的类型是 DirectByteBuffer
> 代码案例如下:
```java
package com.dawa.netty.nio;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
//使用 MappedBuffer 直接完成文件在内存中的数据修改
public class MappedBuffer01 {
public static void main(String[] args) throws Exception {
//获取一个读取文件流
RandomAccessFile randomAccessFile = new RandomAccessFile("dawa.txt","rw");
//获取指定的Channel
FileChannel channel = randomAccessFile.getChannel();
//读取模式. 0 代表从0开始, 5代表读取5个字节,也同时意味着只能在内存中操作这5个字节
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
//对指定位置进行操作
mappedByteBuffer.put(0, (byte) ‘A‘);
mappedByteBuffer.put(2, (byte) 9);
randomAccessFile.close();
channel.close();
}
}
Scattering&Gathering的使用
Scattering:将数据写入到Buffer时,可以采用Buffer数组,依次写入【分散】
Gathering:从Buffer读取数据时,可以采用Buffer数组,依次读【聚合】
解决的问题:当一个数组不够用的时候,可以用数组组,来完成类似的操作
代码案例如下:使用 数组,来完成客户端 - 服务器端 读取操作
package com.dawa.netty.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
public class ScatteringGatheringGetPut {
public static void main(String[] args) throws Exception {
//创建服务器端的
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//监听端口号
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 7000);
//绑定端口号到服务器端的Channel
serverSocketChannel.socket().bind(inetSocketAddress);
//创建Buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
// 等待连接,获取连接, 并生成客户端的 Channel
SocketChannel socketChannel = serverSocketChannel.accept();
//假设从 客户端读取 8个字节
int messageLength = 8;
while (true) {
//1. 将客户端的数据, 读取
int byteRead = 0;
while (byteRead < messageLength) {
long l = socketChannel.read(byteBuffers);
System.out.println("byteRead = " + byteRead);
byteRead += 1;//累积读取的字节数
//使用流打印,看看当前Buffer里面的position和limit
Arrays.asList(byteBuffers).stream()
.map(byteBuffer -> "position=" + byteBuffer.position() + ", limit=" + byteBuffer.limit()).forEach(System.out::println);
}
//将所有的 Buffer反转
Arrays.asList(byteBuffers).forEach(ByteBuffer::flip);
//2. 将读取到的数据,写回客户端
int byteWrite = 0;
while (byteWrite < messageLength) {
socketChannel.write(byteBuffers);
byteWrite += 1;
}
//将所有的Buffer进行Clear操作
Arrays.asList(byteBuffers).forEach(ByteBuffer::clear);
//读完之后,打印出来看看读写文件的长度
System.out.println("byteRead = " + byteRead + ", byteWrite = " + byteWrite + ", messageLength" + messageLength);
}
}
}
示意图:
PS:一个线程,对应一个Selector,每个Selector通过调用select()方法,获取不同的能够代表Channel的SelectionKey,得到一个能够被选择的Channel集合。
NIO非阻塞网络相关的(Selector、SelectionKey、ServerSocketChannel和SocketChannel)关系图梳理。
对上图的说明
当客户端生成时,会通过ServerSocketChannel得到SocketChannel。
Selector开始监听...Selector进行监听select方法,返回有事件发生的通道的个数。
将SocketChannel注册到Selector上.register(Selector sel,int ops).一个Selector上可以注册多个SocketChannel。
SocketChannel的父类里面有注册方法
SelectableChannel里面还有一个注册方法,这个用的比较多
注册后,返回一个SelectionKey,会和该Selector关联(集合)
进一步得到各个SelectionKey(有事件发生)
再通过SelectionKey 反向获取SocketChannel。
SelectionKey类中的channel()方法
通过得到的Channel,完成业务处理
创建服务器端。
1. 创建ServerSocketChannel ,并设置非阻塞
2. 得到一个Selector对象
3. 绑定一个端口6666.在服务器端监听
4. 把servrSocketChannel 注册到 selector 关心事件为 SelectionKey.OP_ACCEPT
5. 循环等待客户端连接
//这里我们等待一秒,如果没有事件发生,返回
1. if(selector.selecct(1000)==0){//没有事件发生
sout("服务器等待了一秒");
continue;
}
//如果返回的值>0,就获取到相关的selectionKey集合
// 1. 表示已经获取到关注的事件。
// 2. 通过selectionKeys()返回关注的集合。
// 3. 通过selectionKeys
seletor.selectedKeys().var;
//遍历得到的selectionKeys.
//1. 获取SelectionKey
//2. 根据key 对应的通道发生的事件做处理
//3. 如果是 OP_ACCEPT,有新的客户端连接
//1. 给该客户端生成一个SocketChannel
//2. 将SocketChannel 注册到select,关注事件为OP_READ,并关联一个Buffer
//4. 如果是 OP_READ,读取数据
//1. 通过key,反向获取对应的channel
//2. 获取到该channel关联的buffer
//5. 手动从集合中移动单签的SelectionKey,防止重复操作。
创建客户端。
1. 得到一个网络通道SocketChannel.并设置非阻塞
2. 提供服务区的IP和端口,连接服务器
服务器端代码
package com.dawa.netty.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
// NIO 服务端
public class TestNIOServer {
public static void main(String[] args) throws Exception {
// 1. 创建ServerSocketChannel ,并设置非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2. 得到一个Selector对象
Selector selector = Selector.open();
// 3. 绑定一个端口6666.在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
// 4. 把serverSocketChannel 注册到 selector 关心事件为 SelectionKey.OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 循环,等待客户端连接
while (true) {
if (selector.select(1000) == 0) {// 没有事件发生
System.out.println("服务器端等待1秒,没有客户端连接");
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();//得到所有被选中的Key
//循环遍历每一个key,每一个key代表一个事件
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//根据key对应的事件,做响应的处理
if (selectionKey.isAcceptable()) {//如果是 Accept事件, 连接事件,则生成对应的客户端Channel
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//将SocketChannel 注册到select,关注事件为OP_READ,并关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (selectionKey.isReadable()) {//如果是读事件
//1. 通过key,反向生成Channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//设置非同步NIO
socketChannel.configureBlocking(false);
//2. 获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
socketChannel.read(buffer);
//打印出获取到的Buffer
System.out.println("from 客户端:" + new String(buffer.array()));
}
//这里一定要记得把处理过的key给移除掉,自己遇到了死循环.
iterator.remove();
}
}
}
}
客户端代码
package com.dawa.netty.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
//NIO 客户端
public class TestNIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
//服务器端的IP和端口
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 6666);
if (!socketChannel.connect(socketAddress)) {
System.out.println("连接失败,但是可以干其他事情,非阻塞");
while (!socketChannel.finishConnect()) {
System.out.println("在连接完成之前,我一直干其他的事情");
}
}
String string = "hello,dawa";
socketChannel.write(ByteBuffer.wrap(string.getBytes()));
System.in.read();
}
}
Selector.keys() 是 列出所有的key。
Selector.selectedKeys()是列出所有被选中的key。
这两个是不一样的。
功能示意图
- 先写服务器端
- 服务器端启动并监听6667
- 服务器端接收客户端消息,并实现转发[处理上线和离线]
- 编写客户端
- 连接服务器
- 发送消息
- 接受服务器消息
里面读数据的方法:
try catch完成离线处理
里面转发给其他客户端的方法
构造器初始化
向服务器发消息
读取从服务器端回复的消息
启动客户端的方法
启动服务器端的方法
客户端代码
package com.dawa.netty.nio.group;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
//客户端
public class GroupCharClient {
private SocketChannel socketChannel;
private static final int PORT = 6667;
private static final String HOST = "127.0.0.1";
private Selector selector;
private String userName;
public GroupCharClient() throws IOException {
selector = Selector.open();
//连接服务器
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
userName = socketChannel.getRemoteAddress().toString().substring(1);
}
//发送消息
public void sendMessage(String message){
message = userName + "说" + message;
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
try {
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
//读取服务器端发来的消息
public void readMessage() {
try {
int readChannels = selector.select();
if (readChannels > 0) {//有可用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
}
} else {
//没有可用的通道
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
GroupCharClient groupCharClient = new GroupCharClient();
new Thread(() -> {
while (true) {
groupCharClient.readMessage();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//发送数据给服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String message = scanner.nextLine();
groupCharClient.sendMessage(message);
}
}
}
服务器端代码
package com.dawa.netty.nio.group;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* 服务器端代码
*/
public class GroupChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private static final int PORT = 6666;
public GroupChatServer() {
try {
//得到选择器
selector = Selector.open();
//绑定端口
serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(PORT));
} catch (IOException e) {
e.printStackTrace();
}
}
//服务器端监听的方法
public void listen() {
try {
//循环监听
while (true) {
int count = selector.select();
if (count > 0) {
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
while (selectionKeyIterator.hasNext()) {
//取出Key
SelectionKey key = selectionKeyIterator.next();
//判断事件
if (key.isAcceptable()) {//监听访问
//key 转Channel
SocketChannel channel = serverSocketChannel.accept();
SocketAddress remoteAddress = channel.getRemoteAddress();
System.out.println(remoteAddress + ":上线了");
}
if (key.isReadable()) {//读取事件
//处理读
readData(key);
}
//移除已经处理的key
selectionKeyIterator.remove();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//读数据
public void readData(SelectionKey key) {
SocketChannel channel = null;
try {
//根据key,取得Channel
channel = (SocketChannel) key.channel();
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = channel.read(buffer);
if (read > 0) {
String message = new String(buffer.array());
System.out.println("from: 客户端" + message);
// 向其他用户,转发消息
sendMessageToOtherCLient(message,key);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " :离线了");
key.cancel();
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
//向其他用户转发消息
public void sendMessageToOtherCLient(String message,SelectionKey self){
System.out.println("服务器转发消息ing");
selector.keys().forEach(key -> {
//根据Key,取出对应的SocketChannel.或者是ServerSocketChannel
Channel targetChannel = key.channel();
//排除自己
if (targetChannel instanceof SocketChannel && targetChannel != self) {
//转型
SocketChannel dest = (SocketChannel) targetChannel;
//Buffer
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
try {
dest.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) {
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}
什么是零拷贝?
零拷贝,指的是没有CPU拷贝,而不是不拷贝。从操作系统角度看的。
传统IO模型图,状态切换:用户态和内核态的切换:4次拷贝,3次切换
Mmap优化:3次拷贝,3次切换
DMA拷贝:direct memory accect:直接内存访问
sendFile优化:三次拷贝,两次切换
零拷贝,指的是没有CPU拷贝,而不是不拷贝。从操作系统角度看的。
这里其实还有一次CPU拷贝的:kernel buffer->socket buffer但是,拷贝的信息很少,比如length,offset,消耗低,可以忽略
零拷贝:2次拷贝,2次切换。
零拷贝是我们在进行网络传输的重要优化手段。
传统IO流案例
传统IO耗费时间:60毫秒
零拷贝案例。(NIO)
transferTo 底层用的就是零拷贝
NIO零拷贝耗时时间:20毫秒
这里暂时不深入扩展。
标签:selection nsf mit mapped 转型 时间 多路复用 输出 读写
原文地址:https://www.cnblogs.com/bigbaby/p/12596350.html