标签:
socket是一种抽象层,应用程序通过它来发送和接收数据。不同的socket与不同的协议栈相关联,常用的端对端协议有tcp和udp。
一个客户端要与服务器进行通信,必须要先知道服务器的地址,地址可以是ip数字和域名,其实域名最终还是解析成ip数字进行通信的。
java为tcp协议提供了2个类,Socket和ServerSocket,一个代表客户端,一个代表服务端。
public class TcpEchoClient { private static final String serverIP = "127.0.0.1"; private static final int port = 8888; public static void main(String[] args) { byte[] writeData = "hi".getBytes(); byte[] revdData = new byte[writeData.length]; Socket socket = null; try { // 创建一个Socket实例 socket = new Socket(serverIP, port); // 进行通信 InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); out.write(writeData); socket.shutdownOutput(); System.out.println("client send " + new String(writeData)); in.read(revdData); System.out.println("client recived " + new String(revdData)); } catch (IOException e) { e.printStackTrace(); } finally { try { // 关闭连接 socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } public class TcpEchoServer { private static final int port = 8888; public static void main(String[] args) throws InterruptedException, IOException { ServerSocket server = new ServerSocket(port); while (true) { // 阻塞直到有客户端进行连接 Socket socket = server.accept(); InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); byte[] data = new byte[1024]; int len = in.read(data); out.write(data, 0, len); System.out.println("Server recived " + new String(data, 0, len)); socket.close(); } } }
TCP/IP协议以字节的方式传输用户数据,大部分应用程序通过字段序列组成的离散信息来定义协议,其中某个字段中包含了一段以位序列编码的特定信息。协议中定义发送者以怎样的排列和解释这些位序列,接收者怎么解析这些序列。
TCP/IP传输的信息必须在块(chunk)中发售和接收,而快的长度必须是8位的倍数,如果把传输的信息看成数字序列活数组(byte[]),每个数字的范围是0~255。因此我们必须对更大的基本整形进行编码。、
java程序中,int数据类型由32位表示,因此可以用4个字节来传输任意的int型变量,short数据类型16位2个字节,64位long8个字节。 java中一般用DataOutputStream和ByteArrayOutputStream按照big-endian顺序将整数以适当大些的二进制补码的形式写入到流中。
ASCII编码字符集将英文字母、数字、标点符号映射成0~127的整数,java中使用Unicode国际编码字符集来表示char型和String型值,Unicode字符集将世界上大部分的语言和符号映射到0~65535之间。对于每个整数值都比255小的一小组字符,可以每个字符进行一个单独的字节进行编码,但对于大整数的字节就存在多种方式在线路上进行编码,因此发送者和接收者需要对这些整数如何表示成字节序列统一方案即编码方案,常用的编码方案有ASCII ISO-8859-1 GBK2312 UTF-8。
位图是对布尔值进行编码的一种非常进场的方式,通常在协议中0表示false,1表示false。
将数据转换成线路上传输的格式只完成了一半的工作,在接收端还必须将接收到的字节序列还原成原始信息。接收端通过成帧来定位信息的首尾位置。 主要有两个技术使接收者能够准确地找到信息的结束位置。
public interface Framer { void frameMsg(byte[] message, OutputStream out) throws IOException; byte[] nextMsg() throws IOException; } public class DelimFramer implements Framer { private InputStream in; private static final byte DELIMITER = ‘\n‘; public DelimFramer(InputStream in) { this.in = in; } @Override public void frameMsg(byte[] message, OutputStream out) throws IOException { checkMessage(message); out.write(message); out.write(DELIMITER); out.flush(); } private void checkMessage(byte[] message) throws IOException { for (byte b : message) { if (b == DELIMITER) { throw new IOException("message contains delimiter"); } } } @Override public byte[] nextMsg() throws IOException { ByteArrayOutputStream messageBuffer = new ByteArrayOutputStream(); int nextByte; while ((nextByte = in.read()) != DELIMITER) { if (nextByte == -1) { // 数据结尾 if (messageBuffer.size() == 0) { return null;// 没有数据 } else { // 有数据但结尾不是定界符 throw new IOException("non-empty message without delimiter"); } } messageBuffer.write(nextByte); } return messageBuffer.toByteArray(); } } public class LengthFramer implements Framer { private static final int MAX_MESSAGE_LEN = 65535; private static final int BYTEMASK = 0xff; //private static final int SHORTMASK = 0xffff; public static final int BYTESHIFT = 8; private DataInputStream in; public LengthFramer(InputStream in) { this.in = new DataInputStream(in); } @Override public void frameMsg(byte[] message, OutputStream out) throws IOException { checkMessage(message); //发送长度 out.write((message.length >> BYTESHIFT) & BYTEMASK); out.write(message.length & BYTEMASK); //发送消息 out.write(message); out.flush(); } private void checkMessage(byte[] message) throws IOException { if (message.length > MAX_MESSAGE_LEN) { throw new IOException("message too long"); } } @Override public byte[] nextMsg() throws IOException { int length = 0; try { // 读取长度 length = in.readUnsignedShort(); } catch (Exception e) { return null; } // 读取消息 byte[] message = new byte[length]; in.readFully(message); return message; } }
测试
public class FramerClientTest { public static void main(String[] args) throws UnknownHostException, IOException { Socket socket = new Socket("127.0.0.1", 8888); InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); String send = "HelloWorld"; // Framer famer = new DelimFramer(in); Framer famer = new LengthFramer(in); System.out.println("client send " + send); famer.frameMsg(send.getBytes(), out); String revice = new String(famer.nextMsg()); System.out.println("client reviced " + revice); socket.close(); //socket. } } public class FramerServerTest { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(8888); while (true) { Socket socket = server.accept(); InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); //Framer framer = new DelimFramer(in); Framer framer = new LengthFramer(in); String recive = new String(framer.nextMsg()); System.out.println("server recived " + recive); framer.frameMsg(recive.getBytes(), out); System.out.println("server send " + recive); socket.close(); } } }
在实际业务中,服务器都需要处理多个客户端的请求,前面的例子都属于迭代服务器,即处理完一个客户端的请求后才去处理下一个客户端的请求。但当一个请求产生阻塞时后面所有的请求都将阻塞,这在业务中肯定是行不通的,我们可以使用多线程技术来解决这个问题。为每个客户端分配一个执行线程
public class EchoProtocal implements Runnable{ private Socket socket; public EchoProtocal(Socket socket) { this.socket = socket; } public void handle() { try { InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); byte[] data = new byte[1024]; int len = in.read(data); out.write(data, 0, len); System.out.println(Thread.currentThread().getName() + "server reviced " + new String(data, 0, len)); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } @Override public void run() { handle(); } } public class EchoProtocalTestServer { public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(8888); /*while (true) { EchoProtocal ep = new EchoProtocal(server.accept()); new Thread(ep).start(); }*/ Executor service = Executors.newCachedThreadPool(); while (true) { EchoProtocal ep = new EchoProtocal(server.accept()); service.execute(ep); } } }
注 UDP无连接状态,发送信息是单向的,不保证信息是否成功到达,优点是性能强。
创建、维护和切换线程需要系统的开销,一客户端一线程的方式在系统扩展性方面受到了限制。虽然使用线程池可以节省系统开销,但线程池的大小仍然限制了系统可以同时处理的客户端数量。因为大部分时间客户端是处于闲置状态的,所以单纯的增加线程池的大小,将带来更多的线程处理开销,而不能提升系统的性能。NIO没有采用一连接一线程的方式,而是通过独立线程轮询监听,对当前需要服务(连接,读或写)的select经行异步处理,并将最耗时间的操作转移到操作系统中,从而大大提高的速度。
Selector类可避免非阻塞式客户端很浪费资源的忙等待方法,一个Selector实例可以处理多个信道上的IO操作。 要使用选择器,需要创建一个Selector实例,并注册到想要监控的信道上,调用select方法将会阻塞等待,直到产生io操作。最后迭代处理每个SelectionKey 完成相应的操作。
缓冲区用4个索引来维护内部状态,从而完成读写的操作。
flip 要将数据写到输出通道中。在这之前必须调用 flip() 方法。这个方法做两件非常重要的事:
clear 最后一步是调用缓冲区的 clear() 方法。这个方法重设缓冲区以便接收更多的字节。 Clear 做两种非常重要的事情:
public class TcpEchoClientNoblocking { private static final String ADDRESS = "127.0.0.1"; private static final int PORT = 8888; public static void main(String[] args) throws IOException { SocketChannel clntChan = SocketChannel.open(); clntChan.configureBlocking(false);// 设置非阻塞 InetSocketAddress isa = new InetSocketAddress(ADDRESS, PORT); if (!clntChan.connect(isa)) { while (!clntChan.finishConnect()) { System.out.println("连接中。。"); } } byte[] sendBytes = "HelloWorld".getBytes(); ByteBuffer writeBuf = ByteBuffer.wrap(sendBytes); clntChan.write(writeBuf); System.out.println("client send " + new String(sendBytes)); ByteBuffer readBuf = ByteBuffer.allocate(sendBytes.length); int totalBytesRcvd = 0; int bytesRcvd; while (totalBytesRcvd < sendBytes.length) { bytesRcvd = clntChan.read(readBuf); if (bytesRcvd != -1) { totalBytesRcvd += bytesRcvd; } } System.out.println("client recived " + new String(readBuf.array(), 0, totalBytesRcvd)); } } public class TCPEchoServerNoBlocking { private static final int PORT = 8888; public void start() throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverChan = ServerSocketChannel.open(); serverChan.socket().bind(new InetSocketAddress(PORT)); serverChan.configureBlocking(false); serverChan.register(selector, SelectionKey.OP_ACCEPT); while (true) { if (selector.select(3000) == 0) { // 无连接 System.out.println("等待连接..."); continue; } Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } finally { keyIter.remove(); } } } } private void handleInput(SelectionKey key) throws IOException { if (key.isAcceptable()) { // 可连接 SocketChannel clntChanAccept = ((ServerSocketChannel) key.channel()) .accept(); clntChanAccept.configureBlocking(false); clntChanAccept.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } if (key.isReadable()) { // 可读 SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer bufRead = (ByteBuffer) key.attachment(); int readBytes = sc.read(bufRead); if (readBytes > 0) { bufRead.flip(); byte[] bytes = new byte[bufRead.remaining()]; bufRead.get(bytes); String body = new String(bytes); System.out.println("server received : " + body); //echo doWrite(sc, body); } else if (readBytes < 0) { // 读完 key.cancel(); sc.close(); } else { // 读到0字节 } } if (key.isValid() && key.isWritable()) { // 可写 ByteBuffer bufWrite = (ByteBuffer) key.attachment(); bufWrite.flip(); SocketChannel clntChanWrite = (SocketChannel) key.channel(); clntChanWrite.write(bufWrite); // clntChanWrite.close(); // 是否还有剩余字节 if (!bufWrite.hasRemaining()) { // 无数据,使key只能进行读操作 key.interestOps(SelectionKey.OP_READ); } // 压缩缓冲区 执行完该操作缓冲区变为可读 bufWrite.compact(); } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } public static void main(String[] args) throws Exception { new TCPEchoServerNoBlocking().start(); } }
标签:
原文地址:http://www.cnblogs.com/huifukejian/p/4781413.html