import fhirspec_pb2
from fhir_spec_utils import *
from typing import Collection, Iterable, Mapping, Optional

R4_FHIR_TYPE_PREFIX = "R4_FHIR_TYPE_"


class FhirSpecExtractor:
    """Extractor for getting information for HC FHIR validation from official FHIR spec json files.

    Typical usage example:
        extractor = FhirSpecExtractor(profile_resources_json, {"Immunization", "Observation"})
        fhir_spec_message = extractor.generate_r4_fhir_spec_proto_message(profile_types_json)
    """

    def __init__(self, profile_resources_json: Mapping, resource_names: set[str]):
        """Extracts StructureDefinitions for the requested resources from the provided fhir spec.

        Args:
            profile_resources_json: The contents of the profile-resources.json fhir spec file, which
              is in the structure of a https://hl7.org/fhir/Bundle.html, parsed to dict. The
              Bundle.entry will contain the list of https://hl7.org/fhir/StructureDefinition.html
              that contain the resource definitions we are interested in.
            resource_names: The set of FHIR resources to extract FHIR spec information for.

        Raises:
            ValueError: If a requested resource is not present in the spec, if it's not supported by
              Health Connect, or if any spec values are not as expected.
        """
        if not resource_names.issubset(HC_SUPPORTED_RESOURCE_SET):
            raise ValueError("Provided resource set was not a subset of supported resources")

        # A mapping from the resource name to the list of field definitions, which are in the
        # structure of https://hl7.org/fhir/ElementDefinition.html
        self._resource_to_element_definitions = (
            self._extract_element_definitions_by_resource_from_spec(
                profile_resources_json, resource_names))

    def generate_r4_fhir_spec_proto_message(
            self, profile_types_json: Mapping) -> fhirspec_pb2.FhirResourceSpec:
        """Generates a FhirResourceSpec message from the fhir json spec.

        Args:
            profile_types_json: The contents of the profile-types.json fhir spec file, which
              is in the structure of a https://hl7.org/fhir/Bundle.html, parsed to dict. The
              Bundle.entry will contain the list of https://hl7.org/fhir/StructureDefinition.html
              that contain the data type definitions we are interested in.

        Returns:
            The FhirResourceSpec message, with an entry for each requested resource, and the data
            type configs for the required types.
        """
        r4_resource_spec = fhirspec_pb2.FhirResourceSpec()

        # Add Element type manually, as this is the type of primitive type extension fields.
        data_types_set = {fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_ELEMENT}
        data_type_configs = []

        for resource, element_definitions in self._resource_to_element_definitions.items():
            resource_type_int = RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING[resource]

            resource_complex_type_config, child_types = (
                self._generate_complex_type_config_and_child_types_from_element_definitions(
                    element_definitions, is_resource_type=True))
            data_type_configs.extend(child_types)

            child_type_enums = {child_type.fhir_type for child_type in child_types}
            all_complex_type_configs = [
                child_type.fhir_complex_type_config for child_type in child_types
            ] + [resource_complex_type_config]

            for complex_type_config in all_complex_type_configs:
                new_sub_types = self._extract_subtypes_from_complex_type_config(
                    complex_type_config)
                # Add sub types to data_types_set but skip child types, since we have already
                # extracted their definitions.
                data_types_set.update(new_sub_types - child_type_enums)

            r4_resource_spec.resource_type_to_config[
                resource_type_int].CopyFrom(resource_complex_type_config)

        # The data type configs are extracted once, after the types referenced by all requested
        # resources have been collected.
        data_type_configs.extend(self._get_fhir_data_type_configs_for_types_and_nested_types(
            profile_types_json, data_types_set))

        # Sort list by fhir_type before adding to make sure the script output is deterministic
        data_type_configs = sorted(data_type_configs, key=lambda x: x.fhir_type)
        r4_resource_spec.fhir_data_type_configs.extend(data_type_configs)

        return r4_resource_spec

    def _get_fhir_data_type_configs_for_types_and_nested_types(
            self,
            profile_types_json: Mapping,
            type_names: set[fhirspec_pb2.R4FhirType]) -> Iterable[fhirspec_pb2.FhirDataType]:
        """Returns the data type configs for the given types and all types nested in them.

        Args:
            profile_types_json: The parsed profile-types.json fhir spec file.
            type_names: The R4FhirType enum values to extract data type configs for.

        Returns:
            The FhirDataType configs of the requested types and of every type they (transitively)
            reference.

        Raises:
            ValueError: If a required type has no structure definition with a matching enum value.
        """
        # All structure definitions that have a matching enum value. If one is missing, this will
        # cause an exception when extracting the data type configs.
        all_type_to_structure_definition = {}
        for type_string, structure_definition in extract_type_to_structure_definitions_from_spec(
                profile_types_json, None).items():
            try:
                type_enum = self._get_type_enum_from_type_code(type_string)
            except ValueError:
                # Deliberately best effort: types without an enum value are only a problem if
                # they are actually needed, which is detected during extraction below.
                print(f"Type {type_string} did not have an enum value.")
            else:
                all_type_to_structure_definition[type_enum] = structure_definition

        return self._recursively_extract_data_type_configs_and_sub_types_by_type(
            all_type_to_structure_definition, type_names).values()

    def _recursively_extract_data_type_configs_and_sub_types_by_type(
            self,
            all_type_to_structure_definition: Mapping,
            fhir_types_to_extract: set[fhirspec_pb2.R4FhirType],
            already_extracted_types: Optional[set[fhirspec_pb2.R4FhirType]] = None) -> Mapping:
        """Extracts data type configs for the given types, recursing into referenced types.

        Args:
            all_type_to_structure_definition: Mapping from R4FhirType enum value to the structure
              definition of that type.
            fhir_types_to_extract: The types to extract in this round.
            already_extracted_types: Types handled in previous rounds, which are not extracted
              again. Defaults to no types.

        Returns:
            A mapping from R4FhirType enum value to its FhirDataType config, containing the
            requested types, their child types and all transitively referenced types.

        Raises:
            ValueError: If a type to extract has no structure definition.
        """
        # Use None as the default instead of a shared mutable set() default argument.
        if already_extracted_types is None:
            already_extracted_types = set()

        new_types_to_extract = set()
        type_to_data_type_config_map = {}

        for fhir_type in fhir_types_to_extract:
            if fhir_type == fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_RESOURCE:
                # The Resource type definition does not exist in the profile types file. As we
                # don't support contained resources yet, we don't need a config for this type for
                # now.
                type_to_data_type_config_map[fhir_type] = fhirspec_pb2.FhirDataType(
                    fhir_type=fhir_type,
                    kind=fhirspec_pb2.Kind.KIND_COMPLEX_TYPE)
                continue

            if fhir_type not in all_type_to_structure_definition:
                raise ValueError(
                    f"Type {fhir_type} was missing from the list of structure definitions.")

            structure_definition = all_type_to_structure_definition[fhir_type]
            kind = self._get_kind_enum_from_kind(structure_definition["kind"])
            # Primitive types have no complex type config, so this stays None for them.
            complex_type_config = None

            if kind != fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE:
                complex_type_config, child_types = (
                    self._generate_complex_type_config_and_child_types_from_element_definitions(
                        extract_element_definitions_from_structure_def(structure_definition)))

                sub_types = self._extract_subtypes_from_complex_type_config(complex_type_config)
                child_type_enums = [child_type.fhir_type for child_type in child_types]
                for child_type in child_types:
                    type_to_data_type_config_map[child_type.fhir_type] = child_type
                    sub_types.update(self._extract_subtypes_from_complex_type_config(
                        child_type.fhir_complex_type_config))

                # Only queue types that have not been handled yet, are not handled in this round
                # and are not child types, whose configs were added above.
                for sub_type in sub_types:
                    if sub_type not in already_extracted_types and \
                            sub_type not in fhir_types_to_extract and \
                            sub_type not in child_type_enums:
                        new_types_to_extract.add(sub_type)

            type_to_data_type_config_map[fhir_type] = fhirspec_pb2.FhirDataType(
                fhir_type=fhir_type,
                kind=kind,
                fhir_complex_type_config=complex_type_config)

        if new_types_to_extract:
            type_to_data_type_config_map.update(
                self._recursively_extract_data_type_configs_and_sub_types_by_type(
                    all_type_to_structure_definition, new_types_to_extract,
                    already_extracted_types.union(fhir_types_to_extract)))

        return type_to_data_type_config_map

    def _extract_subtypes_from_complex_type_config(
            self, complex_type_config: fhirspec_pb2.FhirComplexTypeConfig) -> Collection[
        fhirspec_pb2.R4FhirType]:
        """Returns the set of r4_type values of all allowed fields of the complex type config."""
        return {field_config.r4_type
                for field_config in complex_type_config.allowed_field_names_to_config.values()}

    def _extract_element_definitions_by_resource_from_spec(
            self, profile_resources_json: Mapping, resource_names: set[str]) -> Mapping:
        """Returns a mapping from resource name to the element definitions of that resource.

        Raises:
            ValueError: If a structure definition found for a requested resource is not of kind
              "resource".
        """
        resource_to_element_definitions = {}

        for resource, structure_definition in extract_type_to_structure_definitions_from_spec(
                profile_resources_json, resource_names).items():
            if structure_definition["kind"] != "resource":
                raise ValueError(
                    "Unexpected kind field in structure definition. Expected resource.")
            resource_to_element_definitions[resource] = (
                extract_element_definitions_from_structure_def(structure_definition))

        return resource_to_element_definitions

    def _generate_complex_type_config_and_child_types_from_element_definitions(
            self,
            element_definitions: Collection[Mapping],
            is_resource_type=False) -> tuple[
        fhirspec_pb2.FhirComplexTypeConfig, Collection[fhirspec_pb2.FhirDataType]]:
        """Generates the complex type config of a type and the data types of its child types.

        Args:
            element_definitions: All element definitions of the resource or data type.
            is_resource_type: Whether the element definitions belong to a resource.

        Returns:
            A tuple of the complex type config built from the direct fields of the type, and the
            list of FhirDataType configs for the child types defined inside the type.

        Raises:
            ValueError: If an element's id and path differ, or if a field is nested more deeply
              than expected.
        """
        # Element definitions that have 2 field parts, and are therefore fields present in this type
        field_element_definitions = []
        # Element definitions with more than 2 field parts, which are used to define child types of
        # this type, so need to be handled separately.
        child_type_to_element_definitions = {}

        for element in element_definitions:
            field_id = element["id"]
            if field_id != element["path"]:
                raise ValueError("Expected id and path field to be the same")
            field_parts = field_id.split(".")
            field_parts_length = len(field_parts)

            if field_parts_length == 1:
                # This is the path to the element itself. For example for the Observation resource,
                # There will be an ElementDefinition with id "Observation"
                continue

            elif field_parts_length == 2:
                # This is a "regular" nested field, e.g. Immunization.status.
                field_element_definitions.append(element)

            elif field_parts_length in (3, 4):
                # This means the field is part of type BackBoneElement or Element, which are the
                # types of child types defined by Resource and Type definitions respectively.
                # For an example see the https://hl7.org/fhir/Immunization.html "reaction" field.
                # They need to be handled separately, as those fields don't have a type defined, but
                # have a child type definition instead.
                child_type_path = '.'.join(field_parts[:-1])
                child_type_to_element_definitions.setdefault(child_type_path, []).append(element)

            else:
                raise ValueError(
                    "This should not happen, unless there are more deeply nested child types")

        complex_type_definition = self._generate_complex_type_config_from_element_definitions(
            field_element_definitions,
            is_resource_type
        )

        child_data_types = []
        for path, child_element_definitions in child_type_to_element_definitions.items():
            child_data_types.append(
                fhirspec_pb2.FhirDataType(
                    fhir_type=self._get_type_enum_from_type_code(
                        generate_fhir_type_string_for_child_type_path(path)),
                    kind=fhirspec_pb2.Kind.KIND_COMPLEX_TYPE,
                    fhir_complex_type_config=
                    self._generate_complex_type_config_from_element_definitions(
                        child_element_definitions,
                        False
                    ))
            )

        return complex_type_definition, child_data_types

    def _generate_complex_type_config_from_element_definitions(
            self,
            element_definitions: Collection[Mapping],
            is_resource_type: bool) -> fhirspec_pb2.FhirComplexTypeConfig:
        """Generates a FhirComplexTypeConfig from element definitions.

        The element definitions are expected to be pre filtered to contain only fields that should
        be part of the complex type config. This is done in
        _generate_complex_type_config_and_child_types_from_element_definitions().

        Args:
            element_definitions: The element definitions of the fields of the type.
            is_resource_type: Whether the type is a resource, in which case a "resourceType" field
              is added.

        Raises:
            ValueError: If two element definitions result in the same field name.
        """
        required_fields = set()

        multi_type_configs = []

        field_configs_by_name = {}

        if is_resource_type:
            # Manually add resourceType field, as this is not present in the spec
            field_configs_by_name["resourceType"] = fhirspec_pb2.FhirFieldConfig(
                is_array=False,
                r4_type=fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_STRING
            )

        for element in element_definitions:
            field_name = element["id"].split(".")[-1]

            field_configs_to_add, multi_type_config = (
                self._generate_field_configs_and_multi_type_config_from_field_element(
                    element, field_name, is_resource_type))
            for name in field_configs_to_add:
                if name in field_configs_by_name:
                    raise ValueError("Field name already exists")

            field_configs_by_name.update(field_configs_to_add)
            # Required-ness of multi type fields is recorded in their multi type config, so they
            # are not added to required_fields.
            if self._field_name_is_multi_type_field(field_name):
                multi_type_configs.append(multi_type_config)
            elif self._field_is_required(element):
                required_fields.add(field_name)

        return fhirspec_pb2.FhirComplexTypeConfig(
            allowed_field_names_to_config=field_configs_by_name,
            # Sort the list of required fields alphabetically, as the output of this script is part
            # of the build, which needs to be deterministic. The required_fields come from a set,
            # which does not have ordering guarantees.
            required_fields=sorted(required_fields),
            multi_type_fields=multi_type_configs
        )

    def _generate_field_configs_and_multi_type_config_from_field_element(
            self, element_definition, field_name, is_resource_field: bool) -> tuple[
        Mapping[str, fhirspec_pb2.FhirFieldConfig], Optional[fhirspec_pb2.MultiTypeFieldConfig]]:
        """Generates the field configs, and multi type config if applicable, for one element.

        Args:
            element_definition: The element definition of the field.
            field_name: The name of the field, which is the last part of the element id.
            is_resource_field: Whether the field is a direct field of a resource.

        Returns:
            A tuple of the field configs by field name (several entries for a multi type field,
            otherwise one), and the MultiTypeFieldConfig if the field is a multi type field,
            otherwise None.

        Raises:
            ValueError: If the element definition has an unexpected contentReference, cardinality
              or number of types.
        """
        field_is_array = self._field_is_array(element_definition)

        field_configs_by_name = {}

        multi_type_config = None

        if "contentReference" in element_definition:
            # If an element definition has a contentReference, it doesn't have a type, but instead
            # references a child type defined in a different field and we can assume the type from
            # the referenced path.
            # Since this only happens for one field currently, raise Error if it's a new field.
            if element_definition["path"] != "Observation.component.referenceRange":
                raise ValueError("Found unexpected field with contentReference")

            content_reference_string = element_definition["contentReference"]
            if content_reference_string.startswith('#'):
                content_reference_string = content_reference_string[1:]
            field_configs_by_name[field_name] = fhirspec_pb2.FhirFieldConfig(
                is_array=field_is_array,
                r4_type=self._get_type_enum_from_type_code(
                    generate_fhir_type_string_for_child_type_path(content_reference_string))
            )

        # If the field is a multi type field, it means one of several types can be set. An example
        # is the field Immunization.occurrence, which has types "string" and "dateTime" and
        # therefore means the fields "occurrenceString" and "occurrenceDateTime" are allowed. We
        # therefore expand the field name with each defined type.
        elif self._field_name_is_multi_type_field(field_name):
            if field_is_array:
                raise ValueError(
                    "Unexpected cardinality for type choice field. Did not expect array.")

            multi_type_fields = []
            for data_type in element_definition["type"]:
                field_with_type = self._get_multi_type_name_for_type(field_name, data_type["code"])
                type_enum = self._extract_type_enum_from_type(data_type, element_definition["path"])
                field_configs_by_name[field_with_type] = fhirspec_pb2.FhirFieldConfig(
                    is_array=False,
                    r4_type=type_enum
                )
                multi_type_fields.append(field_with_type)

            multi_type_config = fhirspec_pb2.MultiTypeFieldConfig(
                name=field_name,
                typed_field_names=multi_type_fields,
                is_required=self._field_is_required(element_definition)
            )

        else:
            if len(element_definition["type"]) != 1:
                raise ValueError("Expected exactly one type")
            fhir_type = element_definition["type"][0]
            # If the field is the resource "id" field manually set the type to be ID, since the spec
            # just uses System.String extension with value "string"
            if is_resource_field and field_name == "id":
                type_enum = fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_ID
            else:
                type_enum = self._extract_type_enum_from_type(
                    fhir_type, element_definition["path"])
            field_configs_by_name[field_name] = fhirspec_pb2.FhirFieldConfig(
                is_array=field_is_array,
                r4_type=type_enum
            )

        return field_configs_by_name, multi_type_config

    def _field_name_is_multi_type_field(self, field_name) -> bool:
        """Returns true if the field is a oneof / type choice field, which can contain several
        data types.

        This is the case if the field name ends with "[x]" and means that one of several types can
        be set.
        """

        return field_name.endswith("[x]")

    def _get_multi_type_name_for_type(self, field_name, type_code) -> str:
        """Returns the one of field name for a specific type.

        For example for the field name "occurrence[x]" and type "dateTime" this will return
        "occurrenceDateTime".
        """

        # Strip the trailing "[x]" and append the type code with its first letter capitalized.
        return field_name[:-3] + type_code[0].upper() + type_code[1:]

    def _field_is_required(self, element_definition) -> bool:
        """Returns true if the field is required

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the min cardinality is neither 0 nor 1.
        """

        min_cardinality = element_definition["min"]

        if min_cardinality not in [0, 1]:
            # The value is a number, so it has to be formatted rather than concatenated to the
            # message, which would raise a TypeError instead of the intended ValueError.
            raise ValueError(f"Unexpected min cardinality value: {min_cardinality}")

        return bool(min_cardinality)

    def _field_is_array(self, element_definition) -> bool:
        """Returns true if the field should be an array

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the max cardinality is neither "1" nor "*".
        """

        max_cardinality = element_definition["max"]

        if max_cardinality == "1":
            return False
        if max_cardinality == "*":
            return True
        raise ValueError(f"Unexpected max cardinality value: {max_cardinality}")

    def _extract_type_enum_from_type(self,
                                     fhir_type: Mapping,
                                     field_path: str) -> fhirspec_pb2.R4FhirType:
        """Returns the R4FhirType enum value for a type entry of an element definition.

        Args:
            fhir_type: One entry of the "type" list of an element definition.
            field_path: The path of the field the type belongs to, used to derive the type of
              child types.

        Raises:
            ValueError: If the type has an unexpected profile or extension, or if there is no
              enum value for the type.
        """
        type_code = fhir_type["code"]

        # Many fields of type Quantity have a profile SimpleQuantity specified. This adds one fhir
        # constraint, which we don't validate, so we can ignore it.
        if "profile" in fhir_type and fhir_type["profile"] \
                != ["http://hl7.org/fhir/StructureDefinition/SimpleQuantity"]:
            raise ValueError(f"Unexpected profile {fhir_type['profile']} for type {type_code}")
        # TODO:b/385115510 - Consider validating targetProfile on "Reference" and "canonical" types.
        # A Reference field definition for example usually specifies which type of resource can be
        # referenced (e.g. reference to Encounter).

        # These types are used for child types defined in a resource or complex type, so we use
        # a separate type for them, derived from their path.
        if type_code in ["Element", "BackboneElement"]:
            return self._get_type_enum_from_type_code(
                generate_fhir_type_string_for_child_type_path(field_path))

        # System.String code is used in cases such as Resource.id field, Element.id field,
        # primitive type value field, and Extension.url field.
        # The code is used with the extension
        # https://hl7.org/fhir/extensions/StructureDefinition-structuredefinition-fhir-type.html.
        # For value fields (which we don't read), the "valueUrl" can take different values, but for
        # Resource.id fields and Element.id fields the valueUrl is "string", so we manually set it
        # to string.
        # The Resource.id field, should actually be of type id, but this isn't specified in the
        # r4 spec, so this is changed manually when extracting the resource configs.
        if type_code == "http://hl7.org/fhirpath/System.String":
            extension = fhir_type["extension"][0]
            if extension["url"] != \
                    "http://hl7.org/fhir/StructureDefinition/structuredefinition-fhir-type":
                raise ValueError("Unexpected extension value for code System.String")
            if extension["valueUrl"] == "uri":
                # Used by Extension.url field
                return fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_URI
            if extension["valueUrl"] == "string":
                # Used by Resource.id and Element.id field
                return fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_STRING
            raise ValueError(
                f"Unexpected extension valueUrl in type extension: {str(fhir_type)}")

        return self._get_type_enum_from_type_code(type_code)

    def _get_type_enum_from_type_code(self, type_code: str) -> fhirspec_pb2.R4FhirType:
        """Returns the R4FhirType enum value for a type code such as "dateTime".

        Raises:
            ValueError: If the type code contains non alphanumeric characters, or if there is no
              enum value with the resulting name.
        """
        if not type_code.isalnum():
            raise ValueError(f"Unexpected characters found in type_string: {type_code}")

        return fhirspec_pb2.R4FhirType.Value(
            R4_FHIR_TYPE_PREFIX + to_upper_snake_case(type_code))

    def _get_kind_enum_from_kind(self, kind_string: str) -> fhirspec_pb2.Kind:
        """Returns the Kind enum value for the "kind" string of a structure definition."""
        return fhirspec_pb2.Kind.Value("KIND_" + to_upper_snake_case(kind_string))