Skip to content

utils

log_levels module-attribute

log_levels = dict(zip(log_level_keys, [DEBUG, INFO, WARNING, ERROR, CRITICAL, DEBUG, INFO, WARNING, ERROR, CRITICAL, DEBUG, INFO, WARNING, ERROR, CRITICAL, DEBUG, INFO, WARNING, ERROR, CRITICAL, DEBUG, INFO, WARNING, ERROR, CRITICAL, DEBUG, INFO, WARNING, ERROR, CRITICAL]))

log_level = { 'debug': DEBUG, 'info': INFO, 'warning': WARNING, 'error': ERROR, 'critical': CRITICAL, 'DEBUG': DEBUG, 'INFO': INFO, 'WARNING': WARNING, 'ERROR': ERROR, 'CRITICAL': CRITICAL, 1: DEBUG, 2: INFO, 3: WARNING, 4: ERROR, 5: CRITICAL, 10: DEBUG, 20: INFO, 30: WARNING, 40: ERROR, 50: CRITICAL}

PoseSpecification

PoseSpecification(file: AnyStr)
Source code in symdesign/utils/__init__.py
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
def __init__(self, file: AnyStr):
    """Parse a specification file into pose identifiers, design names, and residue directives

    Args:
        file: The location of a csv-formatted file where each row contains
            "pose_identifier[,design_name[,design_directives]]"
    """
    # Character separating a residue specification from its directive, e.g. "10-20:mutate"
    self.directive_delimiter: str = ':'
    self.file: AnyStr = file
    self.directives: list[dict[int, str]] = []

    all_poses, design_names, all_design_directives, = [], [], []
    with open(self.file) as f:
        # pose_identifiers, design_names, all_design_directives, *_ = zip(*reader(file, dialect=self))
        # Transpose the csv rows so each column becomes one tuple
        all_info = list(zip(*csv.reader(f)))  # dialect=self)))

    # Columns are positional: 0=pose identifiers, 1=design names, 2=design directives
    for idx in range(len(all_info)):
        if idx == 0:
            all_poses = all_info[idx]
        elif idx == 1:
            design_names = all_info[idx]
        elif idx == 2:
            all_design_directives = all_info[idx]

    # logger.debug(f'Found poses {all_poses}')
    # logger.debug(f'Found designs {design_names}')
    # logger.debug(f'Found directives {all_design_directives}')
    self.pose_identifiers: list[str] = list(map(str.strip, all_poses))
    self.design_names: list[str] = list(map(str.strip, design_names))

    # First, split directives by white space, then by directive_delimiter
    # self.directives = \
    #     [dict((residue, directive) for residues_s, directive in [residue_directive.split(self.directive_delimiter)
    #                                                              for residue_directive in design_directives.split()]
    #           for residue in format_index_string(residues_s)) for design_directives in all_design_directives]
    for design_directives in all_design_directives:
        # print('Design Directives', design_directives)
        parsed_directives = []
        # for residues_s, directive in map(str.split, design_directives.split(), repeat(self.directive_delimiter)):
        for design_directive in design_directives.split():
            try:
                design_specification, directive = design_directive.split(self.directive_delimiter)
            except ValueError:  # Not enough values to unpack
                # Malformed token with no delimiter; stop parsing this entry's directives
                break
            else:
                if design_specification.replace(',', '').replace('-', '') == design_specification:
                    # No ',' or '-' present, so the specification is a single token
                    # NOTE(review): this keeps the key as a str, while the branch below produces
                    #  int keys via format_index_string — confirm the mixed key types are intended,
                    #  since self.directives is annotated as list[dict[int, str]]
                    parsed_directives.append((design_specification, directive))
                else:
                    # Expand comma/range syntax such as "10-20,25" into individual integer residues
                    parsed_directives.extend([(spec, directive) for spec in format_index_string(design_specification)])
        self.directives.append(dict(parsed_directives))

get_directives

get_directives() -> Generator[tuple[str, list[str] | None, list[dict[int, str]] | None], None, None]

Retrieve the parsed PoseID, Design Name, and Mutation Directive information from a Specification file

Returns:

  • Generator[tuple[str, list[str] | None, list[dict[int, str]] | None], None, None]

    A generator of tuples where each tuple contains the PoseID, then if provided in the parsed file, the corresponding DesignID and then design directives. If they aren't provided then None will be returned for the DesignID and directives.

Source code in symdesign/utils/__init__.py
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
def get_directives(self) -> Generator[tuple[str, list[str] | None, list[dict[int, str]] | None], None, None]:
    """Retrieve the parsed PoseID, Design Name, and Mutation Directive information from a Specification file

    Returns:
        A generator of tuples where each tuple contains the PoseID, then if provided in the parsed file, the
            corresponding DesignID and then design directives. If they aren't provided then None will be returned
            for the DesignID and directives.
    """
    # Calculate whether there are multiple designs present per pose
    found_poses = defaultdict(list)
    for idx, pose in enumerate(self.pose_identifiers):
        # if pose in found_poses:
        found_poses[pose].append(idx)
        # else:
        #     found_poses[pose] = [idx]

    # Ensure correctly sized inputs. Create blank data otherwise
    number_pose_identifiers = len(self.pose_identifiers)
    if self.design_names:  # design_file
        if number_pose_identifiers != len(self.design_names):
            raise ValueError(
                f"The 'design identifiers' provided to {self.__class__.__name__} are a different length "
                f"({len(self.design_names)}) than the 'pose identifiers' ({number_pose_identifiers})")
        if self.directives:
            if number_pose_identifiers != len(self.directives):
                raise ValueError(
                    f"The 'directives' provided to {self.__class__.__name__} are a different length "
                    f"({len(self.directives)}) than the 'pose identifiers' ({number_pose_identifiers})")
        else:
            # No directives were provided; pair each pose with None
            directives = list(repeat(None, number_pose_identifiers))
    else:
        # Neither design names nor directives were provided; pair each pose with None for both
        design_names = list(repeat(None, number_pose_identifiers))
        directives = design_names.copy()

    # Group the pose_identifiers with the design_names and directives
    if len(found_poses) == number_pose_identifiers:  # There is one design per pose
        if self.design_names:
            # Wrap each single design (and directive) in a list for a uniform per-pose structure
            design_names = [[design_name] for design_name in self.design_names]
            if self.directives:
                directives = [[directive] for directive in self.directives]
    else:  # More than one
        if self.design_names:
            # Collect every design (and directive) that maps to the same pose identifier
            design_names = [[self.design_names[index] for index in indices] for indices in found_poses.values()]
            for pose_identifier, names in zip(found_poses, design_names):
                if len(names) != len(set(names)):
                    # The same design name was specified more than once for a single pose
                    overlapping_designs = {design: names.count(design) for design in names}
                    raise InputError(f"Can't use a specification file with more than one entry for the same design"
                                     f".\nThe design{'' if len(overlapping_designs) == 1 else 's'} "
                                     f"{', '.join(overlapping_designs)} for pose '{pose_identifier}'")
            if self.directives:
                directives = [[self.directives[index] for index in indices] for indices in found_poses.values()]

    # With above logic, it's impossible to have UnboundLocalError of design_names, directives
    return zip(found_poses, design_names, directives)

dictionary_lookup

dictionary_lookup(dictionary: dict, items: tuple[Any, ...]) -> Any

Return the values of a dictionary for the item pairs nested within

Parameters:

  • dictionary (dict) –

    The dictionary to search

  • items (tuple[Any, ...]) –

    The tuple of keys to search for

Returns: The value specified by dictionary keys

Source code in symdesign/utils/__init__.py
56
57
58
59
60
61
62
63
64
65
def dictionary_lookup(dictionary: dict, items: tuple[Any, ...]) -> Any:
    """Return the values of a dictionary for the item pairs nested within

    Args:
        dictionary: The dictionary to search
        items: The tuple of keys to search for
    Returns:
        The value specified by dictionary keys
    """
    # Walk down one nesting level per key; an empty tuple returns the dictionary itself
    value = dictionary
    for key in items:
        value = value[key]
    return value

set_dictionary_by_path

set_dictionary_by_path(root, items, value)

Set a value in a nested object in root by item sequence.

Source code in symdesign/utils/__init__.py
68
69
70
def set_dictionary_by_path(root, items, value):
    """Set a value in a nested object in root by item sequence."""
    # Resolve the parent container, then assign the final key in place
    *parent_keys, final_key = items
    dictionary_lookup(root, parent_keys)[final_key] = value

handle_errors

handle_errors(errors: tuple[Type[Exception], ...] = (Exception)) -> Any

Decorator to wrap a function with try: ... except errors:

Parameters:

  • errors (tuple[Type[Exception], ...], default: (Exception) ) –

    A tuple of exceptions to monitor, even if single exception

Returns: Function return upon proper execution, else the Exception if one was raised

Source code in symdesign/utils/__init__.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def handle_errors(errors: tuple[Type[Exception], ...] = (Exception,)) -> Any:
    """Decorator to wrap a function with try: ... except errors:

    Args:
        errors: A tuple of exceptions to monitor, even if single exception
    Returns:
        Function return upon proper execution, else the Exception if one was raised
    """
    def decorator(function: Callable) -> Any:
        @wraps(function)
        def safe_call(*args, **kwargs):
            # Return the caught exception instance instead of letting it propagate
            try:
                result = function(*args, **kwargs)
            except errors as caught:
                return caught
            return result
        return safe_call
    return decorator

timestamp

timestamp() -> str

Return the date/time formatted as YR-MO-DA-HRMNSC. Ex: 2022-Jan-01-245959

Source code in symdesign/utils/__init__.py
101
102
103
def timestamp() -> str:
    """Return the date/time formatted as YR-MO-DA-HRMNSC. Ex: 2022-Jan-01-245959"""
    now = time.localtime()
    return time.strftime('%y-%m-%d-%H%M%S', now)

datestamp

datestamp(short: bool = False) -> str

Return the date/time formatted as Year-Mon-DA.

Parameters:

  • short (bool, default: False ) –

    Whether to return the short date

Returns: Ex: 2022-Jan-01 or 01-Jan-22 if short

Source code in symdesign/utils/__init__.py
106
107
108
109
110
111
112
113
114
115
116
117
def datestamp(short: bool = False) -> str:
    """Return the date/time formatted as Year-Mon-DA.

    Args:
        short: Whether to return the short date
    Returns:
        Ex: 2022-Jan-01 or 01-Jan-22 if short
    """
    # The short form is the PDB-desired format; the long form is the preferred default
    date_format = '%d-%b-%y' if short else '%Y-%b-%d'
    return time.strftime(date_format)

start_log

start_log(name: str = '', handler: int = 1, level: logging_level_literal = 2, location: AnyStr = os.getcwd(), propagate: bool = False, format_log: bool = True, no_log_name: bool = False, handler_level: logging_level_literal = None) -> Logger

Create a logger to handle program messages

Parameters:

  • name (str, default: '' ) –

    The name of the logger. By default, the root logger is returned

  • handler (int, default: 1 ) –

    Whether to handle to stream (1), a file (2), or a NullHandler (3+)

  • level (logging_level_literal, default: 2 ) –

    What level of messages to emit (1-debug, 2-info, 3-warning, 4-error, 5-critical)

  • location (AnyStr, default: getcwd() ) –

    If a FileHandler is used (handler=2) where should file be written? .log is appended to the filename

  • propagate (bool, default: False ) –

    Whether to propagate messages to parent loggers (such as root or parent.current_logger)

  • format_log (bool, default: True ) –

    Whether to format the log with logger specific formatting otherwise use message format

  • no_log_name (bool, default: False ) –

    Whether to omit the logger name from the output

  • handler_level (logging_level_literal, default: None ) –

    Whether to set the level for the logger handler on top of the overall level

Returns: Logger object to handle messages

Source code in symdesign/utils/__init__.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def start_log(name: str = '', handler: int = 1, level: logging_level_literal = 2, location: AnyStr = os.getcwd(),
              propagate: bool = False, format_log: bool = True, no_log_name: bool = False,
              handler_level: logging_level_literal = None) -> Logger:
    """Create a logger to handle program messages

    Args:
        name: The name of the logger. By default, the root logger is returned
        handler: Whether to handle to stream (1), a file (2), or a NullHandler (3+)
        level: What level of messages to emit (1-debug, 2-info, 3-warning, 4-error, 5-critical)
        location: If a FileHandler is used (handler=2) where should file be written? .log is appended to the filename
        propagate: Whether to propagate messages to parent loggers (such as root or parent.current_logger)
        format_log: Whether to format the log with logger specific formatting otherwise use message format
        no_log_name: Whether to omit the logger name from the output
        handler_level: Whether to set the level for the logger handler on top of the overall level
    Returns:
        Logger object to handle messages
    """
    _logger = getLogger(name)
    _logger.setLevel(log_levels[level])
    # Todo make a mechanism to only emit warning or higher if propagate=True
    #  See below this function for adding handler[0].addFilter()
    _logger.propagate = propagate
    if format_log:
        # ANSI escape codes color the logger name (purple) and level (orange) on terminals
        if no_log_name:
            message_fmt = '\033[38;5;208m{levelname}\033[0;0m: {message}'
        else:
            message_fmt = '\033[38;5;93m{name}\033[0;0m-\033[38;5;208m{levelname}\033[0;0m: {message}'
    else:
        message_fmt = '{message}'

    _handler = log_handler[handler]
    if handler == 2:
        # Check for extension. If one doesn't exist, add ".log"
        lh = _handler(f'{location}.log' if os.path.splitext(location)[1] == '' else location,
                      delay=True)
        # Set delay=True to prevent the log from opening until the first emit() is called
        # Remove any coloring from the log
        message_fmt = (message_fmt.replace('\033[38;5;208m', '')
                       .replace('\033[38;5;93m', '')
                       .replace('\033[0;0m', ''))
    else:
        # Check if a StreamHandler already exists and remove it so messages aren't duplicated.
        # Use a distinct loop variable; previously this loop shadowed the 'handler' parameter
        remove_streams = []
        for idx, existing_handler in enumerate(_logger.handlers):
            # NOTE(review): FileHandlers also expose a 'stream' attribute once opened, so this
            #  test may remove them too — confirm only StreamHandlers are intended here
            if getattr(existing_handler, 'stream', None):
                remove_streams.append(idx)
        # Pop from the end so earlier indices stay valid
        for stream_idx in reversed(remove_streams):
            _logger.handlers.pop(stream_idx)

        lh = _handler()

    if handler_level is not None:
        lh.setLevel(log_levels[handler_level])

    log_format = Formatter(fmt=message_fmt, style='{')
    lh.setFormatter(log_format)
    _logger.addHandler(lh)

    return _logger

set_logging_to_level

set_logging_to_level(level: logging_level_literal = None, handler_level: logging_level_literal = None)

For each Logger in current run time, set the Logger or the Logger.handlers level to level

level is debug by default if no arguments are specified

Parameters:

  • level (logging_level_literal, default: None ) –

    The level to set all loggers to

  • handler_level (logging_level_literal, default: None ) –

    The level to set all logger handlers to

Source code in symdesign/utils/__init__.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def set_logging_to_level(level: logging_level_literal = None, handler_level: logging_level_literal = None):
    """For each Logger in current run time, set the Logger or the Logger.handlers level to level

    level is debug by default if no arguments are specified

    Args:
        level: The level to set all loggers to
        handler_level: The level to set all logger handlers to
    """
    # When both arguments are given, level wins and handler_level is ignored (see Todo below)
    if level is not None:
        _level = log_levels[level]
        set_level_func = Logger.setLevel
    elif handler_level is not None:  # Todo possibly rework this to accept both arguments
        _level = log_levels[handler_level]

        # Set the level on each of the logger's handlers rather than on the logger itself
        def set_level_func(logger_: Logger, level_: int):
            for handler in logger_.handlers:
                handler.setLevel(level_)
    else:  # if level is None and handler_level is None:
        # Neither argument provided; default every logger to debug
        _level = log_levels[1]
        set_level_func = Logger.setLevel

    # print(root_logger.manager.loggerDict)
    # Apply the chosen setter to every logger the logging module currently knows about
    for logger_name in root_logger.manager.loggerDict:
        _logger = getLogger(logger_name)
        set_level_func(_logger, _level)

set_loggers_to_propagate

set_loggers_to_propagate()

For each Logger in current run time, set the Logger to propagate

Source code in symdesign/utils/__init__.py
242
243
244
245
246
def set_loggers_to_propagate():
    """For each Logger in current run time, set the Logger to propagate"""
    # Every logger registered with the logging module is listed in the manager's loggerDict
    for name in root_logger.manager.loggerDict:
        getLogger(name).propagate = True

pretty_format_table

pretty_format_table(data: Iterable[tuple | dict], justification: Sequence[str] = None, header: Sequence[str] = None, header_justification: Sequence[str] = None) -> list[str]

Present a table in readable format by sizing and justifying columns in a nested data structure i.e. [row1[column1, column2, ...], row2[], ...]

Parameters:

  • data (Iterable[tuple | dict]) –

    Where each successive element is a row and each row's sub-elements are unique columns. The typical data structure would be [[i, j, k], [yes, 4, 0.1], [no, 5, 0.3]]

  • justification (Sequence[str], default: None ) –

    Iterable with elements 'l'/'left', 'r'/'right', or 'c'/'center' as justification values

  • header (Sequence[str], default: None ) –

    The names of values to place in the table header

  • header_justification (Sequence[str], default: None ) –

    Iterable with elements 'l'/'left', 'r'/'right', or 'c'/'center' as justification values

Returns: The formatted data with each input row justified as an individual element in the list

Source code in symdesign/utils/__init__.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def pretty_format_table(data: Iterable[tuple | dict], justification: Sequence[str] = None, header: Sequence[str] = None,
                        header_justification: Sequence[str] = None) -> list[str]:
    """Present a table in readable format by sizing and justifying columns in a nested data structure
    i.e. [row1[column1, column2, ...], row2[], ...]

    Args:
        data: Where each successive element is a row and each row's sub-elements are unique columns.
            The typical data structure would be [[i, j, k], [yes, 4, 0.1], [no, 5, 0.3]]
        justification: Iterable with elements 'l'/'left', 'r'/'right', or 'c'/'center' as justification values
        header: The names of values to place in the table header
        header_justification: Iterable with elements 'l'/'left', 'r'/'right', or 'c'/'center' as justification values
    Returns:
        The formatted data with each input row justified as an individual element in the list
    Raises:
        RuntimeError: When header or a justification argument doesn't match the number of columns
    """
    # Map each accepted justification key to the str method that performs it
    justification_d = {'l': str.ljust, 'r': str.rjust, 'c': str.center,
                       'left': str.ljust, 'right': str.rjust, 'center': str.center}
    # In case data is passed as a dictionary, we should turn into an iterator of key, value
    if isinstance(data, dict):
        data = data.items()

    # Format data as list so we can insert header
    # data = [[column for column in row] for row in data]
    data = list(data)
    # The first row fixes the column count used by all validation below
    # NOTE(review): empty data raises IndexError here — confirm callers always supply a row
    number_columns = len(data[0])
    if header is not None:
        if len(header) == number_columns:
            data.insert(0, header)  # list(header))
            if header_justification is None:
                # Default the header to left-justified in every column
                header_justification = list(str.ljust for _ in range(number_columns))
            elif len(header_justification) == number_columns:
                # Unrecognized justification keys silently fall back to left-justify
                header_justification = [justification_d.get(key.lower(), str.ljust) for key in header_justification]
            else:
                raise RuntimeError(
                    f"The header_justification length ({len(header_justification)}) doesn't match the number of columns"
                    f" ({number_columns})")
        else:
            raise RuntimeError(
                f"The header length ({len(header)}) doesn't match the number of columns ({number_columns})")

    column_widths = get_table_column_widths(data)
    # number_columns = len(column_widths)
    if not justification:
        # Default every data column to left-justified
        justifications = list(str.ljust for _ in range(number_columns))
    elif len(justification) == number_columns:
        justifications = [justification_d.get(key.lower(), str.ljust) for key in justification]
    else:
        raise RuntimeError(
            f"The justification length ({len(justification)}) doesn't match the number of columns ({number_columns})")

    # Pad each stringified cell to its column width; row 0 uses the header justification when present
    return [' '.join(header_justification[idx](column, column_widths[idx]) if row_idx == 0 and header is not None
                     else justifications[idx](column, column_widths[idx])
                     for idx, column in enumerate(map(str, row_entry)))
            for row_idx, row_entry in enumerate(data)]

get_table_column_widths

get_table_column_widths(data: Iterable) -> tuple[int]

Find the widths of each column in a nested data structure

Parameters:

  • data (Iterable) –

    Where each successive element is a row and each row's sub-elements are unique columns

Returns: A tuple containing the width of each column from the input data

Source code in symdesign/utils/__init__.py
304
305
306
307
308
309
310
311
312
def get_table_column_widths(data: Iterable) -> tuple[int]:
    """Find the widths of each column in a nested data structure

    Args:
        data: Where each successive element is a row and each row's sub-elements are unique columns
    Returns:
        A tuple containing the width of each column from the input data
    """
    # Transpose rows into columns, then take the longest stringified entry per column
    widths = []
    for column in zip(*data):
        widths.append(max(len(str(entry)) for entry in column))
    return tuple(widths)

read_json

read_json(file_name, **kwargs) -> dict | None

Use json.load to read an object from a file

Parameters:

  • file_name

    The location of the file to write

Returns: The json data in the file

Source code in symdesign/utils/__init__.py
315
316
317
318
319
320
321
322
323
324
325
326
def read_json(file_name, **kwargs) -> dict | None:
    """Use json.load to read an object from a file

    Args:
        file_name: The location of the file to write
    Returns:
        The json data in the file
    """
    with open(file_name, 'r') as f_save:
        data = json.load(f_save)

    return data

write_json

write_json(data: Any, file_name: AnyStr, **kwargs) -> AnyStr

Use json.dump to write an object to a file

Parameters:

  • data (Any) –

    The object to write

  • file_name (AnyStr) –

    The location of the file to write

Returns: The name of the written file

Source code in symdesign/utils/__init__.py
329
330
331
332
333
334
335
336
337
338
339
340
341
def write_json(data: Any, file_name: AnyStr, **kwargs) -> AnyStr:
    """Use json.dump to write an object to a file

    Args:
        data: The object to write
        file_name: The location of the file to write
    Returns:
        The name of the written file
    """
    # Forward any json.dump keyword arguments, e.g. indent= or default=
    with open(file_name, 'w') as json_file:
        json.dump(data, json_file, **kwargs)
    return file_name

unpickle

unpickle(file_name: AnyStr) -> Any

Unpickle (deserialize) and return a python object located at filename

Source code in symdesign/utils/__init__.py
345
346
347
348
349
350
351
352
353
354
355
356
def unpickle(file_name: AnyStr) -> Any:  # , protocol=pickle.HIGHEST_PROTOCOL):
    """Unpickle (deserialize) and return a python object located at filename

    Args:
        file_name: The location of the serialized file. '.pkl' is appended when no
            pickle-style extension is present anywhere in the name
    Returns:
        The deserialized object
    Raises:
        InputError: When the file exists but contains no data
    """
    # Preserve historical behavior: any '.pkl'/'.pickle' substring counts as an extension
    if '.pkl' not in file_name and '.pickle' not in file_name:
        file_name = f'{file_name}.pkl'
    try:
        with open(file_name, 'rb') as serial_f:
            new_object = pickle.load(serial_f)
    except EOFError as ex:
        # Chain the original exception so the full traceback is preserved
        raise InputError(
            f"The serialized file '{file_name}' contains no data.") from ex

    return new_object

pickle_object

pickle_object(target_object: Any, name: str = None, out_path: AnyStr = os.getcwd(), protocol: int = pickle.HIGHEST_PROTOCOL) -> AnyStr

Pickle (serialize) an object into a file named "out_path/name.pkl". Automatically adds extension

Parameters:

  • target_object (Any) –

    Any python object

  • name (str, default: None ) –

    The name of the pickled file

  • out_path (AnyStr, default: getcwd() ) –

    Where the file should be written

  • protocol (int, default: HIGHEST_PROTOCOL ) –

    The pickling protocol to use

Returns: The pickled filename

Source code in symdesign/utils/__init__.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def pickle_object(target_object: Any, name: str = None, out_path: AnyStr = os.getcwd(),
                  protocol: int = pickle.HIGHEST_PROTOCOL) -> AnyStr:
    """Pickle (serialize) an object into a file named "out_path/name.pkl". Automatically adds extension

    Args:
        target_object: Any python object
        name: The name of the pickled file
        out_path: Where the file should be written
        protocol: The pickling protocol to use
    Returns:
        The pickled filename
    """
    # Without a name, out_path is treated as the full file path
    file_name = out_path if name is None else os.path.join(out_path, name)
    if not file_name.endswith('.pkl'):
        file_name += '.pkl'

    with open(file_name, 'wb') as serial_f:
        pickle.dump(target_object, serial_f, protocol)

    return file_name

remove_interior_keys

remove_interior_keys(dictionary: dict, keys: Iterable, keep: bool = False) -> dict[Any, dict[Any, Any]]

Clean specified keys from a dictionaries internal dictionary. Default removes the specified keys

Parameters:

  • dictionary (dict) –

    {outer_dictionary: {key: value, key2: value2, ...}, ...}

  • keys (Iterable) –

    Keys to be removed from dictionary, such as [key2, key10]

  • keep (bool, default: False ) –

    Whether to keep (True) or remove (False) specified keys

Returns: {outer_dictionary: {key: value, ...}, ...} - Cleaned dictionary

Source code in symdesign/utils/__init__.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
def remove_interior_keys(dictionary: dict, keys: Iterable, keep: bool = False) -> dict[Any, dict[Any, Any]]:
    """Clean specified keys from a dictionaries internal dictionary. Default removes the specified keys

    Args:
        dictionary: {outer_dictionary: {key: value, key2: value2, ...}, ...}
        keys: Keys to be removed from dictionary, such as [key2, key10]
        keep: Whether to keep (True) or remove (False) specified keys
    Returns:
        {outer_dictionary: {key: value, ...}, ...} - Cleaned dictionary
    """
    if keep:
        # Build a fresh mapping holding only the specified keys; the input is untouched
        return {outer: {key: inner[key] for key in inner if key in keys}
                for outer, inner in dictionary.items()}

    # Strip the specified keys from each interior dictionary in place
    for inner in dictionary.values():
        for key in keys:
            inner.pop(key, None)

    return dictionary

clean_comma_separated_string

clean_comma_separated_string(s: str) -> list[str]

Return a list from a comma separated string

Source code in symdesign/utils/__init__.py
444
445
446
def clean_comma_separated_string(s: str) -> list[str]:
    """Return a list from a comma separated string"""
    return [token.strip() for token in s.strip().split(',')]

format_index_string

format_index_string(index_string: str) -> list[int]

From a string with indices of interest, comma separated or in a range, format into individual, integer indices

Parameters:

  • index_string (str) –

    23, 34,35,56-89, 290

Returns: Indices in Pose formatting

Source code in symdesign/utils/__init__.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
def format_index_string(index_string: str) -> list[int]:
    """From a string with indices of interest, comma separated or in a range, format into individual, integer indices

    Args:
        index_string: 23, 34,35,56-89, 290
    Returns:
        Indices in Pose formatting
    Raises:
        InputError: When a range or single index can't be parsed as integers
    """
    final_index = []
    for index in clean_comma_separated_string(index_string):
        if '-' in index:  # This is a range, extract ranges
            try:
                low, high = index.split('-')
            except ValueError as ex:  # Too many values to unpack
                # Chain the cause so the original unpacking error stays in the traceback
                raise InputError(
                    f"Couldn't coerce the range '{index}' to a compatible range. Use the format 1-4 to specify the "
                    f"index consisting of 1,2,3,4") from ex
            try:
                # Extend directly from range() — no intermediate list; include the last integer
                final_index.extend(range(int(low), int(high) + 1))
            except ValueError as ex:
                raise InputError(
                    f"Couldn't coerce the input '{index}' to a compatible range({low}, {high})") from ex
        else:  # Single integer
            final_index.append(int(index))

    return final_index

write_file

write_file(data: Iterable, file_name: AnyStr = None) -> AnyStr

Take an iterable and either output to user, write to a file, or both. User defined choice

Parameters:

  • data (Iterable) –

    The data to write to file

  • file_name (AnyStr, default: None ) –

    The name of the file to write to

Returns: The name of the output file

Source code in symdesign/utils/__init__.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def write_file(data: Iterable, file_name: AnyStr = None) -> AnyStr:
    """Take an iterable and either output to user, write to a file, or both. User defined choice

    Args:
        data: The data to write to file
        file_name: The name of the file to write to
    Returns:
        The name of the output file
    """
    if not file_name:
        # No name given; prompt for one relative to the current working directory
        prompt = ('What is your desired filename? (appended to current working '
                  f'directory){query.input_string}')
        file_name = os.path.join(os.getcwd(), input(prompt))

    with open(file_name, 'w') as f:
        # One datum per line, with a trailing newline
        f.write('\n'.join(map(str, data)) + '\n')

    return file_name

io_save

io_save(data: Iterable, file_name: AnyStr = None) -> AnyStr

Take an iterable and either output to user, write to a file, or both. User defined choice

Parameters:

  • data (Iterable) –

    The data to write to file

  • file_name (AnyStr, default: None ) –

    The name of the file to write to

Returns: The name of the output file

Source code in symdesign/utils/__init__.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
def io_save(data: Iterable, file_name: AnyStr = None) -> AnyStr:
    """Take an iterable and either output to user, write to a file, or both. User defined choice

    Args:
        data: The data to write to file
        file_name: The name of the file to write to. If None and the user chooses to write,
            write_file() prompts for one
    Returns:
        The name of the output file
    """
    io_prompt = f"Enter 'P' to print Data, 'W' to write Data to file, or 'B' for both{query.input_string}"
    response = ['W', 'P', 'B', 'w', 'p', 'b']
    _input = query.validate_input(io_prompt, response=response).lower()

    if _input in 'bp':  # Print to the user via the logger
        logger.info('%s\n' % '\n'.join(map(str, data)))

    if _input in 'wb':
        # Capture the name write_file() resolves; previously the prompted filename was
        # discarded and None was returned when file_name wasn't provided
        file_name = write_file(data, file_name)

    return file_name

to_iterable

to_iterable(obj: AnyStr | list, ensure_file: bool = False, skip_comma: bool = False) -> list[str]

Take an object and return a list of individual objects splitting on newline or comma

Parameters:

  • obj (AnyStr | list) –

    The object to convert to an Iterable

  • ensure_file (bool, default: False ) –

    Whether to ensure the passed obj is a file

  • skip_comma (bool, default: False ) –

    Whether to skip commas when converting the records to an iterable

Returns: The Iterable formed from the input obj

Source code in symdesign/utils/__init__.py
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
def to_iterable(obj: AnyStr | list, ensure_file: bool = False, skip_comma: bool = False) -> list[str]:
    """Take an object and return a list of individual objects splitting on newline or comma

    Args:
        obj: The object to convert to an Iterable
        ensure_file: Whether to ensure the passed obj is a file
        skip_comma: Whether to skip commas when converting the records to an iterable
    Returns:
        The Iterable formed from the input obj
    """
    try:
        with open(obj, 'r') as f:
            iterable = f.readlines()
    except (FileNotFoundError, TypeError) as error:
        if isinstance(error, FileNotFoundError) and ensure_file:
            raise error
        if isinstance(obj, list):
            iterable = obj
        else:  # Assume that obj is a string
            iterable = [obj]

    clean_list = []
    for item in iterable:
        if skip_comma:
            it_list = [item]
        else:
            it_list = item.split(',')
        clean_list.extend(map(str.strip, it_list))

    # # Remove duplicates but keep the order
    # clean_list = remove_duplicates(clean_list)
    try:
        clean_list.pop(clean_list.index(''))  # Remove any missing values
    except ValueError:
        pass
    return clean_list

remove_duplicates

remove_duplicates(iter_: Iterable[Any]) -> list[Any]

An efficient, order maintaining, set function to remove duplicates

Source code in symdesign/utils/__init__.py
558
559
560
561
562
def remove_duplicates(iter_: Iterable[Any]) -> list[Any]:
    """An efficient, order maintaining, set function to remove duplicates"""
    # dict keys are unique and preserve insertion order (3.7+), so this keeps first occurrences
    return list(dict.fromkeys(iter_))

calculate_mp_cores

calculate_mp_cores(cores: int = None, mpi: bool = False, jobs: int = None) -> int

Calculate the number of multiprocessing cores to use for a specific application, taking the minimum

Default options specify to leave at least one CPU available for the machine. If a SLURM environment is used, the number of cores will reflect the environmental variable SLURM_CPUS_PER_TASK.

Parameters:

  • cores (int, default: None ) –

    How many CPUs to use

  • mpi (bool, default: False ) –

    If commands use MPI

  • jobs (int, default: None ) –

    How many jobs to use

Returns: The number of cores to use, taking the minimum of cores, jobs, and the maximum CPUs available

Source code in symdesign/utils/__init__.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
def calculate_mp_cores(cores: int = None, mpi: bool = False, jobs: int = None) -> int:
    """Calculate the number of multiprocessing cores to use for a specific application, taking the minimum

    Default options specify to leave at least one CPU available for the machine. If a SLURM environment is used,
    the number of cores will reflect the environmental variable SLURM_CPUS_PER_TASK
    Args:
        cores: How many cpu's to use
        mpi: If commands use MPI
        jobs: How many jobs to use
    Returns:
        The number of cores to use taking the minimum of cores, jobs, and max cpus available
    """
    allocated_cpus = os.environ.get('SLURM_CPUS_PER_TASK')
    if allocated_cpus:  # Should follow allocation from SLURM environment
        max_cpus_to_use = int(allocated_cpus)
    else:
        # logical=False only uses physical cpus, not logical threads; it can return None on
        # some platforms, so fall back to os.cpu_count() (then 2) to avoid a TypeError
        physical_cpus = psutil.cpu_count(logical=False) or os.cpu_count() or 2
        max_cpus_to_use = physical_cpus - 1  # Leave CPU available for computer

    if cores or jobs:
        # Take the minimum of the requested resources and what's available
        infinity = float('inf')
        return min((cores or infinity), (jobs or infinity), max_cpus_to_use)

    if mpi:  # Todo grab an environmental variable for mpi cores?
        return max_cpus_to_use // 6  # distribute.mpi
    return max_cpus_to_use

set_worker_affinity

set_worker_affinity()

When a new worker process is created, use this initialization function to set the affinity for all CPUs. Especially important for multiprocessing in the context of numpy, scipy, pandas FROM Stack Overflow: https://stackoverflow.com/questions/15639779/why-does-multiprocessing-use-only-a-single-core-after-i-import-numpy

http://manpages.ubuntu.com/manpages/precise/en/man1/taskset.1.html

-p is a mask for the logical cpu processors to use, the pid allows the affinity for an existing process to be specified instead of a new process being spawned

Source code in symdesign/utils/__init__.py
669
670
671
672
673
674
675
676
677
678
679
680
681
682
def set_worker_affinity():
    """When a new worker process is created, use this initialization function to set the affinity for all CPUs.
    Especially important for multiprocessing in the context of numpy, scipy, pandas
    FROM Stack Overflow:
    https://stackoverflow.com/questions/15639779/why-does-multiprocessing-use-only-a-single-core-after-i-import-numpy

    See: http://manpages.ubuntu.com/manpages/precise/en/man1/taskset.1.html
        -p is a mask for the logical cpu processors to use, the pid allows the affinity for an existing process to be
        specified instead of a new process being spawned
    """
    n_cpus = psutil.cpu_count()
    # Build a mask with one bit set per logical CPU. The previous '"f" * (n // 4)' form
    # dropped up to three CPUs whenever the count wasn't a multiple of four
    _cmd = ['taskset', '-p', f'{(1 << n_cpus) - 1:#x}', str(os.getpid())]
    logger.debug(subprocess.list2cmdline(_cmd))
    p = subprocess.Popen(_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    p.communicate()

mp_map

mp_map(function: Callable, arg: Iterable, processes: int = 1, context: str = 'spawn') -> list[Any]

Maps an iterable input with a single argument to a function using multiprocessing Pool

Parameters:

  • function (Callable) –

    Which function should be executed

  • arg (Iterable) –

    Arguments to be unpacked in the defined function, order specific

  • processes (int, default: 1 ) –

    How many workers/cores should be spawned to handle function(arguments)?

  • context (str, default: 'spawn' ) –

    How to start new processes? One of 'spawn', 'fork', or 'forkserver'.

Returns: The results produced from the function and arg

Source code in symdesign/utils/__init__.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
def mp_map(function: Callable, arg: Iterable, processes: int = 1, context: str = 'spawn') -> list[Any]:
    """Maps an iterable input with a single argument to a function using multiprocessing Pool

    Args:
        function: Which function should be executed
        arg: Arguments to be unpacked in the defined function, order specific
        processes: How many workers/cores should be spawned to handle function(arguments)?
        context: How to start new processes? One of 'spawn', 'fork', or 'forkserver'.
    Returns:
        The results produced from the function and arg
    """
    pool_context = mp.get_context(context)
    # Each worker resets its CPU affinity on start-up (numpy/scipy imports can restrict it)
    with pool_context.Pool(processes=processes, initializer=set_worker_affinity) as pool:
        return pool.map(function, arg)

mp_starmap

mp_starmap(function: Callable, star_args: Iterable[tuple], processes: int = 1, context: str = 'spawn') -> list[Any]

Maps an iterable input with multiple arguments to a function using multiprocessing Pool

Parameters:

  • function (Callable) –

    Which function should be executed

  • star_args (Iterable[tuple]) –

    Arguments to be unpacked in the defined function, order specific

  • processes (int, default: 1 ) –

    How many workers/cores should be spawned to handle function(arguments)?

  • context (str, default: 'spawn' ) –

    How to start new processes? One of 'spawn', 'fork', or 'forkserver'.

Returns: The results produced from the function and star_args

Source code in symdesign/utils/__init__.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
def mp_starmap(function: Callable, star_args: Iterable[tuple], processes: int = 1, context: str = 'spawn') -> list[Any]:
    """Maps an iterable input with multiple arguments to a function using multiprocessing Pool

    Args:
        function: Which function should be executed
        star_args: Arguments to be unpacked in the defined function, order specific
        processes: How many workers/cores should be spawned to handle function(arguments)?
        context: How to start new processes? One of 'spawn', 'fork', or 'forkserver'.
    Returns:
        The results produced from the function and star_args
    """
    pool_context = mp.get_context(context)
    # Each worker resets its CPU affinity on start-up (numpy/scipy imports can restrict it)
    with pool_context.Pool(processes=processes, initializer=set_worker_affinity) as pool:
        return pool.starmap(function, star_args)

bytes2human

bytes2human(number: int, return_format: str = '{:.1f} {}') -> str

Convert bytes to a human-readable format

See: http://goo.gl/zeJZl

>>> bytes2human(10000)
'9.8 K'
>>> bytes2human(100001221)
'95.4 M'

Parameters:

  • number (int) –

    The number of bytes

  • return_format (str, default: '{:.1f} {}' ) –

    The desired return format with '{}'.format() compatibility

Returns: The human-readable expression of bytes from a number of bytes

Source code in symdesign/utils/__init__.py
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
def bytes2human(number: int, return_format: str = "{:.1f} {}") -> str:
    """Convert bytes to a human-readable format

    See: http://goo.gl/zeJZl
    >>> bytes2human(10000)
    '9.8 K'
    >>> bytes2human(100001221)
    '95.4 M'

    Args:
        number: The number of bytes
        return_format: The desired return format with '{}'.format() compatibility
    Returns:
        The human-readable expression of bytes from a number of bytes
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # Walk from the largest binary prefix (2**80) down, taking the first that fits
    for power in range(len(symbols) - 1, 0, -1):
        unit = 1 << power * 10
        if number >= unit:
            return return_format.format(number / unit, symbols[power])
    # Smaller than 1 K: report raw bytes
    return return_format.format(number, symbols[0])

human2bytes

human2bytes(human_byte_str: AnyStr) -> int

Convert human-readable bytes to a numeric format

See: http://goo.gl/zeJZl

>>> human2bytes('0 B')
0
>>> human2bytes('1 K')
1024
>>> human2bytes('1 M')
1048576
>>> human2bytes('1 Gi')
1073741824
>>> human2bytes('1 tera')
1099511627776
>>> human2bytes('0.5kilo')
512
>>> human2bytes('0.1 byte')
0
>>> human2bytes('1 k')  # k is an alias for K
1024
>>> human2bytes('12 foo')  # raises ValueError

Returns: The number of bytes from a human-readable expression of bytes

Source code in symdesign/utils/__init__.py
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
def human2bytes(human_byte_str: AnyStr) -> int:
    """Convert human-readable bytes to a numeric format

    See: http://goo.gl/zeJZl
    >>> human2bytes('0 B')
    0
    >>> human2bytes('1 K')
    1024
    >>> human2bytes('1 M')
    1048576
    >>> human2bytes('1 Gi')
    1073741824
    >>> human2bytes('1 tera')
    1099511627776
    >>> human2bytes('0.5kilo')
    512
    >>> human2bytes('0.1  byte')
    0
    >>> human2bytes('1 k')  # k is an alias for K
    1024
    >>> human2bytes('12 foo')

    Raises:
        ValueError if input can't be parsed
    Returns:
        The number of bytes from a human-readable expression of bytes
    """
    # Drop digits, decimal points, and spaces to isolate the unit prefix/abbreviation
    unit = human_byte_str.translate(remove_digit_table).replace('.', '').replace(' ', '')
    for name, symbol_set in SYMBOLS.items():
        if unit in symbol_set:
            break
    else:  # No symbol set recognized the unit
        raise ValueError(f"{human2bytes.__name__}: Can't interpret {human_byte_str}")

    # What remains once the unit characters are stripped is the numeric value
    value_str = human_byte_str.strip(unit).strip()
    try:
        value = float(value_str)
    except ValueError:
        raise ValueError(f"{human2bytes.__name__}: Can't interpret {human_byte_str}")

    # Scale by 1024 ** (position of the unit within its symbol set)
    return int(value * (1 << symbol_set.index(unit) * 10))

get_available_memory

get_available_memory(human_readable: bool = False, gpu: bool = False) -> int

Parameters:

  • human_readable (bool, default: False ) –

    Whether the return value should be human-readable

  • gpu (bool, default: False ) –

    Whether a GPU should be used

Returns: The available memory (in bytes) depending on the compute environment

Source code in symdesign/utils/__init__.py
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
def get_available_memory(human_readable: bool = False, gpu: bool = False) -> int | str:
    """Determine the memory still available to the current process

    Under SLURM, this is the job's reported allocation minus this process's resident set size;
    otherwise it is the system's available virtual memory.

    Args:
        human_readable: Whether the return value should be human-readable
        gpu: Whether a GPU should be used
    Returns:
        The available memory (in bytes, or a formatted str when human_readable=True) depending
        on the compute environment
    """
    # NOTE(review): 'gpu' is never referenced in this body - presumably reserved for future use
    # Check if job is allocated by SLURM
    if 'SLURM_JOB_ID' in os.environ:
        jobid = os.environ['SLURM_JOB_ID']  # SLURM_JOB_ID
        # array_jobid = os.environ.get('SLURM_ARRAY_TASK_ID')
        # if array_jobid:
        #     jobid = f'{jobid}_{array_jobid}'  # SLURM_ARRAY_TASK_ID
        if 'SLURM_ARRAY_TASK_ID' in os.environ:
            # Array tasks are addressed as 'JOB-ID_ARRAY-TASK-ID' by scontrol
            jobid = f'{jobid}_{os.environ["SLURM_ARRAY_TASK_ID"]}'  # SLURM_ARRAY_TASK_ID
            logger.debug(f'The job is managed by SLURM with SLURM_ARRAY_TASK_ID={jobid}')
        else:
            logger.debug(f'The job is managed by SLURM with SLURM_JOB_ID={jobid}')

        # Run the command 'scontrol show job {jobid}'
        p = subprocess.Popen(['scontrol', 'show', 'job', jobid], stdout=subprocess.PIPE)
        out, err = p.communicate()
        out = out.decode('UTF-8')
        """ When --mem-per-cpu=20G, searching for the line
        MinCPUsNode=1 MinMemoryCPU=210000M MinTmpDiskNode=0
        Features=(null) DelayBoot=00:00:00
        """
        """ OR when --mem=20G, searching for the line
        MinMemoryNode = 20G
        """
        """ Additionally, the line with 
        TRES=cpu=1,mem=20G,node=1,billing=1
        Is the same with either submission
        """
        # NOTE(review): if 'MinMemoryCPU=' is absent (e.g. a --mem submission reporting only
        # MinMemoryNode), find() returns -1 and parsing starts at index 12 - TODO confirm intended
        start_index = out.find('MinMemoryCPU=') + 13  # <- 13 is length of search string
        """
        Since default value is in M (MB), memory shouldn't be more than ~1000000 (1000 GB RAM?!)
        Use plus 10 characters to parse. Value could be 50 I suppose and the split will get this variable only...
        """
        # try:
        memory_allocated = out[start_index:start_index + 10].split()[0]
        # except IndexError:
        #     print(out)
        #     print(f"start_index where 'MinMemoryCPU=' '=' was found: {start_index}")
        logger.debug(f'Found memory allocated: {memory_allocated}')
        # memory_available = psutil.virtual_memory().available
        # logger.debug(f'Found memory available: {bytes2human(memory_available)}')
        process = psutil.Process()
        memory_used = process.memory_info().rss
        logger.debug(f'Found memory used: {bytes2human(memory_used)}')
        try:
            # Subtract this process's current usage from the allocation
            memory_constraint = human2bytes(memory_allocated) - memory_used
        except ValueError:
            # human2bytes() couldn't parse the value sliced out of scontrol's output
            logger.critical(f"Found the scontrol out: {out}")
            raise
    else:
        memory_constraint = psutil.virtual_memory().available

    if human_readable:
        memory_constraint = bytes2human(memory_constraint)

    return memory_constraint

get_base_root_paths_recursively

get_base_root_paths_recursively(directory: AnyStr, sort: bool = True) -> list[AnyStr]

Retrieve the bottom most directories recursively from a root directory

Parameters:

  • directory (AnyStr) –

    The root directory of interest

  • sort (bool, default: True ) –

    Whether the files should be filtered by name before returning

Returns: The list of directories matching the search

Source code in symdesign/utils/__init__.py
898
899
900
901
902
903
904
905
906
907
908
def get_base_root_paths_recursively(directory: AnyStr, sort: bool = True) -> list[AnyStr]:
    """Retrieve the bottom most directories recursively from a root directory

    Args:
        directory: The root directory of interest
        sort: Whether the files should be filtered by name before returning
    Returns:
        The list of directories matching the search
    """
    # A directory with no subdirectories is a leaf of the tree
    leaf_dirs = (os.path.abspath(root) for root, subdirs, _files in os.walk(directory) if not subdirs)
    if sort:
        return sorted(leaf_dirs)
    return list(leaf_dirs)

get_file_paths_recursively

get_file_paths_recursively(directory: AnyStr, extension: str = None, sort: bool = True) -> list[AnyStr]

Retrieve files recursively from a directory

Parameters:

  • directory (AnyStr) –

    The directory of interest

  • extension (str, default: None ) –

    A extension to filter by

  • sort (bool, default: True ) –

    Whether the files should be filtered by name before returning

Returns: The list of files matching the search

Source code in symdesign/utils/__init__.py
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
def get_file_paths_recursively(directory: AnyStr, extension: str = None, sort: bool = True) -> list[AnyStr]:
    """Retrieve files recursively from a directory

    Args:
        directory: The directory of interest
        extension: An extension to filter by
        sort: Whether the files should be filtered by name before returning
    Returns:
        The list of files matching the search
    """
    def _walk_files():
        # followlinks=True so symlinked sub-trees are included in the search
        for root, _dirs, files in os.walk(directory, followlinks=True):
            root_abs = os.path.abspath(root)
            for file in files:
                if extension is None or extension in file:
                    yield os.path.join(root_abs, file)

    paths = _walk_files()
    return sorted(paths) if sort else list(paths)

get_directory_file_paths

get_directory_file_paths(directory: AnyStr, suffix: str = '', extension: str = '', sort: bool = True) -> list[AnyStr]

Return all files in a directory with specified extensions and suffixes

Parameters:

  • directory (AnyStr) –

    The directory of interest

  • suffix (str, default: '' ) –

    A string to match before the extension. A glob pattern is built as follows "*suffix*extension". Ex: suffix="model" matches "design_model.pdb" and "model1.pdb"

  • extension (str, default: '' ) –

    A extension to filter by. Include the "." if there is one

  • sort (bool, default: True ) –

    Whether the files should be filtered by name before returning

Returns: The list of files matching the search

Source code in symdesign/utils/__init__.py
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
def get_directory_file_paths(directory: AnyStr, suffix: str = '', extension: str = '', sort: bool = True) -> \
        list[AnyStr]:
    """Return all files in a directory with specified extensions and suffixes

    Args:
        directory: The directory of interest
        suffix: A string to match before the extension. A glob pattern is built as follows "*suffix*extension"
            ex: suffix="model" matches "design_model.pdb" and "model1.pdb"
        extension: An extension to filter by. Include the "." if there is one
        sort: Whether the files should be filtered by name before returning
    Returns:
        The list of files matching the search
    """
    pattern = os.path.join(os.path.abspath(directory), f'*{suffix}*{extension}')
    matches = glob(pattern)
    return sorted(matches) if sort else matches

collect_nanohedra_designs

collect_nanohedra_designs(files: Sequence = None, directory: str = None, dock: bool = False) -> tuple[list[AnyStr], str]

Grab all poses from a Nanohedra directory via a file or a directory

Parameters:

  • files (Sequence, default: None ) –

    Iterable with disk location of files containing design directories

  • directory (str, default: None ) –

    Disk location of the program directory

  • dock (bool, default: False ) –

    Whether the designs are in current docking run

Returns: The absolute paths to Nanohedra output directories for all pose directories found

Source code in symdesign/utils/__init__.py
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
def collect_nanohedra_designs(files: Sequence = None, directory: str = None, dock: bool = False) -> \
        tuple[list[AnyStr], str]:
    """Grab all poses from a Nanohedra directory via a file or a directory

    Args:
        files: Iterable with disk location of files containing design directories
        directory: Disk location of the program directory
        dock: Whether the designs are in current docking run
    Returns:
        The sorted, unique paths to Nanohedra output directories, and the location they were
        collected from (the first file when 'files' was provided, otherwise the directory)
    """
    if files:
        all_paths = []
        for file in files:
            if not os.path.exists(file):
                logger.critical(f'No "{file}" file found! Please ensure correct location/name!')
                sys.exit(1)
            if '.pdb' in file:  # single .pdb files were passed as input and should be loaded as such
                all_paths.append(file)
            else:  # assume a file that specifies individual designs was passed and load all design names in that file
                try:
                    with open(file, 'r') as f:
                        # only strip the trailing 'os.sep' in case file names are passed
                        paths = map(str.rstrip, [location.strip() for location in f.readlines()
                                                 if location.strip() != ''], repeat(os.sep))
                except IsADirectoryError:
                    raise InputError(f'{file} is a directory not a file. Did you mean to run with --directory?')
                all_paths.extend(paths)
    elif directory:
        if dock:
            all_paths = get_docked_directories(directory)
        else:
            # Each base is a Nanohedra master output; collect every pose directory beneath it
            base_directories = get_base_nanohedra_dirs(directory)
            all_paths = []
            for base in base_directories:  # Todo we shouldn't allow multiple, it complicates SymEntry matching
                all_paths.extend(get_docked_dirs_from_base(base))
    else:  # this shouldn't happen
        all_paths = []
    location = (files or directory)

    # 'location' is a Sequence when 'files' was passed, so only its first entry is reported
    return sorted(set(all_paths)), location if isinstance(location, str) else location[0]

get_base_nanohedra_dirs

get_base_nanohedra_dirs(base_dir)

Find all master directories corresponding to the highest output level of Nanohedra.py outputs. This corresponds to the PoseJob symmetry attribute

Source code in symdesign/utils/__init__.py
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
def get_base_nanohedra_dirs(base_dir):
    """Find all master directories corresponding to the highest output level of Nanohedra.py outputs. This corresponds
    to the PoseJob symmetry attribute
    """
    master_dirs = []
    for root, dirs, files in os.walk(base_dir, followlinks=True):
        if putils.master_log in files:
            master_dirs.append(root)
            # Prune the walk in place: don't descend below a master directory
            dirs.clear()

    return master_dirs

get_docked_directories

get_docked_directories(base_directory, directory_type='NanohedraEntry')

Useful for when your docked directory is basically known but the

Source code in symdesign/utils/__init__.py
1007
1008
1009
1010
def get_docked_directories(base_directory, directory_type='NanohedraEntry'):  # '*DockedPoses'
    """Collect every subdirectory beneath base_directory whose name contains directory_type"""
    matches = []
    for root, dirs, _files in os.walk(base_directory):
        for _dir in dirs:
            if directory_type in _dir:
                matches.append(os.path.join(root, _dir))
    return matches

get_docked_dirs_from_base

get_docked_dirs_from_base(base: str) -> list[AnyStr]

Find every Nanohedra output base directory where each of the poses and files is contained

Parameters:

  • base (str) –

    The base of the filepath corresponding to the Nanohedra master output directory

Returns:

  • list[AnyStr]

    The absolute path to every directory containing Nanohedra output

Source code in symdesign/utils/__init__.py
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
def get_docked_dirs_from_base(base: str) -> list[AnyStr]:
    """Find every Nanohedra output base directory where each of the poses and files is contained

    Args:
        base: The base of the filepath corresponding to the Nanohedra master output directory

    Returns:
        The absolute path to every directory containing Nanohedra output
    """
    # Poses live exactly four levels below base: base/building_blocks/degen/rot/tx/
    pose_pattern = f'{base}{f"{os.sep}*" * 4}{os.sep}'
    # abspath also removes the trailing separator the glob pattern requires
    return sorted({os.path.abspath(path) for path in glob(pose_pattern)})

collect_designs

collect_designs(files: Sequence = None, directory: AnyStr = None, projects: Sequence = None, singles: Sequence = None) -> tuple[list, str]

Grab all poses from an input source

Parameters:

  • files (Sequence, default: None ) –

    Iterable with disk location of files containing design directories

  • directory (AnyStr, default: None ) –

    Disk location of the program directory

  • projects (Sequence, default: None ) –

    Disk location of a project directory

  • singles (Sequence, default: None ) –

    Disk location of a single design directory

Returns: All pose directories found, the location where they are located

Source code in symdesign/utils/__init__.py
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
def collect_designs(files: Sequence = None, directory: AnyStr = None, projects: Sequence = None,
                    singles: Sequence = None) -> tuple[list, str]:
    """Grab all poses from an input source

    Args:
        files: Iterable with disk location of files containing design directories
        directory: Disk location of the program directory
        projects: Disk location of a project directory
        singles: Disk location of a single design directory
    Raises:
        IsADirectoryError: If a member of 'files' is a directory
        ValueError: If no arguments are provided
    Returns:
        All pose directories found, the location where they are located
    """
    if files:
        all_paths = []
        for file in files:
            if not os.path.exists(file):
                logger.critical(f"No '{file}' file found. Please ensure correct location/name")
                sys.exit(1)
            if '.pdb' in file:  # Single .pdb file passed as input
                all_paths.append(file)
            elif '.cif' in file:  # Single .cif file passed as input
                all_paths.append(file)
            else:  # Assume a file that specifies individual designs was passed and load all design names in that file
                try:
                    with open(file, 'r') as f:
                        # only strip the trailing 'os.sep' in case file names are passed
                        paths = map(str.rstrip, [location.strip() for location in f.readlines()
                                                 if location.strip() != ''], repeat(os.sep))
                except IsADirectoryError:
                    # Suggest --directory (not --file) to match collect_nanohedra_designs();
                    # the previous message recommended the flag that caused this error
                    raise IsADirectoryError(
                        f"'{file}' is a directory not a file. Did you mean to run with --directory?")
                all_paths.extend(paths)
    else:
        base_directory = get_program_root_directory(directory)
        # return all design directories within:
        #  base directory -> /base/Projects/project1, ... /base/Projects/projectN
        #  specified projects -> /base/Projects/project1, /base/Projects/project2, ...
        #  specified singles -> /base/Projects/project/design1, /base/Projects/project/design2, ...
        if base_directory or projects or singles:
            all_paths = get_program_directories(base=base_directory, projects=projects, singles=singles)
        elif directory:  # This is probably an uninitialized project. Grab all .pdb files
            all_paths = get_directory_file_paths(directory, extension='.pdb')
            directory = os.path.basename(directory)  # This is for the location variable return
        else:  # Function was called with all set to None. This shouldn't happen
            raise ValueError(
                f"Can't {collect_designs.__name__}() with no arguments passed")

    location = (files or directory or projects or singles)

    return sorted(set(all_paths)), location  # if isinstance(location, str) else location[0]  # Grab first index

get_program_root_directory

get_program_root_directory(search_path: str = None) -> AnyStr | None

Find the program_output variable in the specified path and return the path to it

Parameters:

  • search_path (str, default: None ) –

    The path to search

Returns: The absolute path of the identified program root

Source code in symdesign/utils/__init__.py
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
def get_program_root_directory(search_path: str = None) -> AnyStr | None:
    """Find the program_output variable in the specified path and return the path to it

    Args:
        search_path: The path to search
    Returns:
        The absolute path of the identified program root
    """
    root_directory = None
    if search_path is not None:
        # Search for the program_output name in the provided path
        search_path = os.path.abspath(search_path)
        if putils.program_output in search_path:   # directory1/program_output/directory2/directory3
            # Return the path to that directory
            for idx, dirname in enumerate(search_path.split(os.sep), 1):
                if dirname == putils.program_output:
                    root_directory = f'{os.sep}{os.path.join(*search_path.split(os.sep)[:idx])}'
                    break
            else:
                raise InputError(
                    f'{putils.program_output} is missing in search_path. This should never happen')
        else:  # See if program_output is a child of the provided search_path
            try:
                all_files = os.listdir(search_path)
            except (FileNotFoundError, NotADirectoryError):
                all_files = []
            if putils.program_output in all_files:  # directory_provided/program_output
                for sub_directory in all_files:
                    if sub_directory == putils.program_output:
                        root_directory = os.path.join(search_path, sub_directory)
                        break
                else:
                    raise InputError(
                        f'{putils.program_output} is missing in all_files. This should never happen')

    return root_directory

get_program_directories

get_program_directories(base: str = None, projects: Iterable = None, singles: Iterable = None) -> Generator[AnyStr, None, None]

Return the specific design directories from the specified hierarchy with the format /base(program_output)/Projects/project/design

Source code in symdesign/utils/__init__.py
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
def get_program_directories(base: str = None, projects: Iterable = None, singles: Iterable = None) \
        -> Generator[AnyStr, None, None]:
    """Return the specific design directories from the specified hierarchy with the format
    /base(program_output)/Projects/project/design
    """
    found = []
    if base:
        # Every design under every project: base/Projects/*/*/
        found += glob(f'{base}{os.sep}{putils.projects}{os.sep}*{os.sep}*{os.sep}')
    if projects:
        for project_path in projects:
            # Every design in the given project: base/Projects/project/*/
            found += glob(f'{project_path}{os.sep}*{os.sep}')
    if singles:
        # Strip any file extension before globbing the single design directory
        for single_root, _extension in map(os.path.splitext, singles):
            # base/Projects/project/single/
            found += glob(f'{single_root}{os.sep}')
    return map(os.path.abspath, found)

all_vs_all

all_vs_all(iterable: Iterable, func: Callable, symmetrize: bool = True) -> ndarray

Calculate an all versus all comparison using a defined function. Matrix is symmetrized by default

Parameters:

  • iterable (Iterable) –

    Dictionary or array like object

  • func (Callable) –

    Function to calculate different iterations of the iterable

  • symmetrize (bool, default: True ) –

    Whether to make the resulting matrix symmetric

Returns: Matrix with resulting calculations

Source code in symdesign/utils/__init__.py
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
def all_vs_all(iterable: Iterable, func: Callable, symmetrize: bool = True) -> np.ndarray:
    """Calculate an all versus all comparison using a defined function. Matrix is symmetrized by default

    Args:
        iterable: Dictionary or array like object
        func: Function to calculate different iterations of the iterable. Called as
            func(obj1, obj2, d=_dict) where _dict is the input mapping (or None)
        symmetrize: Whether to make the resulting matrix symmetric
    Returns:
        Matrix with resulting calculations
    """
    if isinstance(iterable, dict):
        # Iterate over the keys; the mapping itself is forwarded so func can look values up
        _dict = iterable
    else:
        _dict = None
    # Materialize once. The previous implementation sliced `iterable` directly, which
    # raised TypeError for dicts (unsliceable) and failed for generators/sets
    items = list(iterable)

    number_of_items = len(items)
    pairwise = np.zeros((number_of_items, number_of_items))
    # Upper triangle only; symmetrization (if requested) fills the lower triangle
    for i, obj1 in enumerate(items[:-1]):
        for j, obj2 in enumerate(items[i + 1:], i + 1):
            pairwise[i, j] = func(obj1, obj2, d=_dict)

    if symmetrize:
        return sym(pairwise)
    else:
        return pairwise

sym

sym(a: ndarray) -> ndarray

Symmetrize a numpy array. i.e. if a_ij = 0, then the returned array is such that a_ji = a_ij

Parameters:

  • a (ndarray) –

    A 2D square array

Returns: Symmetrized array

Source code in symdesign/utils/__init__.py
1340
1341
1342
1343
1344
1345
1346
1347
1348
def sym(a: np.ndarray) -> np.ndarray:
    """Symmetrize a numpy array. i.e. if a_ij = 0, then the returned array is such that a_ji = a_ij

    Args:
        a: A 2D square array
    Returns:
        Symmetrized array
    """
    # Adding the transpose doubles the diagonal, so subtract one copy of it back out
    transpose_sum = a + a.T
    return transpose_sum - np.diag(a.diagonal())

condensed_to_square

condensed_to_square(k, n)

Return the i, j indices of a scipy condensed matrix from element k and matrix dimension n

Source code in symdesign/utils/__init__.py
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
def condensed_to_square(k, n):
    """Return the i, j indices of a scipy condensed matrix from element k and matrix dimension n"""
    # Closed-form inversion of the condensed (upper-triangular, no diagonal) index k
    def row_of(_k, _n):
        discriminant = (-8 * _k + 4 * _n ** 2 - 4 * _n - 7) ** 0.5
        return int(math.ceil(0.5 * (2 * _n - 1 - discriminant) - 1))

    def elements_before(_row, _n):
        # Number of condensed elements contained in the first _row rows
        return _row * (_n - 1 - _row) + (_row * (_row + 1)) // 2

    row = row_of(k, n)
    col = int(n - elements_before(row + 1, n) + k)

    return row, col