Development Series 03: Spark Streaming Custom Receivers (Translation)
Published: 2019-06-09


Spark Streaming can receive streaming data from any arbitrary data source beyond the ones for which it has built-in support (that is, beyond Flume, Kafka, files, sockets, etc.). This requires the developer to implement a receiver that is customized for receiving data from the data source concerned. This guide walks through the process of implementing a custom receiver and using it in a Spark Streaming application.

Implementing a Custom Receiver

This starts with implementing a Receiver. A custom receiver must extend this abstract class by implementing two methods:

- onStart(): Things to do to start receiving data.
- onStop(): Things to do to stop receiving data.

Note that onStart() and onStop() must not block indefinitely. Typically, onStart() starts the threads that are responsible for receiving the data, and onStop() ensures that the receiving by those threads is stopped. The receiving threads can also use isStopped(), a Receiver method, to check whether they should stop receiving data.
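The threading pattern described above can be tried without any external data source. The following is a minimal sketch, assuming Spark Streaming 1.x on the classpath; CounterReceiver and its intervalMs parameter are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver for illustration: emits an increasing counter as strings.
class CounterReceiver(intervalMs: Long)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    // Return immediately; the actual work happens on a separate thread.
    new Thread("Counter Receiver") {
      setDaemon(true)
      override def run(): Unit = generate()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the loop in generate() polls isStopped()
    // and exits on its own once the receiver has been stopped.
  }

  /** Store one record per interval until the receiver is stopped. */
  private def generate(): Unit = {
    var n = 0L
    while (!isStopped()) {
      store(n.toString)
      n += 1
      Thread.sleep(intervalMs)
    }
  }
}
```

Because onStart() only spawns the thread, it returns right away, and the loop condition is the single place where the receiving thread learns that it should stop.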

Once the data is received, it can be stored inside Spark by calling store(data), which is a method provided by the Receiver class. There are a number of flavours of store() which allow you to store the received data one record at a time or as a whole collection of objects / serialized bytes.
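To make the difference between the flavours of store() concrete, here is a small sketch, assuming Spark Streaming 1.x. ReplayReceiver, records and batchSize are hypothetical names for illustration; the receiver simply replays an in-memory sequence, either record by record or as ArrayBuffer collections.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver for illustration: replays a fixed sequence of records.
class ReplayReceiver(records: Seq[String], batchSize: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Replay Receiver") {
      override def run(): Unit = replay()
    }.start()
  }

  def onStop(): Unit = {}

  private def replay(): Unit = {
    if (batchSize <= 1) {
      // Record-at-a-time flavour: store(dataItem: T)
      val it = records.iterator
      while (!isStopped() && it.hasNext) {
        store(it.next())
      }
    } else {
      // Collection flavour: store(dataBuffer: ArrayBuffer[T]),
      // each group is handed to Spark as one unit.
      val groups = records.grouped(batchSize)
      while (!isStopped() && groups.hasNext) {
        store(ArrayBuffer(groups.next(): _*))
      }
    }
  }
}
```

Which flavour fits best depends on how the source delivers data: a line-oriented socket naturally yields single records, while a source that hands over messages in chunks can pass each chunk on as a collection.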

Any exception in the receiving threads should be caught and handled properly to avoid silent failures of the receiver. restart(<exception>) will restart the receiver by asynchronously calling onStop() and then calling onStart() after a delay. stop(<exception>) will call onStop() and terminate the receiver. Also, reportError(<error>) reports an error message to the driver (visible in the logs and UI) without stopping / restarting the receiver.
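The choice between the three calls can be sketched as follows, assuming Spark Streaming 1.x. GuardedReceiver is a hypothetical name, and the policy itself (blank lines are only reported, an unresolvable host stops the receiver, everything else restarts it) is an assumption made for illustration, not a rule taken from the guide.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{ConnectException, Socket, UnknownHostException}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver for illustration: picks a reaction per kind of failure.
class GuardedReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("Guarded Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, "UTF-8"))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        if (line.trim.isEmpty) {
          // Non-fatal problem: tell the driver, but keep receiving.
          reportError("Skipping blank record",
            new IllegalArgumentException("blank line"))
        } else {
          store(line)
        }
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // The server closed the connection: try again after a delay.
      restart("Connection closed, trying to connect again")
    } catch {
      case e: UnknownHostException =>
        // Retrying cannot help if the host name does not resolve: terminate.
        stop("Cannot resolve host " + host, e)
      case e: ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    }
  }
}
```

All three calls are made from the receiver's own methods rather than from inside the anonymous Thread body, which avoids any confusion with Thread's own stop() method.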

The following is a custom receiver that receives a stream of text over a socket. It treats '\n'-delimited lines in the text stream as records and stores them with Spark. If the receiving thread has any error connecting or receiving, the receiver is restarted to make another attempt to connect.

Scala

 

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket

    import org.apache.spark.Logging // Spark 1.x location of the Logging trait
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class CustomReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

      def onStart() {
        // Start the thread that receives data over a connection
        new Thread("Socket Receiver") {
          override def run() { receive() }
        }.start()
      }

      def onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
      }

      /** Create a socket connection and receive data until receiver is stopped */
      private def receive() {
        var socket: Socket = null
        var userInput: String = null
        try {
          // Connect to host:port
          socket = new Socket(host, port)

          // Until stopped or connection broken continue reading
          val reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream(), "UTF-8"))
          userInput = reader.readLine()
          while (!isStopped && userInput != null) {
            store(userInput)
            userInput = reader.readLine()
          }
          reader.close()
          socket.close()

          // Restart in an attempt to connect again when server is active again
          restart("Trying to connect again")
        } catch {
          case e: java.net.ConnectException =>
            // restart if could not connect to server
            restart("Error connecting to " + host + ":" + port, e)
          case t: Throwable =>
            // restart if there is any other error
            restart("Error receiving data", t)
        }
      }
    }

 

 

Using the custom receiver in a Spark Streaming application

The custom receiver can be used in a Spark Streaming application by calling streamingContext.receiverStream(<instance of custom receiver>). This creates an input DStream from the data received by that instance of the custom receiver, as shown below:

Scala

 

    // Assuming ssc is the StreamingContext
    val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
    val words = customReceiverStream.flatMap(_.split(" "))
    ...
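For context, the snippet above might sit inside a complete application roughly like this. It is a sketch under stated assumptions: the object name, the tokenize helper, the host "localhost", the port 9999 and the one-second batch interval are all hypothetical choices, and the CustomReceiver class defined earlier is assumed to be on the classpath.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Brings in the pair-DStream operations (reduceByKey) on Spark versions before 1.3
import org.apache.spark.streaming.StreamingContext._

// Hypothetical driver program; relies on the CustomReceiver class shown earlier.
object CustomReceiverWordCount {

  /** Split a line into words on single spaces. */
  def tokenize(line: String): Array[String] = line.split(" ")

  def main(args: Array[String]): Unit = {
    // local[2]: the receiver occupies one thread, so at least one more
    // is needed to process the received data.
    val conf = new SparkConf()
      .setAppName("CustomReceiverWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create an input DStream backed by the custom receiver
    val lines = ssc.receiverStream(new CustomReceiver("localhost", 9999))
    val wordCounts = lines.flatMap(tokenize).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

To try it locally, a text source such as `nc -lk 9999` can serve lines on the assumed port before the application is started.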

 

The full source code is in the example .


 

Implementing and Using a Custom Actor-based Receiver

Custom Akka Actors can also be used to receive data. The ActorHelper trait can be applied on any Akka actor, which allows received data to be stored in Spark using store(...) methods. The supervisor strategy of this actor can be configured to handle failures, etc.

 

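    import akka.actor.Actor
    import org.apache.spark.streaming.receiver.ActorHelper

    class CustomActor extends Actor with ActorHelper {
      def receive = {
        // Every String message sent to this actor becomes one record in Spark
        case data: String => store(data)
      }
    }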

 

A new input stream can then be created with this custom actor as follows:

 

    // Assuming ssc is the StreamingContext
    val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
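The supervisor strategy mentioned earlier can be passed in when the stream is created. The sketch below assumes the Spark 1.x signature of actorStream, which takes optional storageLevel and supervisorStrategy arguments after the name; the object and method names, the retry limit and the time window are hypothetical values chosen for illustration.

```scala
import scala.concurrent.duration._

import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.ActorHelper

// Same actor as in the example above, repeated so this sketch stands alone.
class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}

object SupervisedActorStream {

  // Assumed policy: restart the actor on runtime errors (at most 10 times
  // within one minute) and escalate any other exception.
  val strategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: RuntimeException => Restart
      case _: Exception => Escalate
    }

  /** Create an actor-based input stream that uses the strategy above. */
  def create(ssc: StreamingContext): ReceiverInputDStream[String] =
    ssc.actorStream[String](
      Props(new CustomActor()),
      "CustomReceiver",
      storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
      supervisorStrategy = strategy)
}
```

If no strategy is supplied, actorStream falls back to its default arguments, so passing one is only needed when the failure handling should differ from the default.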

 

See  for an end-to-end example.

 

Reposted from: https://www.cnblogs.com/chanxiu/p/3967854.html
