MultiprocessNonDaemonic
- class opencsp.common.lib.process.MultiprocessNonDaemonic.MultiprocessNonDaemonic(num_processes: int)
Bases: object
A class for managing a pool of non-daemonic processes for parallel execution.
This class is similar to multiprocessing.Pool, but it allows for the creation of non-daemonic processes, which can spawn child processes. This is useful in scenarios where grandchild processes need to be created from the child processes.
- __init__(num_processes: int)
Initializes the MultiprocessNonDaemonic instance.
- Parameters:
num_processes (int) – The number of processes in this pool.
Notes
Daemonic processes have certain limitations, such as not being able to create child processes. This class allows for the creation of non-daemonic processes to facilitate such use cases.
- starmap(func: Callable, args: Iterable[Iterable])
Distributes the execution of a function across multiple processes.
This method takes a function and a sequence of argument tuples, and executes the function in parallel using the specified number of processes.
- Parameters:
func (Callable) – The function to execute in parallel.
args (Iterable[Iterable]) – An iterable of argument tuples to pass to the function.
- Returns:
A list of results returned by the function, in the order of the input arguments.
- Return type:
list
- Raises:
AssertionError – If the number of processes exceeds the specified limit.
- queue
Allows us to collect results from processes (from https://stackoverflow.com/questions/10415028/how-to-get-the-return-value-of-a-function-passed-to-multiprocessing-process).
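The pattern described above can be sketched with the standard library alone. This is not the opencsp implementation: the names `nondaemonic_starmap` and `_worker` and the `ctx` parameter are hypothetical. The sketch starts at most `num_processes` non-daemonic `multiprocessing.Process` workers at a time. Each worker puts an `(index, result)` pair on a shared queue so that results can be returned in input order. It assumes `func` does not raise, because a failed worker would leave the parent blocked on `queue.get()`.

```python
import multiprocessing
from typing import Any, Callable, Iterable


def _worker(queue, index: int, func: Callable, args: tuple) -> None:
    # Run one call and report (index, result) so the parent can restore input order.
    queue.put((index, func(*args)))


def nondaemonic_starmap(func: Callable, args: Iterable[Iterable], num_processes: int, ctx=None) -> list[Any]:
    """Hypothetical sketch: starmap over non-daemonic processes, results in input order."""
    ctx = ctx or multiprocessing.get_context()
    all_args = [tuple(a) for a in args]
    results: list[Any] = [None] * len(all_args)
    queue = ctx.Queue()
    # Run the argument tuples in batches of at most num_processes workers.
    for batch_start in range(0, len(all_args), num_processes):
        batch = range(batch_start, min(batch_start + num_processes, len(all_args)))
        procs = []
        for index in batch:
            # daemon=False (the default) is what lets a worker start children of its own.
            proc = ctx.Process(target=_worker, args=(queue, index, func, all_args[index]), daemon=False)
            proc.start()
            procs.append(proc)
        # Drain the queue before joining: a worker may not exit while its result is still buffered.
        for _ in batch:
            index, value = queue.get()
            results[index] = value
        for proc in procs:
            proc.join()
    return results
```

With the "fork" start method (POSIX only), `nondaemonic_starmap(pow, [(2, 3), (3, 2)], 2, ctx=multiprocessing.get_context("fork"))` returns `[8, 9]`. Because these workers are not daemonic, each one may start its own child processes. A `multiprocessing.Pool` worker cannot do this.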
ParallelPartitioner
- class opencsp.common.lib.process.ParallelPartitioner.ParallelPartitioner(nservers: int, server_idx: int, ncpus: int, cpu_idx: int, npartitions_ceil: int = -1)
Bases: object
Helps portion out data to be executed per cpu, per server.
This class is here as a simple solution for parallel execution of trivially parallelizable problems. For anything more complicated, consider using a different parallel execution framework, such as Dask.
- __init__(nservers: int, server_idx: int, ncpus: int, cpu_idx: int, npartitions_ceil: int = -1)
Helps portion out data to be executed for a single server+cpu instance.
Typical usage is to get the partitioners for all cores on this server with the get_partitioners() classmethod:
partitioners = ParallelPartitioner.get_partitioners(nservers, server_idx, ncpus)
- Parameters:
nservers (int) – How many servers this program is running on.
server_idx (int) – Which server index this is (starting at 0).
ncpus (int) – How many processor cores per server this program is running on.
cpu_idx (int) – Which processor index this is (starting at 0).
npartitions_ceil (int) – A soft limit on the number of partitions to split the data into. -1 for no limit. Defaults to -1.
- cpu_identifier()
- get_my_portion(data: list[T], desc: str | None = None)
Get the subarray of the data list to operate on for the node + core that this partitioner represents.
The data list is guaranteed to be split as evenly as possible, such that each element is in exactly one partition across all nodes and cpu cores.
The data is first split evenly by server node index, and then by cpu core index. Because of this, it is recommended to use this class with servers that have identical performance characteristics (e.g. all have the same processor with the same number of cores).
- Parameters:
data (list) – The data list to operate on.
desc (str, optional) – A description of the data. If not None, then an info message will be printed. Defaults to None.
- Returns:
A light-weight subarray of data with the portion for this node + core to operate on.
- Return type:
list[T]
- get_my_range(data: list[T], desc: str | None = None)
Like get_my_portion(), but returns the start and end indices instead of a subselection of data.
If desired, the subselection of data can be retrieved from the returned range. Example:
rstart, rend = partitioner.get_my_range(data)
if rstart == -1 and rend == -1:
    data_subselection = []
else:
    data_subselection = data[rstart:rend]
This is equivalent to:
data_subselection = partitioner.get_my_portion(data)
- Returns:
int: the start index (inclusive), or -1 if this partitioner has a range of length 0
int: the stop index (exclusive), or -1 if this partitioner has a range of length 0
- Return type:
tuple[int, int]
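The two-level split described for get_my_portion() and get_my_range() can be illustrated with a short sketch. This is not the opencsp code: `even_bounds` and `my_range` are hypothetical names, and `npartitions_ceil` is ignored. The data is first split as evenly as possible across servers. That server's share is then split across cpu cores, and an empty share is reported with the -1 sentinel.

```python
def even_bounds(length: int, nparts: int, part_idx: int) -> tuple[int, int]:
    """Split range(length) into nparts contiguous chunks whose sizes differ by at most 1."""
    base, extra = divmod(length, nparts)
    # The first `extra` chunks each get one additional element.
    start = part_idx * base + min(part_idx, extra)
    end = start + base + (1 if part_idx < extra else 0)
    return start, end


def my_range(length: int, nservers: int, server_idx: int, ncpus: int, cpu_idx: int) -> tuple[int, int]:
    """Hypothetical two-level split: first by server, then by cpu core within that server."""
    server_start, server_end = even_bounds(length, nservers, server_idx)
    cpu_start, cpu_end = even_bounds(server_end - server_start, ncpus, cpu_idx)
    if cpu_end == cpu_start:
        return -1, -1  # empty partition, mirroring the documented -1 sentinel
    return server_start + cpu_start, server_start + cpu_end
```

For 10 elements on 2 servers with 3 cores each, server 0 gets the ranges (0, 2), (2, 4) and (4, 5), and server 1 gets (5, 7), (7, 9) and (9, 10).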
- classmethod get_partitioners(nservers: int, server_idx: int, ncpus: int, npartitions_ceil: int = -1)
Generate partitioners to split data into even chunks for each node and cpu core.
There are some cases where a task can be parallelized across many server nodes but not across cpu cores (such as ffmpeg, which is already parallelized across all cores). In these cases use 1 for the ncpus argument.
- Parameters:
nservers (int) – The number of servers that this code is running on.
server_idx (int) – The node index of this server (starts at 0).
ncpus (int) – The number of cpus on this node.
npartitions_ceil (int) – A soft limit on the number of partitions to split the data into. -1 for no limit. Defaults to -1.
- Returns:
A list of parallel partitioners, one for each core for the given server_idx.
- Return type:
list[ParallelPartitioner]
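Here is a minimal sketch of the get_partitioners() pattern. It uses a hypothetical `SketchPartitioner` dataclass in place of the real class. The classmethod returns one partitioner per cpu core for the given server, and each partitioner slices out its share of the data. The boundary formula `i * n // k` is one common way to get near-even chunks. It is an assumption here, not necessarily what opencsp uses.

```python
from dataclasses import dataclass
from typing import TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class SketchPartitioner:
    """Hypothetical stand-in for ParallelPartitioner, for illustration only."""

    nservers: int
    server_idx: int
    ncpus: int
    cpu_idx: int

    @classmethod
    def get_partitioners(cls, nservers: int, server_idx: int, ncpus: int) -> list["SketchPartitioner"]:
        # One partitioner per cpu core on this server.
        return [cls(nservers, server_idx, ncpus, cpu_idx) for cpu_idx in range(ncpus)]

    def get_my_portion(self, data: list[T]) -> list[T]:
        n = len(data)
        # This server's share: [server_start, server_end).
        server_start = self.server_idx * n // self.nservers
        server_end = (self.server_idx + 1) * n // self.nservers
        m = server_end - server_start
        # This core's share of the server's share.
        start = server_start + self.cpu_idx * m // self.ncpus
        end = server_start + (self.cpu_idx + 1) * m // self.ncpus
        return data[start:end]
```

Passing ncpus=1, as suggested for ffmpeg-style workloads, yields a single partitioner that covers this server's entire share of the data.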
- identifier()
Filename-friendly format for identifying this partitioner.
- server_identifier()
ServerSynchronizer
- class opencsp.common.lib.process.ServerSynchronizer.ServerSynchronizer(num_servers: int, server_index: int, propagate_errors=True, timeout: int = 1000, do_initial_wait=True)
Bases: object
Helper class to force all servers to wait at specified synchronization points. This is particularly useful for scatter-gather type workflows.
- __init__(num_servers: int, server_index: int, propagate_errors=True, timeout: int = 1000, do_initial_wait=True)
Helper class to force all servers to wait at specified synchronization points. This is particularly useful for scatter-gather type workflows.
At each call to wait(), including during this constructor if do_initial_wait, all servers will stop execution until all other servers have reached the same wait call. The same is true for the stop() call at the end of program execution.
Signaling between servers is achieved through the network file system that all of the servers share. In particular, the opencsp_temporary_dir() is used for this purpose. Because of the nature of multiprocessing and networked file systems, there is a decent chance that there are bugs in this implementation. If you find any bugs, please let me (BGB) know!
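The file-based signaling described above can be sketched as a simple barrier. This is a hypothetical illustration, not the ServerSynchronizer implementation. Each server creates a marker file named `{name}_{server_index}` in a shared directory, then polls until every server's marker exists. An existing marker would satisfy any later wait with the same name, so each synchronization point needs its own name (for example, a counter incremented on every wait). Cleanup of the marker files is omitted.

```python
import os
import time


def file_barrier(shared_dir: str, name: str, num_servers: int, server_index: int,
                 timeout: float = 1000, poll_interval: float = 0.05) -> None:
    """Hypothetical sketch: block until every server has created its marker file for `name`."""
    # Announce that this server has reached the synchronization point.
    marker = os.path.join(shared_dir, f"{name}_{server_index}")
    with open(marker, "w"):
        pass
    # Poll the shared directory until all servers' markers exist, or give up.
    expected = [os.path.join(shared_dir, f"{name}_{i}") for i in range(num_servers)]
    deadline = time.monotonic() + timeout
    while not all(os.path.exists(p) for p in expected):
        if time.monotonic() > deadline:
            raise TimeoutError(f"not all servers reached {name!r} within {timeout} s")
        time.sleep(poll_interval)
```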
- Ideas for future improvement:
Use network communication as the signaling mechanism instead of a file system.
Provide a unique and/or random identifier when starting the execution, so that independent execution (by independent programmers) can be differentiated.
Example usage:
import opencsp.common.lib.process.ServerSynchronizer as ss

# num_servers = 8
# server_index = 0-7
synchronizer = ss.ServerSynchronizer(num_servers, server_index)
try:
    # some work
    synchronizer.wait()
    # more work
    synchronizer.wait()
    ...
    # final work
except Exception as ex:
    synchronizer.stop(ex)
finally:
    synchronizer.stop()
- Parameters:
num_servers (int) – How many servers this code is running across
server_index (int) – Which server this is (indexing starts at 0)
propagate_errors (bool) – Whether to re-raise errors encountered in other servers during the wait method.
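timeout (int) – Maximum amount of time to wait, in seconds. Default 1000.
do_initial_wait (bool) – Whether to wait for all servers during this constructor, as described above. Default True.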
- gather(value: str)
Each server writes the given value to a file and waits for the other servers. The values from all servers are then read (in server index order) and returned.
In order for this (or really any ServerSynchronizer method) to work, all servers must call the same method at the same relative point in execution.
Parameters:
value (str): The value this server shares with all other servers.
Returns:
list[str]: All server values, in server index order.
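A gather can follow the same file-based approach. In this hypothetical sketch, `file_gather` and the `gather_{i}.txt` file names are assumptions. Each server writes its value to a temporary file and renames it into place with `os.replace()`, so a polling reader should not see a partially written value. The server then waits for all files and reads them in server index order.

```python
import os
import time


def file_gather(shared_dir: str, num_servers: int, server_index: int, value: str,
                timeout: float = 1000, poll_interval: float = 0.05) -> list[str]:
    """Hypothetical sketch: share `value` via files and return every server's value in index order."""

    def value_file(i: int) -> str:
        return os.path.join(shared_dir, f"gather_{i}.txt")

    # Write under a temporary name, then rename into place, so readers never see partial content.
    tmp = value_file(server_index) + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(value)
    os.replace(tmp, value_file(server_index))

    # Wait until every server has published its value.
    deadline = time.monotonic() + timeout
    while not all(os.path.exists(value_file(i)) for i in range(num_servers)):
        if time.monotonic() > deadline:
            raise TimeoutError("not all servers published a value in time")
        time.sleep(poll_interval)

    # Read the values back in server index order.
    values = []
    for i in range(num_servers):
        with open(value_file(i), encoding="utf-8") as f:
            values.append(f.read())
    return values
```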
- get_errored_servers()
Get a list of servers that have errored (aka have “errored” indicator files).
Returns:
ret (list[int]): List of all the errored server indexes.
- get_stopped_servers()
Get a list of servers that have stopped (aka have “stopped” indicator files).
Returns:
server_idxs (list[int]): List of all the stopped server indexes.
- stop(error_to_propagate: Exception | None = None)
Create the wait and stop signal files, wait for other servers to stop, and remove the wait and stop files.
This works in essentially the same way as the wait() method, except that it is waiting on “stopped” files instead of “wait” files.
- wait()
Wait for all servers to reach this point.
- path = '/scratch/tmp/synchronize_servers_by_file'
- exception opencsp.common.lib.process.lib.ServerSynchronizerError.ServerSynchronizerError
Bases: RuntimeError
Exception raised for errors that occur during server synchronization.
This class extends the built-in RuntimeError to provide a specific exception type for handling errors related to server synchronization processes.
- Parameters:
message (str, optional) – An optional message describing the error. Defaults to None.
File tools
- opencsp.common.lib.process.parallel_file_tools.wait_on_files(files: list[str], timeout: float = 1000, alternates: dict[str, list[str]] | None = None, msg: str | None = None)
Waits up to ‘timeout’ seconds for all the files to exist.
Note: there is no guarantee that all files exist when this function completes. Rather, this function guarantees that all of the files existed at some point during its execution (i.e. some of the files could have been deleted after their existence was verified).
Arguments:
files (list[str]): The files to check for the existence of.
timeout (float): How many seconds to wait for the given files to exist.
alternates (dict[str,list[str]]): Alternate files that can exist in the place of the given files. An example use case could be to check if an error file was created from a process that we’re waiting on the output from.
Returns:
A list of the found files.
Raises:
TimeoutError: if the timeout triggers and not all files exist yet.
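The polling loop behind wait_on_files() can be sketched as follows. The function name `wait_on_files_sketch` and the `poll_interval` parameter are hypothetical, and the `msg` argument is not modeled. A file counts as found the first time it, or one of its alternates, is seen. This matches the note above: files are only guaranteed to have existed at some point, not to still exist on return. In this sketch the returned list holds whichever path satisfied each requested file.

```python
import os
import time
from typing import Optional


def wait_on_files_sketch(files: list[str], timeout: float = 1000,
                         alternates: Optional[dict[str, list[str]]] = None,
                         poll_interval: float = 0.05) -> list[str]:
    """Hypothetical sketch: wait until each file, or one of its alternates, has been seen once."""
    alternates = alternates or {}
    found: dict[str, str] = {}  # requested file -> the path that was actually seen
    deadline = time.monotonic() + timeout
    while True:
        for f in files:
            if f in found:
                continue  # already seen; a later deletion does not matter
            for candidate in [f, *alternates.get(f, [])]:
                if os.path.exists(candidate):
                    found[f] = candidate
                    break
        if all(f in found for f in files):
            return [found[f] for f in files]
        if time.monotonic() > deadline:
            missing = [f for f in files if f not in found]
            raise TimeoutError(f"timed out waiting for {missing}")
        time.sleep(poll_interval)
```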
- class opencsp.common.lib.process.lib.ProcessOutputLine.ProcessOutputLine(val: str, is_err: bool = False, lineno: int = 0)
Bases: object
Represents a single line of stdout or stderr output.
- __init__(val: str, is_err: bool = False, lineno: int = 0) → None
- is_err: bool = False
Whether this line originated from stderr. (Default False)
- lineno: int = 0
The index of this line from the mixed stderr/stdout of a process. (Default 0)
- val: str
The value of the line of output
Video tools
- opencsp.common.lib.process.parallel_video_tools.parallel_frames_to_videos(partitioner: ParallelPartitioner, frames_path: str, frames_names: list[str], videos_path: str, render_control: RenderControlVideo | None = None, overwrite=False)
Converts the given frames into videos, one video per computer.
To convert all of the given frames, this same function should be called with the same arguments across all computers. Note that there isn’t much of a point in parallelizing across cpu cores on a single computer because ffmpeg already uses all available cpu cores.
Example usage:
# worker function:
partitioners = ppart.ParallelPartitioner.get_partitioners(nservers, my_server_idx, ncpus=1)
frames_names = ft.files_in_directory_by_extension(frames_path, ["jpg"], sort=False)["jpg"]
# with a single partitioner, a direct call also works:
# videos_names = [pvt.parallel_frames_to_videos(partitioners[0], frames_path, frames_names, videos_path)]
with multiprocessing.Pool(len(partitioners)) as p:
    videos_names = p.starmap(
        pvt.parallel_frames_to_videos,
        [(partitioner, frames_path, frames_names, videos_path) for partitioner in partitioners],
    )
# drop the None results from cores that had no frames
return [f for f in videos_names if f is not None]

# calling function:
if my_server_idx == 0:
    video_handler.merge_videos(partial_videos_names)
Args:
partitioner (ParallelPartitioner): Helper for portioning data to this server/cpu.
frames_path (str): What directory to search for frames in.
frames_names (list[str]): The names of the frame files in frames_path to convert.
videos_path (str): Where to put the generated video(s).
render_control (RenderControlVideo): Controls the properties of the generated video.
Returns:
str: The generated video file (name and extension). None if frames_names is empty or there are no frames for this cpu core.
- opencsp.common.lib.process.parallel_video_tools.parallel_video_to_frames(num_servers: int, server_index: int, video_handler: VideoHandler, server_synchronizer: ServerSynchronizer)
Extract all frames from the given video, where each server extracts the frames for part of the video. To extract all frames, execute this method on each server with that server’s server_index.
We use the time select method “-ss start_time -to end_time”, where each server starts at the exact same time that the previous server stopped. This way there should be a 1-frame overlap between the frames that the servers output.
There is another method that could be frame-exact, meaning that there should be no (extra) duplicate frames beyond what would already be duplicated by ffmpeg running on a single server. The way to achieve this is with the “trim=start_frame=n:end_frame=m” option for the output stream, where n and m are determined per-server based on the length and framerate of the source video. However, we don’t use that method, because in practice ffmpeg duplicates the first frame n times. So instead we use the time select method, which is likely, but not guaranteed, to be correct.
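The per-server time windows for the time select method can be computed as in this hypothetical sketch. `server_time_window` and `ffmpeg_extract_args` are illustrative names, not part of opencsp. Each server's start time is computed with exactly the same expression as the previous server's end time, so adjacent windows meet without gaps. Placing `-ss` and `-to` before `-i` (input-side seeking) is an assumption. The real VideoHandler may build its command differently, and each server would normally write to its own output directory before the frames are collected.

```python
def server_time_window(duration_s: float, num_servers: int, server_index: int) -> tuple[float, float]:
    """Hypothetical sketch: contiguous (start, end) time window, in seconds, for one server."""
    chunk = duration_s / num_servers
    start = server_index * chunk
    # The last server runs to the very end, avoiding any floating-point shortfall.
    end = duration_s if server_index == num_servers - 1 else (server_index + 1) * chunk
    return start, end


def ffmpeg_extract_args(video: str, out_pattern: str, duration_s: float,
                        num_servers: int, server_index: int) -> list[str]:
    """Build an ffmpeg argument list using the time select method (-ss start -to end)."""
    start, end = server_time_window(duration_s, num_servers, server_index)
    return ["ffmpeg", "-ss", f"{start:.3f}", "-to", f"{end:.3f}", "-i", video, out_pattern]
```

For a 10 second video on 4 servers, server 1 gets the window (2.5, 5.0) and the arguments `-ss 2.500 -to 5.000`.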
Args:
num_servers (int): How many servers this is being evaluated on.
server_index (int): Which server out of num_servers this is being evaluated on. Indexing starts at 0.
video_handler (VideoHandler): The handler to use to extract the frames
server_synchronizer (ServerSynchronizer): The synchronizer for all servers to wait on while collecting the extracted frames into a single directory.
Subprocess tools
- opencsp.common.lib.process.subprocess_tools.filter_lines(lines: list[ProcessOutputLine], keep_stdout=True, keep_stderr=True)
Filters a list of process output lines based on specified criteria.
This function allows you to filter out standard output (stdout) or standard error (stderr) lines from a list of process output lines. You can choose to keep only the stdout lines, only the stderr lines, or both.
- Parameters:
lines (list[pol.ProcessOutputLine]) – A list of process output lines to filter.
keep_stdout (bool, optional) – If True, keeps the stdout lines. If False, removes them. Defaults to True.
keep_stderr (bool, optional) – If True, keeps the stderr lines. If False, removes them. Defaults to True.
- Returns:
A list of filtered process output lines based on the specified criteria.
- Return type:
list[pol.ProcessOutputLine]
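The filtering rule is simple enough to show directly. In this sketch, `OutputLine` is a hypothetical stand-in with the same three fields that ProcessOutputLine documents (val, is_err, lineno). A line is kept when the flag for its stream is set.

```python
from dataclasses import dataclass


@dataclass
class OutputLine:
    """Hypothetical stand-in mirroring ProcessOutputLine's documented fields."""

    val: str
    is_err: bool = False
    lineno: int = 0


def filter_lines_sketch(lines: list[OutputLine], keep_stdout: bool = True,
                        keep_stderr: bool = True) -> list[OutputLine]:
    # Keep a line if the flag for its stream (stderr when is_err, else stdout) is set.
    return [line for line in lines if (keep_stderr if line.is_err else keep_stdout)]
```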
- opencsp.common.lib.process.subprocess_tools.get_executable_path(executable_name: str, dont_match: str | None = None) → str
Returns the first “path/name.ext” for the given executable. If dont_match is specified, then paths containing that string are excluded from the returned results.
- Parameters:
executable_name (str) – The name of the executable to search for. On Windows, either the plain name or the .exe name may be searched for equally.
dont_match (str, optional) – If specified, then don’t include results that contain this string. By default None.
- Returns:
executable_path_name_ext – The “path/name.exe” of the found executable.
- Return type:
str
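One way to implement this kind of lookup is to walk the directories on PATH in order. This is a hypothetical sketch: `find_executable` and its `search_path` parameter are not part of opencsp. Unlike `shutil.which()`, which returns only the first match, it can skip candidates whose path contains `dont_match`. It returns None when nothing is found, which may differ from the real function.

```python
import os
import sys
from typing import Optional


def find_executable(executable_name: str, dont_match: Optional[str] = None,
                    search_path: Optional[str] = None) -> Optional[str]:
    """Hypothetical sketch: first PATH entry for `executable_name`, skipping paths containing `dont_match`."""
    search_path = os.environ.get("PATH", "") if search_path is None else search_path
    names = [executable_name]
    # On Windows, accept either "name" or "name.exe".
    if sys.platform == "win32":
        if executable_name.lower().endswith(".exe"):
            names.append(executable_name[:-4])
        else:
            names.append(executable_name + ".exe")
    for directory in search_path.split(os.pathsep):
        if not directory:
            continue  # skip empty PATH entries
        for name in names:
            candidate = os.path.join(directory, name)
            if dont_match is not None and dont_match in candidate:
                continue
            if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                return candidate
    return None
```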
- opencsp.common.lib.process.subprocess_tools.print_lines(lines: list[ProcessOutputLine])
Prints the process output lines to the console.
This function iterates through a list of process output lines and prints each line to the console. If the line is an error line, it uses the error logging function; otherwise, it uses the info logging function.
- Parameters:
lines (list[pol.ProcessOutputLine]) – A list of process output lines to print.
- Return type:
None
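Here is a sketch of the same dispatch. It uses the standard `logging` module as a stand-in for opencsp's own logging functions. Any object with `val` and `is_err` attributes works, which is expressed with a `typing.Protocol`. The names `LineLike` and `print_lines_sketch` are hypothetical.

```python
import logging
from typing import Iterable, Protocol

logger = logging.getLogger(__name__)


class LineLike(Protocol):
    """Anything with the two ProcessOutputLine fields used here."""

    val: str
    is_err: bool


def print_lines_sketch(lines: Iterable[LineLike]) -> None:
    # stderr lines go to the error log; everything else goes to the info log.
    for line in lines:
        if line.is_err:
            logger.error(line.val)
        else:
            logger.info(line.val)
```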
- opencsp.common.lib.process.subprocess_tools.run(cmd: str, cwd: str | None = None, stdout: str | None = None, stderr: str | None = None, ignore_return_code=False, timeout: float | None = None)
Runs the given command in the given directory, prints the output to the logger, and checks the return code.
This method is most useful for running a sub-process and capturing mixed stderr/stdout. For all other use cases, it is probably better to use proc=multiprocessing.Process(), proc.start(), proc.join().
Args:
cmd (str): The command to run.
cwd (str): The directory to change to before starting the subprocess.
stdout (str): One of “collect” which returns all the stdout lines, “print”, or “collect+print”. Default is None (collect+print).
stderr (str): One of “collect” which returns all the stderr lines, “print”, or “collect+print”. Default is None (collect+print).
ignore_return_code (bool): If true, then don’t raise an error for a non-zero return code.
timeout (float): If not None, then terminate/kill the process after timeout seconds. If terminated, there might not be any stdout/stderr to collect. None for no timeout. Default is None.
Raises:
subprocess.CalledProcessError: Raised if the subprocess returns an error code.
Returns:
list[ProcessOutputLine]: The lines the subprocess output. On Windows, the lines are collected with stdout first, then stderr.
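Here is a simplified sketch of the collect behavior using `subprocess.run()`. It is hypothetical: `run_sketch` is not the opencsp function, and it returns `(is_err, text)` pairs rather than ProcessOutputLine objects. Like the Windows behavior described above, it yields all stdout lines before all stderr lines. Preserving the true interleaving would require reading both pipes concurrently. `shell=True` is used because `cmd` is a single string, so it should only be given trusted input.

```python
import subprocess
from typing import Optional


def run_sketch(cmd: str, cwd: Optional[str] = None, timeout: Optional[float] = None,
               ignore_return_code: bool = False) -> list[tuple[bool, str]]:
    """Run `cmd` in `cwd` and return (is_err, text) pairs: stdout lines first, then stderr lines."""
    # subprocess.run() kills the child and raises TimeoutExpired if `timeout` elapses.
    proc = subprocess.run(cmd, shell=True, cwd=cwd, capture_output=True, text=True, timeout=timeout)
    lines = [(False, text) for text in proc.stdout.splitlines()]
    lines += [(True, text) for text in proc.stderr.splitlines()]
    if proc.returncode != 0 and not ignore_return_code:
        raise subprocess.CalledProcessError(proc.returncode, cmd, output=proc.stdout, stderr=proc.stderr)
    return lines
```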
- exception opencsp.common.lib.process.lib.CalledProcessError.CalledProcessError(returncode, cmd, output=None, stderr=None)
Bases: subprocess.CalledProcessError
Exception raised when a subprocess call fails.
This class extends the standard subprocess.CalledProcessError to include additional information about the standard error output captured during the subprocess execution.
- Parameters:
returncode (int) – The exit status of the process.
cmd (str or list) – The command that was executed.
output (bytes, optional) – The standard output captured from the process. Defaults to None.
stderr (bytes, optional) – The standard error output captured from the process. Defaults to None.
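The idea of carrying stderr in the exception can be sketched by overriding `__str__` on a subclass of the standard exception. The class name is hypothetical, and the message format is an assumption, not necessarily what opencsp prints.

```python
import subprocess


class CalledProcessErrorWithStderr(subprocess.CalledProcessError):
    """Hypothetical sketch: a CalledProcessError whose message also shows captured stderr."""

    def __str__(self) -> str:
        msg = super().__str__()
        if self.stderr:
            # stderr may be bytes (default) or str (text=True); decode bytes leniently.
            err = self.stderr.decode(errors="replace") if isinstance(self.stderr, bytes) else str(self.stderr)
            msg += f"\nstderr:\n{err.rstrip()}"
        return msg
```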