Skip to content

augernet.build_molecular_graphs

Molecular Graph Building

Builds PyTorch Geometric graphs for molecular property prediction. Supports both CEBE (Core Electron Binding Energy) and Auger spectroscopy outputs.

Usage: data_list = build_molecular_graphs( data_type='cebe', # 'cebe' or 'auger' source_type='calc', # 'calc', 'eval', or 'exp' ATOM_REP='SKIPATOM', raw_dir='/path/to/data', ... )

Key differences between graph types: - CEBE graphs: y = normalized (delta_be - mean) / std for binding energies - Auger graphs: y = flattened spectra [n_atoms, max_spec_len * 2] - Auger be_feature uses either molecular CEBE for carbons and atomic for others (be_feat = 'mol') or uses atomic reference values for all atoms (be_feat = 'atom') - CEBE be_feature uses atomic reference values for all atoms (be_feat = 'atom')

build_graphs(data_type, mol_file='mol_list.txt', auger_spin=None, auger_max_ke=273, auger_max_spec_len=300, DEBUG=False)

Process calculated CEBE data using the feature-store approach.

All node features are stored as separate data.feat_* attributes. data.x contains only the category_feature.

Source code in src/augernet/build_molecular_graphs.py
def build_graphs(data_type, mol_file="mol_list.txt", 
                 auger_spin=None, auger_max_ke=273,
                 auger_max_spec_len = 300,
                 DEBUG=False):
    """
    Process calculated CEBE data using the feature-store approach.

    All node features are stored as separate ``data.feat_*`` attributes.
    ``data.x`` contains only the category_feature.
    """
    mol_dir = os.path.join(DATA_RAW_DIR, data_type)

    skipatom_dir = os.path.join(DATA_RAW_DIR, "skipatom")

    mol_list_path = os.path.join(mol_dir, mol_file)
    with open(mol_list_path, 'r') as f:
        mol_list = [line.strip() for line in f]

    all_encoders = _initialize_all_atom_encoders(skipatom_dir)

    # Compute or load stats before loop over mol_list:

    # Calculate and save norm stats for calc data
    if data_type in ['calc_cebe', 'exp_cebe']:
        norm_stats_path = os.path.join(DATA_PROCESSED_DIR, 'cebe_norm_stats.pt')
    elif data_type in ['calc_auger', 'eval_auger']:
        norm_stats_path = os.path.join(DATA_PROCESSED_DIR, 'auger_cebe_norm_stats.pt')

    if data_type in ['calc_cebe', 'calc_auger']:
        mean, std = _compute_cebe_normalization_stats(mol_dir, mol_list)
        norm_stats = {'mean': float(mean), 'std': float(std)}
        print("Normalization statistics:", norm_stats)
        torch.save(norm_stats, norm_stats_path)

    # Load norm stats  from calc data for exp eval and other predictions
    if data_type in ['exp_cebe', 'eval_auger']:
        norm_stats = torch.load(norm_stats_path, weights_only=False)
        mean = norm_stats['mean']
        std = norm_stats['std']

    data_list = []

    if DEBUG:
        mol_list = mol_list[:5]

    for mol_name in mol_list:

        mol_xyz_path = os.path.join(mol_dir, f"{mol_name}.xyz")
        mol, xyz_symbols, pos, smiles = _mol_from_xyz_order(mol_xyz_path, labeled_atoms=False)

        cebe_path = f"{mol_dir}/{mol_name}_out.txt"
        cebe = np.loadtxt(cebe_path)

        if data_type in ['calc_auger', 'eval_auger']:
            spec_out, spec_len = spec_utils.extract_spectra(
                                        data_type, mol_dir, mol_name, 
                                        auger_spin, auger_max_ke, 
                                        auger_max_spec_len) 

        #Use to differentiate between cebe, auger sing, auger trip
        if data_type in ['calc_cebe', 'exp_cebe']: 
            category_feature=np.array([1, 0, 0])
        if data_type in ['calc_auger', 'eval_auger']:
            if auger_spin == 'singlet': 
                category_feature=np.array([0, 1, 0])
            if auger_spin == 'triplet': 
                category_feature=np.array([0, 0, 1])
        #print("mol_name:", mol_name)
        node_features, x, edge_index, edge_attr, atomic_be, carbon_env_indices = \
            _build_node_and_edge_features(mol, all_encoders, category_feature, cebe)
        #print(xyz_symbols)
        #print(np.column_stack((node_features['atomic_be'], node_features['mol_be'])))
        #print(" ")

        if data_type in ['calc_cebe', 'exp_cebe']:
            # Build targets (same logic as v1)
            out = []
            for n, val in enumerate(cebe):
                if val == -1:
                    out.append(-1)
                else:
                    ref_e = atomic_be[n].item()
                    dum = ref_e - val
                    #Mean std normalized output, across the full calc dataset
                    out.append((dum - mean) / std)

            y = torch.FloatTensor(out)

        if data_type in ['calc_auger', 'eval_auger']:
            spec_out_array = np.array(spec_out)
            y = torch.from_numpy(spec_out_array).float()
            mask_rows = ~(y.abs().sum(dim=2) == 0)
            mask_flat = mask_rows.repeat(1, 2).float()
            y = y.transpose(1, 2).reshape(len(xyz_symbols), auger_max_spec_len * 2)


        node_mask = [0. if n == -1 else 1. for n in cebe]

        # Store original CEBE values (eV) so evaluation can display them
        # without round-trip precision loss through normalize/denormalize.
        true_cebe = torch.tensor(
            [float(v) if v != -1 else -1.0 for v in cebe],
            dtype=torch.float64,
        )

        if data_type in ['calc_cebe', 'exp_cebe']: 
            data = Data(
                x=x, edge_index=edge_index, edge_attr=edge_attr,
                node_mask=torch.FloatTensor(node_mask),
                y=y.view(-1, 1), 
                pos=torch.tensor(pos, dtype=torch.float), 
                atomic_be=atomic_be,
                atom_symbols=xyz_symbols, 
                true_cebe=true_cebe,
                smiles=smiles, 
                mol_name=mol_name,
                carbon_env_labels=torch.tensor(carbon_env_indices, dtype=torch.long),
            )
        if data_type in ['calc_auger', 'eval_auger']:
            data = Data(
                x=x, edge_index=edge_index, edge_attr=edge_attr,
                node_mask=torch.FloatTensor(node_mask),
                y=y.view(-1, 1),
                mask_bin=mask_flat,
                spec_len=spec_len,
                pos=torch.tensor(pos, dtype=torch.float), 
                atomic_be=atomic_be,
                true_cebe=true_cebe,
                atom_symbols=xyz_symbols, 
                smiles=smiles, 
                mol_name=mol_name,
                carbon_env_labels=torch.tensor(carbon_env_indices, dtype=torch.long),
            )

        # Store all features as separate attributes
        for attr_name, tensor in node_features.items():
            setattr(data, attr_name, tensor)

        data_list.append(data)

    print("Total molecules processed:", len(data_list))

    return data_list

get_butina_clusters(smiles_list, cutoff=0.65)

Assign Butina cluster IDs from a list of SMILES strings.

Uses Morgan radius-2 / 1024-bit fingerprints (ECFP4) for the Tanimoto distance matrix, then Taylor-Butina clustering.

Parameters:

Name Type Description Default
smiles_list list of str

SMILES for every molecule in the dataset.

required
cutoff float

Distance cutoff passed to :func:_taylor_butina_clustering.

0.65

Returns:

Type Description
list of int

One cluster ID per molecule.

Source code in src/augernet/build_molecular_graphs.py
def get_butina_clusters(smiles_list, cutoff=0.65):
    """Assign Butina cluster IDs from a list of SMILES strings.

    Uses Morgan radius-2 / 1024-bit fingerprints (ECFP4) for the
    Tanimoto distance matrix, then Taylor-Butina clustering.

    Parameters
    ----------
    smiles_list : list of str
        SMILES for every molecule in the dataset.
    cutoff : float
        Distance cutoff passed to :func:`_taylor_butina_clustering`.

    Returns
    -------
    list of int
        One cluster ID per molecule.
    """
    gen = rdFingerprintGenerator.GetMorganGenerator(
        radius=BUTINA_RADIUS, fpSize=BUTINA_N_BITS)
    fp_list = []
    for smi in smiles_list:
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            raise ValueError(f"RDKit could not parse SMILES: {smi}")
        fp_list.append(gen.GetFingerprint(mol))
    return _taylor_butina_clustering(fp_list, cutoff=cutoff)

get_per_atom_morgan_bits(mol, radius=1, n_bits=2048)

Compute per-atom Morgan fingerprint bit sets for every atom.

This is the canonical low-level function used by all Morgan-FP consumers in this project (node features, locality analysis, etc.).

Parameters:

Name Type Description Default
mol RDKit Mol

Must already have explicit hydrogens (Chem.AddHs).

required
radius int

Morgan FP radius. 1 = ECFP2, 2 = ECFP4, …

1
n_bits int

Number of bits in the hashed fingerprint.

2048

Returns:

Type Description
list[frozenset[int]]

One frozenset of active bit indices per atom (length = n_atoms).

Source code in src/augernet/build_molecular_graphs.py
def get_per_atom_morgan_bits(mol, radius=1, n_bits=2048):
    """Compute per-atom Morgan fingerprint bit sets for every atom.

    This is the canonical low-level function used by all Morgan-FP
    consumers in this project (node features, locality analysis, etc.).

    Parameters
    ----------
    mol : RDKit Mol
        Must already have explicit hydrogens (``Chem.AddHs``).
    radius : int
        Morgan FP radius.  1 = ECFP2, 2 = ECFP4, …
    n_bits : int
        Number of bits in the hashed fingerprint.

    Returns
    -------
    list[frozenset[int]]
        One ``frozenset`` of active bit indices per atom (length = n_atoms).
    """
    n_atoms = mol.GetNumAtoms()

    gen = rdFingerprintGenerator.GetMorganGenerator(
        radius=radius, fpSize=n_bits,
    )

    ao = rdFingerprintGenerator.AdditionalOutput()
    ao.AllocateAtomToBits()

    # Side-effect: populates ao with atom→bit mapping
    gen.GetFingerprintAsNumPy(mol, additionalOutput=ao)

    atom_to_bits = ao.GetAtomToBits()
    return [frozenset(atom_to_bits[i]) for i in range(n_atoms)]