Skip to content

job

job_resources_factory module-attribute

job_resources_factory: Annotated[JobResourcesFactory, 'Calling this factory method returns the single instance of the JobResources class'] = JobResourcesFactory()

Calling this factory method returns the single instance of the JobResources class

DBInfo

DBInfo(location: AnyStr, echo: bool = False)
Source code in symdesign/resources/job.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def __init__(self, location: AnyStr, echo: bool = False):
    """Set up the database Engine and session factory for the provided location

    Args:
        location: The database URL to connect with
        echo: Whether the Engine should log the statements it executes
    """
    self.location = location
    self.engine: Engine = create_engine(location, echo=echo, future=True)
    self.session: sessionmaker = sessionmaker(self.engine, future=True)

    if 'sqlite' in self.location:
        # Manage transaction BEGIN ourselves to work around pysqlite transaction-scope issues
        # See: https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#pysqlite-serializable
        @event.listens_for(self.engine, 'connect')
        def _disable_driver_begin(dbapi_connection, connection_record):
            """Stop pysqlite from emitting BEGIN itself (and COMMIT before any DDL)"""
            dbapi_connection.isolation_level = None

        @event.listens_for(self.engine, 'begin')
        def _emit_begin(conn):
            """Issue the BEGIN statement explicitly at transaction start"""
            conn.exec_driver_sql('BEGIN')

JobResources

JobResources(program_root: AnyStr = None, arguments: Namespace = None, initial: bool = False, **kwargs)

The intention of JobResources is to serve as a singular source of design info which is common across all jobs. This includes common paths, databases, and design flags which should only be set once in program operation, then shared across all member designs

Parameters:

  • program_root (AnyStr, default: None ) –

    The root location of program operation

  • arguments (Namespace, default: None ) –

    The argparse.Namespace object with associated program flags

  • initial (bool, default: False ) –

    Whether this is the first instance of the particular program output

Source code in symdesign/resources/job.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
def __init__(self, program_root: AnyStr = None, arguments: argparse.Namespace = None, initial: bool = False,
             **kwargs):
    """Parse the program operation location, ensure paths to these resources are available, and parse arguments

    Args:
        program_root: The root location of program operation
        arguments: The argparse.Namespace object with associated program flags
        initial: Whether this is the first instance of the particular program output

    Raises:
        FileNotFoundError: If the program_root path doesn't exist
        TypeError: If program_root wasn't provided
    """
    # EAFP: os.path.exists(None) raises TypeError, which is re-raised with a clearer message below
    try:
        if os.path.exists(program_root):
            self.program_root = program_root
        else:
            raise FileNotFoundError(
                f"Path doesn't exist\n\t{program_root}")
    except TypeError:
        raise TypeError(
            f"Can't initialize {JobResources.__name__} without parameter 'program_root'")

    # Format argparse.Namespace arguments
    if arguments is not None:
        kwargs.update(deepcopy(vars(arguments)))

    # Set the module for the current job. This will always be a '-' separated string when more than one name
    self.module: str = kwargs.get(flags.module)
    # Ensure that the protocol is viable
    if self.module == flags.protocol:
        self.protocol_module = True
        self.modules = kwargs.get(flags.modules)
    else:
        self.protocol_module = False
        # Instead of setting this, let self.module be used dynamically with property
        # self.modules = [self.module]

    # Computing environment and development Flags
    # self.command_only: bool = kwargs.get('command_only', False)
    # """Whether to reissue commands, only if distribute_work=False"""
    # NOTE(review): this holds a logging level (compared against logging.DEBUG below), not a bool
    self.log_level: int = kwargs.get(flags.log_level._)
    self.debug: bool = True if self.log_level == logging.DEBUG else False
    self.force: bool = kwargs.get(flags.force._)
    self.development: bool = kwargs.get(flags.development._)
    self.profile_memory: bool = kwargs.get(flags.profile_memory._)
    if self.profile_memory and not self.development:
        logger.warning(f"{flags.profile_memory.long} was set but {flags.development.long} wasn't")

    self.mpi: int = kwargs.get(flags.mpi)
    if self.mpi is None:
        self.mpi = 0
        self.distribute_work: bool = kwargs.get(flags.distribute_work._)
        # # Todo implement, see symdesign.utils and CommandDistributor
        # # extras = ' mpi {CommmandDistributer.mpi}'
        # number_mpi_processes = CommmandDistributer.mpi - 1
        # logger.info('Setting job up for submission to MPI capable computer. Pose trajectories run in parallel, '
        #             f'{number_mpi_processes} at a time. This will speed up processing ~
        #             f'{job.design.number / number_mpi_processes:2f}-fold.')
    else:  # self.mpi > 0
        # MPI always implies distributed work, but resource calculation isn't implemented yet
        self.distribute_work = True
        raise NotImplementedError(
            f"Can't compute the number of resources to allocate using {flags.mpi.long} yet...")

    self.multi_processing: int = kwargs.get(flags.multi_processing._)
    if self.multi_processing:
        # Calculate the number of cores to use depending on computer resources
        self.cores = utils.calculate_mp_cores(cores=kwargs.get(flags.cores))  # Todo mpi=self.mpi
    else:
        self.cores: int = 1
    # NOTE(review): assumes two hardware threads per core — confirm for the target hosts
    self.threads = self.cores * 2
    self.gpu_available = False

    # Input parameters
    self.project_name = kwargs.get(flags.project_name._)
    # program_root subdirectories
    self.data = os.path.join(self.program_root, putils.data.title())
    self.projects = os.path.join(self.program_root, putils.projects)
    self.job_paths = os.path.join(self.program_root, 'JobPaths')
    self.sbatch_scripts = os.path.join(self.program_root, 'Scripts')
    self.all_scores = os.path.join(self.program_root, 'AllScores')

    self.api_db = wrapapi.api_database_factory.get(source=self.data)
    self.sequences = self.api_db.sequences.location
    self.profiles = self.api_db.hhblits_profiles.location
    self.pdb_api = self.api_db.pdb.location
    self.uniprot_api = self.api_db.uniprot.location
    # data subdirectories
    self.clustered_poses = os.path.join(self.data, 'ClusteredPoses')
    # pdbs subdirectories
    self.structure_db = structure_db.structure_database_factory.get(
        source=os.path.join(self.data, 'StructureInfo'))
    self.pdbs = self.structure_db.models.location  # Used to store downloaded PDB's
    self.orient_dir = self.structure_db.oriented.location
    self.orient_asu_dir = self.structure_db.oriented_asu.location
    self.refine_dir = self.structure_db.refined.location
    self.full_model_dir = self.structure_db.full_models.location
    self.stride_dir = self.structure_db.stride.location

    # Set the job instance on these db objects
    self.api_db.job = self
    self.structure_db.job = self
    self.fragment_source = kwargs.get(flags.fragment_source._)
    self.fragment_db: structure.fragment.db.FragmentDatabase | None = None

    # The database URL is written to db.cfg on the initial run and must stay fixed for this program output
    default_db = f'sqlite:///{os.path.join(self.data, f"{putils.program_name}.db")}'
    self.db_config = os.path.join(self.data, 'db.cfg')
    database_url = kwargs.get(flags.database_url._)
    if initial:
        if database_url is None:
            database_url = default_db
        db_cfg = {'url': database_url}
        with open(self.db_config, 'w') as f:
            json.dump(db_cfg, f)
    else:
        if os.path.exists(self.db_config):
            with open(self.db_config, 'r') as f:
                db_cfg = json.load(f)
            # Providing a different URL after initialization would desynchronize the stored data
            if database_url is not None:
                raise utils.InputError(
                    f"The {flags.database_url.long} '{database_url}' can't be used as this {putils.program_output} "
                    f"was already initialized with the url='{db_cfg.get('url')}")
            else:
                database_url = db_cfg.get('url')
        else:  # This should always exist
            database_url = default_db

    self.database_url = database_url
    self.debug_db = kwargs.get('debug_db')
    self.db: DBInfo = DBInfo(self.database_url, echo=self.debug_db)
    if initial:  # if not os.path.exists(self.internal_db):
        # Emit CREATE TABLE DDL
        sql.Base.metadata.create_all(self.db.engine)
    self.load_to_db = kwargs.get(flags.load_to_db._)
    self.reset_db = kwargs.get(flags.reset_db._)
    if self.reset_db:
        # Destructive operation: require explicit interactive confirmation before dropping tables
        response = input(f"All database information will be wiped if you proceed. Enter 'YES' to proceed"
                         f"{utils.query.input_string}")
        if response == 'YES':
            logger.warning(f'Dropping all tables and data from DB')
            # All tables are deleted
            sql.Base.metadata.drop_all(self.db.engine)
            # Emit CREATE TABLE DDL
            sql.Base.metadata.create_all(self.db.engine)
        else:
            logger.info(f'Skipping {flags.format_args(flags.reset_db)}')
            pass
    # else:  # When --no-database is provided as a flag
    #     self.db = None

    # PoseJob initialization flags
    self.init = Init.from_flags(**kwargs)
    self.specify_entities = kwargs.get(flags.specify_entities._)
    # self.init.pre_refined
    # self.init.pre_loop_modeled
    # self.init.refine_input
    # self.init.loop_model_input

    # self.preprocessed = kwargs.get(flags.preprocessed)
    # if self.init.pre_loop_modeled or self.init.pre_refined:
    #     self.preprocessed = True
    # else:
    #     self.preprocessed = False
    self.range = kwargs.get(flags.range_._)
    if self.range is not None:
        # NOTE(review): splitting on '-' means negative bounds can't be expressed by this flag
        try:
            self.low, self.high = map(float, self.range.split('-'))
        except ValueError:  # Didn't unpack correctly
            raise ValueError(
                f'The {flags.format_args(flags.range_args)} flag must take the form "LOWER-UPPER"')
    else:
        self.low = self.high = None
    # Program flags
    # self.consensus: bool = kwargs.get(consensus, False)  # Whether to run consensus
    self.background_profile: str = kwargs.get(flags.background_profile._)
    """The type of position specific profile (per-residue amino acid frequencies) to utilize as the design 
    background profile. 
    Choices include putils.design_profile, putils.evolutionary_profile, and putils.fragment_profile
    """
    # Process design_selector
    self.design_selector: PoseSpecification = parse_design_selector_flags(**kwargs)

    self.update_metadata = kwargs.get(flags.update_metadata._)
    self.component1 = kwargs.get(flags.component1._)
    self.query_codes = kwargs.get(flags.query_codes._)
    # 'target_pdb_code' acts as a fallback alias when the primary flag wasn't provided
    pdb_codes = kwargs.get(flags.pdb_code._, kwargs.get('target_pdb_code'))
    if pdb_codes:
        # Collect all provided codes required for component 1 processing
        codes = []
        for code_or_file in pdb_codes:
            codes.extend(utils.to_iterable(code_or_file))
        self.pdb_codes = utils.remove_duplicates(codes)
    else:
        self.pdb_codes = None

    self.component2 = kwargs.get(flags.component2._)
    self.query_codes2 = kwargs.get('query_codes2')
    # 'aligned_pdb_code' acts as a fallback alias when 'pdb_code2' wasn't provided
    pdb_codes2 = kwargs.get('pdb_code2', kwargs.get('aligned_pdb_code'))
    if pdb_codes2:
        # Collect all provided codes required for component 1 processing
        codes = []
        for code_or_file in pdb_codes2:
            codes.extend(utils.to_iterable(code_or_file))
        self.pdb_codes2 = utils.remove_duplicates(codes)
    else:
        self.pdb_codes2 = None

    # Docking flags
    self.dock = Dock.from_flags(**kwargs)
    if self.development:
        self.dock.quick = True
    if self.dock.perturb_dof or self.dock.perturb_dof_rot or self.dock.perturb_dof_tx:
        # Check if no other values were set and set them if so
        if not self.dock.perturb_dof_rot and not self.dock.perturb_dof_tx:
            # Set all perturb_dof on and set to the provided default
            self.dock.perturb_dof_rot = self.dock.perturb_dof_tx = True
            if self.dock.perturb_dof_steps is None:
                self.dock.perturb_dof_steps_rot = self.dock.perturb_dof_steps_tx = flags.default_perturbation_steps
            else:
                self.dock.perturb_dof_steps_rot = self.dock.perturb_dof_steps_tx = self.dock.perturb_dof_steps
        else:  # Parse the provided values
            self.dock.perturb_dof = True
            if self.dock.perturb_dof_rot:
                if self.dock.perturb_dof_steps_rot is None:
                    self.dock.perturb_dof_steps_rot = flags.default_perturbation_steps
            else:
                self.dock.perturb_dof_steps_rot = 1

            if self.dock.perturb_dof_tx:
                if self.dock.perturb_dof_steps_tx is None:
                    self.dock.perturb_dof_steps_tx = flags.default_perturbation_steps
            else:
                self.dock.perturb_dof_steps_tx = 1
    else:  # None provided, set the unavailable dof to 1 step and warn if one was provided
        if self.dock.perturb_dof_steps is not None:
            logger.warning(f"Couldn't use the flag {flags.perturb_dof_steps.long} as {flags.perturb_dof.long}"
                           f" wasn't set")
        if self.dock.perturb_dof_steps_rot is not None:
            logger.warning(f"Couldn't use the flag {flags.perturb_dof_steps_rot.long} as "
                           f"{flags.perturb_dof_rot.long} wasn't set")
        if self.dock.perturb_dof_steps_tx is not None:
            logger.warning(f"Couldn't use the flag {flags.perturb_dof_steps_tx.long} as {flags.perturb_dof_tx.long}"
                           f" wasn't set")
        self.dock.perturb_dof_steps = self.dock.perturb_dof_steps_rot = self.dock.perturb_dof_steps_tx = 1

    # dock_weight = kwargs.get('weight')
    # dock_weight_file = kwargs.get('weight_file')
    if self.dock.weight or self.dock.weight_file is not None:
        self.dock.weight = flags.parse_weights(self.dock.weight, file=self.dock.weight_file)
    # No option to get filters on the fly...
    # elif self.dock.weight is not None:  # --dock-weight was provided, but as a boolean-esq. Query the user
    #     self.dock.weight = []
    else:
        self.dock.weight = None
    if self.dock.filter or self.dock.filter_file is not None:
        self.dock.filter = flags.parse_filters(self.dock.filter, file=self.dock.filter_file)
    # No option to get filters on the fly...
    # elif self.dock.weight is not None:  # --dock-weight was provided, but as a boolean-esq. Query the user
    #     self.dock.weight = []
    else:
        self.dock.filter = None
    # self.proteinmpnn_score: bool = kwargs.get('proteinmpnn_score', False)
    # self.contiguous_ghosts: bool = kwargs.get('contiguous_ghosts', False)

    # self.rotation_step1: bool = kwargs.get('rotation_step1', False)
    # self.rotation_step2: bool = kwargs.get('rotation_step2', False)
    # self.min_matched: bool = kwargs.get('min_matched', False)
    # self.match_value: bool = kwargs.get('match_value', False)
    # self.initial_z_value: bool = kwargs.get('initial_z_value', False)

    # Each fuse_chains entry arrives as 'chainA:chainB' and is stored as a (chainA, chainB) tuple
    self.fuse_chains: list[tuple[str]] = [tuple(pair.split(':')) for pair in kwargs.get(flags.fuse_chains._, [])]

    self.interface_distance = kwargs.get(flags.interface_distance._)
    self.interface = kwargs.get(flags.interface._)
    self.interface_only = kwargs.get(flags.interface_only._)
    self.oligomeric_interfaces = kwargs.get(flags.oligomeric_interfaces._)
    self.use_proteinmpnn = kwargs.get(flags.use_proteinmpnn._)
    self.use_evolution = kwargs.get(flags.use_evolution._)
    # Explicitly set to false if not designing or predicting
    use_evolution_modules = [
        flags.nanohedra, flags.initialize_building_blocks, flags.refine, flags.interface_metrics,
        flags.process_rosetta_metrics, flags.analysis, flags.predict_structure, flags.design
    ]
    if self.use_evolution and not any([module in use_evolution_modules for module in self.modules]):
        logger.info(f'Setting {flags.format_args(flags.use_evolution_args)} to False as no module '
                    'requesting evolutionary information is utilized')
        self.use_evolution = False

    # Design flags
    self.design = Design.from_flags(**kwargs)
    if self.design.ignore_clashes:
        self.design.ignore_pose_clashes = self.design.ignore_symmetric_clashes = True
    # Handle protocol specific flags
    if self.module == flags.interface_design:  # or self.design.neighbors:
        # Handle interface-design module alias
        self.module = flags.design
        self.design.interface = True
    if self.design.method == putils.consensus:
        self.design.term_constraint = True
    if self.design.term_constraint:
        self.generate_fragments: bool = True
    else:
        self.generate_fragments = False

    if self.design.structure_background:
        # A structure background run disables all sequence/fragment constraint machinery
        self.design.evolution_constraint = False
        self.design.hbnet = False
        self.design.scout = False
        self.design.term_constraint = False

    # if self.design.evolution_constraint and flags.design not in self.modules:
    #     logger.debug(f'Setting {flags.format_args(flags.evolution_constraint_args)} to False as the no module '
    #                  f'requesting evolutionary information is utilized')
    #     self.design.evolution_constraint = False

    # self.dock_only: bool = kwargs.get('dock_only')
    # if self.dock_only:
    #     self.design.sequences = self.design.structures = False
    self.only_write_frag_info: bool = kwargs.get(flags.only_write_frag_info._)
    self.increment_chains: bool = kwargs.get(flags.increment_chains._)
    self.interface_to_alanine: bool = kwargs.get(flags.interface_to_alanine._)
    self.metrics: bool = kwargs.get(flags.metrics._)
    self.measure_pose: str = kwargs.get(flags.measure_pose._)
    self.specific_protocol: str = kwargs.get(flags.specific_protocol._)
    # Process symmetry
    sym_entry_number = kwargs.get(flags.sym_entry._)
    symmetry = kwargs.get(flags.symmetry._)
    if sym_entry_number is None and symmetry is None:
        self.sym_entry: SymEntry.SymEntry | str | None = None
    else:
        if symmetry and utils.symmetry.CRYST in symmetry.upper():
            # Later, symmetry information will be retrieved from the file header
            self.sym_entry = SymEntry.CrystRecord  # Input was provided as 'cryst'
        else:
            self.sym_entry = SymEntry.parse_symmetry_to_sym_entry(
                sym_entry_number=sym_entry_number, symmetry=symmetry)

    # Selection flags
    self.save_total = kwargs.get(flags.save_total._)
    # self.total = kwargs.get('total')
    self.protocol = kwargs.get(flags.protocol._)
    _filter = kwargs.get(flags.filter_._)
    _filter_file = kwargs.get(flags.filter_file._)
    if _filter == list():
        # --filter was provided, but as a boolean-esq. Query the user once there is a df
        self.filter = True
    elif _filter or _filter_file is not None:
        self.filter = flags.parse_filters(_filter, file=_filter_file)
    else:
        self.filter = None
    _weight = kwargs.get(flags.weight._)
    _weight_file = kwargs.get(flags.weight_file._)
    if _weight == list():
        # --weight was provided, but as a boolean-esq. Query the user once there is a df
        self.weight = True
    elif _weight or _weight_file is not None:
        self.weight = flags.parse_weights(_weight, file=_weight_file)
    else:
        self.weight = None
    self.weight_function = kwargs.get(flags.weight_function._)
    self.select_number = kwargs.get(flags.select_number._)
    self.designs_per_pose = kwargs.get(flags.designs_per_pose._)
    # self.allow_multiple_poses = kwargs.get('allow_multiple_poses')
    self.tag_entities = kwargs.get(flags.tag_entities._)
    self.specification_file = kwargs.get(flags.specification_file._)
    # Don't need this at the moment...
    # self.poses = kwargs.get(flags.poses)
    """Used to specify whether specific designs should be fetched for select_* modules"""
    self.dataframe = kwargs.get(flags.dataframe._)

    # Sequence flags
    self.avoid_tagging_helices = kwargs.get(flags.avoid_tagging_helices._)
    self.csv = kwargs.get(flags.csv._)
    self.nucleotide = kwargs.get(flags.nucleotide)
    self.optimize_species = kwargs.get(flags.optimize_species._)
    self.preferred_tag = kwargs.get(flags.preferred_tag._)
    self.tag_linker = kwargs.get(flags.tag_linker._)
    self.multicistronic = kwargs.get(flags.multicistronic._)
    self.multicistronic_intergenic_sequence = kwargs.get(flags.multicistronic_intergenic_sequence._)

    # Output flags
    self.overwrite: bool = kwargs.get(flags.overwrite._)
    self.pose_format = kwargs.get(flags.pose_format._)
    prefix = kwargs.get(flags.prefix._)
    if prefix:
        self.prefix = f'{prefix}_'
    else:
        self.prefix = ''

    suffix = kwargs.get(flags.suffix._)
    if suffix:
        self.suffix = f'_{suffix}'
    else:
        self.suffix = ''

    # Check if output already exists or --overwrite is provided
    if self.module in flags.select_modules:
        if self.prefix == '':
            # self.location must not be None
            self.prefix = f'{utils.starttime}_{os.path.basename(os.path.splitext(self.input_source)[0])}_'
        output_directory = kwargs.get(flags.output_directory._)
        # if not self.output_to_directory:
        if not output_directory:
            output_directory = os.path.join(os.path.dirname(self.program_root), f'SelectedDesigns')
            #     os.path.join(os.path.dirname(self.program_root), f'{self.prefix}SelectedDesigns{self.suffix}')
    else:  # if output_directory:
        output_directory = kwargs.get(flags.output_directory._)

    if output_directory:
        self.output_directory = output_directory
        if os.path.exists(self.output_directory) and not self.overwrite:
            print(f"The specified output directory '{self.output_directory}' already exists. Proceeding may "
                  f'overwrite your old data. Either specify a new one or use the flags {flags.prefix.long} or '
                  f'{flags.suffix.long} to modify the name. To proceed, append {flags.overwrite.long} to your '
                  f'command')
            sys.exit(1)
        putils.make_path(self.output_directory)

    output_file = kwargs.get(flags.output_file._)
    if output_file:
        self.output_file = output_file
        if os.path.exists(self.output_file) and not self.overwrite:
            # if self.module in flags.analysis:  # Todo this was allowed, but it's outdated...
            print(f"The specified output file '{self.output_file}' already exists. Proceeding may "
                  f'overwrite your old data. Either specify a new one or use the flags {flags.prefix.long} or '
                  f'{flags.suffix.long} to modify the name. To proceed, append {flags.overwrite.long} to your '
                  f'command')
            sys.exit(1)

    # When we are performing expand-asu, make sure we set output_assembly to True
    if self.module == flags.expand_asu:
        self.output_assembly = True
    else:
        self.output_assembly: bool = kwargs.get(flags.output_assembly._)
    self.output_surrounding_uc: bool = kwargs.get(flags.output_surrounding_uc._)
    self.output_fragments: bool = kwargs.get(flags.output_fragments._)
    self.output_interface: bool = kwargs.get(flags.output_interface._)
    self.output_oligomers: bool = kwargs.get(flags.output_oligomers._)
    self.output_entities: bool = kwargs.get(flags.output_entities._)
    self.output_structures: bool = kwargs.get(flags.output_structures._)
    self.output_trajectory: bool = kwargs.get(flags.output_trajectory._)

    self.skip_logging: bool = kwargs.get(flags.skip_logging._)
    # self.merge: bool = kwargs.get(flags.merge._)
    # self.save: bool = kwargs.get(flags.save._)
    # self.figures: bool = kwargs.get(flags.figures._)

    # self.output summarizes whether any structural output was requested
    if self.output_structures or self.output_assembly or self.output_surrounding_uc or self.output_fragments \
            or self.output_oligomers or self.output_entities or self.output_trajectory:
        self.output: bool = True
    else:
        self.output: bool = False

    # self.nanohedra_output: bool = kwargs.get(flags.nanohedra_output)
    # self.nanohedra_root: str | None = None
    # if self.nanohedra_output:
    #     self.construct_pose: bool = kwargs.get('construct_pose', True)
    # else:
    # self.construct_pose = True

    # Align helix flags
    self.aligned_start = kwargs.get(flags.aligned_start._)
    self.aligned_end = kwargs.get(flags.aligned_end._)
    self.aligned_chain = kwargs.get(flags.aligned_chain._)
    self.alignment_length = kwargs.get(flags.alignment_length._)
    self.bend = kwargs.get(flags.bend._)
    self.extension_length = kwargs.get(flags.extend._)
    self.target_start = kwargs.get(flags.target_start._)
    self.target_end = kwargs.get(flags.target_end._)
    self.target_chain = kwargs.get(flags.target_chain._)
    self.target_termini = kwargs.get(flags.target_termini._)
    self.trim_termini = kwargs.get(flags.trim_termini._)
    # Helix Bending flags
    self.direction = kwargs.get(flags.direction._)
    self.joint_residue = kwargs.get(flags.joint_residue._)
    self.joint_chain = kwargs.get(flags.joint_chain._)
    self.sample_number = kwargs.get(flags.sample_number._)
    # Prediction flags
    self.predict = Predict.from_flags(**kwargs)
    # self.num_predictions_per_model = kwargs.get('num_predictions_per_model')
    # if self.predict.num_predictions_per_model is None:
    #     if 'monomer' in self.predict.mode:
    #         self.num_predictions_per_model = 1
    #     else:  # 'multimer
    #         self.num_predictions_per_model = 5
    if self.predict.models_to_relax == 'none':
        self.predict.models_to_relax = None

    # Clustering flags
    # Todo
    #  This is pretty sloppy. Modify this DataClass mechanism...
    self.cluster_selection = kwargs.get(flags.cluster_selection._)
    # self.cluster_map = kwargs.get('cluster_map')
    # self.as_objects: bool = kwargs.get('as_objects')
    # self.mode: bool = kwargs.get('mode')
    if flags.cluster_poses in self.modules or flags.cluster_map._ in kwargs:
        self.cluster = Cluster.from_flags(**kwargs)
        # self.cluster.map: AnyStr
        # """The path to a file containing the currently loaded mapping from cluster representatives to members"""
    else:
        self.cluster = False

    # Finally perform checks on desired work to see if viable
    # if self.protocol_module:
    self.check_protocol_module_arguments()
    # Start with None and set this once a session is opened
    self.job_protocol = None
    # self.job_protocol = self.load_job_protocol()
    self.parsed_arguments = None

background_profile instance-attribute

background_profile: str = get(_)

The type of position specific profile (per-residue amino acid frequencies) to utilize as the design background profile. Choices include putils.design_profile, putils.evolutionary_profile, and putils.fragment_profile

specification_file instance-attribute

specification_file = get(_)

Used to specify whether specific designs should be fetched for select_* modules

id property

id: int

Get the JobProtocol.id for reference to the work performed

modules property writable

modules: list[str]

Return the modules slated to run during the job

output_to_directory property

output_to_directory: bool

If True, broadcasts that output is not typical putils.program_output directory structure

output_directory property writable

output_directory: AnyStr | None

Where to output the Job

output_file property writable

output_file: AnyStr | None

Where to output info related to successful Job operation

location property writable

location: str | None

The location where PoseJob instances are located

input_source property

input_source: str

Provide the name of the specified PoseJob instances to perform work on

default_output_tuple property

default_output_tuple: tuple[str, str, str]

Format fields for the output file depending on time, specified name and module type

construct_pose property writable

construct_pose

Whether to construct the PoseJob

number_of_modules property

number_of_modules: int

The number of modules for the specified Job

get_parsed_arguments

get_parsed_arguments() -> list[str]

Return the arguments submitted during application initialization

Returns:

  • list[str]

    Each of the submitted flags, removed of input arguments, and formatted such as they were parsed at runtime, i.e. --file, --poses, or -d are removed, and the remainder are left in the same order so they could be formatted by subprocess.list2cmdline()

Source code in symdesign/resources/job.py
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
def get_parsed_arguments(self) -> list[str]:
    """Return the arguments submitted during application initialization

    Returns:
        Each of the submitted flags, removed of input arguments, and formatted such as were parsed at runtime,
            i.e. --file, --poses, or -d are removed, and the remainder are left in the same order so as could be
            formatted by subprocess.list2cmdline()
    """
    # Return the cached result if this was already computed
    if self.parsed_arguments:
        return self.parsed_arguments

    # Remove the program name
    parsed_arguments = sys.argv[1:]
    logger.debug(f'Starting with arguments {parsed_arguments}')
    # Todo
    #  Should the module be removed?
    #  sys.argv.remove(self.module)
    # Remove the input flags along with each of the values that follow them
    possible_input_args = [arg for args in flags.input_mutual_arguments.keys() for arg in args] \
        + [arg for args in flags.pose_inputs.keys() for arg in args] \
        + [arg for args in flags.component_mutual1_arguments.keys() for arg in args] \
        + [arg for args in flags.component_mutual2_arguments.keys() for arg in args]
    for input_arg in possible_input_args:
        try:
            pop_index = parsed_arguments.index(input_arg)
        except ValueError:  # Not in list
            continue
        else:
            removed_flag = parsed_arguments.pop(pop_index)
            # Pop each trailing value until the next flag is reached. The bounds check guards
            # against an IndexError when the removed flag's values run to the end of argv
            while pop_index < len(parsed_arguments) \
                    and parsed_arguments[pop_index][0] != flags.flag_delimiter:
                removed_arg = parsed_arguments.pop(pop_index)
                logger.debug(f'From {removed_flag}, removed argument {removed_arg}')
            # # If the flag requires an argument, pop the index a second time
            # if input_arg not in single_input_flags:

    # Remove distribution flags
    for arg in flags.distribute_args:
        try:
            pop_index = parsed_arguments.index(arg)
        except ValueError:  # Not in list
            continue
        else:
            parsed_arguments.pop(pop_index)

    # Cache for the next call
    self.parsed_arguments = parsed_arguments

    return parsed_arguments

load_job_protocol

load_job_protocol()

Acquire the JobProtocol for the current set of input instructions

Sets

self.job_protocol (sql.JobProtocol)

Source code in symdesign/resources/job.py
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
def load_job_protocol(self):
    """Acquire the JobProtocol for the current set of input instructions

    Sets:
        self.job_protocol (sql.JobProtocol)
    """
    # Tabulate the protocol arguments that should be provided to the JobProtocol search/creation.
    # Each module records only the arguments that distinguish one run of that module from another
    if self.module == flags.design:
        protocol_kwargs = dict(
            ca_only=self.design.ca_only,
            evolution_constraint=self.design.evolution_constraint,
            interface=self.design.interface,
            term_constraint=self.design.term_constraint,
            neighbors=self.design.neighbors,
            proteinmpnn_model_name=self.design.proteinmpnn_model,
        )
    elif self.module == flags.nanohedra:
        protocol_kwargs = dict(
            ca_only=self.design.ca_only,
            contiguous_ghosts=self.dock.contiguous_ghosts,
            initial_z_value=self.dock.initial_z_value,
            match_value=self.dock.match_value,
            minimum_matched=self.dock.minimum_matched,
            proteinmpnn_model_name=self.design.proteinmpnn_model,
        )
    elif self.module == flags.predict_structure:
        protocol_kwargs = dict(
            number_predictions=self.predict.num_predictions_per_model,
            prediction_model=self.predict.models_to_relax,
            use_gpu_relax=self.predict.use_gpu_relax,
        )
    elif self.module == flags.analysis:
        protocol_kwargs = dict(
            ca_only=self.design.ca_only,
            proteinmpnn_model_name=self.design.proteinmpnn_model,
        )
    # Todo
    #  raise NotImplementedError()
    # elif self.module == flags.interface_metrics:
    # elif self.module == flags.generate_fragments:
    else:
        # Modules without distinguishing arguments are identified by module name and commit alone
        protocol_kwargs = {}

    # Every JobProtocol also records the module it ran and the code commit it ran under
    protocol_kwargs.update(dict(
        module=self.module,
        commit=putils.commit,
    ))

    # Search for an existing JobProtocol row whose columns all equal the tabulated arguments
    job_protocol_stmt = select(sql.JobProtocol)\
        .where(*[getattr(sql.JobProtocol, table_column) == job_resources_attr
                 for table_column, job_resources_attr in protocol_kwargs.items()])
    # logger.debug(job_protocol_stmt.compile(compile_kwargs={"literal_binds": True}))
    # expire_on_commit=False keeps the returned instance usable after the session closes
    with self.db.session(expire_on_commit=False) as session:
        job_protocol_result = session.scalars(job_protocol_stmt).all()
        if not job_protocol_result:  # Create a new one
            job_protocol = sql.JobProtocol(**protocol_kwargs)
            session.add(job_protocol)
            session.commit()
        elif len(job_protocol_result) > 1:
            # More than one match indicates a uniqueness violation the database should have prevented
            for result in job_protocol_result:
                print(result)
            raise utils.InputError(
                f"sqlalchemy.IntegrityError should've been raised. "
                f"Can't have more than one matching {sql.JobProtocol.__name__}")
        else:
            job_protocol = job_protocol_result[0]

    self.job_protocol = job_protocol

get_range_slice

get_range_slice(jobs: Sequence) -> Sequence[Any]

Slice the input work by a set increment. This is parsed from the flags.range_args

Parameters:

  • jobs (Sequence) –

    The work that should be sliced by the specified range

Returns: The work, limited to the range provided by -r/--range input flag

Source code in symdesign/resources/job.py
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
def get_range_slice(self, jobs: Sequence) -> Sequence[Any]:
    """Slice the input work by a set increment. This is parsed from the flags.range_args

    Args:
        jobs: The work that should be sliced by the specified range
    Returns:
        The work, limited to the range provided by -r/--range input flag
    """
    if not self.range:
        # No range requested; return an unrestricted slice of the work
        return jobs[slice(None)]

    total_number = len(jobs)
    # Scale the low/high percentages to indices; the +0.5 rounds to the nearest integer
    start = int((self.low / 100) * total_number + 0.5)
    stop = int((self.high / 100) * total_number + 0.5)
    if start < 0 or stop > total_number:
        raise ValueError(
            f'The {flags.format_args(flags.range_args)} flag is outside of the acceptable bounds [0-100]')
    logger.debug(f'Selecting input work ({total_number}) with range: {start}-{stop}')

    return jobs[slice(start, stop)]

check_protocol_module_arguments

check_protocol_module_arguments()

Given provided modules for the 'protocol' module, check to ensure the work is adequate

Source code in symdesign/resources/job.py
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
def check_protocol_module_arguments(self):
    """Given provided modules for the 'protocol' module, check to ensure the work is adequate

    Raises:
        InputError if the inputs are found to be incompatible
    """
    # Modules permitted anywhere within a --modules protocol specification
    protocol_module_allowed_modules = [
        flags.align_helices,
        flags.bend,
        flags.expand_asu,
        flags.rename_chains,
        flags.check_clashes,
        flags.generate_fragments,
        flags.nanohedra,
        flags.predict_structure,
        flags.interface_metrics,
        flags.optimize_designs,
        flags.refine,
        flags.design,
        flags.interface_design,
        flags.analysis,
        flags.cluster_poses,
        flags.select_poses,
        flags.select_designs
    ]
    disallowed_modules = [
        # 'custom_script',
        # flags.select_sequences,
        flags.initialize_building_blocks
    ]

    def check_gpu() -> str | bool:
        """Set self.gpu_available and return the GPU device kind, or False when no GPU is found"""
        available_devices = jax.local_devices()
        for idx, device in enumerate(available_devices):
            if device.platform == 'gpu':
                self.gpu_available = True
                return device.device_kind
                # device_id = idx
                # return True
        return False

    problematic_modules = []
    not_recognized_modules = []
    nanohedra_prior = False
    gpu_device_kind = None  # Cache the GPU probe so jax.local_devices() runs at most once
    for idx, module in enumerate(self.modules, 1):
        if module == flags.nanohedra:
            if idx > 1:
                raise utils.InputError(
                    f"For {flags.protocol} module, {module} can only be run in --modules position #1")
            nanohedra_prior = True
            continue
        elif module in flags.select_modules and self.protocol_module:
            # Selection must be the terminal module in the protocol
            if idx != self.number_of_modules:
                raise utils.InputError(
                    f"For {flags.protocol} module, {module} can only be run in --modules position N i.e. #1,2,...N")

        elif module == flags.predict_structure:
            if gpu_device_kind is None:
                # Check for GPU access
                gpu_device_kind = check_gpu()

            if gpu_device_kind:
                logger.info(f'Running {module} on {gpu_device_kind} GPU')
                # Disable GPU on tensorflow. I think that this is so tensorflow doesn't leak any calculations
                tf.config.set_visible_devices([], 'GPU')
            else:  # device.platform == 'cpu':
                logger.warning(f'No GPU detected, will {module} using CPU')
        elif module == flags.design:
            if self.design.method == putils.proteinmpnn:
                if gpu_device_kind is None:
                    # Check for GPU access
                    gpu_device_kind = check_gpu()

                if gpu_device_kind:
                    logger.info(f'Running {module} on {gpu_device_kind} GPU')
                else:  # device.platform == 'cpu':
                    logger.warning(f'No GPU detected, will {module} using CPU')

        if nanohedra_prior:
            if module in flags.select_modules:
                # We only should allow select-poses after nanohedra
                if module == flags.select_poses:
                    logger.critical(f"Running {module} after {flags.nanohedra} won't produce any Designs to "
                                    f"operate on. In order to {module}, ensure you run a design protocol first")
                else:  # flags.select_designs, flags.select_sequences
                    if not self.weight:  # not self.filter or
                        logger.critical(f'Using {module} after {flags.nanohedra} without specifying the flag '
                                        # f'{flags.format_args(flags.filter_args)} or '
                                        f'{flags.format_args(flags.weight_args)} defaults to selection '
                                        f'parameters {config.default_weight_parameter[flags.nanohedra]}')
        # nanohedra can only precede the module immediately after it
        nanohedra_prior = False
        if self.protocol_module:
            if module in protocol_module_allowed_modules:
                continue
            elif module in disallowed_modules:
                problematic_modules.append(module)
            else:
                not_recognized_modules.append(module)

    if not_recognized_modules:
        raise utils.InputError(
            f"For {flags.protocol} module, the --{flags.modules} {', '.join(not_recognized_modules)} aren't "
            # Fixed message formatting: the original read 'See"..."' with no space after 'See'
            f'recognized modules. See "{putils.program_help}" for available module names')

    if problematic_modules:
        raise utils.InputError(
            f"For {flags.protocol} module, the --{flags.modules} {', '.join(problematic_modules)} aren't possible "
            f'modules\n\nAllowed modules are {", ".join(protocol_module_allowed_modules)}')

report_specified_arguments

report_specified_arguments(arguments: Namespace) -> dict[str, Any]

Filter all flags for only those that were specified as different on the command line

Parameters:

  • arguments (Namespace) –

    The arguments as parsed from the command-line argparse namespace

Returns: Arguments specified during program execution

Source code in symdesign/resources/job.py
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
def report_specified_arguments(self, arguments: argparse.Namespace) -> dict[str, Any]:
    """Filter all flags for only those that were specified as different on the command line

    Args:
        arguments: The arguments as parsed from the command-line argparse namespace
    Returns:
        Arguments specified during program execution, sorted by argument name
    """
    arguments = vars(arguments).copy()

    reported_args = {}
    # Start with JobResources flags that should be reported, or if the argument is not important, format it
    if self.module:
        reported_args['module'] = self.module
    if self.sym_entry:
        reported_args[flags.sym_entry._] = self.sym_entry.number
    # if self.design_selector:
    #     reported_args.pop('design_selector', None)

    # Fixed: these closures previously keyed reported_args on the loop variable 'arg.dest'
    # instead of the '_dest' parameter, relying on the enclosing loop's binding at call time
    if self.debug:
        # In debug mode, report every provided argument regardless of its default
        def report_arg(_dest, _default):
            try:
                value = arguments.pop(_dest)
                if value is not None:
                    reported_args[_dest] = value
            except KeyError:
                return
    else:
        # Otherwise, report only arguments that differ from their parser default
        def report_arg(_dest, _default):
            try:
                value = arguments.pop(_dest)
                if value is not None and value != _default:
                    reported_args[_dest] = value
            except KeyError:
                return

    # Get all the default program args and compare them to the provided values
    for group in flags.entire_parser._action_groups:
        for arg in group._group_actions:
            if isinstance(arg, argparse._SubParsersAction):  # This is a subparser, recurse
                for name, sub_parser in arg.choices.items():
                    for sub_group in sub_parser._action_groups:
                        # 'sub_arg' avoids shadowing the outer loop variable 'arg'
                        for sub_arg in sub_group._group_actions:
                            report_arg(sub_arg.dest, sub_arg.default)
            else:
                report_arg(arg.dest, arg.default)

    return dict(sorted(reported_args.items()))  # , key=lambda arg: arg[0]

calculate_memory_requirements

calculate_memory_requirements(number_jobs: int)

Format memory requirements with module dependencies and set self.reduce_memory

Source code in symdesign/resources/job.py
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
def calculate_memory_requirements(self, number_jobs: int):
    """Format memory requirements with module dependencies and set self.reduce_memory"""
    # Estimate the total footprint based on which module will run
    if self.module == flags.nanohedra:  # Todo
        required_memory = putils.baseline_program_memory + putils.nanohedra_memory  # 30 GB ?
    else:
        # Per-job footprint differs when the assembly must be held in memory
        if self.module == flags.analysis:
            per_job_memory = putils.approx_ave_design_directory_memory_w_assembly
        else:
            per_job_memory = putils.approx_ave_design_directory_memory_w_pose
        # 20% overhead on top of the per-job estimate
        required_memory = (putils.baseline_program_memory + number_jobs * per_job_memory) * 1.2

    available_memory = psutil.virtual_memory().available
    logger.debug(f'Available memory: {available_memory / gb_divisior:.2f} GB')
    logger.debug(f'Required memory: {required_memory / gb_divisior:.2f} GB')
    # If we are running a protocol, check for reducing memory requirements
    if self.protocol_module and self.number_of_modules > 2:
        self.reduce_memory = True
    elif available_memory < required_memory:
        self.reduce_memory = True
    else:
        # Todo when requirements are more accurate with database
        #  self.reduce_memory = False
        self.reduce_memory = True
    logger.debug(f'Reduce job memory?: {self.reduce_memory}')

can_process_evolutionary_profiles staticmethod

can_process_evolutionary_profiles() -> bool

Return True if the current computer has the computational requirements to collect evolutionary profiles

Source code in symdesign/resources/job.py
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
@staticmethod
def can_process_evolutionary_profiles() -> bool:
    """Return True if the current computer has the computational requirements to collect evolutionary profiles"""
    # Run specific checks
    if psutil.virtual_memory().available > distribute.hhblits_memory_threshold:
        return True

    # Insufficient RAM; advise distributing the work elsewhere instead
    print('\n')
    logger.critical(f'The available RAM is probably insufficient to run {putils.hhblits}. '
                    f'Required/Available memory: {distribute.hhblits_memory_threshold / gb_divisior:.2f} GB/'
                    f'{psutil.virtual_memory().available / gb_divisior:.2f} GB')
    logger.critical(f'Creating scripts that can be distributed to a capable computer instead')
    return False

evolutionary_profile_processes staticmethod

evolutionary_profile_processes() -> int

Return the number of evolutionary profile processes that can be run given the available memory

Source code in symdesign/resources/job.py
1242
1243
1244
1245
@staticmethod
def evolutionary_profile_processes() -> int:
    """Return the number of evolutionary profile processes that can be run given the available memory"""
    # Each hhblits process requires roughly hhblits_memory_threshold bytes, so the count of
    # concurrent processes is available memory divided by that requirement (minimum one).
    # The original expression, int(available <= threshold), inverted the documented intent:
    # it returned 1 only when memory was insufficient and 0 when memory was plentiful
    return max(1, int(psutil.virtual_memory().available // distribute.hhblits_memory_threshold))

process_evolutionary_info

process_evolutionary_info(uniprot_entities: Iterable[UniProtEntity] = None, entities: Iterable[GeneEntity] = None, batch_commands: bool = False) -> list[str]

Format the job with evolutionary constraint options

Parameters:

  • uniprot_entities (Iterable[UniProtEntity], default: None ) –

    A list of the UniProtIDs for the Job

  • entities (Iterable[GeneEntity], default: None ) –

    A list of the Entity instances initialized for the Job

  • batch_commands (bool, default: False ) –

    Whether commands should be made for batch submission

Returns: A list of evolutionary setup instructions

Source code in symdesign/resources/job.py
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
def process_evolutionary_info(self, uniprot_entities: Iterable[wrapapi.UniProtEntity] = None,
                              entities: Iterable[structure.sequence.GeneEntity] = None,
                              batch_commands: bool = False) -> list[str]:
    """Format the job with evolutionary constraint options

    Args:
        uniprot_entities: A list of the UniProtIDs for the Job
        entities: A list of the Entity instances initialized for the Job
        batch_commands: Whether commands should be made for batch submission
    Returns:
        A list of evolutionary setup instructions
    """
    info_messages = []
    hhblits_cmds, bmdca_cmds, msa_cmds = [], [], []
    # Set up sequence data using hhblits and profile bmDCA for each input entity
    putils.make_path(self.sequences)
    if not putils.uniclust_db:
        # No database has been set up
        logger.warning(f"Couldn't locate a compatible database to run hhblits. Skipping evolutionary acquisition")
    elif uniprot_entities is not None:
        for uniprot_entity in uniprot_entities:
            # Only queue an hhblits search when no profile exists yet for this entity
            evolutionary_profile_file = self.api_db.hhblits_profiles.retrieve_file(name=uniprot_entity.id)
            if not evolutionary_profile_file:
                hhblits_cmds.append(hhblits(uniprot_entity.id,
                                            sequence=uniprot_entity.reference_sequence,
                                            out_dir=self.profiles, threads=self.threads,
                                            return_command=True))
                msa_file = None
            else:
                msa_file = self.api_db.alignments.retrieve_file(name=uniprot_entity.id)

            if not msa_file:
                # Reformat the hhblits .a3m alignment to both .sto and .fasta
                sto_cmd = [
                    putils.reformat_msa_exe_path, 'a3m', 'sto',
                    f"{os.path.join(self.profiles, f'{uniprot_entity.id}.a3m')}", '.sto', '-num', '-uc']
                fasta_cmd = [
                    putils.reformat_msa_exe_path, 'a3m', 'fas',
                    f"{os.path.join(self.profiles, f'{uniprot_entity.id}.a3m')}", '.fasta', '-M', 'first', '-r']
                msa_cmds.extend([sto_cmd, fasta_cmd])

    elif entities is not None:
        raise NotImplementedError(
            f'Currently must use {wrapapi.UniProtEntity.__class__.__name__} in '
            f'{self.process_evolutionary_info.__name__}'
        )
        # NOTE: the loop below is unreachable until the NotImplementedError above is removed
        for entity in entities:
            evolutionary_profile_file = self.api_db.hhblits_profiles.retrieve_file(name=entity.name)
            if not evolutionary_profile_file:
                sequence_file = self.api_db.sequences.retrieve_file(name=entity.name)
                if not sequence_file:
                    sequence_file = entity.write_sequence_to_fasta(out_dir=self.sequences)

                hhblits_cmds.append(entity.hhblits(sequence_file=sequence_file, out_dir=self.profiles,
                                                   return_command=True))
                msa_file = None
            else:
                msa_file = self.api_db.alignments.retrieve_file(name=entity.name)

            if not msa_file:
                sto_cmd = [
                    putils.reformat_msa_exe_path, 'a3m', 'sto',
                    f"{os.path.join(self.profiles, f'{entity.name}.a3m')}", '.sto', '-num', '-uc']
                fasta_cmd = [
                    putils.reformat_msa_exe_path, 'a3m', 'fas',
                    f"{os.path.join(self.profiles, f'{entity.name}.a3m')}", '.fasta', '-M', 'first', '-r']
                msa_cmds.extend([sto_cmd, fasta_cmd])

    if hhblits_cmds:
        protocol = putils.hhblits
        logger.info(f"Starting Profile(dtype='{protocol}') generation")

        if protocol == putils.hhblits:
            # Verify the executable is present and runnable before queueing work
            if not os.access(putils.hhblits_exe, os.X_OK):
                raise RuntimeError(
                    f"Couldn't locate the {protocol} executable. Ensure the executable file referenced by "
                    f"'{putils.hhblits_exe}' exists then try your job again. Otherwise, use the argument "
                    f'--no-{flags.use_evolution} OR set up hhblits to run.{utils.guide.hhblits_setup_instructions}')
        else:
            assert_never(protocol)

        putils.make_path(self.profiles)
        putils.make_path(self.sbatch_scripts)
        protocol_log_file = os.path.join(self.profiles, 'generate_profiles.log')

        # Run hhblits commands
        if not batch_commands and self.can_process_evolutionary_profiles():
            logger.info(f'Writing {protocol} results to file: {protocol_log_file}')
            # Run commands in this process
            if self.multi_processing:
                # Fixed: previously referenced the undefined name 'hhblits_log_file'
                zipped_args = zip(hhblits_cmds, repeat(protocol_log_file))
                utils.mp_starmap(distribute.run, zipped_args, processes=self.cores)
            else:
                for cmd in tqdm(hhblits_cmds):
                    logger.debug(f'Starting command: {subprocess.list2cmdline(cmd)}')
                    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    stdout, stderr = p.communicate()
                    if stderr:
                        logger.warning(stderr.decode('utf-8'))

            # Format .a3m multiple sequence alignments to .sto/.fasta
            for cmd in msa_cmds:
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                stdout, stderr = p.communicate()
                if stderr:
                    logger.warning(stderr.decode('utf-8'))
        else:  # Convert each command to a string and write to distribute
            hhblits_cmds = [subprocess.list2cmdline(cmd) for cmd in hhblits_cmds]
            msa_cmds = [subprocess.list2cmdline(cmd) for cmd in msa_cmds]
            all_evolutionary_commands = hhblits_cmds + msa_cmds
            evolutionary_cmds_file = distribute.write_commands(
                all_evolutionary_commands, name=f'{utils.starttime}-{protocol}', out_path=self.profiles)

            if distribute.is_sbatch_available():
                shell = distribute.sbatch
                max_jobs = len(hhblits_cmds)  # number_of_hhblits_cmds
            else:
                shell = distribute.default_shell
                max_jobs = self.evolutionary_profile_processes()

            # Todo
            #  distribution.Attrs()
            protocol_kwargs = dict(out_path=self.sbatch_scripts, scale=protocol,
                                   max_jobs=max_jobs, number_of_commands=len(all_evolutionary_commands),
                                   log_file=protocol_log_file)
            protocol_script = distribute.distribute(file=evolutionary_cmds_file, **protocol_kwargs)
            # Format messages
            info_messages.append(
                'Please follow the instructions below to generate sequence profiles for input proteins')
            protocol_job_info_message = f'Enter the following to distribute {protocol} jobs:\n\t'
            protocol_job_info_message += f'{shell} {protocol_script}'
            info_messages.append(protocol_job_info_message)
    elif msa_cmds:  # These may still be missing
        putils.make_path(self.profiles)

        if not os.access(putils.reformat_msa_exe_path, os.X_OK):
            logger.error(f"Couldn't execute multiple sequence alignment reformatting script")

        # Format .a3m multiple sequence alignments to .sto/.fasta
        for cmd in msa_cmds:
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            if stderr:
                logger.warning(stderr.decode('utf-8'))

    if bmdca_cmds:
        putils.make_path(self.profiles)
        putils.make_path(self.sbatch_scripts)
        bmdca_cmd_file = \
            distribute.write_commands(bmdca_cmds, name=f'{utils.starttime}-bmDCA', out_path=self.profiles)
        bmdca_script = distribute.distribute(file=bmdca_cmd_file, out_path=self.sbatch_scripts,
                                             scale='bmdca', max_jobs=len(bmdca_cmds),
                                             number_of_commands=len(bmdca_cmds),
                                             log_file=os.path.join(self.profiles, 'generate_couplings.log'))
        if distribute.is_sbatch_available():
            shell = distribute.sbatch
        else:
            shell = distribute.default_shell

        # When profile generation is also queued, the couplings job must wait for it to finish
        bmdca_script_message = \
            f'Once you are satisfied, enter the following to distribute jobs:\n\t{shell} %s' \
            % bmdca_script if not info_messages else 'ONCE this job is finished, to calculate evolutionary ' \
                                                     'couplings i,j for each amino acid in the multiple ' \
                                                     f'sequence alignment, enter:\n\t{shell} {bmdca_script}'
        info_messages.append(bmdca_script_message)

    return info_messages

JobResourcesFactory

JobResourcesFactory(**kwargs)

Return a JobResource instance by calling the Factory instance

Handles creation and allotment to other processes by making a shared pointer to the JobResource for the current Job

Source code in symdesign/resources/job.py
1433
1434
1435
def __init__(self, **kwargs):
    """Initialize the factory with an empty singleton registry

    Args:
        **kwargs: Accepted for interface compatibility; not used during construction
    """
    # Maps a source name to its JobResources singleton instance
    self._resources = {}
    # Whether to warn (once) when new arguments are passed after the singleton exists
    self._warn = True

__call__

__call__(**kwargs) -> JobResources

Return the specified JobResources object singleton

Returns:

  • JobResources

    The instance of the specified JobResources

Source code in symdesign/resources/job.py
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
def __call__(self, **kwargs) -> JobResources:
    """Return the specified JobResources object singleton

    Returns:
        The instance of the specified JobResources
    """
    #         Args:
    #             source: The JobResources source name
    source = 'single'
    job = self._resources.get(source)
    if not job:
        # First request: construct the singleton and register it under the source key
        logger.info(f'Initializing {JobResources.__name__}({kwargs.get("program_root", os.getcwd())})')
        self._resources[source] = JobResources(**kwargs)
        return self._resources[source]

    # Subsequent requests can't re-configure the singleton; warn only the first time
    if kwargs and self._warn:
        self._warn = False
        logger.warning(f"Can't pass the new arguments {', '.join(kwargs.keys())} to JobResources "
                       f'since it was already initialized and is a singleton')
    return job

get

get(**kwargs) -> JobResources

Return the specified JobResources object singleton

Returns:

  • JobResources

    The instance of the specified JobResources

Source code in symdesign/resources/job.py
1466
1467
1468
1469
1470
1471
1472
1473
1474
def get(self, **kwargs) -> JobResources:
    """Return the specified JobResources object singleton

    Returns:
        The instance of the specified JobResources
    """
    # Thin alias: calling the factory instance directly performs the lookup/creation
    return self(**kwargs)

generate_sequence_mask

generate_sequence_mask(fasta_file: AnyStr) -> list[int]

From a sequence with a design_selector, grab the residue indices that should be designed in the target structural calculation

Parameters:

  • fasta_file (AnyStr) –

    The path to a file with fasta information

Returns:

  • list[int]

    The residue numbers (in pose format) that should be ignored in design

Source code in symdesign/resources/job.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def generate_sequence_mask(fasta_file: AnyStr) -> list[int]:
    """From a sequence with a design_selector, grab the residue indices that should be designed in the target
    structural calculation

    Args:
        fasta_file: The path to a file with fasta information

    Returns:
        The residue numbers (in pose format) that should be ignored in design
    """
    # The file is expected to carry the target sequence first and the mask second
    records = list(sequence.read_fasta_file(fasta_file))
    target_sequence, mask_record, *_unused = records
    if len(target_sequence) != len(mask_record):
        raise ValueError(
            'The sequence and design_selector are different lengths. Please correct the alignment before proceeding')

    # Collect pose-numbered positions (1-based) where the mask isn't a gap character
    selected_residue_numbers = []
    for residue_number, mask_character in enumerate(mask_record, 1):
        if mask_character != '-':
            selected_residue_numbers.append(residue_number)
    return selected_residue_numbers

generate_chain_mask

generate_chain_mask(chains: str) -> set[str]

From a string with a design_selection, format the chains provided

Parameters:

  • chains (str) –

    The specified chains separated by commas to split

Returns: The provided chain ids in pose format

Source code in symdesign/resources/job.py
53
54
55
56
57
58
59
60
61
def generate_chain_mask(chains: str) -> set[str]:
    """From a string with a design_selection, format the chains provided

    Args:
        chains: The specified chains separated by commas to split
    Returns:
        The provided chain ids in pose format
    """
    # Delegate comma splitting/cleanup to the shared utility, then deduplicate
    cleaned_chain_ids = utils.clean_comma_separated_string(chains)
    return set(cleaned_chain_ids)