import itertools
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple


class DataFrame:
    """Table-like class for storing a 2D cells table with named columns.

    Internally the table is a list of row dictionaries (header -> value) plus
    an ordered list of headers. A header that is absent from a row dictionary
    is a blank cell and is reported as ``None`` by the accessors.
    """

    def __init__(self, data: Optional[Dict[str, List[object]]] = None):
        """
        Create a new DataFrame from a dictionary (keys = headers, values = columns).

        Columns may have different lengths; the table gets as many rows as the
        longest column and shorter columns are left blank in the extra rows.
        Omitting ``data`` (or passing ``None``) creates an empty DataFrame.
        """
        # None instead of a mutable ``{}`` default, which would be shared
        # between calls.
        if data is None:
            data = {}
        self._headers = list(data)
        self._rows = []

        row_count = max((len(column) for column in data.values()), default=0)
        for idx in range(row_count):
            # Only columns long enough to have a value at idx contribute.
            self._rows.append({
                header: column[idx]
                for header, column in data.items()
                if len(column) > idx
            })

    def concat_rows(self, other: 'DataFrame') -> None:
        """
        In-place concatenate rows of other into the rows of the current DataFrame.
        None is added in pre-existing cells if new headers are introduced.
        """
        other_headers = other.headers
        for row in other._data_only():
            self._append_row(other_headers, row)

    def _append_row(self, headers: Iterable[str], data: Iterable[object]) -> None:
        """Append one row built by pairing headers with data, registering new headers."""
        # Materialize once: headers is iterated twice and may be a one-shot
        # iterable.
        headers = list(headers)
        self._rows.append(dict(zip(headers, data)))
        for header in headers:
            if header not in self._headers:
                self._headers.append(header)

    def __repr__(self) -> str:
        """
        Return the table as text: a header line followed by one line per row.

        Each column is right-aligned to ``len(header) + 1`` characters; wider
        values are not truncated.
        """
        header_list = self._headers_only()
        row_format = "".join("{:>%d}" % (len(header) + 1) for header in header_list)
        lines = [row_format.format(*header_list)]
        for row in self._data_only():
            # Cells go through str() first: None (a blank cell) and many other
            # objects reject a width/alignment format spec with TypeError.
            lines.append(row_format.format(*(str(cell) for cell in row)))
        return "\n".join(lines) + "\n"

    def __eq__(self, other):
        """Two DataFrames are equal when headers (in order) and all cells match."""
        if not isinstance(other, self.__class__):
            # Let Python try the reflected comparison / fall back to identity
            # instead of printing to stdout.
            return NotImplemented
        return self.headers == other.headers and self.data_table == other.data_table

    # Defining __eq__ already makes instances unhashable; say so explicitly
    # because a DataFrame is a mutable container.
    __hash__ = None

    @property
    def headers(self) -> List[str]:
        """A copy of the header names, in column order."""
        return list(self._headers_only())

    @property
    def data_table(self) -> List[List[object]]:
        """All rows as lists of cell values (None for blank cells)."""
        return list(self._data_only())

    @property
    def data_table_transposed(self) -> List[Tuple[object, ...]]:
        """All columns as tuples of cell values; empty when there are no rows."""
        return list(self._transposed_data())

    @property
    def data_row_len(self) -> int:
        """Number of data rows."""
        return len(self._rows)

    def data_row_at(self, idx) -> List[object]:
        """
        Return a single data row at the specified index (0th based).
        Accepts negative indices, e.g. -1 is last row.

        Raises IndexError if idx is out of range.
        """
        row_dict = self._rows[idx]
        # dict.get yields None for headers this row has no value for.
        return [row_dict.get(header) for header in self._headers_only()]

    def copy(self) -> 'DataFrame':
        """
        Shallow copy of this DataFrame.

        Headers and row containers are copied; the cell values themselves are
        shared with the original.
        """
        # Repeating every row exactly once is a copy (count=0 would drop all rows).
        return self.repeat(count=1)

    def repeat(self, count: int) -> 'DataFrame':
        """
        Returns a new DataFrame where each row of this dataframe is repeated count times.
        A repeat of a row is adjacent to other repeats of that same row.

        A count of zero or less yields a DataFrame with the same headers and no rows.
        """
        df = DataFrame()
        df._headers = self._headers.copy()
        df._rows = [row.copy() for row in self._rows for _ in range(count)]
        return df

    def merge_data_columns(self, other: 'DataFrame') -> None:
        """
        Merge self and another DataFrame by adding the data from other column-wise.
        For any headers that are the same, data from 'other' is preferred.

        Rows are matched by position; if other has more rows than self, the
        extra rows are appended to self.
        """
        for header in other._headers:
            if header not in self._headers:
                self._headers.append(header)

        extra_rows = []
        for self_row, other_row in itertools.zip_longest(self._rows, other._rows):
            # zip_longest pads the shorter side with None. Compare against None
            # explicitly: an existing but empty row ({}) is falsy and must be
            # filled in place rather than replaced by a new trailing row.
            if self_row is None:
                self_row = {}
                extra_rows.append(self_row)
            if other_row:
                self_row.update(other_row)
        # Appended after the loop so self._rows is not resized while iterated.
        self._rows.extend(extra_rows)

    def data_row_reduce(self, fnc: Callable[[Iterable[object]], object]) -> 'DataFrame':
        """
        Reduces the data row-wise by applying the fnc to each row (column-wise).
        Empty cells are skipped.

        fnc(Iterable[object]) -> object
        fnc is applied over every non-empty cell in that column (descending row-wise).
        A cell is empty when it is missing or None; falsy values such as 0 or ""
        are real data and are passed to fnc.

        Example:
            DataFrame({'a':[1,2,3]}).data_row_reduce(sum) == DataFrame({'a':[6]})

        Returns a new single-row DataFrame.
        """
        df = DataFrame()
        df._headers = self._headers.copy()

        def column_values(header_key):
            for row_dict in self._rows:
                value = row_dict.get(header_key)
                if value is not None:
                    yield value

        df._rows = [{header: fnc(column_values(header)) for header in df._headers}]
        return df

    def _headers_only(self) -> List[str]:
        """Return the internal header list itself (not a copy)."""
        return self._headers

    def _data_only(self) -> Iterator[List[object]]:
        """Lazily yield each row as a list of cell values in header order."""
        # The row count is fixed when iteration starts, so rows appended while
        # the generator is being consumed (e.g. df.concat_rows(df)) are not
        # visited and the loop always terminates.
        for idx in range(len(self._rows)):
            yield self.data_row_at(idx)

    def _transposed_data(self) -> Iterator[Tuple[object, ...]]:
        """Lazily yield each column as a tuple of cell values."""
        return zip(*self._data_only())