• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
17Haptic JSON Converter
18
19This tool converts between OH Haptic JSON and HE Haptic JSON formats (v1 and v2).
20
21Version: v0.0.1
22
23Usage:
24    converter.py <input> [-o <output>] -f <format> [-s <schema_dir>] [-v]
25
26Arguments:
27    input               Path to the input JSON file or directory.
    -o, --output        Path to the output directory (default: the input file's own directory for a file input; the input directory name with an '_out' suffix for a directory input).
29    -f, --format        Target format: 'oh', 'he_v1', or 'he_v2'.
30    -s, --schema_dir    Directory containing JSON schema files (default: 'schemas').
31    -v, --version_suffix Include version suffix ('_v1' or '_v2') in output HE file names.
32"""
33
34import json
35import argparse
36import os
37from pathlib import Path
38import logging
39from typing import Union, Dict, Any, Tuple
40
41import jsonschema
42from jsonschema import validate
43
# Configure the root logger once at import time: INFO level, with a
# timestamp and the level name in front of every message.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Identifiers of the supported formats. They are used as the choices of the
# -f/--format command-line option and as keys when selecting schemas and
# conversion functions.
FORMAT_OH = 'oh'
FORMAT_HE_V1 = 'he_v1'
FORMAT_HE_V2 = 'he_v2'

# Type alias for a parsed JSON object (string keys, arbitrary values).
JsonData = Dict[str, Any]
55
56
def load_schema(file_path: Union[str, Path]) -> JsonData:
    """
    Parse the JSON schema stored at the given location.

    Args:
        file_path (Union[str, Path]): Location of the JSON schema file.

    Returns:
        JsonData: The decoded schema.

    Raises:
        FileNotFoundError: If the file does not exist (logged, then re-raised).
        json.JSONDecodeError: If the file is not valid JSON (logged, then re-raised).
    """
    schema_path = Path(file_path)
    try:
        with open(schema_path, 'r', encoding='utf-8') as stream:
            schema = json.load(stream)
    except (FileNotFoundError, json.JSONDecodeError) as err:
        # Record which schema failed before handing the error to the caller.
        logging.error("Error loading schema from %s: %s", schema_path, err)
        raise
    return schema
74
75
def read_json(file_path: Union[str, Path]) -> JsonData:
    """
    Load and decode a JSON document from disk.

    Args:
        file_path (Union[str, Path]): Location of the JSON file.

    Returns:
        JsonData: The decoded JSON content.

    Raises:
        FileNotFoundError: If the file does not exist (logged, then re-raised).
        json.JSONDecodeError: If the file is not valid JSON (logged, then re-raised).
    """
    source = Path(file_path)
    try:
        with source.open(mode='r', encoding='utf-8') as handle:
            parsed = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError) as err:
        # Log with the offending path, then let the caller decide what to do.
        logging.error("Error reading JSON file %s: %s", source, err)
        raise
    else:
        return parsed
93
94
def write_json(data: JsonData, file_path: Union[str, Path]) -> None:
    """
    Write JSON data to a file, creating parent directories as needed.

    The data is serialized completely before the file is opened, so a
    serialization failure never creates or truncates the target file.

    Args:
        data (JsonData): JSON data to write.
        file_path (Union[str, Path]): Path to the output file.

    Raises:
        OSError: If the directory or file cannot be created or written.
        TypeError: If the data contains values that are not JSON serializable.
        ValueError: If the data cannot be encoded (e.g. circular references).
    """
    # Accept plain strings as well, like read_json() and load_schema() do.
    file_path = Path(file_path)
    try:
        # json.dump never raises JSONDecodeError; encoding problems surface
        # as TypeError / ValueError, so those are what must be logged here.
        text = json.dumps(data, indent=2)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with file_path.open('w', encoding='utf-8') as file:
            file.write(text)
    except (OSError, TypeError, ValueError) as err:
        logging.error("Error writing JSON file %s: %s", file_path, err)
        raise
110
111
def validate_json(data: JsonData, schema: JsonData) -> Tuple[bool, Union[None, jsonschema.exceptions.ValidationError]]:
    """
    Check JSON data against a schema without raising on invalid data.

    Args:
        data (JsonData): JSON data to check.
        schema (JsonData): JSON schema the data must satisfy.

    Returns:
        Tuple[bool, Union[None, jsonschema.exceptions.ValidationError]]:
            (True, None) when the data conforms to the schema, otherwise
            (False, error) where error is the validation error that was raised.
    """
    try:
        validate(data, schema)
    except jsonschema.exceptions.ValidationError as validation_error:
        # Hand the failure back as a value so callers can probe several schemas.
        return False, validation_error
    return True, None
128
129
def convert_oh_to_he_v1(oh_data: JsonData) -> JsonData:
    """
    Convert OH JSON data to HE v1 format.

    Every event of every channel is appended, in input order, to a single
    HE v1 "Pattern" list. Continuous events always receive a "Curve": the
    converted input curve when present, otherwise four default points.

    Args:
        oh_data (JsonData): OH JSON data.

    Returns:
        JsonData: Converted HE v1 JSON data.

    Raises:
        KeyError: If a required field ('Channels', 'Pattern', 'Event', 'Type',
            'StartTime', 'Parameters', 'Intensity', or 'Duration' for
            continuous events, or 'Time' in a curve point) is missing.
    """
    he_data = {
        "Metadata": {
            "Version": 1
        },
        "Pattern": []
    }

    for channel in oh_data['Channels']:
        for pattern in channel['Pattern']:
            event = pattern['Event']
            oh_params = event['Parameters']
            he_event = {
                "Type": event['Type'],
                "RelativeTime": event['StartTime'],
                "Parameters": {
                    "Intensity": oh_params['Intensity'],
                    # Default to 50 when absent, as the other converters in
                    # this file do, instead of failing with a KeyError.
                    "Frequency": oh_params.get('Frequency', 50)
                }
            }
            if event['Type'] == 'continuous':
                he_event['Duration'] = event['Duration']
                if 'Curve' in oh_params:
                    curve = [
                        {
                            "Time": curve_point['Time'],
                            "Intensity": curve_point.get('Intensity', 0),
                            "Frequency": curve_point.get('Frequency', 0)
                        }
                        for curve_point in oh_params['Curve']
                    ]
                else:
                    # No curve supplied: add four default points to satisfy
                    # CURVE_POINT_NUM_MIN.
                    curve = [
                        {
                            "Time": time,
                            "Intensity": 100,
                            "Frequency": 0
                        }
                        for time in range(4)
                    ]
                he_event['Parameters']['Curve'] = curve
            he_data['Pattern'].append(he_event)

    return he_data
182
183
def convert_oh_to_he_v2(oh_data: JsonData) -> JsonData:
    """
    Convert OH JSON data to HE v2 format.

    The events of all channels are merged and ordered by start time, then
    grouped into HE v2 pattern entries. A new entry is started when the
    current one already holds 16 events or when an event starts more than
    1000 time units after the previous event ended. Each entry's
    "AbsoluteTime" is the start time of its first event and the events in it
    carry a "RelativeTime" measured from that point.

    Args:
        oh_data (JsonData): OH JSON data.

    Returns:
        JsonData: Converted HE v2 JSON data.

    Raises:
        KeyError: If a required field is missing from the input.
    """
    max_events_per_pattern = 16
    max_gap_between_events = 1000
    transient_duration = 48  # Default transient duration

    he_v2_data = {
        "Metadata": {
            "Version": 2
        },
        "PatternList": []
    }

    # Merge all channels and order by start time. Without this, a channel
    # that starts earlier than the previous channel's events would be given
    # negative RelativeTime values. sorted() is stable, so input that is
    # already ordered is left untouched.
    events = sorted(
        (pattern["Event"]
         for channel in oh_data["Channels"]
         for pattern in channel["Pattern"]),
        key=lambda oh_event: oh_event["StartTime"],
    )

    current_pattern = None
    last_event_end_time = None  # End time of the previously converted event

    for event in events:
        event_start_time = event["StartTime"]
        is_continuous = event["Type"] == "continuous"
        event_end_time = event_start_time + \
            (event["Duration"] if is_continuous else transient_duration)

        if (current_pattern is None
                or len(current_pattern["Pattern"]) >= max_events_per_pattern
                or event_start_time - last_event_end_time > max_gap_between_events):
            if current_pattern is not None:
                he_v2_data["PatternList"].append(current_pattern)
            current_pattern = {
                "AbsoluteTime": event_start_time,
                "Pattern": []
            }

        # Insertion order determines the key order of the written JSON:
        # Type, RelativeTime, [Duration], Parameters.
        he_event = {
            "Type": event["Type"],
            "RelativeTime": event_start_time - current_pattern["AbsoluteTime"]
        }
        if is_continuous:
            he_event["Duration"] = event["Duration"]
        he_event["Parameters"] = {
            "Intensity": event["Parameters"]["Intensity"],
            "Frequency": event["Parameters"].get("Frequency", 50)
        }

        if is_continuous:
            if "Curve" in event["Parameters"]:
                # Points that carry neither Intensity nor Frequency are skipped.
                he_event["Parameters"]["Curve"] = [
                    {
                        "Time": point["Time"],
                        "Intensity": point.get("Intensity", 100),
                        "Frequency": point.get("Frequency", 0)
                    }
                    for point in event["Parameters"]["Curve"]
                    if "Intensity" in point or "Frequency" in point
                ]
            else:
                # No curve supplied: add four default points, the minimum
                # curve needed in HE v2.
                he_event["Parameters"]["Curve"] = [
                    {
                        "Time": time,
                        "Intensity": 100,
                        "Frequency": 0
                    }
                    for time in range(4)
                ]

        current_pattern["Pattern"].append(he_event)
        last_event_end_time = event_end_time

    if current_pattern is not None:
        he_v2_data["PatternList"].append(current_pattern)

    return he_v2_data
269
270
def convert_he_v1_to_v2(he_v1_data: JsonData) -> JsonData:
    """
    Regroup the flat HE v1 event list into HE v2 pattern entries.

    Events are taken in input order. A new pattern entry is opened for the
    first event, when the current entry already holds 16 events, or when an
    event starts more than 1000 time units after the previous event ended
    (transient events are counted as lasting 48). The entry's "AbsoluteTime"
    is the start of its first event; the events inside are re-based on it.

    Args:
        he_v1_data (JsonData): HE V1 JSON data.

    Returns:
        JsonData: Converted HE V2 JSON data. The "Parameters" objects are the
        same objects as in the input (they are not copied).
    """
    pattern_list = []
    group = None
    previous_end = -1  # -1 means "no event converted yet"

    for source_event in he_v1_data["Pattern"]:
        start = source_event["RelativeTime"]
        is_continuous = source_event["Type"] == "continuous"
        span = source_event["Duration"] if is_continuous else 48

        if group is None:
            open_new_group = True
        elif len(group["Pattern"]) >= 16:
            open_new_group = True
        else:
            open_new_group = previous_end != -1 and start - previous_end > 1000

        if open_new_group:
            if group is not None:
                pattern_list.append(group)
            group = {"AbsoluteTime": start, "Pattern": []}

        # Keys are inserted as Type, RelativeTime, [Duration], Parameters so
        # the serialized output keeps that order.
        converted = {
            "Type": source_event["Type"],
            "RelativeTime": start - group["AbsoluteTime"],
        }
        if is_continuous:
            converted["Duration"] = source_event["Duration"]
        converted["Parameters"] = source_event["Parameters"]

        group["Pattern"].append(converted)
        previous_end = start + span

    if group is not None:
        pattern_list.append(group)

    return {
        "Metadata": {
            "Version": 2
        },
        "PatternList": pattern_list
    }
323
324
def convert_he_v2_to_v1(he_v2_data: JsonData) -> JsonData:
    """
    Flatten HE v2 pattern entries into a single HE v1 event list.

    Each event is shallow-copied and its "RelativeTime" is shifted by the
    "AbsoluteTime" of the pattern entry it came from. Entries and events
    keep their input order.

    Args:
        he_v2_data (JsonData): HE v2 JSON data.

    Returns:
        JsonData: Converted HE v1 JSON data.
    """
    flattened = [
        # Overriding an existing key keeps its position in the copied dict.
        {**event, 'RelativeTime': event['RelativeTime'] + entry['AbsoluteTime']}
        for entry in he_v2_data['PatternList']
        for event in entry['Pattern']
    ]

    return {
        "Metadata": {
            "Version": 1
        },
        "Pattern": flattened
    }
350
351
def clamp(value: float, min_value: float, max_value: float) -> float:
    """
    Restrict a value to the range [min_value, max_value].

    Args:
        value (float): Value to restrict.
        min_value (float): Lower bound.
        max_value (float): Upper bound.

    Returns:
        float: max_value if the value exceeds it, min_value if the (capped)
        value does not exceed min_value, otherwise the value itself.
    """
    # Cap at the upper bound first, then lift to the lower bound.
    capped = max_value if max_value < value else value
    return capped if capped > min_value else min_value
365
366
def convert_he_v1_to_oh(he_v1_data: JsonData) -> JsonData:
    """
    Build a single-channel OH document from HE v1 data.

    Every HE v1 event becomes one OH "Event" in channel 0, in input order.
    Intensity values are limited to [0, 100] and Frequency values to
    [-100, 100]; a missing event Frequency is taken as 50. Continuous events
    also get their Duration and a "Curve" list (empty when the input event
    has no curve).

    Args:
        he_v1_data (JsonData): HE v1 JSON data.

    Returns:
        JsonData: Converted OH JSON data.
    """
    oh_patterns = []

    for he_event in he_v1_data.get('Pattern', []):
        oh_event = {
            "Type": he_event['Type'],
            "StartTime": he_event['RelativeTime'],
            "Parameters": {
                "Intensity": clamp(he_event['Parameters']['Intensity'], 0, 100),
                "Frequency": clamp(he_event['Parameters'].get('Frequency', 50), -100, 100)
            }
        }

        if he_event['Type'] == 'continuous':
            oh_event['Duration'] = he_event['Duration']
            curve = []
            for point in he_event['Parameters'].get('Curve', []):
                curve.append({
                    "Time": point['Time'],
                    "Intensity": clamp(point.get('Intensity', 100), 0, 100),
                    "Frequency": clamp(point.get('Frequency', 0), -100, 100)
                })
            oh_event['Parameters']['Curve'] = curve

        oh_patterns.append({"Event": oh_event})

    return {
        "MetaData": {
            "Version": 1.0,
            "ChannelNumber": 1
        },
        "Channels": [{
            "Parameters": {
                "Index": 0
            },
            "Pattern": oh_patterns
        }]
    }
414
415
def convert_he_v2_to_oh(he_v2_data: JsonData) -> JsonData:
    """
    Convert HE v2 JSON data to OH format.

    All events of all pattern entries are converted to absolute start times,
    ordered by start time and stored in a single OH channel (index 0).
    Values are limited to fixed ranges: Intensity [0, 100], Frequency
    [-100, 100], Duration [1, 5000] and curve Time [0, 10000]. At most 128
    events are kept; any further events are dropped and a warning is logged.

    Args:
        he_v2_data (JsonData): HE v2 JSON data.

    Returns:
        JsonData: Converted OH JSON data.

    Raises:
        KeyError: If an event lacks 'Type' or 'Parameters'.
    """
    event_num_max = 128

    all_events = []

    for pattern_list_entry in he_v2_data.get('PatternList', []):
        absolute_time = pattern_list_entry.get('AbsoluteTime', 0)
        for pattern in pattern_list_entry.get('Pattern', []):
            is_continuous = pattern['Type'] == 'continuous'

            # Insertion order determines the key order of the written JSON:
            # Type, StartTime, [Duration], Parameters.
            oh_event = {
                "Type": pattern['Type'],
                "StartTime": absolute_time + pattern.get('RelativeTime', 0)
            }
            if is_continuous:
                oh_event["Duration"] = clamp(pattern.get('Duration', 1000), 1, 5000)
            oh_event["Parameters"] = {
                "Intensity": clamp(pattern['Parameters'].get('Intensity', 100), 0, 100),
                "Frequency": clamp(pattern['Parameters'].get('Frequency', 50), -100, 100)
            }
            if is_continuous:
                oh_event["Parameters"]["Curve"] = [
                    {
                        "Time": clamp(point.get('Time', 0), 0, 10000),
                        "Intensity": clamp(point.get('Intensity', 100), 0, 100),
                        "Frequency": clamp(point.get('Frequency', 0), -100, 100)
                    }
                    for point in pattern['Parameters'].get('Curve', [])
                ]
            all_events.append({"Event": oh_event})

    all_events.sort(key=lambda e: e['Event']['StartTime'])

    # The converted events never carry an 'Index', so they all belong to
    # channel 0; the channel is capped at event_num_max events. Report the
    # truncation instead of dropping events silently.
    dropped = len(all_events) - event_num_max
    if dropped > 0:
        logging.warning("Dropping %d event(s): the OH output is limited to %d events",
                        dropped, event_num_max)

    return {
        "MetaData": {
            "Version": 1.0,
            "ChannelNumber": 1
        },
        "Channels": [
            {
                "Parameters": {
                    "Index": 0
                },
                "Pattern": all_events[:event_num_max]
            }
        ]
    }
505
506
def process_file(input_file: Union[str, Path], output_dir: Union[str, Path], target_format: str, schema_dir: Union[str, Path], version_suffix: bool) -> None:
    """
    Process a single JSON file and convert it to the target format.

    The input format is detected by validating the file against the OH,
    HE v1 and HE v2 schemas in that order; the first schema that accepts the
    data wins. The data is then converted (or passed through unchanged when
    it is already in the target format), validated against the target schema
    and written to ``output_dir`` under the input file's stem with a '.json'
    (OH) or '.he' (HE) extension.

    I/O, JSON decoding and schema errors are logged and end the processing
    of this file without raising, so a batch run can continue.

    Args:
        input_file (Union[str, Path]): Path to the input JSON file.
        output_dir (Union[str, Path]): Path to the output directory.
        target_format (str): Target format: 'oh', 'he_v1', or 'he_v2'.
        schema_dir (Union[str, Path]): Directory containing JSON schema files.
        version_suffix (bool): Include version suffix ('_v1' or '_v2') in output HE file names.
    """
    input_file = Path(input_file)
    output_dir = Path(output_dir)
    schema_dir = Path(schema_dir)

    try:
        input_data = read_json(input_file)

        # Load schemas
        schemas = {
            FORMAT_OH: load_schema(schema_dir / 'oh_schema.json'),
            FORMAT_HE_V1: load_schema(schema_dir / 'he_v1_schema.json'),
            FORMAT_HE_V2: load_schema(schema_dir / 'he_v2_schema.json'),
        }

        # The input format is the first schema the data validates against.
        input_format = next(
            (in_format for in_format, schema in schemas.items()
             if validate_json(input_data, schema)[0]),
            None,
        )
        if input_format is None:
            logging.error("No valid schema found for file %s", input_file)
            return

        # (input format, target format) ->
        # (conversion function, extension with version suffix, plain extension)
        conversion_map = {
            (FORMAT_OH, FORMAT_HE_V1): (convert_oh_to_he_v1, '_v1.he', '.he'),
            (FORMAT_OH, FORMAT_HE_V2): (convert_oh_to_he_v2, '_v2.he', '.he'),
            (FORMAT_HE_V1, FORMAT_OH): (convert_he_v1_to_oh, '.json', '.json'),
            (FORMAT_HE_V1, FORMAT_HE_V2): (convert_he_v1_to_v2, '_v2.he', '.he'),
            (FORMAT_HE_V2, FORMAT_OH): (convert_he_v2_to_oh, '.json', '.json'),
            (FORMAT_HE_V2, FORMAT_HE_V1): (convert_he_v2_to_v1, '_v1.he', '.he'),
        }

        conversion = conversion_map.get((input_format, target_format))
        if conversion is not None:
            convert_func, versioned_ext, default_ext = conversion
            output_data = convert_func(input_data)
            output_ext = versioned_ext if version_suffix else default_ext
        else:
            # No conversion registered: the data is passed through as-is
            # (this is the case when it is already in the target format).
            output_data = input_data
            if input_format == FORMAT_OH:
                output_ext = '.json'
            elif version_suffix:
                # Use the same '_v1' / '_v2' suffixes as the converting
                # paths (previously this produced '_he_v1' / '_he_v2').
                output_ext = {FORMAT_HE_V1: '_v1.he', FORMAT_HE_V2: '_v2.he'}[input_format]
            else:
                output_ext = '.he'

        # Validate the output data with the target format schema
        target_schema = schemas[target_format]
        is_valid, error = validate_json(output_data, target_schema)
        if not is_valid:
            logging.error("Validation error for the converted data against %s schema: %s", target_format, error)
            return

        output_file = output_dir / (input_file.stem + output_ext)
        write_json(output_data, output_file)
    except (OSError, json.JSONDecodeError,
            jsonschema.exceptions.ValidationError,
            jsonschema.exceptions.SchemaError) as err:
        # OSError covers FileNotFoundError as well as permission and other
        # I/O failures; SchemaError is raised by jsonschema for a malformed
        # schema file.
        logging.error("Error processing file %s: %s", input_file, err)
577
578
def process_directory(input_dir: Union[str, Path], output_dir: Union[str, Path], target_format: str, schema_dir: Union[str, Path], version_suffix: bool) -> None:
    """
    Convert every '.json' and '.he' file found under a directory tree.

    The tree is walked recursively and each matching file is handed to
    process_file() with the same output directory, so all results are
    written directly into ``output_dir``.

    Args:
        input_dir (Union[str, Path]): Path to the input directory.
        output_dir (Union[str, Path]): Path to the output directory.
        target_format (str): Target format: 'oh', 'he_v1', or 'he_v2'.
        schema_dir (Union[str, Path]): Directory containing JSON schema files.
        version_suffix (bool): Include version suffix ('_v1' or '_v2') in output HE file names.
    """
    source_root = Path(input_dir)
    destination = Path(output_dir)
    convertible_suffixes = ('.json', '.he')

    for current_dir, _subdirs, file_names in os.walk(source_root):
        for file_name in file_names:
            if not file_name.endswith(convertible_suffixes):
                continue
            candidate = os.path.join(current_dir, file_name)
            process_file(candidate, destination, target_format,
                         schema_dir, version_suffix)
599
600
def main() -> None:
    """
    Main entry point of the script. Parses arguments and processes the input accordingly.

    A file input is converted with process_file(); its default output
    directory is the file's own directory. A directory input is converted
    with process_directory(); its default output directory is a sibling of
    the input directory named '<input name>_out'.

    Raises:
        ValueError: If the input path is neither an existing file nor an
            existing directory.
    """
    parser = argparse.ArgumentParser(
        description="Convert between OH Haptic JSON and HE Haptic JSON formats.")
    parser.add_argument("input", type=Path,
                        help="Path to the input JSON file or directory.")
    parser.add_argument("-o", "--output", type=Path,
                        help="Path to the output dir (default: input dir with '_out' suffix).")
    parser.add_argument("-f", "--format", choices=[FORMAT_OH, FORMAT_HE_V1, FORMAT_HE_V2],
                        required=True, help="Target format: 'oh', 'he_v1', or 'he_v2'.")
    parser.add_argument("-s", "--schema_dir", type=Path, default=Path("schemas"),
                        help="Directory containing JSON schema files (default: 'schemas').")
    parser.add_argument("-v", "--version_suffix", action="store_true",
                        help="Include version suffix ('_v1' or '_v2') in output HE file names.")

    args = parser.parse_args()

    if args.input.is_file():
        input_file = args.input
        output_dir = args.output or args.input.parent
        process_file(input_file, output_dir, args.format,
                     args.schema_dir, args.version_suffix)
    elif args.input.is_dir():
        input_dir = args.input
        output_dir = args.output
        if output_dir is None:
            name_source = input_dir
            if name_source.name in ('', '..'):
                # Paths such as '.' have an empty final component, which
                # makes with_name() raise ValueError, and '..' would yield
                # '.._out'. Normalize to an absolute path so a real
                # directory name is available.
                name_source = Path(os.path.abspath(name_source))
            output_dir = name_source.with_name(name_source.name + '_out')
        process_directory(input_dir, output_dir, args.format,
                          args.schema_dir, args.version_suffix)
    else:
        raise ValueError(f"Invalid input path: {args.input}")
633
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
636