#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Haptic JSON Converter

This tool converts between OH Haptic JSON and HE Haptic JSON formats (v1 and v2).

Version: v0.0.1

Usage:
    converter.py <input> [-o <output>] -f <format> [-s <schema_dir>] [-v]

Arguments:
    input                 Path to the input JSON file or directory.
    -o, --output          Path to the output directory (default: the input file's
                          directory, or '<input>_out' when the input is a directory).
    -f, --format          Target format: 'oh', 'he_v1', or 'he_v2'.
    -s, --schema_dir      Directory containing JSON schema files (default: 'schemas').
    -v, --version_suffix  Include version suffix ('_v1' or '_v2') in output HE file names.
"""

import argparse
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, Tuple, Union

import jsonschema
from jsonschema import validate

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Constants for target formats
FORMAT_OH = 'oh'
FORMAT_HE_V1 = 'he_v1'
FORMAT_HE_V2 = 'he_v2'

# Type alias for JSON data
JsonData = Dict[str, Any]


def load_schema(file_path: Union[str, Path]) -> JsonData:
    """
    Load a JSON schema from the specified file.

    Args:
        file_path (Union[str, Path]): Path to the JSON schema file.

    Returns:
        JsonData: Parsed JSON schema.
    """
    file_path = Path(file_path)
    try:
        with file_path.open('r', encoding='utf-8') as file:
            return json.load(file)
    except (FileNotFoundError, json.JSONDecodeError) as err:
        logging.error("Error loading schema from %s: %s", file_path, err)
        raise


def read_json(file_path: Union[str, Path]) -> JsonData:
    """
    Read JSON data from a file.

    Args:
        file_path (Union[str, Path]): Path to the JSON file.

    Returns:
        JsonData: Parsed JSON data.
    """
    file_path = Path(file_path)
    try:
        with file_path.open('r', encoding='utf-8') as file:
            return json.load(file)
    except (FileNotFoundError, json.JSONDecodeError) as err:
        logging.error("Error reading JSON file %s: %s", file_path, err)
        raise


def write_json(data: JsonData, file_path: Path) -> None:
    """
    Write JSON data to a file.

    Args:
        data (JsonData): JSON data to write.
        file_path (Path): Path to the output file.
    """
    try:
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with file_path.open('w', encoding='utf-8') as file:
            json.dump(data, file, indent=2)
    except (OSError, TypeError) as err:
        # json.dump raises TypeError (not JSONDecodeError) for unserializable data.
        logging.error("Error writing JSON file %s: %s", file_path, err)
        raise


def validate_json(data: JsonData, schema: JsonData) -> Tuple[bool, Union[None, jsonschema.exceptions.ValidationError]]:
    """
    Validate JSON data against a schema.

    Args:
        data (JsonData): JSON data to validate.
        schema (JsonData): JSON schema to validate against.

    Returns:
        Tuple[bool, Union[None, jsonschema.exceptions.ValidationError]]: Validation result and error if invalid.
    """
    try:
        validate(data, schema)
        return True, None
    except jsonschema.exceptions.ValidationError as err:
        return False, err
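

# Example (illustrative; the file names below are hypothetical): checking whether a
# document matches one of the loaded schemas before converting it.
#
#     schema = load_schema("schemas/oh_schema.json")
#     data = read_json("effect.json")
#     is_valid, error = validate_json(data, schema)
#     if not is_valid:
#         logging.warning("Not an OH document: %s", error)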


def convert_oh_to_he_v1(oh_data: JsonData) -> JsonData:
    """
    Convert OH JSON data to HE v1 format.

    Args:
        oh_data (JsonData): OH JSON data.

    Returns:
        JsonData: Converted HE v1 JSON data.
    """
    he_data = {
        "Metadata": {
            "Version": 1
        },
        "Pattern": []
    }

    for channel in oh_data['Channels']:
        for pattern in channel['Pattern']:
            event = pattern['Event']
            he_event = {
                "Type": event['Type'],
                "RelativeTime": event['StartTime'],
                "Parameters": {
                    "Intensity": event['Parameters']['Intensity'],
                    "Frequency": event['Parameters']['Frequency']
                }
            }
            if event['Type'] == 'continuous':
                he_event['Duration'] = event['Duration']
                if 'Curve' in event['Parameters']:
                    he_event['Parameters']['Curve'] = [
                        {
                            "Time": curve_point['Time'],
                            "Intensity": curve_point.get('Intensity', 0),
                            "Frequency": curve_point.get('Frequency', 0)
                        }
                        for curve_point in event['Parameters']['Curve']
                    ]
                else:
                    # Add a default curve if 'Curve' is missing from the parameters
                    he_event['Parameters']['Curve'] = [
                        {
                            "Time": time,
                            "Intensity": 100,
                            "Frequency": 0
                        }
                        for time in range(4)  # Four default points to satisfy CURVE_POINT_NUM_MIN
                    ]
            he_data['Pattern'].append(he_event)

    return he_data


def convert_oh_to_he_v2(oh_data: JsonData) -> JsonData:
    """
    Convert OH JSON data to HE v2 format.

    Args:
        oh_data (JsonData): OH JSON data.

    Returns:
        JsonData: Converted HE v2 JSON data.
    """
    he_v2_data = {
        "Metadata": {
            "Version": 2
        },
        "PatternList": []
    }

    current_pattern = None
    last_event_end_time = -1  # Tracks the end time of the last event in the current pattern

    for channel in oh_data["Channels"]:
        for pattern in channel["Pattern"]:
            event = pattern["Event"]
            event_start_time = event["StartTime"]
            event_end_time = event_start_time + \
                (event["Duration"] if event["Type"] ==
                 "continuous" else 48)  # Default transient duration

            # Start a new pattern when none is open, the current one is full (16 events),
            # or the gap since the previous event exceeds 1000 ms.
            if current_pattern is None or not isinstance(current_pattern, dict) or len(current_pattern.get("Pattern", [])) >= 16 \
                    or (last_event_end_time != -1 and event_start_time - last_event_end_time > 1000):
                if current_pattern and isinstance(current_pattern, dict):
                    he_v2_data["PatternList"].append(current_pattern)
                current_pattern = {
                    "AbsoluteTime": event_start_time,
                    "Pattern": []
                }

            if event["Type"] == "continuous":
                he_event = {
                    "Type": event["Type"],
                    "RelativeTime": event_start_time - current_pattern["AbsoluteTime"],
                    "Duration": event["Duration"],
                    "Parameters": {
                        "Intensity": event["Parameters"]["Intensity"],
                        "Frequency": event["Parameters"].get("Frequency", 50)
                    }
                }
                if "Curve" in event["Parameters"]:
                    he_event["Parameters"]["Curve"] = [
                        {
                            "Time": point["Time"],
                            "Intensity": point.get("Intensity", 100),
                            # Default to 0 when Frequency is missing
                            "Frequency": point.get("Frequency", 0)
                        }
                        for point in event["Parameters"]["Curve"]
                        if "Intensity" in point or "Frequency" in point
                    ]
                else:
                    # Add a default curve if 'Curve' is missing from the parameters
                    he_event["Parameters"]["Curve"] = [
                        {
                            "Time": time,
                            "Intensity": 100,
                            "Frequency": 0
                        }
                        for time in range(4)  # Four default points to satisfy the HE v2 curve-point minimum
                    ]
                current_pattern["Pattern"].append(he_event)
            else:
                he_event = {
                    "Type": event["Type"],
                    "RelativeTime": event_start_time - current_pattern["AbsoluteTime"],
                    "Parameters": {
                        "Intensity": event["Parameters"]["Intensity"],
                        "Frequency": event["Parameters"].get("Frequency", 50)
                    }
                }
                current_pattern["Pattern"].append(he_event)
            last_event_end_time = event_end_time

    if current_pattern:
        he_v2_data["PatternList"].append(current_pattern)

    return he_v2_data
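

# Shape sketch (illustrative values only; the field names are the ones read and written
# by the converters in this module). Continuous HE events also carry a 'Curve' list of
# points; a four-point default is generated when the source has none.
#
#   OH:    {"MetaData": {"Version": 1.0, "ChannelNumber": 1},
#           "Channels": [{"Parameters": {"Index": 0},
#                         "Pattern": [{"Event": {"Type": "continuous", "StartTime": 0,
#                                                "Duration": 100,
#                                                "Parameters": {"Intensity": 80, "Frequency": 50}}}]}]}
#   HE v1: {"Metadata": {"Version": 1},
#           "Pattern": [{"Type": "continuous", "RelativeTime": 0, "Duration": 100,
#                        "Parameters": {"Intensity": 80, "Frequency": 50}}]}
#   HE v2: {"Metadata": {"Version": 2},
#           "PatternList": [{"AbsoluteTime": 0,
#                            "Pattern": [{"Type": "continuous", "RelativeTime": 0, "Duration": 100,
#                                         "Parameters": {"Intensity": 80, "Frequency": 50}}]}]}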


def convert_he_v1_to_v2(he_v1_data: JsonData) -> JsonData:
    """
    Convert HE v1 JSON data to HE v2 format.

    Args:
        he_v1_data (JsonData): HE v1 JSON data.

    Returns:
        JsonData: Converted HE v2 JSON data.
    """
    converted_data = {
        "Metadata": {
            "Version": 2
        },
        "PatternList": []
    }

    current_pattern = None
    last_event_end_time = -1  # Tracks the end time of the last event in the current pattern

    for event in he_v1_data["Pattern"]:
        event_start_time = event["RelativeTime"]
        event_end_time = event_start_time + \
            (event["Duration"] if event["Type"] == "continuous" else 48)

        # Start a new pattern when none is open, the current one is full (16 events),
        # or the gap since the previous event exceeds 1000 ms.
        if current_pattern is None or not isinstance(current_pattern, dict) or len(current_pattern.get("Pattern", [])) >= 16 \
                or (last_event_end_time != -1 and event_start_time - last_event_end_time > 1000):
            if current_pattern and isinstance(current_pattern, dict):
                converted_data["PatternList"].append(current_pattern)
            current_pattern = {
                "AbsoluteTime": event_start_time,
                "Pattern": []
            }

        # Order of dictionary insertion matters
        event_dict = {
            "Type": event["Type"],
            "RelativeTime": event_start_time - current_pattern["AbsoluteTime"]
        }

        if event["Type"] == "continuous":
            event_dict["Duration"] = event["Duration"]

        event_dict["Parameters"] = event["Parameters"]

        current_pattern["Pattern"].append(event_dict)
        last_event_end_time = event_end_time

    if current_pattern:
        converted_data["PatternList"].append(current_pattern)

    return converted_data


def convert_he_v2_to_v1(he_v2_data: JsonData) -> JsonData:
    """
    Convert HE v2 JSON data to HE v1 format.

    Args:
        he_v2_data (JsonData): HE v2 JSON data.

    Returns:
        JsonData: Converted HE v1 JSON data.
    """
    he_v1_data = {
        "Metadata": {
            "Version": 1
        },
        "Pattern": []
    }

    for pattern_list_entry in he_v2_data['PatternList']:
        for event in pattern_list_entry['Pattern']:
            he_v1_event = event.copy()
            # HE v1 event times are absolute, so fold the pattern's AbsoluteTime back in.
            he_v1_event['RelativeTime'] = event['RelativeTime'] + \
                pattern_list_entry['AbsoluteTime']
            he_v1_data['Pattern'].append(he_v1_event)

    return he_v1_data


def clamp(value: float, min_value: float, max_value: float) -> float:
    """
    Clamp a value between a minimum and a maximum value.

    Args:
        value (float): Value to clamp.
        min_value (float): Minimum value.
        max_value (float): Maximum value.

    Returns:
        float: Clamped value.
    """
    return max(min_value, min(value, max_value))


def convert_he_v1_to_oh(he_v1_data: JsonData) -> JsonData:
    """
    Convert HE v1 JSON data to OH format.

    Args:
        he_v1_data (JsonData): HE v1 JSON data.

    Returns:
        JsonData: Converted OH JSON data.
    """
    output_data: JsonData = {
        "MetaData": {
            "Version": 1.0,
            "ChannelNumber": 1
        },
        "Channels": [{
            "Parameters": {
                "Index": 0
            },
            "Pattern": []
        }]
    }

    for pattern in he_v1_data.get('Pattern', []):
        event = {
            "Event": {
                "Type": pattern['Type'],
                "StartTime": pattern['RelativeTime'],
                "Parameters": {
                    "Intensity": clamp(pattern['Parameters']['Intensity'], 0, 100),
                    "Frequency": clamp(pattern['Parameters'].get('Frequency', 50), -100, 100)
                }
            }
        }
        if pattern['Type'] == 'continuous':
            event['Event']['Duration'] = pattern['Duration']
            event['Event']['Parameters']['Curve'] = [
                {
                    "Time": point['Time'],
                    "Intensity": clamp(point.get('Intensity', 100), 0, 100),
                    "Frequency": clamp(point.get('Frequency', 0), -100, 100)
                }
                for point in pattern['Parameters'].get('Curve', [])
            ]
        output_data['Channels'][0]['Pattern'].append(event)

    return output_data


def convert_he_v2_to_oh(he_v2_data: JsonData) -> JsonData:
    """
    Convert HE v2 JSON data to OH format.

    Args:
        he_v2_data (JsonData): HE v2 JSON data.

    Returns:
        JsonData: Converted OH JSON data.
    """
    event_num_max = 128

    output_data = {
        "MetaData": {
            "Version": 1.0,
            "ChannelNumber": 1
        },
        "Channels": [
            {
                "Parameters": {
                    "Index": 0
                },
                "Pattern": []
            }
        ]
    }

    all_events = []

    for pattern_list_entry in he_v2_data.get('PatternList', []):
        absolute_time = pattern_list_entry.get('AbsoluteTime', 0)
        for pattern in pattern_list_entry.get('Pattern', []):
            if pattern['Type'] == 'continuous':
                event = {
                    "Event": {
                        "Type": pattern['Type'],
                        "StartTime": absolute_time + pattern.get('RelativeTime', 0),
                        "Duration": clamp(pattern.get('Duration', 1000), 1, 5000),
                        "Parameters": {
                            "Intensity": clamp(pattern['Parameters'].get('Intensity', 100), 0, 100),
                            "Frequency": clamp(pattern['Parameters'].get('Frequency', 50), -100, 100),
                            "Curve": [
                                {
                                    "Time": clamp(point.get('Time', 0), 0, 10000),
                                    "Intensity": clamp(point.get('Intensity', 100), 0, 100),
                                    "Frequency": clamp(point.get('Frequency', 0), -100, 100)
                                }
                                for point in pattern['Parameters'].get('Curve', [])
                            ]
                        }
                    }
                }
            else:
                event = {
                    "Event": {
                        "Type": pattern['Type'],
                        "StartTime": absolute_time + pattern.get('RelativeTime', 0),
                        "Parameters": {
                            "Intensity": clamp(pattern['Parameters'].get('Intensity', 100), 0, 100),
                            "Frequency": clamp(pattern['Parameters'].get('Frequency', 50), -100, 100)
                        }
                    }
                }
            all_events.append(event)

    all_events.sort(key=lambda e: e['Event']['StartTime'])

    for event in all_events:
        index = event['Event'].get('Index', 0)
        channel_found = False

        for channel in output_data['Channels']:
            if channel['Parameters']['Index'] == index:
                if len(channel['Pattern']) < event_num_max:
                    channel['Pattern'].append(event)
                channel_found = True
                break

        if not channel_found:
            if len(output_data['Channels']) < 3:
                new_channel = {
                    "Parameters": {
                        "Index": index
                    },
                    "Pattern": [event]
                }
                output_data['Channels'].append(new_channel)

    return output_data


def process_file(input_file: Union[str, Path], output_dir: Union[str, Path], target_format: str,
                 schema_dir: Union[str, Path], version_suffix: bool) -> None:
    """
    Process a single JSON file and convert it to the target format.

    Args:
        input_file (Union[str, Path]): Path to the input JSON file.
        output_dir (Union[str, Path]): Path to the output directory.
        target_format (str): Target format: 'oh', 'he_v1', or 'he_v2'.
        schema_dir (Union[str, Path]): Directory containing JSON schema files.
        version_suffix (bool): Include version suffix ('_v1' or '_v2') in output HE file names.
    """
    input_file = Path(input_file)
    output_dir = Path(output_dir)
    schema_dir = Path(schema_dir)

    try:
        input_data = read_json(input_file)

        # Load schemas
        schemas = {
            FORMAT_OH: load_schema(schema_dir / 'oh_schema.json'),
            FORMAT_HE_V1: load_schema(schema_dir / 'he_v1_schema.json'),
            FORMAT_HE_V2: load_schema(schema_dir / 'he_v2_schema.json'),
        }

        # Determine the input format: the first schema the data validates against wins
        input_format = None
        for in_format, schema in schemas.items():
            is_valid, _ = validate_json(input_data, schema)
            if is_valid:
                input_format = in_format
                break

        if not input_format:
            logging.error("No valid schema found for file %s", input_file)
            return

        # Map (input format, target format) to a conversion function and output extensions
        conversion_map = {
            (FORMAT_OH, FORMAT_HE_V1): (convert_oh_to_he_v1, '_v1.he', '.he'),
            (FORMAT_OH, FORMAT_HE_V2): (convert_oh_to_he_v2, '_v2.he', '.he'),
            (FORMAT_HE_V1, FORMAT_OH): (convert_he_v1_to_oh, '.json', '.json'),
            (FORMAT_HE_V1, FORMAT_HE_V2): (convert_he_v1_to_v2, '_v2.he', '.he'),
            (FORMAT_HE_V2, FORMAT_OH): (convert_he_v2_to_oh, '.json', '.json'),
            (FORMAT_HE_V2, FORMAT_HE_V1): (convert_he_v2_to_v1, '_v1.he', '.he'),
        }

        # Perform the conversion, or pass the data through when it is already in the target format
        if (input_format, target_format) in conversion_map:
            convert_func, versioned_ext, default_ext = conversion_map[(input_format, target_format)]
            output_data = convert_func(input_data)
            output_ext = versioned_ext if version_suffix else default_ext
        elif input_format == target_format:
            output_data = input_data
            if input_format == FORMAT_OH:
                output_ext = '.json'
            else:
                # 'he_v1' -> '_v1.he', 'he_v2' -> '_v2.he'
                output_ext = f"_{input_format.split('_', 1)[1]}.he" if version_suffix else '.he'
        else:
            logging.error("Unsupported conversion from %s to %s for file %s",
                          input_format, target_format, input_file)
            return

        # Validate the output data against the target format schema
        target_schema = schemas[target_format]
        is_valid, error = validate_json(output_data, target_schema)
        if not is_valid:
            logging.error("Validation error for the converted data against %s schema: %s",
                          target_format, error)
            return

        output_file = output_dir / (input_file.stem + output_ext)
        write_json(output_data, output_file)
    except (FileNotFoundError, json.JSONDecodeError, jsonschema.exceptions.ValidationError) as err:
        logging.error("Error processing file %s: %s", input_file, err)


def process_directory(input_dir: Union[str, Path], output_dir: Union[str, Path], target_format: str,
                      schema_dir: Union[str, Path], version_suffix: bool) -> None:
    """
    Process all JSON files in a directory and convert them to the target format.

    Args:
        input_dir (Union[str, Path]): Path to the input directory.
        output_dir (Union[str, Path]): Path to the output directory.
        target_format (str): Target format: 'oh', 'he_v1', or 'he_v2'.
        schema_dir (Union[str, Path]): Directory containing JSON schema files.
        version_suffix (bool): Include version suffix ('_v1' or '_v2') in output HE file names.
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)

    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.json', '.he')):
                input_file = os.path.join(root, file)
                process_file(input_file, output_dir, target_format,
                             schema_dir, version_suffix)
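

# Example invocations (paths are hypothetical), matching the arguments defined in main():
#
#     python converter.py effect.json -f he_v2 -v
#     python converter.py ./oh_haptics -o ./converted -f oh -s ./schemas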


def main() -> None:
    """
    Main entry point of the script. Parses arguments and processes the input accordingly.
    """
    parser = argparse.ArgumentParser(
        description="Convert between OH Haptic JSON and HE Haptic JSON formats.")
    parser.add_argument("input", type=Path,
                        help="Path to the input JSON file or directory.")
    parser.add_argument("-o", "--output", type=Path,
                        help="Path to the output directory (default: the input file's "
                             "directory, or '<input>_out' for a directory input).")
    parser.add_argument("-f", "--format", choices=[FORMAT_OH, FORMAT_HE_V1, FORMAT_HE_V2],
                        required=True, help="Target format: 'oh', 'he_v1', or 'he_v2'.")
    parser.add_argument("-s", "--schema_dir", type=Path, default=Path("schemas"),
                        help="Directory containing JSON schema files (default: 'schemas').")
    parser.add_argument("-v", "--version_suffix", action="store_true",
                        help="Include version suffix ('_v1' or '_v2') in output HE file names.")

    args = parser.parse_args()

    if args.input.is_file():
        input_file = args.input
        output_dir = args.output or args.input.parent
        process_file(input_file, output_dir, args.format,
                     args.schema_dir, args.version_suffix)
    elif args.input.is_dir():
        input_dir = args.input
        output_dir = args.output or input_dir.with_name(input_dir.name + '_out')
        process_directory(input_dir, output_dir, args.format,
                          args.schema_dir, args.version_suffix)
    else:
        raise ValueError(f"Invalid input path: {args.input}")


if __name__ == "__main__":
    main()