[关闭]
@zhangyy 2018-08-16T15:04:07.000000Z 字数 8661 阅读 499

storm 流式计算框架

storm的部分


  • 一:storm 简介
  • 二:storm 的原理与架构
  • 三:storm 的 安装配置
  • 四:storm 的启动脚本

一: storm 的简介:

1.1 storm 是什么:

  1. 1. StormTwitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。随着越来越多的场景对HadoopMapReduce高延迟无法容忍,比如网站统计、推荐系统、预警系统、金融系统(高频交易、股票)等等,大数据实时处理解决方案(流计算)的应用日趋广泛,目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流。
  2. 2. 按照storm作者的说法,Storm对于实时计算的意义类似于Hadoop对于批处理的意义。Hadoop提供了mapreduce原语,使我们的批处理程序变得简单和高效。同样,Storm也为实时计算提供了一些简单高效的原语,而且StormTrident是基于Storm原语更高级的抽象框架,类似于基于HadoopPig框架,让开发更加便利和高效。本课程会深入、全面的讲解Storm,并穿插企业场景实战讲述Storm的运用。

1.2 实时计算的设计缺点:

  1. 数据源务必实时,所以采用Message Queue作为数据源,消息处理Comsumer实时从MQ获取数据进行处理,返回结果到Web或写DB
  2. 这种方式有以下几个缺陷:
  3. 1、单机模式,能处理的数据量有限
  4. 2、不健壮,服务器挂掉即结束。而Storm集群节点挂掉后,任务会重新分配给其他节点,作业不受影响。
  5. 3、失败重试、事务等,你需要在代码上进行控制,过多精力放在业务开发之外。
  6. 4、伸缩性差: 当一个消息处理者的消息量达到阀值,你需要对这些数据进行分流, 你需要配置这些新的处理者以让他们处理分流的消息。

1.3 storm 的特点:

  1. 1. 适用场景广泛: storm可以实时处理消息和更新DB,对一个数据量进行持续的查询并返回客户端(持续计算),对一个耗资源的查询作实时并行化的处理(分布式方法调用,即DRPC),storm的这些基础API可以满足大量的场景。
  2. 2. 可伸缩性高: Storm的可伸缩性可以让storm每秒可以处理的消息量达到很高。扩展一个实时计算任务,你所需要做的就是加机器并且提高这个计算任务的并行度 Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易的扩展。
  3. 3. 保证无数据丢失: 实时系统必须保证所有的数据被成功的处理。 那些会丢失数据的系统的适用场景非常窄, storm保证每一条消息都会被处理, 这一点和S4相比有巨大的反差。
  4. 4. 异常健壮: storm集群非常容易管理,轮流重启节点不影响应用。
  5. 5. 容错性好:在消息处理过程中出现异常, storm会进行重试
  6. 6. 语言无关性: Stormtopology和消息处理组件(Bolt)可以用任何语言来定义, 这一点使得任何人都可以使用storm.

二:storm 的原理与架构

2.1 Storm集群结构

image_1al8vbhr315061ik51kic18mcevh9.png-10.5kB

image_1al8vdhb71coifipkc218o31stlm.png-27.3kB

  1. 1. Nimbus Supervisors 之间所有的协调工作是通过 一个Zookeeper 集群。
  2. 2. Nimbus进程和 Supervisors 进程是无法直接连接和无状态的; 所有的状态维持在Zookeeper中或保存在本地磁盘上。
  3. 3. 这意味着你可以 kill -9 Nimbus Supervisors 进程,而不需要做备份。
  4. 这种设计导致storm集群具有令人难以置信的稳定性,即无耦合。

2.2 storm 的工作原理:

  1. 1. Nimbus 负责在集群分发的代码,topo只能在nimbus机器上提交,将任务分配给其他机器,和故障监测。
  2. 2. Supervisor,监听分配给它的节点,根据Nimbus 的委派在必要时启动和关闭工作进程。 每个工作进程执行topology 的一个子集。一个运行中的topology 由很多运行在很多机器上的工作进程组成。
  3. 3. Storm中有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组

image_1al901o4e1fs91pe2g3r15gg1r4s1g.png-68.6kB

  1. 4. Storm认为每个stream都有一个源,也就是原始元组的源头,叫做Spout(管口)
  2. 5.处理stream内的tuple,抽象为Boltbolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。
  3. 可以认为spout就是水龙头,并且每个水龙头里流出的水是不同的,我们想拿到哪种水就拧开哪个水龙头,然后使用管道将水龙头的水导向到一个水处理器(bolt),水处理器处理后再使用管道导向另一个处理器或者存入容器中。

image_1al9bqro0104c15k4bk31ttirdl9.png-85.3kB

  1. 为了增大水处理效率,我们很自然就想到在同个水源处接上多个水龙头并使用多个水处理器,这样就可以提高效率。
  2. 这是一张有向无环图,Storm将这个图抽象为Topology(拓扑),Topo就是stormJob抽象概念,一个拓扑就是一个流转换图
  3. 图中每个节点是一个spout或者bolt,每个spout或者bolt发送元组到下一级组件,广播方式。
  4. Spout到单个Bolt6grouping方式

image_1al9cm4v51c2h1dpf001ici1ntam.png-32.4kB

2.3 Topology 作业

  1. Storm将流中元素抽象为tuple,一个tuple就是一个值列表value listlist中的每个value都有一个name,并且该value可以是任意可序列化的类型。拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。

image_1al9cpn568iieva1gvtt9g1tno13.png-44.6kB

2.4 storm 中的角色与概念:

  1. Streams:消息流
  2. 消息流是一个没有边界的tuple序列,而这些tuples会被以一种分布式的方式并行创建和处理。 每个tuple可以包含多列,字段类型可以是: integer, long, short, byte, string, double, float, booleanbyte array 你还可以自定义类型 只要你实现对应的序列化器。
  1. Spouts:消息源
  2. Spoutstopology消息生产者。Spout从一个外部源(消息队列)读取数据向topology发出tuple 消息源Spouts可以是可靠的也可以是不可靠的。一个可靠的消息源可以重新发射一个处理失败的tuple 一个不可靠的消息源Spouts不会。
  3. Spout类的方法nextTuple不断发射tupletopologystorm在检测到一个tuple被整个topology成功处理的时候调用ack, 否则调用fail
  4. storm只对可靠的spout调用ackfail

image_1al9dilk012vs14eg1bf31sd61bsn1t.png-8.5kB

  1. Bolts:消息处理者
  2. 消息处理逻辑被封装在bolts里面,Bolts可以做很多事情: 过滤, 聚合, 查询数据库等。
  3. Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤, 从而也就需要经过很多Bolts。第一级Bolt的输出可以作为下一级Bolt的输入。而Spout不能有一级。
  4. Bolts的主要方法是execute(死循环)连续处理传入的tuple,成功处理完每一个tuple调用OutputCollectorack方法,以通知storm这个tuple被处理完成了。当处理失败时,可以调fail方法通知Spout端可以重新发送该tuple
  5. 流程是: Bolts处理一个输入tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack
  6. Bolts使用OutputCollector来发射tuple到下一级Blot

三:storm 的 安装配置

3.1 安装storm环境准备

3.1.1 安装zookeeper服务:

  1. tar -zxvf zookeeper-3.4.5.tar.gz
  2. mv zookeeper-3.4.5 /usr/local/zookeeper
  3. ---
  4. cd /usr/local/zookeeper
  5. mkdir data
  6. cd data
  7. echo "1" > myid
  8. --
  9. cd /usr/local/zookeeper/conf
  10. cp -p zoo_sample.cfg zoo.cfg
  11. vim zoo.cfg
  12. dataDir=/usr/local/zookeeper/data
  13. server.1=master:2888:3888
  14. server.2=slave1:2888:3888
  15. server.3=slave2:2888:3888
  16. ---
  17. cd /usr/local/
  18. tar -zcvf zookeeper.tar.gz zookeeper
  19. --- 同步到slave1 slave2 节点----
  20. scp zookeeper.tar.gz root@slave1:/usr/local/
  21. scp zookeeper.tar.gz root@slave2:/usr/local/
  22. ------------------slave1 节点---------------------
  23. cd /usr/local/
  24. tar -zxvf zookeeepr.tar.gz
  25. cd zookeeper/data
  26. echo '2' > myid
  27. ------------------slave2 节点--------------------
  28. cd /usr/local/
  29. tar -zxvf zookeeepr.tar.gz
  30. cd zookeeper/data
  31. echo '3' > myid

image_1aled2trm1hvsft61t2ope0uto9.png-14.1kB

3.1.2 zookeeper 的启动脚本范例:

  1. #!/bin/bash
  2. if [ $# -ne 1 ]; then
  3. echo "Usage: sh start_zookeeper.sh [start|status|stop]"
  4. exit 2
  5. fi
  6. for node in master slave1 slave2 # ---这个地方有多少个主机就加多少
  7. do
  8. echo "$1 in $node"
  9. ssh $node "source /etc/profile && /opt/modules/zookeeper-3.4.5/bin/zkServer.sh $1"
  10. done

3.1.3 安装依赖包:

  1. 1. CentOS6.4 安装相关编译工具包
  2. yum install -y gcc gcc++* gcc-c++ uuid-devel libuuid-devel libtool git
  3. 2. 安装 ZeroMQ
  4. wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
  5. tar -xzvf zeromq-2.1.7.tar.gz
  6. cd zeromq-2.1.7
  7. ./configure
  8. make
  9. make install
  10. 3. JZMQ安装
  11. git clone https://github.com/nathanmarz/jzmq.git
  12. cd jzmq
  13. ./autogen.sh
  14. ./configure
  15. make
  16. make install

3.1.4 storm 安装:

  1. 上传文件apache-storm-0.9.0.6.tar.gz 到/home/hadoop下面
  2. cd /usr/local/storm
  3. mkdir data
  4. cd conf
  5. ---
  6. vim storm.yaml
  7. ########### These MUST be filled in for a storm configuration
  8. storm.zookeeper.servers:
  9. - "master"
  10. - "slave1"
  11. - "slave2"
  12. #
  13. nimbus.host: "master"
  14. #
  15. ---
  16. ## Locations of the drpc servers
  17. drpc.servers:
  18. - "master"
  19. - "slave1"
  20. - "slave2"
  21. ---
  22. 增加storm 任务的目录与端口:
  23. ---
  24. storm.local.dir: "/usr/local/storm/data"
  25. supervisor.slots.ports:
  26. - 6701
  27. - 6702
  28. - 6703
  29. - 6704
  30. ui.port: 8081
  31. ------同步所有节点----------
  32. cd /usr/local/
  33. tar -zcvf storm.tar.gz storm
  34. scp storm.tar.gz root@slave1:/usr/local/
  35. scp storm.tar.gz root@slave2:/usr/local/
  36. ---------------slave1节点----------
  37. tar -zxvf storm.tar.gz
  38. ---------------slave2节点----------
  39. tar -zxvf storm.tar.gz

image_1aled42gdh591i606uhk88js5m.png-10.3kB
image_1aled4leitf0nflvdjl9h8t313.png-25.9kB

3.1.4 启动服务与浏览器访问

  1. 1. 启动zookeeper 服务
  2. master
  3. cd /usr/local/zookeeper/
  4. bin/zkServer.sh start
  5. --------------------------------------
  6. slave1
  7. cd /usr/local/zookeeper/
  8. bin/zkServer.sh start
  9. --------------------------------------
  10. slave2
  11. cd /usr/local/zookeeper/
  12. bin/zkServer.sh start
  13. --------------------------------------
  14. 2. 启动storm的相关服务
  15. master
  16. cd /usr/local/storm/
  17. bin/storm nimbus &
  18. bin/storm ui &
  19. ------------------------------
  20. slave1
  21. cd /usr/local/storm/
  22. bin/storm supervisor &
  23. ------------------------------
  24. slave2
  25. cd /usr/local/storm/
  26. bin/storm supervisor &
  27. -------------------------------
  28. 打开浏览器访问:
  29. http://192.168.3.1:8081

image_1aledqcs19r7cub1jghfsd5a71g.png-66.8kB

运行wordcount 实例:

  1. bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

image_1b3k1k1us1hck1ad834b15371sdk9.png-718.1kB

3.1.5 storm 的参数解析

  1. 1. storm.zookeeper.servers:这是一个为Storm集群配置的Zookeeper集群的主机列表
  2. 2. storm.local.dirNimbusSupervisor守护程序需要一个本地磁盘目录存储小量状态(像jarsconfs,其它),每台机器都创建这些目录,赋可写权限
  3. 3. java.library.path:这是Storm使用的本地库(ZeroMQJZMQ)载入路径。大多数安装,默认路径"/usr/local/lib:/opt/local/lib:/usr/lib"就行,所以你可能不需要配置它。
  4. 4. supervisor.slots.ports: 每一台worker机器,你用这个配置来指定多少workers运行在那台机。每个worker使用单一端口接收消息,并且这个设置定义哪个端口是打开的且可以使用。如果你定义5个端口,那么Storm将在这台机分配5worker运行。
  5. ------------------------------------
  6. Storm是一个快速失败(fail-fast)的系统,这意味着这些进程随时都可能因发生错误而停止。由于Storm的设计,所以它随时停止都是安全的,当进程重新启动时正确的恢复。这是为什么Storm保持进程无状态的原因-- 如果Nimbussupervisors重新启动,正在运行的topologies是不受影响的。
  7. Nohup挂到后台执行
  8. 1Nimbus
  9. master机器的supervision下运行命令”bin/storm nimbus
  10. 2Supervisor
  11. 在每个worker机器的supervision下运行命令”bin/storm supervisor”。Supervisor守护程序负责starting stopping 那台机上的worker进程
  12. 3UI
  13. 运行supervision下的命令”bin/storm ui”来运行Storm UI(你能从浏览器访问一个站点,它提供集群和topologies的诊断信息)。在你的浏览器中输入” http://{nimbus host}:8081”访问UI。

3.1.6 strom 的nimbus的主节点特点:

  1. nimbus topology任务提交后,程序是运行在supervisor节点上
  2. Nimbus不参与程序的运行
  3. Nimbus出现故障,不能提交Topology,已经提交了的Topology还是
  4. 正常运行在集群上
  5. 已经运行在集群上Topology,如果这时候某些task出现异常
  6. 则无法重现分配节点
  7. -----------------------------------------------------------------
  8. 查看Topology运行日志:
  9. 需要启动一个进程 logviewer
  10. 需要在每个supervisor节点上启动,不用在nimbus节点上启动
  11. bin/storm logviewer > ./logs/logviewer.out 2>&1 &
  12. nimbus supervisor ui logviewer

3.1.7 停掉storm 的 worldcount 程序

image_1b3k2uf1n232sems22ptc1u9nm.png-131kB

  1. activate 激活
  2. deactivate 暂停
  3. Repalance 从新分配
  4. kill 杀掉这个 toplogy

image_1b3k3287o10r87pd1lrt1ji81l5513.png-125.2kB

  1. 直接通过命令行执行:# bin/storm kill wordcount(提交的时候
  2. 指定的Topology名称)

image_1b3k39f4rdab1d4i14b4i8j1g1a1g.png-394.6kB


3.1.8 先看下Zookeeper怎么存储相关状态信息的

  1. 登录到zookeeper 的里面去
  2. cd /usr/local/zookeeper/bin
  3. ./zkCli.sh

image_1b3k3nv2n1ps31d9k16jm6ns1am61t.png-297.8kB

  1. znode:
  2. /storm
  3. /workerbeats worker心跳信息
  4. /errors topology运行过程中Task运行异常信息
  5. /supervisors 记录supervisor状态心跳信息
  6. /storms 记录的是topology任务信息
  7. /assignments 记录的是Topology任务的分配信息

3.1.9 nimbus supervisor ui logviewer进程的关闭

  1. kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
  2. kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`
  3. kill -9 `ps -ef | grep daemon.supervisor | awk '{print $2}' | head -n 1`
  4. kill -9 `ps -ef | grep daemon.logviewer | awk '{print $2}' | head -n 1`

3.2.0 storm 的启动脚本

  1. #!/bin/bash
  2. source /etc/profile
  3. STORM_HOME=/opt/modules/apache-storm-0.9.6
  4. ## 主节点 nimbus ui
  5. ${STORM_HOME}/bin/storm nimbus > /dev/null 2>&1 &
  6. ${STORM_HOME}/bin/storm ui > /dev/null 2>&1 &
  7. ## 从节点 supervisor logviewer
  8. for supervisor in `cat ${STORM_HOME}/bin/stormSupervisorHosts`
  9. do
  10. echo "start supervisor and logviewer in $supervisor"
  11. ssh $supervisor "source /etc/profile && ${STORM_HOME}/bin/storm supervisor > /dev/null 2>&1 &" &
  12. ssh $supervisor "source /etc/profile && ${STORM_HOME}/bin/storm logviewer > /dev/null 2>&1 &" &
  13. done

3.2.1 storm 的停止脚本

  1. #!/bin/bash
  2. source /etc/profile
  3. STORM_HOME=/opt/modules/apache-storm-0.9.6
  4. ### 主节点 nimbus ui
  5. kill -9 `ps -ef | grep daemon.nimbus | awk '{print $2}' | head -n 1`
  6. kill -9 `ps -ef | grep ui.core | awk '{print $2}' | head -n 1`
  7. ### 从节点 supervisor logviewer
  8. for supervisor in `cat ${STORM_HOME}/bin/stormSupervisorHosts`
  9. do
  10. echo "stop supervisor and logviewer in $supervisor"
  11. ssh $supervisor kill -9 `ssh $supervisor "ps -ef| grep daemon.supervisor| awk '{print $2}' | head -n 1" ` > /dev/null 2>&1 &
  12. ssh $supervisor kill -9 `ssh $supervisor "ps -ef| grep daemon.logviewer| awk '{print $2}' | head -n 1" ` >/dev/null 2>&1 &
  13. done
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注