# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HTTP2 Test Server"""

import argparse
import logging
import sys

import twisted
import twisted.internet
import twisted.internet.endpoints
# Imported explicitly because H2Factory subclasses
# twisted.internet.protocol.Factory; previously nothing in this file
# imported the submodule by name.
import twisted.internet.protocol
import twisted.internet.reactor

import http2_base_server
import test_goaway
import test_max_streams
import test_ping
import test_rst_after_data
import test_rst_after_header
import test_rst_during_data
import test_data_frame_padding

# Maps a test case name to the class that implements it. One server is
# started per key, on consecutive ports, in sorted key order (see
# start_test_servers).
_TEST_CASE_MAPPING = {
    'rst_after_header': test_rst_after_header.TestcaseRstStreamAfterHeader,
    'rst_after_data': test_rst_after_data.TestcaseRstStreamAfterData,
    'rst_during_data': test_rst_during_data.TestcaseRstStreamDuringData,
    'goaway': test_goaway.TestcaseGoaway,
    'ping': test_ping.TestcasePing,
    'max_streams': test_max_streams.TestcaseSettingsMaxStreams,

    # Positive tests below:
    'data_frame_padding': test_data_frame_padding.TestDataFramePadding,
    'no_df_padding_sanity_test': test_data_frame_padding.TestDataFramePadding,
}

# Process exit status; set to 1 by listen() when a server fails to listen.
_exit_code = 0


class H2Factory(twisted.internet.protocol.Factory):
    """Protocol factory that serves a single named test case.

    A fresh test case object is created for every incoming connection and
    its base server is returned as the protocol for that connection.
    """

    def __init__(self, testcase):
        """Remember which test case this factory serves.

        Args:
          testcase: name of the test case; expected to be a key of
            _TEST_CASE_MAPPING (checked when a connection arrives).
        """
        logging.info('Creating H2Factory for new connection (%s)', testcase)
        # Despite the name, this counts connections accepted by this factory.
        self._num_streams = 0
        self._testcase = testcase

    def buildProtocol(self, addr):
        """Build the protocol for a newly accepted connection.

        Args:
          addr: address of the peer; unused.

        Returns:
          The object returned by the test case's get_base_server().

        Raises:
          AssertionError: if the configured test case name is unknown.
        """
        self._num_streams += 1
        logging.info('New Connection: %d', self._num_streams)

        # `in` instead of dict.has_key(): has_key() does not exist on
        # Python 3 dicts.
        if self._testcase not in _TEST_CASE_MAPPING:
            logging.error('Unknown test case: %s', self._testcase)
            # Raised explicitly instead of `assert (0)`, which is stripped
            # when running under `python -O`.
            raise AssertionError('Unknown test case: %s' % self._testcase)

        testcase_class = _TEST_CASE_MAPPING[self._testcase]
        if self._testcase == 'goaway':
            # The goaway test case is given the running connection count.
            return testcase_class(self._num_streams).get_base_server()
        if self._testcase == 'no_df_padding_sanity_test':
            # Shares its class with 'data_frame_padding' but turns padding
            # off.
            return testcase_class(use_padding=False).get_base_server()
        return testcase_class().get_base_server()


def parse_arguments():
    """Parse the command line and return the resulting namespace.

    The only option is --base_port (int, default 8080).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--base_port',
        type=int,
        default=8080,
        help='base port to run the servers (default: 8080). One test server is '
        'started on each incrementing port, beginning with base_port, in the '
        'following order: data_frame_padding,goaway,max_streams,'
        'no_df_padding_sanity_test,ping,rst_after_data,rst_after_header,'
        'rst_during_data')
    return parser.parse_args()


def listen(endpoint, test_case):
    """Start listening on `endpoint` with an H2Factory for `test_case`.

    If listening fails, the error is logged, the module-level exit code is
    set to 1 and the reactor is stopped.

    Args:
      endpoint: server endpoint whose listen() returns a Deferred.
      test_case: name of the test case to serve on this endpoint.
    """
    deferred = endpoint.listen(H2Factory(test_case))

    def listen_error(reason):
        # If listening fails, we stop the reactor and exit the program
        # with exit code 1.
        global _exit_code
        _exit_code = 1
        logging.error('Listening failed: %s', reason.value)
        twisted.internet.reactor.stop()

    deferred.addErrback(listen_error)


def start_test_servers(base_port):
    """Start one server per test case on incrementing port numbers
    beginning with base_port.

    Test cases are assigned ports in sorted order of their names.

    Args:
      base_port: port number used for the first test case.
    """
    for index, test_case in enumerate(sorted(_TEST_CASE_MAPPING)):
        portnum = base_port + index
        logging.warning('serving on port %d : %s', portnum, test_case)
        endpoint = twisted.internet.endpoints.TCP4ServerEndpoint(
            twisted.internet.reactor, portnum, backlog=128)
        # Wait until the reactor is running before calling endpoint.listen().
        twisted.internet.reactor.callWhenRunning(listen, endpoint, test_case)


if __name__ == '__main__':
    logging.basicConfig(
        format=
        '%(levelname) -10s %(asctime)s %(module)s:%(lineno)s | %(message)s',
        level=logging.INFO)
    args = parse_arguments()
    start_test_servers(args.base_port)
    twisted.internet.reactor.run()
    # _exit_code is 1 if any server failed to start listening.
    sys.exit(_exit_code)