国产chinesehdxxxx野外,国产av无码专区亚洲av琪琪,播放男人添女人下边视频,成人国产精品一区二区免费看,chinese丰满人妻videos

Netty添加 WebSocket 支持

2018-08-08 10:56 更新

一種被稱作“Upgrade handshake(升級(jí)握手)”的機(jī)制能夠?qū)?biāo)準(zhǔn)的HTTP或者HTTPS協(xié)議轉(zhuǎn)成 WebSocket。所以,應(yīng)用程序如果使用了 WebSocket ,那么它都是以 HTTP/S 開(kāi)始,之后再進(jìn)行升級(jí),升級(jí)會(huì)發(fā)生在什么時(shí)候是不確定的,要根據(jù)具體的應(yīng)用來(lái)決定:可能是在應(yīng)用啟動(dòng)的時(shí)候,也可能是當(dāng)一個(gè)特定的 URL 被請(qǐng)求的時(shí)候。

在我們的應(yīng)用中,要想升級(jí)協(xié)議為 WebSocket,只有當(dāng) URL 請(qǐng)求以“/ws”結(jié)束時(shí)才可以,如果沒(méi)有達(dá)到該要求,服務(wù)器仍將使用基本的 HTTP/S,一旦連接升級(jí),之后的數(shù)據(jù)傳輸都將使用 WebSocket 。

下面看下服務(wù)器的邏輯圖

Figure 11.2 Server logic

Figure%2011

#1客戶端/用戶連接到服務(wù)器并加入聊天

#2 HTTP 請(qǐng)求頁(yè)面或 WebSocket 升級(jí)握手

#3服務(wù)器處理所有客戶端/用戶

#4響應(yīng) URI “/”的請(qǐng)求,轉(zhuǎn)到 index.html

#5如果訪問(wèn)的是 URI“/ws” ,處理 WebSocket 升級(jí)握手

#6升級(jí)握手完成后 ,通過(guò) WebSocket 發(fā)送聊天消息

處理 HTTP 請(qǐng)求

本節(jié)我們將實(shí)現(xiàn)此應(yīng)用中用于處理 HTTP 請(qǐng)求的組件,這個(gè)組件托管著可供客戶端訪問(wèn)的聊天室頁(yè)面,并且顯示客戶端發(fā)送的消息。

下面就是這個(gè) HttpRequestHandler 的代碼,它是一個(gè)用來(lái)處理 FullHttpRequest 消息的 ChannelInboundHandler 的實(shí)現(xiàn)類。注意看它是怎么實(shí)現(xiàn)忽略符合 "/ws" 格式的 URI 請(qǐng)求的。

Listing 11.1 HTTPRequestHandler

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {    //1
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            ctx.fireChannelRead(request.retain());                    //2
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);                                //3
            }

            RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4

            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

            boolean keepAlive = HttpHeaders.isKeepAlive(request);

            if (keepAlive) {                                        //5
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);                    //6

            if (ctx.pipeline().get(SslHandler.class) == null) {        //7
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);            //8
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);        //9
            }
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

1.擴(kuò)展 SimpleChannelInboundHandler 用于處理 FullHttpRequest信息

2.如果請(qǐng)求是一次升級(jí)了的 WebSocket 請(qǐng)求,則遞增引用計(jì)數(shù)器(retain)并且將它傳遞給在 ChannelPipeline 中的下個(gè) ChannelInboundHandler

3.處理符合 HTTP 1.1的 "100 Continue" 請(qǐng)求

4.讀取 index.html

5.判斷 keepalive 是否在請(qǐng)求頭里面

6.寫 HttpResponse 到客戶端

7.寫 index.html 到客戶端,根據(jù) ChannelPipeline 中是否有 SslHandler 來(lái)決定使用 DefaultFileRegion 還是 ChunkedNioFile

8.寫并刷新 LastHttpContent 到客戶端,標(biāo)記響應(yīng)完成

9.如果 請(qǐng)求頭中不包含 keepalive,當(dāng)寫完成時(shí),關(guān)閉 Channel

HttpRequestHandler 做了下面幾件事,

  • 如果該 HTTP 請(qǐng)求被發(fā)送到URI “/ws”,則調(diào)用 FullHttpRequest 上的 retain(),并通過(guò)調(diào)用 fireChannelRead(msg) 轉(zhuǎn)發(fā)到下一個(gè) ChannelInboundHandler。retain() 的調(diào)用是必要的,因?yàn)?channelRead() 完成后,它會(huì)調(diào)用 FullHttpRequest 上的 release() 來(lái)釋放其資源。 (請(qǐng)參考我們先前在第6章中關(guān)于 SimpleChannelInboundHandler 的討論)
  • 如果客戶端發(fā)送的 HTTP 1.1 頭是“Expect: 100-continue” ,則發(fā)送“100 Continue”的響應(yīng)。
  • 在 頭被設(shè)置后,寫一個(gè) HttpResponse 返回給客戶端。注意,這不是 FullHttpResponse,這只是響應(yīng)的第一部分。另外,這里我們也不使用 writeAndFlush(), 這個(gè)是在留在最后完成。
  • 如果傳輸過(guò)程既沒(méi)有要求加密也沒(méi)有要求壓縮,那么把 index.html 的內(nèi)容存儲(chǔ)在一個(gè) DefaultFileRegion 里就可以達(dá)到最好的效率。這將利用零拷貝來(lái)執(zhí)行傳輸。出于這個(gè)原因,我們要檢查 ChannelPipeline 中是否有一個(gè) SslHandler。如果是的話,我們就使用 ChunkedNioFile。
  • 寫 LastHttpContent 來(lái)標(biāo)記響應(yīng)的結(jié)束,并終止它
  • 如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 對(duì)象的最后寫入,并關(guān)閉連接。注意,這里我們調(diào)用 writeAndFlush() 來(lái)刷新所有以前寫的信息。

這里展示了應(yīng)用程序的第一部分,用來(lái)處理純的 HTTP 請(qǐng)求和響應(yīng)。接下來(lái)我們將處理 WebSocket 的 frame(幀),用來(lái)發(fā)送聊天消息。

WebSocket frame

WebSockets 在“幀”里面來(lái)發(fā)送數(shù)據(jù),其中每一個(gè)都代表了一個(gè)消息的一部分。一個(gè)完整的消息可以利用了多個(gè)幀。

處理 WebSocket frame

WebSocket "Request for Comments" (RFC) 定義了六種不同的 frame; Netty 給他們每個(gè)都提供了一個(gè) POJO 實(shí)現(xiàn) ,見(jiàn)下表:

Table 11.1 WebSocketFrame types

名稱描述
BinaryWebSocketFramecontains binary data
TextWebSocketFramecontains text data
ContinuationWebSocketFramecontains text or binary data that belongs to a previous BinaryWebSocketFrame or TextWebSocketFrame
CloseWebSocketFramerepresents a CLOSE request and contains close status code and a phrase
PingWebSocketFramerequests the transmission of a PongWebSocketFrame
PongWebSocketFramesent as a response to a PingWebSocketFrame

我們的程序只需要使用下面4個(gè)幀類型:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

在這里我們只需要處理 TextWebSocketFrame,其他的會(huì)由 WebSocketServerProtocolHandler 自動(dòng)處理。

下面代碼展示了 ChannelInboundHandler 處理 TextWebSocketFrame,同時(shí)也將跟蹤在 ChannelGroup 中所有活動(dòng)的 WebSocket 連接

Listing 11.2 Handles Text frames

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //1
    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    //2
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {

            ctx.pipeline().remove(HttpRequestHandler.class);    //3

            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));//4

            group.add(ctx.channel());    //5
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        group.writeAndFlush(msg.retain());    //6
    }
}

1.擴(kuò)展 SimpleChannelInboundHandler 用于處理 TextWebSocketFrame 信息

2.覆寫userEventTriggered() 方法來(lái)處理自定義事件

3.如果接收的事件表明握手成功,就從 ChannelPipeline 中刪除HttpRequestHandler ,因?yàn)榻酉聛?lái)不會(huì)接受 HTTP 消息了

4.寫一條消息給所有的已連接 WebSocket 客戶端,通知它們建立了一個(gè)新的 Channel 連接

5.添加新連接的 WebSocket Channel 到 ChannelGroup 中,這樣它就能收到所有的信息

6.保留收到的消息,并通過(guò) writeAndFlush() 傳遞給所有連接的客戶端。

上面顯示了 TextWebSocketFrameHandler 僅作了幾件事:

  • 當(dāng)WebSocket 與新客戶端已成功握手完成,通過(guò)寫入信息到 ChannelGroup 中的 Channel 來(lái)通知所有連接的客戶端,然后添加新 Channel 到 ChannelGroup
  • 如果接收到 TextWebSocketFrame,調(diào)用 retain() ,并將其寫、刷新到 ChannelGroup,使所有連接的 WebSocket Channel 都能接收到它。和以前一樣,retain() 是必需的,因?yàn)楫?dāng) channelRead0()返回時(shí),TextWebSocketFrame 的引用計(jì)數(shù)將遞減。由于所有操作都是異步的,writeAndFlush() 可能會(huì)在以后完成,我們不希望它訪問(wèn)無(wú)效的引用。

由于 Netty 在其內(nèi)部處理了其余大部分功能,唯一剩下的需要我們?nèi)プ龅木褪菫槊恳粋€(gè)新創(chuàng)建的 Channel 初始化 ChannelPipeline 。要完成這個(gè),我們需要一個(gè)ChannelInitializer

初始化 ChannelPipeline

接下來(lái),我們需要安裝我們上面實(shí)現(xiàn)的兩個(gè) ChannelHandler 到 ChannelPipeline。為此,我們需要繼承 ChannelInitializer 并且實(shí)現(xiàn) initChannel()??聪旅?ChatServerInitializer 的代碼實(shí)現(xiàn)

Listing 11.3 Init the ChannelPipeline

public class ChatServerInitializer extends ChannelInitializer<Channel> {    //1
    private final ChannelGroup group;

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {            //2
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
    }
}

1.擴(kuò)展 ChannelInitializer

2.添加 ChannelHandler 到 ChannelPipeline

initChannel() 方法用于設(shè)置所有新注冊(cè)的 Channel 的ChannelPipeline,安裝所有需要的 ChannelHandler??偨Y(jié)如下:

Table 11.2 ChannelHandlers for the WebSockets Chat server

ChannelHandler  職責(zé)
HttpServerCodecDecode bytes to HttpRequest, HttpContent, LastHttpContent.Encode HttpRequest, HttpContent, LastHttpContent to bytes.
ChunkedWriteHandlerWrite the contents of a file.
HttpObjectAggregatorThis ChannelHandler aggregates an HttpMessage and its following HttpContents into a single FullHttpRequest or FullHttpResponse (depending on whether it is being used to handle requests or responses).With this installed the next ChannelHandler in the pipeline will receive only full HTTP requests.
HttpRequestHandlerHandle FullHttpRequests (those not sent to "/ws" URI).
WebSocketServerProtocolHandlerAs required by the WebSockets specification, handle the WebSocket Upgrade handshake, PingWebSocketFrames,PongWebSocketFrames and CloseWebSocketFrames.
TextWebSocketFrameHandlerHandles TextWebSocketFrames and handshake completion events

該 WebSocketServerProtocolHandler 處理所有規(guī)定的 WebSocket 幀類型和升級(jí)握手本身。如果握手成功所需的 ChannelHandler 被添加到管道,而那些不再需要的則被去除。管道升級(jí)之前的狀態(tài)如下圖。這代表了 ChannelPipeline 剛剛經(jīng)過(guò) ChatServerInitializer 初始化。

Figure 11.3 ChannelPipeline before WebSockets Upgrade

Figure%2011

握手升級(jí)成功后 WebSocketServerProtocolHandler 替換HttpRequestDecoder 為 WebSocketFrameDecoder,HttpResponseEncoder 為WebSocketFrameEncoder。 為了最大化性能,WebSocket 連接不需要的 ChannelHandler 將會(huì)被移除。其中就包括了 HttpObjectAggregator 和 HttpRequestHandler

下圖,展示了 ChannelPipeline 經(jīng)過(guò)這個(gè)操作完成后的情況。注意 Netty 目前支持四個(gè)版本 WebSocket 協(xié)議,每個(gè)通過(guò)其自身的方式實(shí)現(xiàn)類。選擇正確的版本W(wǎng)ebSocketFrameDecoder 和 WebSocketFrameEncoder 是自動(dòng)進(jìn)行的,這取決于在客戶端(在這里指瀏覽器)的支持(在這個(gè)例子中,我們假設(shè)使用版本是 13 的 WebSocket 協(xié)議,從而圖中顯示的是 WebSocketFrameDecoder13 和 WebSocketFrameEncoder13)。

Figure 11.4 ChannelPipeline after WebSockets Upgrade

Figure%2011

引導(dǎo)

最后一步是 引導(dǎo)服務(wù)器,設(shè)置 ChannelInitializer

public class ChatServer {

    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);//1
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap  = new ServerBootstrap(); //2
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }

    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {        //3
       return new ChatServerInitializer(group);
    }

    public void destroy() {        //4
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);

        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

1.創(chuàng)建 DefaultChannelGroup 用來(lái) 保存所有連接的的 WebSocket channel

2.引導(dǎo) 服務(wù)器

3.創(chuàng)建 ChannelInitializer

4.處理服務(wù)器關(guān)閉,包括釋放所有資源


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)