# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import collections import json import os class ConfigJsonIteratorError(Exception): """"Exception for config json iterator""" pass class ConfigJsonIterator(object): """Class to consolidate multiple config json files. This class reads and combines input JSON instances into one based on the following rules: 1. "_deps" value in the root config file contains a list of common config file paths. Each path represents a RELATIVE path to the root config file. For example (common.config is in the same directory as root.config): root.config: { "a": "123", "_deps": ["../common.config"]} common.config: { "b": "xxx" } End output: { "a": "123", "b": "xxx" } 2. common config files defined in "_deps" MUST NOT contain identical keys (otherwise an exception will be thrown), for example (invalid - common1 and common2.config are in the same directory as root.config): root.config: { "a": "123", "_deps": ["../common1.config", "../common2.config"]} common1.config: { "b": "xxx" } common2.config: { "b": "yyy" } 3. values in the root config will override the ones in the common config files. This logic applies to any dependency config file (imagine that the common config also has "_deps"), thus is recursive. For example (common.config is in the same directory as root.config): root.config: { "a": "123", "_deps": ["../common.config"]} common.config: { "a": "456", "b": "xxx" } End output: { "a": "123", "b": "xxx" } """ DEPS = '_deps' def __init__(self, config_path=None): """Constructor. @param config_path: String of root config file path. """ if config_path: self.set_config_dir(config_path) def set_config_dir(self, config_path): """Sets config dictionary. @param config_path: String of config file path. @raises ConfigJsonIteratorError if config does not exist. """ if not os.path.isfile(config_path): raise ConfigJsonIteratorError('config file does not exist %s' % config_path) self._config_dir = os.path.abspath(os.path.dirname(config_path)) def _load_config(self, config_path): """Iterate the base config file. @param config_path: String of config file path. @return Dictionary of the config file. @raises ConfigJsonIteratorError: if config file is not found or invalid. """ if not os.path.isfile(config_path): raise ConfigJsonIteratorError('config file does not exist %s' % config_path) with open(config_path, 'r') as config_file: try: return json.load(config_file) except ValueError: raise ConfigJsonIteratorError( 'invalid JSON file %s' % config_file) def aggregated_config(self, config_path): """Returns dictionary of aggregated config files. The dependency list contains the RELATIVE path to the root config. @param config_path: String of config file path. @return Dictionary containing the aggregated config files. @raises ConfigJsonIteratorError: if dependency config list does not exist. """ ret_dict = self._load_config(config_path) if ConfigJsonIterator.DEPS not in ret_dict: return ret_dict else: deps_list = ret_dict[ConfigJsonIterator.DEPS] if not isinstance(deps_list, list): raise ConfigJsonIteratorError('dependency must be a list %s' % deps_list) del ret_dict[ConfigJsonIterator.DEPS] common_dict = {} for dep in deps_list: common_config_path = os.path.join(self._config_dir, dep) dep_dict = self.aggregated_config(common_config_path) common_dict = self._merge_dict(common_dict, dep_dict, allow_override=False) return self._merge_dict(common_dict, ret_dict, allow_override=True) def _merge_dict(self, dict_one, dict_two, allow_override=True): """Returns a merged dictionary. @param dict_one: Dictionary to merge (first). @param dict_two: Dictionary to merge (second). @param allow_override: Boolean to allow override or not. @return Dictionary containing merged result. @raises ConfigJsonIteratorError: if no dictionary given. """ if not isinstance(dict_one, dict) or not isinstance(dict_two, dict): raise ConfigJsonIteratorError('Input is not a dictionary') if allow_override: return dict(dict_one.items() + dict_two.items()) else: merge = collections.Counter( dict_one.keys() + dict_two.keys()).most_common()[0] if merge[1] > 1: raise ConfigJsonIteratorError( 'Duplicate key %s found', merge[0]) return dict_one