PySpark Structured Streaming with Confluent Kafka Tutorial

1. Set up PySpark and Confluent Kafka

Before starting, make sure you have PySpark installed and a Confluent Kafka cluster set up.

Follow this tutorial if you need assistance spinning up a Confluent Kafka cluster in Docker.
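A quick way to confirm the PySpark installation before moving on is to import it and print the version:

import pyspark

# Should print the installed PySpark version, e.g. 3.5.x
print(pyspark.__version__)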

2. Configure PySpark for Kafka

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KafkaStructuredStreaming").getOrCreate()

# Read the topic as a streaming DataFrame; the chained calls are wrapped in
# parentheses so the multi-line statement is valid Python
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "your_kafka_bootstrap_servers")
    .option("subscribe", "your_topic")
    .load()
)
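If your cluster is Confluent Cloud or otherwise secured with SASL, the reader also needs authentication options. The following is a sketch assuming API-key authentication over SASL_SSL; the bootstrap servers, API key, and secret are placeholders.

# Additional options for a SASL_SSL-secured Confluent cluster (placeholders below)
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "your_kafka_bootstrap_servers")
    .option("subscribe", "your_topic")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="your_api_key" password="your_api_secret";',
    )
    .option("startingOffsets", "earliest")  # replay existing messages; default is "latest"
    .load()
)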

3. Define Schema and Process Data

# Kafka delivers key and value as binary columns; cast both to strings
parsed_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
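If the message values are JSON, you can go a step further and apply a schema to the value column with from_json. The payload fields below (user_id, event) are hypothetical; substitute the fields your topic actually carries.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical payload schema -- replace these fields with your topic's actual layout
payload_schema = StructType([
    StructField("user_id", StringType()),
    StructField("event", StringType()),
])

# Parse the JSON string in `value` into typed columns
json_df = (
    parsed_df
    .select(col("key"), from_json(col("value"), payload_schema).alias("data"))
    .select("key", "data.*")
)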

4. Write the Output

# Stream the parsed rows to the console in append mode
query = (
    parsed_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

query.awaitTermination()
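The console sink is handy for debugging, but a production job usually writes somewhere durable. As a sketch, writing the results back to another Kafka topic looks like the following; the output topic name and checkpoint path are placeholders, and a checkpoint location is required for this sink. If the cluster is secured, the same kafka.* authentication options shown earlier apply here as well.

# The Kafka sink expects a string or binary `value` column, which parsed_df already has
kafka_query = (
    parsed_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "your_kafka_bootstrap_servers")
    .option("topic", "your_output_topic")
    .option("checkpointLocation", "/tmp/kafka_sink_checkpoint")
    .start()
)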

5. Run the PySpark Script

Save the script and run it with spark-submit. The Kafka source is not bundled with the base Spark distribution, so include the spark-sql-kafka connector package when you submit the job, as in the example below.
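The coordinate below assumes Spark 3.5.x built against Scala 2.12; match both versions to your installation, and replace the script name with your own file.

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  your_streaming_script.py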

Additional Resources