• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.squareup.okhttp.benchmarks;
17 
18 import com.squareup.okhttp.HttpUrl;
19 import com.squareup.okhttp.internal.SslContextBuilder;
20 import io.netty.bootstrap.Bootstrap;
21 import io.netty.buffer.ByteBuf;
22 import io.netty.buffer.PooledByteBufAllocator;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.ChannelPipeline;
28 import io.netty.channel.SimpleChannelInboundHandler;
29 import io.netty.channel.nio.NioEventLoopGroup;
30 import io.netty.channel.socket.SocketChannel;
31 import io.netty.channel.socket.nio.NioSocketChannel;
32 import io.netty.handler.codec.http.DefaultFullHttpRequest;
33 import io.netty.handler.codec.http.HttpClientCodec;
34 import io.netty.handler.codec.http.HttpContent;
35 import io.netty.handler.codec.http.HttpContentDecompressor;
36 import io.netty.handler.codec.http.HttpHeaders;
37 import io.netty.handler.codec.http.HttpMethod;
38 import io.netty.handler.codec.http.HttpObject;
39 import io.netty.handler.codec.http.HttpRequest;
40 import io.netty.handler.codec.http.HttpResponse;
41 import io.netty.handler.codec.http.HttpVersion;
42 import io.netty.handler.codec.http.LastHttpContent;
43 import io.netty.handler.ssl.SslHandler;
44 import java.util.ArrayDeque;
45 import java.util.Deque;
46 import java.util.concurrent.TimeUnit;
47 import javax.net.ssl.SSLContext;
48 import javax.net.ssl.SSLEngine;
49 
50 /** Netty isn't an HTTP client, but it's almost one. */
51 class NettyHttpClient implements HttpClient {
52   private static final boolean VERBOSE = false;
53 
54   // Guarded by this. Real apps need more capable connection management.
55   private final Deque<HttpChannel> freeChannels = new ArrayDeque<>();
56   private final Deque<HttpUrl> backlog = new ArrayDeque<>();
57 
58   private int totalChannels = 0;
59   private int concurrencyLevel;
60   private int targetBacklog;
61   private Bootstrap bootstrap;
62 
prepare(final Benchmark benchmark)63   @Override public void prepare(final Benchmark benchmark) {
64     this.concurrencyLevel = benchmark.concurrencyLevel;
65     this.targetBacklog = benchmark.targetBacklog;
66 
67     ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
68       @Override public void initChannel(SocketChannel channel) throws Exception {
69         ChannelPipeline pipeline = channel.pipeline();
70 
71         if (benchmark.tls) {
72           SSLContext sslContext = SslContextBuilder.localhost();
73           SSLEngine engine = sslContext.createSSLEngine();
74           engine.setUseClientMode(true);
75           pipeline.addLast("ssl", new SslHandler(engine));
76         }
77 
78         pipeline.addLast("codec", new HttpClientCodec());
79         pipeline.addLast("inflater", new HttpContentDecompressor());
80         pipeline.addLast("handler", new HttpChannel(channel));
81       }
82     };
83 
84     bootstrap = new Bootstrap();
85     bootstrap.group(new NioEventLoopGroup(concurrencyLevel))
86         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
87         .channel(NioSocketChannel.class)
88         .handler(channelInitializer);
89   }
90 
enqueue(HttpUrl url)91   @Override public void enqueue(HttpUrl url) throws Exception {
92     HttpChannel httpChannel = null;
93     synchronized (this) {
94       if (!freeChannels.isEmpty()) {
95         httpChannel = freeChannels.pop();
96       } else if (totalChannels < concurrencyLevel) {
97         totalChannels++; // Create a new channel. (outside of the synchronized block).
98       } else {
99         backlog.add(url); // Enqueue this for later, to be picked up when another request completes.
100         return;
101       }
102     }
103     if (httpChannel == null) {
104       Channel channel = bootstrap.connect(url.host(), url.port())
105           .sync().channel();
106       httpChannel = (HttpChannel) channel.pipeline().last();
107     }
108     httpChannel.sendRequest(url);
109   }
110 
acceptingJobs()111   @Override public synchronized boolean acceptingJobs() {
112     return backlog.size() < targetBacklog || hasFreeChannels();
113   }
114 
hasFreeChannels()115   private boolean hasFreeChannels() {
116     int activeChannels = totalChannels - freeChannels.size();
117     return activeChannels < concurrencyLevel;
118   }
119 
release(HttpChannel httpChannel)120   private void release(HttpChannel httpChannel) {
121     HttpUrl url;
122     synchronized (this) {
123       url = backlog.pop();
124       if (url == null) {
125         // There were no URLs in the backlog. Pool this channel for later.
126         freeChannels.push(httpChannel);
127         return;
128       }
129     }
130 
131     // We removed a URL from the backlog. Schedule it right away.
132     httpChannel.sendRequest(url);
133   }
134 
135   class HttpChannel extends SimpleChannelInboundHandler<HttpObject> {
136     private final SocketChannel channel;
137     byte[] buffer = new byte[1024];
138     int total;
139     long start;
140 
HttpChannel(SocketChannel channel)141     public HttpChannel(SocketChannel channel) {
142       this.channel = channel;
143     }
144 
sendRequest(HttpUrl url)145     private void sendRequest(HttpUrl url) {
146       start = System.nanoTime();
147       total = 0;
148       HttpRequest request = new DefaultFullHttpRequest(
149           HttpVersion.HTTP_1_1, HttpMethod.GET, url.encodedPath());
150       request.headers().set(HttpHeaders.Names.HOST, url.host());
151       request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
152       channel.writeAndFlush(request);
153     }
154 
channelRead0( ChannelHandlerContext context, HttpObject message)155     @Override protected void channelRead0(
156         ChannelHandlerContext context, HttpObject message) throws Exception {
157       if (message instanceof HttpResponse) {
158         receive((HttpResponse) message);
159       }
160       if (message instanceof HttpContent) {
161         receive((HttpContent) message);
162         if (message instanceof LastHttpContent) {
163           release(this);
164         }
165       }
166     }
167 
channelInactive(ChannelHandlerContext ctx)168     @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
169       super.channelInactive(ctx);
170     }
171 
receive(HttpResponse response)172     void receive(HttpResponse response) {
173       // Don't do anything with headers.
174     }
175 
receive(HttpContent content)176     void receive(HttpContent content) {
177       // Consume the response body.
178       ByteBuf byteBuf = content.content();
179       for (int toRead; (toRead = byteBuf.readableBytes()) > 0; ) {
180         byteBuf.readBytes(buffer, 0, Math.min(buffer.length, toRead));
181         total += toRead;
182       }
183 
184       if (VERBOSE && content instanceof LastHttpContent) {
185         long finish = System.nanoTime();
186         System.out.println(String.format("Transferred % 8d bytes in %4d ms",
187             total, TimeUnit.NANOSECONDS.toMillis(finish - start)));
188       }
189     }
190 
exceptionCaught(ChannelHandlerContext context, Throwable cause)191     @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
192       System.out.println("Failed: " + cause);
193     }
194   }
195 }
196