Commit 753baa66 authored by Roman Sarrazin-Gendron's avatar Roman Sarrazin-Gendron
Browse files

added development folder

parent 3256c57c
This diff is collapsed.
#external libraries
import json #to hash data of edges (dictionnaries)
class RIN:
    """Recurrent Interaction Network: a generic graph together with every
    occurrence of it found across the analysed structure graphs."""
    # NOTE(review): these class-level attributes document the instance fields;
    # the mutable ones ({}) are shared class-wide until __init__ rebinds them
    # per instance -- never mutate them on the class itself.
    graph=None
    order=0
    size=0
    primary_key=""
    canonical=False
    ID=-1
    d_edges_by_category={}
    secondary_key=""
    nb_occurrences=0
    d_occurrences={}
    representing_occurrence=None
    def __init__(self,graph,d_edges_by_category,g_name,h_name,d_generic_nodes_to_nodes_in_g,d_generic_nodes_to_nodes_in_h):
        #note : g is assumed to be the representing_occurrence
        self.graph=graph
        self.order=self.graph.order()
        self.size=self.graph.size()
        #zfill so primary keys sort lexicographically by (order, size)
        self.primary_key='o'+str(self.order).zfill(6)+'s'+str(self.size).zfill(6)
        self.canonical=False
        self.ID=-1
        self.d_edges_by_category=d_edges_by_category
        #canonical JSON of the edge-category histogram: equal histograms give
        #equal secondary keys regardless of insertion order
        self.secondary_key=json.dumps(d_edges_by_category, sort_keys=True)
        self.d_occurrences={}
        #an occurrence key is the sorted list of concrete node names, dumped
        #to JSON so it is hashable and order-insensitive
        g_occurrence_key=json.dumps(sorted(d_generic_nodes_to_nodes_in_g.values()))
        self.representing_occurrence=(g_name,g_occurrence_key)
        self.d_occurrences[g_name]={g_occurrence_key:d_generic_nodes_to_nodes_in_g}
        h_occurrence_key=json.dumps(sorted(d_generic_nodes_to_nodes_in_h.values()))
        self.d_occurrences[h_name]={h_occurrence_key:d_generic_nodes_to_nodes_in_h}
        self.nb_occurrences=2
    def make_canonical(self,ID):
        """Mark this RIN as the canonical representative and assign its ID."""
        self.canonical=True
        self.ID=ID
    def get_keys(self):
        """Return the (primary_key, secondary_key) pair used for bucketing."""
        return (self.primary_key,self.secondary_key)
    def create_occurrence_graphs(self,data):
        """Materialize one subgraph of data per recorded occurrence.

        data maps (PDB_ID, chain) to a full structure graph; each occurrence
        name is assumed to be a (PDB_ID, chain, suffix) triple -- TODO confirm
        for every caller.  Returns [((PDB_ID, chain), suffix, subgraph), ...].
        """
        #as this operation is to be performed once and since copy is only used
        #in it we prefer to exceptionally place the import inside the function
        import copy
        l_occurrence_graphs=[]
        #loop variable renamed from 'map', which shadowed the builtin
        for key,d_occ in self.d_occurrences.items():
            PDB_ID,chain,suffix=key
            g=copy.deepcopy(data[(PDB_ID,chain)])
            #keep only the nodes of the (single) occurrence stored under key
            nodes_to_keep = list(list(d_occ.values())[0].values())
            nodes_to_remove = list(set(g.nodes())-set(nodes_to_keep))
            g.remove_nodes_from(nodes_to_remove)
            l_occurrence_graphs.append(((PDB_ID,chain),suffix,g))
        return l_occurrence_graphs
# external imports
import networkx as nx
import queue
# internat imports
import shared_functions
import RIN
import matching
#import isomorphism
# import draw_graph
def edge_composition_compatibility(RIN_1,RIN_2):
    """True when RIN_2 carries at least as many edges of every category as
    RIN_1 -- a necessary condition for RIN_1 to be embeddable in RIN_2."""
    return all(
        RIN_2.d_edges_by_category.get(category,0) >= count
        for category,count in RIN_1.d_edges_by_category.items())
def are_isomorfic(RIN_1,RIN_2):
    #Returns a pair (bool, matching-or-None).
    #Cheap necessary conditions first: identical order/size and identical
    #secondary key (edge-category histogram) are required for isomorphism.
    if RIN_1.order == RIN_2.order and RIN_1.size == RIN_2.size:
        if RIN_1.secondary_key == RIN_2.secondary_key:
            #NOTE(review): the 'import isomorphism' at the top of this file is
            #commented out, so reaching this line raises NameError -- confirm
            #which module is meant to provide isomorphism().
            return isomorphism.isomorphism(RIN_1.graph,RIN_2.graph)
    #if either keys are different, there is no way the two are isomorphic
    return(False,None)
def compare_name (name_1,name_2):
    """Return True when name_1 should rank before name_2.

    Names are (PDB_ID, chain, mode) triples.  Graphs that carry a mode are
    deliberately pushed back in the ordering: a name without a mode always
    wins over one with a mode; otherwise plain tuple comparison decides.
    """
    mode_1=name_1[2]
    mode_2=name_2[2]
    if (not mode_1) != (not mode_2):
        #exactly one of the two lacks a mode: that one comes first
        return (not mode_1)
    #both have a mode, or neither does: plain lexicographic ordering
    return (name_1 < name_2)
def merge_2_RINs(RIN_1,RIN_2,m):
    #Merge two isomorphic RINs into one, keeping the RIN whose representing
    #occurrence sorts first and importing the occurrences of the other.
    #m is the isomorphism matching between the two generic graphs; d_nodes
    #translates node names of the merged-away RIN into node names of the kept
    #RIN (the d_h/d_g orientation is assumed from usage -- TODO confirm
    #against the matching module).
    if RIN_1.representing_occurrence[0]==RIN_2.representing_occurrence[0]:
        #same source graph: keep the lower occurrence key
        if RIN_1.representing_occurrence[1] < RIN_2.representing_occurrence[1]:
            RIN_to_return=RIN_1
            RIN_to_merge=RIN_2
            d_nodes=m.d_h
        else:
            RIN_to_return=RIN_2
            RIN_to_merge=RIN_1
            d_nodes=m.d_g
    else:
        #different source graphs: use the tweaked name ordering (names with a
        #mode are pushed back by compare_name)
        if compare_name(RIN_1.representing_occurrence[0],RIN_2.representing_occurrence[0]):
            RIN_to_return=RIN_1
            RIN_to_merge=RIN_2
            d_nodes=m.d_h
        else:
            RIN_to_return=RIN_2
            RIN_to_merge=RIN_1
            d_nodes=m.d_g
    #Import every occurrence the survivor doesn't already hold, renaming
    #generic nodes through d_nodes; nb_occurrences counts each import.
    for graph,d_occurrences in RIN_to_merge.d_occurrences.items():
        for key,occurrence in d_occurrences.items():
            if RIN_to_return.d_occurrences.get(graph,None) != None:
                if RIN_to_return.d_occurrences[graph].get(key,None) != None:
                    #occurrence already known: skip the duplicate
                    continue
            else:
                RIN_to_return.d_occurrences[graph]={}
            #Now <graph> is in d_occurrence but not <graph><key>
            updated_occurrence={}
            for node_in_RIN_to_merge,node_in_graph in occurrence.items():
                updated_occurrence[d_nodes[node_in_RIN_to_merge]]=node_in_graph
            RIN_to_return.d_occurrences[graph][key]=updated_occurrence
            RIN_to_return.nb_occurrences+=1
    return RIN_to_return
def merge_protoRINs(d_protoRINs):
    """Collapse isomorphic proto-RINs inside each bucket.

    d_protoRINs maps primary_key -> secondary_key -> list of RINs.  Within a
    bucket, every group of pairwise-isomorphic RINs is merged into a single
    RIN carrying all occurrences.  Returns a dict with the same key layout.
    """
    d_RINs={}
    for primary_key,d_prime in d_protoRINs.items():
        d_RINs[primary_key]={}
        for secondary_key,l_second in d_prime.items():
            d_RINs[primary_key][secondary_key]=[]
            fifo=queue.Queue()
            for pRIN in l_second:
                fifo.put(pRIN)
            #Repeatedly take one RIN and absorb every RIN left in the queue
            #that is isomorphic to it; survivors move to next_fifo and are
            #processed in the following round.
            while not fifo.empty():
                pRIN_1=fifo.get()
                next_fifo=queue.Queue()
                while not fifo.empty():
                    pRIN_2=fifo.get()
                    #local renamed from 'matching', which shadowed the
                    #imported matching module
                    isomorphic,iso_matching=are_isomorfic(pRIN_1,pRIN_2)
                    if isomorphic:
                        pRIN_1=merge_2_RINs(pRIN_1,pRIN_2,iso_matching)
                    else:
                        next_fifo.put(pRIN_2)
                d_RINs[primary_key][secondary_key].append(pRIN_1)
                fifo=next_fifo
    return d_RINs
def merge_d_RINs(d_RINs_1,d_RINs_2):
    """Merge d_RINs_2 into d_RINs_1 (in place) and return d_RINs_1.

    For (primary_key, secondary_key) buckets present in both dicts, RINs that
    are isomorphic across the two lists are merged via merge_2_RINs; every
    RIN or bucket existing only in d_RINs_2 is appended/copied verbatim.
    """
    #First, check for common primary_key/secondary_key buckets and merge them
    for primary_key,d_prime in d_RINs_1.items():
        if d_RINs_2.get(primary_key,None) is not None:
            #primary_key is in d_RINs_2
            for secondary_key,l_second in d_prime.items():
                if d_RINs_2[primary_key].get(secondary_key,None) is not None:
                    l_RINs_1=d_RINs_1[primary_key][secondary_key]
                    l_RINs_2=d_RINs_2[primary_key][secondary_key]
                    d_RINs_1[primary_key][secondary_key]=[]
                    #marks RINs of l_RINs_2 already merged into some RIN_1
                    d_invalid_RIN_2={}
                    for RIN_1 in l_RINs_1:
                        for RIN_2 in l_RINs_2:
                            if not d_invalid_RIN_2.get(RIN_2,False):
                                #local renamed from 'matching', which shadowed
                                #the imported matching module
                                isomorphic,iso_matching=are_isomorfic(RIN_1,RIN_2)
                                if isomorphic:
                                    RIN_1=merge_2_RINs(RIN_1,RIN_2,iso_matching)
                                    d_invalid_RIN_2[RIN_2]=True
                                    break #2 RINs of l_RINs_[1|2] can't be isomorphic so there's no point looking further
                        d_RINs_1[primary_key][secondary_key].append(RIN_1)
                    #RINs of list 2 that matched nothing are kept unchanged
                    for RIN_2 in l_RINs_2:
                        if not d_invalid_RIN_2.get(RIN_2,False):
                            d_RINs_1[primary_key][secondary_key].append(RIN_2)
    #now we copy the RINs/buckets specific to d_RINs_2
    for primary_key,d_prime in d_RINs_2.items():
        if d_RINs_1.get(primary_key,None) is None:
            d_RINs_1[primary_key]={}
        for secondary_key,l_second in d_prime.items():
            if d_RINs_1[primary_key].get(secondary_key,None) is None:
                d_RINs_1[primary_key][secondary_key]=[]
                for RIN_2 in l_second:
                    d_RINs_1[primary_key][secondary_key].append(RIN_2)
    return d_RINs_1
def count_RINs_in_dict (d_RINs_1):
    """Total number of RINs stored in a nested {pk: {sk: [RIN, ...]}} dict."""
    return sum(
        len(l_second)
        for d_prime in d_RINs_1.values()
        for l_second in d_prime.values())
def count_occurrences_in_dict (d_RINs_1):
    """Total number of occurrences over every RIN in the nested dict."""
    count=0
    for primary_key,d_prime in d_RINs_1.items():
        for secondary_key,l_second in d_prime.items():
            #loop variable renamed from 'RIN', which shadowed the imported
            #RIN module
            for rin in l_second:
                count+=rin.nb_occurrences
    return count
def d_RINs_to_sorted_list(d_RINs_1):
    """Flatten the nested dict into one list, ordered by (primary, secondary)
    key; within a bucket the original list order is preserved."""
    l_RINs=[]
    for primary_key,d_prime in sorted(d_RINs_1.items()):
        for secondary_key,l_second in sorted(d_prime.items()):
            l_RINs+=l_second
    return l_RINs
###########Test tools##########################################################
def d_RINs_1_included_in_d_RINs_2(d_RINs_1,d_RINs_2):
    """True iff every RIN of d_RINs_1 has an isomorphic RIN in the same
    (primary_key, secondary_key) bucket of d_RINs_2.

    The unused accumulator 'l' of the original implementation was removed.
    """
    for primary_key,d_prime in d_RINs_1.items():
        d_prime_2=d_RINs_2.get(primary_key,None)
        if d_prime_2 is None:
            return False
        for secondary_key,l_second in d_prime.items():
            l_second_2=d_prime_2.get(secondary_key,None)
            if l_second_2 is None:
                return False
            for RIN_1 in l_second:
                #RIN_1 must match at least one RIN of the same bucket
                if not any(are_isomorfic(RIN_1,RIN_2)[0] for RIN_2 in l_second_2):
                    return False
    return True
def diff_d_RINs_1_minus_RINs_2(d_RINs_1,d_RINs_2):
    #we assume d_RINs_2 is included in d_RINs_1
    #Returns a new nested dict d_3 holding every RIN of d_RINs_1 that has no
    #isomorphic counterpart in d_RINs_2 (set difference up to isomorphism).
    #NOTE(review): kept buckets alias the lists/dicts of d_RINs_1 -- mutating
    #the result mutates d_RINs_1's buckets too.
    d_3={}
    for primary_key,d_prime in d_RINs_1.items():
        if d_RINs_2.get(primary_key,None)== None:
            #whole primary bucket absent from d_RINs_2: keep it entirely
            d_3[primary_key]=d_prime
        else:
            for secondary_key,l_second in d_prime.items():
                if d_RINs_2[primary_key].get(secondary_key,None)== None:
                    #whole secondary bucket absent: keep it entirely
                    tmp=d_3.get(primary_key,{})
                    tmp[secondary_key]=l_second
                    d_3[primary_key]=tmp
                else:
                    for RIN_1 in l_second:
                        missing=True
                        for RIN_2 in d_RINs_2[primary_key][secondary_key]:
                            if are_isomorfic(RIN_1,RIN_2)[0]:
                                missing=False
                                break
                        if missing:
                            #lazily create the nested containers in d_3
                            tmp_1=d_3.get(primary_key,{})
                            tmp_2=tmp_1.get(secondary_key,[])
                            tmp_2.append(RIN_1)
                            tmp_1[secondary_key]=tmp_2
                            d_3[primary_key]=tmp_1
    return d_3
def count_duplicates_in_d_RINs_1(d_RINs_1):
    """Count RINs that have at least one isomorphic sibling in their own
    bucket (every member of a duplicate group is counted)."""
    n=0
    for d_prime in d_RINs_1.values():
        for l_second in d_prime.values():
            for RIN_1 in l_second:
                has_twin=any(
                    RIN_1 != RIN_2 and are_isomorfic(RIN_1,RIN_2)[0]
                    for RIN_2 in l_second)
                if has_twin:
                    n+=1
    return n
def get_collection_occurrence_graph(d_RINs_1,data):
    """Pair each RIN of the nested dict with the list of occurrence graphs it
    induces in data; returns [(RIN, [occurrence graphs]), ...]."""
    collection=[]
    for d_prime in d_RINs_1.values():
        for l_second in d_prime.values():
            for rin in l_second:
                collection.append((rin,rin.create_occurrence_graphs(data)))
    return collection
###########Hierarchy construction tools########################################
def preliminary_test_inclusion(RIN_1,RIN_2):
    """Cheap necessary conditions for RIN_1 being strictly included in RIN_2:
    no more nodes, strictly fewer edges, and a compatible edge-category
    histogram."""
    if RIN_1.order > RIN_2.order or RIN_1.size >= RIN_2.size:
        return False
    return edge_composition_compatibility(RIN_1,RIN_2)
def build_hierarchy(d_RINs):
    #TODO: not implemented yet -- currently always returns an empty hierarchy
    hierarchy={}
    return hierarchy
%% Cell type:code id: tags:
``` python
```
#external imports
import sys
import json #to hash data of edges (dictionnaries)
import queue
# internal imports
import matching
import shared_functions
# Convention :
# G,H are the two graphs received as inputs
# g(,h) are generic graphs
def elligible_neighbours_search_RINs(d_n_g,d_n_h):#tested
    """Given two matched nodes n_g and n_h (each as a category->neighbours
    dict), list the neighbour pairs sharing an edge category.  When every
    shared category is a backbone link (B53/B35) no candidates are returned:
    backbone-only extensions are not wanted for RIN search."""
    pairs=[]
    only_backbone=True
    for cat in set(d_n_g.keys()) & set(d_n_h.keys()):
        if '"label": "B53"' not in cat and '"label": "B35"' not in cat:
            only_backbone=False
        #TODO deal with triangle double interaction
        pairs.append((d_n_g[cat][0],d_n_h[cat][0]))
    return [] if only_backbone else pairs
def elligible_neighbours_test_isomorphism(d_n_g,d_n_h):#tested
    """Candidate neighbour pairs for isomorphism testing.  Both matched nodes
    must carry exactly the same set of edge categories; otherwise None
    signals the extension is impossible."""
    if set(d_n_g.keys()) != set(d_n_h.keys()):
        return None
    #TODO deal with triangle double interaction
    return [(d_n_g[cat][0],d_n_h[cat][0]) for cat in set(d_n_g.keys())]
def elligible_neighbours_test_sub_isomorphism(d_n_g,d_n_h):#tested
    """Candidate neighbour pairs for sub-isomorphism testing.  Every category
    of the pattern node (g side) must also exist on the target node (h side);
    otherwise None signals failure."""
    common=set(d_n_g.keys()) & set(d_n_h.keys())
    #the check enforces: g's categories form a subset of h's categories
    if len(common)!=len(d_n_g.keys()):
        return None
    result=[]
    for category in common:
        #TODO deal with triangle double interaction
        result.append((d_n_g[category][0],d_n_h[category][0]))
    return result
def get_single_dict_of_starting_edges_all(g,f,main=False):#tested
    """Gather every long_range edge of g, grouped by hashed category.

    f filters the edge-data dict before hashing; the sorted-keys JSON dump of
    the filtered data is the category key.  Edges whose label is non-oriented
    (CWW, TWW, ...) are registered only under the (low, high) node ordering --
    presumably to deduplicate the two directions of an undirected interaction
    (TODO confirm against the graph construction).  When main is True the
    reversed orientation is added too, so a starting edge of the 'main' graph
    can be matched both ways against the other graph.
    Returns {category: [edge, ...]}.  The unused local 'data' was removed.
    """
    list_of_not_oriented_labels=['CWW','TWW','CSS','TSS','CHH','THH']
    d_label={}
    for edge in g.edges():
        if g.edges[edge]['long_range']:
            category=json.dumps(f(g.edges[edge]), sort_keys=True)
            not_oriented=(g.edges[edge]['label'] in list_of_not_oriented_labels)
            if not_oriented:
                #keep only one orientation of the undirected interaction
                if edge[0] < edge[1]:
                    d_label.setdefault(category,[]).append(edge)
                    if main:
                        d_label[category].append((edge[1],edge[0]))
            else:
                d_label.setdefault(category,[]).append(edge)
    return d_label
#Used in the "graph matching" algorithms (list of maximal sub-isomorphism / isomorphism / sub-isomorphism)
def get_single_dict_of_starting_edges_least_populated(g,f,main=False):
    #Return {category: [edges]} restricted to the single least populated edge
    #category of g (fewest starting points -> fewest matching attempts).
    list_of_not_oriented_labels=['CWW','TWW','CSS','TSS','CHH','THH']
    d_edges_by_category={}
    #group every edge by the sorted-keys JSON dump of its filtered data
    for edge in g.edges():
        category=json.dumps(f(g.edges[edge]), sort_keys=True)
        d_edges_by_category[category]=d_edges_by_category.get(category,[])
        d_edges_by_category[category].append(edge)
    least_populated_category=None
    #upper bound larger than any category can reach (size*2 covers the
    #doubled count applied to non-oriented labels below)
    numbers=g.size()*2+1
    for category in sorted(d_edges_by_category.keys()):
        list_of_edges=d_edges_by_category[category]
        label=g.edges[list_of_edges[0]]['label']
        if label in list_of_not_oriented_labels:
            #non-oriented edges count double: each can be matched both ways
            if len(list_of_edges)*2 < numbers:
                least_populated_category=category
                #NOTE(review): the comparison above uses len(...)*2 but the
                #record below stores the undoubled len(...) -- looks
                #inconsistent; confirm whether *2 was intended here.
                numbers=len(list_of_edges)
        elif len(list_of_edges) < numbers:
            least_populated_category=category
            numbers=len(list_of_edges)
    if main:
        #for the 'main' graph also register the reversed non-oriented edges,
        #so both matching orientations of an undirected pair can be tried
        label=g.edges[d_edges_by_category[least_populated_category][0]]['label']
        if label in list_of_not_oriented_labels:
            l_symetric_edges=[]
            for edge in d_edges_by_category[least_populated_category]:
                l_symetric_edges.append((edge[1],edge[0]))
            d_edges_by_category[least_populated_category].extend(l_symetric_edges)
    #NOTE(review): on an edgeless graph least_populated_category stays None
    #and the lookup below raises KeyError -- confirm callers guarantee edges.
    return {least_populated_category:d_edges_by_category[least_populated_category]}
def get_dicts_of_starting_edges(g,h,d_options):
    """Build the per-graph dictionaries {category: [edge, ...]} of eligible
    starting edges, using the strategy selected in d_options.

    all: every long-range edge grouped by category.  Nodes of two non-oriented
    edges (n1_g,n2_g)/(n1_h,n2_h) can be matched two ways, so the 'main'
    graph g also receives each symmetric orientation (covered by the third,
    optional parameter of the strategy function).
    least_populated: only the least populated category is returned.
    """
    build=d_options['get_dicts_of_starting_edges']
    hashing=d_options['hashing_filter']
    return (build(g,hashing,True),build(h,hashing))
def extension_blocked_by_blacklist(d_g,d_h,matching,d_options):
    """True when some eligible extension pair of the matching is blacklisted,
    i.e. the matching cannot be grown into a maximal one."""
    neighbours=d_options['elligible_neighbours']
    return any(
        matching.pair_is_elligible_but_blacklisted(pair)
        for n_g,n_h in matching.get_list_of_matched_pairs()
        for pair in neighbours(d_g[n_g],d_h[n_h]))
def extender(d_g,d_h,matching,d_options):
    #Grow the initial matching as far as possible, branching on conflicts.
    #d_g/d_h map node -> {category: neighbours}; d_options selects the
    #neighbour-eligibility rule, conflict policy, branching and return format.
    #Returns whatever d_options['return_format'] builds from the collected
    #results and the last explored matching.
    lifo_branching=queue.LifoQueue()
    lifo_branching.put(matching)
    l_results=[]
    while not lifo_branching.empty():
        m=lifo_branching.get()
        #snapshot of the white/black lists taken before this extension round,
        #used later to create branched matchings
        branch_original_dicts=m.get_copies_of_white_and_black_dicts()
        list_conflicts=[]
        #BFS over matched pairs: each accepted pair is enqueued so its own
        #neighbourhood gets explored too
        fifo_pair=queue.Queue()
        for pair in m.get_list_of_matched_pairs():
            fifo_pair.put(pair)
        while not fifo_pair.empty():
            n_g,n_h=fifo_pair.get()
            l_candidates=d_options['elligible_neighbours'](d_g[n_g],d_h[n_h])
            #None is the eligibility rule's failure signal (isomorphism /
            #sub-isomorphism variants): abort immediately
            if l_candidates==None:
                return (False,m)
            for pair in l_candidates:
                n_g,n_h=pair
                #only consider pairs whose own neighbourhoods are compatible
                if d_options['elligible_neighbours'](d_g[n_g],d_h[n_h]) != []:
                    case,extra=m.test_candidate_pair(pair)
                    if case == 'accepted':
                        fifo_pair.put(pair)
                    elif case == 'rejected':
                        continue
                    elif case == 'conflict':
                        if d_options['exit_on_conflict']:
                            return (False,m)
                        else:
                            #remember the conflicting alternatives for branching
                            list_conflicts.extend(extra)
        if d_options['backtracking']:
            #keep m only if no blacklisted pair blocks it from being maximal
            if not extension_blocked_by_blacklist(d_g,d_h,m,d_options):
                l_results.append(m)
            #non_maximal_results DO generate interesting conflicts
            for branched_matching in m.create_list_branched_matchings(branch_original_dicts,list_conflicts):
                lifo_branching.put(branched_matching)
    return d_options['return_format'](l_results,m,d_g,d_h)
def return_search_RINs(l_results,m,d_g,d_h):
    """Return-format hook for search_RINs: the collected matchings as-is."""
    return l_results
def return_test_isomorphism(l_results,m,d_g,d_h):
    """Success iff the matching covers every node of both graphs."""
    full_cover = (m.get_size()==len(d_g)==len(d_h))
    return (full_cover,m)
def return_test_sub_isomorphism(l_results,m,d_g,d_h):
    """Success iff the matching covers every node of g (the pattern graph)."""
    return (m.get_size()==len(d_g),m)
def launcher(G,H,d_options):
    #Generic driver shared by search_RINs / test_isomorphism /
    #test_sub_isomorphism: try every compatible pair of starting edges of G
    #and H, run the extender from each seed, and let the strategy hooks in
    #d_options decide how to accumulate results and when to stop early.
    l_matchings=[]
    #node -> {category: neighbours} views of both graphs
    d_G=shared_functions.dict_from_graph(G,d_options['hashing_filter'])
    d_H=shared_functions.dict_from_graph(H,d_options['hashing_filter'])
    d_start_G,d_start_H=get_dicts_of_starting_edges(G,H,d_options)
    #only categories present in both graphs can seed a matching
    elligable_categories = list(set(d_start_G.keys()) & set(d_start_H.keys()))
    for category in elligable_categories:
        for e_G in d_start_G[category]:
            for e_H in d_start_H[category]:
                n1_G,n2_G=e_G
                n1_H,n2_H=e_H
                #seed the matching with both endpoint pairs of the edges
                pair1=(n1_G,n1_H)
                pair2=(n2_G,n2_H)
                need_to_return,result=d_options['intermediary_result_processing'](extender(d_G,d_H,matching.matching((pair1,pair2)),d_options),l_matchings)
                #single-result strategies short-circuit on first success
                if need_to_return:
                    return result
    return d_options['final_result_processing'](l_matchings)
def intermediary_result_processing_multiple(extender_output,l_matchings):
    """Accumulate every matching produced by the extender; never asks the
    launcher to short-circuit."""
    for found in extender_output:
        l_matchings.append(found)
    return (False,None)
def intermediary_result_processing_single(extender_output,l_matchings):
    """Ask the launcher to stop as soon as one successful matching exists."""
    success,m=extender_output
    if not success:
        return (False,None)
    return (True,(success,m))
def final_result_processing_multiple(l_matchings):
    """All accumulated matchings are the final result."""
    return l_matchings
def final_result_processing_single(l_matchings):
    """Reached only when no early exit happened: the single-result search
    failed, whatever was accumulated."""
    return (False,None)
#Return all common maximal sub-isomorphism between G and H
def search_RINs(G,H):
    """Find every maximal common sub-isomorphism (RIN occurrence) of G and H."""
    d_options={
        'elligible_neighbours':elligible_neighbours_search_RINs,
        'get_dicts_of_starting_edges':get_single_dict_of_starting_edges_all,
        'hashing_filter':shared_functions.identity,
        'exit_on_conflict':False,
        'backtracking':True,
        'return_format':return_search_RINs,
        'intermediary_result_processing':intermediary_result_processing_multiple,
        'final_result_processing':final_result_processing_multiple,
    }
    return launcher(G,H,d_options)
#Tests if G and H are isomorphic
def test_isomorphism(G,H):
    """Return (True, matching) if G and H are isomorphic, (False, ...) otherwise."""
    d_options={
        'elligible_neighbours':elligible_neighbours_test_isomorphism,
        'get_dicts_of_starting_edges':get_single_dict_of_starting_edges_least_populated,
        'hashing_filter':shared_functions.identity,
        'exit_on_conflict':True,
        'backtracking':False,
        'return_format':return_test_isomorphism,
        'intermediary_result_processing':intermediary_result_processing_single,
        'final_result_processing':final_result_processing_single,
    }
    return launcher(G,H,d_options)
#Tests if G is a sub-isomorphism of H
def test_sub_isomorphism(G,H):
    """Return (True, matching) if G embeds into H, (False, ...) otherwise."""
    d_options={
        'elligible_neighbours':elligible_neighbours_test_sub_isomorphism,
        'get_dicts_of_starting_edges':get_single_dict_of_starting_edges_least_populated,
        'hashing_filter':shared_functions.identity,
        'exit_on_conflict':True,
        'backtracking':False,
        'return_format':return_test_sub_isomorphism,
        'intermediary_result_processing':intermediary_result_processing_single,
        'final_result_processing':final_result_processing_single,
    }
    return launcher(G,H,d_options)
# external imports
import sys
import os.path
#teste
import networkx as nx
# internal imports
import shared_functions
import pre_processing
import core
import post_processing
import RIN_management
#configuration
data_folder='../data/' #root folder holding the raw input data
raw_data_file='graphs_2.92.nxpickled' #presumably pickled networkx graphs (v2.92) -- confirm
pre_processed_data_folder='./pre_processed_data' #cache/output folder for pre-processing