前言
近期接到一个任务,把netty http2引入到项目里面。听说过http2,还真没有自己玩过。对看过这篇博客的大家说句: 抱歉。本来想很好的说说http2的。写着写着,发现要写的东西太多了,有一些内容根本就不好写。但是netty http2.0的主要内容,本章博客已经全面的讲述了,需要读者有使用经历,阅读点源码。
了解下http2.0
时代在发展,使用http协议的人越来越多。http1.1的弊端慢慢都被显现出来。
- 浏览器方式一些网站频繁发送请求,造成一家独大其他网站无法使用。或者所有网站都频发发送请求造成用户体验差等等问题。限制每个url同时并发数量
- 提高请求的响应速度。只有一个连接,只有一次tcp三次握手或者tls的7次握手。一个http1.1请求所用的时间,http2.0可以处理三到四个请求。
- 提高服务端与客服端的性能(尤其是大型互联网公司流量很大,如果使用http2.0,可以减少一半的http服务器)
协商
原因 一
http客服端不知道http服务端是否支持http2.0。反过来 http服务端也不知道http客服端是否支持http2.0。为什么出现这种现象,让所有的http服务端与http客服端直接从http1.1过度到http2.0是不可能的事情。甚至在大点的公司内部直接从http1.1直接过度到http2.0也是一件不现实的事情,那么出现一件麻烦的事情有http1客服端,也有http2客服端。有http2服务端,也有http1服务端。这种两个维度,四种情况的共存现象。
原因二
有人会问,只支持http1.1不好吗? 已经支持http2,.0的client肯定不会放弃http2.0优秀的性能与特性,能使用使用http2.0,就要使用。
解决
那么http2.0的设计者为了解决这种麻烦的东西。推出了解决方案:协商。
https 1.1 与https.20的协商
https1.1与https2.0的协商是基于ALPN机制。ALPNS是基于TLS实现。在建立TLS链接的时候,客服端会 在TLS协议里面加入自己支持的协议,服务端在客服端支持的协议里面选中一个自己最合适的。然后把选中的协议通知给客服端。如果客户端没有发送支持的http协议,服务端会默认使用http1.1
http1.1与http2.0的协商
http没有TLS协议,无法基于TLS传递协议。协议制定者使用了Upgrade机制。客户端发送一个空请求,请求里面包含该Upgrade,Connection,HTTP2-Settings请求头。服务端从Upgrade取出支持的协议然后响应请求,在响应的请求头里面包含Upgrade,Connection。这样协商就成功了。下面是http1.1与http2.0的协商流程
请求头示例:
GET / HTTP/1.1Host: example.comConnection: Upgrade, HTTP2-SettingsUpgrade: h2cHTTP2-Settings:
如果服务端不支持 HTTP/2,它会忽略 Upgrade 字段,直接返回 HTTP/1.1 响应,例如:
HTTP/1.1 200 OKContent-Length: 243Content-Type: text/html
如果服务端支持 HTTP/2,那就可以回应 101 状态码及对应头部:
HTTP/1.1 100 Switching ProtocolsConnection: UpgradeUpgrade: h2c
小结
- https 1.1 与https.2.0的协商 与 http1.1与http2.0的协商 是两套设计方案。https 1.1 与https.2.0 TLS帮你做了。http1.1与http2.0的协商需要自己做。
- 现在的趋势,客服端与服务端都需要同时支持http1.1,http2.0,https1.1,https2.0。真是一件很麻烦的事情
netty http2
netty 的http2模块设计的非常好,实在是很绕,就一个http2的包。实在有点乱。按照功能划分的话http2应该有四个模块。
- http2核心模块
- http1.1与http2协商模块
- http2帧处理模块
- http2协议转http1协议模块
http2核心模块
作为核心的模块主要是负责http2协议的解码与编码,帧的解析与处理,http2请求头子模块,streamId的管理,http2链接管理
Http2ConnectionHandler
Http2ConnectionHandler 是 netty核心设计hadler的实现,也是http2模块的出发点。
负责http2模块的组合
protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) { this.initialSettings = checkNotNull(initialSettings, "initialSettings"); this.decoder = checkNotNull(decoder, "decoder"); this.encoder = checkNotNull(encoder, "encoder"); if (encoder.connection() != decoder.connection()) { throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object"); } }
负责http2协议下handler生命周期处理
@Override public void flush(ChannelHandlerContext ctx) { try { // Trigger pending writes in the remote flow controller. encoder.flowController().writePendingBytes(); ctx.flush(); } catch (Http2Exception e) { onError(ctx, true, e); } catch (Throwable cause) { onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing")); } }public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // Initialize the encoder, decoder, flow controllers, and internal state. encoder.lifecycleManager(this); decoder.lifecycleManager(this); encoder.flowController().channelHandlerContext(ctx); decoder.flowController().channelHandlerContext(ctx); byteDecoder = new PrefaceDecoder(ctx); } @Override protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { if (byteDecoder != null) { byteDecoder.handlerRemoved(ctx); byteDecoder = null; } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (byteDecoder == null) { byteDecoder = new PrefaceDecoder(ctx);//当链接创建的时候创建PrefaceDecoder对象 } byteDecoder.channelActive(ctx); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // Call super class first, as this may result in decode being called. super.channelInactive(ctx); if (byteDecoder != null) { byteDecoder.channelInactive(ctx); byteDecoder = null; } } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation. try { if (ctx.channel().isWritable()) { flush(ctx); } encoder.flowController().channelWritabilityChanged(); } finally { super.channelWritabilityChanged(ctx); } } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
负责http2 Lifecycle Manager(http2生命周期的管理)
public interface Http2LifecycleManager { void closeStreamLocal(Http2Stream stream, ChannelFuture future); void closeStreamRemote(Http2Stream stream, ChannelFuture future); void closeStream(Http2Stream stream, ChannelFuture future); ChannelFuture resetStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise); ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise);
closeStreamLocal
关闭本地stream。local stream是指客服端发送headers帧与data帧到服务端,客服端会创建一个local stream。同样服务端发送headers帧与data帧给客服端,服务端也会创建一个 local stream
closeStreamRemote
关闭远程stream。 remote stream是值当客服端接受服务端发的headers帧与data帧 ,客服端会创建一个remote stream。同样服务端接受到客服端发送的headers帧与data帧,服务端也会创建一个 remote stream
closeStream
当接受到 resetStream 帧的时候就用调用改方法。发送方发送一个错误的流,想后悔的时候,就发送resetStream帧这个后悔药
resetStream
对resetStream帧进行处理
负责校验协议行为
public void onHttpClientUpgrade() throws Http2Exception { if (connection().isServer()) { throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server"); } if (!prefaceSent()) { // If the preface was not sent yet it most likely means the handler was not added to the pipeline before // calling this method. throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent"); } if (decoder.prefaceReceived()) { throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received"); } // Create a local stream used for the HTTP cleartext upgrade. connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true); } /** * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2. * @param settings the settings for the remote endpoint. */ public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception { if (!connection().isServer()) { throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client"); } if (!prefaceSent()) { // If the preface was not sent yet it most likely means the handler was not added to the pipeline before // calling this method. throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent"); } if (decoder.prefaceReceived()) { throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received"); } // Apply the settings but no ACK is necessary. encoder.remoteSettings(settings); // Create a stream in the half-closed state. connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true); }
管理http2协议中的连接前言
连接前言????? 很术语化。其实就是协商成功之后,客户端可以开始发送各种 HTTP/2 帧,但第一个帧必须是 Magic 帧(内容固定为 PRI * HTTP/2.0rnrnSMrnrn),做为协议升级的最终确认。连接前言模块由父类BaseDecoder与子类PrefaceDecoder,FrameDecoder组成。连接前言由PrefaceDecoder管理。
客服端发送前言
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // The channel just became active - send the connection preface to the remote endpoint. sendPreface(ctx); } private void sendPreface(ChannelHandlerContext ctx) throws Exception { if (prefaceSent || !ctx.channel().isActive()) { return; } prefaceSent = true; final boolean isClient = !connection().isServer(); if (isClient) { // Clients must send the preface string as the first bytes on the connection. ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } // Both client and server must send their initial settings. encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener( ChannelFutureListener.CLOSE_ON_FAILURE); if (isClient) { // If this handler is extended by the user and we directly fire the userEvent from this context then // the user will not see the event. We should fire the event starting with this handler so this class // (and extending classes) have a chance to process the event. userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE); } }
服务端校验连接前言
@Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List
麻烦的设计,只是为解耦而已。代码层次明显。这就是架构......
http2链接管理 , streamId的管理,流量管理,权重管理
http2的编码与解码
解码实现类为DefaultHttp2ConnectionEncoder,看下很容易理解的。DefaultHttp2ConnectionEncoder内部有两个成员变量lifecycleManager(http2生命周期)与Http2FrameWriter(协议生成实现类)。
goAway与resetStream帧的处理方式,是由lifecycleManager(http2生命周期)
@Override public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) { return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise); } @Override public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) { // Delegate to the lifecycle manager for proper updating of connection state. return lifecycleManager.resetStream(ctx, streamId, errorCode, promise); }
重点:FlowControlledHeaders与FlowControlledData主要是用于流量控制与权重使用,不属于编码范畴
解码流程为 Handler(Http2ConnectionHandle) --> Decoder(PrefaceDecoder) --> Decoder(FrameDecoder) --> decoder(Http2ConnectionDecoder) --> frameReader(Http2FrameReader) --> FrameListener(Http2FrameListener) 。按照此调用链慢慢看容易看懂了。
- Handler(Http2ConnectionHandle) 负责接受到buf数据
- PrefaceDecoder负责校验连接前言
- FrameDecoder 负责执行Http2ConnectionDecoder。Http2ConnectionDecoder实现类为 DefaultHttp2ConnectionDecoder
- frameReader 负责解析协议。 Http2FrameReader 实现类为DefaultHttp2FrameReader
- FrameListener负责对帧进行处理。FrameListener的实现类有很多,主要分FrameReadListener与其他FrameListener。FrameReadListener负责帧进行http2特性维护,维护成功调用其他FrameListener。由其他FrameListener进行帧处理。
帧的解析与处理
frameReader 负责解析协议。 Http2FrameReader 实现类为DefaultHttp2FrameReader
@Overridepublic void readFrame(ChannelHandlerContext ctx, ByteBuf input, Http2FrameListener listener) throws Http2Exception { if (readError) { input.skipBytes(input.readableBytes()); return; } try { do { if (readingHeaders) { processHeaderState(input);// 解析帧 if (readingHeaders) { // Wait until the entire header has arrived. return; } } processPayloadState(ctx, input, listener);// 处理帧 if (!readingHeaders) { // Wait until the entire payload has arrived. return; } } while (input.isReadable()); } catch (Http2Exception e) { readError = !Http2Exception.isStreamError(e); throw e; } catch (RuntimeException e) { readError = true; throw e; } catch (Throwable cause) { readError = true; PlatformDependent.throwException(cause); }}private void processHeaderState(ByteBuf in) throws Http2Exception { if (in.readableBytes() < FRAME_HEADER_LENGTH) { // Wait until the entire frame header has been read. return; } // Read the header and prepare the unmarshaller to read the frame. payloadLength = in.readUnsignedMedium();// 帧长度 if (payloadLength > maxFrameSize) { throw connectionError(FRAME_SIZE_ERROR, "Frame length: %d exceeds maximum: %d", payloadLength, maxFrameSize); } frameType = in.readByte();// 帧类型 flags = new Http2Flags(in.readUnsignedByte()); streamId = readUnsignedInt(in);// 得到streamId 帧头的9个字节处理完毕 // We have consumed the data, next time we read we will be expecting to read the frame payload. readingHeaders = false; switch (frameType) {//进行校验 case DATA: verifyDataFrame(); break; case HEADERS: verifyHeadersFrame(); break; case PRIORITY: verifyPriorityFrame(); break; case RST_STREAM: verifyRstStreamFrame(); break; case SETTINGS: verifySettingsFrame(); break; case PUSH_PROMISE: verifyPushPromiseFrame(); break; case PING: verifyPingFrame(); break; case GO_AWAY: verifyGoAwayFrame(); break; case WINDOW_UPDATE: verifyWindowUpdateFrame(); break; case CONTINUATION: verifyContinuationFrame(); break; default: // Unknown frame type, could be an extension. verifyUnknownFrame(); break; }}private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2FrameListener listener) throws Http2Exception { if (in.readableBytes() < payloadLength) { // Wait until the entire payload has been read. return; } // Get a view of the buffer for the size of the payload. ByteBuf payload = in.readSlice(payloadLength); // We have consumed the data, next time we read we will be expecting to read a frame header. readingHeaders = true; // Read the payload and fire the frame event to the listener. switch (frameType) { case DATA: readDataFrame(ctx, payload, listener); break; case HEADERS: readHeadersFrame(ctx, payload, listener); break; case PRIORITY: readPriorityFrame(ctx, payload, listener); break; case RST_STREAM: readRstStreamFrame(ctx, payload, listener); break; case SETTINGS: readSettingsFrame(ctx, payload, listener); break; case PUSH_PROMISE: readPushPromiseFrame(ctx, payload, listener); break; case PING: readPingFrame(ctx, payload.readLong(), listener); break; case GO_AWAY: readGoAwayFrame(ctx, payload, listener); break; case WINDOW_UPDATE: readWindowUpdateFrame(ctx, payload, listener); break; case CONTINUATION: readContinuationFrame(payload, listener); break; default: readUnknownFrame(ctx, payload, listener); break; }}
http2请求头子模块
在http2的设计中请求头是一个重点。http2要求请求使用HPACK算法进行压缩。Http2Headers,Http2HeadersDecoder,Http2HeadersEncoder,HPACKDecoder,HPACKEncoder组成。
http1.1与http2协商模块
http2.0只需要一个Http2ConnectionHandler的实现类。目前netty自带的实现类有Http2FrameCodec,HttpToHttp2ConnectionHandler。
http1.1需要HttpServerCodec与HttpObjectAggregator对象。
https 需要一个SSLHandler
private void configureSsl(SocketChannel ch) { ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), new Http2OrHttpHandler()); }
https 1.1 与 https 2.0的协商
客户端在TSL 4次握手的时候已经把客户端支持的http协议传给服务端了,当4次握手成功,SSLHandler会产生一个事件传递下去。ApplicationProtocolNegotiationHandler会处理这个事件,获得协议并且调用configurePipelineI()方法。
public class Http2OrHttpHandler extends ApplicationProtocolNegotiationHandler { private static final int MAX_CONTENT_LENGTH = 1024 * 100; protected Http2OrHttpHandler() { super(ApplicationProtocolNames.HTTP_1_1); } @Override protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { ctx.pipeline().addLast(new HttpToHttp2ConnectionHandler(), new HelloWorldHttp1Handler("ALPN Negotiation")); return; } if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { ctx.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(MAX_CONTENT_LENGTH), new HelloWorldHttp1Handler("ALPN Negotiation")); return; } throw new IllegalStateException("unknown protocol: " + protocol); }}
http 1.1 与 http 2.0的协商
https的协商通过TLS的4次握手解决了,那http如何进行协商了。http使用了Upgrade机制,机制的流程上面已经说明了。大家是否发现一点。Upgrade机制是基于http1.1实现了,这是重点也是最麻烦的地方。当协商成功需要删除http1.1与upgrade的handler,加入http2.0的hanler。
- http客户端与服务端都要支持http1.1协议
- http客户端与服务端都要支持Upgrade机制
- 进行协商
- http客户端与服务端都要删除http1.1协议与Upgrade机制的支持
- http客户端与服务端都要加入http2.0协议的支持
服务端如何支持Upgrade机制
private static final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() { @Override public UpgradeCodec newUpgradeCodec(CharSequence protocol) { if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) { return new Http2ServerUpgradeCodec(new HttpToHttp2ConnectionHandler()); } else { return null; } }};HttpServerCodec sourceCodec = new HttpServerCodec();HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);cp.addLast(ourceCodec, upgradeHandler);
HttpServerUpgradeHandler Upgrade机制处理handler
HttpServerUpgradeHandler做两件事,第一个识别Upgrade请求头开启server端的Upgrade机制,第二件是删除http1.1Handler与HttpServerUpgradeHandler,加入http2.0Handler。
private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) { // c获得请求头upgrade的数据 final ListrequestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE)); final int numRequestedProtocols = requestedProtocols.size(); UpgradeCodec upgradeCodec = null; CharSequence upgradeProtocol = null; for (int i = 0; i < numRequestedProtocols; i ++) { final CharSequence p = requestedProtocols.get(i);//获得协议 final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);//识别是否支持改协议 if (c != null) { upgradeProtocol = p; upgradeCodec = c; break; } } // 没有upgradeCodec,不支持。client发过的协议 if (upgradeCodec == null) { return false; } // uconnection请求头,表示upgrade机制不完整 CharSequence connectionHeader = request.headers().get(HttpHeaderNames.CONNECTION); if (connectionHeader == null) { return false; } Collection requiredHeaders = upgradeCodec.requiredUpgradeHeaders(); List values = splitHeader(connectionHeader); if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) || !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) { return false; } for (CharSequence requiredHeader : requiredHeaders) { if (!request.headers().contains(requiredHeader)) { return false; } } final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);//创建响应,并返回协商后的协议。 // prepareUpgradeResponse解析Http2-setting if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) { return false; } // 创建事件,谁需要处理,谁去处理。 final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request); try { // 返回数据。这里是一个大坑。 final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse); // 删除http1.1 handler sourceCodec.upgradeFrom(ctx); // 添加http2.0 handler,这里会识别上面的大坑 upgradeCodec.upgradeTo(ctx, request); // 删除自己,即HttpServerUpgradeHandler。那么netty的handler应该只剩下http2.0handler与业务处理handler了 ctx.pipeline().remove(HttpServerUpgradeHandler.this); // 传播 事件 ctx.fireUserEventTriggered(event.retain()); writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } finally { event.release(); } return true;}
Http2ServerUpgradeCode Upgrade协议解析器
Http2ServerUpgradeCode有一件非常重要的事情做。就是解析请求头HTTP2-Settings的值最终交给Http2ConnectionEncoder处理。这个值不简单,该值包含了http2 客户端大量的信息。比如权重,流量信息等等。请求头HTTP2-Settings的值使用了HPACK算法进行压缩,然后base64编码。使用base64解码之后,就相当于http2.0的SETTINGS帧了。
private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, CharSequence settingsHeader) throws Http2Exception { ByteBuf header = ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(settingsHeader), CharsetUtil.UTF_8); try { ByteBuf payload = Base64.decode(header, URL_SAFE);// base64解码 ByteBuf frame = createSettingsFrame(ctx, payload); return decodeSettings(ctx, frame); } finally { header.release(); } }// 把解码之后的数据拼接成 http2.0的SETTINGS帧private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) { ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payload.readableBytes()); writeFrameHeader(frame, payload.readableBytes(), SETTINGS, new Http2Flags(), 0); frame.writeBytes(payload); payload.release(); return frame; }// 下面详解了Http2Settings里面数据的作用。public void remoteSettings(Http2Settings settings) throws Http2Exception { Boolean pushEnabled = settings.pushEnabled(); Http2FrameWriter.Configuration config = configuration(); Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration(); Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy(); if (pushEnabled != null) { if (!connection.isServer() && pushEnabled) { throw connectionError(PROTOCOL_ERROR, "Client received a value of ENABLE_PUSH specified to other than 0"); } connection.remote().allowPushTo(pushEnabled); } Long maxConcurrentStreams = settings.maxConcurrentStreams(); if (maxConcurrentStreams != null) { connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE)); } Long headerTableSize = settings.headerTableSize(); if (headerTableSize != null) { outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE)); } Long maxHeaderListSize = settings.maxHeaderListSize(); if (maxHeaderListSize != null) { outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize); } Integer maxFrameSize = settings.maxFrameSize(); if (maxFrameSize != null) { outboundFrameSizePolicy.maxFrameSize(maxFrameSize); } Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { flowController().initialWindowSize(initialWindowSize); }}
客户端如何支持并且触发Upgrade机制
一个简单的客户端代码
public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); HttpClientCodec sourceCodec = new HttpClientCodec(); Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(Http2Handler.newHandler(false)); HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536); pipeline.addLast( sourceCodec, upgradeHandler, new UpgradeRequestHandler(),}private final class UpgradeRequestHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { DefaultFullHttpRequest upgradeRequest =new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); ctx.writeAndFlush(upgradeRequest); ctx.fireChannelActive(); ctx.pipeline().remove(this); }}
客户端发送一个请求,请求里面加入三个请求头。问题1.什么时候发送请求。2. 谁来发送请求。3. 谁来在请求里面加入请求头
- 什么时候发送请求
在连接建立之后立马发送。连接建立事件为:channelActive。
- 谁来发送请求
UpgradeRequestHandler来发。UpgradeRequestHandler是使用者自己实现的,不是netty http2。原因很简单,Upgrade的请求是被拦截还是不被拦截,被拦截业务handler是无法得请求的。由使用者来决定Upgrade 请求的行为。netty http2 默认是是被拦截的。
- 谁来在请求里面加入请求头
HttpClientUpgradeHandler 负责拦截请求并添加请求头
private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) { // 添加UPGRADE请求头 request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol()); // 添加CONNECTIO请求头 SetconnectionParts = new LinkedHashSet (2); connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));// 添加Http2-setting请求头 // Set the CONNECTION header from the set of all protocol-specific headers that were added. StringBuilder builder = new StringBuilder(); for (CharSequence part : connectionParts) { builder.append(part); builder.append(','); } builder.append(HttpHeaderValues.UPGRADE); request.headers().add(HttpHeaderNames.CONNECTION, builder.toString());}
HttpClientUpgradeHandler 只会对第一次的请求,进行Upgrade操作,之后全部一场啊
http2协议处理模块
该模块针对http2 九种帧进行的一次封装,这样可以让下游Handler可以直接处理帧,而不需要自己对原始数据进行处理。该模块是最简单,最容易理解。请看Http2FrameCodec类就行了。Http2FrameCodec继承Http2ConnectionHandler。
以下是每种帧对应一个封装好的实体类。
帧类型 | 对应实体类 |
---|---|
data | DefaultHttp2DataFrame |
headers | DefaultHttp2HeadersFrame |
windowUpdate | DefaultHttp2WindowUpdateFrame |
reset | DefaultHttp2ResetFrame |
pring | DefaultHttp2PingFrame |
settings | DefaultHttp2SettingsFrame |
unknown | DefaultHttp2UnknownFrame |
goAwary | DefaultHttp2GoAwayFrame |
push_promise | 没有 |
Http2FrameCodec 内部Http2FrameListener的实现
private final class FrameListener implements Http2FrameListener { @Override public void onUnknownFrame( ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) { onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload) .stream(requireStream(streamId)).retain()); } @Override public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings)); } @Override public void onPingRead(ChannelHandlerContext ctx, long data) { onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false)); } @Override public void onPingAckRead(ChannelHandlerContext ctx, long data) { onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true)); } @Override public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) { onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId))); } @Override public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) { if (streamId == 0) { // Ignore connection window updates. return; } onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId))); } @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream) { onHeadersRead(ctx, streamId, headers, padding, endStream); } @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) { onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding) .stream(requireStream(streamId))); } @Override public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) { onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding) .stream(requireStream(streamId)).retain()); // We return the bytes in consumeBytes() once the stream channel consumed the bytes. return 0; } @Override public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) { onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain()); } @Override public void onPriorityRead( ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) { // TODO: Maybe handle me } @Override public void onSettingsAckRead(ChannelHandlerContext ctx) { // TODO: Maybe handle me } @Override public void onPushPromiseRead( ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) { // TODO: Maybe handle me } private Http2FrameStream requireStream(int streamId) { Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey); if (stream == null) { throw new IllegalStateException("Stream object required for identifier: " + streamId); } return stream; } }
http2协议转http1协议模块
该模块解决两个痛处。一个是http2帧,处理很麻烦,很麻烦,不管如何就算使用【 http2协议处理模块】使用者都需要进行很多额外的处理,http1.1就非常简单,好用。第二是向http1.1兼容,目前有大量的http1.1服务端与客户端。直接使用http2的确不太适合,需要修改大量的代码。
- InboundHttp2ToHttpAdapter(实现转换的类)以及InboundHttp2ToHttpAdapterBuilder
- HttpToHttp2ConnectionHandler(发http1.1的对象解析成http2协议发送出去)以及HttpToHttp2ConnectionHandlerBuilder
下面代码简单描述了http1.1的对象如何解析成http2协议并且发送出去
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (!(msg instanceof HttpMessage || msg instanceof HttpContent)) { ctx.write(msg, promise); return; } boolean release = true; SimpleChannelPromiseAggregator promiseAggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); try { Http2ConnectionEncoder encoder = encoder(); boolean endStream = false; if (msg instanceof HttpMessage) { final HttpMessage httpMsg = (HttpMessage) msg; // Provide the user the opportunity to specify the streamId currentStreamId = getStreamId(httpMsg.headers()); // Convert and write the headers. Http2Headers http2Headers = HttpConversionUtil.toHttp2Headers(httpMsg, validateHeaders); endStream = msg instanceof FullHttpMessage && !((FullHttpMessage) msg).content().isReadable(); writeHeaders(ctx, encoder, currentStreamId, httpMsg.headers(), http2Headers, endStream, promiseAggregator); } if (!endStream && msg instanceof HttpContent) { boolean isLastContent = false; HttpHeaders trailers = EmptyHttpHeaders.INSTANCE; Http2Headers http2Trailers = EmptyHttp2Headers.INSTANCE; if (msg instanceof LastHttpContent) { isLastContent = true; // Convert any trailing headers. final LastHttpContent lastContent = (LastHttpContent) msg; trailers = lastContent.trailingHeaders(); http2Trailers = HttpConversionUtil.toHttp2Headers(trailers, validateHeaders); } // Write the data final ByteBuf content = ((HttpContent) msg).content(); endStream = isLastContent && trailers.isEmpty(); release = false; encoder.writeData(ctx, currentStreamId, content, 0, endStream, promiseAggregator.newPromise()); if (!trailers.isEmpty()) { // Write trailing headers. writeHeaders(ctx, encoder, currentStreamId, trailers, http2Trailers, true, promiseAggregator); } } } catch (Throwable t) { onError(ctx, true, t); promiseAggregator.setFailure(t); } finally { if (release) { ReferenceCountUtil.release(msg); } promiseAggregator.doneAllocatingPromises(); } }
- Stream转http1.1对象。 请看Http2StreamFrameToHttpObjectCodec。感觉这个类比较鸡肋。
参考资料
HTTP/2 协议规范(很全面文档)
关于HTTP2和HTTPS,这些你必须要知道
谈谈 HTTP/2 的协议协商机制
HTTP2 详解