Working with CSV Files in PySpark

Reading and writing CSV files in PySpark involves several steps. To read a CSV file and create a DataFrame, you first need to create a SparkSession, which is the entry point to Spark functionality. Once you have a SparkSession, you can use the spark.read.csv() method to read a CSV file and create a DataFrame. This method takes the path to the file, an optional schema for the DataFrame, and other options such as header and inferSchema.

After reading the CSV file into a DataFrame, you can use various DataFrame methods to explore the data, such as printSchema(), show(), select(), and groupBy(), as sketched below.
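For example, assuming a DataFrame named df has already been loaded (the column names used here are hypothetical), a quick exploration might look like this:

from pyspark.sql import functions as F

# Print the column names and data types
df.printSchema()

# Display the first 20 rows
df.show()

# Select a subset of columns (hypothetical column names)
df.select("name", "department").show()

# Group by a column and count the rows in each group
df.groupBy("department").agg(F.count("*").alias("row_count")).show()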

Writing a DataFrame to a CSV file in PySpark can be done using the write.csv() method. This method takes the path to the output location and options such as header and mode (overwrite, append, or ignore). Additionally, you can use the external spark-csv package to write DataFrames to CSV files by setting the format to "com.databricks.spark.csv" and specifying options like header; this package is only needed on Spark 1.x, since CSV support is built into Spark 2.0 and later.

PySpark Read CSV File

The pyspark.sql.DataFrameReader.csv() method can be used to read a CSV file and create a DataFrame in PySpark. The method takes several options that can be used to specify the behavior of the CSV file reader. Here is an example of how to use the csv() method to read a CSV file and create a DataFrame in PySpark:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("CSV to DataFrame").getOrCreate()

# Load the CSV file into a DataFrame
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True, sep=",")

In this example, header=True tells Spark to use the first row of the CSV file as the column names for the DataFrame, and inferSchema=True tells Spark to try to infer the data types of the columns from the data in the CSV file. sep="," specifies the field separator used in the CSV file; a comma is the default, so it is shown here only for illustration.

File Read Options

Here are some more details about the options that can be passed to the csv() method:

  • header: This option is used to specify whether the file contains a header row. The default value is False, in which case Spark does not use the first line of the file as the column names and instead generates column names of the form "_c0", "_c1", etc. Set it to True to read the column names from the first line of the file.
  • inferSchema: This option is used to specify whether the data types of the columns should be inferred from the data or not. The default value is False, which means that all columns will be treated as strings. If you set it to True, then Spark will try to infer the data types of the columns from the data. This can save you the trouble of having to specify the data types manually, but it can also lead to errors if the data types are not correctly inferred.
  • sep: This option is used to specify the separator used in the CSV file. The default value is ,.
  • mode: This option is used to specify the behavior of the file reader when it encounters corrupt records. The default value is PERMISSIVE, which means that the file reader will attempt to parse corrupt records and assign them to a special column named _corrupt_record. The other options are DROPMALFORMED, which drops rows with malformed records, and FAILFAST, which throws an exception when a corrupt record is encountered.
  • nullValue: This option is used to specify the string that indicates a null value. By default, the empty string is treated as null.
  • timestampFormat: This option is used to specify the format of timestamp columns. The default value is yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
  • dateFormat: This option is used to specify the format of date columns. The default value is yyyy-MM-dd.
  • maxColumns: This option is used to specify the maximum number of columns a record may have; the default is 20480.
  • maxCharsPerColumn: This option is used to specify the maximum number of characters allowed for any single value; the default is -1, meaning no limit.
  • multiLine: This option is used to specify whether a single CSV record is allowed to span multiple lines (for example, a quoted field containing newlines). The default is False.

Once you've specified the options you want to use, you can call the csv() method to read the CSV file and create the DataFrame. From there, you can use the DataFrame API to perform various operations on the data, such as filtering, grouping, and aggregating; call show() to display the first 20 rows; or call printSchema() to print the schema, which lists the column names and data types of the DataFrame.
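As a sketch of how several of these options can be combined (the file path, column names, and data types below are assumptions for illustration), you can chain .option() calls and supply an explicit schema instead of relying on inferSchema:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

spark = SparkSession.builder.appName("CSV read options").getOrCreate()

# Explicit schema instead of inferSchema (hypothetical columns)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("hired", DateType(), True),
])

df = (spark.read
      .schema(schema)
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .option("nullValue", "NA")
      .option("dateFormat", "yyyy-MM-dd")
      .csv("path/to/file.csv"))

df.printSchema()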

It's also important to note that the above code snippet is just an example of how to read a CSV file and create a DataFrame in PySpark. Depending on the size and structure of your data, you may need to adjust the options and methods used to read the file and create the DataFrame.

Also, when you need custom Python logic on large DataFrames, pandas_udf (vectorized) functions are generally more efficient than ordinary row-at-a-time Python UDFs, because they operate on batches of rows via Apache Arrow. Built-in DataFrame operations are still faster than any UDF where they apply.
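As a minimal sketch (it assumes Spark 3.0+ with pyarrow installed, and uses a hypothetical column named amount), a vectorized pandas_udf looks like this:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# A vectorized UDF: receives and returns whole pandas Series (batches of rows)
@pandas_udf(DoubleType())
def add_tax(amount: pd.Series) -> pd.Series:
    return amount * 1.2

# Apply it to a hypothetical "amount" column
df = df.withColumn("amount_with_tax", add_tax("amount"))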

Write CSV files in PySpark

Once you have a DataFrame in PySpark, you can use the write.csv() method to write the DataFrame to a CSV file. Here is an example of how to use this method:

df.write.csv("path/to/output.csv", header=True, mode="overwrite") 

This writes the DataFrame as CSV to the directory "path/to/output.csv"; Spark writes one or more part files inside that directory rather than a single file. The header parameter specifies whether to include the column names in the output, and the mode parameter specifies the behavior when the output path already exists. The available modes are overwrite, append, ignore, and error (the default, which raises an error if the path already exists).

You can also call coalesce(1) or repartition(1) on the DataFrame before saving so that Spark writes a single part file instead of several, as shown below.
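For example (the output path here is a placeholder), the following collapses the DataFrame to one partition before writing:

# Collapse to one partition so only a single part file is written inside the output directory
df.coalesce(1).write.csv("path/to/output_single_csv", header=True, mode="overwrite")

Keep in mind that coalesce(1) funnels all the data through a single task, so it is only advisable when the data is small enough to be written comfortably by one executor.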

Additionally, you can use the spark-csv package to write DataFrames to CSV files; this external package is only needed on Spark 1.x, because the same functionality is built into Spark 2.0 and later. To use it, first add it as a dependency in your project, and then call dataframe.write.format("com.databricks.spark.csv").options(header="true").save("path/to/output.csv") to save the DataFrame to a CSV file.
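As a short sketch (the output paths are placeholders), the legacy package call and its built-in equivalent look like this:

# Legacy external package (Spark 1.x only)
df.write.format("com.databricks.spark.csv").options(header="true").save("path/to/output_legacy")

# Built-in CSV data source (Spark 2.0 and later)
df.write.format("csv").option("header", "true").mode("overwrite").save("path/to/output_builtin")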

In summary, you can use the write.csv() method or the spark-csv package to write DataFrames to CSV files in PySpark. Both methods have their own advantages and disadvantages, so you should choose the one that best suits your needs.