class JavaStreamingContext extends Closeable
A Java-friendly version of org.apache.spark.streaming.StreamingContext which is the main
entry point for Spark Streaming functionality. It provides methods to create
org.apache.spark.streaming.api.java.JavaDStream and
org.apache.spark.streaming.api.java.JavaPairDStream from input sources. The internal
org.apache.spark.api.java.JavaSparkContext (see core Spark documentation) can be accessed
using context.sparkContext
. After creating and transforming DStreams, the streaming
computation can be started and stopped using context.start()
and context.stop()
,
respectively. context.awaitTermination()
allows the current thread to wait for the
termination of a context by stop()
or by an exception.
- Annotations
- @deprecated
- Deprecated
(Since version Spark 3.4.0) DStream is deprecated. Migrate to Structured Streaming.
- Source
- JavaStreamingContext.scala
- Alphabetic
- By Inheritance
- JavaStreamingContext
- Closeable
- AutoCloseable
- AnyRef
- Any
- Hide All
- Show All
- Public
- All
Instance Constructors
-
new
JavaStreamingContext(path: String, hadoopConf: Configuration)
Re-creates a JavaStreamingContext from a checkpoint file.
Re-creates a JavaStreamingContext from a checkpoint file.
- path
Path to the directory that was specified as the checkpoint directory
-
new
JavaStreamingContext(path: String)
Recreate a JavaStreamingContext from a checkpoint file.
Recreate a JavaStreamingContext from a checkpoint file.
- path
Path to the directory that was specified as the checkpoint directory
-
new
JavaStreamingContext(conf: SparkConf, batchDuration: Duration)
Create a JavaStreamingContext using a SparkConf configuration.
Create a JavaStreamingContext using a SparkConf configuration.
- conf
A Spark application configuration
- batchDuration
The time interval at which streaming data will be divided into batches
-
new
JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration)
Create a JavaStreamingContext using an existing JavaSparkContext.
Create a JavaStreamingContext using an existing JavaSparkContext.
- sparkContext
The underlying JavaSparkContext to use
- batchDuration
The time interval at which streaming data will be divided into batches
-
new
JavaStreamingContext(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Array[String], environment: Map[String, String])
Create a StreamingContext.
Create a StreamingContext.
- master
Name of the Spark Master
- appName
Name to be used when registering with the scheduler
- batchDuration
The time interval at which streaming data will be divided into batches
- sparkHome
The SPARK_HOME directory on the worker nodes
- jars
Collection of JARs to send to the cluster. These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
- environment
Environment variables to set on worker nodes
-
new
JavaStreamingContext(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Array[String])
Create a StreamingContext.
Create a StreamingContext.
- master
Name of the Spark Master
- appName
Name to be used when registering with the scheduler
- batchDuration
The time interval at which streaming data will be divided into batches
- sparkHome
The SPARK_HOME directory on the worker nodes
- jars
Collection of JARs to send to the cluster. These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
-
new
JavaStreamingContext(master: String, appName: String, batchDuration: Duration, sparkHome: String, jarFile: String)
Create a StreamingContext.
Create a StreamingContext.
- master
Name of the Spark Master
- appName
Name to be used when registering with the scheduler
- batchDuration
The time interval at which streaming data will be divided into batches
- sparkHome
The SPARK_HOME directory on the worker nodes
- jarFile
JAR file containing job code, to ship to cluster. This can be a path on the local file system or an HDFS, HTTP, HTTPS, or FTP URL.
-
new
JavaStreamingContext(master: String, appName: String, batchDuration: Duration)
Create a StreamingContext.
Create a StreamingContext.
- master
Name of the Spark Master
- appName
Name to be used when registering with the scheduler
- batchDuration
The time interval at which streaming data will be divided into batches
-
new
JavaStreamingContext(ssc: StreamingContext)
- Deprecated
This is deprecated as of Spark 3.4.0. There are no longer updates to DStream and it's a legacy project. There is a newer and easier to use streaming engine in Spark called Structured Streaming. You should use Spark Structured Streaming for your streaming applications.
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
addStreamingListener(streamingListener: StreamingListener): Unit
Add a org.apache.spark.streaming.scheduler.StreamingListener object for receiving system events related to streaming.
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
awaitTermination(): Unit
Wait for the execution to stop.
Wait for the execution to stop. Any exceptions that occurs during the execution will be thrown in this thread.
- Annotations
- @throws( ... )
-
def
awaitTerminationOrTimeout(timeout: Long): Boolean
Wait for the execution to stop.
Wait for the execution to stop. Any exceptions that occurs during the execution will be thrown in this thread.
- timeout
time to wait in milliseconds
- returns
true
if it's stopped; or throw the reported error during the execution; orfalse
if the waiting time elapsed before returning from the method.
- Annotations
- @throws( ... )
-
def
binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as flat binary files with fixed record lengths, yielding byte arrays
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as flat binary files with fixed record lengths, yielding byte arrays
- directory
HDFS directory to monitor for new files
- recordLength
The length at which to split the records
- Note
We ensure that the byte array for each record in the resulting RDDs of the DStream has the provided record length.
-
def
checkpoint(directory: String): Unit
Sets the context to periodically checkpoint the DStream operations for master fault-tolerance.
Sets the context to periodically checkpoint the DStream operations for master fault-tolerance. The graph will be checkpointed every batch interval.
- directory
HDFS-compatible directory where the checkpoint data will be reliably stored
-
def
clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
-
def
close(): Unit
- Definition Classes
- JavaStreamingContext → Closeable → AutoCloseable
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
fileStream[K, V, F <: InputFormat[K, V]](directory: String, kClass: Class[K], vClass: Class[V], fClass: Class[F], filter: Function[Path, Boolean], newFilesOnly: Boolean, conf: Configuration): JavaPairInputDStream[K, V]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
- K
Key type for reading HDFS file
- V
Value type for reading HDFS file
- F
Input format for reading HDFS file
- directory
HDFS directory to monitor for new file
- kClass
class of key for reading HDFS file
- vClass
class of value for reading HDFS file
- fClass
class of input format for reading HDFS file
- filter
Function to filter paths to process
- newFilesOnly
Should process only new files and ignore existing files in the directory
- conf
Hadoop configuration
-
def
fileStream[K, V, F <: InputFormat[K, V]](directory: String, kClass: Class[K], vClass: Class[V], fClass: Class[F], filter: Function[Path, Boolean], newFilesOnly: Boolean): JavaPairInputDStream[K, V]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
- K
Key type for reading HDFS file
- V
Value type for reading HDFS file
- F
Input format for reading HDFS file
- directory
HDFS directory to monitor for new file
- kClass
class of key for reading HDFS file
- vClass
class of value for reading HDFS file
- fClass
class of input format for reading HDFS file
- filter
Function to filter paths to process
- newFilesOnly
Should process only new files and ignore existing files in the directory
-
def
fileStream[K, V, F <: InputFormat[K, V]](directory: String, kClass: Class[K], vClass: Class[V], fClass: Class[F]): JavaPairInputDStream[K, V]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
- K
Key type for reading HDFS file
- V
Value type for reading HDFS file
- F
Input format for reading HDFS file
- directory
HDFS directory to monitor for new file
- kClass
class of key for reading HDFS file
- vClass
class of value for reading HDFS file
- fClass
class of input format for reading HDFS file
-
def
finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
getState(): StreamingContextState
:: DeveloperApi ::
:: DeveloperApi ::
Return the current state of the context. The context can be in three possible states -
- StreamingContextState.INITIALIZED - The context has been created, but not been started yet. Input DStreams, transformations and output operations can be created on the context.
- StreamingContextState.ACTIVE - The context has been started, and been not stopped. Input DStreams, transformations and output operations cannot be created on the context.
- StreamingContextState.STOPPED - The context has been stopped and cannot be used any more.
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
def
queueStream[T](queue: Queue[JavaRDD[T]], oneAtATime: Boolean, defaultRDD: JavaRDD[T]): JavaInputDStream[T]
Create an input stream from a queue of RDDs.
Create an input stream from a queue of RDDs. In each batch, it will process either one or all of the RDDs returned by the queue.
- T
Type of objects in the RDD
- queue
Queue of RDDs
- oneAtATime
Whether only one RDD should be consumed from the queue in every interval
- defaultRDD
Default RDD is returned by the DStream when the queue is empty
- Note
1. Changes to the queue after the stream is created will not be recognized. 2. Arbitrary RDDs can be added to
queueStream
, there is no way to recover data of those RDDs, soqueueStream
doesn't support checkpointing.
-
def
queueStream[T](queue: Queue[JavaRDD[T]], oneAtATime: Boolean): JavaInputDStream[T]
Create an input stream from a queue of RDDs.
Create an input stream from a queue of RDDs. In each batch, it will process either one or all of the RDDs returned by the queue.
- T
Type of objects in the RDD
- queue
Queue of RDDs
- oneAtATime
Whether only one RDD should be consumed from the queue in every interval
- Note
1. Changes to the queue after the stream is created will not be recognized. 2. Arbitrary RDDs can be added to
queueStream
, there is no way to recover data of those RDDs, soqueueStream
doesn't support checkpointing.
-
def
queueStream[T](queue: Queue[JavaRDD[T]]): JavaDStream[T]
Create an input stream from a queue of RDDs.
Create an input stream from a queue of RDDs. In each batch, it will process either one or all of the RDDs returned by the queue.
- T
Type of objects in the RDD
- queue
Queue of RDDs
- Note
1. Changes to the queue after the stream is created will not be recognized. 2. Arbitrary RDDs can be added to
queueStream
, there is no way to recover data of those RDDs, soqueueStream
doesn't support checkpointing.
-
def
rawSocketStream[T](hostname: String, port: Int): JavaReceiverInputDStream[T]
Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using the Spark's serializer) that can be directly pushed into the block manager without deserializing them.
Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using the Spark's serializer) that can be directly pushed into the block manager without deserializing them. This is the most efficient way to receive data.
- T
Type of the objects in the received blocks
- hostname
Hostname to connect to for receiving data
- port
Port to connect to for receiving data
-
def
rawSocketStream[T](hostname: String, port: Int, storageLevel: StorageLevel): JavaReceiverInputDStream[T]
Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using the Spark's serializer) that can be directly pushed into the block manager without deserializing them.
Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using the Spark's serializer) that can be directly pushed into the block manager without deserializing them. This is the most efficient way to receive data.
- T
Type of the objects in the received blocks
- hostname
Hostname to connect to for receiving data
- port
Port to connect to for receiving data
- storageLevel
Storage level to use for storing the received objects
-
def
receiverStream[T](receiver: Receiver[T]): JavaReceiverInputDStream[T]
Create an input stream with any arbitrary user implemented receiver.
Create an input stream with any arbitrary user implemented receiver. Find more details at: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- receiver
Custom implementation of Receiver
-
def
remember(duration: Duration): Unit
Sets each DStreams in this context to remember RDDs it generated in the last given duration.
Sets each DStreams in this context to remember RDDs it generated in the last given duration. DStreams remember RDDs only for a limited duration of duration and releases them for garbage collection. This method allows the developer to specify how long to remember the RDDs ( if the developer wishes to query old data outside the DStream computation).
- duration
Minimum duration that each DStream should remember its RDDs
-
def
socketStream[T](hostname: String, port: Int, converter: Function[InputStream, Iterable[T]], storageLevel: StorageLevel): JavaReceiverInputDStream[T]
Create an input stream from network source hostname:port.
Create an input stream from network source hostname:port. Data is received using a TCP socket and the receive bytes it interpreted as object using the given converter.
- T
Type of the objects received (after converting bytes to objects)
- hostname
Hostname to connect to for receiving data
- port
Port to connect to for receiving data
- converter
Function to convert the byte stream to objects
- storageLevel
Storage level to use for storing the received objects
-
def
socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String]
Create an input stream from network source hostname:port.
Create an input stream from network source hostname:port. Data is received using a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- hostname
Hostname to connect to for receiving data
- port
Port to connect to for receiving data
-
def
socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel): JavaReceiverInputDStream[String]
Create an input stream from network source hostname:port.
Create an input stream from network source hostname:port. Data is received using a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited lines.
- hostname
Hostname to connect to for receiving data
- port
Port to connect to for receiving data
- storageLevel
Storage level to use for storing the received objects
-
val
sparkContext: JavaSparkContext
The underlying SparkContext
- val ssc: StreamingContext
-
def
start(): Unit
Start the execution of the streams.
-
def
stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
Stop the execution of the streams.
Stop the execution of the streams.
- stopSparkContext
Stop the associated SparkContext or not
- stopGracefully
Stop gracefully by waiting for the processing of all received data to be completed
-
def
stop(stopSparkContext: Boolean): Unit
Stop the execution of the streams.
Stop the execution of the streams.
- stopSparkContext
Stop the associated SparkContext or not
-
def
stop(): Unit
Stop the execution of the streams.
Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
textFileStream(directory: String): JavaDStream[String]
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat).
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored. The text files must be encoded as UTF-8.
- directory
HDFS directory to monitor for new file
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
def
transform[T](dstreams: List[JavaDStream[_]], transformFunc: Function2[List[JavaRDD[_]], Time, JavaRDD[T]]): JavaDStream[T]
Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams.
Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. The order of the JavaRDDs in the transform function parameter will be the same as the order of corresponding DStreams in the list.
- Note
For adding a JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using org.apache.spark.streaming.api.java.JavaPairDStream.toJavaDStream(). In the transform function, convert the JavaRDD corresponding to that JavaDStream to a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
-
def
transformToPair[K, V](dstreams: List[JavaDStream[_]], transformFunc: Function2[List[JavaRDD[_]], Time, JavaPairRDD[K, V]]): JavaPairDStream[K, V]
Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams.
Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. The order of the JavaRDDs in the transform function parameter will be the same as the order of corresponding DStreams in the list.
- Note
For adding a JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using org.apache.spark.streaming.api.java.JavaPairDStream.toJavaDStream(). In the transform function, convert the JavaRDD corresponding to that JavaDStream to a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
-
def
union[K, V](jdstreams: JavaPairDStream[K, V]*): JavaPairDStream[K, V]
Create a unified DStream from multiple DStreams of the same type and same slide duration.
Create a unified DStream from multiple DStreams of the same type and same slide duration.
- Annotations
- @varargs()
-
def
union[T](jdstreams: JavaDStream[T]*): JavaDStream[T]
Create a unified DStream from multiple DStreams of the same type and same slide duration.
Create a unified DStream from multiple DStreams of the same type and same slide duration.
- Annotations
- @varargs()
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()