杜龙少(sdvdxl)

Spring-XD简介

Word count: 846 / Reading time: 4 min
2016/03/09 Share

简介

Spring XD is a unified, distributed, and extensible service for data ingestion, real time analytics, batch processing, and data export.
distributed-overview.

Streams

翻译过来就是流通过定义stream可以控制数据的流向比如从MongoDB读取数据然后存储到HDFS
stream

创建方式

一个简单的示例该示例创建一个名字叫ticktockstream每秒钟产生一条时间信息然后通过管道传送到log

1
xd:> stream create --definition "time | log" --name ticktock

销毁Stream

1
xd:> stream destroy --name stream-name

Modules

模块当前包含source, sink,processor, 和job

Souces

数据源Stream的来源有以下几种方式

方式描述
File文件方式
FTPFTP方式
GemFire Continuous Query(gemfire-cq)GemFire查询
GemFire source(gemfire)GemFire文件
HTTPhttp方式
JDBC Source(jdbc)关系型数据库jdbc
JMSJMS
KafkaKafka消息队列
Mail通过接收电子邮件
MongoDB Source(mongodb)MongoDB数据库
MQTTMQTT
RabbitMQRabbitMQ消息队列
Reactor IP(reactor-ip)Reactor IP(reactor-ip)
SFTPSFTP
Stdout Capture标准输入
Syslog系统日志
TailTail程序
TCPTCP
TCP Client(tcp-client)TCP 客户端
Time时间
Trigger Source(trigger)触发器
Twitter Search(twittersearch)Twitter搜索
Twitter Stream(twitterstream)Twitter Stream

Sinks

数据源Stream的输出有以下几种方式

方式描述
Dynamic Router(router)动态路由
File Sink(file)文件方式
FTP Sink(ftp)FTP方式
GemFire ServerGemFire服务器
GPFDISTGPFDIST
CassandraCassandra 数据库
Hadoop(HDFS) (hdfs)hdfs文件系统
HDFS Dataset(Avro/Parquet) (hdfs-dataset)hdfs文件系统中的avro或者parquet类型文件
JDBC Source(jdbc)关系型数据库jdbc
Kafka Sink (kafka)Kafka消息队列
Loglog 文件
MailMail 发送
MongoMongo数据库
MQTT Sink (mqtt)MQTT
Null Sink(null)null
RabbitMQRabbitMQ 消息队列
RedisRedis
Shell Sink (shell)shell
Splunk Server (splunk)splunk
TCP Sink (tcp)TCP

Processors

可用的处理器包括Aggregator Filter Header Enricher HTTP Client JSON to Tuple Object to JSON Script Shell Command Splitter Transform

  • Aggregator – 作用和 splitter相反用于聚合
  • Splitter – 用于拆解
  • Filter – 过滤器用于中间处理数据
  • Header Enricher (header-enricher) – 用于添加头部信息
  • HTTP Client – 通过httpClient方式发送URL请求
  • JSON to Tuple (json-to-tuple) – 转换json数据到Tuple类型
  • Object to JSON (object-to-json) – 将对象转换为json格式
  • Script 用于加载Groovy脚本
  • Shell – 用于加载Shell脚本
  • Transform – 用于负载类型转换

Taps

监听器窃听器
不用重复定义相同的stream然后监听此stream就可以做其他操作并且可以用Label来分别对每个部分内容做个别名定义Tab时候可以使用别名

1
2
stream create foo --definition "httpLabel: http | fLabel: filter --expression=payload.startsWith('A') | flibble: transform --expression=payload.toLowerCase() | log" --deploy
stream create fooTap --definition "tap:stream:foo.flibble > log" --deploy

上面对trasfrom部分做了一个别名叫做flibble然后下面定义一个Tap并且最后指定是flibble这个标签那么就是对foo这个streamflibble做监听

Jobs

Job相比Stream不同点在于Job算是静态的Stream是动态的Stream会持续接收数据处理数据Job是一次性接收数据处理数据如果数据改变那么是不会进行处理的除非有定时任务

1
2
job create --name jobtest --definition 'timestampfile --directory=D:/jobs' --deploy
stream create --name time-cron --definition "trigger --cron='* * * * * *' > queue:job:jobtest" --deploy

使用counter

1
2
3
4
5
6
stream create foo --definition 'http --outputType=application/json | log'
stream create countName --definition 'tap:stream:foo > field-value-counter --fieldName=name' --deploy
stream deploy --name foo
http post --data {"name":"a"}
http post --data {"name":"a"}
http post --data {"name":"b"}

field-value-counter list 列出field-value-counter的名字

1
2
3
FieldValueCounter name
----------------------
countName

field-value-counter display --name countName 列出名字为countName的描述

FieldValueCounter=countName
---------------------------  -  -----
VALUE                        -  COUNT
a                            |  2
b                            |  1
CATALOG
  1. 1. 简介
  2. 2. Streams
    1. 2.1. 创建方式
    2. 2.2. 销毁 Stream
  3. 3. Modules
  4. 4. Souces
  5. 5. Sinks
  6. 6. Processors
  7. 7. Taps
  8. 8. Jobs
  9. 9. 使用 counter