netty客戶端發送數據并接收返回值?我們知道由兩種數據的傳輸方式,分别是字符流和字節流,字符流的意思是傳輸的對象就是字符串,格式已經被設置好了,發送方和接收方按照特定的格式去讀取就行了,而字節流是指将數據作為最原始的二進制字節來進行傳輸,我來為大家科普一下關于netty客戶端發送數據并接收返回值?以下内容希望對你有幫助!
我們知道由兩種數據的傳輸方式,分别是字符流和字節流,字符流的意思是傳輸的對象就是字符串,格式已經被設置好了,發送方和接收方按照特定的格式去讀取就行了,而字節流是指将數據作為最原始的二進制字節來進行傳輸。
今天給大家介紹一下在netty中的基于流的數據傳輸。
熟悉TCP/IP協議的同學應該知道,在TCP/IP中,因為底層協議有支持的數據包的最大值,所以對于大數據傳輸來說,需要對數據進行拆分和封包處理,并将這些拆分組裝過的包進行發送,最後在接收方對這些包進行組合。在各個包中有固定的結構,所以接收方可以很清楚的知道到底應該組合多少個包作為最終的結果。
那麼對于netty來說,channel中傳輸的是ByteBuf,實際上最最最底層的就是byte數組。對于這種byte數組來說,接收方并不知道到底應該組合多少個byte來合成原來的消息,所以需要在接收端對收到的byte進行組合,從而生成最終的數據。
那麼對于netty中的byte數據流應該怎麼組合呢?我們接下來看兩種組合方法。
這種組合的方式的基本思路是構造一個目标大小的ByteBuf,然後将接收到的byte通過調用ByteBuf的writeBytes方法寫入到ByteBuf中。最後從ByteBuf中讀取對應的數據。
比如我們想從服務端發送一個int數字給客戶端,一般來說int是32bits,然後一個byte是8bits,那麼一個int就需要4個bytes組成。
在server端,可以建立一個byte的數組,數組中包含4個元素。将4個元素的byte發送給客戶端,那麼客戶端該如何處理呢?
首先我們需要建立一個clientHander,這個handler應該繼承ChannelInboundHandlerAdapter,并且在其handler被添加到ChannelPipeline的時候初始化一個包含4個byte的byteBuf。
handler被添加的時候會觸發一個handlerAdded事件,所以我們可以這樣寫:
private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { //創建一個4個byte的緩沖器 buf = ctx.alloc().buffer(4); }
上例中,我們從ctx分配了一個4個字節的緩沖器,并将其賦值給handler中的私有變量buf。
當handler執行完畢,從ChannelPipeline中删除的時候,會觸發handlerRemoved事件,在這個事件中,我們可以對分配的Bytebuf進行清理,通常來說,可以調用其release方法,如下所示:
public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // 釋放buf buf = null; }
然後最關鍵的一步就是從channel中讀取byte并将其放到4個字節的byteBuf中。在之前的文章中我們提到了,可以在channelRead方法中,處理消息讀取的邏輯。
public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // 寫入一個byte m.release(); if (buf.readableBytes() >= 4) { // 已經湊夠4個byte,将4個byte組合稱為一個int long result = buf.readUnsignedInt(); ctx.close(); } }
每次觸發channelRead方法,都會将讀取到的一個字節的byte通過調用writeBytes方法寫入buf中。當buf的可讀byte大于等于4個的時候就說明4個字節已經讀滿了,可以對其進行操作了。
這裡我們将4個字節組合成一個unsignedInt,并使用readUnsignedInt方法從buf中讀取出來組合稱為一個int數字。
上面的例子雖然可以解決4個字節的byte問題,但是如果數據結構再負責一點,上面的方式就會力不從心,需要考慮太多的數據組合問題。接下來我們看另外一種方式。
Byte的轉換類
netty提供了一個ByteToMessageDecoder的轉換類,可以方便的對Byte轉換為其他的類型。
我們隻需要重新其中的decode方法,就可以實現對ByteBuf的轉換:
public class SquareDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readBytes(in.readableBytes())); } }
上面的例子将byte從input轉換到output中,當然,你還可以在上面的方法中進行格式轉換,如下所示:
public class TimeDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(in.readBytes(4)); }}
上面的例子會先判斷in中是否有4個byte,如果有就将其讀出來放到out中去。那麼有同學會問了,輸入不是一個byte一個byte來的嗎?為什麼這裡可以一次讀取到4個byte?這是因為ByteToMessageDecoder内置了一個緩存裝置,所以這裡的in實際上是一個緩存集合。
ReplayingDecoder
netty還提供了一個更簡單的轉換ReplayingDecoder,如果使用ReplayingDecoder重新上面的邏輯就是這樣的:
public class TimeDecoder extends ReplayingDecoder<Void> { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { out.add(in.readBytes(4)); }}
隻需要一行代碼即可。
事實上ReplayingDecoder 是ByteToMessageDecoder 的子類,是在ByteToMessageDecoder上豐富了一些功能的結果。
他們兩的區别在于ByteToMessageDecoder 還需要通過調用readableBytes來判斷是否有足夠的可以讀byte,而使用ReplayingDecoder直接讀取即可,它假設的是所有的bytes都已經接受成功了。
比如下面使用ByteToMessageDecoder的代碼:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); } }
上例假設在byte的頭部是一個int大小的數組,代表着byte數組的長度,需要先讀取int值,然後再根據int值來讀取對應的byte數據。
和下面的代碼是等價的:
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> { protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
上面代碼少了判斷的步驟。
那麼這是怎麼實現的呢?
事實上ReplayingDecoder 會傳遞一個會抛出 Error的 ByteBuf , 當 ByteBuf 讀取的byte個數不滿足要求的時候,會抛出異常,當ReplayingDecoder 捕獲到這個異常之後,會重置buffer的readerIndex到最初的狀态,然後等待後續的數據進來,然後再次調用decode方法。
所以,ReplayingDecoder的效率會比較低,為了解決這個問題,netty提供了checkpoint() 方法。這是一個保存點,當報錯的時候,可以不會退到最初的狀态,而是回退到checkpoint() 調用時候保存的狀态,從而可以減少不必要的浪費。
總結
本文介紹了在netty中進行stream操作和變換的幾種方式,希望大家能夠喜歡。
更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!