博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty http2设计深入讲解
阅读量:7114 次
发布时间:2019-06-28

本文共 35648 字,大约阅读时间需要 118 分钟。

hot3.png

前言

近期接到一个任务,把netty http2引入到项目里面。听说过http2,还真没有自己玩过。对看过这篇博客的大家说句: 抱歉。本来想很好的说说http2的。写着写着,发现要写的东西太多了,有一些内容根本就不好写。但是netty http2.0的主要内容,本章博客已经全面的讲述了,需要读者有使用经历,阅读点源码。

了解下http2.0

时代在发展,使用http协议的人越来越多。http1.1的弊端慢慢都被显现出来。

  1. 浏览器方式一些网站频繁发送请求,造成一家独大其他网站无法使用。或者所有网站都频发发送请求造成用户体验差等等问题。限制每个url同时并发数量
  2. 提高请求的响应速度。只有一个连接,只有一次tcp三次握手或者tls的7次握手。一个http1.1请求所用的时间,http2.0可以处理三到四个请求。
  3. 提高服务端与客服端的性能(尤其是大型互联网公司流量很大,如果使用http2.0,可以减少一半的http服务器)

协商

原因 一

http客服端不知道http服务端是否支持http2.0。反过来 http服务端也不知道http客服端是否支持http2.0。为什么出现这种现象,让所有的http服务端与http客服端直接从http1.1过度到http2.0是不可能的事情。甚至在大点的公司内部直接从http1.1直接过度到http2.0也是一件不现实的事情,那么出现一件麻烦的事情有http1客服端,也有http2客服端。有http2服务端,也有http1服务端。这种两个维度,四种情况的共存现象。

原因二

有人会问,只支持http1.1不好吗? 已经支持http2,.0的client肯定不会放弃http2.0优秀的性能与特性,能使用使用http2.0,就要使用。

解决

那么http2.0的设计者为了解决这种麻烦的东西。推出了解决方案:协商。

https 1.1 与https.20的协商

https1.1与https2.0的协商是基于ALPN机制。ALPNS是基于TLS实现。在建立TLS链接的时候,客服端会 在TLS协议里面加入自己支持的协议,服务端在客服端支持的协议里面选中一个自己最合适的。然后把选中的协议通知给客服端。如果客户端没有发送支持的http协议,服务端会默认使用http1.1

http1.1与http2.0的协商

http没有TLS协议,无法基于TLS传递协议。协议制定者使用了Upgrade机制。客户端发送一个空请求,请求里面包含该Upgrade,Connection,HTTP2-Settings请求头。服务端从Upgrade取出支持的协议然后响应请求,在响应的请求头里面包含Upgrade,Connection。这样协商就成功了。下面是http1.1与http2.0的协商流程

请求头示例:

GET / HTTP/1.1Host: example.comConnection: Upgrade, HTTP2-SettingsUpgrade: h2cHTTP2-Settings: 

如果服务端不支持 HTTP/2,它会忽略 Upgrade 字段,直接返回 HTTP/1.1 响应,例如:

HTTP/1.1 200 OKContent-Length: 243Content-Type: text/html

如果服务端支持 HTTP/2,那就可以回应 101 状态码及对应头部:

HTTP/1.1 100 Switching ProtocolsConnection: UpgradeUpgrade: h2c
小结
  1. https 1.1 与https.2.0的协商 与 http1.1与http2.0的协商 是两套设计方案。https 1.1 与https.2.0 TLS帮你做了。http1.1与http2.0的协商需要自己做。
  2. 现在的趋势,客服端与服务端都需要同时支持http1.1,http2.0,https1.1,https2.0。真是一件很麻烦的事情

netty http2

netty 的http2模块设计的非常好,实在是很绕,就一个http2的包。实在有点乱。按照功能划分的话http2应该有四个模块。

  1. http2核心模块
  2. http1.1与http2协商模块
  3. http2帧处理模块
  4. http2协议转http1协议模块

http2核心模块

作为核心的模块主要是负责http2协议的解码与编码,帧的解析与处理,http2请求头子模块,streamId的管理,http2链接管理

Http2ConnectionHandler

Http2ConnectionHandler 是 netty核心设计hadler的实现,也是http2模块的出发点。

负责http2模块的组合
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,                                     Http2Settings initialSettings) {        this.initialSettings = checkNotNull(initialSettings, "initialSettings");        this.decoder = checkNotNull(decoder, "decoder");        this.encoder = checkNotNull(encoder, "encoder");        if (encoder.connection() != decoder.connection()) {            throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");        }    }
负责http2协议下handler生命周期处理
@Override public void flush(ChannelHandlerContext ctx) {	 try {		 // Trigger pending writes in the remote flow controller.		 encoder.flowController().writePendingBytes();		 ctx.flush();	 } catch (Http2Exception e) {		 onError(ctx, true, e);	 } catch (Throwable cause) {	 	onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));	 } }public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        // Initialize the encoder, decoder, flow controllers, and internal state.        encoder.lifecycleManager(this);        decoder.lifecycleManager(this);        encoder.flowController().channelHandlerContext(ctx);        decoder.flowController().channelHandlerContext(ctx);        byteDecoder = new PrefaceDecoder(ctx);    }    @Override    protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {        if (byteDecoder != null) {            byteDecoder.handlerRemoved(ctx);            byteDecoder = null;        }    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        if (byteDecoder == null) {            byteDecoder = new PrefaceDecoder(ctx);//当链接创建的时候创建PrefaceDecoder对象        }        byteDecoder.channelActive(ctx);        super.channelActive(ctx);    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        // Call super class first, as this may result in decode being called.        super.channelInactive(ctx);        if (byteDecoder != null) {            byteDecoder.channelInactive(ctx);            byteDecoder = null;        }    }    @Override    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {        // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering        // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.        try {            if (ctx.channel().isWritable()) {                flush(ctx);            }            encoder.flowController().channelWritabilityChanged();        } finally {            super.channelWritabilityChanged(ctx);        }    }    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {        byteDecoder.decode(ctx, in, out);    }
负责http2 Lifecycle Manager(http2生命周期的管理)
public interface Http2LifecycleManager {    void closeStreamLocal(Http2Stream stream, ChannelFuture future);    void closeStreamRemote(Http2Stream stream, ChannelFuture future);    void closeStream(Http2Stream stream, ChannelFuture future);    ChannelFuture resetStream(ChannelHandlerContext ctx, int streamId, long errorCode,            ChannelPromise promise);    ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,            ByteBuf debugData, ChannelPromise promise);

closeStreamLocal

关闭本地stream。local stream是指客服端发送headers帧与data帧到服务端,客服端会创建一个local stream。同样服务端发送headers帧与data帧给客服端,服务端也会创建一个 local stream

closeStreamRemote

关闭远程stream。 remote stream是值当客服端接受服务端发的headers帧与data帧 ,客服端会创建一个remote stream。同样服务端接受到客服端发送的headers帧与data帧,服务端也会创建一个 remote stream

closeStream

当接受到 resetStream 帧的时候就用调用改方法。发送方发送一个错误的流,想后悔的时候,就发送resetStream帧这个后悔药

resetStream

对resetStream帧进行处理

负责校验协议行为
public void onHttpClientUpgrade() throws Http2Exception {        if (connection().isServer()) {            throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");        }        if (!prefaceSent()) {            // If the preface was not sent yet it most likely means the handler was not added to the pipeline before            // calling this method.            throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");        }        if (decoder.prefaceReceived()) {            throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");        }        // Create a local stream used for the HTTP cleartext upgrade.        connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);    }    /**     * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.     * @param settings the settings for the remote endpoint.     */    public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {        if (!connection().isServer()) {            throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");        }        if (!prefaceSent()) {            // If the preface was not sent yet it most likely means the handler was not added to the pipeline before            // calling this method.            throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");        }        if (decoder.prefaceReceived()) {            throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");        }        // Apply the settings but no ACK is necessary.        encoder.remoteSettings(settings);        // Create a stream in the half-closed state.        connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);    }
管理http2协议中的连接前言

连接前言????? 很术语化。其实就是协商成功之后,客户端可以开始发送各种 HTTP/2 帧,但第一个帧必须是 Magic 帧(内容固定为 PRI * HTTP/2.0rnrnSMrnrn),做为协议升级的最终确认。连接前言模块由父类BaseDecoder与子类PrefaceDecoder,FrameDecoder组成。连接前言由PrefaceDecoder管理。

客服端发送前言

@Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {            // The channel just became active - send the connection preface to the remote endpoint.            sendPreface(ctx);        }		private void sendPreface(ChannelHandlerContext ctx) throws Exception {            if (prefaceSent || !ctx.channel().isActive()) {                return;            }            prefaceSent = true;            final boolean isClient = !connection().isServer();            if (isClient) {                // Clients must send the preface string as the first bytes on the connection.                ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            }            // Both client and server must send their initial settings.            encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(                    ChannelFutureListener.CLOSE_ON_FAILURE);            if (isClient) {                // If this handler is extended by the user and we directly fire the userEvent from this context then                // the user will not see the event. We should fire the event starting with this handler so this class                // (and extending classes) have a chance to process the event.                userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);            }        }

服务端校验连接前言

@Override        public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {            try {                if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {                    // After the preface is read, it is time to hand over control to the post initialized decoder.                    byteDecoder = new FrameDecoder();                    byteDecoder.decode(ctx, in, out);                }            } catch (Throwable e) {                onError(ctx, false, e);            }        }		        private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {            if (clientPrefaceString == null) {                return true;            }            int prefaceRemaining = clientPrefaceString.readableBytes();            int bytesRead = min(in.readableBytes(), prefaceRemaining);            // If the input so far doesn't match the preface, break the connection.            if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),                                                      clientPrefaceString, clientPrefaceString.readerIndex(),                                                      bytesRead)) {                int maxSearch = 1024; // picked because 512 is too little, and 2048 too much                int http1Index =                    ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));                if (http1Index != -1) {                    String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);                    throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);                }                String receivedBytes = hexDump(in, in.readerIndex(),                                               min(in.readableBytes(), clientPrefaceString.readableBytes()));                throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +                                                      "Hex dump for received bytes: %s", receivedBytes);            }            in.skipBytes(bytesRead);            clientPrefaceString.skipBytes(bytesRead);            if (!clientPrefaceString.isReadable()) {                // Entire preface has been read.                clientPrefaceString.release();                clientPrefaceString = null;                return true;            }            return false;        }        private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {            if (in.readableBytes() < 5) {                // Need more data before we can see the frame type for the first frame.                return false;            }            short frameType = in.getUnsignedByte(in.readerIndex() + 3);            short flags = in.getUnsignedByte(in.readerIndex() + 4);            if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {                throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +                                                      "Hex dump for first 5 bytes: %s",                                      hexDump(in, in.readerIndex(), 5));            }            return true;        }

麻烦的设计,只是为解耦而已。代码层次明显。这就是架构......

http2链接管理 , streamId的管理,流量管理,权重管理

http2的编码与解码

解码实现类为DefaultHttp2ConnectionEncoder,看下很容易理解的。DefaultHttp2ConnectionEncoder内部有两个成员变量lifecycleManager(http2生命周期)与Http2FrameWriter(协议生成实现类)。

goAway与resetStream帧的处理方式,是由lifecycleManager(http2生命周期)

@Override    public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,            ChannelPromise promise) {        return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);    }	 @Override    public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,            ChannelPromise promise) {        // Delegate to the lifecycle manager for proper updating of connection state.        return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);    }

重点:FlowControlledHeaders与FlowControlledData主要是用于流量控制与权重使用,不属于编码范畴

解码流程为 Handler(Http2ConnectionHandle) --> Decoder(PrefaceDecoder) --> Decoder(FrameDecoder) --> decoder(Http2ConnectionDecoder) --> frameReader(Http2FrameReader) --> FrameListener(Http2FrameListener) 。按照此调用链慢慢看容易看懂了。

  1. Handler(Http2ConnectionHandle) 负责接受到buf数据
  2. PrefaceDecoder负责校验连接前言
  3. FrameDecoder 负责执行Http2ConnectionDecoder。Http2ConnectionDecoder实现类为 DefaultHttp2ConnectionDecoder
  4. frameReader 负责解析协议。 Http2FrameReader 实现类为DefaultHttp2FrameReader
  5. FrameListener负责对帧进行处理。FrameListener的实现类有很多,主要分FrameReadListener与其他FrameListener。FrameReadListener负责帧进行http2特性维护,维护成功调用其他FrameListener。由其他FrameListener进行帧处理。

帧的解析与处理

frameReader 负责解析协议。 Http2FrameReader 实现类为DefaultHttp2FrameReader

@Overridepublic void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener)        throws Http2Exception {    if (readError) {        input.skipBytes(input.readableBytes());        return;    }    try {        do {            if (readingHeaders) {                processHeaderState(input);// 解析帧                if (readingHeaders) {                    // Wait until the entire header has arrived.                    return;                }            }            processPayloadState(ctx, input, listener);// 处理帧            if (!readingHeaders) {                // Wait until the entire payload has arrived.                return;            }        } while (input.isReadable());    } catch (Http2Exception e) {        readError = !Http2Exception.isStreamError(e);        throw e;    } catch (RuntimeException e) {        readError = true;        throw e;    } catch (Throwable cause) {        readError = true;        PlatformDependent.throwException(cause);    }}private void processHeaderState(ByteBuf in) throws Http2Exception {    if (in.readableBytes() < FRAME_HEADER_LENGTH) {        // Wait until the entire frame header has been read.        return;    }    // Read the header and prepare the unmarshaller to read the frame.    payloadLength = in.readUnsignedMedium();// 帧长度    if (payloadLength > maxFrameSize) {        throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength,                              maxFrameSize);    }    frameType = in.readByte();// 帧类型    flags = new Http2Flags(in.readUnsignedByte());    streamId = readUnsignedInt(in);// 得到streamId 帧头的9个字节处理完毕    // We have consumed the data, next time we read we will be expecting to read the frame payload.    readingHeaders = false;     switch (frameType) {//进行校验        case DATA:            verifyDataFrame();            break;        case HEADERS:            verifyHeadersFrame();            break;        case PRIORITY:            verifyPriorityFrame();            break;        case RST_STREAM:            verifyRstStreamFrame();            break;        case SETTINGS:            verifySettingsFrame();            break;        case PUSH_PROMISE:            verifyPushPromiseFrame();            break;        case PING:            verifyPingFrame();            break;        case GO_AWAY:            verifyGoAwayFrame();            break;        case WINDOW_UPDATE:            verifyWindowUpdateFrame();            break;        case CONTINUATION:            verifyContinuationFrame();            break;        default:            // Unknown frame type, could be an extension.            verifyUnknownFrame();            break;    }}private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener)                throws Http2Exception {    if (in.readableBytes() < payloadLength) {        // Wait until the entire payload has been read.        return;    }    // Get a view of the buffer for the size of the payload.    ByteBuf payload = in.readSlice(payloadLength);    // We have consumed the data, next time we read we will be expecting to read a frame header.    readingHeaders = true;    // Read the payload and fire the frame event to the listener.    switch (frameType) {        case DATA:            readDataFrame(ctx, payload, listener);            break;        case HEADERS:            readHeadersFrame(ctx, payload, listener);            break;        case PRIORITY:            readPriorityFrame(ctx, payload, listener);            break;        case RST_STREAM:            readRstStreamFrame(ctx, payload, listener);            break;        case SETTINGS:            readSettingsFrame(ctx, payload, listener);            break;        case PUSH_PROMISE:            readPushPromiseFrame(ctx, payload, listener);            break;        case PING:            readPingFrame(ctx, payload.readLong(), listener);            break;        case GO_AWAY:            readGoAwayFrame(ctx, payload, listener);            break;        case WINDOW_UPDATE:            readWindowUpdateFrame(ctx, payload, listener);            break;        case CONTINUATION:            readContinuationFrame(payload, listener);            break;        default:            readUnknownFrame(ctx, payload, listener);            break;    }}
http2请求头子模块

在http2的设计中请求头是一个重点。http2要求请求使用HPACK算法进行压缩。Http2Headers,Http2HeadersDecoder,Http2HeadersEncoder,HPACKDecoder,HPACKEncoder组成。

http1.1与http2协商模块

http2.0只需要一个Http2ConnectionHandler的实现类。目前netty自带的实现类有Http2FrameCodec,HttpToHttp2ConnectionHandler。

http1.1需要HttpServerCodec与HttpObjectAggregator对象。

https 需要一个SSLHandler

private void configureSsl(SocketChannel ch) {        ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), new Http2OrHttpHandler());  }
https 1.1 与 https 2.0的协商

客户端在TSL 4次握手的时候已经把客户端支持的http协议传给服务端了,当4次握手成功,SSLHandler会产生一个事件传递下去。ApplicationProtocolNegotiationHandler会处理这个事件,获得协议并且调用configurePipelineI()方法。

public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler {    private static final int MAX_CONTENT_LENGTH = 1024 * 100;    protected Http2OrHttpHandler() {        super(ApplicationProtocolNames.HTTP_1_1);    }    @Override    protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {        if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {            ctx.pipeline().addLast(new HttpToHttp2ConnectionHandler(),			           new HelloWorldHttp1Handler("ALPN Negotiation"));            return;        }        if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {            ctx.pipeline().addLast(new HttpServerCodec(),                                   new HttpObjectAggregator(MAX_CONTENT_LENGTH),                                   new HelloWorldHttp1Handler("ALPN Negotiation"));            return;        }        throw new IllegalStateException("unknown protocol: " + protocol);    }}
http 1.1 与 http 2.0的协商

https的协商通过TLS的4次握手解决了,那http如何进行协商了。http使用了Upgrade机制,机制的流程上面已经说明了。大家是否发现一点。Upgrade机制是基于http1.1实现了,这是重点也是最麻烦的地方。当协商成功需要删除http1.1与upgrade的handler,加入http2.0的hanler。

  1. http客户端与服务端都要支持http1.1协议
  2. http客户端与服务端都要支持Upgrade机制
  3. 进行协商
  4. http客户端与服务端都要删除http1.1协议与Upgrade机制的支持
  5. http客户端与服务端都要加入http2.0协议的支持
服务端如何支持Upgrade机制
private static final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() {        @Override        public UpgradeCodec newUpgradeCodec(CharSequence protocol) {            if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {                return new Http2ServerUpgradeCodec(new HttpToHttp2ConnectionHandler());            } else {                return null;            }        }};HttpServerCodec sourceCodec = new HttpServerCodec();HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);cp.addLast(ourceCodec, upgradeHandler);
HttpServerUpgradeHandler Upgrade机制处理handler

HttpServerUpgradeHandler做两件事,第一个识别Upgrade请求头开启server端的Upgrade机制,第二件是删除http1.1Handler与HttpServerUpgradeHandler,加入http2.0Handler。

private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {    // c获得请求头upgrade的数据    final List
requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE)); final int numRequestedProtocols = requestedProtocols.size(); UpgradeCodec upgradeCodec = null; CharSequence upgradeProtocol = null; for (int i = 0; i < numRequestedProtocols; i ++) { final CharSequence p = requestedProtocols.get(i);//获得协议 final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);//识别是否支持改协议 if (c != null) { upgradeProtocol = p; upgradeCodec = c; break; } } // 没有upgradeCodec,不支持。client发过的协议 if (upgradeCodec == null) { return false; } // uconnection请求头,表示upgrade机制不完整 CharSequence connectionHeader = request.headers().get(HttpHeaderNames.CONNECTION); if (connectionHeader == null) { return false; } Collection
requiredHeaders = upgradeCodec.requiredUpgradeHeaders(); List
values = splitHeader(connectionHeader); if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) || !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) { return false; } for (CharSequence requiredHeader : requiredHeaders) { if (!request.headers().contains(requiredHeader)) { return false; } } final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);//创建响应,并返回协商后的协议。 // prepareUpgradeResponse解析Http2-setting if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) { return false; } // 创建事件,谁需要处理,谁去处理。 final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request); try { // 返回数据。这里是一个大坑。 final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse); // 删除http1.1 handler sourceCodec.upgradeFrom(ctx); // 添加http2.0 handler,这里会识别上面的大坑 upgradeCodec.upgradeTo(ctx, request); // 删除自己,即HttpServerUpgradeHandler。那么netty的handler应该只剩下http2.0handler与业务处理handler了 ctx.pipeline().remove(HttpServerUpgradeHandler.this); // 传播 事件 ctx.fireUserEventTriggered(event.retain()); writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } finally { event.release(); } return true;}
Http2ServerUpgradeCode Upgrade协议解析器

Http2ServerUpgradeCode有一件非常重要的事情做。就是解析请求头HTTP2-Settings的值最终交给Http2ConnectionEncoder处理。这个值不简单,该值包含了http2 客户端大量的信息。比如权重,流量信息等等。请求头HTTP2-Settings的值使用了HPACK算法进行压缩,然后base64编码。使用base64解码之后,就相当于http2.0的SETTINGS帧了。

private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, CharSequence settingsHeader)            throws Http2Exception {        ByteBuf header = ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(settingsHeader), CharsetUtil.UTF_8);        try {            ByteBuf payload = Base64.decode(header, URL_SAFE);// base64解码            ByteBuf frame = createSettingsFrame(ctx, payload);            return decodeSettings(ctx, frame);        } finally {            header.release();        }  }// 把解码之后的数据拼接成 http2.0的SETTINGS帧private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) {        ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payload.readableBytes());        writeFrameHeader(frame, payload.readableBytes(), SETTINGS, new Http2Flags(), 0);        frame.writeBytes(payload);        payload.release();        return frame;    }// 下面详解了Http2Settings里面数据的作用。public void remoteSettings(Http2Settings settings) throws Http2Exception {    Boolean pushEnabled = settings.pushEnabled();    Http2FrameWriter.Configuration config = configuration();    Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();    Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();    if (pushEnabled != null) {        if (!connection.isServer() && pushEnabled) {            throw connectionError(PROTOCOL_ERROR,                "Client received a value of ENABLE_PUSH specified to other than 0");        }        connection.remote().allowPushTo(pushEnabled);    }    Long maxConcurrentStreams = settings.maxConcurrentStreams();    if (maxConcurrentStreams != null) {        connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));    }    Long headerTableSize = settings.headerTableSize();    if (headerTableSize != null) {        outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));    }    Long maxHeaderListSize = settings.maxHeaderListSize();    if (maxHeaderListSize != null) {        outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);    }    Integer maxFrameSize = settings.maxFrameSize();    if (maxFrameSize != null) {        outboundFrameSizePolicy.maxFrameSize(maxFrameSize);    }    Integer initialWindowSize = settings.initialWindowSize();    if (initialWindowSize != null) {        flowController().initialWindowSize(initialWindowSize);    }}
客户端如何支持并且触发Upgrade机制

一个简单的客户端代码

public void initChannel(SocketChannel ch) throws Exception {    ChannelPipeline pipeline = ch.pipeline();    HttpClientCodec sourceCodec = new HttpClientCodec();    Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(Http2Handler.newHandler(false));    HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536);    pipeline.addLast(        sourceCodec,        upgradeHandler,        new UpgradeRequestHandler(),}private final class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        DefaultFullHttpRequest upgradeRequest =new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");        ctx.writeAndFlush(upgradeRequest);        ctx.fireChannelActive();        ctx.pipeline().remove(this);    }}

客户端发送一个请求,请求里面加入三个请求头。问题1.什么时候发送请求。2. 谁来发送请求。3. 谁来在请求里面加入请求头

  1. 什么时候发送请求

在连接建立之后立马发送。连接建立事件为:channelActive。

  1. 谁来发送请求

UpgradeRequestHandler来发。UpgradeRequestHandler是使用者自己实现的,不是netty http2。原因很简单,Upgrade的请求是被拦截还是不被拦截,被拦截业务handler是无法得请求的。由使用者来决定Upgrade 请求的行为。netty http2 默认是是被拦截的。

  1. 谁来在请求里面加入请求头

HttpClientUpgradeHandler 负责拦截请求并添加请求头

private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {    // 添加UPGRADE请求头    request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());    // 添加CONNECTIO请求头    Set
connectionParts = new LinkedHashSet
(2); connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));// 添加Http2-setting请求头 // Set the CONNECTION header from the set of all protocol-specific headers that were added. StringBuilder builder = new StringBuilder(); for (CharSequence part : connectionParts) { builder.append(part); builder.append(','); } builder.append(HttpHeaderValues.UPGRADE); request.headers().add(HttpHeaderNames.CONNECTION, builder.toString());}

HttpClientUpgradeHandler 只会对第一次的请求,进行Upgrade操作,之后全部一场啊

http2协议处理模块

该模块针对http2 九种帧进行的一次封装,这样可以让下游Handler可以直接处理帧,而不需要自己对原始数据进行处理。该模块是最简单,最容易理解。请看Http2FrameCodec类就行了。Http2FrameCodec继承Http2ConnectionHandler。

以下是每种帧对应一个封装好的实体类。

帧类型 对应实体类
data DefaultHttp2DataFrame
headers DefaultHttp2HeadersFrame
windowUpdate DefaultHttp2WindowUpdateFrame
reset DefaultHttp2ResetFrame
pring DefaultHttp2PingFrame
settings DefaultHttp2SettingsFrame
unknown DefaultHttp2UnknownFrame
goAwary DefaultHttp2GoAwayFrame
push_promise 没有

Http2FrameCodec 内部Http2FrameListener的实现

private final class FrameListener implements Http2FrameListener {        @Override        public void onUnknownFrame(                ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {            onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)                    .stream(requireStream(streamId)).retain());        }        @Override        public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {            onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));        }        @Override        public void onPingRead(ChannelHandlerContext ctx, long data) {            onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));        }        @Override        public void onPingAckRead(ChannelHandlerContext ctx, long data) {            onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));        }        @Override        public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {            onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));        }        @Override        public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {            if (streamId == 0) {                // Ignore connection window updates.                return;            }            onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));        }        @Override        public void onHeadersRead(ChannelHandlerContext ctx, int streamId,                                  Http2Headers headers, int streamDependency, short weight, boolean                                          exclusive, int padding, boolean endStream) {            onHeadersRead(ctx, streamId, headers, padding, endStream);        }        @Override        public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,                                  int padding, boolean endOfStream) {            onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)                                        .stream(requireStream(streamId)));        }        @Override        public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,                              boolean endOfStream) {            onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)                                        .stream(requireStream(streamId)).retain());            // We return the bytes in consumeBytes() once the stream channel consumed the bytes.            return 0;        }        @Override        public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {            onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());        }        @Override        public void onPriorityRead(                ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {            // TODO: Maybe handle me        }        @Override        public void onSettingsAckRead(ChannelHandlerContext ctx) {            // TODO: Maybe handle me        }        @Override        public void onPushPromiseRead(                ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding)  {            // TODO: Maybe handle me        }        private Http2FrameStream requireStream(int streamId) {            Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);            if (stream == null) {                throw new IllegalStateException("Stream object required for identifier: " + streamId);            }            return stream;        }    }

http2协议转http1协议模块

该模块解决两个痛处。一个是http2帧,处理很麻烦,很麻烦,不管如何就算使用【 http2协议处理模块】使用者都需要进行很多额外的处理,http1.1就非常简单,好用。第二是向http1.1兼容,目前有大量的http1.1服务端与客户端。直接使用http2的确不太适合,需要修改大量的代码。

  1. InboundHttp2ToHttpAdapter(实现转换的类)以及InboundHttp2ToHttpAdapterBuilder
  2. HttpToHttp2ConnectionHandler(发http1.1的对象解析成http2协议发送出去)以及HttpToHttp2ConnectionHandlerBuilder

下面代码简单描述了http1.1的对象如何解析成http2协议并且发送出去

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {        if (!(msg instanceof HttpMessage || msg instanceof HttpContent)) {            ctx.write(msg, promise);            return;        }        boolean release = true;        SimpleChannelPromiseAggregator promiseAggregator =                new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());        try {            Http2ConnectionEncoder encoder = encoder();            boolean endStream = false;            if (msg instanceof HttpMessage) {                final HttpMessage httpMsg = (HttpMessage) msg;                // Provide the user the opportunity to specify the streamId                currentStreamId = getStreamId(httpMsg.headers());                // Convert and write the headers.                Http2Headers http2Headers = HttpConversionUtil.toHttp2Headers(httpMsg, validateHeaders);                endStream = msg instanceof FullHttpMessage && !((FullHttpMessage) msg).content().isReadable();                writeHeaders(ctx, encoder, currentStreamId, httpMsg.headers(), http2Headers,                        endStream, promiseAggregator);            }            if (!endStream && msg instanceof HttpContent) {                boolean isLastContent = false;                HttpHeaders trailers = EmptyHttpHeaders.INSTANCE;                Http2Headers http2Trailers = EmptyHttp2Headers.INSTANCE;                if (msg instanceof LastHttpContent) {                    isLastContent = true;                    // Convert any trailing headers.                    final LastHttpContent lastContent = (LastHttpContent) msg;                    trailers = lastContent.trailingHeaders();                    http2Trailers = HttpConversionUtil.toHttp2Headers(trailers, validateHeaders);                }                // Write the data                final ByteBuf content = ((HttpContent) msg).content();                endStream = isLastContent && trailers.isEmpty();                release = false;                encoder.writeData(ctx, currentStreamId, content, 0, endStream, promiseAggregator.newPromise());                if (!trailers.isEmpty()) {                    // Write trailing headers.                    writeHeaders(ctx, encoder, currentStreamId, trailers, http2Trailers, true, promiseAggregator);                }            }        } catch (Throwable t) {            onError(ctx, true, t);            promiseAggregator.setFailure(t);        } finally {            if (release) {                ReferenceCountUtil.release(msg);            }            promiseAggregator.doneAllocatingPromises();        }    }
  1. Stream转http1.1对象。 请看Http2StreamFrameToHttpObjectCodec。感觉这个类比较鸡肋。

参考资料

HTTP/2 协议规范(很全面文档)

关于HTTP2和HTTPS,这些你必须要知道

谈谈 HTTP/2 的协议协商机制

HTTP2 详解

转载于:https://my.oschina.net/u/1261452/blog/2997252

你可能感兴趣的文章
Android Eclipse 导入 AS Gradle AAR 库手册
查看>>
推荐算法
查看>>
分析,理解,优化Laravel
查看>>
说说安全狗服云的优势与不足
查看>>
pip 安装flask
查看>>
7.springboot --dubbo 了解
查看>>
HTML 教程
查看>>
一道受用终生的面试题,谁能给出最好的答案
查看>>
java 报表的计算公式
查看>>
EOF是什么?
查看>>
图与网络优化---最小费用最大流问题
查看>>
JAVA 内存泄露详解(原因、例子及解决)
查看>>
关于驰骋工作流引擎ccbpm 在工业自动化环境下的 应用演示实例
查看>>
THINKPHP_URL简化设置
查看>>
关于Studio中引用mipmap目录下图片的问题
查看>>
服务器断电后自动重启
查看>>
Ubuntu下添加Eclipse快捷方式
查看>>
页面操作 页面放大/缩小 怎么确定恢复正常大小
查看>>
以两军问题为背景来演绎Basic Paxos
查看>>
统计中位值的一般概念
查看>>