1 /* 2 * Copyright (C) 2014 Square, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.squareup.okhttp.benchmarks; 17 18 import com.squareup.okhttp.HttpUrl; 19 import com.squareup.okhttp.internal.SslContextBuilder; 20 import io.netty.bootstrap.Bootstrap; 21 import io.netty.buffer.ByteBuf; 22 import io.netty.buffer.PooledByteBufAllocator; 23 import io.netty.channel.Channel; 24 import io.netty.channel.ChannelHandlerContext; 25 import io.netty.channel.ChannelInitializer; 26 import io.netty.channel.ChannelOption; 27 import io.netty.channel.ChannelPipeline; 28 import io.netty.channel.SimpleChannelInboundHandler; 29 import io.netty.channel.nio.NioEventLoopGroup; 30 import io.netty.channel.socket.SocketChannel; 31 import io.netty.channel.socket.nio.NioSocketChannel; 32 import io.netty.handler.codec.http.DefaultFullHttpRequest; 33 import io.netty.handler.codec.http.HttpClientCodec; 34 import io.netty.handler.codec.http.HttpContent; 35 import io.netty.handler.codec.http.HttpContentDecompressor; 36 import io.netty.handler.codec.http.HttpHeaders; 37 import io.netty.handler.codec.http.HttpMethod; 38 import io.netty.handler.codec.http.HttpObject; 39 import io.netty.handler.codec.http.HttpRequest; 40 import io.netty.handler.codec.http.HttpResponse; 41 import io.netty.handler.codec.http.HttpVersion; 42 import io.netty.handler.codec.http.LastHttpContent; 43 import io.netty.handler.ssl.SslHandler; 44 import java.util.ArrayDeque; 45 import java.util.Deque; 46 import java.util.concurrent.TimeUnit; 47 import javax.net.ssl.SSLContext; 48 import javax.net.ssl.SSLEngine; 49 50 /** Netty isn't an HTTP client, but it's almost one. */ 51 class NettyHttpClient implements HttpClient { 52 private static final boolean VERBOSE = false; 53 54 // Guarded by this. Real apps need more capable connection management. 55 private final Deque<HttpChannel> freeChannels = new ArrayDeque<>(); 56 private final Deque<HttpUrl> backlog = new ArrayDeque<>(); 57 58 private int totalChannels = 0; 59 private int concurrencyLevel; 60 private int targetBacklog; 61 private Bootstrap bootstrap; 62 prepare(final Benchmark benchmark)63 @Override public void prepare(final Benchmark benchmark) { 64 this.concurrencyLevel = benchmark.concurrencyLevel; 65 this.targetBacklog = benchmark.targetBacklog; 66 67 ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() { 68 @Override public void initChannel(SocketChannel channel) throws Exception { 69 ChannelPipeline pipeline = channel.pipeline(); 70 71 if (benchmark.tls) { 72 SSLContext sslContext = SslContextBuilder.localhost(); 73 SSLEngine engine = sslContext.createSSLEngine(); 74 engine.setUseClientMode(true); 75 pipeline.addLast("ssl", new SslHandler(engine)); 76 } 77 78 pipeline.addLast("codec", new HttpClientCodec()); 79 pipeline.addLast("inflater", new HttpContentDecompressor()); 80 pipeline.addLast("handler", new HttpChannel(channel)); 81 } 82 }; 83 84 bootstrap = new Bootstrap(); 85 bootstrap.group(new NioEventLoopGroup(concurrencyLevel)) 86 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 87 .channel(NioSocketChannel.class) 88 .handler(channelInitializer); 89 } 90 enqueue(HttpUrl url)91 @Override public void enqueue(HttpUrl url) throws Exception { 92 HttpChannel httpChannel = null; 93 synchronized (this) { 94 if (!freeChannels.isEmpty()) { 95 httpChannel = freeChannels.pop(); 96 } else if (totalChannels < concurrencyLevel) { 97 totalChannels++; // Create a new channel. (outside of the synchronized block). 98 } else { 99 backlog.add(url); // Enqueue this for later, to be picked up when another request completes. 100 return; 101 } 102 } 103 if (httpChannel == null) { 104 Channel channel = bootstrap.connect(url.host(), url.port()) 105 .sync().channel(); 106 httpChannel = (HttpChannel) channel.pipeline().last(); 107 } 108 httpChannel.sendRequest(url); 109 } 110 acceptingJobs()111 @Override public synchronized boolean acceptingJobs() { 112 return backlog.size() < targetBacklog || hasFreeChannels(); 113 } 114 hasFreeChannels()115 private boolean hasFreeChannels() { 116 int activeChannels = totalChannels - freeChannels.size(); 117 return activeChannels < concurrencyLevel; 118 } 119 release(HttpChannel httpChannel)120 private void release(HttpChannel httpChannel) { 121 HttpUrl url; 122 synchronized (this) { 123 url = backlog.pop(); 124 if (url == null) { 125 // There were no URLs in the backlog. Pool this channel for later. 126 freeChannels.push(httpChannel); 127 return; 128 } 129 } 130 131 // We removed a URL from the backlog. Schedule it right away. 132 httpChannel.sendRequest(url); 133 } 134 135 class HttpChannel extends SimpleChannelInboundHandler<HttpObject> { 136 private final SocketChannel channel; 137 byte[] buffer = new byte[1024]; 138 int total; 139 long start; 140 HttpChannel(SocketChannel channel)141 public HttpChannel(SocketChannel channel) { 142 this.channel = channel; 143 } 144 sendRequest(HttpUrl url)145 private void sendRequest(HttpUrl url) { 146 start = System.nanoTime(); 147 total = 0; 148 HttpRequest request = new DefaultFullHttpRequest( 149 HttpVersion.HTTP_1_1, HttpMethod.GET, url.encodedPath()); 150 request.headers().set(HttpHeaders.Names.HOST, url.host()); 151 request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); 152 channel.writeAndFlush(request); 153 } 154 channelRead0( ChannelHandlerContext context, HttpObject message)155 @Override protected void channelRead0( 156 ChannelHandlerContext context, HttpObject message) throws Exception { 157 if (message instanceof HttpResponse) { 158 receive((HttpResponse) message); 159 } 160 if (message instanceof HttpContent) { 161 receive((HttpContent) message); 162 if (message instanceof LastHttpContent) { 163 release(this); 164 } 165 } 166 } 167 channelInactive(ChannelHandlerContext ctx)168 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { 169 super.channelInactive(ctx); 170 } 171 receive(HttpResponse response)172 void receive(HttpResponse response) { 173 // Don't do anything with headers. 174 } 175 receive(HttpContent content)176 void receive(HttpContent content) { 177 // Consume the response body. 178 ByteBuf byteBuf = content.content(); 179 for (int toRead; (toRead = byteBuf.readableBytes()) > 0; ) { 180 byteBuf.readBytes(buffer, 0, Math.min(buffer.length, toRead)); 181 total += toRead; 182 } 183 184 if (VERBOSE && content instanceof LastHttpContent) { 185 long finish = System.nanoTime(); 186 System.out.println(String.format("Transferred % 8d bytes in %4d ms", 187 total, TimeUnit.NANOSECONDS.toMillis(finish - start))); 188 } 189 } 190 exceptionCaught(ChannelHandlerContext context, Throwable cause)191 @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { 192 System.out.println("Failed: " + cause); 193 } 194 } 195 } 196