I'm new to PySpark Structured Streaming. I have a data stream from Kafka whose timestamps are irregular, and I want to fill it so that there is one row every 0.1 seconds: every timestamp that is missing should get the previous number (for the same type). I also want to know how to deal with the micro-batches.
Input Kafka stream:
type | number | timestamp |
---|---|---|
car | 7777 | '2023-10-28 14:22:41.9' |
car | 8888 | '2023-10-28 14:22:42.5' |
car | 1111 | '2023-10-28 14:22:42.7' |
motor | 2222 | '2023-10-28 14:22:41.2' |
motor | 6666 | '2023-10-28 14:22:41.5' |
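The fill rule can be checked without Spark. Below is a minimal sketch in plain Python (not PySpark) that applies it to the rows above: for each type, repeat a reading's number every 100 ms until the next reading of that type, and emit the last reading once. `fill_forward`, `STEP` and `FMT` are hypothetical names, and it assumes timestamps are strings in the format shown with 0.1 s precision.

```python
from datetime import datetime, timedelta

STEP = timedelta(milliseconds=100)  # fill interval (assumed 0.1 s)
FMT = "%Y-%m-%d %H:%M:%S.%f"        # assumed timestamp format, as in the table


def fill_forward(rows, step=STEP):
    """Hypothetical helper. rows: list of (type, number, timestamp_str).

    Returns one row per `step` tick, per type, repeating the previous number
    until the next reading of that type takes over.
    """
    by_type = {}
    for typ, number, ts in rows:
        by_type.setdefault(typ, []).append((datetime.strptime(ts, FMT), number))

    out = []
    for typ in sorted(by_type):
        readings = sorted(by_type[typ])  # order by timestamp within the type
        for i, (ts, number) in enumerate(readings):
            # End of this reading's span: the next reading, or a single tick for the last one
            nxt = readings[i + 1][0] if i + 1 < len(readings) else ts + step
            t = ts
            while t < nxt:
                # strftime gives 6 fractional digits; keep only tenths of a second
                out.append((typ, number, t.strftime(FMT)[:-5]))
                t += step
    return out


rows = [
    ("car", 7777, "2023-10-28 14:22:41.9"),
    ("car", 8888, "2023-10-28 14:22:42.5"),
    ("car", 1111, "2023-10-28 14:22:42.7"),
    ("motor", 2222, "2023-10-28 14:22:41.2"),
    ("motor", 6666, "2023-10-28 14:22:41.5"),
]
filled = fill_forward(rows)
for row in filled:
    print(row)
```

Because the tick arithmetic uses `timedelta` (integer microseconds), 41.9 s plus six 100 ms steps lands exactly on 42.5 s, so the tick that coincides with the next reading is replaced by that reading rather than duplicated.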
```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, expr, from_json, lead, to_timestamp, when
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Create a Spark session
spark = SparkSession.builder.appName("KafkaStream").getOrCreate()

# Read data from Kafka as a streaming DataFrame
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:29092") \
    .option("subscribe", "now") \
    .option("minOffsetsPerTrigger", 10) \
    .option("maxOffsetsPerTrigger", 15) \
    .option("startingOffsets", "latest") \
    .load()

# Schema of the JSON payload in the Kafka message value
schema = StructType([
    StructField("type", StringType(), True),
    StructField("number", LongType(), True),
    StructField("timestamp", StringType(), True)
])

# Parse the Kafka value (a JSON string) into columns
output = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# For each type, look up the timestamp of the next reading.
# Note: Spark rejects non-time-based window functions such as this lead()
# on a streaming DataFrame (AnalysisException when the query starts).
window_spec = Window.partitionBy("type").orderBy("type", "timestamp")
df = output.withColumn(
    "next_timestamp", to_timestamp(lead(col("timestamp")).over(window_spec))
)

# Generate one row every 100 ms from this reading up to the next one
result_df = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), nvl(next_timestamp, to_timestamp(timestamp)), interval 100 milliseconds))"
    ),
).withColumn(
    "to_drop",
    when((col("timestamp") == col("next_timestamp")), True).otherwise(False)
)

# Drop the generated row that equals the next real reading's timestamp
x = result_df.filter(col('to_drop') == False).drop("next_timestamp", "to_drop")

# Create a streaming query to process the data and fill missing values
query = x \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()
```
Output batch in the console:
type | number | timestamp |
---|---|---|
car | 7777 | '2023-10-28 14:22:41.9' |
car | 7777 | '2023-10-28 14:22:42.0' |
car | 7777 | '2023-10-28 14:22:42.1' |
car | 7777 | '2023-10-28 14:22:42.2' |
car | 7777 | '2023-10-28 14:22:42.3' |
car | 7777 | '2023-10-28 14:22:42.4' |
car | 8888 | '2023-10-28 14:22:42.5' |
car | 8888 | '2023-10-28 14:22:42.6' |
car | 1111 | '2023-10-28 14:22:42.7' |
motor | 2222 | '2023-10-28 14:22:41.2' |
motor | 2222 | '2023-10-28 14:22:41.3' |
motor | 2222 | '2023-10-28 14:22:41.4' |
motor | 6666 | '2023-10-28 14:22:41.5' |
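If the fill runs one micro-batch at a time (for example inside `foreachBatch`), the last reading of each type in a batch has no next reading yet, so the ticks between it and the first reading of the following batch are never generated. One way to handle this is to hold back the last reading per type and prepend it to the next batch. The sketch below shows that idea in plain Python only; `BatchFiller`, `process_batch` and `flush` are hypothetical names, it assumes readings of each type arrive in timestamp order across batches, and in PySpark the held-back rows would need to be kept somewhere that survives between batches (an assumption this sketch does not cover).

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S.%f"        # assumed timestamp format
STEP = timedelta(milliseconds=100)  # assumed fill interval


class BatchFiller:
    """Hypothetical helper: forward-fills a 100 ms grid across micro-batches.

    The last reading of each type is held back until a later reading of
    that type arrives, so gaps spanning a batch boundary are filled too.
    """

    def __init__(self, step=STEP):
        self.step = step
        self.pending = {}  # type -> (datetime, number) of the held-back reading

    def process_batch(self, rows):
        """rows: list of (type, number, timestamp_str); returns rows that are now final."""
        by_type = {}
        for typ, number, ts in rows:
            by_type.setdefault(typ, []).append((datetime.strptime(ts, FMT), number))

        out = []
        for typ in sorted(by_type):
            held = [self.pending[typ]] if typ in self.pending else []
            readings = sorted(held + by_type[typ])
            # Fill between consecutive readings; the last one is held back
            for (ts, number), (nxt, _) in zip(readings, readings[1:]):
                t = ts
                while t < nxt:
                    out.append((typ, number, t.strftime(FMT)[:-5]))
                    t += self.step
            self.pending[typ] = readings[-1]
        return out

    def flush(self):
        """Emit the held-back readings, e.g. when the stream stops."""
        out = [(typ, number, ts.strftime(FMT)[:-5])
               for typ, (ts, number) in sorted(self.pending.items())]
        self.pending.clear()
        return out


# The input rows split over two micro-batches
filler = BatchFiller()
out1 = filler.process_batch([
    ("car", 7777, "2023-10-28 14:22:41.9"),
    ("motor", 2222, "2023-10-28 14:22:41.2"),
])
out2 = filler.process_batch([
    ("car", 8888, "2023-10-28 14:22:42.5"),
    ("car", 1111, "2023-10-28 14:22:42.7"),
    ("motor", 6666, "2023-10-28 14:22:41.5"),
])
out3 = filler.flush()
```

Here the first batch emits nothing (each type has only one reading so far), and the gap from 41.9 to 42.5 for `car` is filled once the second batch arrives. The trade-off is latency: a reading is only emitted after the next reading of its type is seen.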