"""Extracts Health Connect FHIR validation spec data from the official FHIR spec json files."""

from typing import Collection, Iterable, Mapping, Optional

import fhirspec_pb2
from fhir_spec_utils import *

# Prefix prepended to the upper-snake-case type code to build an R4FhirType enum value name.
R4_FHIR_TYPE_PREFIX = "R4_FHIR_TYPE_"


class FhirSpecExtractor:
    """Extractor for getting information for HC FHIR validation from official FHIR spec json files.

    Typical usage example:
        extractor = FhirSpecExtractor(profile_resources_json, {"Immunization", "Observation"})
        fhir_spec_message = extractor.generate_r4_fhir_spec_proto_message(profile_types_json)
    """

    def __init__(self, profile_resources_json: Mapping, resource_names: set[str]):
        """Extracts StructureDefinitions for the requested resources from the provided fhir spec.

        Args:
            profile_resources_json: The contents of the profile-resources.json fhir spec file, which
            is in the structure of a https://hl7.org/fhir/Bundle.html, parsed to dict. The
            Bundle.entry will contain the list of https://hl7.org/fhir/StructureDefinition.html that
            contain the resource definitions we are interested in.
            resource_names: The set of FHIR resources to extract FHIR spec information for.

        Raises:
            ValueError: If a requested resource is not present in the spec, if it's not supported by
            Health Connect, or if any spec values are not as expected.
        """
        if not resource_names.issubset(HC_SUPPORTED_RESOURCE_SET):
            raise ValueError("Provided resource set was not a subset of supported resources")

        # A mapping from the resource name to the list of field definitions, which are in the
        # structure of https://hl7.org/fhir/ElementDefinition.html
        self._resource_to_element_definitions = (
            self._extract_element_definitions_by_resource_from_spec(
                profile_resources_json, resource_names))

    def generate_r4_fhir_spec_proto_message(
            self, profile_types_json: Mapping) -> fhirspec_pb2.FhirResourceSpec:
        """Generates a FhirResourceSpec message from the fhir json spec.

        Args:
            profile_types_json: The contents of the profile-types.json fhir spec file, which
            is in the structure of a https://hl7.org/fhir/Bundle.html, parsed to dict. The
            Bundle.entry will contain the list of https://hl7.org/fhir/StructureDefinition.html that
            contain the data type definitions we are interested in.

        Returns:
            The FhirResourceSpec message, with an entry for each requested resource, and the data
            type configs for the required types.
        """
        r4_resource_spec = fhirspec_pb2.FhirResourceSpec()

        # Add Element type manually, as this is the type of primitive type extension fields.
        data_types_set = {fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_ELEMENT}
        data_type_configs = []

        for resource, element_definitions in self._resource_to_element_definitions.items():
            resource_type_int = RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING[resource]

            resource_complex_type_config, child_types = (
                self._generate_complex_type_config_and_child_types_from_element_definitions(
                    element_definitions, is_resource_type=True))
            data_type_configs.extend(child_types)

            child_type_enums = {child_type.fhir_type for child_type in child_types}
            all_complex_type_configs = [
                child_type.fhir_complex_type_config for child_type in child_types
            ] + [resource_complex_type_config]

            for complex_type_config in all_complex_type_configs:
                new_sub_types = self._extract_subtypes_from_complex_type_config(complex_type_config)
                # Add sub types to data_types_set but skip child types, since we have already
                # extracted their definitions.
                data_types_set.update(new_sub_types - child_type_enums)

            r4_resource_spec.resource_type_to_config[
                resource_type_int].CopyFrom(resource_complex_type_config)

        data_type_configs.extend(self._get_fhir_data_type_configs_for_types_and_nested_types(
            profile_types_json, data_types_set))

        # Sort list by fhir_type before adding to make sure the script output is deterministic
        data_type_configs = sorted(data_type_configs, key=lambda x: x.fhir_type)
        r4_resource_spec.fhir_data_type_configs.extend(data_type_configs)

        return r4_resource_spec

    def _get_fhir_data_type_configs_for_types_and_nested_types(
            self,
            profile_types_json: Mapping,
            type_names: set[fhirspec_pb2.R4FhirType]) -> Iterable[fhirspec_pb2.FhirDataType]:
        """Returns the data type configs for the given types and all types they reference.

        Args:
            profile_types_json: The parsed profile-types.json spec, passed on to
            extract_type_to_structure_definitions_from_spec.
            type_names: The type enums to extract configs for.

        Returns:
            The values of the type enum to FhirDataType mapping built by the recursive extraction.
        """
        # All structure definitions that have a matching enum value. If one is missing, this will
        # cause an exception when extracting the data type configs.
        all_type_to_structure_definition = {}
        for type_string, structure_definition in extract_type_to_structure_definitions_from_spec(
                profile_types_json, None).items():
            try:
                type_enum = self._get_type_enum_from_type_code(type_string)
            except ValueError:
                # Types without an enum value are only reported here, not treated as an error.
                print(f"Type {type_string} did not have an enum value.")
            else:
                all_type_to_structure_definition[type_enum] = structure_definition

        return self._recursively_extract_data_type_configs_and_sub_types_by_type(
            all_type_to_structure_definition, type_names).values()

    def _recursively_extract_data_type_configs_and_sub_types_by_type(
            self,
            all_type_to_structure_definition: Mapping,
            fhir_types_to_extract: set[fhirspec_pb2.R4FhirType],
            already_extracted_types: Optional[set[fhirspec_pb2.R4FhirType]] = None) -> Mapping:
        """Builds data type configs for the given types, then recurses into newly found sub types.

        Args:
            all_type_to_structure_definition: Mapping from type enum to its structure definition.
            fhir_types_to_extract: The type enums to build configs for in this pass.
            already_extracted_types: Type enums handled by earlier passes, which are not extracted
            again. Defaults to None, which is treated as an empty set.

        Returns:
            A mapping from type enum to FhirDataType, covering the requested types, the child types
            they define, and every type found by the recursive passes.

        Raises:
            ValueError: If a type to extract has no structure definition.
        """
        if already_extracted_types is None:
            already_extracted_types = set()

        new_types_to_extract = set()
        type_to_data_type_config_map = {}

        for fhir_type in fhir_types_to_extract:
            if fhir_type == fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_RESOURCE:
                # The Resource type definition does not exist in the profile types file. As we don't
                # support contained resources yet, we don't need a config for this type for now.
                type_to_data_type_config_map[fhir_type] = fhirspec_pb2.FhirDataType(
                    fhir_type=fhir_type,
                    kind=fhirspec_pb2.Kind.KIND_COMPLEX_TYPE)
                continue

            if fhir_type not in all_type_to_structure_definition:
                raise ValueError(
                    f"Type {fhir_type} was missing from the list of structure definitions.")

            structure_definition = all_type_to_structure_definition[fhir_type]
            kind = self._get_kind_enum_from_kind(structure_definition["kind"])
            complex_type_config = None

            if kind != fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE:
                complex_type_config, child_types = (
                    self._generate_complex_type_config_and_child_types_from_element_definitions(
                        extract_element_definitions_from_structure_def(structure_definition)))

                sub_types = self._extract_subtypes_from_complex_type_config(complex_type_config)
                child_type_enums = {child_type.fhir_type for child_type in child_types}
                for child_type in child_types:
                    type_to_data_type_config_map[child_type.fhir_type] = child_type
                    sub_types.update(self._extract_subtypes_from_complex_type_config(
                        child_type.fhir_complex_type_config))

                # Only queue types that no pass has handled and that this pass is not handling.
                for sub_type in sub_types:
                    if sub_type not in already_extracted_types and \
                            sub_type not in fhir_types_to_extract and \
                            sub_type not in child_type_enums:
                        new_types_to_extract.add(sub_type)

            type_to_data_type_config_map[fhir_type] = fhirspec_pb2.FhirDataType(
                fhir_type=fhir_type,
                kind=kind,
                fhir_complex_type_config=complex_type_config)

        if new_types_to_extract:
            type_to_data_type_config_map.update(
                self._recursively_extract_data_type_configs_and_sub_types_by_type(
                    all_type_to_structure_definition, new_types_to_extract,
                    already_extracted_types.union(fhir_types_to_extract)))

        return type_to_data_type_config_map

    def _extract_subtypes_from_complex_type_config(
            self,
            complex_type_config: fhirspec_pb2.FhirComplexTypeConfig
    ) -> set[fhirspec_pb2.R4FhirType]:
        """Returns the set of r4_type values of all field configs in the complex type config."""
        return {field_config.r4_type
                for field_config in complex_type_config.allowed_field_names_to_config.values()}

    def _extract_element_definitions_by_resource_from_spec(
            self, profile_resources_json: Mapping, resource_names: set[str]) -> Mapping:
        """Returns a mapping from resource name to its element definitions.

        Raises:
            ValueError: If an extracted structure definition is not of kind "resource".
        """
        resource_to_element_definitions = {}

        for resource, structure_definition in extract_type_to_structure_definitions_from_spec(
                profile_resources_json, resource_names).items():
            if structure_definition["kind"] != "resource":
                raise ValueError(
                    "Unexpected kind field in structure definition. Expected resource.")
            resource_to_element_definitions[resource] = (
                extract_element_definitions_from_structure_def(structure_definition))

        return resource_to_element_definitions

    def _generate_complex_type_config_and_child_types_from_element_definitions(
            self,
            element_definitions: Collection[Mapping],
            is_resource_type=False) -> tuple[
            fhirspec_pb2.FhirComplexTypeConfig, Collection[fhirspec_pb2.FhirDataType]]:
        """Splits element definitions into the type's own fields and its child type definitions.

        Args:
            element_definitions: The element definitions of a resource or data type.
            is_resource_type: Whether the element definitions belong to a resource.

        Returns:
            A tuple of the complex type config built from the fields with two path parts, and the
            list of FhirDataType configs for the child types defined by more deeply nested fields.

        Raises:
            ValueError: If an element's id and path differ, or its id has more than 4 parts.
        """
        # Element definitions that have 2 field parts, and are therefore fields present in this type
        field_element_definitions = []
        # Element definitions with more than 2 field parts, which are used to define child types of
        # this type, so need to be handled separately.
        child_type_to_element_definitions = {}

        for element in element_definitions:
            field_id = element["id"]
            if field_id != element["path"]:
                raise ValueError("Expected id and path field to be the same")
            field_parts = field_id.split(".")
            field_parts_length = len(field_parts)

            if field_parts_length == 1:
                # This is the path to the element itself. For example for the Observation resource,
                # There will be an ElementDefinition with id "Observation"
                continue

            elif field_parts_length == 2:
                # This is a "regular" nested field, e.g. Immunization.status.
                field_element_definitions.append(element)

            elif field_parts_length in (3, 4):
                # This means the field is part of type BackBoneElement or Element, which are the
                # types of child types defined by Resource and Type definitions respectively.
                # For an example see the https://hl7.org/fhir/Immunization.html "reaction" field.
                # They need to be handled separately, as those fields don't have a type defined, but
                # have a child type definition instead.
                child_type_path = '.'.join(field_parts[:-1])
                child_type_to_element_definitions.setdefault(child_type_path, []).append(element)

            else:
                raise ValueError(
                    "This should not happen, unless there are more deeply nested child types")

        complex_type_definition = self._generate_complex_type_config_from_element_definitions(
            field_element_definitions,
            is_resource_type
        )

        child_data_types = []
        for path, child_element_definitions in child_type_to_element_definitions.items():
            child_data_types.append(
                fhirspec_pb2.FhirDataType(
                    fhir_type=self._get_type_enum_from_type_code(
                        generate_fhir_type_string_for_child_type_path(path)),
                    kind=fhirspec_pb2.Kind.KIND_COMPLEX_TYPE,
                    fhir_complex_type_config=
                    self._generate_complex_type_config_from_element_definitions(
                        child_element_definitions,
                        False
                    ))
            )

        return complex_type_definition, child_data_types

    def _generate_complex_type_config_from_element_definitions(
            self,
            element_definitions: Collection[Mapping],
            is_resource_type: bool) -> fhirspec_pb2.FhirComplexTypeConfig:
        """Generates a FhirComplexTypeConfig from element definitions.

         The element definitions are expected to be pre filtered to contain only fields that should
         be part of the complex type config. This is done in
         _generate_complex_type_config_and_child_types_from_element_definitions().

        Raises:
            ValueError: If two element definitions produce the same field name.
        """
        required_fields = set()

        multi_type_configs = []

        field_configs_by_name = {}

        if is_resource_type:
            # Manually add resourceType field, as this is not present in the spec
            field_configs_by_name["resourceType"] = fhirspec_pb2.FhirFieldConfig(
                is_array=False,
                r4_type=fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_STRING
            )

        for element in element_definitions:
            field_name = element["id"].split(".")[-1]

            field_configs_to_add, multi_type_config = (
                self._generate_field_configs_and_multi_type_config_from_field_element(
                    element, field_name, is_resource_type))
            for name in field_configs_to_add:
                if name in field_configs_by_name:
                    raise ValueError("Field name already exists")

            field_configs_by_name.update(field_configs_to_add)
            if self._field_name_is_multi_type_field(field_name):
                multi_type_configs.append(multi_type_config)
            elif self._field_is_required(element):
                required_fields.add(field_name)

        return fhirspec_pb2.FhirComplexTypeConfig(
            allowed_field_names_to_config=field_configs_by_name,
            # Sort the list of required fields alphabetically, as the output of this script is part
            # of the build, which needs to be deterministic. The required_fields come from a set,
            # which does not have ordering guarantees.
            required_fields=sorted(required_fields),
            multi_type_fields=multi_type_configs
        )

    def _generate_field_configs_and_multi_type_config_from_field_element(
            self, element_definition, field_name, is_resource_field: bool) -> tuple[
            Mapping[str, fhirspec_pb2.FhirFieldConfig],
            Optional[fhirspec_pb2.MultiTypeFieldConfig]]:
        """Generates the field config(s) for a single element definition.

        Args:
            element_definition: The element definition of the field.
            field_name: The last part of the element id, e.g. "status" or "occurrence[x]".
            is_resource_field: Whether the field belongs directly to a resource.

        Returns:
            A tuple of the mapping from allowed field name to its FhirFieldConfig, and the
            MultiTypeFieldConfig if the field is a type choice field, otherwise None.

        Raises:
            ValueError: If the element has an unexpected contentReference, is a type choice field
            with array cardinality, or is a regular field without exactly one type.
        """
        field_is_array = self._field_is_array(element_definition)

        field_configs_by_name = {}

        multi_type_config = None

        if "contentReference" in element_definition:
            # If an element definition has a contentReference, it doesn't have a type, but instead
            # references a child type defined in a different field and we can assume the type from
            # the referenced path.
            # Since this only happens for one field currently, raise Error if it's a new field.
            if element_definition["path"] != "Observation.component.referenceRange":
                raise ValueError("Found unexpected field with contentReference")

            content_reference_string = element_definition["contentReference"].removeprefix('#')
            field_configs_by_name[field_name] = fhirspec_pb2.FhirFieldConfig(
                is_array=field_is_array,
                r4_type=self._get_type_enum_from_type_code(
                    generate_fhir_type_string_for_child_type_path(content_reference_string))
            )

        elif self._field_name_is_multi_type_field(field_name):
            # If the field is a multi type field, it means one of several types can be set. An
            # example is the field Immunization.occurrence, which has types "string" and "dateTime"
            # and therefore means the fields "occurrenceString" and "occurrenceDateTime" are
            # allowed. We therefore expand the field name with each defined type.
            if field_is_array:
                raise ValueError(
                    "Unexpected cardinality for type choice field. Did not expect array.")

            multi_type_fields = []
            for data_type in element_definition["type"]:
                field_with_type = self._get_multi_type_name_for_type(field_name, data_type["code"])
                type_enum = self._extract_type_enum_from_type(data_type, element_definition["path"])
                field_configs_by_name[field_with_type] = fhirspec_pb2.FhirFieldConfig(
                    is_array=False,
                    r4_type=type_enum
                )
                multi_type_fields.append(field_with_type)

            multi_type_config = fhirspec_pb2.MultiTypeFieldConfig(
                name=field_name,
                typed_field_names=multi_type_fields,
                is_required=self._field_is_required(element_definition)
            )

        else:
            if len(element_definition["type"]) != 1:
                raise ValueError("Expected exactly one type")
            fhir_type = element_definition["type"][0]
            # If the field is the resource "id" field manually set the type to be ID, since the spec
            # just uses System.String extension with value "string"
            if is_resource_field and field_name == "id":
                type_enum = fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_ID
            else:
                type_enum = self._extract_type_enum_from_type(
                    fhir_type, element_definition["path"])
            field_configs_by_name[field_name] = fhirspec_pb2.FhirFieldConfig(
                is_array=field_is_array,
                r4_type=type_enum
            )

        return field_configs_by_name, multi_type_config

    def _field_name_is_multi_type_field(self, field_name) -> bool:
        """Returns true if the field is a oneof / type choice field, which can contain one of
        several data types.

        This is the case if the field name ends with "[x]" and means that one of several types can
        be set.
        """

        return field_name.endswith("[x]")

    def _get_multi_type_name_for_type(self, field_name: str, type_code: str) -> str:
        """Returns the one of field name for a specific type.

        For example for the field name "occurrence[x]" and type "dateTime" this will return
        "occurrenceDateTime".
        """

        # Drop the trailing "[x]" and append the type code with its first letter upper-cased.
        return field_name[:-3] + type_code[0].upper() + type_code[1:]

    def _field_is_required(self, element_definition) -> bool:
        """Returns true if the field is required

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the min cardinality is neither 0 nor 1.
        """

        min_cardinality = element_definition["min"]

        if min_cardinality not in (0, 1):
            # Formatted rather than concatenated, as the min value is a number, not a string.
            raise ValueError(f"Unexpected min cardinality value: {min_cardinality}")

        return min_cardinality == 1

    def _field_is_array(self, element_definition) -> bool:
        """Returns true if the field should be an array

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the max cardinality is neither "1" nor "*".
        """

        max_cardinality = element_definition["max"]

        if max_cardinality == "1":
            return False
        if max_cardinality == "*":
            return True
        # Formatted rather than concatenated, so a non-string value still raises ValueError.
        raise ValueError(f"Unexpected max cardinality value: {max_cardinality}")

    def _extract_type_enum_from_type(self,
                                     fhir_type: Mapping,
                                     field_path: str) -> fhirspec_pb2.R4FhirType:
        """Returns the type enum for one entry of an element definition's "type" list.

        Args:
            fhir_type: The type entry, containing a "code" and optionally "profile" and
            "extension" values.
            field_path: The path of the field the type belongs to, used to derive the type of
            child types.

        Raises:
            ValueError: If the type has an unexpected profile or System.String extension, or if no
            enum value exists for the type.
        """
        type_code = fhir_type["code"]

        # Many fields of type Quantity have a profile SimpleQuantity specified. This adds one fhir
        # constraint, which we don't validate, so we can ignore it.
        if "profile" in fhir_type and fhir_type["profile"] \
                != ["http://hl7.org/fhir/StructureDefinition/SimpleQuantity"]:
            raise ValueError(f"Unexpected profile {fhir_type['profile']} for type {type_code}")
        # TODO:b/385115510 - Consider validating targetProfile on "Reference" and "canonical" types.
        #  A Reference field definition for example usually specifies which type of resource can be
        #  referenced (e.g. reference to Encounter).

        # These types are used for child types defined in a resource or complex type, so we use
        # a separate type for them, derived from their path.
        if type_code in ["Element", "BackboneElement"]:
            return self._get_type_enum_from_type_code(
                generate_fhir_type_string_for_child_type_path(field_path))

        # System.String code is used in cases such as Resource.id field, Element.id field,
        # primitive type value field, and Extension.url field.
        # The code is used with the extension
        # https://hl7.org/fhir/extensions/StructureDefinition-structuredefinition-fhir-type.html.
        # For value fields (which we don't read), the "valueUrl" can take different values, but for
        # Resource.id fields and Element.id fields the valueUrl is "string", so we manually set it
        # to string.
        # The Resource.id field, should actually be of type id, but this isn't specified in the
        # r4 spec, so this is changed manually when extracting the resource configs.
        if type_code == "http://hl7.org/fhirpath/System.String":
            extension = fhir_type["extension"][0]
            if extension["url"] != \
                    "http://hl7.org/fhir/StructureDefinition/structuredefinition-fhir-type":
                raise ValueError("Unexpected extension value for code System.String")
            if extension["valueUrl"] == "uri":
                # Used by Extension.url field
                return fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_URI
            if extension["valueUrl"] == "string":
                # Used by Resource.id and Element.id field
                return fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_STRING
            raise ValueError(
                f"Unexpected extension valueUrl in type extension: {str(fhir_type)}")

        return self._get_type_enum_from_type_code(type_code)

    def _get_type_enum_from_type_code(self, type_code: str) -> fhirspec_pb2.R4FhirType:
        """Returns the R4FhirType enum value for a type code.

        Raises:
            ValueError: If the type code contains non alphanumeric characters.
        """
        if not type_code.isalnum():
            raise ValueError("Unexpected characters found in type_string: " + type_code)

        return fhirspec_pb2.R4FhirType.Value(
            R4_FHIR_TYPE_PREFIX + to_upper_snake_case(type_code))

    def _get_kind_enum_from_kind(self, kind_string: str) -> fhirspec_pb2.Kind:
        """Returns the Kind enum value for the "kind" string of a structure definition."""
        return fhirspec_pb2.Kind.Value("KIND_" + to_upper_snake_case(kind_string))
471