#!/bin/python3
# Author: phga
# Date: 2022-02-08
# Desc: Generate data that can later be used in excel2sql.py. Meant to be one file
import json
import sys
from random import randint
from argparse import ArgumentParser
from datetime import datetime as dt, timedelta
from typing import Union


#### Classes
class Column:
    """Base class for one generated CSV column.

    Stores the four values of a table spec entry; subclasses implement
    ``generate`` to produce one cell value per call.
    """

    def __init__(self, c_name: str, c_type: str, data_generation: str,
                 data_source: list) -> None:
        self.c_name = c_name
        self.c_type = c_type
        self.data_generation = data_generation
        self.data_source = data_source

    def __repr__(self) -> str:
        return f'{self.c_name}[{self.c_type}, {self.data_generation}]: {self.data_source}'

    def generate(self) -> Union[int, str]:
        """Return the next value of this column (implemented by subclasses)."""
        raise NotImplementedError()


class IntColumn(Column):
    """Integer column; ``data_source`` is ``[start, end]`` (both inclusive)."""

    def __init__(self, c_name: str, c_type: str, data_generation: str,
                 data_source: list):
        super().__init__(c_name, c_type, data_generation, data_source)
        # TODO: autogenerate end if not provided for some types
        if not isinstance(data_source, list) or len(data_source) < 2:
            raise TypeError("int data_source must be a list [start, end]")
        self.last_int = self.data_source[0]

    def generate(self) -> int:
        """Return the next integer.

        "random" draws uniformly from [start, end], "loop" counts from start
        to end and wraps around, "auto_increment" counts up from start and
        ignores end.

        Raises:
            ValueError: if ``data_generation`` is not one of the three modes.
        """
        start, end = self.data_source[0], self.data_source[1]
        if self.data_generation == "random":
            return randint(start, end)
        if self.data_generation == "loop":
            res = self.last_int
            # Explicit bound check instead of modulo arithmetic: the modulo
            # variant produced wrong values for ranges with a negative start
            # and divided by zero for end == -1.
            self.last_int += 1
            if self.last_int > end:
                self.last_int = start
            return res
        if self.data_generation == "auto_increment":
            res = self.last_int
            self.last_int += 1
            return res
        raise ValueError(f"unknown data_generation: {self.data_generation!r}")


class StrColumn(Column):
    """String column; ``data_source`` is a non-empty list of candidate values."""

    def __init__(self, c_name: str, c_type: str, data_generation: str,
                 data_source: list[str]):
        super().__init__(c_name, c_type, data_generation, data_source)
        if not isinstance(data_source, list) or len(data_source) < 1:
            raise TypeError("str data_source must be a non-empty list")
        # MAYB: random loop -> First index is random
        self.last_index = 0

    def generate(self) -> str:
        """Return a random entry ("random") or the next entry in order ("loop").

        Raises:
            ValueError: if ``data_generation`` is neither "random" nor "loop".
        """
        size = len(self.data_source)
        if self.data_generation == "random":
            return self.data_source[randint(0, size - 1)]
        if self.data_generation == "loop":
            res = self.data_source[self.last_index % size]
            self.last_index += 1
            return res
        raise ValueError(f"unknown data_generation: {self.data_generation!r}")


class DateColumn(Column):
    """Date column; ``data_source`` is ``[startdate, enddate]`` as YYYY-MM-DD."""

    def __init__(self, c_name: str, c_type: str, data_generation: str,
                 data_source: list[str]):
        super().__init__(c_name, c_type, data_generation, data_source)
        if not isinstance(data_source, list) or len(data_source) < 2:
            raise TypeError("date data_source must be a list [startdate, enddate]")
        # TODO: mayb add the ability to process multiple date ranges
        self.last_index = 0
        self.date_format = "%Y-%m-%d"
        self.start_date = dt.strptime(self.data_source[0], self.date_format)
        self.end_date = dt.strptime(self.data_source[1], self.date_format)
        # Number of whole days between start and end.
        self.date_diff = (self.end_date - self.start_date).days

    def generate(self) -> str:
        """Return a date string between start and end (both inclusive).

        "random" picks a random day offset, "loop" advances one day per call
        and wraps back to the start date after the end date.

        Raises:
            ValueError: if ``data_generation`` is neither "random" nor "loop".
        """
        if self.data_generation == "random":
            offset = randint(0, self.date_diff)
        elif self.data_generation == "loop":
            offset = self.last_index
            self.last_index = (self.last_index + 1) % (self.date_diff + 1)
        else:
            raise ValueError(f"unknown data_generation: {self.data_generation!r}")
        return (self.start_date + timedelta(offset)).strftime(self.date_format)


class TimeColumn(Column):
    """Time column; ``data_source`` is ``[starttime, endtime, step]``.

    ``step`` is "<count> <unit>" with unit s, m or h (e.g. "20 m"); any other
    unit is treated as seconds.
    """

    def __init__(self, c_name: str, c_type: str, data_generation: str,
                 data_source: list[str]):
        super().__init__(c_name, c_type, data_generation, data_source)
        # All three entries are read below, so a missing step must be rejected
        # here instead of surfacing as an IndexError.
        if not isinstance(data_source, list) or len(data_source) < 3:
            raise TypeError(
                "time data_source must be a list [starttime, endtime, step]")
        # TODO: mayb add the ability to process multiple date ranges
        self.last_index = 0
        # SAP HANA: 11:00:00.001
        self.time_format = "%H:%M:%S.%f"
        self.start_time = dt.strptime(self.data_source[0], self.time_format)
        self.end_time = dt.strptime(self.data_source[1], self.time_format)
        # 5 m, 10 s
        step_parts = self.data_source[2].split()
        self.time_step = int(step_parts[0])
        unit = step_parts[1] if len(step_parts) > 1 else "s"
        # Seconds per step unit.
        if unit == "m":
            self.time_mult = 60
        elif unit == "h":
            self.time_mult = 60 * 60
        else:
            self.time_mult = 1
        # Length of the range, counted in step units.
        self.time_diff = (self.end_time - self.start_time).seconds // self.time_mult

    def generate(self) -> str:
        """Return a time string offset from the start time by whole step units.

        "random" picks a random number of units within the range, "loop"
        advances by ``time_step`` units per call and wraps around.

        Raises:
            ValueError: if ``data_generation`` is neither "random" nor "loop".
        """
        if self.data_generation == "random":
            units = randint(0, self.time_diff)
        elif self.data_generation == "loop":
            units = self.last_index
            # TODO: mayb reset the timer to the start time instead of beginning
            # a new cycle, but this is also kinda cool -> option?
            self.last_index = (self.last_index + self.time_step) % (self.time_diff + 1)
        else:
            raise ValueError(f"unknown data_generation: {self.data_generation!r}")
        # time_diff is counted in units, so both modes must scale back to
        # seconds (the random mode previously added the unit count as seconds).
        res = self.start_time + timedelta(seconds=units * self.time_mult)
        return res.strftime(self.time_format)


class ColumnFactory:
    """Creates and describes the column types supported in a table spec."""

    # col_type -> implementing class; insertion order is the documented order.
    _COLUMN_CLASSES = {
        "int": IntColumn,
        "str": StrColumn,
        "date": DateColumn,
        "time": TimeColumn,
    }

    # col_type -> (supported data_generation modes, data_source description)
    _DESCRIPTIONS = {
        "int": (
            ["random", "loop", "auto_increment"],
            '[start, end] -> e.g. [10, 20] (boundaries included)\n'
            'For now, one has to always provide start AND end, even with AI',
        ),
        "str": (
            ["random", "loop"],
            '[str, ...] -> e.g. ["Hans", "Berlin", "Tina"]',
        ),
        "date": (
            ["random", "loop"],
            '[startdate, enddate] -> e.g. ["2020-01-01", "2020-10-28"]',
        ),
        "time": (
            ["random", "loop"],
            '[starttime, endtime, step] -> '
            'e.g. ["10:10:10.001", "21:11:11.002", "20 m"]\n'
            'step: [1,Inf] {s,m,h} -> e.g. "10 m", "33 s" or "2 h"',
        ),
    }

    @staticmethod
    def create(c_name: str, c_type: str, data_generation: str,
               data_source: list) -> Column:
        """Build the column object matching ``c_type``.

        Raises:
            ValueError: if ``c_type`` is not a supported column type.
        """
        try:
            column_cls = ColumnFactory._COLUMN_CLASSES[c_type]
        except KeyError:
            raise ValueError(
                f"unknown col_type {c_type!r} for column {c_name!r}; "
                f"expected one of {ColumnFactory.get_types()}") from None
        return column_cls(c_name, c_type, data_generation, data_source)

    @staticmethod
    def get_types() -> list[str]:
        """Return the names of all supported column types."""
        return list(ColumnFactory._COLUMN_CLASSES)

    @staticmethod
    def describe(c_type: str) -> str:
        """Return the help text describing the spec fields for ``c_type``.

        Raises:
            ValueError: if ``c_type`` is not a supported column type.
        """
        if c_type not in ColumnFactory._DESCRIPTIONS:
            raise ValueError(f"unknown col_type {c_type!r}")
        spec_dg, spec_ds = ColumnFactory._DESCRIPTIONS[c_type]
        spec = f'{" " + c_type.upper() + " ":=^45}\n'
        spec += 'col_name: str -> e.g. "Meine_tolle_Spalte"\n'
        spec += f'col_type: {c_type}\n'
        spec += f'data_generation: {", ".join(spec_dg)}\n'
        spec += f'data_source: {spec_ds}\n\n'
        return spec


#### Command line handling
def build_parser() -> ArgumentParser:
    """Create the argument parser for the command line interface."""
    parser = ArgumentParser()
    parser.add_argument("-c", "--count", dest="count", type=int, required=False,
                        help="Number of rows to generate")
    parser.add_argument("-i", "--in", dest="in_file", required=False,
                        help="Json file to read table spec from")
    parser.add_argument("-o", "--out", dest="out_file", required=False,
                        help="File to print sql statements to. If not provided use stdout")
    parser.add_argument("-s", "--show-spec", dest="spec", action="store_true",
                        required=False,
                        help="Show information about the table spec file")
    return parser


#### Print intput file specification -> Yes multiple files would be great
def spec_info() -> str:
    """Return the text printed for --show-spec."""
    si = 'Information about the table spec file:\n'
    si += 'Tables can be described via a json file. '
    si += 'A minimal example with 2 columns is show here:\n'
    si += '''
[
    {
        "col_name": "Checkout_Time",
        "col_type": "time",
        "data_generation": "loop",
        "data_source": [
            "10:10:10.001",
            "21:11:11.002",
            "20 s"
        ]
    },
    {
        "col_name": "Order_Amount",
        "col_type": "int",
        "data_generation": "random",
        "data_source": [
            10,
            20
        ]
    }
]
'''
    si += "Description for available column types:\n\n"
    for ct in ColumnFactory.get_types():
        si += ColumnFactory.describe(ct)
    return si


def load_table_spec(path: str) -> list:
    """Read the json table spec from ``path``; exit with status 69 on failure."""
    try:
        with open(path, "r", encoding="utf-8") as in_file:
            return json.load(in_file)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print("Could not read json table spec.")
        print("Are you sure you provided the correct file?")
        sys.exit(69)


def generate_rows(cols: list, count: int) -> list[list[str]]:
    """Return the CSV header row followed by ``count`` generated rows.

    Values are generated row by row, column by column, and converted to str
    so the rows can be joined for CSV output.
    """
    rows = [[c.c_name for c in cols]]  # CSV headers
    if not cols:
        # Without columns only the (empty) header row is emitted.
        return rows
    for _ in range(count):
        rows.append([str(c.generate()) for c in cols])
    return rows


#### Logic
def main(argv=None) -> None:
    """Parse the command line, generate the rows and write them as CSV."""
    parser = build_parser()
    try:
        args = parser.parse_args(argv)
    except SystemExit as err:
        # argparse signals both usage errors and -h via SystemExit; report
        # errors with status 1 but let a help request exit successfully.
        sys.exit(1 if err.code else 0)

    if not args.spec and (not args.count or not args.in_file):
        parser.print_help()
        sys.exit(1)

    if args.spec:
        print(spec_info())
        sys.exit(0)

    # Read specification from file
    table_spec = load_table_spec(args.in_file)
    cols = [ColumnFactory.create(ts["col_name"], ts["col_type"],
                                 ts["data_generation"], ts["data_source"])
            for ts in table_spec]

    lines = [",".join(r) for r in generate_rows(cols, args.count)]
    if args.out_file:
        with open(args.out_file, "w") as of:
            of.writelines(f'{line}\n' for line in lines)
    else:
        for line in lines:
            print(line)


if __name__ == "__main__":
    main()

#### Tests
# ic = IntColumn("test", "int", "loop", [10, 15])
# sc = StrColumn("test", "str", "loop", ["Hans", "Wurst", "mag", "Züge"])
# dc = DateColumn("test", "date", "random", ["2020-01-01", "2020-01-10"])
# tc = TimeColumn("test", "time", "loop", ["10:10:10.001", "11:11:11.002", "5 m"])
# print(ic)
# print(sc)
# print(dc)
# print(tc)