pySparkUtils package¶
Submodules¶
pySparkUtils.utils module¶
A set of utilities to manage the pySpark SparkContext object. Assumes you have pyspark (and py4j) on the PYTHONPATH and that SPARK_HOME is defined.
pySparkUtils.utils.balanced_repartition(data, partitions)¶
Repartitions an RDD, making sure data is evenly distributed across partitions, for Spark versions < 2.1 (see: https://issues.apache.org/jira/browse/SPARK-17817) or < 2.3 when the number of partitions is a power of 2 (see: https://issues.apache.org/jira/browse/SPARK-21782).
Parameters: - data – RDD
- partitions – number of partitions to use
Returns: repartitioned data
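Example (a minimal sketch, assuming a local pyspark installation):
>>> from pyspark import SparkContext
>>> from pySparkUtils.utils import balanced_repartition
>>> sc = SparkContext(master='local[4]', appName='repartition_example')
>>> rdd = sc.parallelize(range(1000), 3)
>>> balanced = balanced_repartition(rdd, 8)
>>> balanced.getNumPartitions()
8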
pySparkUtils.utils.change(sc=None, app_name='customSpark', master=None, wait='ips', min_cores=None, min_ips=None, timeout=30, refresh_rate=0.5, fail_on_timeout=False, **kwargs)¶
Returns a new SparkContext (sc) object with the added properties set.
Parameters: - sc – current SparkContext; if None, a new one will be created
- app_name – name of the new Spark app
- master – URL of the master; if None, it is taken from the current sc
- wait – when to return after asking for a new sc (or after at most timeout seconds): 'ips': wait for all the executor ips that were previously connected to return (requires sc not to be None); 'cores': wait for min_cores; None: return immediately
- min_cores – when wait is 'cores', wait until defaultParallelism is back to at least this value; if None, it will be set to defaultParallelism
- min_ips – when wait is 'ips', wait until the number of unique executor ips is back to at least this value; if None, it will be set to what the original sc had
- timeout – max time in seconds to wait for the new sc when wait is 'ips' or 'cores'
- fail_on_timeout – whether to fail if the timeout has been reached
- refresh_rate – how long to wait in seconds between each check of defaultParallelism
- kwargs – additional properties to set, given as key-value pairs with '.' replaced by '_' in the key, e.g. spark_task_cores='1', spark_python_worker_memory='8g'; see: http://spark.apache.org/docs/latest/configuration.html
Returns: a new SparkContext
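Example (a sketch assuming an existing SparkContext sc; the keyword arguments are those shown above):
>>> from pySparkUtils.utils import change
>>> sc = change(sc, app_name='analysis', wait='cores', min_cores=16,
...             timeout=60, fail_on_timeout=True,
...             spark_task_cores='1', spark_python_worker_memory='8g')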
pySparkUtils.utils.executor_ips(sc)¶
Gets the unique IP addresses of the executors of the current application. This uses the REST API of the status web UI on the driver (http://spark.apache.org/docs/latest/monitoring.html).
Parameters: sc – Spark context
Returns: set of IP addresses
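Example (a sketch assuming an existing SparkContext sc whose status web UI is reachable):
>>> from pySparkUtils.utils import executor_ips
>>> ips = executor_ips(sc)
>>> sorted(ips)  # unique executor host IPs for the running application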
pySparkUtils.utils.fallback(func)¶
Decorator for functions that handle a Spark context. If a function changes sc, the context may be lost if an error occurs inside the function. In the event of an error this decorator will log the error but return sc.
Parameters: func – function to decorate
Returns: decorated function
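Example (a sketch; restart_spark is a hypothetical function that swaps sc for a new context via change()):
>>> from pySparkUtils.utils import fallback, change
>>> @fallback
... def restart_spark(sc):
...     # if change() raises, the decorator logs the error and returns sc
...     return change(sc, spark_python_worker_memory='8g')
>>> sc = restart_spark(sc)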
pySparkUtils.utils.load_rdd_from_pickle(sc, path, min_partitions=None, return_type='images')¶
Loads an RDD that was saved as one pickle file per partition.
Parameters: - sc – Spark Context
- path – directory to load from
- min_partitions – minimum number of partitions; if None, sc.defaultParallelism is used
- return_type – what to return: 'rdd' for an RDD, 'images' for a Thunder Images object, 'series' for a Thunder Series object
Returns: the loaded data, depending on return_type
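Example (a sketch; the path is hypothetical and assumed to contain pickles written by save_rdd_as_pickle):
>>> from pySparkUtils.utils import load_rdd_from_pickle
>>> images = load_rdd_from_pickle(sc, '/data/run1_pickles/', return_type='images')
>>> rdd = load_rdd_from_pickle(sc, '/data/run1_pickles/', return_type='rdd')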
pySparkUtils.utils.regroup(*args, **kwargs)¶
Regroups an RDD by adding a new key that ranges from 0 to the number of groups - 1.
Parameters: - rdd – input RDD of (k, v) pairs
- groups – number of groups to concatenate to
- check_first – whether to check that the first value is a key-value pair
Returns: a new rdd in the form of (groupNum, list of (k, v) in that group) pairs
Example:
>>> data = sc.parallelize(zip(range(4), range(4)))
>>> data.collect()
[(0, 0), (1, 1), (2, 2), (3, 3)]
>>> data2 = regroup(data, 2)
>>> data2.collect()
[(0, [(0, 0), (2, 2)]), (1, [(1, 1), (3, 3)])]
pySparkUtils.utils.save_rdd_as_pickle(*args, **kwargs)¶
Saves an RDD by grouping all the records of each partition into one pickle file.
Parameters: - rdd – rdd to save
- path – where to save
- batch_size – batch size to pass to spark saveAsPickleFile
- overwrite – whether to overwrite if the directory already exists
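Example (a sketch; the output path is hypothetical):
>>> from pySparkUtils.utils import save_rdd_as_pickle
>>> rdd = sc.parallelize(range(100), 8)
>>> save_rdd_as_pickle(rdd, '/data/run1_pickles/', overwrite=True)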
pySparkUtils.utils.thunder_decorator(func)¶
Decorator that lets functions expecting an RDD accept a thunder.Images / thunder.Series object as input, and converts the returned RDD back to the appropriate type. Assumes only one input object of type Images/Series, and up to one output object of type RDD.
Parameters: func – function to decorate
Returns: decorated function
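Example (a sketch assuming the thunder-python package is installed; double_values is a hypothetical function written against a plain (key, value) RDD):
>>> import thunder as td
>>> from pySparkUtils.utils import thunder_decorator
>>> @thunder_decorator
... def double_values(rdd):
...     # operates on the underlying (key, value) RDD
...     return rdd.mapValues(lambda v: v * 2)
>>> images = td.images.fromrandom(shape=(10, 50, 50), engine=sc)
>>> doubled = double_values(images)  # returned as a Thunder Images object again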