Spring-XD简介
简介
Spring XD is a unified, distributed, and extensible service for data ingestion, real time analytics, batch processing, and data export.
Streams
翻译过来就是流,通过定义stream可以控制数据的流向,比如从MongoDB读取数据然后存储到HDFS中。
创建方式
一个简单的示例:该示例创建一个名字叫ticktock
的stream,每秒钟产生一条时间信息然后通过管道传送到log中。
1 | xd:> stream create --definition "time | log" --name ticktock |
销毁Stream
1 | xd:> stream destroy --name stream-name |
Modules
模块,当前包含source
, sink
,processor
, 和job
。
Souces
数据源,Stream的来源,有以下几种方式:
方式 | 描述 |
---|---|
File | 文件方式 |
FTP | FTP方式 |
GemFire Continuous Query(gemfire-cq) | GemFire查询 |
GemFire source(gemfire) | GemFire文件 |
HTTP | http方式 |
JDBC Source(jdbc) | 关系型数据库jdbc |
JMS | JMS |
Kafka | Kafka消息队列 |
通过接收电子邮件 | |
MongoDB Source(mongodb) | MongoDB数据库 |
MQTT | MQTT |
RabbitMQ | RabbitMQ消息队列 |
Reactor IP(reactor-ip) | Reactor IP(reactor-ip) |
SFTP | SFTP |
Stdout Capture | 标准输入 |
Syslog | 系统日志 |
Tail | Tail程序 |
TCP | TCP |
TCP Client(tcp-client) | TCP 客户端 |
Time | 时间 |
Trigger Source(trigger) | 触发器 |
Twitter Search(twittersearch) | Twitter搜索 |
Twitter Stream(twitterstream) | Twitter Stream流 |
Sinks
数据源,Stream的输出,有以下几种方式:
方式 | 描述 |
---|---|
Dynamic Router(router) | 动态路由 |
File Sink(file) | 文件方式 |
FTP Sink(ftp) | FTP方式 |
GemFire Server | GemFire服务器 |
GPFDIST | GPFDIST |
Cassandra | Cassandra 数据库 |
Hadoop(HDFS) (hdfs) | hdfs文件系统 |
HDFS Dataset(Avro/Parquet) (hdfs-dataset) | hdfs文件系统中的avro或者parquet类型文件 |
JDBC Source(jdbc) | 关系型数据库jdbc |
Kafka Sink (kafka) | Kafka消息队列 |
Log | log 文件 |
Mail 发送 | |
Mongo | Mongo数据库 |
MQTT Sink (mqtt) | MQTT |
Null Sink(null) | null |
RabbitMQ | RabbitMQ 消息队列 |
Redis | Redis |
Shell Sink (shell) | shell |
Splunk Server (splunk) | splunk |
TCP Sink (tcp) | TCP |
Processors
可用的处理器包括Aggregator
Filter
Header Enricher
HTTP Client
JSON to Tuple
Object to JSON
Script
Shell Command
Splitter
Transform
- Aggregator – 作用和 splitter相反,用于聚合,
- Splitter – 用于拆解
- Filter – 过滤器,用于中间处理数据
- Header Enricher (header-enricher) – 用于添加头部信息
- HTTP Client – 通过httpClient方式发送URL请求
- JSON to Tuple (json-to-tuple) – 转换json数据到Tuple类型
- Object to JSON (object-to-json) – 将对象转换为json格式
- Script 用于加载Groovy脚本
- Shell – 用于加载Shell脚本
- Transform – 用于负载类型转换
Taps
监听器,窃听器。
不用重复定义相同的stream,然后监听此stream就可以做其他操作。并且可以用Label来分别对每个部分内容做个别名,定义Tab时候可以使用别名。如
1 | stream create foo --definition "httpLabel: http | fLabel: filter --expression=payload.startsWith('A') | flibble: transform --expression=payload.toLowerCase() | log" --deploy |
上面对trasfrom部分做了一个别名,叫做flibble
,然后下面定义一个Tap,并且最后指定是flibble这个标签,那么就是对foo
这个stream的flibble
做监听。
Jobs
Job相比Stream不同点在于,Job算是静态的,Stream是动态的。Stream会持续接收数据,处理数据;Job是一次性接收数据,处理数据,如果数据改变,那么是不会进行处理的,除非有定时任务。
1 | job create --name jobtest --definition 'timestampfile --directory=D:/jobs' --deploy |
使用counter
1 | stream create foo --definition 'http --outputType=application/json | log' |
field-value-counter list
列出field-value-counter的名字
1 | FieldValueCounter name |
field-value-counter display --name countName
列出名字为countName
的描述
FieldValueCounter=countName
--------------------------- - -----
VALUE - COUNT
a | 2
b | 1