#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2017 Eli Lilly and Company
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Wrapper to execute docker containers as spark tasks.
"""
from __future__ import print_function, unicode_literals
import os
import shutil
from contextlib import contextmanager
from getpass import getuser
import logging
from lofn.base import tmp, hdfs
from lofn import tmp_file_handler, docker_handler
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %('
'message)s')
class DockerPipe(object):
    """Main entry point for lofn: run docker containers as spark tasks."""

    def __init__(self, spark_configuration, **kwargs):
        """
        :param spark_configuration: spark configuration (e.g.
            pyspark.SparkConf())
        :keyword temporary_directory_parent: specify an absolute path to the
            parent directory to use for temp dirs and files. The default is
            None, which then uses a location specified by a
            platform-dependent list or uses environment variables TMPDIR,
            TEMP, or TMP. If specifying a path, it either needs to exist on
            all nodes or you must run it with appropriate permissions so
            lofn can attempt to create it.
        :keyword shared_mountpoint: the path to the directory inside each
            docker container that maps to the temporary directory on the
            host. (Default '/shared/')
        :keyword volumes: User defined volumes to mount in the container.
            This is useful for data that is not being partitioned and needs
            to be read into each container, such as a global reference. This
            must be given as a dictionary. The keys for the dictionary are
            the absolute paths to the directory on the host you want to
            bind. The value of each of these keys is the information on how
            to bind that volume. Provide a 'bind' path, which is the
            absolute path in the container you want that volume to mount on,
            and optionally provide a 'mode', as ro (read-only, the default)
            or rw (read-write). The structure of this input is similar to
            docker-py volumes, and resembles the following structure:
            {'[host_directory_path]': {'bind': '[container_directory_path]',
            'mode': '[ro|rw]'}}
        """
        self.shared_mountpoint = kwargs.get('shared_mountpoint', '/shared/')
        self.user_volume = kwargs.get('volumes')
        if self.user_volume:
            # Fail fast on a malformed volume spec before any spark work
            # is launched.
            docker_handler.validate_user_input(self.user_volume)
        # 'standalone' or 'yarn'; raises ValueError for unsupported masters.
        self.master_type = _get_master_type(spark_configuration)
        self.temporary_directory_parent = kwargs.get(
            'temporary_directory_parent', None)
def _map_docker_files(self, image_name, command, **kwargs):
records = kwargs.get('records')
docker_options = kwargs.get('docker_options')
input_mode = kwargs.get('input_mode')
container_input_name = kwargs.get('container_input_name')
container_output_name = kwargs.get('container_output_name')
shared_temp_dir = tmp.create_temp_directory(
directory=self.temporary_directory_parent)
# write partition to temp file
if 'map_udf' in kwargs.keys():
user_defined_function = tmp_file_handler.UDF(
temporary_directory=shared_temp_dir,
user_function=kwargs['map_udf'],
partition=records,
input_mode=input_mode
)
user_defined_function.map_udf()
elif 'reduce_udf' in kwargs.keys():
user_defined_function = tmp_file_handler.UDF(
temporary_directory=shared_temp_dir,
user_function=kwargs['reduce_udf'],
partition_1=kwargs['partition_1'],
partition_2=kwargs['partition_2'],
input_mode=input_mode
)
user_defined_function.reduce_udf()
else:
tmp.write_to_temp_file(records, os.path.join(
shared_temp_dir, container_input_name))
output_file = os.path.join(shared_temp_dir, container_output_name)
# run docker
status = docker_handler.run(
image_name=image_name,
command=command,
bind_files=[self.shared_mountpoint],
volume_files=[shared_temp_dir],
docker_options=docker_options,
volumes=self.user_volume,
master_type=self.master_type,
temporary_directory_parent=self.temporary_directory_parent)
if status: # only has a status message when it has failed
raise docker_handler.DockerFailure(status)
return shared_temp_dir, output_file
@contextmanager
[docs] def map_binary(self, rdd, image_name, command, **kwargs):
"""
Map binary output as a context manager. This currently takes
rdd as input and will output a directory of the
newly written binary files so that they can be read by the user with
sc.binaryFiles. After finishing with the context manager, the temp
files are destroyed.
:param rdd: spark RDD input
:param image_name: docker image
:param command: docker command to run
:keyword container_input_name: the name of the file within the
shared_mountpoint that is written to before the map steps from
the host, and read from as the first step in the map in
the container. (Default 'input.txt')
:keyword container_binary_output_name: the name of the output file
in map_binary step inside the container. This path will belong
inside of the shared_mountpoint which maps to the host temp
directory for that partition. (Default 'output.bin')
:keyword docker_options: additional docker options
:keyword map_udf: function that takes one input, the partition,
and returns a dictionary of filename: contents (as iterable).
:keyword hdfs_tempdir: temporary directory on HDFS to hold binary
files. The default attempts to find the home directory for the user,
but can be overridden by specifying an absolute path to use.
:return: directory path containing the output binary files to be read
"""
docker_options = kwargs.get('docker_options')
container_input_name = kwargs.get('container_input_name', 'input.txt')
container_binary_output_name = kwargs.get(
'container_binary_output_name', 'output.bin')
if self.master_type == 'standalone':
binary_tempdir = tmp.create_temp_directory(
directory=self.temporary_directory_parent)
elif self.master_type == 'yarn':
binary_tempdir = kwargs.get(
'hdfs_tempdir', '/user/{user}/lofn_temp'.format(
user=getuser()))
else:
raise ValueError("Not a valid master_type")
try:
def apply_transformation(partition):
"""
Apply to partition.
:param partition: string (rdd partition)
:return: path to parent directory of output files
"""
if 'map_udf' in kwargs.keys():
# handle the partition writing to temp file via a user
# defined function
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name, command=command,
records=partition, docker_options=docker_options,
map_udf=kwargs['map_udf'],
container_output_name=container_binary_output_name)
else:
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name, command=command,
records=partition, docker_options=docker_options,
container_input_name=container_input_name,
container_output_name=container_binary_output_name)
return tmp_file_handler.handle_binary(
shared_tmp_dir, binary_tempdir, output, self.master_type)
rdd.foreachPartition(apply_transformation)
yield binary_tempdir
finally:
if self.master_type == 'yarn':
hdfs.rm_r(binary_tempdir)
elif self.master_type == 'standalone':
shutil.rmtree(binary_tempdir)
[docs] def map(self, rdd, image_name, command, **kwargs):
"""
Map step by applying Spark's mapPartitions.
This writes the partition to temp file(s) and executes a docker
container to run the commands, which is read back into a new RDD.
:param rdd: a spark RDD as input
:param image_name: Docker image name
:param command: Command to run in the docker container
:keyword container_input_name: the name of the file within the
shared_mountpoint that is written to before the map from
the host, and read from as the first step in the map in
the container. (Default 'input.txt')
:keyword container_output_name: the name of the file the map step
writes to inside the container. This path will belong
inside of the shared_mountpoint which maps to the host temp
directory for that partition. (Default 'output.txt')
:keyword docker_options: additional docker options to provide for
'docker run'. Must be a list.
:keyword map_udf: optional keyword argument to pass a function that
accepts a partition and transforms the data into a dictionary
with a key as filename and value as contents, a list (iterable)
of elements to write to that file within the temporary directory.
:return: transformed RDD
"""
docker_options = kwargs.get('docker_options')
container_input_name = kwargs.get('container_input_name', 'input.txt')
container_output_name = kwargs.get('container_output_name',
'output.txt')
def apply_transformation(partition):
"""
Apply to partition.
:param partition: iterable (RDD partition)
:return: iterable (transformed partition)
"""
if 'map_udf' in kwargs.keys():
# handle the partition writing to temp file via a user
# defined function
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name, command=command, records=partition,
docker_options=docker_options, map_udf=kwargs['map_udf'],
container_output_name=container_output_name)
else:
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name, command=command, records=partition,
docker_options=docker_options,
container_input_name=container_input_name,
container_output_name=container_output_name)
return tmp_file_handler.read_back(shared_tmp_dir, output)
return rdd.mapPartitions(apply_transformation)
[docs] def reduce(self, rdd, image_name, command, **kwargs):
"""
Apply a Spark reduce function to the input RDD. This will take
rolling pairs and write to temp files, run a docker container,
execute a command in the container over the temp files, write to
temp files, and return the result.
:param rdd: a spark RDD as input
:param image_name: Docker image name
:param command: Command to run in the docker container
:keyword container_input_name: the name of the file within the
shared_mountpoint that is written to before the reduce from
the host, and read from as the first step in reduce in
the container. (Default 'input.txt')
:keyword container_output_name: the name of the file the reduce
step writes to inside the container. This path will belong
inside of the shared_mountpoint which maps to the host temp
directory for that partition. (Default 'output.txt')
:keyword docker_options: additional docker options to provide for
'docker run'. Must be a list.
:keyword reduce_udf: The default behavior for handling the pairs of
partitions is to append right to left and write to one temp
file. This can be overridden by supplying a 'reduce_udf'
function that takes two inputs, the pair of partitions,
and transforms them to return a dictionary mapping a key of
filename to value of contents in a list (iterable) of elements
to write to a file within the temp directory.
"""
docker_options = kwargs.get('docker_options')
container_input_name = kwargs.get('container_input_name', 'input.txt')
container_output_name = kwargs.get('container_output_name',
'output.txt')
def combine(left, right):
"""
combine partitions a pair at a time, rolling result.
Default behavior is to append partitions to each other,
otherwise a 'reduce_udf' is accepted.
"""
if not isinstance(left, list):
left = [left]
if not isinstance(right, list):
right = [right]
both = left + right
if 'reduce_udf' in kwargs.keys():
# handle the two partitions writing to temp files via a user
# defined function
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name,
command=command,
docker_options=docker_options,
partition_1=left,
partition_2=right,
reduce_udf=kwargs['reduce_udf'],
container_input_name=container_input_name,
container_output_name=container_output_name)
else:
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name, command=command, records=both,
docker_options=docker_options,
container_input_name=container_input_name,
container_output_name=container_output_name)
return tmp_file_handler.read_back(shared_tmp_dir, output)
result = rdd.reduce(combine)
return result
[docs] def reduce_binary(self, rdd, image_name, command, **kwargs):
"""
Reduce partitions from map_binary output. The format of these
partitions is different so this handles them and also writes the
temp files as one string rather than trying to split newlines since
these are binary.
:param rdd: spark RDD
:param image_name: docker image
:param command: docker command
:keyword container_binary_input_1_name: the name of the left side
file in reduce_binary step inside the container. This path will
belong inside of the shared_mountpoint which maps to the host temp
directory for that partition. (Default 'input_1.bin')
:keyword container_binary_input_2_name: the name of the right side
file in reduce_binary step inside the container. This path will
belong inside of the shared_mountpoint which maps to the host temp
directory for that partition. (Default 'input_2.bin')
:keyword container_binary_output_name: the name of the output file
in reduce_binary step inside the container. This path will belong
inside of the shared_mountpoint which maps to the host temp
directory for that partition. (Default 'output.bin')
:keyword docker_options: additional docker options
:keyword reduce_udf: default behavior for reduce_binary is to
write each input as a temp file to give two binary input files to the
container. Write a UDF here that takes two inputs and outputs a
dictionary mapping filename: contents (a string)
:return: iterable of reduce results
"""
docker_options = kwargs.get('docker_options')
container_binary_input_1_name = kwargs.get(
'container_binary_input_1_name', 'input_1.bin')
container_binary_input_2_name = kwargs.get(
'container_binary_input_2_name', 'input_2.bin')
container_binary_output_name = kwargs.get(
'container_binary_output_name', 'output.bin')
def combine(left, right):
"""
Apply to two partitions. Must be associative and commutative.
Since this has to unpack the binaryFiles type of partition we
need to check if the return was from a previous combine or
directly from the input RDD.
:param left: left partition
:param right: right partition
:return: str of results
"""
if len(left) > 1:
if "file" in left[0] or "hdfs" in left[0]:
left = left[1]
if len(right) > 1:
if "file" in right[0] or "hdfs" in right[0]:
right = right[1]
if 'reduce_udf' in kwargs.keys():
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name,
command=command,
docker_options=docker_options,
partition_1=left,
partition_y=right,
reduce_udf=kwargs['reduce_udf'],
input_mode='binary',
container_output_name=container_binary_output_name)
else:
def default_reduce_temp_files(part_1, part_2):
"""
Apply attributes as names for files within container.
Instead of appending like in `reduce`, both partitions
are supplied for the reduce_binary function to combine,
so this is overriding that behavior and creating this as
the default if a reduce_udf is not specific.
:param part_1: left partition
:param part_2: right partition
:return: dict mapping file name in container as key to
the contents to be written to temp files.
"""
return {container_binary_input_1_name: part_1,
container_binary_input_2_name: part_2}
shared_tmp_dir, output = self._map_docker_files(
image_name=image_name,
command=command,
docker_options=docker_options,
partition_1=left,
partition_2=right,
reduce_udf=default_reduce_temp_files,
input_mode='binary',
container_output_name=container_binary_output_name
)
return tmp_file_handler.read_binary(shared_tmp_dir, output)
return rdd.reduce(combine)
def _get_master_type(configuration):
"""
Set the master type for lofn to use by reading the SparkConf() dictionary.
Fallback to standalone if no master is set.
:param configuration: SparkConf()
:return: master_type (standalone, yarn)
"""
master = configuration.get('spark.master')
if master:
if 'spark' in master or 'local' in master:
master_type = 'standalone'
elif 'yarn' in master:
master_type = 'yarn'
else:
raise ValueError("Not a supported spark master.")
else:
logging.warning("No master type set in the spark configuration, "
"attempting to run as standalone.")
master_type = 'standalone'
return master_type