Apache Storm入门学习


=Start=

缘由:

学习、提高需要

正文:

参考解答:

一、什么是Apache Storm?

Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有很高的吞吐率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。Apache Storm易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。

二、Apache Storm vs. Hadoop

Hadoop和Storm框架都用于分析大数据。两者互补,在某些方面有所不同。Apache Storm执行除持久性之外的所有操作,而Hadoop在所有方面都很好,但滞后于实时计算。下表比较了Storm和Hadoop的属性。

Storm Hadoop
实时流处理 批量处理
无状态 有状态
主/从架构与基于ZooKeeper的协调。主节点称为nimbus,从属节点是主管 具有/不具有基于ZooKeeper的协调的主 – 从结构。主节点是作业跟踪器,从节点是任务跟踪器
Storm流过程在集群上每秒可以访问数万条消息。 Hadoop分布式文件系统(HDFS)使用MapReduce框架来处理大量的数据,需要几分钟或几小时。
Storm拓扑运行直到用户关闭或意外的不可恢复故障。 MapReduce作业按顺序执行并最终完成。
两者都是分布式和容错的
如果nimbus / supervisor死机,重新启动使它从它停止的地方继续,因此没有什么受到影响。 如果JobTracker死机,所有正在运行的作业都会丢失。
  • Storm:   分布式实时计算,强调实时性,常用于实时性要求较高的地方;
  • Hadoop:分布式批处理计算,强调批处理,常用于对已经在的大量数据进行挖掘、分析

三、Apache Storm的核心概念

下图描述了Apache Storm的核心概念。

核心理念

Storm 集群的输入流由一个被称作 spout 的组件管理,spout 把数据传递给 bolt, bolt 要么把数据保存到某种存储器,要么把数据传递给其它的 bolt。你可以想象一下,一个 Storm 集群就是在一连串的 bolt 之间转换 spout 传过来的数据:

data-source -> spout -> bolt[ -> bolt ...] -> data-storage

现在让我们仔细看看Apache Storm的组件:

组件 描述
Tuple Tuple是Storm中的主要数据结构。它是有序元素的列表。默认情况下,Tuple支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到Storm集群。
Stream 流是元组的无序序列。
Spouts 流的源。通常,Storm从原始数据源(如Twitter Streaming API,Apache Kafka队列,Kestrel队列等)接受输入数据。否则,您可以编写spouts以从数据源读取数据。“ISpout”是实现spouts的核心接口,一些特定的接口是IRichSpout,BaseRichSpout,KafkaSpout等。
Bolts Bolts是逻辑处理单元。Spouts将数据传递到Bolts和Bolts过程,并产生新的输出流。Bolts可以执行过滤,聚合,加入,与数据源和数据库交互的操作。Bolts接收数据并发射到一个或多个Bolts。 “IBolt”是实现Bolts的核心接口。一些常见的接口是IRichBolt,IBasicBolt等。

拓扑

Spouts和Bolts连接在一起,形成拓扑结构。实时应用程序逻辑在Storm拓扑中指定。简单地说,拓扑是有向图,其中顶点是计算,边缘是数据流。

简单拓扑从spouts开始。Spouts将数据发射到一个或多个Bolts。Bolt表示拓扑中具有最小处理逻辑的节点,并且Bolts的输出可以发射到另一个Bolts作为输入。

Storm保持拓扑始终运行,直到您终止拓扑。Apache Storm的主要工作是运行拓扑,并在给定时间运行任意数量的拓扑。

任务

现在你有一个关于Spouts和Bolts的基本想法。它们是拓扑的最小逻辑单元,并且使用单个Spout和Bolt阵列构建拓扑。应以特定顺序正确执行它们,以使拓扑成功运行。Storm执行的每个Spout和Bolt称为“任务”。简单来说,任务是Spouts或Bolts的执行。在给定时间,每个Spout和Bolt可以具有在多个单独的螺纹中运行的多个实例。

进程

拓扑在多个工作节点上以分布式方式运行。Storm将所有工作节点上的任务均匀分布。工作节点的角色是监听作业,并在新作业到达时启动或停止进程。

四、Apache Storm的集群架构

Apache Storm的主要亮点是,它是一个容错,快速,没有“单点故障”(SPOF)分布式应用程序。我们可以根据需要在多个系统中安装Apache Storm,以增加应用程序的容量。

让我们看看Apache Storm集群如何设计和其内部架构。下图描述了集群设计:

动物园管理员框架

Apache Storm有两种类型的节点,Nimbus(主节点)和Supervisor(工作节点)。Nimbus是Apache Storm的核心组件。Nimbus的主要工作是运行Storm拓扑。Nimbus分析拓扑并收集要执行的任务。然后,它将任务分配给可用的supervisor。

Supervisor将有一个或多个工作进程。Supervisor将任务委派给工作进程。工作进程将根据需要产生尽可能多的执行器并运行任务。Apache Storm使用内部分布式消息传递系统来进行Nimbus和管理程序之间的通信。

组件 描述
Nimbus(主节点) Nimbus是Storm集群的主节点。集群中的所有其他节点称为工作节点。主节点负责在所有工作节点之间分发数据,向工作节点分配任务和监视故障。
Supervisor(工作节点) 遵循指令的节点被称为Supervisors。Supervisor有多个工作进程,它管理工作进程以完成由nimbus分配的任务。
Worker process(工作进程) 工作进程将执行与特定拓扑相关的任务。工作进程不会自己运行任务,而是创建执行器并要求他们执行特定的任务。工作进程将有多个执行器。
Executor(执行者) 执行器只是工作进程产生的单个线程。执行器运行一个或多个任务,但仅用于特定的spout或bolt。
Task(任务) 任务执行实际的数据处理。所以,它是一个spout或bolt。
ZooKeeper framework(ZooKeeper框架) Apache的ZooKeeper的是使用群集(节点组)自己和维护具有强大的同步技术共享数据之间进行协调的服务。Nimbus是无状态的,所以它依赖于ZooKeeper来监视工作节点的状态。

ZooKeeper的帮助supervisor与nimbus交互。它负责维持nimbus,supervisor的状态。

Storm是无状态的。即使无状态性质有它自己的缺点,它实际上帮助Storm以最好的可能和最快的方式处理实时数据。

Storm虽然不是完全无状态的。它将其状态存储在Apache ZooKeeper中。由于状态在Apache ZooKeeper中可用,故障的网络可以重新启动,并从它离开的地方工作。通常,像monit这样的服务监视工具将监视Nimbus,并在出现任何故障时重新启动它。

五、Apache Storm的入门实例

HelloWorldSpout.java

public class HelloWorldSpout extends BaseRichSpout{
    private SpoutOutputCollector collector;
    private int referenceRandom;
    private static final int MAX_RANDOM = 10;

    public HelloWorldSpout() {
        final Random rand = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }

    /*
     * declareOutputFields() => you need to tell the Storm cluster which fields this Spout emits within the
     *  declareOutputFields method.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    /*
     * open() => The first method called in any spout is 'open'
     *           TopologyContext => contains all our topology data
     *           SpoutOutputCollector => enables us to emit the data that will be processed by the bolts
     *           conf => created in the topology definition
     */
    @Override
    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /*
     * nextTuple() => Storm cluster will repeatedly call the nextTuple method which will do all the work of the spout.
     *  nextTuple() must release the control of the thread when there is no work to do so that the other methods have
     *  a chance to be called.
     */
    @Override
    public void nextTuple() {
        final Random rand = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if(instanceRandom == referenceRandom){
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }
}

HelloWorldBolt.java

public class HelloWorldBolt extends BaseRichBolt {
    private int myCount = 0;

    /*
     * prepare() => on create
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    /*
     * execute() => most important method in the bolt is execute(Tuple input), which is called once per tuple received
     *  the bolt may emit several tuples for each tuple received
     */
    @Override
    public void execute(Tuple tuple) {
        String test = tuple.getStringByField("sentence");
        if(test == "Hello World"){
            myCount++;
            System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));
        }
    }

    /*
     * declareOutputFields => This bolt emits nothing hence no body for declareOutputFields()
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

HelloWorldTopology.java

/**
 * Author: ashrith
 * Desc: setup the topology and submit it to either a local of remote Storm cluster depending on the arguments
 *       passed to the main method.
 */
public class HelloWorldTopology {
    /*
     * main class in which to define the topology and a LocalCluster object (enables you to test and debug the
     * topology locally). In conjunction with the Config object, LocalCluster allows you to try out different
     * cluster configurations.
     *
     * Create a topology using 'TopologyBuilder' (which will tell storm how the nodes area arranged and how they
     * exchange data)
     * The spout and the bolts are connected using 'ShuffleGroupings'
     *
     * Create a 'Config' object containing the topology configuration, which is merged with the cluster configuration
     * at runtime and sent to all nodes with the prepare method
     *
     * Create and run the topology using 'createTopology' and 'submitTopology'
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1).shuffleGrouping("randomHelloWorld");
        Config conf = new Config();
        conf.setDebug(true);
        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

 

参考链接:

=END=


《“Apache Storm入门学习”》 有 28 条评论

  1. 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink
    http://www.infoq.com/cn/articles/hadoop-storm-samza-spark-flink/

    What is/are the main difference(s) between Flink and Storm?
    https://stackoverflow.com/questions/30699119/what-is-are-the-main-differences-between-flink-and-storm

    流式大数据处理的三种框架:Storm,Spark和Samza
    http://www.csdn.net/article/2015-03-09/2824135
    Spark与Flink:对比与分析
    http://www.csdn.net/article/2015-07-16/2825232
    流式大数据处理的三种框架:Storm,Spark和Flink
    http://blog.csdn.net/cm_chenmin/article/details/53072498

    实时流处理Storm、Spark Streaming、Samza、Flink孰优孰劣?
    http://www.dataguru.cn/article-9532-1.html

  2. HDFS是GFS的一种实现,它的完整名字是分布式文件系统,类似于FAT32、NTFS,是一种文件格式,是底层的。

    HBase是Hadoop database,即Hadoop数据库。它是一个适合于非结构化数据存储的数据库,HBase是基于列的而不是基于行的模式。HBase是Google Bigtable的开源实现,类似Google Bigtable利用GFS作为其文件存储系统,HBase利用Hadoop HDFS作为其文件存储系统;Google运行MapReduce来处理Bigtable中的海量数据,HBase同样利用Hadoop MapReduce来处理HBase中的海量数据。

    Hive基于数据仓库,提供静态数据的动态查询(不支持更改数据的操作)。它使用类SQL语言,底层经过编译转为MapReduce程序,在Hadoop上运行,数据存储在HDFS上。

    Pig和Hive类似,但更侧重于数据的查询和分析,底层都是转化成MapReduce程序运行。区别是Hive是类SQL的查询语言,要求数据存储于表中,而Pig是面向数据流的一个程序语言。

    Hadoop HDFS为HBase提供了高可靠性的底层存储支持。Hadoop MapReduce为HBase提供了高性能的计算能力。Zookeeper为HBase提供了稳定服务和failover机制。Pig和Hive还为HBase提供了高层语言支持,使得在HBase上进行数据统计处理变的非常简单。

    Hive和Hbase是两种基于Hadoop的不同技术–Hive是一种类SQL的引擎,并且运行MapReduce任务,Hbase是一种在Hadoop之上的NoSQL的Key/vale数据库。

  3. hadoop、storm和spark的区别、比较
    https://www.cnblogs.com/snowbook/p/5773562.html

    Hadoop,Spark和Storm
    http://blog.csdn.net/xiaomin1991222/article/details/50980638
    http://www.open-open.com/lib/view/open1416646884102.html

    Hadoop、Storm、Spark这三个大数据平台有啥区别,各有啥应用场景?
    http://blog.csdn.net/w1014074794/article/details/50687524

    量化派基于Hadoop、Spark、Storm的大数据风控架构
    https://www.csdn.net/article/2015-10-06/2825849

    与 Hadoop 对比,如何看待 Spark 技术?
    https://www.zhihu.com/question/26568496

    Hadoop,HBase,Storm,Spark到底是什么?
    http://blog.csdn.net/yangzhenping/article/details/41826163

    主流的三大分布式计算系统:Hadoop,Spark和Storm
    https://www.douban.com/note/545951210/

    hadoop和大数据的关系?和spark的关系?互补?并行?
    https://www.zhihu.com/question/23036370
    【专治不明觉厉】之“大数据”
    https://www.huxiu.com/article/31457/1.html
    `
    Hadoop,Spark和Storm是目前最重要的三大分布式计算系统,Hadoop常用于离线的复杂的大数据处理,Spark常用于离线的快速的大数据处理,而Storm常用于在线的实时的大数据处理。
    `

  4. 根据日志内容来决定如何消费
    https://stackoverflow.com/questions/31592866/storm-conditionally-consuming-stream-from-kafka-spout

    Storm的 emit
    https://stackoverflow.com/questions/19807395/how-would-i-split-a-stream-in-apache-storm
    `
    先在 declareOutputFields 里面声明 stream ,然后在 execute 里面根据实际情况进行 emit 操作。
    `

    用实例理解Storm的Stream概念
    http://zqhxuyuan.github.io/2016/06/30/Hello-Storm/

    Storm 的主要概念
    http://storm.apachecn.org/releases/cn/1.1.0/Concepts.html

  5. 如何利用Flink实现超大规模用户行为分析
    http://www.hansight.com/blog-flink-UBA.html
    `
    导语:主要分为四大部分:
    1)网络安全中的用户行为分析(简称 UBA);
    2)实时超大规模用户行为分析的技术挑战 ;
    3)Drools 规则引擎在 CEP 中的应用 ;
    4)Flink 原生 CEP 组件。
    `

  6. 《从0到1学习Flink》—— Apache Flink 介绍
    http://yoursite.com/2018/10/13/flink-introduction/
    《从0到1学习Flink》—— Flink 配置文件详解
    http://www.54tianzhisheng.cn/2018/10/27/flink-config/
    《从0到1学习Flink》—— Data Source 介绍
    http://www.54tianzhisheng.cn/2018/10/28/flink-sources/
    《从0到1学习Flink》—— Data Sink 介绍
    http://www.54tianzhisheng.cn/2018/10/29/flink-sink/
    《从0到1学习Flink》—— 如何自定义 Data Source ?
    http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/
    《从0到1学习Flink》—— 如何自定义 Data Sink ?
    http://www.54tianzhisheng.cn/2018/10/31/flink-create-sink/

  7. cluster模式下storm kill topology时做cleanup的解决方法
    https://blog.csdn.net/maixiaohai/article/details/51683098
    https://blog.csdn.net/u010003835/article/details/52163430
    `
    在bolt中,需要在topology被关闭前执行某个操作,而根据官方文档,cleanup方法并不可靠,它只在local mode下生效。

    # 解决方案
    在killing a topology之前,需要先deactivate相应的topology,然后处理未完成的message。可以调用Spout.deactivate()方法,传给bolt一个特殊的tuple,在bolt处检查该特殊tuple,一旦收到执行需要执行的操作。
    tuple的特殊性可以通过tuple的stream来区分。
    `

    https://storm.apache.org/releases/current/Lifecycle-of-a-topology.html
    https://storm.apache.org/releases/current/Local-mode.html

  8. Storm简单介绍
    https://matt33.com/2015/05/26/the-basis-of-storm/
    `
    1. 基础
    1.1. Storm的Topology模型
    1.1.1. tuple
    1.1.2. spout
    1.1.3. bolt
    1.1.4. Streams

    1.2. storm并发机制
    1.2.1. 默认的并发机制
    1.2.2. 给topology增加worker
    1.2.3. 配置executor和task

    1.3. 数据流分组
    1.4. 可靠的消息处理机制
    1.4.1. spout的可靠性
    1.4.2. bolt的可靠性
    1.4.3. acker

    2. Storm集群框架
    2.1. nimbus守护进程
    2.2. supervisor守护进程
    2.3. Zookeeper的作用

    3. Storm程序框架
    3.1. topology提交
    3.1.1. 本地模式
    3.1.2. 集群模式
    3.1.3. 实际的例子
    3.2. Spout
    3.3. Bolt
    `

    getting-started-with-storm
    https://github.com/xetorthio/getting-started-with-storm/blob/master/ch05Bolts.asc

  9. Stream Grouping类型
    https://www.cnblogs.com/CRXY/p/4564070.html

    Storm Grouping —— 流分组策略
    https://www.cnblogs.com/xymqx/p/4365190.html
    `
    Shuffle Grouping :随机分组,尽量均匀分布到下游Bolt中
    将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。

    Fields Grouping :按字段分组,按数据中field值进行分组;相同field值的Tuple被发送到相同的Task
    这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”.

    All grouping :广播
    广播发送, 对于每一个tuple将会复制到每一个bolt中处理。

    Global grouping :全局分组,Tuple被分配到一个Bolt中的一个Task,实现事务性的Topology。
    Stream中的所有的tuple都会发送给同一个bolt任务处理,所有的tuple将会发送给拥有最小task_id的bolt任务处理。

    None grouping :不分组
    不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下。

    Direct grouping :直接分组 指定分组
    由tuple的发射单元直接决定tuple将发射给那个bolt,一般情况下是由接收tuple的bolt决定接收哪个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)。
    `

    Storm杂记 — Field Grouping和Shuffle Grouping的区别
    https://blog.csdn.net/luonanqin/java/article/details/40436397

    关于Storm Stream grouping
    https://www.cnblogs.com/kqdongnanf/p/4634607.html

    2.5 grouping策略和并发度
    http://www.tianshouzhi.com/api/tutorials/storm/15

    Storm部分:Storm Grouping — 数据流分组(即数据分发策略)
    https://blog.csdn.net/wyqwilliam/java/article/details/82156437

  10. Storm 系列(二)—— Storm 核心概念详解
    https://juejin.im/post/5d8593a0518825680725c883#heading-5
    `
    1.5 Stream groupings(分组策略)

    spouts 和 bolts 在集群上执行任务时,是由多个 Task 并行执行 (如上图,每一个圆圈代表一个 Task)。当一个 Tuple 需要从 Bolt A 发送给 Bolt B 执行的时候,程序如何知道应该发送给 Bolt B 的哪一个 Task 执行呢?

    这是由 Stream groupings 分组策略来决定的,Storm 中一共有如下 8 个内置的 Stream Grouping。当然你也可以通过实现 CustomStreamGrouping 接口来实现自定义 Stream 分组策略。

    # Shuffle grouping
    Tuples 随机的分发到每个 Bolt 的每个 Task 上,每个 Bolt 获取到等量的 Tuples。

    # Fields grouping
    Streams 通过 grouping 指定的字段 (field) 来分组。假设通过 user-id 字段进行分区,那么具有相同 user-id 的 Tuples 就会发送到同一个 Task。

    # Partial Key grouping
    Streams 通过 grouping 中指定的字段 (field) 来分组,与 Fields Grouping 相似。但是对于两个下游的 Bolt 来说是负载均衡的,可以在输入数据不平均的情况下提供更好的优化。

    # All grouping
    Streams 会被所有的 Bolt 的 Tasks 进行复制。由于存在数据重复处理,所以需要谨慎使用。

    # Global grouping
    整个 Streams 会进入 Bolt 的其中一个 Task,通常会进入 id 最小的 Task。

    # None grouping
    当前 None grouping 和 Shuffle grouping 等价,都是进行随机分发。

    # Direct grouping
    Direct grouping 只能被用于 direct streams 。使用这种方式需要由 Tuple 的生产者直接指定由哪个 Task 进行处理。

    # Local or shuffle grouping
    如果目标 Bolt 有 Tasks 和当前 Bolt 的 Tasks 处在同一个 Worker 进程中,那么则优先将 Tuple Shuffled 到处于同一个进程的目标 Bolt 的 Tasks 上,这样可以最大限度地减少网络传输。否则,就和普通的 Shuffle Grouping 行为一致。
    `

回复 a-z 取消回复

您的电子邮箱地址不会被公开。 必填项已用*标注