計(jì)算機(jī)二級(jí):yanf4j引入了客戶端非阻塞API

字號(hào):

yanf4j發(fā)布一個(gè)0.50-beta2版本,這個(gè)版本最重要的改進(jìn)就是引入了客戶端連接非阻塞API,主要最近的工作要用到,所以添加了。兩個(gè)核心類TCPConnectorController和UDPConnectorController分別用于TCP和UDP的客戶端連接控制。例如,現(xiàn)在的UDP echo client可以寫成:
    //客戶端echo handler
    classEchoClientHandlerextendsHandlerAdapter{
    publicvoidonReceive(SessionudpSession,Objectt){
    DatagramPacketdatagramPacket=(DatagramPacket)t;
    System.out.println("recv:"+newString(datagramPacket.getData()));
    }
    @Override
    publicvoidonMessageSent(Sessionsession,Objectt){
    System.out.println("send:"+newString((byte[])t));
    }
    }
    //連接代碼,并發(fā)送UDP包
    UDPConnectorControllerconnector=newUDPConnectorController();
    connector.setSoTimeout(1000);
    connector.setHandler(newEchoClientHandler());
    connector.connect(newInetSocketAddress(InetAddress.getByName(host),
    port));
    for(inti=0;i<10000;i++){
    Strings="hello"+i;
    DatagramPacketpacket=newDatagramPacket(s.getBytes(),s.length());
    connector.send(packet);
    }
    UDP不是面向連接的,因此connect方法僅僅是調(diào)用了底層DatagramChannel.connect方法,用來限制接收和發(fā)送的packet的遠(yuǎn)程端點(diǎn)。
    再來看看TCPConnectorController的使用,同樣看Echo Client的實(shí)現(xiàn):
    //客戶端的echohandler
    classEchoHandlerextendsHandlerAdapter{
    @Override
    publicvoidonConnected(Sessionsession){
    try{
    //一連接就發(fā)送NUM個(gè)字符串
    for(inti=0;i    session.send(generateString(i));
    }catch(Exceptione){
    }
    }publicStringgenerateString(intlen){
    StringBuffersb=newStringBuffer();
    for(inti=0;i    sb.append(i);
    returnsb.toString();
    }
    @Override
    publicvoidonReceive(Sessionsession,Stringt){
    //打印接收到字符串
    if(DEBUG)
    System.out.println("recv:"+t);
    }
    }
    //...連接API,TCPConnectorController示例
    Configurationconfiguration=newConfiguration();
    configuration.setTcpSessionReadBufferSize(256*1024);//設(shè)置讀的緩沖區(qū)大小
    TCPConnectorControllerconnector=newTCPConnectorController(configuration,
    newStringCodecFactory());
    connector.setHandler(newEchoHandler());
    connector.setCodecFactory(newStringCodecFactory());
    try{
    connector.Connect(newInetSocketAddress("localhost",8080));
    }catch(IOExceptione){
    e.printStackTrace();
    }
    注意,connect方法并不阻塞,而是立即返回,連接是否建立可以通過TCPConnectorController.isConnected()方法來判斷,因此通常你可能會(huì)這樣使用:
    try{
    connector.Connect(newInetSocketAddress("localhost",8080));
    while(!connector.isConnected())
    ;
    }catch(Exceptione){
    e.printStackTrace();
    }
    來強(qiáng)制確保后面對connector的使用是已經(jīng)連接上的connector,然而更好的做法是在Handler的onConnected()回調(diào)方法中處理邏輯,因?yàn)檫@個(gè)方法僅僅在連接建立后才會(huì)被調(diào)用。
    兩個(gè)ConnectorController都有系列send方法,用于發(fā)送數(shù)據(jù):
    TCPConnectorController.send(Objectmsg)throwsInterruptedException
    UDPConnectorController.send(DatagramPacketpacket)throwsInterruptedException
    UDPConnectorController.send(SocketAddresstargetAddr,Objectmsg)throwsInterruptedException
    0.50-beta2帶來的另一個(gè)修改就是Session接口添加setReadBufferByteOrder方法,用于設(shè)置session接收緩沖區(qū)的字節(jié)序,默認(rèn)是網(wǎng)絡(luò)字節(jié)序,也就是大端法。這個(gè)方法建議在Handler的onSessionStarted回調(diào)方法中調(diào)用。
    在0.50-beta最重要的修改是引入了session發(fā)送隊(duì)列緩沖區(qū)的流量控制選項(xiàng)。默認(rèn)情況下,session的發(fā)送緩沖隊(duì)列是無界的,隊(duì)列的push和pop也全然不會(huì)阻塞。在設(shè)置了緩沖隊(duì)列的高低水位選項(xiàng)后即引入了發(fā)送流量控制,規(guī)則如下:
    a)當(dāng)發(fā)送隊(duì)列中的數(shù)據(jù)總量大于高水位標(biāo)記(highWaterMark),Session.send將阻塞
    b)在條件a的作用下,Session.send的阻塞將持續(xù)到發(fā)送隊(duì)列中的數(shù)據(jù)總量小于于低水位標(biāo)記(lowWaterMark)才解除。
    緩沖隊(duì)列高低水位的設(shè)置通過Controller的下列方法設(shè)置:
    publicvoidsetSessionWriteQueueHighWaterMark(inthighWaterMark);
    publicvoidsetSessionWriteQueueLowWaterMark(intlowWaterMark);
    緩沖隊(duì)列的流量控制想法來自ACE的ACE_Message_Queue,是通過com.google.code.yanf4j.util.MessageQueue類實(shí)現(xiàn)的。
    0.50-beta還引入了Session.send(Object msg)的重載版本 Session.send(Object msg,long timeout),在超過timeout時(shí)間后send仍然阻塞時(shí)即終止send。注意,現(xiàn)在Session.send的這兩個(gè)方法都返回一個(gè)bool值來表示send成功與否,并且都將響應(yīng)中斷(僅限啟動(dòng)了流量控制選項(xiàng))拋出InterruptedException。