Skip to content

distribute

Module for Distribution of commands found for individual poses to SLURM/PBS computational cluster

is_sbatch_available

is_sbatch_available() -> bool

Ensure the sbatch executable is available and executable

Source code in symdesign/resources/distribute.py
35
36
37
38
39
40
41
42
43
def is_sbatch_available() -> bool:
    """Report whether the sbatch executable can be located and executed

    The result of the lookup (a path, or None when nothing was found) is stored in the module-level 'sbatch_exe'

    Returns:
        True if the located sbatch path exists and is executable, otherwise False
    """
    global sbatch_exe
    sbatch_exe = shutil.which(sbatch)
    if sbatch_exe is None:  # The lookup found nothing
        return False

    return os.path.exists(sbatch_exe) and os.access(sbatch_exe, os.X_OK)

create_file

create_file(file: AnyStr = None)

If file doesn't exist, create a blank one

Source code in symdesign/resources/distribute.py
144
145
146
147
148
def create_file(file: AnyStr = None):
    """Create an empty file at the requested location unless something already exists there

    Args:
        file: The location of the file to create. When not provided (or empty), nothing is done
    Returns:
        None
    """
    if not file or os.path.exists(file):
        return

    # Opening in write mode and closing immediately leaves a blank file behind
    with open(file, 'w'):
        pass

run

run(cmd: list[str] | AnyStr, log_file_name: str, program: str = None, srun: str = None) -> bool

Executes specified command and appends command results to log file

Parameters:

  • cmd (list[str] | AnyStr) –

    The name of a command file which should be executed by the system

  • log_file_name (str) –

    Location on disk of log file

  • program (str, default: None ) –

    The interpreter for said command

  • srun (str, default: None ) –

    Whether to utilize a job step prefix during command issuance

Returns: Whether the command executed successfully

Source code in symdesign/resources/distribute.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def run(cmd: list[str] | AnyStr, log_file_name: str, program: str = None, srun: str = None) -> bool:
    """Executes specified command and appends command results to log file

    Args:
        cmd: The name of a command file which should be executed by the system
        log_file_name: Location on disk of log file
        program: The interpreter for said command
        srun: Whether to utilize a job step prefix during command issuance
    Returns:
        Whether the command executed successfully
    """
    cluster_prefix = srun if srun else []
    program = [program] if program else []
    command = [cmd] if isinstance(cmd, str) else cmd
    if log_file_name:
        with open(log_file_name, 'a') as log_f:
            command = cluster_prefix + program + command
            log_f.write(f'Command: {subprocess.list2cmdline(command)}\n')
            p = subprocess.Popen(command, stdout=log_f, stderr=log_f)
            p.communicate()
    else:
        print(f'Command: {subprocess.list2cmdline(command)}\n')
        p = subprocess.Popen(command)
        p.communicate()

    return p.returncode == 0

check_scripts_exist

check_scripts_exist(directives: Iterable[str] = None, file: AnyStr = None)

Check for the existence of scripts provided by an Iterable or present in a file

Parameters:

  • directives (Iterable[str], default: None ) –

    The locations of scripts which should be executed

  • file (AnyStr, default: None ) –

    The location of a file containing the location(s) of scripts/commands

Raises: InputError – When the scripts/commands passed are malformed or do not exist. Returns: None

Source code in symdesign/resources/distribute.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def check_scripts_exist(directives: Iterable[str] = None, file: AnyStr = None):
    """Check for the existence of scripts provided by an Iterable or present in a file

    Every directive must be of the same kind as the first one: either all shell scripts (ending in '.sh') or all
    commands. Each shell script directive, including the first, must exist on disk

    Args:
        directives: The locations of scripts which should be executed
        file: The location of a file containing the location(s) of scripts/commands. Only read when 'directives'
            isn't provided
    Raises:
        ValueError: When neither 'directives' nor 'file' is passed, or when there are no directives to check
        InputError: When the scripts/commands passed are malformed or do not exist
    Returns:
        None
    """
    script_or_command = \
        '{} is malformed at line {}. All commands should match.\n* * *\n{}\n* * *' \
        '\nEither a file extension OR a command requried. Cannot mix'
    # Automatically detect if the commands file has executable scripts or errors
    if directives is None:
        if file is None:
            raise ValueError("Must pass either 'directives' or 'file'. Neither were passed")
        # Use collect_designs to get commands from the provided file
        directives, _ = collect_designs(files=[file])

    # Materialize so that any iterable can be checked for emptiness before unpacking
    directives = list(directives)
    if not directives:
        raise ValueError('No directives were found to check')

    # The first directive decides whether the collection holds scripts or commands
    first_directive, *remaining_directives = directives
    contains_scripts = first_directive.endswith('.sh')
    if contains_scripts and not os.path.exists(first_directive):
        raise InputError(
            f"The command at location '{first_directive}' doesn't exist")

    for idx, directive in enumerate(remaining_directives, 1):
        # Check if the directive string is a shell script type file string. Ex: "refine.sh"
        is_script = directive.endswith('.sh')
        if is_script and not os.path.exists(directive):  # Check if file is missing
            raise InputError(
                f"The command at location '{directive}' doesn't exist")
        if is_script != contains_scripts:  # There was a change between script files and commands
            raise InputError(script_or_command.format(file, idx, directive))

distribute

distribute(file: AnyStr, scale: protocols_literal, number_of_commands: int, out_path: AnyStr = os.getcwd(), success_file: AnyStr = None, failure_file: AnyStr = None, log_file: AnyStr = None, max_jobs: int = 80, mpi: int = None, finishing_commands: Iterable[str] = None, batch: bool = is_sbatch_available(), **kwargs) -> str

Take a file of commands formatted for execution in the SLURM environment and process into a script

Parameters:

  • file (AnyStr) –

    The location of the file which contains your commands to distribute through a sbatch array

  • scale (protocols_literal) –

    The stage of design to distribute. Works with CommandUtils and PathUtils to allocate jobs

  • number_of_commands (int) –

    The size of the job array

  • out_path (AnyStr, default: getcwd() ) –

    Where to write out the sbatch script

  • success_file (AnyStr, default: None ) –

    What file to write the successful jobs to for job organization

  • failure_file (AnyStr, default: None ) –

    What file to write the failed jobs to for job organization

  • log_file (AnyStr, default: None ) –

    The name of a log file to write command results to

  • max_jobs (int, default: 80 ) –

    The size of the job array limiter. This caps the number of commands executed at once

  • mpi (int, default: None ) –

    The number of processes to run concurrently with MPI

  • finishing_commands (Iterable[str], default: None ) –

    Commands to run once all sbatch processes are completed

  • batch (bool, default: is_sbatch_available() ) –

    Whether the distribution file should be formatted as a SLURM sbatch script

Returns: The name of the script that was written

Source code in symdesign/resources/distribute.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def distribute(file: AnyStr, scale: protocols_literal, number_of_commands: int, out_path: AnyStr = os.getcwd(),
               success_file: AnyStr = None, failure_file: AnyStr = None, log_file: AnyStr = None, max_jobs: int = 80,
               mpi: int = None,
               finishing_commands: Iterable[str] = None, batch: bool = is_sbatch_available(), **kwargs) -> str:
    """Write a script which hands a file of commands to the distributer tool, optionally as an sbatch array job

    An 'output' directory is created inside out_path. When batch is True, the script begins with the contents of
    the sbatch template for 'scale' (or the default template when none is found), followed by 'output=' and
    'array=' lines which are each prefixed with sb_flag

    Args:
        file: The location of the file which contains your commands to distribute through a sbatch array
        scale: The stage of design to distribute. Used to look up the sbatch template
        number_of_commands: The size of the job array
        out_path: Where to write out the sbatch script
        success_file: What file to write the successful jobs to for job organization.
            Defaults to '<command file name>.success' in out_path
        failure_file: What file to write the failed jobs to for job organization.
            Defaults to '<command file name>.failures' in out_path
        log_file: The name of a log file to write command results to
        max_jobs: The size of the job array limiter. This caps the number of commands executed at once.
            When batch is False, this is passed as '--number-of-processes' instead
        mpi: The number of processes to run concurrently with MPI. Not currently used by this function
        finishing_commands: Commands to run once all sbatch processes are completed
        batch: Whether the distribution file should be formatted as a SLURM sbatch script
    Returns:
        The name of the script that was written
    """
    # Resolve where job results are recorded, using the command file's name as the base
    name = os.path.splitext(os.path.basename(file))[0]
    if success_file is None:
        success_file = os.path.join(out_path, f'{name}.success')
    if failure_file is None:
        failure_file = os.path.join(out_path, f'{name}.failures')
    output = os.path.join(out_path, 'output')
    putils.make_path(output)

    # The script name indicates whether it is meant for sbatch or a regular shell
    script_name = f'{name}-{sbatch}.sh' if batch else f'{name}.sh'
    filename = os.path.join(out_path, script_name)

    with open(filename, 'w') as script_f:
        # Todo set up sbatch accordingly. Include a multiplier for the number of CPU's, possibly using mpi
        if batch:
            # Start from the template, then add the output and array details
            template = sbatch_templates.get(scale)
            if not template:
                template = default_sbatch_template
                logger.warning(f"Couldn't find an sbatch script template for '{scale}'")
            with open(template) as template_f:
                script_f.write(template_f.read())
            script_f.write(f'{sb_flag}output={output}/%A_%a.out\n')
            # Each command is its own array task. max_jobs limits how many run at once
            script_f.write(f'{sb_flag}array=1-{number_of_commands}%{max_jobs}\n\n')

        log_option = f'--log-file {log_file} ' if log_file else ''
        distributer_command = (
            f'python {putils.distributer_tool} {log_option}'
            f'--success-file {success_file} --failure-file {failure_file} --command-file {file}'
        )
        if not batch:
            # Without a job array, the distributer tool is given the process limit directly
            distributer_command += f' --number-of-processes {max_jobs}'
        script_f.write(distributer_command)

        if not finishing_commands:
            script_f.write('\n')
        elif batch:
            script_f.write('\n# Wait for all to complete\n'
                           'wait\n'
                           '\n'
                           '# Then execute\n'
                           '%s\n' % '\n'.join(finishing_commands))
        else:
            script_f.write(' &&\n# Wait for all to complete, then execute\n'
                           '%s\n' % '\n'.join(finishing_commands))

    return filename

commands

commands(commands: Sequence[str], name: str, protocol: protocols_literal, out_path: AnyStr = os.getcwd(), commands_out_path: AnyStr = None, **kwargs) -> str

Given a batch of commands, write them to a file and distribute that work for completion using specified computational resources

Parameters:

  • commands (Sequence[str]) –

    The commands which should be written to a file and then formatted for distribution

  • name (str) –

    The name of the collection of commands. Will be applied to commands file and distribution file(s)

  • protocol (protocols_literal) –

    The type of protocol to distribute

  • out_path (AnyStr, default: getcwd() ) –

    Where should the distributed script be written?

  • commands_out_path (AnyStr, default: None ) –

    Where should the commands file be written? If not specified, is written to out_path

Other Parameters:

  • success_file

    AnyStr = None - What file to write the successful jobs to for job organization

  • failure_file

    AnyStr = None - What file to write the failed jobs to for job organization

  • log_file

    AnyStr = None - The name of a log file to write command results to

  • max_jobs

    int = 80 - The size of the job array limiter. This caps the number of commands executed at once

  • mpi

    int = None - The number of processes to run concurrently with MPI

  • finishing_commands

    Iterable[str] = None - Commands to run once all sbatch processes are completed

  • batch

    bool = is_sbatch_available() - Whether the distribution file should be formatted as a SLURM sbatch script

Returns:

  • str

    The name of the distribution script that was written

Source code in symdesign/resources/distribute.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def commands(commands: Sequence[str], name: str, protocol: protocols_literal,
             out_path: AnyStr = os.getcwd(), commands_out_path: AnyStr = None, **kwargs) -> str:
    """Write a batch of commands to a file, then create the script which distributes that file for completion
    using the available computational resources

    Args:
        commands: The commands which should be written to a file and then formatted for distribution
        name: The name of the collection of commands. Will be applied to commands file and distribution file(s)
        protocol: The type of protocol to distribute
        out_path: Where should the distributed script be written?
        commands_out_path: Where should the commands file be written? If not specified, is written to out_path

    Keyword Args:
        success_file: AnyStr = None - What file to write the successful jobs to for job organization
        failure_file: AnyStr = None - What file to write the failed jobs to for job organization
        log_file: AnyStr = None - The name of a log file to write command results to
        max_jobs: int = 80 - The size of the job array limiter. This caps the number of commands executed at once
        mpi: int = None - The number of processes to run concurrently with MPI
        finishing_commands: Iterable[str] = None - Commands to run once all sbatch processes are completed
        batch: bool = is_sbatch_available() - Whether the distribution file should be formatted as a SLURM sbatch script

    Returns:
        The name of the distribution script that was written
    """
    # Decide which program the user should launch the script with and notify them accordingly
    batch_available = is_sbatch_available()
    shell = sbatch if batch_available else default_shell
    logger.info(sbatch_warning if batch_available else script_warning)

    # Ensure the destination(s) exist before writing anything
    putils.make_path(out_path)
    if commands_out_path is None:
        commands_out_path = out_path
    else:
        putils.make_path(commands_out_path)

    command_file = write_commands(commands, name=name, out_path=commands_out_path)
    script_file = distribute(command_file, protocol, len(commands), out_path=out_path, **kwargs)

    logger.info(f'Once you are satisfied, enter the following to distribute:\n\t{shell} {script_file}')

    return script_file

write_script

write_script(command: str, name: str = 'script', out_path: AnyStr = os.getcwd(), additional: list = None, shell: str = 'bash', status_wrap: str = None) -> AnyStr

Take a command and write to a name.sh script. By default, bash is used as the shell interpreter

Parameters:

  • command (str) –

    The command formatted using subprocess.list2cmdline(list())

  • name (str, default: 'script' ) –

    The name of the output shell script

  • out_path (AnyStr, default: getcwd() ) –

    The location where the script will be written

  • additional (list, default: None ) –

    Additional commands also formatted using subprocess.list2cmdline()

  • shell (str, default: 'bash' ) –

    The shell which should interpret the script

  • status_wrap (str, default: None ) –

    The name of a file in which to check and set the status of the command in the shell

Returns: The name of the file

Source code in symdesign/resources/distribute.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def write_script(command: str, name: str = 'script', out_path: AnyStr = os.getcwd(),
                 additional: list = None, shell: str = 'bash', status_wrap: str = None) -> AnyStr:
    """Write a command, and any additional commands, to a shell script named after 'name'

    When status_wrap is provided, the script first runs the distributer tool with '--check', chains every command
    with '&&', and finishes by running the distributer tool with '--set'

    Args:
        command: The command formatted using subprocess.list2cmdline(list())
        name: The name of the output shell script. A '.sh' extension is added if missing
        out_path: The location where the script will be written
        additional: Additional commands also formatted using subprocess.list2cmdline()
        shell: The shell which should interpret the script. Written to the shebang as '#!/bin/<shell>'
        status_wrap: The name of a file in which to check and set the status of the command in the shell
    Returns:
        The name of the file
    """
    if status_wrap:
        modifier = '&&'
        status_cmd = ['python', putils.distributer_tool, '--stage', name, 'status', '--info', status_wrap]
        check = subprocess.list2cmdline([*status_cmd, '--check', modifier, '\n'])
        _set = subprocess.list2cmdline([*status_cmd, '--set'])
    else:
        check = _set = modifier = ''

    # Assemble the script contents in order: shebang and main command, extra commands, status update
    sections = [f'#!/bin/{shell}\n\n{check}{command} {modifier}\n\n']
    if additional:
        chained = '\n\n'.join(f'{extra} {modifier}' for extra in additional)
        sections.append(f'{chained}\n\n')
    sections.append(f'{_set}\n')

    script_name = name if name.endswith('.sh') else f'{name}.sh'
    file_name = os.path.join(out_path, script_name)
    with open(file_name, 'w') as f:
        f.write(''.join(sections))

    return file_name

write_commands

write_commands(commands: Iterable[str], name: str = 'all_commands', out_path: AnyStr = os.getcwd()) -> AnyStr

Write a list of commands out to a file

Parameters:

  • commands (Iterable[str]) –

    An iterable with the commands as values

  • name (str, default: 'all_commands' ) –

    The name of the file. Will be appended with '.cmd(s)'

  • out_path (AnyStr, default: getcwd() ) –

    The directory where the file will be written

Returns: The filename of the new file

Source code in symdesign/resources/distribute.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def write_commands(commands: Iterable[str], name: str = 'all_commands', out_path: AnyStr = os.getcwd()) -> AnyStr:
    """Write a list of commands out to a file, one command per line

    Args:
        commands: An iterable with the commands as values. Any iterable is accepted, including generators
        name: The name of the file. Will be appended with '.cmds' when there is more than one command,
            otherwise '.cmd'
        out_path: The directory where the file will be written
    Returns:
        The filename of the new file
    """
    # Materialize once so that iterables without a length can be counted and then written
    commands = list(commands)
    extension = 'cmds' if len(commands) > 1 else 'cmd'
    file = os.path.join(out_path, f'{name}.{extension}')
    with open(file, 'w') as f:
        f.write('%s\n' % '\n'.join(commands))

    return file