暂无说说

storm相关概念

storm jiajun 11个月前 (11-20) 105次浏览 0个评论 扫描二维码

tuple

Storm 应用程序能够直接处理的基本数据单元称为 tuple(元组)。每个 tuple 由预定义的 field(字段 )列表组成。每个 field 的值类型可以是 byte、char、integer、long、float、double、Boolean 或 byte 数组。此外,Storm 提供了一个 API 来自定义数据类型,这些数据类型可以序列化为 tuple 中的 field。

tuple 是动态类型的,也就是说,只需要定义 tuple 中的字段名,而不需要指定它们的数据类型。动态类型有助于简化 API,使其易于使用。此外,由于 Storm 中的处理单元可以处理多种类型的 tuple,因此声明字段类型是不实际的。​

tuple 中的每个 field 都可以通过其名称 getValueByField(String)或 tuple 中的位置索引 getValue(int)来访问。tuple 还提供了一些方便的方法,例如 getIntegerByField(String),它可以避免对对象进行类型转换。例如,如果有一个分数(分子、分母)元组,表示小数,那么可以使用 getIntegerByField(“分子”)或 getInteger(0)来获得分子的值。

topology

在 Storm 中,topology(拓扑)是一种抽象,它为图计算提供了支持。我们可以创建一个 Storm 拓扑并将其部署到 Storm 集群中来处理数据。topology 可用有向无环图表示,其中每个节点执行某种处理并将其转发给流中的下一个节点。下面是一个 Storm topology 示例:

topology 的组件

Stream:Storm 中的关键抽象是 Stream(流)的抽象。Stream 是一个无界 tuple 序列,可以在 Storm 中并行处理。每个流可以由单个或多个的 bolts 处理(Storm 中的处理单元)。因此,Storm 也可以被看作是一个流转换平台。在前面的图中,Stream 用带箭头的线段表示。Storm 应用程序中的每个流都有一个 ID,bolts 可以根据这些流的 ID 生成和使用元组。

Spout:Spout 是 Storm 拓扑(topology)中 tuple 的来源。它负责从外部源读取或监听数据,例如,通过从日志读取或监听队列中的新消息,并在 Storm 中将其发布到 Stream 中。一个 Spout 可以向多个 Stream 发出数据,每个 Stream 都是不同的模式。例如,它可以从一个日志文件读取 10 元组的记录然后分别向不同 Stream 发射 7 元组和 4 元组的 tuple。

backtype.storm.spout.ISpout 接口:storm中用来定义 spout 的接口。如果使用 Java 编写 topology,可以使用 ISpout 的子接口 backtype.storm.topology.IRichSpout,因为 IRichSpout 声明了使用 TopologyBuilder API 的方法。当 spout 发出一个 tuple 时,Storm 会跟踪并处理该个 tuple 所生成的所有 tuple,当图中所有基于这个源 tuple 的 tuple 执行完成时,将向 spout 发回一个确认信息。只有在发出 tuple 时提供了 message ID,这个跟踪过程才会发生。如果使用 null 作为 message ID,则不会发生跟踪。

此外,还可以为拓扑设置处理超时时间,如果在指定时间内没有处理 tuple,则会将失败消息发送回 spout。同样,只有在定义 message ID 的约束时,才会发生这种情况。如果禁用消息确认机制,在发出 tuple 时跳过 message ID,虽然可以从 Storm 中获得少量的性能增益,但往往需要冒着数据丢失的风险。

spout 重要的方法:

nextTuple():Storm 调用该方法从输入源获取下一个元组。通过该方法,可以从外部源读取数据并将其发送到 backtype.storm.spout.ISpoutOutputCollector 实例。通过 backtype.storm.topology.OutputFieldsDeclarer 的 declareStream 方法可以声明流的 schema。如果 spout 希望向多个流发出数据,可以使用 declareStream 方法声明多个流,并在发出元组时指定流 ID。如果此时没有要发出的元组,则不会阻塞此方法。此外,如果该方法不发出元组,那么 Storm 将等待 1 毫秒后再调用它。这个等待时间可以使用 topology.sleep.spout.wait.strategy.time 设置。

ack(Object msgId):当拓扑完全处理完具有给定 message ID 的元组时,Storm 调用该方法。此时,用户应将消息标记为已处理的消息,并执行所需的清理操作,例如从消息队列中删除消息,这样消息就不会再次被处理。

fail(Object msgId):当 Storm 识别出带有给定 message ID 的元组没有被成功处理或超过配置的时间时,就会调用这个方法。在这种情况下,用户应该执行所需的处理,以便 nextTuple 方法可以再次发出消息。通常的方法是将消息放回传入消息队列中。

open():此方法仅在初始化 spout 时调用一次。如果需要连接到输入数据的外部源,可在 open 方法中定义连接到外部源的逻辑,然后在 nextTuple 方法中继续从这个外部源获取数据,并向下游发出。

在编写 spout 时需要注意的是,所有的方法都不应该阻塞,因为 Storm 在同一个线程中调用了所有的方法。每个 spout 都有一个内部缓冲区来跟踪到目前为止发出的元组的状态。spout 将在这个缓冲区中保存元组,分别调用 ack 或 fail 方法,直到它们被确认或失败。Storm 只在缓冲区未满时调用 nextTuple 方法。

 

Bolt:Bolt 是 Storm 拓扑的处理中心,负责流数据的转换。理想情况下,拓扑中的每个 Bolt 都应该对元组进行简单的转换,多个 Bolt 可以相互协调以实现复杂的转换。

backtype.storm.task.IBolt 接口:用于定义 Bolt 的接口,如果使用 Java 编写拓扑,可使用 backtype.storm.topology.IRichBolt 接口。一个 bolt 可以接收拓扑中其他多个组件的 tuple——无论是 Spout 还是其他的 bolt——同样也可以向多个流发出输出。输出流可以使用 backtype.storm.topology.OutputFieldsDeclarer 的 declareStream 方法声明。

bolt 的重要方法:

execute(Tuple input):对通过订阅输入流的每个元组执行此方法。在该方法中,可以对元组执行任何需要的处理,然后向声明的输出流发出更多元组生成输出,或者在数据库中持久化存储。在调用该方法时,不需要立即处理元组,并且元组可以一直保持到需要为止。例如,在对两个流进行连接时,当一个元组到达,可以一直持有它,直到它的对等项到来,然后才发出连接后的元组。与元组关联的元数据可以通过 Tuple 接口中定义的各种方法检索。如果 message ID 与 tuple 关联,则 execute 方法必须使用 OutputCollector 发布 ack 或 fail 事件,否则 Storm 将不知道是否成功处理了 tuple。backtype.storm.topology.IBasicBolt 接口是在执行方法完成后自动发送确认的方便接口。在要发送失败事件的情况下,该方法应该抛出 backtype.storm.topology.FailedException。

prepare(Map stormConf, TopologyContext context, OutputCollector collector): 在一个 Storm 拓扑中,一个 Bolt 可以由多个 worker 执行。在客户机上创建 bolt 实例,然后序列化并提交给 Nimbus。当 Nimbus 为拓扑创建 worker 实例时,将该序列化的 Bolt 发送给 worker。该过程将调用 prepare 方法,使 Bolt 启动使用。在该方法中,应该确保 Bolt 被正确配置,能够处理 tuple。

喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址