lofn package

Submodules

lofn.api module

Wrapper to execute Docker containers as Spark tasks.

class lofn.api.DockerPipe(spark_configuration, **kwargs)[source]

Bases: object

Main entry point for lofn.

Parameters:

spark_configuration – Spark configuration (e.g. pyspark.SparkConf())

Keyword Arguments:
 
  • temporary_directory_parent – specify an absolute path to the parent directory to use for temporary directories and files. The default is None, which uses a location from a platform-dependent list or the TMPDIR, TEMP, or TMP environment variables. If a path is specified, it must either exist on all nodes or lofn must run with permissions that allow it to create the directory.
  • shared_mountpoint – the path to the directory inside each docker container that maps to the temporary directory on the host. (Default ‘/shared/’)
  • volumes – user-defined volumes to mount in the container. This is useful for data that is not being partitioned and needs to be read into each container, such as a global reference. This must be given as a dictionary whose keys are the absolute paths of the host directories to bind. The value of each key describes how to bind that volume: provide a ‘bind’ path, the absolute path inside the container where that volume mounts, and optionally a ‘mode’, either ro (read-only, the default) or rw (read-write). The structure of this input mirrors docker-py volumes: {‘[host_directory_path]’: {‘bind’: ‘[container_directory_path]’, ‘mode’: ‘[ro|rw]’}} (see the construction sketch below)
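
For illustration, a minimal construction sketch using the documented keyword arguments. The paths, app name, and volume mapping are placeholders, not values lofn requires:

    from pyspark import SparkConf
    from lofn.api import DockerPipe

    conf = SparkConf().setAppName("lofn-example")

    # Placeholder paths: /data/reference must exist on the hosts, and
    # /reference is where it appears inside each container (read-only).
    pipe = DockerPipe(
        conf,
        temporary_directory_parent="/tmp/lofn",
        volumes={"/data/reference": {"bind": "/reference", "mode": "ro"}},
    )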
map(rdd, image_name, command, **kwargs)[source]

Map step, applied with Spark’s mapPartitions. Each partition is written to temp file(s), a Docker container is executed to run the command over them, and the output is read back into a new RDD.

Parameters:
  • rdd – a spark RDD as input
  • image_name – Docker image name
  • command – Command to run in the docker container
Keyword Arguments:
 
  • container_input_name – the name of the file within the shared_mountpoint that the host writes before the map, and that the container reads as the first step of the map. (Default ‘input.txt’)
  • container_output_name – the name of the file the map step writes to inside the container. This path will belong inside of the shared_mountpoint which maps to the host temp directory for that partition. (Default ‘output.txt’)
  • docker_options – additional docker options to provide for ‘docker run’. Must be a list.
  • map_udf – optional function that accepts a partition and transforms the data into a dictionary with filename as key and contents as value, where the contents are a list (iterable) of elements to write to that file within the temporary directory.
Returns:

transformed RDD
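
A hedged usage sketch, assuming a SparkContext sc and the DockerPipe pipe constructed above. The image and command are placeholders; the command reads the default input file and writes the default output file under the default shared_mountpoint of ‘/shared/’:

    rdd = sc.parallelize(["line one", "line two"], 2)
    upper = pipe.map(
        rdd,
        "alpine:latest",  # placeholder image
        "sh -c 'tr a-z A-Z < /shared/input.txt > /shared/output.txt'",
    )
    print(upper.collect())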

map_binary(*args, **kwds)[source]

Map binary output as a context manager. This currently takes an RDD as input and yields a directory of the newly written binary files so that they can be read by the user with sc.binaryFiles. When the context manager exits, the temp files are destroyed.

Parameters:
  • rdd – spark RDD input
  • image_name – docker image
  • command – docker command to run
Keyword Arguments:
 
  • container_input_name – the name of the file within the shared_mountpoint that the host writes before the map step, and that the container reads as the first step of the map. (Default ‘input.txt’)
  • container_binary_output_name – the name of the output file in map_binary step inside the container. This path will belong inside of the shared_mountpoint which maps to the host temp directory for that partition. (Default ‘output.bin’)
  • docker_options – additional docker options
  • map_udf – function that takes one input, the partition, and returns a dictionary of filename: contents (as iterable).
  • hdfs_tempdir – temporary directory on HDFS to hold binary files. The default attempts to find the home directory for the user, but can be overridden by specifying an absolute path to use.
Returns:

directory path containing the output binary files to be read
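
A hedged sketch of the context-manager usage, under the same assumptions as above. Results must be materialized before the block exits, since the temp files are destroyed afterwards:

    with pipe.map_binary(
        rdd,
        "alpine:latest",  # placeholder image
        "sh -c 'gzip -c /shared/input.txt > /shared/output.bin'",
    ) as binary_dir:
        # Default output name is output.bin; read everything back while
        # the directory still exists.
        blobs = sc.binaryFiles(binary_dir).collect()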

reduce(rdd, image_name, command, **kwargs)[source]

Apply a Spark reduce function to the input RDD. Rolling pairs of partitions are written to temp files, a Docker container runs the given command over them, the output is written to a temp file, and the result is returned.

Parameters:
  • rdd – a spark RDD as input
  • image_name – Docker image name
  • command – Command to run in the docker container
Keyword Arguments:
 
  • container_input_name – the name of the file within the shared_mountpoint that the host writes before the reduce, and that the container reads as the first step of the reduce. (Default ‘input.txt’)
  • container_output_name – the name of the file the reduce step writes to inside the container. This path will belong inside of the shared_mountpoint which maps to the host temp directory for that partition. (Default ‘output.txt’)
  • docker_options – additional docker options to provide for ‘docker run’. Must be a list.
  • reduce_udf – the default behavior for handling each pair of partitions is to append right to left and write one temp file. This can be overridden by supplying a ‘reduce_udf’ function that takes two inputs, the pair of partitions, and returns a dictionary with filename as key and contents as value, where the contents are a list (iterable) of elements to write to a file within the temp directory (see the sketch below).
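
As an illustration of the reduce_udf contract, a hedged sketch continuing the earlier example. It writes the two sides of each pair to separate files instead of the default right-to-left append; the filenames are arbitrary and override the container input name:

    def pairwise_udf(left, right):
        # Hypothetical UDF: one file per side of the pair, written inside
        # the shared temp directory.
        return {"left.txt": list(left), "right.txt": list(right)}

    merged = pipe.reduce(
        rdd,
        "alpine:latest",  # placeholder image
        "sh -c 'cat /shared/left.txt /shared/right.txt > /shared/output.txt'",
        reduce_udf=pairwise_udf,
    )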
reduce_binary(rdd, image_name, command, **kwargs)[source]

Reduce partitions produced by map_binary output. The format of these partitions differs, so this method handles them specially and writes the temp files as one string rather than splitting on newlines, since the data is binary.

Parameters:
  • rdd – spark RDD
  • image_name – docker image
  • command – docker command
Keyword Arguments:
 
  • container_binary_input_1_name – the name of the left-side input file in the reduce_binary step inside the container. This path will belong inside of the shared_mountpoint which maps to the host temp directory for that partition. (Default ‘input_1.bin’)
  • container_binary_input_2_name – the name of the right-side input file in the reduce_binary step inside the container. This path will belong inside of the shared_mountpoint which maps to the host temp directory for that partition. (Default ‘input_2.bin’)
  • container_binary_output_name – the name of the output file in the reduce_binary step inside the container. This path will belong inside of the shared_mountpoint which maps to the host temp directory for that partition. (Default ‘output.bin’)
  • docker_options – additional docker options
  • reduce_udf – the default behavior for reduce_binary is to write each input to a temp file, giving the container two binary input files. To override, supply a UDF that takes two inputs and returns a dictionary mapping filename to contents (a single string).
Returns:

iterable of reduce results
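
A hedged end-to-end sketch chaining map_binary into reduce_binary, again assuming sc, pipe, and rdd from the earlier sketches and using the default container file names documented above (input_1.bin, input_2.bin, output.bin):

    with pipe.map_binary(
        rdd,
        "alpine:latest",  # placeholder image
        "sh -c 'gzip -c /shared/input.txt > /shared/output.bin'",
    ) as binary_dir:
        results = pipe.reduce_binary(
            sc.binaryFiles(binary_dir),
            "alpine:latest",
            "sh -c 'cat /shared/input_1.bin /shared/input_2.bin > /shared/output.bin'",
        )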

lofn.docker_handler module

Run Docker containers.

exception lofn.docker_handler.DockerFailure[source]

Bases: exceptions.Exception

Custom exception to communicate that the container failed.

lofn.docker_handler.execute(command)[source]

Use the shell to invoke Docker. Exceptions are caught and returned to the Spark master so they can be caught and reported helpfully.

Parameters:command – bash command to call docker with the requested configuration
Returns:if there was a failure, the message describing why it failed; otherwise False
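
A hedged sketch of the return convention (False means success; anything truthy is the failure message). The command is a placeholder:

    from lofn.docker_handler import execute

    status = execute("docker run --rm alpine:latest echo hello")
    if status:
        print("container failed: %s" % status)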
lofn.docker_handler.run(image_name, command, bind_files, volume_files, **kwargs)[source]

Make system calls with subprocess to run our containers.

Parameters:
  • image_name – docker image to run a container
  • command – docker command to run in container
  • bind_files – container paths to set as mount points
  • volume_files – host paths to mount
Keyword Arguments:
 
  • docker_options – additional options to pass to docker run as a list
  • temporary_directory_parent – specify the parent directory for temporary directories and files. It must exist, or lofn must have permission to create it. The default is None, which uses a default from a platform-dependent list or the system’s TMPDIR, TEMP, or TMP environment variables.
Returns:

status of execution; False means no issues, otherwise the failure message is returned.

lofn.docker_handler.validate_user_input(volumes)[source]

Validate the user input. Checks the type and structure.

lofn.hdfs_handler module

Interact with HDFS.

lofn.hdfs_handler.setup_user_volumes_from_hdfs(*args, **kwds)[source]

GET a local copy of each volume to mount in Docker, and update the volumes dictionary to map to the new local temp directory.

Parameters:volumes – Docker volumes specification as a dictionary, similar to the structure used by docker-py. User-defined volumes to mount in the container, for example: {‘[host_directory_path]’: {‘bind’: ‘[container_directory_path]’, ‘mode’: ‘[ro|rw]’}}. The host directory path in the dictionary must be the absolute path to the directory on HDFS.
Keyword Arguments:
 temporary_directory_parent – manually specify the parent directory for the temp file directories; the default is None, which uses the system’s TMP/TMPDIR.
Returns:a volumes dictionary mapping to the new temporary directories
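
For illustration, a hedged sketch. The (*args, **kwds) signature suggests this is wrapped as a context manager, like map_binary above, but treat that, and the HDFS path, as assumptions:

    from lofn.hdfs_handler import setup_user_volumes_from_hdfs

    # The key must be an absolute HDFS directory path (placeholder below).
    hdfs_volumes = {"/user/alice/reference": {"bind": "/reference", "mode": "ro"}}

    # Assumed context-manager usage: local copies are fetched on entry and
    # the rewritten volumes dictionary points at the new temp directories.
    with setup_user_volumes_from_hdfs(hdfs_volumes) as local_volumes:
        print(local_volumes)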

lofn.tmp_file_handler module

Create temp files for Docker read/write.

class lofn.tmp_file_handler.UDF(temporary_directory, user_function, **kwargs)[source]

Bases: object

Define how to handle RDD partitions to temp files.

The return should be a dictionary, with filename as key and a list of elements as value. These files are written inside the shared mount point temporary directory.

These filenames override the input container name.

map_udf()[source]

Unpack a partition into multiple files based on a user-defined function. The udf should return a dictionary, with filename as key and a list of elements as value. These files are written inside the shared mountpoint temporary directory.

These filenames override the input container name.
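
For illustration, a hedged example of a user function that satisfies this contract; the filenames and the length threshold are arbitrary:

    def split_by_length(partition):
        # Hypothetical map UDF: route each element to one of two files
        # inside the shared temp directory.
        short, rest = [], []
        for element in partition:
            (short if len(element) < 10 else rest).append(element)
        return {"short.txt": short, "long.txt": rest}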

reduce_udf()[source]

Define handling of pairs of partitions for the reduce step. Pass a function that handles the pair of input partitions and returns a dictionary with filename as key and an iterable of contents to write to the temp file as value.

This will override the input container name.

write_temp_files(inner_partitions)[source]

Write contents to a temp file with the defined name. Iterables are written line by line, while binary data is written as a single string.

Parameters:inner_partitions – the content to write, either as iterable for regular files or string for binary data
lofn.tmp_file_handler.handle_binary(origin_directory, destination_directory, input_path, master_type)[source]

Move and rename temp file outputs into one directory so they can be read back by the user with sc.binaryFiles(), then remove the original temp directory used by the containers.

Parameters:
  • origin_directory – temporary directory mounted by container
  • destination_directory – the shared temporary directory, either local or on HDFS, that all the output files are moved to
  • input_path – the full path to the file to be moved to on HDFS
  • master_type – spark master type, yarn or standalone
Returns:

path to new file

lofn.tmp_file_handler.read_back(shared_dir, output_file)[source]

Read text files back into an iterable from the temp file, then destroy the temp file and its parent directory.

Parameters:
  • shared_dir – the temporary directory that the container mounted
  • output_file – path to output filename
Returns:

iterable of output file contents

lofn.tmp_file_handler.read_binary(shared_dir, output_file)[source]

Read binary file output from the container back into one string, not an iterable, then remove the temporary parent directory the container mounted.

Parameters:
  • shared_dir – temporary directory the container mounted
  • output_file – path to output file
Returns:

str of file contents