# Copyright 2020 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """Common RPC codegen utilities.""" import abc from datetime import datetime import os from typing import cast, Any, Callable, Iterable from pw_protobuf.output_file import OutputFile from pw_protobuf.proto_tree import ProtoNode, ProtoService, ProtoServiceMethod import pw_rpc.ids PLUGIN_NAME = 'pw_rpc_codegen' PLUGIN_VERSION = '0.2.0' RPC_NAMESPACE = '::pw::rpc' STUB_REQUEST_TODO = ( '// TODO: Read the request as appropriate for your application') STUB_RESPONSE_TODO = ( '// TODO: Fill in the response as appropriate for your application') STUB_WRITER_TODO = ( '// TODO: Send responses with the writer as appropriate for your ' 'application') ServerWriterGenerator = Callable[[OutputFile], None] MethodGenerator = Callable[[ProtoServiceMethod, int, OutputFile], None] ServiceGenerator = Callable[[ProtoService, ProtoNode, OutputFile], None] IncludesGenerator = Callable[[Any, ProtoNode], Iterable[str]] def package(file_descriptor_proto, proto_package: ProtoNode, output: OutputFile, includes: IncludesGenerator, service: ServiceGenerator, client: ServiceGenerator) -> None: """Generates service and client code for a package.""" assert proto_package.type() == ProtoNode.Type.PACKAGE output.write_line(f'// {os.path.basename(output.name())} automatically ' f'generated by {PLUGIN_NAME} {PLUGIN_VERSION}') output.write_line(f'// on {datetime.now().isoformat()}') output.write_line('// clang-format off') output.write_line('#pragma once\n') output.write_line('#include ') output.write_line('#include ') output.write_line('#include \n') include_lines = [ '#include "pw_rpc/internal/method_lookup.h"', '#include "pw_rpc/server_context.h"', '#include "pw_rpc/service.h"', ] include_lines += includes(file_descriptor_proto, proto_package) for include_line in sorted(include_lines): output.write_line(include_line) output.write_line() if proto_package.cpp_namespace(): file_namespace = proto_package.cpp_namespace() if file_namespace.startswith('::'): file_namespace = file_namespace[2:] output.write_line(f'namespace {file_namespace} {{') for node in proto_package: if node.type() == ProtoNode.Type.SERVICE: service(cast(ProtoService, node), proto_package, output) client(cast(ProtoService, node), proto_package, output) if proto_package.cpp_namespace(): output.write_line(f'}} // namespace {file_namespace}') def service_class(service: ProtoService, root: ProtoNode, output: OutputFile, server_writer_alias: ServerWriterGenerator, method_union: str, method_descriptor: MethodGenerator) -> None: """Generates a C++ derived class for a nanopb RPC service.""" output.write_line('namespace generated {') base_class = f'{RPC_NAMESPACE}::Service' output.write_line('\ntemplate ') output.write_line( f'class {service.cpp_namespace(root)} : public {base_class} {{') output.write_line(' public:') with output.indent(): output.write_line( f'using ServerContext = {RPC_NAMESPACE}::ServerContext;') server_writer_alias(output) output.write_line() output.write_line(f'constexpr {service.name()}()') output.write_line(f' : {base_class}(kServiceId, kMethods) {{}}') output.write_line() output.write_line( f'{service.name()}(const {service.name()}&) = delete;') output.write_line(f'{service.name()}& operator=' f'(const {service.name()}&) = delete;') output.write_line() output.write_line(f'static constexpr const char* name() ' f'{{ return "{service.name()}"; }}') output.write_line() output.write_line( '// Used by MethodLookup to identify the generated service base.') output.write_line( 'constexpr void _PwRpcInternalGeneratedBase() const {}') service_name_hash = pw_rpc.ids.calculate(service.proto_path()) output.write_line('\n private:') with output.indent(): output.write_line('friend class ::pw::rpc::internal::MethodLookup;\n') output.write_line(f'// Hash of "{service.proto_path()}".') output.write_line( f'static constexpr uint32_t kServiceId = 0x{service_name_hash:08x};' ) output.write_line() # Generate the method table output.write_line('static constexpr std::array<' f'{RPC_NAMESPACE}::internal::{method_union},' f' {len(service.methods())}> kMethods = {{') with output.indent(4): for method in service.methods(): method_descriptor(method, pw_rpc.ids.calculate(method.name()), output) output.write_line('};\n') # Generate the method lookup table _method_lookup_table(service, output) output.write_line('};') output.write_line('\n} // namespace generated\n') def _method_lookup_table(service: ProtoService, output: OutputFile) -> None: """Generates array of method IDs for looking up methods at compile time.""" output.write_line('static constexpr std::array kMethodIds = {{') with output.indent(4): for method in service.methods(): method_id = pw_rpc.ids.calculate(method.name()) output.write_line( f'0x{method_id:08x}, // Hash of "{method.name()}"') output.write_line('};\n') class StubGenerator(abc.ABC): @abc.abstractmethod def unary_signature(self, method: ProtoServiceMethod, prefix: str) -> str: """Returns the signature of this unary method.""" @abc.abstractmethod def unary_stub(self, method: ProtoServiceMethod, output: OutputFile) -> None: """Returns the stub for this unary method.""" @abc.abstractmethod def server_streaming_signature(self, method: ProtoServiceMethod, prefix: str) -> str: """Returns the signature of this server streaming method.""" def server_streaming_stub( # pylint: disable=no-self-use self, unused_method: ProtoServiceMethod, output: OutputFile) -> None: """Returns the stub for this server streaming method.""" output.write_line(STUB_REQUEST_TODO) output.write_line('static_cast(request);') output.write_line(STUB_WRITER_TODO) output.write_line('static_cast(writer);') def _select_stub_methods(generator: StubGenerator, method: ProtoServiceMethod): if method.type() is ProtoServiceMethod.Type.UNARY: return generator.unary_signature, generator.unary_stub if method.type() is ProtoServiceMethod.Type.SERVER_STREAMING: return (generator.server_streaming_signature, generator.server_streaming_stub) raise NotImplementedError( 'Client and bidirectional streaming not yet implemented') _STUBS_COMMENT = r''' /* ____ __ __ __ _ / _/___ ___ ____ / /__ ____ ___ ___ ____ / /_____ _/ /_(_)___ ____ / // __ `__ \/ __ \/ / _ \/ __ `__ \/ _ \/ __ \/ __/ __ `/ __/ / __ \/ __ \ _/ // / / / / / /_/ / / __/ / / / / / __/ / / / /_/ /_/ / /_/ / /_/ / / / / /___/_/ /_/ /_/ .___/_/\___/_/ /_/ /_/\___/_/ /_/\__/\__,_/\__/_/\____/_/ /_/ /_/ _____ __ __ __ / ___// /___ __/ /_ _____/ / \__ \/ __/ / / / __ \/ ___/ / ___/ / /_/ /_/ / /_/ (__ )_/ /____/\__/\__,_/_.___/____(_) */ // This section provides stub implementations of the RPC services in this file. // The code below may be referenced or copied to serve as a starting point for // your RPC service implementations. ''' def package_stubs(proto_package: ProtoNode, output: OutputFile, stub_generator: StubGenerator) -> None: """Generates the RPC stubs for a package.""" if proto_package.cpp_namespace(): file_ns = proto_package.cpp_namespace() if file_ns.startswith('::'): file_ns = file_ns[2:] start_ns = lambda: output.write_line(f'namespace {file_ns} {{\n') finish_ns = lambda: output.write_line(f'}} // namespace {file_ns}\n') else: start_ns = finish_ns = lambda: None services = [ cast(ProtoService, node) for node in proto_package if node.type() == ProtoNode.Type.SERVICE ] output.write_line('#ifdef _PW_RPC_COMPILE_GENERATED_SERVICE_STUBS') output.write_line(_STUBS_COMMENT) output.write_line(f'#include "{output.name()}"\n') start_ns() for node in services: _generate_service_class(node, output, stub_generator) output.write_line() finish_ns() start_ns() for node in services: _generate_service_stubs(node, output, stub_generator) output.write_line() finish_ns() output.write_line('#endif // _PW_RPC_COMPILE_GENERATED_SERVICE_STUBS') def _generate_service_class(service: ProtoService, output: OutputFile, stub_generator: StubGenerator) -> None: output.write_line(f'// Implementation class for {service.proto_path()}.') output.write_line( f'class {service.name()} ' f': public generated::{service.name()}<{service.name()}> {{') output.write_line(' public:') with output.indent(): blank_line = False for method in service.methods(): if blank_line: output.write_line() else: blank_line = True signature, _ = _select_stub_methods(stub_generator, method) output.write_line(signature(method, '') + ';') output.write_line('};\n') def _generate_service_stubs(service: ProtoService, output: OutputFile, stub_generator: StubGenerator) -> None: output.write_line(f'// Method definitions for {service.proto_path()}.') blank_line = False for method in service.methods(): if blank_line: output.write_line() else: blank_line = True signature, stub = _select_stub_methods(stub_generator, method) output.write_line(signature(method, f'{service.name()}::') + ' {') with output.indent(): stub(method, output) output.write_line('}')