Java NIO

阅读:520 2019-03-19 15:02:35 来源:新网

bio是基于字节流和字符流进行操作的,而nio是基于通道channel和缓冲区buffer进行操作的,数据从通道读取到缓冲区中,或者从缓冲区写入到通道中。

nio的类库位于java.lang.nio下,其中有如下一些基本的概念:

通道channel:通道的作用与bio中的流类似,主要不同的地方在于:

缓冲区buffer:在bio中,可以直接将数据写入或者直接读取到流中,也可以通过装饰类添加缓冲的功能;而在nio中,所有的数据都是用缓冲区处理的,任何时候使用nio读取或者写入数据都是通过缓冲区进行的。缓冲区本质上是一块可以读写数据的内存,这块内存被包装成niobuffer对象,并提供了一组方法来访问该块内存。

分散/聚集scatter/gather:分散和聚集是用来描述从通道中读取或者写入通道的操作。分散从通道中读取是指读操作时将读取的数据写入多个缓冲区中;聚集写入通道是指写入操作时将多个缓冲区的数据写入到同一个通道中。分散/聚集通常用于需要将传输数据分开处理的场合,如传输一个消息可以将消息头和消息体分散到不同的buffer中。

选择器selector:selector模型是nio编程的基础,多路复用器selector通过不断地轮询已经注册过的通道,检测出就绪的通道集合,从而可以实现一个线程管理多个通道,管理多个网络连接。

java.nio包中常用的channel实现类有:

filechannel无法设置为非阻塞模式,只能运行在阻塞模式下。使用filechannel的几个基本步骤包括:打开filechannel、从filechannel读写数据、关闭filechannel。

privatestaticfinalintbuf_size=1024;publicstaticvoidmain(string[]args){//打开filechannel需要通过fileintputstream或fileoutputstream或randomaccessfiletry(fileoutputstreamout=newfileoutputstream(newfile("d:\test.txt"));filechannelchannel=out.getchannel();){bytebufferbuffer=bytebuffer.allocate(buf_size);buffer.put("nio学习:niofilechanneldemo".getbytes());//先往buffer中写入数据buffer.flip();//调转buffer中读写指针position的位置printbuffer(buffer);//打印buffer中内容channel.write(buffer);//将buffer中数据写入channel}catch(filenotfoundexceptione){e.printstacktrace();}catch(ioexceptione){e.printstacktrace();}}privatestaticvoidprintbuffer(bytebufferbuffer){try{charsetcharset=charset.forname("utf-8");charsetdecoderdecoder=charset.newdecoder();charbuffercbuf=decoder.decode(buffer);buffer.flip();system.out.println(cbuf.tostring());}catch(charactercodingexceptione){e.printstacktrace();}}从文件中读数据

privatestaticfinalintbuf_size=1024;publicstaticvoidmain(string[]args){//channel关联文件——>通过channel读取数据到buffertry(fileinputstreamin=newfileinputstream(newfile("d:\test.txt"));filechannelchannel=in.getchannel();){bytebufferbuffer=bytebuffer.allocate(buf_size);channel.read(buffer);buffer.flip();printbuffer(buffer);}catch(filenotfoundexceptione){e.printstacktrace();}catch(ioexceptione){e.printstacktrace();}}privatestaticvoidprintbuffer(bytebufferbuffer){charsetcharset=charset.forname("utf-8");charsetdecoderdecoder=charset.newdecoder();try{charbufferbuf=decoder.decode(buffer);buffer.flip();system.out.println(buf.tostring());}catch(charactercodingexceptione){e.printstacktrace();}}

close():使用完filechannel需要关闭channel,为简洁代码可以使用try-with-resources语法。

position():position()方法可以获取filechannel当前的位置,而position(longpos)可以设置filechannel当前位置,但是如果将位置设置在文件结束符之后,调用position()将返回-1;调用position(pos)写入数据,则会把文件撑大到当前位置并写入数据,这样会导致磁盘上物理文件中写入的数据间有空隙。

longpos=channle.position();channel.position(100l);

size():channel.size()方法返回的是channel关联文件的大小。

truncate(longsize):truncate()方法用来截取文件,此时文件中指定长度后面的部分将会被删除,如:

channel.truncate(100);//将会保留前100个字节force(booleanflag):一般情况下出于性能考虑,操作系统会将数据缓存在内存中,所以无法保证写入到filechannel中的数据一定会立即写到磁盘上,此时,如果调用force()方法则能强制将channel中的数据立即写入磁盘。datagramchannel示例

datagramchannel是用来收发udp包的通道,因为udp是无连接的网络协议,所以datagramchannel收发的是udp数据包。

publicstaticvoidmain(string[]args)throwsinterruptedexception{newthread(()->{//打开datagramchannel,监听udp9999端口try(datagramchannelchannel=datagramchannel.open()){channel.socket().bind(newinetsocketaddress(9999));bytebufferbuffer=bytebuffer.allocate(100);//通过channel的recevice方法接收udp数据包channel.receive(buffer);buffer.flip();printbuffer(buffer);}catch(ioexceptione){e.printstacktrace();}}).start();thread.sleep(1000);//服务端先启动newthread(()->{try(datagramchannelchannel=datagramchannel.open();){bytebufferbuf=bytebuffer.allocate(100);buf.clear();buf.put("datagramchanneldemo".getbytes());//发送数据前注意要把buffer的position置为0buf.flip();//调用send方法发送到指定ip地址的指定端口channel.send(buf,newinetsocketaddress("localhost",9999));}catch(ioexceptione){e.printstacktrace();}}).start();}

由于udp是无连接的,当指定连接的ip地址或域名时并不会创建一个真正的连接,而是锁住了datagramchannel,只能从锁定的地址收发数据,并且数据传送没有可靠性保证。

publicstaticvoidmain(string[]args)throwsinterruptedexception{threadserver=newthread(()->{try(serversocketchannelchannel=serversocketchannel.open()){channel.socket().bind(newinetsocketaddress(44593));socketchannelsocket=channel.accept();bytebufferbuffer=bytebuffer.allocate(1024);socket.read(buffer);buffer.flip();printbuffer(buffer);}catch(ioexceptione){e.printstacktrace();}});threadclient=newthread(()->{try(socketchannelchannel=socketchannel.open()){channel.socket().connect(newinetsocketaddress("127.0.0.1",44593));bytebufferbuffer=bytebuffer.allocate(1034);buffer.put("socketchannledemo".getbytes());buffer.flip();channel.write(buffer);}catch(ioexceptione){e.printstacktrace();}});//启动顺序不影响结果server.start();client.start();}buffer

buffer作为数据的读写缓冲区,具备读和写两种模式。

publicabstractclassbuffer{//invariants:mark<=position<=limit<=capacityprivateintmark=-1;privateintposition=0;privateintlimit;privateintcapacity;......publicfinalbufferflip(){limit=position;position=0;mark=-1;returnthis;}publicfinalbufferclear(){position=0;limit=capacity;mark=-1;returnthis;}......}capacity、position、limit、mark、flip()、clear()

capacity:buffer的容量,在申请buffer时指定大小,是固定不变的。

limit:buffer可以使用的上限——写模式下表示最多能往buffer中写入数据的边界,初始化时limit等于capacity;调用flip()切换为读模式后limit会等于当前的position;表示能读到的数据边界。当一次读写操作完成后,limit的值可能不会等于capacity,存在内存泄露的情况(这个不知道算不算设计不够友好),避免这种情况要在每一次读写操作完成后执行clear()方法清空buffer。

position:position可以看成是一个读写指针,指示当前读或写的位置,随put/get方法自动更新,当buffer中的数据准备好了,需要从写模式切换为读模式时,需要调用buffer.flip()方法,可以看到flip()方法会将当前写的最后一个位置赋值给limit,然后将position切换为0,即变成从0位置开始读,可以读到limit位置,反之从读模式切换为写模式也是如此。

mark:mark用来标记某个时刻的一个position,通过调用buffer.mark()方法可以记录当前的position,之后能通过buffer.reset()恢复到这个position。mark默认值是-1,并且其值必须小于position,如果调用buffer.position(index)时传入的index比mark小,则会将mark设置为-1使暂存的位置失效。

这4个属性的大小关系是$mark<=position<=limit<=capacity$

publicstaticvoidmain(string[]args){bytebufferbuffer=bytebuffer.allocate(10);buffer.put("abcde".getbytes());buffer.flip();while(buffer.hasremaining()){system.out.print(buffer.get()+"");}buffer.flip();try{buffer.put("abcdef".getbytes());//没有clear()将抛出bufferoverflowexception}catch(exceptione){e.printstacktrace();}}selectorregister

一个selector可以注册多个channel,并且seletor要求channel必须工作在非阻塞模式下,因此filechannel不能结合selector使用,同时注册的channel需要调用channel.configureblocking(flase);设置为非阻塞通道。

channel声明了一个抽象方法用来注册channel到selector:

publicabstractselectionkeyregister(selectorsel,intops,objectatt)throwsclosedchannelexception;

selectionkey:selectionkey包含了许多有用的属性,如interest集合、ready集合、channel对象、selector对象等;通过selectionkey返回值,可以进行各种操作。

ops:ops是一个int类型数,其实质表示的是事件类型的集合,即channel注册selector时告诉selector其对哪些事件感兴趣。selectionkey中只定义了四种事件类型,分别用四个常量表示:

并且channel也不是四种事件都能注册,不同的channel只能注册validops()方法中有定义的事件。

publicfinalintvalidops(){return(selectionkey.op_read|selectionkey.op_write|selectionkey.op_connect);}datagramchannel

publicfinalintvalidops(){return(selectionkey.op_read|selectionkey.op_write);}serversocketchannel

publicfinalintvalidops(){returnselectionkey.op_accept;}att:att是一个附加的对象,可以不指定,也可以让我们更灵活的将更多的信息附加到selectionkey上,比如attach一个buffer、attach一个唯一标识等等。select

注册了selector的channel便能将原本由自己调用accept的工作交由selector来代替。selector通过select()方法根据channel注册时所关联的感兴趣的事件返回准备就绪的channel。此时,原本阻塞在channel.accept()上的操作变成了阻塞在selector.select()上。

当select()返回值大于0时,说明有channel准备就绪了,进一步处理可以按以下步骤进行:

selectnow()和select()不同之处在于前者不会阻塞当前线程,而是直接返回。

wakeup()是用来唤醒被select()阻塞的线程的,有的时候select()阻塞的线程,我们不想其一直被阻塞,而是一段时间内如果没有通道就绪就继续执行,那么这个时候可以在另外一个线程里调用selector.wakeup(),但是这里有个“坑”就是如果当前的selector没有被阻塞在select上,那么下一次调用该selector对象的select方法会被立即唤醒。

publicclassserver{publicstaticvoidmain(string[]args){try{selectorselector=selector.open();serversocketchannelchannel=serversocketchannel.open();channel.configureblocking(false);channel.socket().bind(newinetsocketaddress(44593));channel.register(selector,channel.validops());while(true){while(selector.select()==0){continue;}setkeys=selector.selectedkeys();iteratoriterator=keys.iterator();while(iterator.hasnext()){selectionkeykey=iterator.next();if(key.isacceptable()){serversocketchannelserver=(serversocketchannel)key.channel();socketchannelclient=server.accept();client.configureblocking(false);client.register(selector,selectionkey.op_read);}if(key.isreadable()){socketchannelclient=(socketchannel)key.channel();bytebufferbuffer=bytebuffer.allocate(100);client.read(buffer);buffer.flip();bufferutil.printbuffer(buffer);client=(socketchannel)key.channel();client.register(selector,selectionkey.op_read);}keys.clear();}}}catch(exceptione){e.printstacktrace();}}}

publicclassclient{publicstaticvoidmain(string[]args){try(scannersc=newscanner(system.in);socketsocket=newsocket("127.0.0.1",44593);){stringinput=sc.nextline();while(input!=null&&!"".equals(input.trim())){socket.getoutputstream().write(input.getbytes());input=sc.nextline();}}catch(exceptione){e.printstacktrace();}}}io模型图解阻塞io模型

相关文章
{{ v.title }}
{{ v.description||(cleanHtml(v.content)).substr(0,100)+'···' }}
你可能感兴趣
推荐阅读 更多>
推荐商标

{{ v.name }}

{{ v.cls }}类

立即购买 联系客服