• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Base class for interceptors that operate on all RPC types."""
15
16import grpc
17
18
19class _GenericClientInterceptor(
20    grpc.UnaryUnaryClientInterceptor,
21    grpc.UnaryStreamClientInterceptor,
22    grpc.StreamUnaryClientInterceptor,
23    grpc.StreamStreamClientInterceptor,
24):
25    def __init__(self, interceptor_function):
26        self._fn = interceptor_function
27
28    def intercept_unary_unary(self, continuation, client_call_details, request):
29        new_details, new_request_iterator, postprocess = self._fn(
30            client_call_details, iter((request,)), False, False
31        )
32        response = continuation(new_details, next(new_request_iterator))
33        return postprocess(response) if postprocess else response
34
35    def intercept_unary_stream(
36        self, continuation, client_call_details, request
37    ):
38        new_details, new_request_iterator, postprocess = self._fn(
39            client_call_details, iter((request,)), False, True
40        )
41        response_it = continuation(new_details, next(new_request_iterator))
42        return postprocess(response_it) if postprocess else response_it
43
44    def intercept_stream_unary(
45        self, continuation, client_call_details, request_iterator
46    ):
47        new_details, new_request_iterator, postprocess = self._fn(
48            client_call_details, request_iterator, True, False
49        )
50        response = continuation(new_details, new_request_iterator)
51        return postprocess(response) if postprocess else response
52
53    def intercept_stream_stream(
54        self, continuation, client_call_details, request_iterator
55    ):
56        new_details, new_request_iterator, postprocess = self._fn(
57            client_call_details, request_iterator, True, True
58        )
59        response_it = continuation(new_details, new_request_iterator)
60        return postprocess(response_it) if postprocess else response_it
61
62
63def create(intercept_call):
64    return _GenericClientInterceptor(intercept_call)
65