yanf4j發(fā)布一個(gè)0.50-beta2版本,這個(gè)版本最重要的改進(jìn)就是引入了客戶端連接非阻塞API,主要最近的工作要用到,所以添加了。兩個(gè)核心類TCPConnectorController和UDPConnectorController分別用于TCP和UDP的客戶端連接控制。例如,現(xiàn)在的UDP echo client可以寫成:
//客戶端echo handler
classEchoClientHandlerextendsHandlerAdapter{
publicvoidonReceive(SessionudpSession,Objectt){
DatagramPacketdatagramPacket=(DatagramPacket)t;
System.out.println("recv:"+newString(datagramPacket.getData()));
}
@Override
publicvoidonMessageSent(Sessionsession,Objectt){
System.out.println("send:"+newString((byte[])t));
}
}
//連接代碼,并發(fā)送UDP包
UDPConnectorControllerconnector=newUDPConnectorController();
connector.setSoTimeout(1000);
connector.setHandler(newEchoClientHandler());
connector.connect(newInetSocketAddress(InetAddress.getByName(host),
port));
for(inti=0;i<10000;i++){
Strings="hello"+i;
DatagramPacketpacket=newDatagramPacket(s.getBytes(),s.length());
connector.send(packet);
}
UDP不是面向連接的,因此connect方法僅僅是調(diào)用了底層DatagramChannel.connect方法,用來限制接收和發(fā)送的packet的遠(yuǎn)程端點(diǎn)。
再來看看TCPConnectorController的使用,同樣看Echo Client的實(shí)現(xiàn):
//客戶端的echohandler
classEchoHandlerextendsHandlerAdapter{
@Override
publicvoidonConnected(Sessionsession){
try{
//一連接就發(fā)送NUM個(gè)字符串
for(inti=0;i session.send(generateString(i));
}catch(Exceptione){
}
}publicStringgenerateString(intlen){
StringBuffersb=newStringBuffer();
for(inti=0;i sb.append(i);
returnsb.toString();
}
@Override
publicvoidonReceive(Sessionsession,Stringt){
//打印接收到字符串
if(DEBUG)
System.out.println("recv:"+t);
}
}
//...連接API,TCPConnectorController示例
Configurationconfiguration=newConfiguration();
configuration.setTcpSessionReadBufferSize(256*1024);//設(shè)置讀的緩沖區(qū)大小
TCPConnectorControllerconnector=newTCPConnectorController(configuration,
newStringCodecFactory());
connector.setHandler(newEchoHandler());
connector.setCodecFactory(newStringCodecFactory());
try{
connector.Connect(newInetSocketAddress("localhost",8080));
}catch(IOExceptione){
e.printStackTrace();
}
注意,connect方法并不阻塞,而是立即返回,連接是否建立可以通過TCPConnectorController.isConnected()方法來判斷,因此通常你可能會(huì)這樣使用:
try{
connector.Connect(newInetSocketAddress("localhost",8080));
while(!connector.isConnected())
;
}catch(Exceptione){
e.printStackTrace();
}
來強(qiáng)制確保后面對connector的使用是已經(jīng)連接上的connector,然而更好的做法是在Handler的onConnected()回調(diào)方法中處理邏輯,因?yàn)檫@個(gè)方法僅僅在連接建立后才會(huì)被調(diào)用。
兩個(gè)ConnectorController都有系列send方法,用于發(fā)送數(shù)據(jù):
TCPConnectorController.send(Objectmsg)throwsInterruptedException
UDPConnectorController.send(DatagramPacketpacket)throwsInterruptedException
UDPConnectorController.send(SocketAddresstargetAddr,Objectmsg)throwsInterruptedException
0.50-beta2帶來的另一個(gè)修改就是Session接口添加setReadBufferByteOrder方法,用于設(shè)置session接收緩沖區(qū)的字節(jié)序,默認(rèn)是網(wǎng)絡(luò)字節(jié)序,也就是大端法。這個(gè)方法建議在Handler的onSessionStarted回調(diào)方法中調(diào)用。
在0.50-beta最重要的修改是引入了session發(fā)送隊(duì)列緩沖區(qū)的流量控制選項(xiàng)。默認(rèn)情況下,session的發(fā)送緩沖隊(duì)列是無界的,隊(duì)列的push和pop也全然不會(huì)阻塞。在設(shè)置了緩沖隊(duì)列的高低水位選項(xiàng)后即引入了發(fā)送流量控制,規(guī)則如下:
a)當(dāng)發(fā)送隊(duì)列中的數(shù)據(jù)總量大于高水位標(biāo)記(highWaterMark),Session.send將阻塞
b)在條件a的作用下,Session.send的阻塞將持續(xù)到發(fā)送隊(duì)列中的數(shù)據(jù)總量小于于低水位標(biāo)記(lowWaterMark)才解除。
緩沖隊(duì)列高低水位的設(shè)置通過Controller的下列方法設(shè)置:
publicvoidsetSessionWriteQueueHighWaterMark(inthighWaterMark);
publicvoidsetSessionWriteQueueLowWaterMark(intlowWaterMark);
緩沖隊(duì)列的流量控制想法來自ACE的ACE_Message_Queue,是通過com.google.code.yanf4j.util.MessageQueue類實(shí)現(xiàn)的。
0.50-beta還引入了Session.send(Object msg)的重載版本 Session.send(Object msg,long timeout),在超過timeout時(shí)間后send仍然阻塞時(shí)即終止send。注意,現(xiàn)在Session.send的這兩個(gè)方法都返回一個(gè)bool值來表示send成功與否,并且都將響應(yīng)中斷(僅限啟動(dòng)了流量控制選項(xiàng))拋出InterruptedException。
//客戶端echo handler
classEchoClientHandlerextendsHandlerAdapter{
publicvoidonReceive(SessionudpSession,Objectt){
DatagramPacketdatagramPacket=(DatagramPacket)t;
System.out.println("recv:"+newString(datagramPacket.getData()));
}
@Override
publicvoidonMessageSent(Sessionsession,Objectt){
System.out.println("send:"+newString((byte[])t));
}
}
//連接代碼,并發(fā)送UDP包
UDPConnectorControllerconnector=newUDPConnectorController();
connector.setSoTimeout(1000);
connector.setHandler(newEchoClientHandler());
connector.connect(newInetSocketAddress(InetAddress.getByName(host),
port));
for(inti=0;i<10000;i++){
Strings="hello"+i;
DatagramPacketpacket=newDatagramPacket(s.getBytes(),s.length());
connector.send(packet);
}
UDP不是面向連接的,因此connect方法僅僅是調(diào)用了底層DatagramChannel.connect方法,用來限制接收和發(fā)送的packet的遠(yuǎn)程端點(diǎn)。
再來看看TCPConnectorController的使用,同樣看Echo Client的實(shí)現(xiàn):
//客戶端的echohandler
classEchoHandlerextendsHandlerAdapter
@Override
publicvoidonConnected(Sessionsession){
try{
//一連接就發(fā)送NUM個(gè)字符串
for(inti=0;i
}catch(Exceptione){
}
}publicStringgenerateString(intlen){
StringBuffersb=newStringBuffer();
for(inti=0;i
returnsb.toString();
}
@Override
publicvoidonReceive(Sessionsession,Stringt){
//打印接收到字符串
if(DEBUG)
System.out.println("recv:"+t);
}
}
//...連接API,TCPConnectorController示例
Configurationconfiguration=newConfiguration();
configuration.setTcpSessionReadBufferSize(256*1024);//設(shè)置讀的緩沖區(qū)大小
TCPConnectorControllerconnector=newTCPConnectorController(configuration,
newStringCodecFactory());
connector.setHandler(newEchoHandler());
connector.setCodecFactory(newStringCodecFactory());
try{
connector.Connect(newInetSocketAddress("localhost",8080));
}catch(IOExceptione){
e.printStackTrace();
}
注意,connect方法并不阻塞,而是立即返回,連接是否建立可以通過TCPConnectorController.isConnected()方法來判斷,因此通常你可能會(huì)這樣使用:
try{
connector.Connect(newInetSocketAddress("localhost",8080));
while(!connector.isConnected())
;
}catch(Exceptione){
e.printStackTrace();
}
來強(qiáng)制確保后面對connector的使用是已經(jīng)連接上的connector,然而更好的做法是在Handler的onConnected()回調(diào)方法中處理邏輯,因?yàn)檫@個(gè)方法僅僅在連接建立后才會(huì)被調(diào)用。
兩個(gè)ConnectorController都有系列send方法,用于發(fā)送數(shù)據(jù):
TCPConnectorController.send(Objectmsg)throwsInterruptedException
UDPConnectorController.send(DatagramPacketpacket)throwsInterruptedException
UDPConnectorController.send(SocketAddresstargetAddr,Objectmsg)throwsInterruptedException
0.50-beta2帶來的另一個(gè)修改就是Session接口添加setReadBufferByteOrder方法,用于設(shè)置session接收緩沖區(qū)的字節(jié)序,默認(rèn)是網(wǎng)絡(luò)字節(jié)序,也就是大端法。這個(gè)方法建議在Handler的onSessionStarted回調(diào)方法中調(diào)用。
在0.50-beta最重要的修改是引入了session發(fā)送隊(duì)列緩沖區(qū)的流量控制選項(xiàng)。默認(rèn)情況下,session的發(fā)送緩沖隊(duì)列是無界的,隊(duì)列的push和pop也全然不會(huì)阻塞。在設(shè)置了緩沖隊(duì)列的高低水位選項(xiàng)后即引入了發(fā)送流量控制,規(guī)則如下:
a)當(dāng)發(fā)送隊(duì)列中的數(shù)據(jù)總量大于高水位標(biāo)記(highWaterMark),Session.send將阻塞
b)在條件a的作用下,Session.send的阻塞將持續(xù)到發(fā)送隊(duì)列中的數(shù)據(jù)總量小于于低水位標(biāo)記(lowWaterMark)才解除。
緩沖隊(duì)列高低水位的設(shè)置通過Controller的下列方法設(shè)置:
publicvoidsetSessionWriteQueueHighWaterMark(inthighWaterMark);
publicvoidsetSessionWriteQueueLowWaterMark(intlowWaterMark);
緩沖隊(duì)列的流量控制想法來自ACE的ACE_Message_Queue,是通過com.google.code.yanf4j.util.MessageQueue類實(shí)現(xiàn)的。
0.50-beta還引入了Session.send(Object msg)的重載版本 Session.send(Object msg,long timeout),在超過timeout時(shí)間后send仍然阻塞時(shí)即終止send。注意,現(xiàn)在Session.send的這兩個(gè)方法都返回一個(gè)bool值來表示send成功與否,并且都將響應(yīng)中斷(僅限啟動(dòng)了流量控制選項(xiàng))拋出InterruptedException。

