传统的socket IO中,需要为每个连接创建一个线程,当并发的连接数量非常巨大时,线程所占用的栈内存和CPU线程切换的开销将非常巨大。使用NIO,不再需要为每个线程创建单独的线程,可以用一个含有限数量线程的线程池,甚至一个线程来为任意数量的连接服务。由于线程数量小于连接数量,所以每个线程进行IO操作时就不能阻塞,如果阻塞的话,有些连接就得不到处理,NIO提供了这种非阻塞的能力。 小量的线程如何同时为大量连接服务呢,答案就是就绪选择。这就好比到餐厅吃饭,每来一桌客人,都有一个服务员专门为你服务,从你到餐厅到结帐走人,这样方式的好处是服务质量好,一对一的服务,VIP啊,可是缺点也很明显,成本高,如果餐厅生意好,同时来100桌客人,就需要100个服务员,那老板发工资的时候得心痛死了,这就是传统的一个连接一个线程的方式。 老板是什么人啊,精着呢。这老板就得捉摸怎么能用10个服务员同时为100桌客人服务呢,老板就发现,服务员在为客人服务的过程中并不是一直都忙着,客人点完菜,上完菜,吃着的这段时间,服务员就闲下来了,可是这个服务员还是被这桌客人占用着,不能为别的客人服务,用华为领导的话说,就是工作不饱满。那怎么把这段闲着的时间利用起来呢。这餐厅老板就想了一个办法,让一个服务员(前台)专门负责收集客人的需求,登记下来,比如有客人进来了、客人点菜了,客人要结帐了,都先记录下来按顺序排好。每个服务员到这里领一个需求,比如点菜,就拿着菜单帮客人点菜去了。点好菜以后,服务员马上回来,领取下一个需求,继续为别人客人服务去了。这种方式服务质量就不如一对一的服务了,当客人数据很多的时候可能需要等待。但好处也很明显,由于在客人正吃饭着的时候服务员不用闲着了,服务员这个时间内可以为其他客人服务了,原来10个服务员最多同时为10桌客人服务,现在可能为50桌,60客人服务了。 这种服务方式跟传统的区别有两个: 1、增加了一个角色,要有一个专门负责收集客人需求的人。NIO里对应的就是Selector。 2、由阻塞服务方式改为非阻塞服务了,客人吃着的时候服务员不用一直侯在客人旁边了。传统的IO操作,比如read(),当没有数据可读的时候,线程一直阻塞被占用,直到数据到来。NIO中没有数据可读时,read()会立即返回0,线程不会阻塞。 NIO中,客户端创建一个连接后,先要将连接注册到Selector,相当于客人进入餐厅后,告诉前台你要用餐,前台会告诉你你的桌号是几号,然后你就可能到那张桌子坐下了,SelectionKey就是桌号。当某一桌需要服务时,前台就记录哪一桌需要什么服务,比如1号桌要点菜,2号桌要结帐,服务员从前台取一条记录,根据记录提供服务,完了再来取下一条。这样服务的时间就被最有效的利用起来了。 内容来自 :http://blog.csdn.net/zhouhl_cn/article/details/6568119
简介
Java世界中的两类IO:IO(性能瓶颈)和NIO以及jdk1.7中要加入的增强版NIO
•IO:面向流的方式处理数据(单个的字节,字符的移动,流的一次操作一次只能产生或者消费一个字节或者字符即使有缓冲,也需要程序员自己填充和提取缓冲区内容)
•NIO:面向块的方式处理数据(数据块的移动,一次操作产生或者消费一个数据块,将最耗时的 I/O 操作–填充和提取缓冲区内容操作转移回操作系统)
NIO的特点:
NIO包引入了四个关键的抽象数据类型
1) Buffer:它是包含数据且用于读写的线形表结构(字节数组)。其中还提供了一个特殊类用于内存映射文件的I/O操作。一个 Buffer 实质上是一个容器对象。发送给一个通道的所有对象都必须首先放到缓冲区中;同样地,从通道中读取的任何数据都要读到缓冲区中。
2) Charset:它提供Unicode字符串影射到字节序列以及逆影射的操作(编码及解码器)。
3) Channels:通道是对原 I/O 包中的流的模拟。到任何目的地(或来自任何地方)的所有数据都必须通过一个 Channel 对象。包含socket,file和pipe三种管道,它实际上是双向交流的通道(与流的不同之处在)。
4) Selector:它将多元异步I/O操作集中到一个或多个线程中
Buffer类类图
NIO中BUFFER的属性
•所有的缓冲区都具有四个属性来提供关于其所包含的数据元素的信息。
•容量(Capacity) 缓冲区能够容纳的数据元素的最大数量。这一容量在缓冲区创建时被设定,并且永远不能被改变。
• 上界(Limit) 缓冲区的第一个不能被读或写的元素。或者说,缓冲区中现存元素的计数。
•位置(Position) 下一个要被读或写的元素的索引。位置会自动由相应的 get( )和 put( )方法更新。
•标记(Mark) 一个备忘位置。调用 mark( )来设定 mark = postion。调用 reset( )设定 position = mark。标记在设定前是未定义的(undefined)。
•这四个属性之间总是遵循以下关系: 0 <= mark <= position <= limit <= capacity 让我们来看看这些属性在实际应用中的一些例子。
•新创建的容量为 10的 ByteBuffer 逻辑视图
位置被设为 0,而且容量和上界被设为 10,刚好经过缓冲区能够容纳的最后一个字节。标记最初未定义。容量是固定的,但另外的三个属性可以在使用缓冲区时改变
- public abstract class Buffer { //没有get(),put()方法,但是子类中有
- public final int capacity()
- public final int position()
- public final Buffer position(int newPositio) //定位位置
- public final int limit()
- public final Buffer limit (int newLimit)//定位上界
- public final Buffer mark() //将标记设为当前位置的值
- public final Buffer reset() //将位置设为当前的标记值,如果标记值未定义将抛出异常
- public final Buffer clear() //重置方法,将缓冲区置为填充状态即将position设置为 0。将 limit 设置为与 capacity 相同。
- public final Buffer flip() //翻转方法,将缓冲区填充状态翻转成释放状态即 将 limit 设置为当前 position。将 position 设置为 0。实现细:buffer.limit(buffer.position()).position(0);
- public final Buffer rewind()//不影响上界属性。只是将位置值设回 0
- public final int remaining() //从当前位置到上界还剩余的元素数目
- public final boolean hasRemaining()//是否已经达到缓冲区的上界
- public abstract boolean isReadOnly(); //判断缓冲区是否仅可读,修改只读缓冲区将会抛出异常
- }
public abstract class Buffer {//没有get(),put()方法,但是子类中有 public final int capacity() public final int position() public final Buffer position(int newPositio) //定位位置 public final int limit() public final Buffer limit (int newLimit)//定位上界 public final Buffer mark() //将标记设为当前位置的值 public final Buffer reset() //将位置设为当前的标记值,如果标记值未定义将抛出异常 public final Buffer clear() //重置方法,将缓冲区置为填充状态即将position设置为 0。将 limit 设置为与 capacity 相同。 public final Buffer flip() //翻转方法,将缓冲区填充状态翻转成释放状态即 将 limit 设置为当前 position。将 position 设置为 0。实现细:buffer.limit(buffer.position()).position(0); public final Buffer rewind()//不影响上界属性。只是将位置值设回 0 public final int remaining() //从当前位置到上界还剩余的元素数目 public final boolean hasRemaining()//是否已经达到缓冲区的上界 public abstract boolean isReadOnly(); //判断缓冲区是否仅可读,修改只读缓冲区将会抛出异常}
应用实例——复制文件的操作:
•读取文件操作:(1) 从 FileInputStream 获取Channel, (2) 创建Buffer, (3) 将数据从 Channel 读到 Buffer中.
•写入文件操作:(1)从 FileOutputStream 获取Channel, (2)创建Buffer, (3)将数据从 Channel 写到 Buffer中.
•重设缓冲区操作:在从输入通道读入缓冲区之前,我们调用 clear() 方法。同样,在将缓冲区写入输出通道之前,我们调用 flip() 方法
•检查状态操作: read() 方法返回 -1 是判断文件读取完成的标志
•注意:不需要告诉通道要写入多数据。缓冲区的内部统计机制会跟踪它包含多少数据以及还有多少数据要写入
NIO的三大特性:
1、分散与聚集读取 2、文件锁定功能 3、网络异步IO
1.分散与聚集读取
要了解分散与聚集的读取,我们首先应该先了解一下磁盘的I/O,那么磁盘的I/O是以一种什么方式输入输出的呢?
当进程请求 I/O 操作的时候,它执行一个系统调用,将控制权移交给内核。底层函数 open( )、read( )、write( )和 close( )要做的无非就是建立和执行适当的系统调用。当内核以这种方式被调用,它随即采取任何必要步骤,找到进程所需数据,并把数据传送到用户空间内的指定缓冲区。内核试图对数据进行高速缓存或预读取,因此进程所需数据可能已经在内核空间里了。如果是这样,该数据只需简单地拷贝出来即可。如果数据不在内核空间,则进程被挂起,内核着手把数据读进 内存。
那么为什么不直接让磁盘控制器把数据送到用户空间的缓冲区呢? 首先, 硬件通常不能直接访问用户空间。其次,像磁盘这样基于块存储的硬件设备操作的是固定大小的数据块,而用户进程请求的可能是任意大小的或非对齐的数据块。在数据往来于用户空间与存储设备的过程中,内核负责数据的分解、再组合工作,因此充当着中间人的角色。
分散/聚集 I/O 是使用多个而不是单个缓冲区来保存数据的读写方法。
一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中。同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据。 进程只需一个系统调用,就能把一连串缓冲区地址传递给操作系统。然后,内核就可以顺序填充或排干多个缓冲区,读的时候就把数据发散到多个用户空间缓冲区,写的时候再从多个缓冲区把数据汇聚起来
如何实现:通道可以有选择地实现两个新的接口: ScatteringByteChannel 和 GatheringByteChannel。
一个 ScatteringByteChannel 是一个具有两个附加读方法的通道:
long read( ByteBuffer[] dsts );long read( ByteBuffer[] dsts, int offset, int length );
一个 GatheringByteChannel是一个具有两个附加写方法的通道:
long write( ByteBuffer[] srcs );long write( ByteBuffer[] srcs, int offset, int length );
2.文件锁定功能
3.网络异步IO
同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO
•阻塞(线程被挂起)IO:
socket 的阻塞模式意味着必须要做完IO 操作(包括错误)才会返回。
•非阻塞(线程不会被挂起)IO:
非阻塞模式下无论操作是否完成都会立刻返回,需要通过其他方式来判断具体操作是否成功。
•同步IO:
IO操作将导致请求进程阻塞,直到IO操作完成。 •异步IO:
IO操作不导致请求进程阻塞
•两者的区别就在于synchronous IO做”IO operation”(真实的IO操作)的时候会将process阻塞。
•阻塞和非阻塞就是进程的两种状态。
•同步和异步是只跟IO操作过程中进程的状态变化有关
•同步=/=阻塞,异步= /=非阻塞
•IO模型(5种之多):阻塞IO,非阻塞IO,IO复用,信号驱动IO,异步IO
•以linux 网络IO的read举例,介绍这些IO模型,其中整个过程涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,经历两个阶段: 1 等待数据准备 2 将数据从内核拷贝到进程中
•同步IO,需要用户进程主动将存放在内核缓冲区中的数据拷贝到用户进程中。
•异步IO,内核会自动将数据从内核缓冲区拷贝到用户缓冲区,然后再通知用户。
IO模型比较
•在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用 recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。
举例:有A,B,C,D四个人在钓鱼: A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆; B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B就和旁边的MM聊天,隔会再看看有没有鱼上钩,有的话就迅速拉杆; C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来; D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信 Socket通道
•新的socket通道类可以运行非阻塞模式并且是可选择的。这两个性能可以激活大程序巨大的可伸缩性和灵活性。再也没有为每个socket连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换总开销。借助新的NIO类,一个或几个线程就可以管理成百上千的活动socket连接了并且只有很少甚至可能没有性能损失。
•全部socket通道类(DatagramChannel、SocketChannel和ServerSocketChannel)在被实例化时都会创建一个对等socket对象( java.net的类(Socket、ServerSocket和DatagramSocket) )
• Socket通道将与通信协议相关的操作委托给相应的socket对象,两者对等能够相互获得(Channel对象的socket()方法和socket对象的getChannel ()方法)。但是如果直接从net包的socket对象来获得相应的Channel对象将会返回null,只有从NIO新包入口才能体验新特性。
选择器
•选择器,可选择通道和选择键类
•选择器(Selector) :选择器类管理着一个被注册的通道集合的信息和它们的就绪状态
•可选择通道(SelectableChannel) :提供了实现通道的可选择性所需要的公共方法,可以被注册到一个或多个选择器上
•选择键(SelectionKey) :选择键封装了特定的通道与特定的选择器之间的注册关系
三者的关系: - public abstract class SelectableChannel extends AbstractChannel implements Channel{
- public abstract SelectionKey register (Selector sel, int ops) throws ClosedChannelException;//将可选择通道注册到选择器上,返回表示二者关系的SelectionKey 对象,如果选择器关闭或者通道为阻塞模式则会抛出异常,第二个参数表示所关心的通道操作,在JDK 1.4中,有四种被定义的可选择操作:读(read),写(write),连接(connect)和接受(accept)。
- public abstract boolean ( ) isRegistered( );//判断是否注册到选择器上
- public abstract SelectionKey key isRegisteredFor (Selector sel);//判断是否注册到特定的选择器上
- public abstract int validOps( );//获取特定的通道所支持的操作集合
- public abstract void configureBlocking (boolean block) throws IOException;
- public abstract boolean isBlocking( );//来配置并检查通道的阻塞模式
- public abstract Object blockingLock( );
- }
public abstract class SelectableChannel extends AbstractChannel implements Channel{ public abstract SelectionKey register (Selector sel, int ops) throws ClosedChannelException;//将可选择通道注册到选择器上,返回表示二者关系的SelectionKey 对象,如果选择器关闭或者通道为阻塞模式则会抛出异常,第二个参数表示所关心的通道操作,在JDK 1.4中,有四种被定义的可选择操作:读(read),写(write),连接(connect)和接受(accept)。 public abstract boolean ( ) isRegistered( );//判断是否注册到选择器上 public abstract SelectionKey key isRegisteredFor (Selector sel);//判断是否注册到特定的选择器上 public abstract int validOps( );//获取特定的通道所支持的操作集合 public abstract void configureBlocking (boolean block) throws IOException; public abstract boolean isBlocking( );//来配置并检查通道的阻塞模式 public abstract Object blockingLock( ); }
- <pre class=“java” name=“code”>public abstract class Selector{
- public static Selector open() throws IOException;静态工厂方法来实例化Selector 对象
- public abstract boolean isOpen( );
- public abstract void close( ) throws IOException; 释放资源和设置选择键无效
- public abstract SelectionProvider provider( ); SelectionProvider对象用于创建一个Selector对象
- public abstract int select( ) throws IOException;
- public abstract int select (long timeout) throws IOException;是阻塞的,有三种条件可以停止阻塞:1)至少存在一条通道是ready I/O的;2)等待超时;3)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量
- public abstract int selectNow( ) throws IOException;是非阻塞的,返回值即为ready I/O的通道数量,无ready则返回0
- public abstract void wakeup( );
- public abstract Set keys( ); //返回已注册的键的集合
- public abstract Set selectedKeys( );//返回已选择的键的集合
- }
- </pre>
- <pre></pre>
- <pre></pre>
public abstract class Selector{ public static Selector open() throws IOException;静态工厂方法来实例化Selector 对象 public abstract boolean isOpen( ); public abstract void close( ) throws IOException; 释放资源和设置选择键无效 public abstract SelectionProvider provider( ); SelectionProvider对象用于创建一个Selector对象 public abstract int select( ) throws IOException; public abstract int select (long timeout) throws IOException;是阻塞的,有三种条件可以停止阻塞:1)至少存在一条通道是ready I/O的;2)等待超时;3)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量 public abstract int selectNow( ) throws IOException;是非阻塞的,返回值即为ready I/O的通道数量,无ready则返回0 public abstract void wakeup( ); public abstract Set keys( ); //返回已注册的键的集合 public abstract Set selectedKeys( );//返回已选择的键的集合 }
public abstract class Selector{ public static Selector open() throws IOException;静态工厂方法来实例化Selector 对象 public abstract boolean isOpen( ); public abstract void close( ) throws IOException; 释放资源和设置选择键无效 public abstract SelectionProvider provider( ); SelectionProvider对象用于创建一个Selector对象 public abstract int select( ) throws IOException; public abstract int select (long timeout) throws IOException;是阻塞的,有三种条件可以停止阻塞:1)至少存在一条通道是ready I/O的;2)等待超时;3)被唤醒,如被调用wakeup。返回值即为ready I/O的通道数量 public abstract int selectNow( ) throws IOException;是非阻塞的,返回值即为ready I/O的通道数量,无ready则返回0 public abstract void wakeup( ); public abstract Set keys( ); //返回已注册的键的集合 public abstract Set selectedKeys( );//返回已选择的键的集合}
内容转自
下面给出简单 c/s模式的例子
服务器端import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class Server { private Selector selector; private ByteBuffer readBuffer = ByteBuffer.allocate(1024);//调整缓存的大小可以看到打印输出的变化 private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);//调整缓存的大小可以看到打印输出的变化 String str; public void start() throws IOException { // 打开服务器套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); // 服务器配置为非阻塞 ssc.configureBlocking(false); // 进行服务的绑定 ssc.bind(new InetSocketAddress("localhost", 8001)); // 通过open()方法找到Selector selector = Selector.open(); // 注册到selector,等待连接 ssc.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.currentThread().isInterrupted()) { selector.select(); Setkeys = selector.selectedKeys(); Iterator keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (!key.isValid()) { continue; } if (key.isAcceptable()) { accept(key); } else if (key.isReadable()) { read(key); } else if (key.isWritable()) { write(key); } keyIterator.remove(); //该事件已经处理,可以丢弃 } } } private void write(SelectionKey key) throws IOException, ClosedChannelException { SocketChannel channel = (SocketChannel) key.channel(); System.out.println("write:"+str); sendBuffer.clear(); sendBuffer.put(str.getBytes()); sendBuffer.flip(); channel.write(sendBuffer); channel.register(selector, SelectionKey.OP_READ); } private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); // Clear out our read buffer so it's ready for new data this.readBuffer.clear(); // readBuffer.flip(); // Attempt to read off the channel int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) { // The remote forcibly closed the connection, cancel // the selection key and close the channel. key.cancel(); socketChannel.close(); return; } str = new String(readBuffer.array(), 0, numRead); System.out.println(str); socketChannel.register(selector, SelectionKey.OP_WRITE); } private void accept(SelectionKey key) throws IOException { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = ssc.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); System.out.println("a new client connected "+clientChannel.getRemoteAddress()); } public static void main(String[] args) throws IOException { System.out.println("server started..."); new Server().start(); } }
客户端
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class Client { ByteBuffer writeBuffer = ByteBuffer.allocate(1024); ByteBuffer readBuffer = ByteBuffer.allocate(1024); public void start() throws IOException { // 打开socket通道 SocketChannel sc = SocketChannel.open(); //设置为非阻塞 sc.configureBlocking(false); //连接服务器地址和端口 sc.connect(new InetSocketAddress("localhost", 8001)); //打开选择器 Selector selector = Selector.open(); //注册连接服务器socket的动作 sc.register(selector, SelectionKey.OP_CONNECT); Scanner scanner = new Scanner(System.in); while (true) { //选择一组键,其相应的通道已为 I/O 操作准备就绪。 //此方法执行处于阻塞模式的选择操作。 selector.select(); //返回此选择器的已选择键集。 Setkeys = selector.selectedKeys(); System.out.println("keys=" + keys.size()); Iterator keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); // 判断此通道上是否正在进行连接操作。 if (key.isConnectable()) { sc.finishConnect(); sc.register(selector, SelectionKey.OP_WRITE); System.out.println("server connected..."); break; } else if (key.isWritable()) { //写数据 System.out.print("please input message:"); String message = scanner.nextLine(); //ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes()); writeBuffer.clear(); writeBuffer.put(message.getBytes()); //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位 writeBuffer.flip(); sc.write(writeBuffer); //注册写操作,每个chanel只能注册一个操作,最后注册的一个生效 //如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来 //int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; //使用interest集合 sc.register(selector, SelectionKey.OP_READ); sc.register(selector, SelectionKey.OP_WRITE); sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()){//读取数据 System.out.print("receive message:"); SocketChannel client = (SocketChannel) key.channel(); //将缓冲区清空以备下次读取 readBuffer.clear(); int num = client.read(readBuffer); System.out.println(new String(readBuffer.array(),0, num)); //注册读操作,下一次读取 sc.register(selector, SelectionKey.OP_WRITE); } } } } public static void main(String[] args) throws IOException { new Client().start(); } }