from string import ascii_letters
import copy
import os

from the_semantic_db_code import *
from the_semantic_db_functions import *

# Some hash tables mapping ops to the python equivalent.
# Left hand side is BKO language, right is python.
# The right hand side strings are pasted into generated python source by
# process_single_op(), so they must always be names that are safe to evaluate.

# functions built into ket/superposition classes.
# Each value is emitted as a zero-argument method call: ".<value>()".
# Entries that are commented out are disabled; several of them
# (eg select-elt, drop-below) have parameterised versions in compound_table.
built_in_table = {
    "display" : "display",
    "transpose" : "transpose",
#   "select-elt" : "select_elt",
    "pick-elt" : "pick_elt",
#   "find-index" : "find_index",
#   "find-value" : "find_value",
    "normalize" : "normalize",
    "rescale" : "rescale",
#   "rescale" : "rescale",
#   "sigmoid" : "apply_sigmoid",
#   "function" : "apply_fn",
#   "similar" : "similar",
#   "collapse-fn" : "apply_fn_collapse",
    "collapse" : "collapse",
    "count" : "number_count",
    "count-sum" : "number_count_sum",
    "drop" : "drop",
#   "drop-below" : "drop_below",
#   "drop-above" : "drop_above",
#   "select-range" : "select_range",
#   "delete-elt" : "delete_elt",
    "reverse" : "reverse",
    "shuffle" : "shuffle",
    "coeff-sort" : "coeff_sort",
    "ket-sort" : "ket_sort",
    "max-elt" : "find_max_elt",
    "min-elt" : "find_min_elt",
    "max" : "find_max",
    "min" : "find_min",

# special:
    "type" : "type",      # implemented for debugging purposes.
}

# table of sigmoids:
# Each value is emitted as ".apply_sigmoid(<value>)".
sigmoid_table = {
    "clean" : "clean",
#   "threshold-filter" : "threshold_filter",   # we can't handle parameters with our ops yet.
    "binary-filter" : "binary_filter",
    "not-binary-filter" : "not_binary_filter",
    "pos" : "pos",
    "NOT" : "NOT",
    "xor-filter" : "xor_filter",
#   "mult" : "mult",
}

# some ket -> ket functions:
# Each value is emitted as ".apply_fn(<value>)".
fn_table = {
    "value" : "apply_value",
    "extract-category" : "extract_category",
    "extract-value" : "extract_value",
    "to-number" : "category_number_to_number",
}

# some other functions. Some are ket -> ket, some are ket -> superposition.
# Each value is emitted as ".apply_fn(<value>)", exactly like fn_table.
fn_table2 = {
    "read": "read_text",
    "spell": "spell_word",
    "factor": "factor_numbers",
    "near-number": "near_numbers",
    "strange-int": "strange_int",
    "is-prime": "is_prime",
    "strange-int-prime": "strange_int_prime",
    "strange-int-depth": "strange_int_depth",
    "strange-int-delta": "strange_int_delta",
    "strange-int-list": "strange_int_list",
}

# table of compound operators.
# They need to be handled separately from those in the tables above, because they have parameters.
# "{0}" is replaced by the comma-joined parameters found between [ and ].
# Templates that wrap {0} in double quotes take op names; all others take numbers.
compound_table = {
    "select-elt": ".select_elt({0})",
    # "find-index"    # can't support these two until we have more advanced parsing.
    # "find-value"    # eg: find-index[|person: Fred>] |x> currently would split on the space in the ket.
    "normalize": ".normalize({0})",
    "rescale": ".rescale({0})",
    "similar": '.similar(context,"{0}")',
    "find-topic": '.find_topic(context,"{0}")',
    # "collapse-function" : ".apply_fn_collapse({0})",  # broken for now. eg, how handle collapse-fn[spell] |x> ??
    #                                                   # Not needed anyway. Just use: collapse spell |x>
    "drop-below": ".drop_below({0})",
    "drop-above": ".drop_above({0})",
    "select-range": ".select_range({0})",   # may comment this one out, but fine for now to have two versions.
    "select": ".select_range({0})",
    "delete-elt": ".delete_elt({0})",
    "threshold-filter": ".apply_sigmoid(threshold_filter,{0})",
    # "mult" : ".apply_sigmoid(mult,{0})",  # this is now moved to ket/sp since it is common enough.
    "mult": ".multiply({0})",
    "in-range": ".apply_sigmoid(in_range,{0})",
}

# The only characters allowed in an operator name.
_OP_CHARS = frozenset(ascii_letters + "0123456789-")


def sanitize_op(op):
    """Return op unchanged if it is a valid operator name, otherwise None.

    See valid_op() for the definition of "valid". The empty string yields
    None (the original raised IndexError on it).
    """
    return op if valid_op(op) else None


def valid_op(op):
    """Return True if op starts with a letter and uses only ASCII letters, digits and '-'.

    The empty string is reported as invalid instead of raising IndexError.
    """
    if not op or not op[0].isalpha():
        return False
    return all(c in _OP_CHARS for c in op)


def _is_plain_number(text):
    """Return True if text is an ordinary decimal number such as 3, -0.5 or 1e-3.

    The character check excludes everything float() accepts that is not
    digits/sign/point/exponent (eg "nan", "inf", underscores), so the text can
    be pasted into generated code without introducing names or calls.
    """
    if not text or any(c not in "0123456789+-.eE" for c in text):
        return False
    try:
        float(text)
    except ValueError:
        return False
    return True


def process_single_op(op):
    """Translate one BKO operator token into a python method-call suffix.

    The returned string (eg ".select_range(1,3)") is appended to "x" by
    process() and then evaluated, so everything placed in it must be vetted.

    op -- a single whitespace-free token, eg "normalize", "select-range[1,3]",
          "similar[age]", "7", "-", "ops", '""' or a literal operator name.

    Returns the code fragment, or "" when the token is not recognised or is
    rejected as unsafe.
    """
    print("op:",op)
    splitat = "[,]"
    pieces = ''.join(s if s not in splitat else ' ' for s in op).split()
    if len(pieces) > 1:
        print("compound op found")
        op = pieces[0]
        parameters = ",".join(pieces[1:])
        if op not in compound_table:
            print(op,"not in compound_table")
            python_code = ""
        else:
            template = compound_table[op]
            # Parameters come straight from user input and end up inside eval(),
            # so only accept op names (for quoted templates) or plain numbers.
            check = valid_op if '"{0}"' in template else _is_plain_number
            if all(check(p) for p in pieces[1:]):
                python_code = template.format(parameters)
            else:
                print("invalid parameters for compound op:",op)
                python_code = ""
    elif op in built_in_table:      # table values are fixed strings, hence already vetted.
        print("op in built in table")
        python_code = ".{0}()".format(built_in_table[op])
    elif op in sigmoid_table:
        print("op in sigmoid table")
        python_code = ".apply_sigmoid({0})".format(sigmoid_table[op])
    elif op in fn_table:
        print("op in fn table")
        python_code = ".apply_fn({0})".format(fn_table[op])
    elif op in fn_table2:
        print("op in fn table 2")
        python_code = ".apply_fn({0})".format(fn_table2[op])
    else:
        if op == "\"\"":
            python_code = ".apply_op(context,\"\")"
        elif op == "ops":            # short-cut so we don't have to type supported-ops all the time.
            python_code = ".apply_op(context,\"supported-ops\")"
        elif not valid_op(op):
            # so that: op2 7 op1 |x> is the same as: op2 mult[7] op1 |x>
            try:
                float(op)
                python_code = ".multiply({0})".format(op)
            except ValueError:
                if op == '-':        # treat - |x> as mult[-1] |x>
                    python_code = ".multiply(-1)"
                else:
                    return ""
        else:
            # NB: op has passed valid_op() (letters, digits, dash only), which is
            # what keeps this literal from being an injection vector.
            print("op is literal")
            python_code = ".apply_op(context,\"{0}\")".format(op)
    print("py:",python_code)
    return python_code

# here is an example of an injection bug (with an earlier version of the code):
# op: era") shoot_me() print("
# op: era") shoot_me() print("
# op is literal
# py: x.apply_op(context,"era") shoot_me() print("")

# Note for the future: is there a cleaner way to do this without the somewhat risky eval?
# x must be a ket or a superposition.
#def process(context,x,ops):     # I think the order should be changed to (context,ops,x)
def process(context,ops,x):
    """Apply a whitespace-separated operator sequence to x and return the result.

    Operators are applied right to left ("op2 op1" means op2(op1(x))). A token
    of the form "op^k" applies op k times; a non-integer k raises ValueError.

    context -- passed through to generated calls such as .apply_op(context,...)
    ops     -- the operator string
    x       -- the object the generated method chain is applied to
               (per the file's own comment, a ket or a superposition)

    Returns None when no token produced any code, otherwise the value of the
    generated expression.
    """
    line = ops.split()[::-1]     # put more advanced processing, and splitting a line into ops later.
    code = "x"
    for op in line:
        multi = op.split("^")
        if len(multi) == 2:
            print("multi-op",multi[1],"found")
            tmp = process_single_op(multi[0])
            code += tmp * int(multi[1])      # a count <= 0 adds nothing
        else:
            code += process_single_op(op)
    if code == "x":
        return None
    print("python:",code)
    # SECURITY: eval() of generated source. The parameter names `context` and `x`
    # must not be renamed: the generated text refers to them. Safety relies
    # entirely on process_single_op() only emitting vetted fragments.
    return eval(code)

# 17/2/2014:
def extract_leading_ket(s):
    """Parse a leading "coeff|label>" from s.

    Returns (ket(label, value), rest) on success, where value is 1 when no
    coefficient precedes the '|'. On any failure returns s itself, unchanged;
    callers must therefore check for a tuple before unpacking.
    """
    try:
        head, rest = s.split("|",1)
        head = head.strip()
        if len(head) == 0:
            value = 1
        else:
            value = float(head)
        label, rest = rest.split(">",1)
        return ket(label,value), rest
    except Exception:
        return s

def extract_leading_bra(s):
    """Parse a leading "<label|" from s.

    Returns (bra(label), rest) on success, or s itself when s does not start
    with '<' or cannot be parsed.
    """
    if s[0] != "<":
        return s
    try:
        label, rest = s[1:].split("|",1)
        return bra(label), rest
    except Exception:
        return s

def old_old_extract_literal_superposition(s):
    """Legacy parser for "k1|a> + k2|b> + ..."; returns (superposition, rest).

    Superseded by extract_literal_superposition(); not called by any code
    visible in this file.
    """
    rest = s
    result = superposition()
    try:
        x, rest = extract_leading_ket(rest)
        result.data.append(x)
    except Exception:
        return result, rest
    while True:
        try:
            null, rest = rest.split("+",1)
            x, rest = extract_leading_ket(rest)
            result.data.append(x)
        except Exception:
            return result, rest

def old_extract_literal_superposition(s):
    """Legacy parser for literal superpositions, with debug printing.

    Superseded by extract_literal_superposition(); not called by any code
    visible in this file.
    """
    rest = s
    result = superposition()
    while True:
        try:
            x, rest = extract_leading_ket(rest)
            result.data.append(x)
            saved = rest
            null, rest = rest.split("+",1)
            print("els saved:",saved)
            print("els null:",null)
            print("null len:",len(null.strip()))
            print("els result:",result)
            if len(null.strip()) != 0:
                print("els null not zero")
                return result, saved
        except Exception:
            print("els final result:",result)
            return result, rest

def extract_literal_superposition(s,self_object=None):
    """Parse a literal superposition "k1|a> + k2|b> + ..." from the start of s.

    s           -- the text to parse
    self_object -- accepted for signature compatibility; unused here

    Returns (superposition, rest). The superposition is empty when s does not
    start with a ket. Parsing stops at the first '+' that is preceded by
    anything other than whitespace, or when no further '+' exists.
    """
    rest = s
    saved = rest
    result = superposition()
    while True:
        try:
            parsed = extract_leading_ket(rest)
            # extract_leading_ket returns the input string on failure. Check for
            # that explicitly: blindly unpacking a 2-character string "succeeds"
            # and would append a bare character as if it were a ket.
            if not isinstance(parsed, tuple):
                return result, saved
            x, rest = parsed
            result.data.append(x)
        except Exception:
            return result, saved
        saved = rest
        try:
            null, rest = rest.split("+",1)
        except ValueError:           # no more '+': we are done
            print("els final result:",result)
            return result, rest
        if len(null.strip()) != 0:
            print("els null not zero")
            return result, saved

def parse_rule_line(C,s):
    """Parse one learn rule "op |label> => rule" (or "+=>") and store it in C.

    C -- context object; this function calls C.set(), C.learn() and C.add_learn()
    s -- one line of text

    Lines starting with "--", rules for "supported-ops", and lines that do not
    have the "op |label> => ..." shape are ignored. The special rule
    "|context> => |context: name>" switches context via C.set(name).

    Returns True if the line was handled as a rule, False otherwise.
    """
    if s.strip().startswith("--"):
        return False
    try:
        op, rest = s.split("|",1)
        op = op.strip()
        label, rest = rest.split(">",1)
    except ValueError:
        return False
    if op.startswith("--") or op == "supported-ops":
        return False

    add_learn = False
    try:
        null, rest = rest.split("+=>",1)
        add_learn = True
    except ValueError:
        try:
            null, rest = rest.split("=>",1)
        except ValueError:
            return False
    try:
        # maybe tweak to handle: O|tmp> => op2 op1 |_self>
        rule, null = extract_compound_superposition(C,rest,label)
    except Exception:
        return False

    print("op:",op)
    print("label:",label)
    print("rest:",rest.rstrip())
    print("rule:",rule,"\n")

    if op == "" and label == "context":
        if len(rule.data) > 0:
            name = rule.data[0].label
            if name.startswith("context: "):
                name = name[9:]
            C.set(name)
        # NOTE(review): the source this was recovered from had lost its
        # indentation; this return is assumed to belong to the outer `if`, so
        # a |context> rule is never stored as an ordinary learn rule. Confirm.
        return True

    if not add_learn:
        C.learn(op,label,rule)
    else:
        C.add_learn(op,label,rule)
    return True

# Now, make use of parse_rule_line()
# load sw file:
def load_sw(c,file):
    """Feed every line of the .sw file `file` to parse_rule_line(c, line).

    Best effort: any failure is reported with a printed message, not raised.
    """
    try:
        with open(file,'r') as f:
            for line in f:
                parse_rule_line(c,line)
    except Exception:
        print("failed to load:",file)

# and its brother:
# save current context:
def save_sw(c,name):
    """Write c.dump_universe(True) to the file `name`.

    The dump is produced before the file is opened, so a failing dump no longer
    truncates an existing file, and the handle is always closed.
    Best effort: any failure is reported with a printed message, not raised.
    """
    try:
        text = c.dump_universe(True)
        with open(name,'w') as f:
            f.write(text)
    except Exception:
        print("failed to save:",name)

# save multiverse:
def save_sw_multi(c,name):
    """Write c.dump_multiverse(True) to the file `name`.

    Same error handling and file-handle guarantees as save_sw().
    """
    try:
        text = c.dump_multiverse(True)
        with open(name,'w') as f:
            f.write(text)
    except Exception:
        print("failed to save:",name)

# copied from here:
# http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
def human_readable_size(num):
    """Format a byte count as a short string such as "  2 KB".

    num -- size in bytes (int or float)

    The value is divided by 1024 until it drops below 1024, then printed with
    no decimals and a two-character unit. Sizes of 1024 TB and above are
    reported in PB (previously the function fell off the loop and returned
    None for them).
    """
    for unit in ['B ','KB','MB','GB','TB']:
        if num < 1024.0:
            return "%3.0f %s" % (num,unit)
        num /= 1024.0
    return "%3.0f %s" % (num,'PB')

# find the stats of a .sw file:
# needs some tweaks yet ...
def extract_sw_stats(file):
    """Summarise a .sw file as "<size> <context> (<n>), <context> (<n>), ...".

    file -- path of the file to read

    <n> counts the lines starting with "supported-ops |" that follow each
    "|context> => |...>" line; a leading "context: " is stripped from the
    context name. Lines counted before the first context line are reported
    under an empty name.

    Returns the summary string, or None (after printing a message) when the
    file cannot be read.
    """
    try:
        stats = []
        context = ""
        count = 0
        with open(file,'r') as f:
            for line in f:
                if line.startswith("supported-ops |"):
                    count += 1
                elif line.startswith("|context> => |"):
                    tmp_context = line[14:].split(">")[0]    # 14 == len("|context> => |")
                    if tmp_context.startswith("context: "):
                        tmp_context = tmp_context[9:]
                    if len(tmp_context) > 0:
                        # a new context starts: flush the tally of the previous one
                        if len(context) > 0 or count > 0:
                            stats.append(context + " (" + str(count) + ")")
                        context = tmp_context
                        count = 0
        if len(context) > 0 or count > 0:
            stats.append(context + " (" + str(count) + ")")

        # now find file size:
        size = human_readable_size(os.path.getsize(file))
        return size + " " + ", ".join(stats)
    except (OSError, UnicodeError):
        print("failed to load:",file)

# we need code to handle the input in the semantic-agent console:
# we have three cases to handle:
# op3 op2 op1              # use x as the implicit ket     # currently handled by process(c,line,x)
# op-b op-a |x>            # specify |x>                   # currently not handled
# op |x> => 3|a> + |b>     # learn rule                    # currently handled by parse_rule_line(C,line).
def old_process_input_line(C,line,x):
    """Legacy console handler; superseded by process_input_line().

    Tries line as a learn rule, then as "ops |label>", then as bare ops
    applied to x. Returns the processed value, or None.
    """
    if not parse_rule_line(C,line):
        try:
            op, rest = line.split("|",1)
        except ValueError:
            return process(C,line,x)
        try:
            op = op.strip()
            label, rest = rest.split(">",1)
            return process(C,op,ket(label))
        except Exception:
            return
    return

def process_input_line(C,line,x):
    """Handle one line typed at the console.

    C    -- context object
    line -- the input text
    x    -- object the ops are applied to when line contains no ket

    If line is a learn rule it is stored and None is returned. Otherwise line
    is evaluated as a compound superposition, falling back to process(C,line,x)
    when that fails.
    """
    if not parse_rule_line(C,line):
        try:
            result, null = extract_compound_superposition(C,line)
            return result
        except Exception:
            return process(C,line,x)

def process_op_ket(C,line,left_label=None):
    """Evaluate a leading "ops |label>" from line.

    The label "_self" is replaced by left_label when one is given.

    Returns (process(C, ops, ket(label)), rest), or None when line does not
    have that shape or processing fails.
    """
    try:
        op, rest = line.split("|",1)
        op = op.strip()
        label, rest = rest.split(">",1)
        if label == "_self" and left_label is not None:
            label = left_label
        return process(C,op,ket(label)), rest
    except Exception:
        return None

# eg: intersection(op|X>,op|Y>)
def old_process_function(C,line):
    """Legacy evaluator for "fn(op|X>,op|Y>)"; superseded by process_brackets().

    Returns the value of fn applied to the two processed arguments, or None
    when line cannot be parsed or fn is not listed in whitelist_table_2.
    """
    try:
        fn, rest = line.split("(",1)
        fn = fn.strip()
        print("fn:",fn)
        # SECURITY: fn is user input and used to be pasted into eval() as-is.
        # Only names from the two-parameter whitelist are accepted now.
        if fn not in whitelist_table_2:
            return None
        sp1, rest = process_op_ket(C,rest)        # sp is short for superposition
        print("sp1:",sp1)
        null, rest = rest.split(",",1)
        sp2, rest = process_op_ket(C,rest)
        print("sp2:",sp2)
        null, rest = rest.split(")",1)
        print("rest:",rest)
    except Exception:
        return None
    code = "{0}(sp1,sp2)".format(whitelist_table_2[fn])
    print("python:",code)
    return eval(code)     # code text comes from the whitelist table only

# dummy len 1 fn:
def sp_len_1(x):
    """Return ket("sp") + x."""
    return ket("sp") + x

# white listed functions that take 1 parameter:
whitelist_table_1 = {
#   "dump" : "C.dump_sp_rules",                 # buggy since, it returns a string! Not a ket/sp.
    "sp" : "sp_len_1",
#   "to-number" : "category_number_to_number",  # I think this is better in the ket->ket section.
}

# whitelisted functions, that take 2 parameters:
whitelist_table_2 = {
    "intersection" : "intersection",
    "intn" : "intersection",
    "union" : "union",
    "mult" : "multiply",
    "simm" : "simm",
    "silent-simm" : "silent_simm",
    "weighted-simm" : "weighted_simm",
    "nfc" : "normed_frequency_class",     # pretty unlikely this will be used at command line since needs freq lists.
    "apply" : "apply",
    "range" : "show_range",
    "ket-simm" : "ket_simm",
    "to-base" : "decimal_to_base",
}

# whitelisted functions that take 3 parameters:
whitelist_table_3 = {
    "intersection" : "tri_intersection",
    "intn" : "tri_intersection",
    "union" : "tri_union",
    "arithmetic" : "arithmetic",
    "range" : "show_range",
    "algebra" : "algebra",
}

# whitelisted functions that take 4 parameters:
# NOTE(review): nothing in the code visible here consults this table.
whitelist_table_4 = {
    "algebra" : "algebra",
}

def old_process_brackets(C,line,left_label=None):
    """Legacy version of process_brackets(); kept for reference.

    Handles one and two argument calls; the three argument branch only prints
    its result and then returns None.
    """
    print("inside process_brackets:",line)
    try:
        fn, rest = line.split("(",1)
        fn = fn.strip()
        pieces = []
        while True:
            try:
                sp, rest = extract_compound_superposition(C,rest,left_label)   # sp is short for superposition
                pieces.append(sp)
                print("inside while, rest:",rest)
                null, rest = rest.split(",",1)
            except Exception:
                break
        if len(pieces) == 0:
            return None
        null, rest = rest.split(")",1)
    except Exception:
        return None
    print("fn: ",fn)
    print("len: ",len(pieces))
    for sp in pieces:
        print("sp: ",sp)
    print("rest:",rest)

    # what if len(fn) == 0?
    if len(pieces) == 1:
        if fn in whitelist_table_1:
            print("op in whitelist 1")
            code = whitelist_table_1[fn] + "(pieces[0])"
            print("py:",code)
            result = eval(code)
            return result, rest
        elif len(fn) == 0:
            return pieces[0], rest
        else:
            return process(C,fn,pieces[0]), rest
    if len(pieces) == 2:
        tmp = fn.split()
        main_fn = tmp[-1]
        rest_fn = " ".join(tmp[:-1])
        if main_fn in whitelist_table_2:
            print("op in whitelist 2")
            code = whitelist_table_2[main_fn] + "(pieces[0],pieces[1])"
            print("py:",code)
            result = eval(code)
            if type(result) != ket and type(result) != superposition:
                print("result not ket/sp")
                if type(result) == float or type(result) == int:
                    try:
                        x, rest = extract_leading_ket(rest)
                        result = ket(x.label,result)
                    except Exception:
                        return result, rest
                else:
                    return None
            if len(rest_fn) == 0:
                return result, rest
            else:
                print("rest_fn:",rest_fn)
                print("result:",result)
                return process(C,rest_fn,result), rest   # this raises an exception if result is not ket/sp.
    if len(pieces) == 3:
        tmp = fn.split()
        main_fn = tmp[-1]
        rest_fn = " ".join(tmp[:-1])
        if main_fn in whitelist_table_3:
            print("op in whitelist 3")
            code = whitelist_table_3[main_fn] + "(pieces[0],pieces[1],pieces[2])"
            print("py:",code)
            result = eval(code)
            print("result:",result)
    return None

def process_brackets(C,line,left_label=None):
    """Evaluate a leading "ops fn(arg1, arg2, ...)" expression from line.

    C          -- context object
    line       -- the text to parse; must contain '(' and a closing ')'
    left_label -- replacement label for |_self>, passed on to the argument parser

    Each argument is parsed with extract_compound_superposition(). With one
    argument, fn is looked up in whitelist_table_1, or (if empty) the brackets
    just group, or fn is treated as an op sequence. With two or three
    arguments the last word of fn is looked up in whitelist_table_2/3 and any
    preceding words are applied to the result as ops. An unmatched function
    yields ket("",0). A plain number result is multiplied into the ket that
    follows the closing bracket, if there is one.

    Returns (result, rest), or None when line cannot be parsed or the function
    returned something that is neither ket, superposition, float nor int.
    """
    print("inside process_brackets:",line)
    try:
        fn, rest = line.split("(",1)
        fn = fn.strip()
        pieces = []
        while True:
            try:
                sp, rest = extract_compound_superposition(C,rest,left_label)   # sp is short for superposition
                pieces.append(sp)
                print("inside while, rest:",rest)
                null, rest = rest.split(",",1)
            except Exception:
                break
        if len(pieces) == 0:
            return None
        null, rest = rest.split(")",1)
    except Exception:
        return None
    print("fn: ",fn)
    print("len: ",len(pieces))
    for sp in pieces:
        print("sp: ",sp)
    print("rest:",rest)

    if len(pieces) == 1:
        if fn in whitelist_table_1:
            print("op in whitelist 1")
            code = whitelist_table_1[fn] + "(pieces[0])"
            print("py:",code)
            result = eval(code)      # code text comes from the whitelist table only
            return result, rest
        elif len(fn) == 0:
            return pieces[0], rest
        else:
            return process(C,fn,pieces[0]), rest

    tmp = fn.split()
    # An empty fn (eg "(|a>, |b>)") used to raise IndexError here; it is now
    # treated like any other unmatched function name.
    main_fn = tmp[-1] if tmp else ""
    rest_fn = " ".join(tmp[:-1])

    # NB: eval() must stay inside this function: the code text refers to the
    # local name `pieces`.
    match = False
    if len(pieces) == 2:
        if main_fn in whitelist_table_2:
            match = True
            print("op in whitelist 2")
            code = whitelist_table_2[main_fn] + "(pieces[0],pieces[1])"
            print("py:",code)
            result = eval(code)
    if len(pieces) == 3:
        if main_fn in whitelist_table_3:
            match = True
            print("op in whitelist 3")
            code = whitelist_table_3[main_fn] + "(pieces[0],pieces[1],pieces[2])"
            print("py:",code)
            result = eval(code)
    if not match:
        result = ket("",0)

    # not sure I want to keep this code block.
    # maybe simm should return v|simm> instead of just a float. Now implemented this as ket_simm().
    # well, a variant of simm that does that. Recall some code needs it as a float.
    if type(result) not in (ket, superposition):
        print("result not ket/sp")
        if type(result) not in (float, int):
            return None
        try:
            parsed = extract_leading_ket(rest)     # may want something more elaborate here too.
            # extract_leading_ket returns its input string on failure; unpacking
            # that blindly would clobber `rest` when it is 2 characters long.
            if not isinstance(parsed, tuple):
                return result, rest
            x, rest = parsed
            result = ket(x.label, x.value * result)
        except Exception:
            return result, rest

    if len(rest_fn) == 0:
        return result, rest
    print("rest_fn:",rest_fn)
    print("result:",result)
    return process(C,rest_fn,result), rest       # this raises an exception if result is not ket/sp.

# this function is going to end up doing a lot of heavy lifting!
def old_extract_compound_superposition(C,s):
    """Legacy: parse a literal superposition, else "op2 op1 |x>". Returns (rule, rest) or None."""
    rule, rest = extract_literal_superposition(s)    # first try for a literal superposition
    if len(rule.data) == 0:                          # try for: op2 op1 |x>
        try:
            rule, rest = process_op_ket(C,s)
        except Exception:
            return None
    return rule, rest

def previous_extract_compound_superposition(C,s):
    """Legacy: like old_extract_compound_superposition() but tries process_brackets() first."""
    rule, rest = extract_literal_superposition(s)
    if len(rule.data) == 0:
        try:
            rule, rest = process_brackets(C,s)
        except Exception:
            try:
                rule, rest = process_op_ket(C,s)
            except Exception:
                return None
    return rule, rest

def extract_compound_superposition(C,s,self_object=None):
    """Parse and evaluate a '+'-separated sum of terms from the start of s.

    C           -- context object
    s           -- the text to parse
    self_object -- replacement label for |_self>, passed on to the term parsers

    Each term is tried as a literal superposition, then as a bracketed function
    call (process_brackets), then as "ops |ket>" (process_op_ket). Terms are
    accumulated with "+=" into a superposition.

    Returns (result, rest). Returns None when a term is not a literal and both
    fallback parsers raise. Note the fallbacks signal ordinary failure by
    returning None, which makes the unpacking raise, so the usual failure path
    is that exception handling.
    """
    rest = s
    result = superposition()
    while True:
        try:
            rule, rest = extract_literal_superposition(rest,self_object)
            if len(rule.data) == 0:
                try:
                    rule, rest = process_brackets(C,rest,self_object)
                except Exception:
                    try:
                        rule, rest = process_op_ket(C,rest,self_object)
                    except Exception:
                        return None
            result += rule
            saved = rest
            null, rest = rest.split("+",1)
            if len(null.strip()) != 0:
                # something other than whitespace precedes the '+': stop before it
                print("ecs saved:",saved)
                return result, saved
        except Exception:
            return result, rest