Big Data: Part-4 Let’s Explore PySpark

Deeksha Sharma
22 min read · May 11, 2024


Apache Spark is a parallel processing framework, just like Hadoop. It was invented to address the shortcomings of Apache Hadoop and is predominantly developed in Scala. Addressing those shortcomings gave Spark a 10x-100x improvement over Hadoop. Yes, up to 100x! Spark reduces disk I/O operations and makes heavy use of RAM, which makes it faster than Hadoop, since Hadoop performs every operation as a disk-based operation. But let me share that Spark is not a completely memory-based architecture: there are moments with Spark too where disk operations become inevitable!

🙄What is Scala?

Scala is a high-level multi-paradigm language, i.e. it supports both functional and object-oriented programming. The functional style of programming supports parallel processing quite gracefully. In functional programming, the value of an object cannot be changed during the whole course of execution. You can understand this using C++:

// In functional-style programming this is not allowed
const int a = 10;
a++; // changing the value of a is not allowed

// Instead, functional-style programming likes to have it like this
int a = 10;
int b = a + 1;

// This is the actual reason that the functional style supports parallel
// tasks: since the state of an object can't be changed, race conditions and
// deadlocks do not arise here.

// A functional style can be followed in any language, but Scala
// supports it natively.

🔮How is Spark better than Hadoop?

🎒Hadoop writes all the intermediate results on the hard disk and then reads them again from the disk whereas Spark stores the intermediate results in the RAM itself which makes Spark faster than Hadoop.

🎒Hadoop is built for Batch-Data processing but Spark can do both batch as well as streaming data processing.

🎒In the case of security, Hadoop wins the race, as it provides Kerberos authentication and ACL authorization, while Spark does not ship with these. But this is not a major issue: when Spark uses HDFS it gets ACL authorization, and when it uses YARN as its resource manager it gets Kerberos authentication as well.

🎒For fault tolerance Hadoop uses the concept of replication factor whereas Spark uses the DAG for fault tolerance. Don’t get intimidated by what a DAG is, I will be explaining it later in the article.

📡Overview of Spark Architecture

Spark also works on master-slave architecture.

The driver is the first point from where the job starts to execute. The driver is a software service that resides on a machine in the cluster and launches the JVM so that a SparkContext object can be instantiated. The driver here can be compared to the Driver Code in the Hadoop ecosystem.

Spark does not have any built-in storage system. It is just a computing engine.

SparkContext can be understood as an object that lives throughout the execution of the job. Once the job finishes, the SparkContext related to that job is cleared from memory. Think of it like creating objects in object-oriented programming.

The cluster manager in Spark can be compared to the Resource Manager (YARN) of Hadoop. The cluster manager is the one responsible for allocating resources to the jobs on the worker nodes. Spark ships with its own Standalone cluster manager, and it can also use Mesos or Hadoop's YARN as the cluster manager. These days Kubernetes is also used as a cluster manager for Spark.
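As a rough sketch, the cluster manager is picked with the --master option of spark-submit; the host names, ports, and script name below are placeholders, not values from this article:

#Spark's own Standalone cluster manager
spark-submit --master spark://master-host:7077 my_job.py

#Hadoop's YARN as the cluster manager
spark-submit --master yarn my_job.py

#Kubernetes as the cluster manager (the API-server URL is a placeholder)
spark-submit --master k8s://https://k8s-apiserver:6443 my_job.py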

Worker Nodes in Spark are the same as Data Nodes in Hadoop.

In Hadoop, on one data node, we can have N mappers depending upon the splits made. On each data node of Hadoop v2 we had an Application Master, i.e. one per application. In Spark, the executor plays a role similar to Hadoop's AppMaster. Remember, a single worker node in Spark can host many executors, but each executor belongs to exactly one job. Summing up: if you launch 2 jobs on the Spark cluster, then two executors will be there on a worker node, one for each of those 2 jobs. In each executor, several tasks can run, according to the splits made of the data.

In Spark, the central parallel processing framework is Spark Core, just like we used to have MapReduce in Hadoop. Apache Spark does not use MapReduce at all.

Hadoop mostly reads data from the HDFS file system, but with Spark you can read data from HDFS, Cassandra, HBase, MySQL, etc. This is the advantage of Spark's Data Source API, which connects Spark to several data sources. Spark does not make Hadoop completely obsolete; it can work with Hadoop, since its Data Source API lets it read data lying in HDFS.

Now we have several packages like GraphX (to do graph-related computations over data) and MLlib (to deal with machine learning jobs). The DataFrame API is what connects these libraries to Spark Core so that several computations can be done over data with their help. The SparkSQL package can be understood the way Hadoop has Hive with it: in Spark, if you have structured data and want to fire SQL-like queries over it, you can integrate the SparkSQL package using the DataFrame API.

Since Spark Core is written completely in Scala, it natively expects you to submit a job by writing code in Scala itself. But Spark gives us the flexibility to submit a job to Spark Core using Python (most popular), Java, or R as well.

So, in Spark, when you submit a job it gets instantiated through the Driver Program, a SparkContext is generated that lives throughout the execution of the job, and then the cluster manager schedules the job on several data nodes, makes sure it gets broken into small tasks, and these small tasks run on the corresponding worker nodes with the help of executors (these executors exist per application).

🤔Since you have understood every component of this architecture, now is the time to understand how they work together:

Suppose we have a cluster of 5 machines where one machine is the master and the rest of the machines are slaves.

We are assuming that we are using YARN as the cluster manager here, so the master machine will run a Resource Manager and the worker nodes will have a Node Manager. Now the client submits a Spark job from their device and it goes to this Resource Manager. When submitting the job, the client demands something like:

→Driver of 20GB, Executor of 20GB, 5 executors, and I want 5 CPU cores.

🎤--num-executors → Defines the number of executors in the entire cluster. Remember, I am saying the entire cluster, not each node!!

🎤--executor-memory → The amount of memory allocated to each executor process to store intermediate data, cached data, and execution buffers.

🎤--executor-cores → Number of cores per executor.
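As a hedged sketch, the resource request described above would look roughly like this on the command line (the master, job script, and exact numbers are placeholders for your own setup):

spark-submit \
  --master yarn \
  --driver-memory 20G \
  --num-executors 5 \
  --executor-memory 20G \
  --executor-cores 5 \
  my_spark_job.py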

Each executor is a JVM instance, so we can have multiple executors on a single node, and a node can have multiple executors and cores.

Number of cores = number of concurrent tasks an executor can run.
So we might think that more concurrent tasks per executor will give better performance. But it has been observed that any application with more than 5 concurrent tasks per executor tends to perform poorly. So stick to 5.

All processors of today have multiple cores (e.g. 1 CPU = 8 Cores)

Most processors of today are multi-threaded (e.g. 1 Core = 2 Threads, 8 cores = 16 Threads)

A Spark Task runs on a Slot. 1 Thread is capable of doing 1 Task at a time. To make use of all our threads on the CPU, we cleverly assign the number of Slots to correspond to a multiple of the number of Cores (which translates to multiple Threads).

To understand better how performance varies with the choice of cores and executors, do refer to this awesome article: https://medium.com/analytics-vidhya/how-no-of-cores-and-amount-of-memory-of-the-executors-can-impact-the-performance-of-the-spark-276cf58900a3

Now the Resource Manager will see this request and ask one of the worker nodes to create a container of 20GB RAM for the driver, and the Application Master will control this container.

The driver is a Java process. It is the process that runs the main() function of the application and creates the SparkContext. It executes the user code and creates a SparkSession or SparkContext, and the SparkSession is responsible for creating DataFrames, Datasets, and RDDs, executing SQL, performing transformations and actions, etc.

If you submit with --deploy-mode cluster (config property spark.submit.deployMode=cluster), your driver will be launched on one of the worker hosts.
In client mode, which is the default, the driver will run on the machine from which you submit your application.
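A minimal sketch of the two modes with spark-submit (the script name is a placeholder):

#Driver runs on the machine you submit from (client mode, the default)
spark-submit --master yarn --deploy-mode client my_spark_job.py

#Driver runs inside the cluster on one of the worker hosts
spark-submit --master yarn --deploy-mode cluster my_spark_job.py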

Now the driver will contact the Resource Manager for the allocation of executors on the worker machines; the driver is the lead from here onwards.

The cluster manager will launch executors on the worker nodes that have data needed for this job using data locality. Then tasks will be launched in these executors.

Worker nodes keep sending heartbeats to the driver program. This snapshot will definitely help you build a clear picture of all this:

Let’s understand a few more terms of PySpark:

Application →When we do a spark-submit we submit one application. Spark Application consists of a driver process and a set of executor processes.

Job →The number of jobs will be equal to the action statements in your pyspark code.

Stage →Each job will be broken down into small stages. It represents a set of tasks that can be executed together

Task →Now each stage can have many tasks. A task is the smallest unit of work in Spark. It represents a single operation that can be executed on a partitioned subset of data. Tasks are executed by the worker nodes in parallel, leveraging the distributed computing capabilities of Spark.

Now let me share some words on the partition of data in Pyspark. In HDFS, files are already partitioned into blocks for distributed storage. However, when reading data from HDFS into PySpark, the concept of partitions in PySpark refers to how the data is distributed across the Spark cluster for processing, rather than the physical storage layout in HDFS.

Remember, one task works on one partition (one task processes one partition at a time). Cores are slots for tasks, and each executor can process more than one partition at a time if it has more than one core.

Number of tasks = number of partitions in each stage, summed across all the stages.

🥌Multiple tasks can execute in parallel within a stage; the number of tasks that can run concurrently equals the number of available cores.

Remember for each job at least 1 stage having at least 1 task will always be created.

Whenever we do a groupBy operation, by default 200 shuffle partitions and therefore 200 tasks get created, even though many of those partitions may not have any data in them.
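For example, you can inspect and override this default through the spark.sql.shuffle.partitions setting; a small sketch, assuming a SparkSession named spark and a DataFrame flight_df like the one read later in this article:

#Default number of shuffle partitions created by wide operations such as groupBy
print(spark.conf.get("spark.sql.shuffle.partitions"))   #usually '200'

#Lower it for small datasets so you don't create hundreds of tiny, mostly empty tasks
spark.conf.set("spark.sql.shuffle.partitions", "8")

grouped_df = flight_df.groupBy("DEST_COUNTRY_NAME").count()
print(grouped_df.rdd.getNumPartitions())   #roughly reflects the configured value (adaptive execution may coalesce it further)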

🏖️Execution Plan In Spark

A very nice article to follow on execution in spark is https://medium.com/datalex/sparks-logical-and-physical-plans-when-why-how-and-beyond-8cd1947b605a

🎓️Spark Deployment Modes

🫧Standalone Mode → This mode is used mostly for learning purposes. It is very similar to the pseudo-distributed mode of Hadoop, where Spark services run in different Java Virtual Machines (JVMs) on a single machine. This is also known as a standalone cluster. In this mode only Spark is installed; there is no YARN or Mesos, and it uses the file system of your machine.

🫧Local Mode → This mode runs the various Spark services in a single JVM on a single machine. This mode is seldom used in production. It does not even require HDFS.

🫧Cluster Mode with YARN or Mesos → This mode works with Mesos or YARN as the cluster manager. In this mode Spark services run on different machines in the network. This one is used in the production environment. It is a fully distributed mode.

🫧Spark in Map-reduce(SIMR) →This mode lets you submit map-reduce code which gets converted to a Spark job internally and executes further in the same manner as spark jobs are executed. This mode was used when Spark was new and people using Hadoop were reluctant to move to Spark.

The biggest problem with Hadoop is Disk I/O operations because reading and writing to disk takes time and Spark solved this problem. Spark allows the result of computations to be saved in memory i.e. RAM for future re-use. Reading the data from memory is much faster than reading from the disk.

But I am repeating it: Spark is not a completely in-memory parallel processing framework. When an object is too large and memory is low, Spark also needs to move the object to disk.
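This spill behaviour is visible in the storage levels PySpark exposes; a minimal sketch, assuming a SparkSession named spark (the data is made up):

from pyspark import StorageLevel

rdd = spark.sparkContext.parallelize(range(1_000_000))

#MEMORY_AND_DISK keeps partitions in RAM and spills to disk only when RAM runs out
rdd.persist(StorageLevel.MEMORY_AND_DISK)
print(rdd.count())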

But remember, computations always happen on data in RAM, not on data lying on the hard disk.

So in the above image, on the left-hand side, i.e. with Hadoop, the result of the first MapReduce phase is written to the hard disk of the data node and then the second mapper has to read it back from the hard disk. In the case of Spark, the result of the first transformation can be cached in memory and the second phase can take its input from there, without needing to read from the hard disk. I hope you now see the actual magic of Spark clearly! We just eliminated one disk-write operation with Spark.

📌Cluster and Client Mode in Spark

In client mode, the driver program runs on the client machine (edge node) where the application is submitted. This means the client machine initiates the SparkContext and submits tasks to the Spark cluster for execution, whereas in cluster mode the driver program runs within the Spark cluster, typically on one of the cluster’s worker nodes. This snapshot will make it clearer:

Credits: https://www.linkedin.com/pulse/spark-deployment-modes-client-mode-vs-cluster-sai-prasad-padhy-0krjc/

🔮RDDs: Building blocks of Spark

Firstly, we should understand why the need for RDDs arises. Suppose with Spark you are using HDFS as the storage layer; then data will be stored on different machines of the cluster. Now a data structure like the RDD is needed so that we can work with this distributed data.

Spark objects are in-memory variables. The data loaded into a Spark object is called an RDD (Resilient Distributed Dataset). Suppose data is being read from HDFS: it will be loaded into a Spark object, an RDD will be formed, and later all the transformations will be done on this RDD and the data will be stored back.

In Hadoop, data is partitioned by default, as it gets divided according to the data block size. But in Spark, suppose you are reading data from Cassandra; it may come as a single file. If you read it as a single file, a single executor will work on it and that will be slow. That's why the concept of the RDD was brought into Spark, so that we can have partitioned data.

See, one executor needs at least one partition to work on. An executor is a JVM container, so it needs at least one core to run. So a device with two CPU cores can launch a maximum of 2 executors. An executor can work on more than one partition in parallel.

🙃How computations are really done on the data in Spark?

The Spark object that pulls or reads the data from the underlying file system, like HDFS in the case of big data, has to get the data from different data nodes of the cluster. On each data node, data is brought from the node's disks into its RAM and the computations are done on each node respectively; the result is then captured by the driver program, from where the job actually started, or stored back in HDFS (i.e. on the nodes' hard disks) if it is too huge to fit into the RAM of a single machine.

💭Transformations and Actions in Spark

Transformations in Spark refer to the changes you are going to make to the existing data, like filtering or sorting it. Transformations are of two major types:

🥁Narrow Transformations: These transformations do not need movement of data between partitions, like map or filter.

🥁Wide Transformations: These need movement of data between partitions; for example, groupBy needs to see the data residing on other partitions as well.

Wide transformations make the process slow so we always try to replace wide transformations with narrow ones to optimize code.

Actions are operations like show and count. An interesting thing to note is that Spark will not do anything, it will not even read the file, until you call some action. Only when you call an action, like asking to show the output, does Spark start executing whatever you have written in the lines before that action.

Whenever a wide transformation occurs, a new stage is created.
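A small illustration, assuming a SparkSession named spark (the data and column names are invented): filter is a narrow transformation, groupBy is a wide one, and nothing actually runs until the show() action is called.

from pyspark.sql.functions import col

sales_df = spark.createDataFrame(
    [("IN", 100), ("US", 200), ("IN", 300), ("UK", 50)],
    ["country", "amount"],
)

filtered = sales_df.filter(col("amount") > 60)       #narrow: no data movement between partitions
totals = filtered.groupBy("country").sum("amount")   #wide: shuffles data across partitions, new stage

totals.show()   #action: only now does Spark execute the plan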

📢Partitions and Data Locality

Data locality says that whichever machine's hard drive holds the data block of the file on which computations have to be performed, the data will be loaded into the RAM of that same machine. And it’s quite logical!

The concept of partitioning is that logically the file has a single path, but physically the data of that file is spread across different data nodes in the cluster. So Spark objects are partitioned objects, so that they can read data from different data nodes.

Notice the above image: in HDFS a file was spread across three data nodes, so the Spark object, i.e. Object 1, has been partitioned (broken down into 3 physical blocks) and distributed across these 3 nodes to cover the data on all three.

🔦Properties of Spark Data objects i.e. RDDs

🔗RDDs are partitioned and distributed in nature as we discussed above.

🔗RDDs are immutable that is they cannot be changed.

One thing I want to share is that this immutability of Spark objects is not something new in Spark, because in HDFS also you cannot make changes to the data. It's write once, read many, so the concept of immutability is taken from there.

And these RDDs are fault tolerant precisely because of this immutability: the next RDD is formed by applying some transformations on the previous RDD. So, in case the present one gets corrupted, we can obtain it again by applying those same transformations on the previous RDD.

With RDDs you have to tell Spark each and every step; Spark will not do any optimization when you use RDDs. You yourself have to take care of writing optimized code, because the RDD is the low-level building block of Spark.
Whereas when you work with the DataFrame API, you just write what you want to do and Spark takes care of doing it optimally. But with RDDs you have more flexibility, as the whole control is in your hands.
One more thing: RDDs can work on unstructured data, whereas the DataFrame API works only on structured data.

🌿Workflow of Apache Spark

Let’s understand the flow of job execution in Apache Spark now:

Step 1 → From the client machine, the client will be submitting the Spark Job.

Step 2 → The machine on which the Driver service is running in a JVM will accept this job and instantiate a SparkContext object.

Step 3 →This Spark context object will be living throughout the execution of this Job.

Step 4 → Now Cluster manager comes into the scene and makes sure that the job gets broken down into smaller tasks and these tasks get executed in the executor on the respective data nodes.

Step 5 →Computations happen over the data residing on separate worker nodes.

Step 6 → The result of all those computations is then brought back to the node where the driver service is running, but only if the data fits into the RAM of that machine; otherwise it gets stored in HDFS.

🧊RDD Lazy Evaluation Model or DAG Formation

With immutability there is one problem: multiple copies of the same data will exist and too many passes will be made over the same data. To solve this problem, lazy evaluation comes into the picture.

In Spark, Lazy Evaluation means that You can apply as many transformations as you want, but Spark will not start the execution of the process until an ACTION is called.

You can get it better after going through the below lines:

val c1 = collection.map(value => value + 1) // no computation happens till now
val c2 = c1.map(value => value + 2)         // still no computation, because no action has been called
print(c2)                                   // now the computation will happen

// And this computation happens by combining the above 2 transformations:
val c2 = collection.map(value => { var result = value + 1; result = result + 2; result })

// So the result of value + 1 is kept in memory and the further computation is done on top of it.
// This lazy evaluation saved the multiple passes between Spark objects.

So the concept of lazy evaluation says that the code you write will not be executed immediately; instead it starts to run, according to the steps recorded in the DAG (Directed Acyclic Graph), only when you perform some action like print, count, etc.

We can write hundreds of lines of code where the base RDD gets filtered and transformed, but until an ACTION is called somewhere in the code, nothing is going to happen. Before the ACTION is called, even the data will not be loaded into the RDD; only a directed acyclic graph (DAG) keeps forming.

So, graph building keeps on happening in the background but the execution of the job only happens when ACTION is called out.

Every action forms one job and each job has its own DAG.

Transformations are the operations that manipulate RDDs, and a Spark job is a collection of several transformations. Actions are the operations whose duty is to trigger the execution of that sequence of transformations.
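The same idea in PySpark, as a minimal sketch, assuming a SparkSession named spark (the collection contents are made up):

rdd = spark.sparkContext.parallelize([1, 2, 3, 4])

c1 = rdd.map(lambda value: value + 1)   #no computation yet, only DAG building
c2 = c1.map(lambda value: value + 2)    #still no computation

print(c2.collect())                     #action: the whole DAG runs now --> [4, 5, 6, 7]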

🌽R stands for Resilient in RDDs

RDDs are fault-tolerant, or you can say resilient, distributed datasets. RDDs that are lost or corrupted during execution can be reconstructed easily thanks to the concept of lineage. Lineage is the history of how an RDD was created from its parent RDD through transformations. To get a lost RDD back, we just need to replay those transformations.

Suppose in the above image RDD2 gets lost then it can be recreated through the lineage history that we were maintaining like this:

0 Creating Base RDD

1 Multiply the RDD elements by 2.

2 Increment the values by 6.

3 select only those which are divisible by 4.

Now to get the lost RDD2 we just have to replay step 2 on RDD1 and wohoo! we got the lost one back. And don’t get confused about how we will get RDD1; it is cached in memory.

One cool thing about RDDs that I want to share: if we feel that a certain set of RDDs, or even a single RDD, will be used again in the future, we can cache it in the memory of the executor itself.
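A quick sketch of both ideas, caching an RDD for reuse and peeking at its lineage, assuming a SparkSession named spark (the RDD contents are made up):

numbers = spark.sparkContext.parallelize(range(10))
doubled = numbers.map(lambda x: x * 2)

doubled.cache()                            #keep it in executor memory for future reuse
print(doubled.sum())                       #first action materialises and caches it
print(doubled.max())                       #second action reads from the cache

print(doubled.toDebugString().decode())    #prints the lineage Spark would replay on failure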

⛩️What are the ways of creating an RDD?

🌈RDDs can be created by reading a big-data file directly from an external file system.

🌈The second way is using the parallelize API → If the data source is not distributed, then as a developer we call this function to break the data into smaller distributed partitions.

🌈Using the makeRDD API.
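Roughly, the first two options look like this in PySpark, assuming a SparkSession named spark (the file path is a placeholder; makeRDD exists only in the Scala API):

sc = spark.sparkContext

#1. Read a big-data file straight from a distributed file system such as HDFS
lines_rdd = sc.textFile("hdfs:///data/big_file.txt")   #path is a placeholder

#2. Parallelize an in-memory (non-distributed) collection into partitions
numbers_rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)
print(numbers_rdd.getNumPartitions())   #3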

Installing PySpark on your Linux machine is super easy. Just follow this article: https://medium.com/@agusmahari/pyspark-step-by-step-guide-to-installing-pyspark-on-linux-bb8af96ea5e8
This will install PySpark in local mode on your machine → it does not require any cluster manager, as everything runs in a single JVM.

This is how your PySpark shell will look once the installation is successful:

We do not prefer to use the shell for writing scripts, as using an IDE is much easier, so you can follow this article to install Anaconda Navigator: https://phoenixnap.com/kb/install-anaconda-ubuntu

To activate the virtual environments in anaconda follow this guide: https://www.digitalocean.com/community/tutorials/how-to-install-the-anaconda-python-distribution-on-ubuntu-22-04

A few terms that you will hear in this world of spark:

✌️Application → When you submit a program, i.e. when you run spark-submit, it gives rise to an application.

✌️Job → Your program will have many actions written in it. One Action gives rise to one job in spark.

✌️Stages → Now in one job you might be doing many transformations, so all these will be broken down into smaller logical plans named stages. 1 job can have multiple stages.

✌️Task → Now, Spark actually performs the transformations described by our program on the data. This is a task. Multiple tasks can be there in a stage.

Remember for a Job it is mandatory to have at least one stage and inside that stage at least one task should be there.
Whenever you do a wide-dependency transformation a new stage will be created.
If you have N partitions then N tasks will be created.
The number of shuffle partitions is fixed during a spark job execution with the default value as 200.

Uff!🥱 Enough of theory. Let’s see some hands-on commands now to work with PySpark.

To run Pyspark in Jupyter-notebook follow these steps:

#Run this in terminal first 
export PATH=/home/yourUserName/anaconda3/bin:$PATH

#After that run
pip install ipykernel
python -m ipykernel install --user --name=myenv

#Then type in terminal
jupyter-notebook

Spark is written in Scala, but it provides an API so that it can work with Python. PySpark is that Python API.

Although the term Application Programming Interface is mostly used for the element exposing the services of a web server, it has a more general meaning. For a framework as extensive as Spark, it names the specific ways of interacting with the library that are available to the user.

Spark features different APIs with different purposes, which serve as front-facing interfaces masking more complex underlying or structural code. Even if every Spark job runs RDD-based pipelines, Spark offers different ways to construct a job:

Core API (Spark Core): user manipulates directly the RDDs, it is the low level API

Dataset API (Spark SQL): User manipulates high level typed objects

DataFrame API (Spark SQL): User manipulates high level untyped objects

SQL API (Spark SQL): User writes SQL query strings

#Shows all the configs of the current SparkContext (here sc is the SparkContext object)
sc._conf.getAll()

Reading Data in Spark

First, install the findspark library to locate Spark on your machine and connect it with Jupyter notebooks:

import findspark
findspark.init()

Now create a spark context:

import pyspark
from pyspark.sql import SparkSession
#Creating the SparkSession (the entry point, which also wraps a SparkContext)
spark = SparkSession.builder.appName('learnSpark').getOrCreate()

Let’s read a CSV file

flight_df = spark.read.format("csv")\
.option("header", "true")\
.option("inferschema", "false")\
.option("mode", "FAILFAST")\
.load("Documents//2010-summary.csv")

flight_df.show(5)
flight_df.printSchema() #prints the schema
#the header option tells whether the first row of the file should be used as column names
#Using inferSchema=false (the default option) gives a dataframe where all columns are strings (StringType). By setting inferSchema=true, Spark will automatically go through the csv file and infer the schema of each column.
#there are three types of mode:
#FAILFAST --> while reading the file, if any corrupted record is found, just stop reading
#DROPMALFORMED --> drop the corrupted records and read the rest of the rows
#PERMISSIVE --> put null in place of the corrupted fields. This is the default one.

Voila! it worked and this is the output:

Now suppose you want to define your own schema while reading the data so for that we have two options:

#Option 1:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
#Creating the SparkSession
spark = SparkSession.builder.appName('learnSpark').getOrCreate()

mySchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", IntegerType(), True)
])

flight_df3 = spark.read.format("csv")\
.option("header", "true")\
.option("inferschema", "false")\
.schema(mySchema)\
.option("mode", "FAILFAST")\
.load("Documents//2010-summary.csv")

flight_df3.show(5)
flight_df3.printSchema()

Output is :

Setting up a manual schema like we have done above is of great help when we want to print corrupted records.

Now suppose you have a corrupted record and you want to catch it: just add one more column named _corrupt_record to the schema and it will capture it.

mySchemaCorrupted = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", IntegerType(), True),
StructField("_corrupt_record", StringType(), True)
])
flight_df4 = spark.read.format("csv")\
.option("header", "true")\
.option("inferschema", "false")\
.option("mode", "permissive")\
.schema(mySchemaCorrupted)\
.load("Documents//2010-summary.csv")

flight_df4.show(truncate=False)
Output of above code

You can also capture corrupted records in a file, but this feature is specific to Databricks.

flight_df5 = spark.read.format("csv")\
.option("header", "true")\
.option("inferschema", "false")\
.schema(mySchemaCorrupted)\
.option("badRecordsPath", "/Documents/badRecordsPath")\
.load("Documents//2010-summary.csv")

flight_df5.show(truncate=False)

Reading JSON File in PySpark

Let’s first learn to read line-delimited JSON:

#line_delimited_json
{"name":"Manish","age":20,"salary":20000},
{"name":"Nikita","age":25,"salary":21000},
{"name":"Pritam","age":16,"salary":22000},
{"name":"Prantosh","age":35,"salary":25000},
{"name":"Vikash","age":67,"salary":40000}

df1 = spark.read.format("json")\
.option("inferSchema", "true")\
.option("mode", "PERMISSIVE")\
.load("Documents//demo1.json")
df1.show()

Next, let’s read multiline-json

#Multi-line JSON
[
{
"name": "Manish",
"age": 20,
"salary": 20000
},
{
"name": "Nikita",
"age": 25,
"salary": 21000
},
{
"name": "Pritam",
"age": 16,
"salary": 22000
}
]

df2 = spark.read.format("json")\
.option("inferSchema", "true")\
.option("mode", "PERMISSIVE")\
.option("multiline", "true")\
.load("Documents//demo4.json")
df2.show()

Suppose I pass an incorrect JSON; then Spark will automatically create a corrupt-record column capturing that record. Let’s see it in action:

#Corrupted json
{"name":"Manish","age":20,"salary":20000},
{"name":"Nikita","age":25,"salary":21000},
{"name":"Pritam","age":16,"salary":22000},
{"name":"Prantosh","age":35,"salary":25000},
{"name":"Vikash","age":67,"salary":40000

df1 = spark.read.format("json")\
.option("inferSchema", "true")\
.option("mode", "PERMISSIVE")\
.load("Documents//demo1.json")
df1.show(truncate=False)

Reading a .parquet File

df = spark.read.parquet("Documents/demo5.gz.parquet")
df.show()

A Parquet file stores the data in a columnar fashion so it is good for OLAP.

Writing a DataFrame to Disk

df6.write.format("csv")\
.option("header" ,"true")\
.option("path", "Documents/newfolder")\
.save()
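The writer also accepts a save mode; a hedged sketch with placeholder paths (overwrite, append, ignore, and errorifexists, the default, are the available modes):

df6.write.format("csv")\
    .mode("overwrite")\
    .option("header", "true")\
    .save("Documents/newfolder")

#Parquet keeps the schema along with the data and is usually a better default for analytics
df6.write.mode("overwrite").parquet("Documents/parquet_out")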

Creating a Dataframe in Spark

from pyspark.sql.functions import * 
from pyspark.sql.types import *
myData = [
(1,1),
(2,3),
(5,6)
]
mySchema = ["id", "num"]
spark.createDataFrame(data=myData, schema=mySchema).show()

Flatten Nested JSON

If the nested JSON has an array, we can use explode to flatten it, and if a field is of struct type, we can access elements inside it using dot notation.
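To make the idea concrete before touching the restaurant file, here is a tiny self-contained sketch (the data and column names are invented):

from pyspark.sql import Row
from pyspark.sql.functions import explode

orders_df = spark.createDataFrame([
    Row(order_id=1, customer=Row(name="Asha", city="Pune"), items=["pen", "book"]),
    Row(order_id=2, customer=Row(name="Ravi", city="Delhi"), items=["lamp"]),
])

orders_df.select(
    "order_id",
    "customer.name",                    #dot notation reaches inside the struct
    explode("items").alias("item"),     #explode turns each array element into its own row
).show()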

See we are loading JSON data :

df6 = spark.read.format("json")\
.option("mode", "PERMISSIVE")\
.option("multiline", "true")\
.load("Downloads//resturant_json_data.json")
df6.show()
df6.printSchema()

Now let’s try to flatten this restaurants array:

df6.select("*", explode("restaurants").alias("new_restaurants"))\
.drop("restaurants").printSchema()

#This will transform the restaurants array into a struct and then it will be easy for us to access elements inside it using dot notation.

Now we will try to get the res_id under R from this:

df6.select("*", explode("restaurants").alias("new_restaurants"))\
.drop("restaurants")\
.select("new_restaurants.restaurant.R.res_id").show()

Getting all required columns by flattening the json data:

df6.select("*", explode("restaurants").alias("new_restaurants"))\
.drop("restaurants")\
.select("*","new_restaurants.restaurant.R.res_id",
explode_outer("new_restaurants.restaurant.establishment_types").alias("new_estb")
,"new_restaurants.restaurant.name").drop("new_restaurants").show()

We will continue exploring this in upcoming articles. I hope you enjoyed the read! Try to shift your focus from I, me, and myself to we, our, and ourselves, and see the magic of learning through sharing. Happy Coding!!
