
    PySpark for Pandas Users | Towards Data Science

    By Editor Times Featured | February 24, 2026 | 20 Mins Read


    a real problem when dealing with very large datasets. By “very large”, I mean data that exceeds the capacity of a single machine’s RAM. 

    Some of the key friction points Pandas users face include:

    In-Memory Constraints

    Pandas requires the entire dataset it’s processing to be held in the machine’s Random Access Memory (RAM). It can’t process data stored on a hard drive unless that data is loaded first, and if the data is too big for your memory, you run into problems.

    For example, if you try to load a 100GB CSV file into Pandas on a standard laptop with 16GB of RAM, the code will crash once memory runs out.

    And it isn’t a 1:1 ratio. Because of data types and object overhead, Pandas often needs several times the file’s on-disk size in RAM. With 16GB of RAM, your practical file-size limit may be as low as 3-4 GB.
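You can see this overhead for yourself by comparing a frame’s CSV size with what `DataFrame.memory_usage(deep=True)` reports. The sketch below builds a small, hypothetical 100,000-row frame (not this article’s data set) and forces classic Python-object strings; newer Pandas versions can store strings more compactly, so treat the printed ratio as illustrative rather than a fixed rule.

```python
import io

import numpy as np
import pandas as pd


def memory_vs_disk(n_rows: int = 100_000, seed: int = 42) -> tuple[int, int]:
    """Return (csv_bytes, in_memory_bytes) for a small, hypothetical sales-like frame."""
    rng = np.random.default_rng(seed)
    df = pd.DataFrame(
        {
            "order_id": np.arange(n_rows),
            "customer_id": rng.integers(100, 1000, n_rows),
            # dtype=object forces one Python string object per cell.
            "customer_name": pd.Series(
                [f"Customer_{i}" for i in rng.integers(0, 100_000, n_rows)],
                dtype=object,
            ),
            "price": rng.uniform(1.99, 999.99, n_rows).round(2),
        }
    )
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    csv_bytes = len(buf.getvalue().encode("utf-8"))
    # deep=True counts the string objects themselves, not just 8-byte pointers.
    in_memory_bytes = int(df.memory_usage(deep=True).sum())
    return csv_bytes, in_memory_bytes


csv_bytes, mem_bytes = memory_vs_disk()
print(f"CSV: {csv_bytes:,} bytes, in memory: {mem_bytes:,} bytes "
      f"({mem_bytes / csv_bytes:.1f}x)")
```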

    Single-Threaded Execution

    Pandas was designed for convenience and analysis, not raw performance at scale. By default, Pandas executes operations on a single CPU core. Even if you run your code on a powerful server with 64 cores, Pandas will mostly use just one, leaving the others idle.

    Eager Execution vs. Lazy Evaluation

    Pandas uses eager execution, meaning it performs each calculation as soon as the code runs. Big Data tools (like Apache Spark) use lazy evaluation. Lazy evaluation is usually more performant because, when a task requires a sequence of steps, it can look at all of the steps and the required end result and optimise accordingly. Eager execution can’t do that. It blindly executes each step in turn, no matter what.
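Spark’s Catalyst optimiser does far more than this (reordering operations, pushing filters down to the data source and so on), but the core benefit of laziness can be sketched with plain Python generators. In this toy example, `expensive_transform` is a hypothetical stand-in for per-row work: the eager version transforms every row before filtering and taking the first five results, while the lazy version only does the work needed to produce those five.

```python
from itertools import islice

# Counts how many times the (hypothetical) expensive step runs in each mode.
calls = {"eager": 0, "lazy": 0}


def expensive_transform(x: int, mode: str) -> int:
    """Stand-in for costly per-row work; records every invocation."""
    calls[mode] += 1
    return x * 2


def eager_first_n(data: list[int], n: int) -> list[int]:
    # Eager: each step runs over ALL rows before the next step starts.
    transformed = [expensive_transform(x, "eager") for x in data]
    filtered = [x for x in transformed if x % 3 == 0]
    return filtered[:n]


def lazy_first_n(data: list[int], n: int) -> list[int]:
    # Lazy: generators only describe the pipeline; rows are processed
    # one at a time, and only when islice() pulls the next result.
    transformed = (expensive_transform(x, "lazy") for x in data)
    filtered = (x for x in transformed if x % 3 == 0)
    return list(islice(filtered, n))


data = list(range(1_000_000))
eager_result = eager_first_n(data, 5)
lazy_result = lazy_first_n(data, 5)

print(eager_result, calls["eager"])  # [0, 6, 12, 18, 24] 1000000
print(lazy_result, calls["lazy"])    # [0, 6, 12, 18, 24] 13
```

Both versions return the same answer, but the lazy pipeline only touches the first 13 rows, because nothing is computed until `islice` asks for results, and it stops asking once it has five.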

    Vertical Scaling Limits

    To make Pandas work with bigger datasets, you have to rely on vertical scaling (buying a more expensive computer with more RAM and a faster CPU). But this can only take you so far. Pandas has no native ability to “talk” to a cluster: it can’t distribute a DataFrame across multiple machines.

    So what to do?

    As always in the IT world, several solutions present themselves. Three of the most popular are:

    1/ Dask or Ray

    These are third-party libraries that help you write distributed code that can run across clusters of computers. While they try to mimic the Pandas API, they still have subtle differences and limitations that may require code refactoring.

    2/ Spark: Another distributed compute engine. It requires a different syntax and a different mental model.

    3/ RDBMS: Requires moving your data into a database and learning SQL.

    All of the above options require quite a bit of work to implement, but for the rest of this article, I’ll focus on option 2. 

    So, let’s say I’ve convinced you, or at least piqued your curiosity, and you’re considering moving some or all of your existing Pandas-based processing to PySpark. What should your next move be? Well, you’ll need to start converting some or all of your codebase. That can be daunting, but don’t worry, I’ve got you covered.

    Read on as I take you through a series of example code snippets that showcase some typical data processing operations, from simple to more complex. I’m sure you’ll recognise some of these patterns in your own code. I’ll show you the Pandas way of doing things and replicate it in PySpark, providing output and timing comparisons between the two.

    Setting up the dev environment

    I’m running Ubuntu on WSL2. First, we’ll set up a separate development environment for this work, ensuring our projects are siloed and don’t interfere with each other. I’m using Conda for this part, but feel free to use whichever method you’re familiar with.

    Install PySpark, Pandas, etc.

    (base) $ conda create -n pandas_to_pyspark python=3.11 -y
    (base) $ conda activate pandas_to_pyspark
    (pandas_to_pyspark) $ conda install jupyter polars pyarrow pandas -y
    (pandas_to_pyspark) $ conda install -c conda-forge pyspark

    To check that PySpark has been installed correctly, type the pyspark command into a terminal window.

    (pandas_to_pyspark) $ pyspark
    
    Python 3.11.14 | packaged by conda-forge | (main, Oct 22 2025, 22:46:25) [GCC 14.3.0] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    WARNING: Using incubator modules: jdk.incubator.vector
    WARNING: package sun.security.action not in java.base
    Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
    26/01/15 16:15:21 WARN Utils: Your hostname, tpr-desktop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
    26/01/15 16:15:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    26/01/15 16:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    WARNING: A terminally deprecated method in sun.misc.Unsafe has been called
    WARNING: sun.misc.Unsafe::arrayBaseOffset has been called by org.apache.spark.unsafe.Platform (file:/home/tom/miniconda3/envs/pandas_to_pyspark/lib/python3.11/site-packages/pyspark/jars/spark-unsafe_2.13-4.1.1.jar)
    WARNING: Please consider reporting this to the maintainers of class org.apache.spark.unsafe.Platform
    WARNING: sun.misc.Unsafe::arrayBaseOffset will be removed in a future release
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 4.1.1
          /_/
    
    Using Python version 3.11.14 (main, Oct 22 2025 22:46:25)
    Spark context Web UI available at http://10.255.255.254:4040
    Spark context available as 'sc' (master = local[*], app id = local-1768493723158).
    SparkSession available as 'spark'.
    >>>

    If you don’t see the Spark welcome banner, something has gone wrong, and you should double-check your installation.
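If you’d rather check the prerequisites from a script than from the interactive shell, a small helper like the hypothetical `check_environment` below reports installed package versions and whether a `java` executable is on your PATH (PySpark starts a JVM, and Spark 4.x needs Java 17 or newer). It only uses the standard library, so it runs even when PySpark itself is missing.

```python
import importlib.metadata
import importlib.util
import shutil


def check_environment(packages=("pyspark", "pandas", "pyarrow", "polars")) -> dict:
    """Map each package to its installed version (None if missing) and 'java' to its path."""
    report = {}
    for name in packages:
        if importlib.util.find_spec(name) is None:
            report[name] = None  # not importable in the active environment
            continue
        try:
            report[name] = importlib.metadata.version(name)
        except importlib.metadata.PackageNotFoundError:
            report[name] = "unknown"  # importable, but no distribution metadata found
    # PySpark launches a JVM, so a Java executable must be discoverable on PATH.
    report["java"] = shutil.which("java")
    return report


for key, value in check_environment().items():
    print(f"{key:>8}: {value if value is not None else 'NOT FOUND'}")
```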

    Getting our sample data set

    We don’t need a complicated data set for our purposes. A set of synthetic sales data with the following schema will suffice:

    • order_id (int)
    • order_date (date)
    • customer_id (int)
    • customer_name (str)
    • product_id (int)
    • product_name (str)
    • category (str)
    • quantity (int)
    • price (float)
    • total (float)
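Spark accepts a schema either as a `StructType` (as in the Spark scripts in this article) or as a DDL-formatted string such as `"order_id INT, order_date DATE"`. If you’d like to keep the column list above in one place, a hypothetical helper like `to_spark_ddl` can build that string from (name, type) pairs; the mapping of the schema to Spark SQL type names is my own.

```python
# Hypothetical single source of truth for the sales schema: (column, Spark SQL type).
SALES_SCHEMA = [
    ("order_id", "INT"),
    ("order_date", "DATE"),
    ("customer_id", "INT"),
    ("customer_name", "STRING"),
    ("product_id", "INT"),
    ("product_name", "STRING"),
    ("category", "STRING"),
    ("quantity", "INT"),
    ("price", "DOUBLE"),
    ("total", "DOUBLE"),
]


def to_spark_ddl(schema: list[tuple[str, str]]) -> str:
    """Join (name, type) pairs into a DDL string like 'order_id INT, order_date DATE, ...'."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in schema)


ddl = to_spark_ddl(SALES_SCHEMA)
print(ddl)
```

The resulting string can be passed straight to the reader, for example `spark.read.csv(path, header=True, schema=ddl)`.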

    Our input data will be a 30-million-record CSV file. Here’s a Python program to generate the test data:

    import polars as pl
    import random
    from datetime import datetime, timedelta
    
    # Generate fake data
    def generate_fake_data(num_records):
        random.seed(42)
        
        product_names = ['Laptop', 'Smartphone', 'Desk', 'Chair', 'Monitor', 
                         'Printer', 'Paper', 'Pen', 'Notebook', 'Coffee Maker']
        categories = ['Electronics', 'Electronics', 'Office', 'Office', 'Electronics',
                      'Electronics', 'Office', 'Office', 'Office', 'Electronics']
        
        data = {
            'order_id': range(num_records),
            'order_date': [datetime(2023, 1, 1) + timedelta(days=random.randint(0, 364)) 
                           for _ in range(num_records)],
            'customer_id': [random.randint(100, 999) for _ in range(num_records)],
            'customer_name': [f'Customer_{random.randint(0, 99999)}' for _ in range(num_records)],
            'product_id': [random.randint(200, 209) for _ in range(num_records)],
            'product_name': [random.choice(product_names) for _ in range(num_records)],
            'category': [random.choice(categories) for _ in range(num_records)],
            'quantity': [random.randint(1, 10) for _ in range(num_records)],
            'price': [round(random.uniform(1.99, 999.99), 2) for _ in range(num_records)]
        }
        
        df = pl.DataFrame(data)
        # total = price * quantity
        df = df.with_columns((pl.col('price') * pl.col('quantity')).alias('total'))
        
        return df
    
    # Generate 30 million records
    num_records = 30000000
    df = generate_fake_data(num_records)
    # Save to CSV
    df.write_csv('/mnt/d/sales_data/sales_data_30m.csv')
    print('CSV file with fake sales data has been created.')
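The generator above builds every column as a Python list, which is slow and memory-hungry at 30 million rows. As an alternative (my own swap, not the approach used for the timings in this article), NumPy can generate each column in a single vectorised call. The sketch below returns a Pandas DataFrame with the same schema and value ranges, but it uses a different random number generator, so it won’t reproduce the exact sample rows shown next. At 30 million rows it still needs several gigabytes of RAM, so try a smaller num_records first.

```python
import numpy as np
import pandas as pd

PRODUCT_NAMES = np.array(['Laptop', 'Smartphone', 'Desk', 'Chair', 'Monitor',
                          'Printer', 'Paper', 'Pen', 'Notebook', 'Coffee Maker'])
# The original list holds 'Electronics' and 'Office' five times each,
# so a uniform choice between the two gives the same distribution.
CATEGORIES = np.array(['Electronics', 'Office'])


def generate_fake_data_vectorised(num_records: int, seed: int = 42) -> pd.DataFrame:
    """Hypothetical vectorised variant: one NumPy call per column instead of a Python loop."""
    rng = np.random.default_rng(seed)
    day_offsets = rng.integers(0, 365, num_records)  # 0..364 days into 2023
    df = pd.DataFrame({
        'order_id': np.arange(num_records),
        'order_date': np.datetime64('2023-01-01') + day_offsets.astype('timedelta64[D]'),
        'customer_id': rng.integers(100, 1000, num_records),   # 100..999
        'customer_name': np.char.add('Customer_',
                                     rng.integers(0, 100_000, num_records).astype(str)),
        'product_id': rng.integers(200, 210, num_records),     # 200..209
        'product_name': rng.choice(PRODUCT_NAMES, num_records),
        'category': rng.choice(CATEGORIES, num_records),
        'quantity': rng.integers(1, 11, num_records),          # 1..10
        'price': rng.uniform(1.99, 999.99, num_records).round(2),
    })
    df['total'] = df['price'] * df['quantity']
    return df


sample = generate_fake_data_vectorised(5)
print(sample)
```

Writing the result out with `to_csv(..., index=False)` gives a file with the same columns, although the text format of order_date may differ from the Polars output.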

    Here’s what the first few rows of my test data file looked like.

    order_id,order_date,customer_id,customer_name,product_id,product_name,category,quantity,price,total
    0,2023-11-24T00:00:00.000000,434,Customer_46318,201,Notebook,Office,6,925.68,5554.08
    1,2023-02-27T00:00:00.000000,495,Customer_26514,203,Coffee Maker,Office,3,676.44,2029.3200000000002
    2,2023-01-13T00:00:00.000000,377,Customer_56676,204,Pen,Electronics,10,533.2,5332.0
    3,2023-05-21T00:00:00.000000,272,Customer_13772,209,Notebook,Electronics,5,752.0,3760.0
    4,2023-05-06T00:00:00.000000,490,Customer_23118,206,Coffee Maker,Electronics,3,747.46,2242.38
    5,2023-04-25T00:00:00.000000,515,Customer_88284,202,Desk,Electronics,10,886.22,8862.2
    6,2023-03-13T00:00:00.000000,885,Customer_47303,200,Desk,Electronics,1,38.97,38.97
    7,2023-02-22T00:00:00.000000,598,Customer_90712,203,Desk,Electronics,5,956.31,4781.549999999999
    8,2023-12-13T00:00:00.000000,781,Customer_32943,205,Coffee Maker,Electronics,7,258.25,1807.75
    9,2023-10-07T00:00:00.000000,797,Customer_40215,208,Pen,Electronics,8,464.81,3718.48
    10,2023-02-14T00:00:00.000000,333,Customer_18388,209,Monitor,Electronics,1,478.95,478.95

    Code Examples

    Start up a Jupyter Notebook:

    (pandas_to_pyspark) $ jupyter notebook

    The data and the two sets of code I’ll be running are on my desktop PC. I’ll show the outputs from both runs so you can verify they perform the same task, and I’ll include timings (in seconds) so you can compare performance. The Pandas code and output come first, then the Spark code and output.

    The code snippets are short and well commented, so if you’re already a Pandas programmer, it should be fairly easy to follow what’s going on in the PySpark code, even if you’re not yet familiar with it.

    To be clear, because the input data set I’ll be using is NOT “big data”, the timings should be treated as being of secondary importance.
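If you want to reproduce timings like these, a small context manager keeps the start/stop bookkeeping out of the way. The sketch below (the name `timed` is my own) uses `time.perf_counter()`, the standard-library clock intended for measuring intervals; the scripts in this article use `time.time()` and `timeit.default_timer`, and either is fine at these durations.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, results: dict):
    """Record the wall-clock seconds spent inside the `with` block under `label`."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    try:
        yield
    finally:
        # Runs even if the block raises, so partial timings are still recorded.
        results[label] = time.perf_counter() - start
        print(f"{label}: {results[label]:.2f} seconds")


timings = {}
with timed("sum of squares", timings):
    total = sum(i * i for i in range(1_000_000))
```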

    Example 1: Loading data from a CSV

    We’ll start with a simple operation: reading our input CSV data file and sorting it by the order_date and order_id columns before displaying the first and last five records.

    Here’s the Pandas code.

    import pandas as pd
    import time
    
    # 1. Define the path (WSL format)
    file_path = "/mnt/d/sales_data/sales_data_30m.csv"
    
    print(f"Starting process for {file_path}...")
    
    # --- LOAD PHASE ---
    start_load = time.time()
    df = pd.read_csv(file_path)
    end_load = time.time()
    
    print(f"Loading complete. Time taken: {end_load - start_load:.2f} seconds")
    
    # --- SORT PHASE ---
    start_sort = time.time()
    # Note: sorting by two columns at once
    df_sorted = df.sort_values(by=['order_date', 'order_id'])
    end_sort = time.time()
    
    print(f"Sorting complete. Time taken: {end_sort - start_sort:.2f} seconds")
    
    # --- DISPLAY ---
    print("\n" + "="*30)
    print("TOP 5 RECORDS")
    print(df_sorted.head(5))
    
    print("\nBOTTOM 5 RECORDS")
    print(df_sorted.tail(5))
    print("="*30)
    
    total_time = end_sort - start_load
    print(f"\nTotal Execution Time: {total_time:.2f} seconds")

    Here is the output.

    (pandas_to_pyspark) $ python ex1_pandas.py
    
    Starting process for /mnt/d/sales_data/sales_data_30m.csv...
    Loading complete. Time taken: 34.02 seconds
    Sorting complete. Time taken: 7.00 seconds
    
    ==============================
    TOP 5 RECORDS
          order_id                  order_date  customer_id   customer_name  ...     category  quantity   price    total
    179        179  2023-01-01T00:00:00.000000          350  Customer_93033  ...       Office         5  640.16  3200.80
    520        520  2023-01-01T00:00:00.000000          858  Customer_31280  ...  Electronics         3  841.21  2523.63
    557        557  2023-01-01T00:00:00.000000          651  Customer_95137  ...       Office         7   75.66   529.62
    1080      1080  2023-01-01T00:00:00.000000          303  Customer_87422  ...  Electronics        10   98.34   983.40
    2023      2023  2023-01-01T00:00:00.000000          838  Customer_95193  ...       Office         4  427.96  1711.84
    
    [5 rows x 10 columns]
    
    BOTTOM 5 RECORDS
              order_id                  order_date  customer_id   customer_name  ...     category  quantity   price    total
    29997832  29997832  2023-12-31T00:00:00.000000          831  Customer_49372  ...  Electronics         6  418.86  2513.16
    29997903  29997903  2023-12-31T00:00:00.000000          449  Customer_17384  ...       Office         3  494.29  1482.87
    29998337  29998337  2023-12-31T00:00:00.000000          649  Customer_24018  ...  Electronics         5  241.71  1208.55
    29999674  29999674  2023-12-31T00:00:00.000000          105  Customer_39890  ...       Office         1   94.97    94.97
    29999933  29999933  2023-12-31T00:00:00.000000          572  Customer_38794  ...       Office         8  375.36  3002.88
    
    [5 rows x 10 columns]
    ==============================
    
    Total Execution Time: 41.03 seconds

    Here’s the equivalent Spark code.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
    import time
    import pandas as pd
    
    start_overall = time.time()
    
    # 1. Initialize with explicit memory and shuffle tuning
    # (under spark-submit the driver JVM is already running at this point,
    #  so driver memory is more reliably set with --driver-memory on the command line)
    spark = SparkSession.builder \
        .appName("OptimizedSpark") \
        .config("spark.sql.shuffle.partitions", "16") \
        .config("spark.driver.memory", "8g") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")
    
    # 2. Define a manual schema (avoids the extra pass over the file that inferSchema needs)
    schema = StructType([
        StructField("order_id", IntegerType(), True),
        StructField("order_date", DateType(), True),
        StructField("customer_id", IntegerType(), True),
        StructField("customer_name", StringType(), True),
        StructField("product_id", IntegerType(), True),
        StructField("product_name", StringType(), True),
        StructField("category", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", DoubleType(), True),
        StructField("total", DoubleType(), True)
    ])
    
    file_path = "/mnt/d/sales_data/sales_data_30m.csv"
    print(f"Processing {file_path} with Optimized Spark...")
    
    # --- LOAD ---
    start_load = time.time()
    # No inferSchema! This only builds the plan; no data is read yet (lazy evaluation)
    df = spark.read.csv(file_path, header=True, schema=schema)
    print(f"LOAD INITIATED. (Time taken: {time.time() - start_load:.2f}s)")
    
    # --- SORT ---
    start_sort = time.time()
    # Sorting 30M rows
    df_sorted = df.orderBy(["order_date", "order_id"])
    
    # Force execution with a lightweight action (NOT cache).
    # Caveat: Spark's optimiser may drop a sort that cannot change a count,
    # so this timing may mostly reflect scanning the CSV.
    row_count = df_sorted.count()
    end_sort = time.time()
    
    print(f"SORT COMPLETE. Rows: {row_count}")
    print(f"   Time taken: {end_sort - start_sort:.2f} seconds")
    
    # --- DISPLAY ---
    print("\n" + "="*80)
    print("TOP 5 RECORDS")
    print(df_sorted.limit(5).toPandas().to_string(index=False))
    
    print("\nBOTTOM 5 RECORDS")
    tail_data = df_sorted.tail(5)  # returns a list of Row objects
    print(pd.DataFrame(tail_data, columns=df.columns).to_string(index=False))
    print("="*80)
    
    print(f"\nTotal Execution Time: {time.time() - start_overall:.2f} seconds")
    spark.stop()

    And the output.

    (pandas_to_pyspark) $ spark-submit ex1_spark.py 2> /dev/null
    Processing /mnt/d/sales_data/sales_data_30m.csv with Optimized Spark...
    LOAD INITIATED. (Time taken: 0.72s)
    SORT COMPLETE. Rows: 30000000
       Time taken: 5.65 seconds
    
    ================================================================================
    TOP 5 RECORDS
     order_id order_date  customer_id  customer_name  product_id product_name    category  quantity  price   total
          179 2023-01-01          350 Customer_93033         207         Desk      Office         5 640.16 3200.80
          520 2023-01-01          858 Customer_31280         201          Pen Electronics         3 841.21 2523.63
          557 2023-01-01          651 Customer_95137         209      Printer      Office         7  75.66  529.62
         1080 2023-01-01          303 Customer_87422         204   Smartphone Electronics        10  98.34  983.40
         2023 2023-01-01          838 Customer_95193         201        Paper      Office         4 427.96 1711.84
    
    BOTTOM 5 RECORDS
     order_id order_date  customer_id  customer_name  product_id product_name    category  quantity  price   total
     29997832 2023-12-31          831 Customer_49372         201        Chair Electronics         6 418.86 2513.16
     29997903 2023-12-31          449 Customer_17384         205         Desk      Office         3 494.29 1482.87
     29998337 2023-12-31          649 Customer_24018         201   Smartphone Electronics         5 241.71 1208.55
     29999674 2023-12-31          105 Customer_39890         203        Chair      Office         1  94.97   94.97
     29999933 2023-12-31          572 Customer_38794         201         Desk      Office         8 375.36 3002.88
    ================================================================================
    
    Total Execution Time: 36.12 seconds
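The Spark script reads the CSV with an explicit schema, while the Pandas script lets `read_csv` infer the types. For a closer like-for-like comparison, you can give Pandas explicit types too, which usually cuts its memory use as well. The dtype choices below (narrow integers, `category` for low-cardinality strings) are my own assumptions, shown here against the first two rows of the sample file.

```python
import io

import pandas as pd

# The first two rows of the generated CSV, used as a stand-in for the full file.
SAMPLE_CSV = """order_id,order_date,customer_id,customer_name,product_id,product_name,category,quantity,price,total
0,2023-11-24T00:00:00.000000,434,Customer_46318,201,Notebook,Office,6,925.68,5554.08
1,2023-02-27T00:00:00.000000,495,Customer_26514,203,Coffee Maker,Office,3,676.44,2029.3200000000002
"""

# Pandas counterpart of the explicit Spark StructType: no type guessing,
# narrower integers, and 'category' dtype for columns with few distinct values.
DTYPES = {
    "order_id": "int32",
    "customer_id": "int16",
    "customer_name": "string",
    "product_id": "int16",
    "product_name": "category",
    "category": "category",
    "quantity": "int8",
    "price": "float64",
    "total": "float64",
}

df = pd.read_csv(io.StringIO(SAMPLE_CSV), dtype=DTYPES, parse_dates=["order_date"])
print(df.dtypes)
```

For the real file, you would pass the path instead of the `StringIO` object.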

    Example 2: Converting a CSV file to Parquet

    In this example, we’ll read the same 30M-record input CSV file, then write it out again as a Parquet file.

    As before, we’ll start with the Pandas code and output.

    import pandas as pd
    import pyarrow.parquet as pq
    import pyarrow as pa
    import time
    
    csv_file = "/mnt/d/sales_data/sales_data_30m.csv"
    parquet_file = "/mnt/d/sales_data/sales_data_pandas_30m.parquet"
    chunk_size = 1_000_000  # Process 1 million rows at a time
    
    print("Beginning memory-efficient conversion...")
    start_total = time.time()
    
    # 1. Create a CSV reader object (this doesn't load any data yet)
    reader = pd.read_csv(csv_file, chunksize=chunk_size)
    
    parquet_writer = None
    
    for i, chunk in enumerate(reader):
        start_chunk = time.time()
    
        # Convert the Pandas chunk to a PyArrow Table
        # (preserve_index=False: don't store each chunk's row index)
        table = pa.Table.from_pandas(chunk, preserve_index=False)
    
        # Initialize the writer on the first chunk; its schema is used for the whole file
        if parquet_writer is None:
            parquet_writer = pq.ParquetWriter(parquet_file, table.schema, compression='snappy')
    
        # Write this chunk to the file (every chunk must match that schema)
        parquet_writer.write_table(table)
    
        print(f"Processed chunk {i+1} (Rows {i*chunk_size} to {(i+1)*chunk_size}) in {time.time() - start_chunk:.2f}s")
    
    # 2. Close the writer
    if parquet_writer:
        parquet_writer.close()
    
    print("\n" + "="*40)
    print("Conversion Complete!")
    print(f"Total Time: {time.time() - start_total:.2f} seconds")
    print("="*40)

    The output.

    (pandas_to_pyspark) $ python ex2_pandas.py
    
    Beginning memory-efficient conversion...
    Processed chunk 1 (Rows 0 to 1000000) in 4.82s
    Processed chunk 2 (Rows 1000000 to 2000000) in 0.40s
    Processed chunk 3 (Rows 2000000 to 3000000) in 0.39s
    Processed chunk 4 (Rows 3000000 to 4000000) in 0.36s
    Processed chunk 5 (Rows 4000000 to 5000000) in 0.43s
    Processed chunk 6 (Rows 5000000 to 6000000) in 0.45s
    Processed chunk 7 (Rows 6000000 to 7000000) in 0.35s
    Processed chunk 8 (Rows 7000000 to 8000000) in 0.34s
    Processed chunk 9 (Rows 8000000 to 9000000) in 0.36s
    Processed chunk 10 (Rows 9000000 to 10000000) in 0.36s
    Processed chunk 11 (Rows 10000000 to 11000000) in 0.37s
    Processed chunk 12 (Rows 11000000 to 12000000) in 0.41s
    Processed chunk 13 (Rows 12000000 to 13000000) in 0.48s
    Processed chunk 14 (Rows 13000000 to 14000000) in 0.43s
    Processed chunk 15 (Rows 14000000 to 15000000) in 0.38s
    Processed chunk 16 (Rows 15000000 to 16000000) in 0.35s
    Processed chunk 17 (Rows 16000000 to 17000000) in 0.34s
    Processed chunk 18 (Rows 17000000 to 18000000) in 0.35s
    Processed chunk 19 (Rows 18000000 to 19000000) in 0.36s
    Processed chunk 20 (Rows 19000000 to 20000000) in 0.35s
    Processed chunk 21 (Rows 20000000 to 21000000) in 0.34s
    Processed chunk 22 (Rows 21000000 to 22000000) in 0.34s
    Processed chunk 23 (Rows 22000000 to 23000000) in 0.34s
    Processed chunk 24 (Rows 23000000 to 24000000) in 0.36s
    Processed chunk 25 (Rows 24000000 to 25000000) in 0.36s
    Processed chunk 26 (Rows 25000000 to 26000000) in 0.35s
    Processed chunk 27 (Rows 26000000 to 27000000) in 0.36s
    Processed chunk 28 (Rows 27000000 to 28000000) in 0.35s
    Processed chunk 29 (Rows 28000000 to 29000000) in 0.35s
    Processed chunk 30 (Rows 29000000 to 30000000) in 0.34s
    
    ========================================
    Conversion Complete!
    Total Time: 43.30 seconds
    ========================================

    And now for PySpark.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
    import time
    
    # Start the overall timer immediately
    start_overall = time.time()
    
    # 1. Initialize Spark with a high-memory configuration
    spark = SparkSession.builder \
        .appName("EfficientParquetConversion") \
        .config("spark.driver.memory", "8g") \
        .master("local[*]") \
        .getOrCreate()
    
    # Silence logs
    spark.sparkContext.setLogLevel("ERROR")
    
    # 2. Explicitly define the schema (best practice for CSV)
    schema = StructType([
        StructField("order_id", IntegerType(), True),
        StructField("order_date", DateType(), True),
        StructField("customer_id", IntegerType(), True),
        StructField("customer_name", StringType(), True),
        StructField("product_id", IntegerType(), True),
        StructField("product_name", StringType(), True),
        StructField("category", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", DoubleType(), True),
        StructField("total", DoubleType(), True)
    ])
    
    csv_path = "/mnt/d/sales_data/sales_data_30m.csv"
    # Spark writes a directory of part files here, not a single file
    parquet_path = "/mnt/d/sales_data/sales_data_parquet"
    
    print(f"Beginning Spark conversion to {parquet_path}...")
    
    # 3. Read the CSV using the defined schema (lazy: nothing is read yet)
    start_proc = time.time()
    df = spark.read.csv(csv_path, header=True, schema=schema)
    
    # 4. Write to Parquet (overwrite if it exists); this action triggers the actual read
    df.write.mode("overwrite").parquet(parquet_path)
    end_proc = time.time()
    
    print("-" * 40)
    print("CONVERSION COMPLETE")
    print(f"Processing Time (Read + Write): {end_proc - start_proc:.2f} seconds")
    print(f"Total Execution Time (incl. Spark startup): {time.time() - start_overall:.2f} seconds")
    print("-" * 40)
    
    spark.stop()

    I can confirm that the contents of the Parquet files created by Pandas and PySpark were identical. Here’s the Spark output.

    (pandas_to_pyspark) $ spark-submit --driver-memory 8g ex2_spark.py 2> /dev/null
    Beginning Spark conversion to /mnt/d/sales_data/sales_data_parquet...
    ----------------------------------------
    CONVERSION COMPLETE
    Processing Time (Read + Write): 21.62 seconds
    Total Execution Time (incl. Spark startup): 23.26 seconds
    ----------------------------------------
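The chunked pattern in the Pandas version works for more than format conversion. Any aggregation that can be combined from partial results (sums, counts, minimums) can be computed one chunk at a time, which is similar in spirit to how Spark aggregates within each partition before combining the results. `total_by_category` below is a hypothetical helper illustrating the idea on the sales schema; the sample values are made up.

```python
import io

import pandas as pd


def total_by_category(csv_source, chunksize: int = 1_000_000) -> pd.Series:
    """Sum the 'total' column per 'category', reading the CSV one chunk at a time."""
    partial_sums = []
    # usecols keeps each chunk small: only the two columns we need are parsed.
    for chunk in pd.read_csv(csv_source, usecols=["category", "total"], chunksize=chunksize):
        # "Map" step: aggregate this chunk on its own.
        partial_sums.append(chunk.groupby("category")["total"].sum())
    # "Reduce" step: combine the per-chunk partial sums into the final totals.
    return pd.concat(partial_sums).groupby(level=0).sum().sort_index()


# Tiny in-memory CSV standing in for the real file (hypothetical values).
sample = io.StringIO(
    "category,total\n"
    "Office,1.5\n"
    "Electronics,2.0\n"
    "Office,3.0\n"
    "Electronics,4.5\n"
    "Office,0.5\n"
)
print(total_by_category(sample, chunksize=2))
```

Pointed at the real file, `total_by_category('/mnt/d/sales_data/sales_data_30m.csv')` would hold at most one million rows of two columns in memory at a time, plus the small per-chunk results.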

    Example 3: Data pivoting

    Read the Parquet files we just created and calculate the total sales per product_name per order_date.

    Pandas.

    import pandas as pd
    from timeit import default_timer as timer
    
    parquet_path = r'/mnt/d/sales_data/sales_data_pandas_30m.parquet'
    
    start = timer()
    
    # Read the Parquet file
    df = pd.read_parquet(parquet_path)
    
    # 1) Make order_date a proper date
    # Convert to datetime, then extract the date part
    df["order_date"] = pd.to_datetime(df["order_date"]).dt.date
    
    # 2) Pivot (sum)
    # Pandas pivot_table handles the aggregation (sum) and the reshape in one step
    pivot = df.pivot_table(
        values="total",
        index="order_date",
        columns="product_name",
        aggfunc="sum"
    )
    
    # 3) Sort rows by date (Pandas index)
    pivot = pivot.sort_index()
    
    # 4) Enforce a consistent column order (alphabetical product columns)
    # pivot_table already sorts columns by default, but we can be explicit
    pivot = pivot.reindex(sorted(pivot.columns), axis=1)
    
    # 5) (Optional) Replace nulls with 0
    # pivot = pivot.fillna(0)
    
    end = timer()
    
    print(f"Pandas: read + standardized pivot took {end - start:.2f} seconds")
    print(pivot.head(5))

    Pandas Output.

    (pandas_to_pyspark) $ python ex3_pandas.py
    Pandas: read + standardized pivot took 9.98 seconds
    product_name        Chair  Coffee Maker         Desk       Laptop  ...        Paper          Pen      Printer   Smartphone
    order_date                                                         ...                                                  
    2023-01-01    22041864.51   22596967.46  22228235.43  22319250.97  ...  22778128.78  22690394.34  22747419.90  22848102.42
    2023-01-02    22702337.42   21960074.98  23539803.82  23332945.56  ...  22414013.44  22378123.52  22494364.89  22321919.79
    2023-01-03    22626028.85   22651440.10  22930421.42  22938328.34  ...  22880161.09  21607713.73  22937117.72  22262604.28
    2023-01-04    22605466.70   22652219.77  22463371.43  22506729.47  ...  23097987.72  22327386.63  22922449.38  22673066.75
    2023-01-05    22581240.40   23004302.70  22511769.34  22882968.52  ...  22058769.99  22379327.80  22946133.94  22988219.48
    
    [5 rows x 10 columns]

    PySpark.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from timeit import default_timer as timer
    
    # Initialize Spark
    spark = SparkSession.builder \
        .appName("SparkPivotBenchmark") \
        .config("spark.driver.memory", "8g") \
        .master("local[*]") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")
    
    parquet_path = '/mnt/d/sales_data/sales_data_parquet'
    start = timer()
    
    # 1. Read the Parquet files
    df = spark.read.parquet(parquet_path)
    
    # 2. Make order_date a proper date
    # We cast the column to DateType
    df = df.withColumn("order_date", F.col("order_date").cast("date"))
    
    # 3. Pivot (sum)
    # Spark's pivot is faster if you supply the distinct values (product names),
    # e.g. .pivot("product_name", ["Chair", "Coffee Maker", ...]); otherwise it
    # runs an extra job to find them first, as in the version below
    pivot_df = df.groupBy("order_date") \
        .pivot("product_name") \
        .agg(F.sum("total"))
    
    # 4. Sort rows by date
    pivot_df = pivot_df.orderBy("order_date")
    
    # 5. Enforce a consistent column order (alphabetical product columns)
    # The first column is 'order_date'; the rest are the pivoted products
    columns = pivot_df.columns
    product_cols = sorted([c for c in columns if c != "order_date"])
    pivot_df = pivot_df.select(["order_date"] + product_cols)
    
    # 6. Replace nulls with 0
    pivot_df = pivot_df.na.fill(0)
    
    # Trigger an action to measure actual performance (count of pivoted days)
    row_count = pivot_df.count()
    end = timer()
    
    print(f"PySpark: read + standardized pivot took {end - start:.2f} seconds")
    print(f"Total days processed: {row_count}")
    
    # 7. Display the top 5 rows
    # (nothing is cached, so show() re-runs the plan from the Parquet files)
    pivot_df.show(5)
    
    spark.stop()

    PySpark Output.

    (pandas_to_pyspark) $ spark-submit --driver-memory 8g ex3_spark.py 2> /dev/null
    PySpark: read + standardized pivot took 3.54 seconds
    Total days processed: 365
    +----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    |order_date|               Chair|        Coffee Maker|                Desk|              Laptop|             Monitor|            Notebook|               Paper|                 Pen|             Printer|          Smartphone|
    +----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    |2023-01-01|2.2041864510000005E7|2.2596967459999997E7|       2.222823543E7|2.2319250969999995E7|       2.309861159E7|2.2687765309999995E7|2.2778128780000005E7|2.2690394339999996E7|        2.27474199E7|2.2848102419999998E7|
    |2023-01-02|       2.270233742E7|2.1960074980000004E7|2.3539803819999993E7|2.3332945560000006E7|2.2441403840000004E7|       2.282151253E7|       2.241401344E7|2.2378123520000003E7|       2.249436489E7|       2.232191979E7|
    |2023-01-03|2.2626028849999998E7|        2.26514401E7|       2.293042142E7|       2.293832834E7|       2.290862974E7|2.2432433990000006E7|2.2880161090000004E7|2.1607713730000008E7|       2.293711772E7|       2.226260428E7|
    |2023-01-04|2.2605466699999996E7|2.2652219770000003E7|       2.246337143E7| 2.250672947000001E7|2.1930874809999995E7|2.3261865149999995E7|       2.309798772E7|2.2327386629999995E7|2.2922449380000003E7|2.2673066749999996E7|
    |2023-01-05|2.2581240400000002E7|2.3004302700000003E7|       2.251176934E7|2.2882968520000003E7|       2.284090005E7|       2.272256243E7|2.2058769990000002E7|2.2379327800000004E7|2.2946133940000005E7|       2.298821948E7|
    +----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    only showing top 5 rows

    Example 4 — Windowing analytics with LAG/LEAD

    For my final code example, we’ll calculate the SUM of all order totals per order_date, then use LAG/LEAD functionality to calculate the percentage change in total orders between consecutive order dates.
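    To make the LAG/LEAD arithmetic concrete before looking at the library code, here is a minimal pure-Python sketch of what each output column means. It uses the first three daily totals reported later in this example; the helper names percent_change and lag_lead_report are hypothetical, chosen for illustration only. LAG(total, 1) is the previous row's value, LEAD(total, 1) the next row's, and a missing or zero reference yields no percentage.

```python
from typing import Optional


def percent_change(current: float, reference: Optional[float]) -> Optional[float]:
    """Percent change from `reference` to `current`; None if there is no usable reference."""
    if reference is None or reference == 0:
        return None
    return (current - reference) * 100 / reference


def lag_lead_report(totals: list) -> list:
    """Build one row per day with lag/lead values and the two percent-change columns."""
    rows = []
    for i, total in enumerate(totals):
        lag = totals[i - 1] if i > 0 else None                 # LAG(total, 1): previous row
        lead = totals[i + 1] if i + 1 < len(totals) else None  # LEAD(total, 1): next row
        rows.append({
            "total": total,
            "total_lag": lag,
            "total_lead": lead,
            "percent_change_from_lag": percent_change(total, lag),
            "percent_change_from_lead": percent_change(total, lead),
        })
    return rows


# First three daily totals, taken from the outputs shown later in this example
report = lag_lead_report([226036740.71, 226406499.79, 226174879.26])
for row in report:
    print(row)
```

    The first day has no lag and the last day has no lead, which is exactly where both the Pandas and PySpark outputs show NaN/NULL.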

    Pandas.

    import pandas as pd
    from timeit import default_timer as timer
    
    parquet_path = '/mnt/d/sales_data/sales_data_pandas_30m.parquet'
    
    begin = timer()
    
    # 1. Read the Parquet file
    df = pd.read_parquet(parquet_path)
    
    # 2. Normalize order_date
    # Pandas to_datetime is generally flexible enough to parse the dates automatically,
    # replacing the manual pl.coalesce logic. (In pandas 2.x it infers one format from the
    # first value; pass format='mixed' if the column genuinely mixes formats.)
    df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce').dt.date
    
    # 3. Group by date and aggregate
    result_pandas = df.groupby("order_date")["total"].sum().reset_index()
    
    # 4. Sort by date
    result_pandas = result_pandas.sort_values("order_date")
    
    # 5. Analytic functions (Lag and Lead)
    # In Pandas, shift(1) is lag, shift(-1) is lead
    result_pandas["total_lag"] = result_pandas["total"].shift(1)
    result_pandas["total_lead"] = result_pandas["total"].shift(-1)
    
    # 6. Calculate Percent Changes
    # Vectorized Series arithmetic propagates NaN automatically (first lag, last lead).
    # Unlike pl.when().otherwise(), a zero denominator yields inf here rather than null.
    result_pandas["percent_change_from_lag"] = (
        (result_pandas["total"] - result_pandas["total_lag"]) * 100 / result_pandas["total_lag"]
    )
    
    result_pandas["percent_change_from_lead"] = (
        (result_pandas["total"] - result_pandas["total_lead"]) * 100 / result_pandas["total_lead"]
    )
    
    finish = timer()
    
    print(f"Pandas: read + analytic (lag/lead) took {finish - begin:.2f} seconds")
    print(result_pandas.head(10).to_string(index=False))
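    The PySpark version of this example guards the denominator explicitly with F.when(...).otherwise(None), whereas the plain pandas division above returns inf when the previous or next day's total is zero. If you want pandas to match Spark's null semantics, a minimal sketch follows. The helper safe_percent_change and the toy totals are hypothetical, for illustration only; they are not part of pandas or of the original script.

```python
import numpy as np
import pandas as pd


def safe_percent_change(current: pd.Series, reference: pd.Series) -> pd.Series:
    """Percent change from `reference` to `current`.

    Returns NaN wherever `reference` is missing or zero, mirroring the
    F.when(...).otherwise(None) guard used in the PySpark version.
    """
    # Replace zero denominators with NaN so the division yields NaN, never inf
    denominator = reference.where(reference != 0, np.nan)
    return (current - reference) * 100 / denominator


# Hypothetical daily totals, including a zero-revenue day
totals = pd.Series([100.0, 0.0, 50.0, 75.0])
lag = totals.shift(1)

naive = (totals - lag) * 100 / lag   # plain division, as in the script above
guarded = safe_percent_change(totals, lag)

print(naive.tolist())    # [nan, -100.0, inf, 50.0]
print(guarded.tolist())  # [nan, -100.0, nan, 50.0]
```

    Masking the denominator first, rather than replacing inf afterwards, keeps the intent explicit and also covers the 0/0 case.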

    Pandas Output.

    (pandas_pyspark) $ python ex4_pandas.py
    Pandas: read + analytic (lag/lead) took 8.99 seconds
    order_date        total    total_lag   total_lead  percent_change_from_lag  percent_change_from_lead
    2023-01-01 226036740.71          NaN 226406499.79                      NaN                 -0.163316
    2023-01-02 226406499.79 226036740.71 226174879.26                 0.163584                  0.102408
    2023-01-03 226174879.26 226406499.79 226441417.81                -0.102303                 -0.117708
    2023-01-04 226441417.81 226174879.26 226916194.65                 0.117846                 -0.209230
    2023-01-05 226916194.65 226441417.81 226990804.43                 0.209669                 -0.032869
    2023-01-06 226990804.43 226916194.65 225973424.85                 0.032880                  0.450221
    2023-01-07 225973424.85 226990804.43 227894370.99                -0.448203                 -0.842911
    2023-01-08 227894370.99 225973424.85 227111347.09                 0.850076                  0.344775
    2023-01-09 227111347.09 227894370.99 226271884.19                -0.343591                  0.370997
    2023-01-10 226271884.19 227111347.09 226635543.97                -0.369626                 -0.160460
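    As a cross-check on the manual formulas, pandas' built-in Series.pct_change(periods=k) computes s / s.shift(k) - 1, so multiplying by 100 reproduces both columns: periods=1 matches the lag version and periods=-1 the lead version. The sketch below uses the first three daily totals from the output above and assumes the data has no missing values (pct_change's legacy fill behaviour only matters when NaNs are present).

```python
import numpy as np
import pandas as pd

# First three daily totals from the Example 4 pandas output
totals = pd.Series([226036740.71, 226406499.79, 226174879.26])

# Manual formulas, as used in the pandas script
manual_lag = (totals - totals.shift(1)) * 100 / totals.shift(1)
manual_lead = (totals - totals.shift(-1)) * 100 / totals.shift(-1)

# Built-in equivalents: pct_change(periods=k) == totals / totals.shift(k) - 1
builtin_lag = totals.pct_change(periods=1) * 100
builtin_lead = totals.pct_change(periods=-1) * 100

print(np.allclose(manual_lag, builtin_lag, equal_nan=True))    # True
print(np.allclose(manual_lead, builtin_lead, equal_nan=True))  # True
```

    The explicit shift() form used in the script is arguably clearer when porting to PySpark, since it maps one-to-one onto F.lag and F.lead.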

    PySpark.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    from timeit import default_timer as timer
    
    # Initialize Spark
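    spark = (
        SparkSession.builder
        .appName("SparkAnalyticBenchmark")
        # When the script is launched via spark-submit the driver JVM is already running,
        # so this setting has no effect there; the --driver-memory flag is what applies.
        .config("spark.driver.memory", "8g")
        .master("local[*]")
        .getOrCreate()
    )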
    
    spark.sparkContext.setLogLevel("ERROR")
    
    # Path to the Parquet file
    
    parquet_path = '/mnt/d/sales_data/sales_data_parquet'
    
    begin = timer()
    
    # 1. Read the Parquet file
    df = spark.read.parquet(parquet_path)
    
    # 2. Normalize order_date
    # Spark's to_date is efficient; F.coalesce over several to_date calls can handle multiple formats if needed
    df = df.withColumn("order_date", F.to_date(F.col("order_date")))
    
    # 3. Group by date and aggregate
    daily_revenue = df.groupBy("order_date").agg(F.sum("total").alias("total"))
    
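    # 4. Define the Window for analytic functions
    # We must order by date for lag/lead to make sense. With no partitionBy, Spark moves
    # all rows to a single partition; that's fine here, as there is only one row per day.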
    window_spec = Window.orderBy("order_date")
    
    # 5. Apply Lag and Lead
    # lag(col, 1) = earlier row; lead(col, 1) = subsequent row
    daily_revenue = daily_revenue.withColumn("total_lag", F.lag("total", 1).over(window_spec))
    daily_revenue = daily_revenue.withColumn("total_lead", F.lead("total", 1).over(window_spec))
    
    # 6. Calculate Percent Changes
    # We use F.when() to handle nulls and avoid division by zero
    daily_revenue = daily_revenue.withColumn(
        "percent_change_from_lag",
        F.when((F.col("total_lag").isNotNull()) & (F.col("total_lag") != 0),
               (F.col("total") - F.col("total_lag")) * 100 / F.col("total_lag"))
        .otherwise(None)
    )
    
    daily_revenue = daily_revenue.withColumn(
        "percent_change_from_lead",
        F.when((F.col("total_lead").isNotNull()) & (F.col("total_lead") != 0),
               (F.col("total") - F.col("total_lead")) * 100 / F.col("total_lead"))
        .otherwise(None)
    )
    
    # 7. Final Sort and Action
    result_spark = daily_revenue.orderBy("order_date")
    
    # Trigger an action to measure performance
    row_count = result_spark.count()
    finish = timer()
    
    print(f"PySpark: read + analytic (lag/lead) took {finish - begin:.2f} seconds")
    print(f"Total days processed: {row_count}")
    
    # Show top 10
    result_spark.show(10)
    
    spark.stop()

    PySpark Output.

    (pandas_pyspark) $ spark-submit --driver-memory 8g ex4_spark.py 2> /dev/null
    PySpark: read + analytic (lag/lead) took 4.05 seconds
    Total days processed: 365
    +----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
    |order_date|               total|           total_lag|          total_lead|percent_change_from_lag|percent_change_from_lead|
    +----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
    |2023-01-01|      2.2603674071E8|                NULL|2.2640649979000002E8|                   NULL|    -0.16331645970543143|
    |2023-01-02|2.2640649979000002E8|      2.2603674071E8|      2.2617487926E8|    0.16358361868011784|     0.10240771687724477|
    |2023-01-03|      2.2617487926E8|2.2640649979000002E8|2.2644141781000003E8|    -0.1023029507610723|    -0.11770750800707579|
    |2023-01-04|2.2644141781000003E8|      2.2617487926E8|2.2691619464999998E8|    0.11784622185810545|     -0.2092300378702583|
    |2023-01-05|2.2691619464999998E8|2.2644141781000003E8|2.2699080442999995E8|    0.20966872782889678|    -0.03286907599068832|
    |2023-01-06|2.2699080442999995E8|2.2691619464999998E8| 2.259734248499999E8|   0.032879883304517334|     0.45022089684898775|
    |2023-01-07| 2.259734248499999E8|2.2699080442999995E8|2.2789437099000004E8|    -0.4482029933127909|     -0.8429107448575048|
    |2023-01-08|2.2789437099000004E8| 2.259734248499999E8|2.2711134708999988E8|     0.8500761278788644|       0.344775331586518|
    |2023-01-09|2.2711134708999988E8|2.2789437099000004E8|2.2627188419000003E8|   -0.34359071555765364|     0.37099744097899573|
    |2023-01-10|2.2627188419000003E8|2.2711134708999988E8|2.2663554396999997E8|    -0.3696261374678007|     -0.1604601703817825|
    +----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
    only showing top 10 rows

    Summary

    In this article, I explained that there are several paths to upgrade your systems if the data you’re dealing with starts to encroach on “big data” territory, to the point where it becomes difficult (or impossible) to process with your existing Pandas code base. 

    I cited three common options: distributed libraries such as Dask or Ray, moving your data to an RDBMS and querying it with SQL, or using the distributed compute library Spark.

    Focusing on the latter, I outlined the case for PySpark, then worked through four real-world examples of typical data processing tasks that Pandas is often used for, along with the equivalent PySpark code for each.

    While the timing benchmarks showed some improvement in PySpark run times compared to Pandas (for example, 4.05 seconds versus 8.99 seconds for the lag/lead analysis in Example 4), these weren’t the primary focus. After all, with even larger datasets, Pandas would simply be unable to process them at all, never mind within a given time frame.

    Instead, the main goal of this article was to show you how relatively easy it is to:

    • Get a Spark environment up and running quickly.
    • Replicate common Pandas data operations in PySpark, giving you the confidence that big data needn’t limit your processing capabilities.

    By bridging the gap between single-threaded analysis and scalable big-data processing, you can confidently transition your workflows as your data outgrows your local hardware.


