pyspark.RDD.saveAsHadoopDataset
RDD.saveAsHadoopDataset(conf, keyConverter=None, valueConverter=None)
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Keys/values are converted for output using either user-specified converters or, by default, "org.apache.spark.api.python.JavaToWritableConverter".

New in version 1.1.0.
Parameters
- conf : dict
  Hadoop job configuration
- keyConverter : str, optional
  fully qualified classname of key converter (None by default); see the sketch after this list
- valueConverter : str, optional
  fully qualified classname of value converter (None by default)
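Both converter parameters take a fully qualified Java classname. As a minimal sketch (not part of the original page), passing the default converter explicitly looks like the call below; `rdd` and `write_conf` are assumed to be defined as in the Examples section.

>>> # Hypothetical call shape: the classname is the default converter named
>>> # in the description above, passed explicitly for illustration only.
>>> rdd.saveAsHadoopDataset(
...     conf=write_conf,
...     keyConverter="org.apache.spark.api.python.JavaToWritableConverter",
...     valueConverter="org.apache.spark.api.python.JavaToWritableConverter",
... )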
Examples
>>> import os
>>> import tempfile
Set the related classes
>>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
>>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopDataset") as d:
...     path = os.path.join(d, "old_hadoop_file")
...
...     # Create the conf for writing
...     write_conf = {
...         "mapred.output.format.class": output_format_class,
...         "mapreduce.job.output.key.class": key_class,
...         "mapreduce.job.output.value.class": value_class,
...         "mapreduce.output.fileoutputformat.outputdir": path,
...     }
...
...     # Write a temporary Hadoop file
...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
...     rdd.saveAsHadoopDataset(conf=write_conf)
...
...     # Create the conf for reading
...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
...
...     # Load this Hadoop file as an RDD
...     loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf)
...     sorted(loaded.collect())
[(0, '1\t'), (0, '1\ta'), (0, '3\tx')]
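Note that the keys in the read-back output are not the original integer keys: TextOutputFormat writes each pair as a key<TAB>value line, and TextInputFormat reads lines back as (byte offset, line) pairs. Each key is therefore the line's offset within its partition file (0 here, since each partition file holds a single line), and each value is the original key and value joined by a tab.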