I'm new to PySpark Structured Streaming. I have a data stream from Kafka that I want to fill at every 0.1 seconds. The timestamps coming from Kafka are irregular, and I'm trying to fill every missing timestamp with the previous number. I also want to know how to deal with the batches.

Input Kafka stream:

type number timestamp
car 7777 '2023-10-28 14:22:41.9'
car 8888 '2023-10-28 14:22:42.5'
car 1111 '2023-10-28 14:22:42.7'
motor 2222 '2023-10-28 14:22:41.2'
motor 6666 '2023-10-28 14:22:41.5'
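
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, expr, from_json, lead, to_timestamp, when
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Create a Spark session
spark = SparkSession.builder.appName("KafkaStream").getOrCreate()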


# Read data from Kafka as a streaming DataFrame
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "broker:29092") \
      .option("subscribe", "now") \
      .option("minOffsetsPerTrigger", 10) \
      .option("maxOffsetsPerTrigger", 15) \
      .option("startingOffsets", "latest") \
      .load()

schema = StructType([
    StructField("type", StringType(), True),
    StructField("number", LongType(), True),
    StructField("timestamp", StringType(), True),
])

output = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")


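# Per type, order the records by time so lead() can read the next record's timestamp
window_spec = Window.partitionBy("type").orderBy("timestamp")

df = output.withColumn(
    "next_timestamp", to_timestamp(lead(col("timestamp")).over(window_spec))
)

# Expand every record into 100 ms steps from its own timestamp to the next record's;
# the last record per type has no next_timestamp, so nvl() makes it yield only itself
result_df = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), nvl(next_timestamp, to_timestamp(timestamp)), interval 100 milliseconds))"
    ),
).withColumn(
    # The step equal to next_timestamp belongs to the next record, so flag it for removal
    "to_drop",
    when((col("timestamp") == col("next_timestamp")), True).otherwise(False)
)
x = result_df.filter(col('to_drop') == False).drop("next_timestamp", "to_drop")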

# Create a streaming query to process the data and fill missing values
query = x\
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

Output batch in the console:

type number timestamp
car 7777 '2023-10-28 14:22:41.9'
car 7777 '2023-10-28 14:22:42.0'
car 7777 '2023-10-28 14:22:42.1'
car 7777 '2023-10-28 14:22:42.2'
car 7777 '2023-10-28 14:22:42.3'
car 7777 '2023-10-28 14:22:42.4'
car 8888 '2023-10-28 14:22:42.5'
car 8888 '2023-10-28 14:22:42.6'
car 1111 '2023-10-28 14:22:42.7'
motor 2222 '2023-10-28 14:22:41.2'
motor 2222 '2023-10-28 14:22:41.3'
motor 2222 '2023-10-28 14:22:41.4'
motor 6666 '2023-10-28 14:22:41.5'
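The filling rule in this listing (each record repeated every 100 ms until the next record of the same type, with the last record of each type emitted alone) can be checked without Spark. This is a plain-Python sketch for a single batch, assuming timestamps in the "yyyy-MM-dd HH:mm:ss.S" form shown above; fill_gaps and SAMPLE are hypothetical names. It steps with datetime/timedelta instead of floats so repeated 0.1 s additions do not accumulate rounding error.

```python
from collections import defaultdict
from datetime import datetime, timedelta

STEP = timedelta(milliseconds=100)  # fill resolution: 0.1 s
FMT = "%Y-%m-%d %H:%M:%S.%f"

# Hypothetical sample: the input records shown in the question
SAMPLE = [
    ("car", 7777, "2023-10-28 14:22:41.9"),
    ("car", 8888, "2023-10-28 14:22:42.5"),
    ("car", 1111, "2023-10-28 14:22:42.7"),
    ("motor", 2222, "2023-10-28 14:22:41.2"),
    ("motor", 6666, "2023-10-28 14:22:41.5"),
]


def fill_gaps(records):
    """Repeat each record every 100 ms until the next record of the same type."""
    by_type = defaultdict(list)
    for typ, number, ts in records:
        by_type[typ].append((datetime.strptime(ts, FMT), number))

    out = []
    for typ in sorted(by_type):
        rows = sorted(by_type[typ])  # order each type's records by time
        for (ts, number), nxt in zip(rows, rows[1:] + [None]):
            # Fill up to (not including) the next record; the last record yields only itself
            end = nxt[0] if nxt is not None else ts + STEP
            t = ts
            while t < end:
                # strftime gives 6 fractional digits; keep 1 to match the listing
                out.append((typ, number, t.strftime(FMT)[:-5]))
                t += STEP
    return out


if __name__ == "__main__":
    for row in fill_gaps(SAMPLE):
        print(*row)
```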
  • For how long do you want to fill? You have to set a limit.
    – boyangeor
    Commented Nov 23, 2023 at 8:35
  • @boyangeor I want to fill from the first car timestamp ('2023-10-28 14:22:41.9') up to the last car timestamp before the number changes, which in my example is '2023-10-28 14:22:42.4', and do the same for the other values in the "type" column. Can you please show me what I have done wrong? I am new to Spark Streaming.
    – kreemo
    Commented Nov 23, 2023 at 10:00
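The limit discussed in these comments has a concrete consequence across micro-batches: a record's fill can only be finished once the next record of the same type has arrived, and that record may land in a later batch. One way to handle this, sketched here in plain Python under stated assumptions, is to hold back the newest record per type between batches and emit its filled rows when its successor shows up. GapFiller and process_batch are hypothetical names; the sketch assumes records of a given type arrive in time order and simply ignores late or duplicate ones. In Spark this per-type state would have to live somewhere persistent, for example in an arbitrary stateful operator such as applyInPandasWithState (PySpark 3.4+).

```python
from datetime import datetime, timedelta

STEP = timedelta(milliseconds=100)  # fill resolution: 0.1 s
FMT = "%Y-%m-%d %H:%M:%S.%f"


class GapFiller:
    """Carries the newest record per type from one micro-batch to the next (hypothetical helper)."""

    def __init__(self):
        self.pending = {}  # type -> (datetime, number) still waiting for its successor

    def process_batch(self, records):
        """Return the rows that can be emitted now; hold back the newest record per type."""
        out = []
        parsed = sorted((typ, datetime.strptime(ts, FMT), number) for typ, number, ts in records)
        for typ, t_new, number in parsed:
            prev = self.pending.get(typ)
            if prev is not None:
                t, prev_number = prev
                if t_new <= t:
                    continue  # assumption: late or duplicate records are ignored
                while t < t_new:  # fill up to (not including) the new record
                    out.append((typ, prev_number, t.strftime(FMT)[:-5]))
                    t += STEP
            self.pending[typ] = (t_new, number)
        return out
```

The trade-off is latency: the newest record of each type is only emitted after the next one arrives, which is exactly the "limit" the fill needs, since the gap length is unknown until then.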
