杜龙少(sdvdxl)

Spring-XD简介

Word count: 846 / Reading time: 4 min
2016/03/09 Share

简介

Spring XD is a unified, distributed, and extensible service for data ingestion, real time analytics, batch processing, and data export.
distributed-overview.

Streams

翻译过来就是流通过定义stream可以控制数据的流向比如从MongoDB读取数据然后存储到HDFS
stream

创建方式

一个简单的示例该示例创建一个名字叫ticktockstream每秒钟产生一条时间信息然后通过管道传送到log

1
xd:> stream create --definition "time | log" --name ticktock

销毁Stream

1
xd:> stream destroy --name stream-name

Modules

模块当前包含source, sink,processor, 和job

Souces

数据源Stream的来源有以下几种方式

方式 描述
File 文件方式
FTP FTP方式
GemFire Continuous Query(gemfire-cq) GemFire查询
GemFire source(gemfire) GemFire文件
HTTP http方式
JDBC Source(jdbc) 关系型数据库jdbc
JMS JMS
Kafka Kafka消息队列
Mail 通过接收电子邮件
MongoDB Source(mongodb) MongoDB数据库
MQTT MQTT
RabbitMQ RabbitMQ消息队列
Reactor IP(reactor-ip) Reactor IP(reactor-ip)
SFTP SFTP
Stdout Capture 标准输入
Syslog 系统日志
Tail Tail程序
TCP TCP
TCP Client(tcp-client) TCP 客户端
Time 时间
Trigger Source(trigger) 触发器
Twitter Search(twittersearch) Twitter搜索
Twitter Stream(twitterstream) Twitter Stream

Sinks

数据源Stream的输出有以下几种方式

方式 描述
Dynamic Router(router) 动态路由
File Sink(file) 文件方式
FTP Sink(ftp) FTP方式
GemFire Server GemFire服务器
GPFDIST GPFDIST
Cassandra Cassandra 数据库
Hadoop(HDFS) (hdfs) hdfs文件系统
HDFS Dataset(Avro/Parquet) (hdfs-dataset) hdfs文件系统中的avro或者parquet类型文件
JDBC Source(jdbc) 关系型数据库jdbc
Kafka Sink (kafka) Kafka消息队列
Log log 文件
Mail Mail 发送
Mongo Mongo数据库
MQTT Sink (mqtt) MQTT
Null Sink(null) null
RabbitMQ RabbitMQ 消息队列
Redis Redis
Shell Sink (shell) shell
Splunk Server (splunk) splunk
TCP Sink (tcp) TCP

Processors

可用的处理器包括Aggregator Filter Header Enricher HTTP Client JSON to Tuple Object to JSON Script Shell Command Splitter Transform

  • Aggregator – 作用和 splitter相反用于聚合
  • Splitter – 用于拆解
  • Filter – 过滤器用于中间处理数据
  • Header Enricher (header-enricher) – 用于添加头部信息
  • HTTP Client – 通过httpClient方式发送URL请求
  • JSON to Tuple (json-to-tuple) – 转换json数据到Tuple类型
  • Object to JSON (object-to-json) – 将对象转换为json格式
  • Script 用于加载Groovy脚本
  • Shell – 用于加载Shell脚本
  • Transform – 用于负载类型转换

Taps

监听器窃听器
不用重复定义相同的stream然后监听此stream就可以做其他操作并且可以用Label来分别对每个部分内容做个别名定义Tab时候可以使用别名

1
2
stream create foo --definition "httpLabel: http | fLabel: filter --expression=payload.startsWith('A') | flibble: transform --expression=payload.toLowerCase() | log" --deploy
stream create fooTap --definition "tap:stream:foo.flibble > log" --deploy

上面对trasfrom部分做了一个别名叫做flibble然后下面定义一个Tap并且最后指定是flibble这个标签那么就是对foo这个streamflibble做监听

Jobs

Job相比Stream不同点在于Job算是静态的Stream是动态的Stream会持续接收数据处理数据Job是一次性接收数据处理数据如果数据改变那么是不会进行处理的除非有定时任务

1
2
job create --name jobtest --definition 'timestampfile --directory=D:/jobs' --deploy
stream create --name time-cron --definition "trigger --cron='* * * * * *' > queue:job:jobtest" --deploy

使用counter

1
2
3
4
5
6
stream create foo --definition 'http --outputType=application/json | log'
stream create countName --definition 'tap:stream:foo > field-value-counter --fieldName=name' --deploy
stream deploy --name foo
http post --data {"name":"a"}
http post --data {"name":"a"}
http post --data {"name":"b"}

field-value-counter list 列出field-value-counter的名字

1
2
3
FieldValueCounter name
----------------------
countName

field-value-counter display --name countName 列出名字为countName的描述

FieldValueCounter=countName
---------------------------  -  -----
VALUE                        -  COUNT
a                            |  2
b                            |  1
CATALOG
  1. 1. 简介
  2. 2. Streams
    1. 2.1. 创建方式
    2. 2.2. 销毁 Stream
  3. 3. Modules
  4. 4. Souces
  5. 5. Sinks
  6. 6. Processors
  7. 7. Taps
  8. 8. Jobs
  9. 9. 使用 counter