sql

PoseMetadata

Bases: Base

number_of_entities property

number_of_entities: int

Return the number of distinct entities (Gene/Protein products) found in the PoseMetadata

entity_names property

entity_names: list[str]

Return the names of each entity (Gene/Protein products) found in the PoseMetadata

design_ids property

design_ids: list[str]

Get the identifiers of each DesignData in the PoseJob

design_names property

design_names: list[str]

Get the names of each DesignData in the PoseJob

pose_source property

pose_source

Provide the DesignData for the Pose itself

symmetry class-attribute instance-attribute

symmetry = Column(String(8))

The result of the SymEntry

PoseMetrics

Bases: Base

symmetric_interface class-attribute instance-attribute

symmetric_interface = Column(Boolean)

Whether the Pose interface is symmetric

ProteinMetadata

Bases: Base

Used to hold fixed metadata of protein structures, typically pulled from the PDB API

entity_id class-attribute instance-attribute

entity_id = Column(String(20), nullable=False, index=True, unique=True)

This could be described as the PDB API EntityID

thermophilicity class-attribute instance-attribute

thermophilicity = Column(Float)

Thermophilicity implies this is a spectrum, while thermophilic implies binary

uniprot_ids property

uniprot_ids: tuple[str, ...]

Access the UniProtIDs associated with this instance

entity_info property

entity_info: dict[str, dict[str, Any]]

Format the instance for population of metadata via the entity_info kwargs

EntityData

Bases: Base

Used by unique Pose instances to connect multiple sources of information

entity_info property

entity_info: dict[str, dict[str, Any]]

Format the instance for population of metadata via the entity_info kwargs

EntityTransform

Bases: Base

transformation property writable

transformation: TransformationMapping | dict

Provide the transformation mapping the Entity to the Pose

DesignData

Bases: Base

Account for design metadata created from pose metadata

initialize_metadata

initialize_metadata(session: Session, possibly_new_uniprot_to_prot_data: dict[tuple[str, ...], Iterable[ProteinMetadata]] = None, existing_uniprot_entities: Iterable[UniProtEntity] = None, existing_protein_metadata: Iterable[ProteinMetadata] = None) -> dict[tuple[str, ...], list[ProteinMetadata]] | dict

Compare newly described work to the existing database and set up metadata for all described entities

Doesn't commit new instances to the database in case they are attached to existing objects

Parameters:

  • session (Session) –

    A currently open transaction within sqlalchemy

  • possibly_new_uniprot_to_prot_data (dict[tuple[str, ...], Iterable[ProteinMetadata]], default: None ) –

    A mapping of the possibly required UniProtID entries and their associated ProteinMetadata. These could already exist in the database, but have been indicated as needed

  • existing_uniprot_entities (Iterable[UniProtEntity], default: None ) –

    If any UniProtEntity instances are already loaded, pass them to expedite setup

  • existing_protein_metadata (Iterable[ProteinMetadata], default: None ) –

    If any ProteinMetadata instances are already loaded, pass them to expedite setup

Source code in symdesign/resources/sql.py
def initialize_metadata(session: Session,
                        possibly_new_uniprot_to_prot_data: dict[tuple[str, ...], Iterable[ProteinMetadata]] = None,
                        existing_uniprot_entities: Iterable[wrapapi.UniProtEntity] = None,
                        existing_protein_metadata: Iterable[ProteinMetadata] = None) -> \
        dict[tuple[str, ...], list[ProteinMetadata]] | dict:
    """Compare newly described work to the existing database and set up metadata for all described entities

    Doesn't commit new instances to the database in case they are attached to existing objects

    Args:
        session: A currently open transaction within sqlalchemy
        possibly_new_uniprot_to_prot_data: A mapping of the possibly required UniProtID entries and their associated
            ProteinMetadata. These could already exist in the database, but have been indicated as needed
        existing_uniprot_entities: If any UniProtEntity instances are already loaded, pass them to expedite setup
        existing_protein_metadata: If any ProteinMetadata instances are already loaded, pass them to expedite setup
    """
    if not possibly_new_uniprot_to_prot_data:
        if existing_protein_metadata:
            pass
            # uniprot_id_to_metadata = {protein_data.uniprot_ids:
            #                           protein_data for protein_data in existing_protein_metadata}
        elif existing_uniprot_entities:
            existing_protein_metadata = {unp_entity.protein_metadata for unp_entity in existing_uniprot_entities}
        else:
            existing_protein_metadata = {}

        return {protein_data.uniprot_ids: [protein_data] for protein_data in existing_protein_metadata}

    # Todo
    #  If I ever adopt the UniqueObjectValidatedOnPending recipe, that could perform the work of getting the
    #  correct objects attached to the database

    # Get the set of all UniProtIDs
    possibly_new_uniprot_ids = set()
    for uniprot_ids in possibly_new_uniprot_to_prot_data.keys():
        possibly_new_uniprot_ids.update(uniprot_ids)
    # Find existing UniProtEntity instances from database
    if existing_uniprot_entities is None:
        existing_uniprot_entities = []
    else:
        existing_uniprot_entities = list(existing_uniprot_entities)

    existing_uniprot_ids = {unp_ent.id for unp_ent in existing_uniprot_entities}
    # Remove the certainly existing from possibly new and query for any new that already exist
    query_additional_existing_uniprot_entities_stmt = \
        select(wrapapi.UniProtEntity).where(wrapapi.UniProtEntity.id.in_(
            possibly_new_uniprot_ids.difference(existing_uniprot_ids)))
    # Add all requested to those known about
    existing_uniprot_entities += session.scalars(query_additional_existing_uniprot_entities_stmt).all()

    # Todo Maybe needed?
    #  Emit this select when there is a stronger association between the multiple
    #  UniProtEntity.uniprot_ids and referencing a unique ProteinMetadata
    #  The below were never tested
    # existing_uniprot_entities_stmt = \
    #     select(UniProtProteinAssociation.protein)\
    #     .where(UniProtProteinAssociation.uniprot_id.in_(possibly_new_uniprot_ids))
    #     # NEED TO GROUP THESE BY ProteinMetadata.uniprot_entities
    # OR
    # existing_uniprot_entities_stmt = \
    #     select(wrapapi.UniProtEntity).join(ProteinMetadata)\
    #     .where(wrapapi.UniProtEntity.uniprot_id.in_(possibly_new_uniprot_ids))
    #     # NEED TO GROUP THESE BY ProteinMetadata.uniprot_entities

    # Map the existing uniprot_id to UniProtEntity
    uniprot_id_to_unp_entity = {unp_entity.id: unp_entity for unp_entity in existing_uniprot_entities}
    insert_uniprot_ids = possibly_new_uniprot_ids.difference(uniprot_id_to_unp_entity.keys())

    # Get the remaining UniProtIDs as UniProtEntity entries
    new_uniprot_id_to_unp_entity = {uniprot_id: wrapapi.UniProtEntity(id=uniprot_id)
                                    for uniprot_id in insert_uniprot_ids}
    # Update entire dictionary for ProteinMetadata ops below
    uniprot_id_to_unp_entity.update(new_uniprot_id_to_unp_entity)
    # Insert new
    new_uniprot_entities = list(new_uniprot_id_to_unp_entity.values())
    session.add_all(new_uniprot_entities)

    # Repeat the process for ProteinMetadata
    # Map entity_id to uniprot_id for later cleaning of UniProtEntity
    possibly_new_entity_id_to_uniprot_ids = \
        {protein_data.entity_id: uniprot_ids
         for uniprot_ids, protein_datas in possibly_new_uniprot_to_prot_data.items()
         for protein_data in protein_datas}
    # Map entity_id to ProteinMetadata
    possibly_new_entity_id_to_protein_data = \
        {protein_data.entity_id: protein_data
         for protein_datas in possibly_new_uniprot_to_prot_data.values()
         for protein_data in protein_datas}
    possibly_new_entity_ids = set(possibly_new_entity_id_to_protein_data.keys())

    if existing_protein_metadata is None:
        existing_protein_metadata = []
    else:
        existing_protein_metadata = list(existing_protein_metadata)

    existing_entity_ids = {protein_data.entity_id for protein_data in existing_protein_metadata}
    # Remove the certainly existing from possibly new and query the new
    existing_protein_metadata_stmt = \
        select(ProteinMetadata) \
        .where(ProteinMetadata.entity_id.in_(possibly_new_entity_ids.difference(existing_entity_ids)))
    # Add all requested to those known about
    existing_protein_metadata += session.scalars(existing_protein_metadata_stmt).all()

    # Get all the existing ProteinMetadata.entity_ids to handle the certainly new ones
    existing_entity_ids = {protein_data.entity_id for protein_data in existing_protein_metadata}
    # Any remaining entity_ids are new and must be added
    new_entity_ids = possibly_new_entity_ids.difference(existing_entity_ids)
    # uniprot_ids_to_new_metadata = {
    #     possibly_new_entity_id_to_uniprot_ids[entity_id]: possibly_new_entity_id_to_protein_data[entity_id]
    #     for entity_id in new_entity_ids}
    uniprot_ids_to_new_metadata = defaultdict(list)
    for entity_id in new_entity_ids:
        uniprot_ids_to_new_metadata[possibly_new_entity_id_to_uniprot_ids[entity_id]].append(
            possibly_new_entity_id_to_protein_data[entity_id])

    # Add all existing to UniProtIDs to ProteinMetadata mapping
    all_uniprot_id_to_prot_data = defaultdict(list)
    for protein_data in existing_protein_metadata:
        all_uniprot_id_to_prot_data[protein_data.uniprot_ids].append(protein_data)

    # Collect all new ProteinMetadata which remain
    all_protein_metadata = []
    for uniprot_ids, metadatas in uniprot_ids_to_new_metadata.items():
        all_protein_metadata.extend(metadatas)
        # Add to UniProtIDs to ProteinMetadata map
        all_uniprot_id_to_prot_data[uniprot_ids].extend(metadatas)
        # Attach UniProtEntity to new ProteinMetadata by UniProtID
        for protein_metadata in metadatas:
            # Create the ordered_list of UniProtIDs (UniProtEntity) on ProteinMetadata entry
            try:
                # protein_metadata.uniprot_entities.extend(
                #     uniprot_id_to_unp_entity[uniprot_id] for uniprot_id in uniprot_ids)
                protein_metadata.uniprot_entities = \
                    [uniprot_id_to_unp_entity[uniprot_id] for uniprot_id in uniprot_ids]
            except KeyError:
                # uniprot_id_to_unp_entity is missing a key. Not sure why it wouldn't be here...
                raise SymDesignException(putils.report_issue)

    # Insert remaining ProteinMetadata
    session.add_all(all_protein_metadata)
    # # Finalize additions to the database
    # session.commit()

    return all_uniprot_id_to_prot_data
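
A minimal usage sketch, assuming an existing sqlalchemy Engine and that ProteinMetadata instances are constructed with their entity_id (all identifiers below are hypothetical):

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from symdesign.resources import sql

engine = create_engine('sqlite:///metrics.db')  # hypothetical database location
possibly_new = {
    # Hypothetical UniProtID tuples mapped to the ProteinMetadata instances that describe them
    ('P12345',): [sql.ProteinMetadata(entity_id='1abc_1')],
    ('P12345', 'Q67890'): [sql.ProteinMetadata(entity_id='2def_1')],
}
with Session(engine) as session:
    uniprot_ids_to_metadata = sql.initialize_metadata(session, possibly_new)
    # New UniProtEntity/ProteinMetadata instances are added to the session but not committed
    session.commit()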

insert_dataframe

insert_dataframe(session: Session, table: Base, df: DataFrame, mysql: bool = False, **kwargs)

Take a formatted pandas DataFrame and insert values into a sqlalchemy session, then commit the transaction

Parameters:

  • session (Session) –

    A currently open transaction within sqlalchemy

  • table (Base) –

    A Class mapped to SQL table with sqlalchemy

  • df (DataFrame) –

    The DataFrame with records to insert

  • mysql (bool, default: False ) –

    Whether the database is a MySQL dialect

Source code in symdesign/resources/sql.py
def insert_dataframe(session: Session, table: Base, df: pd.DataFrame, mysql: bool = False, **kwargs):
    """Take a formatted pandas DataFrame and insert values into a sqlalchemy session, then commit the transaction

    Args:
        session: A currently open transaction within sqlalchemy
        table: A Class mapped to SQL table with sqlalchemy
        df: The DataFrame with records to insert
        mysql: Whether the database is a MySQL dialect
    """
    if mysql:
        insert = mysql_insert
    else:
        insert = sqlite_insert

    insert_stmt = insert(table)
    # # Get the columns that should be updated
    # new_columns = df.columns.tolist()
    # # logger.debug(f'Provided columns: {new_columns}')
    # excluded_columns = insert_stmt.excluded
    # update_columns = [c for c in excluded_columns if c.name in new_columns]
    # update_dict = {getattr(c, 'name'): c for c in update_columns if not c.primary_key}
    # table_ = table.__table__
    # # Find relevant column indicators to parse the non-primary key non-nullable columns
    # primary_keys = [key for key in table_.primary_key]
    # non_null_keys = [col for col in table_.columns if not col.nullable]
    # index_keys = [key for key in non_null_keys if key not in primary_keys]

    # do_update_stmt = insert_stmt.on_conflict_do_update(
    #     index_elements=index_keys,  # primary_keys,
    #     set_=update_dict
    # )
    # # Can't insert with .returning() until version 2.0...
    # # try:
    # #     result = session.execute(do_update_stmt.returning(table_.id), df.reset_index().to_dict('records'))
    # # except exc.CompileError as error:
    # #     logger.error(error)
    # #     try:
    # #         result = session.execute(insert_stmt.returning(table_.id), df.reset_index().to_dict('records'))
    # #     except exc.CompileError as _error:
    # #         logger.error(_error)
    # # try:
    # This works using insert with conflict, however, doesn't return the auto-incremented ids
    # result = session.execute(do_update_stmt, df.to_dict('records'))
    # result = session.execute(insert_stmt, df.to_dict('records'))
    start_time = time()
    session.execute(insert_stmt, df.to_dict('records'))
    logger.debug(f'Transaction with table "{table.__tablename__}" took {time() - start_time:8f}s')
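
A sketch of a direct call, assuming the DataFrame columns match the mapped table's columns (the record below is illustrative, not the full PoseMetrics schema):

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from symdesign.resources import sql

engine = create_engine('sqlite:///metrics.db')  # hypothetical database location
# Hypothetical record whose keys correspond to PoseMetrics columns
pose_metrics_df = pd.DataFrame([{'pose_id': 1, 'symmetric_interface': True}])
with Session(engine) as session:
    sql.insert_dataframe(session, table=sql.PoseMetrics, df=pose_metrics_df,
                         mysql=session.bind.dialect.name == 'mysql')
    session.commit()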

upsert_dataframe

upsert_dataframe(session: Session, table: Base, df: DataFrame, mysql: bool = False, **kwargs)

Take a formatted pandas DataFrame and insert/update values into a sqlalchemy session, then commit the transaction

Parameters:

  • session (Session) –

    A currently open transaction within sqlalchemy

  • table (Base) –

    A Class mapped to SQL table with sqlalchemy

  • df (DataFrame) –

    The DataFrame with records to insert

  • mysql (bool, default: False ) –

    Whether the database is a MySQL dialect

Source code in symdesign/resources/sql.py
def upsert_dataframe(session: Session, table: Base, df: pd.DataFrame, mysql: bool = False, **kwargs):
    """Take a formatted pandas DataFrame and insert/update values into a sqlalchemy session, then commit the transaction

    Args:
        session: A currently open transaction within sqlalchemy
        table: A Class mapped to SQL table with sqlalchemy
        df: The DataFrame with records to insert
        mysql: Whether the database is a MySQL dialect
    """
    if mysql:
        insert_stmt = mysql_insert(table)
        excluded_columns = insert_stmt.inserted
    else:
        insert_stmt = sqlite_insert(table)
        excluded_columns = insert_stmt.excluded

    # Get the columns that should be updated
    new_columns = df.columns.tolist()
    # logger.debug(f'Provided columns: {new_columns}')
    update_columns = [c for c in excluded_columns if c.name in new_columns]
    update_dict = {c.name: c for c in update_columns if not c.primary_key}
    tablename = table.__tablename__
    if mysql:
        do_update_stmt = insert_stmt.on_duplicate_key_update(
            update_dict
        )
    else:  # SQLite and postgresql are the same
        # Find relevant column indicators to parse the non-primary key non-nullable columns
        unique_constraints = inspect(session.connection()).get_unique_constraints(tablename)
        # Returns
        #  [{'name': '_pose_design_uc', 'column_names': ['pose_id', 'design_id']}]
        table_unique_constraint_keys = set()
        for constraint in unique_constraints:
            table_unique_constraint_keys.update(constraint['column_names'])

        table_ = table.__table__
        unique_constraint_keys = {col.name for col in table_.columns if col.unique}
        index_keys = unique_constraint_keys.union(table_unique_constraint_keys)
        # primary_keys = [key for key in table_.primary_key]
        # non_null_keys = [col for col in table_.columns if not col.nullable]
        # index_keys = [key for key in non_null_keys if key not in primary_keys] \
        #     + unique_constraint_keys
        do_update_stmt = insert_stmt.on_conflict_do_update(
            index_elements=index_keys,  # primary_keys,
            set_=update_dict
        )
    # Todo Error
    #  sqlalchemy.exc.OperationalError:
    #  MySQLdb._exceptions.OperationalError:
    #    (1213, 'Deadlock found when trying to get lock; try restarting transaction')
    start_time = time()
    session.execute(do_update_stmt, df.to_dict('records'))
    logger.debug(f'Transaction with table "{tablename}" took {time() - start_time:8f}s')
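
upsert_dataframe is called the same way as insert_dataframe; on SQLite/PostgreSQL the conflict target is built from the table's unique constraints, so a record whose unique columns match an existing row updates that row rather than inserting a duplicate. A sketch with a hypothetical record:

import pandas as pd
from sqlalchemy.orm import Session
from symdesign.resources import sql

# Hypothetical record; if its unique-constraint values match an existing row, that row is updated
updated_df = pd.DataFrame([{'pose_id': 1, 'symmetric_interface': False}])
with Session(engine) as session:  # 'engine' as in the earlier sketches
    sql.upsert_dataframe(session, table=sql.PoseMetrics, df=updated_df)
    session.commit()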

format_residues_df_for_write

format_residues_df_for_write(df: DataFrame) -> DataFrame

Take a typical per-residue DataFrame and move the top column level (level=0), which contains the residue numbers, onto the innermost level of the index

Parameters:

  • df (DataFrame) –

    A per-residue DataFrame to transform

Returns:

  • DataFrame

    The transformed DataFrame

Source code in symdesign/resources/sql.py
def format_residues_df_for_write(df: pd.DataFrame) -> pd.DataFrame:
    """Take a typical per-residue DataFrame and orient the top column level (level=0) containing the residue numbers on
    the index innermost level

    Args:
        df: A per-residue DataFrame to transform

    Returns:
        The transformed DataFrame
    """
    # df.sort_index(level=0, axis=1, inplace=True, sort_remaining=False)
    # # residue_metric_columns = residues.columns.levels[-1].tolist()
    # # self.log.debug(f'Residues metrics present: {residue_metric_columns}')

    # Place the residue indices from the column names into the index at position -1
    df = df.stack(0)
    df.index.set_names('index', level=-1, inplace=True)

    return df
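
A small illustration of the reshaping, using hypothetical metric names:

import pandas as pd
from symdesign.resources import sql

# Columns are (residue index, metric name); 'sasa' is a hypothetical metric
columns = pd.MultiIndex.from_tuples([(1, 'sasa'), (2, 'sasa')])
residues_df = pd.DataFrame([[10.5, 3.2]], index=pd.Index([42], name='design_id'), columns=columns)
long_df = sql.format_residues_df_for_write(residues_df)
# long_df is now indexed by (design_id, index), where 'index' holds the residue number,
# with one column per metric, ready for .reset_index() and row-wise insertion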

write_dataframe

write_dataframe(session: Session, designs: DataFrame = None, design_residues: DataFrame = None, entity_designs: DataFrame = None, poses: DataFrame = None, pose_residues: DataFrame = None, residues: DataFrame = None, update: bool = True, transaction_kwargs: dict = dict())

Format each possible DataFrame type for output via csv or SQL database

Parameters:

  • session (Session) –

    A currently open transaction within sqlalchemy

  • designs (DataFrame, default: None ) –

    The typical per-design metric DataFrame where each index is the design id and the columns are design metrics

  • design_residues (DataFrame, default: None ) –

    The typical per-residue metric DataFrame where each index is the design id and the columns are (residue index, Boolean for design utilization)

  • entity_designs (DataFrame, default: None ) –

    The typical per-design metric DataFrame for Entity instances where each index is the design id and the columns are design metrics

  • poses (DataFrame, default: None ) –

    The typical per-pose metric DataFrame where each index is the pose id and the columns are pose metrics

  • pose_residues (DataFrame, default: None ) –

    The typical per-residue metric DataFrame where each index is the design id and the columns are (residue index, residue metric)

  • residues (DataFrame, default: None ) –

    The typical per-residue metric DataFrame where each index is the design id and the columns are (residue index, residue metric)

  • update (bool, default: True ) –

    Whether the output identifiers are already present in the metrics

  • transaction_kwargs (dict, default: dict() ) –

    Any keyword arguments that should be passed for the transaction. Automatically populated with the database backend as located from the session

Source code in symdesign/resources/sql.py
def write_dataframe(session: Session, designs: pd.DataFrame = None,
                    design_residues: pd.DataFrame = None, entity_designs: pd.DataFrame = None,
                    poses: pd.DataFrame = None, pose_residues: pd.DataFrame = None, residues: pd.DataFrame = None,
                    update: bool = True, transaction_kwargs: dict = dict()):
    """Format each possible DataFrame type for output via csv or SQL database

    Args:
        session: A currently open transaction within sqlalchemy
        designs: The typical per-design metric DataFrame where each index is the design id and the columns are
            design metrics
        design_residues: The typical per-residue metric DataFrame where each index is the design id and the columns
            are (residue index, Boolean for design utilization)
        entity_designs: The typical per-design metric DataFrame for Entity instances where each index is the design id
            and the columns are design metrics
        poses: The typical per-pose metric DataFrame where each index is the pose id and the columns are
            pose metrics
        pose_residues: The typical per-residue metric DataFrame where each index is the design id and the columns are
            (residue index, residue metric)
        residues: The typical per-residue metric DataFrame where each index is the design id and the columns are
            (residue index, residue metric)
        update: Whether the output identifiers are already present in the metrics
        transaction_kwargs: Any keyword arguments that should be passed for the transaction. Automatically populated
            with the database backend as located from the session
    """
    #     job: The resources for the current job
    if update:
        dataframe_function = upsert_dataframe
    else:
        dataframe_function = insert_dataframe

    # If this is the first call, update the dictionary to specify the database dialect
    if transaction_kwargs == dict():
        transaction_kwargs.update({session.bind.dialect.name: True})
        # transaction_kwargs.update(which_dialect(session))
    # else:
    #     input(transaction_kwargs)
    # warn = warned = False
    #
    # def warn_multiple_update_results():
    #     nonlocal warned
    #     if warn and not warned:
    #         logger.warning(
    #             "Performing multiple metrics SQL transactions will only return results for the last transaction")
    #         warned = True
    replace_values = {np.nan: None, float('inf'): 1e6, float('-inf'): -1e6}

    if poses is not None and not poses.empty:
        # warn = True
        df = poses.replace(replace_values).reset_index()
        table = PoseMetrics
        dataframe_function(session, table=table, df=df, **transaction_kwargs)
        logger.info(f'Wrote {table.__tablename__} to Database')

    if designs is not None and not designs.empty:
        # warn_multiple_update_results()
        # warn = True
        df = designs.replace(replace_values).reset_index()
        table = DesignMetrics
        dataframe_function(session, table=table, df=df, **transaction_kwargs)
        logger.info(f'Wrote {table.__tablename__} to Database')

    if entity_designs is not None and not entity_designs.empty:
        # warn_multiple_update_results()
        # warn = True
        df = entity_designs.replace(replace_values).reset_index()
        table = DesignEntityMetrics
        dataframe_function(session, table=table, df=df, **transaction_kwargs)
        logger.info(f'Wrote {table.__tablename__} to Database')

    if design_residues is not None and not design_residues.empty:
        # warn_multiple_update_results()
        # warn = True
        df = format_residues_df_for_write(design_residues).replace(replace_values).reset_index()
        table = DesignResidues
        dataframe_function(session, table=table, df=df, **transaction_kwargs)
        logger.info(f'Wrote {table.__tablename__} to Database')

    if residues is not None and not residues.empty:
        # warn_multiple_update_results()
        # warn = True
        df = format_residues_df_for_write(residues).replace(replace_values).reset_index()
        table = ResidueMetrics
        dataframe_function(session, table=table, df=df, **transaction_kwargs)
        logger.info(f'Wrote {table.__tablename__} to Database')

    if pose_residues is not None and not pose_residues.empty:
        # warn_multiple_update_results()
        # warn = True
        df = format_residues_df_for_write(pose_residues).replace(replace_values).reset_index()
        table = PoseResidueMetrics
        dataframe_function(session, table=table, df=df, **transaction_kwargs)
        logger.info(f'Wrote {table.__tablename__} to Database')
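
A sketch tying the pieces together; poses_df and designs_df stand in for metric DataFrames that already carry their pose/design identifiers in the index:

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from symdesign.resources import sql

engine = create_engine('sqlite:///metrics.db')  # hypothetical database location
with Session(engine) as session:
    # poses_df/designs_df are placeholder DataFrames shaped as described in the parameter list
    sql.write_dataframe(session, poses=poses_df, designs=designs_df, update=True)
    session.commit()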