🕑Estimated time for completion

This section takes about 5 minutes to complete.

Sharing Streaming Data

In the prior examples, we displayed our aggregated streaming data in the console. When we want to share those results with others, we can write them to a remote data store (a sink) as CSV, JSON, Parquet, or other formats, much like writing batch data. We can also write directly to transactional databases such as Cassandra or DynamoDB, or even back to a Kafka topic. Some of these sinks are available through dedicated connectors (e.g., the Spark Cassandra Connector), while others require the foreach operator plus custom code that talks to the remote data store.
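
For example, a file sink follows the same writeStream pattern we used for the console sink. The snippet below is a minimal sketch, assuming df is an already-aggregated streaming DataFrame whose query can run in append mode (for aggregations that means defining a watermark); the paths are placeholders.

(df.writeStream
    .format("parquet")  # "csv" and "json" work the same way
    .outputMode("append")  # file sinks only support append output mode
    .option("path", "/path/to/output")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())

Writing back to a Kafka topic uses the same pattern with .format("kafka"), a string or binary value column, and the kafka.bootstrap.servers and topic options.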

Writing to Cassandra

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()

Source
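
In open-source Spark, the org.apache.spark.sql.cassandra format comes from the DataStax Spark Cassandra Connector rather than from Spark itself, so the connector has to be on the classpath (some managed platforms bundle it for you). A hedged example of submitting the job with the connector package; the script name is a placeholder and the connector version shown is illustrative, so pick one that matches your Spark and Scala versions:

spark-submit \
    --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 \
    streaming_to_cassandra.py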

Writing to DynamoDB

from pyspark.sql.functions import *

# Keep the example small: use a single shuffle partition for the aggregation.
spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
    spark.readStream.format("rate").load()  # built-in test source emitting (timestamp, value) rows
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    # Custom writer class; see https://docs.databricks.com/structured-streaming/examples.html#write-to-amazon-dynamodb-using-foreach-in-scala-and-python for the full implementation.
    .foreach(SendToDynamoDB_ForeachWriter())
    .outputMode("update")
    .start()
)

Source
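
The SendToDynamoDB_ForeachWriter used above is the custom code the comment points to; the complete implementation lives in the linked Databricks example. As a rough sketch of the shape such a writer takes, the foreach sink calls open(), process(), and close() on the supplied object for every partition of every micro-batch. The table name, region, and attribute names below are illustrative placeholders, not part of the original example:

class SendToDynamoDB_ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Create the boto3 resource here, once per partition per epoch,
        # because the client cannot be serialized on the driver and shipped to executors.
        import boto3
        self.table = boto3.resource("dynamodb", region_name="us-west-2").Table("my_table")
        return True  # True means "process the rows of this partition"

    def process(self, row):
        # Write one DynamoDB item per streamed row.
        self.table.put_item(Item={"key": int(row["key"]), "count": int(row["count"])})

    def close(self, error):
        # Re-raise any error so the streaming query fails visibly instead of dropping data.
        if error:
            raise error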