Connecting ClickHouse Directly to Kafka


Reference: https://blog.csdn.net/zhongwumao/article/details/81171143

Start ZooKeeper: nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &

Start Kafka:     nohup ./bin/kafka-server-start.sh ./config/server.properties > ./logs/kafka.log &

 
1. Create the Kafka topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic mytest001 
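Before producing any data, it can help to confirm that the topic exists with the expected three partitions. The sketch below assumes it is run from the Kafka installation directory and that the Kafka release is old enough to accept the --zookeeper flag, as the create command above does (newer releases use --bootstrap-server instead). The TOPIC variable is introduced here only for illustration.

```shell
# Topic name taken from the create command above.
TOPIC=mytest001

# Describe the topic: prints partition count, replication factor and leaders.
if [ -x ./bin/kafka-topics.sh ]; then
  ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic "$TOPIC" \
    || echo "describe failed; are ZooKeeper and Kafka running?" >&2
else
  echo "kafka-topics.sh not found; run this from the Kafka install directory" >&2
fi
```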

2. Start the producer: ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest001

3. Produce data

Prepare the data:
{"timestamp":1542424134, "level":"high", "message":"hehe"}
{"timestamp":1542424132, "level":"high", "message":"hehe"}
{"timestamp":1542424133, "level":"mid", "message":"hehe"}
{"timestamp":1542424134, "level":"low", "message":"hehe"}
{"timestamp":1542434134, "level":"high", "message":"hehe"} 
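Instead of typing the records into the console producer by hand, the same five lines can be saved to a file and piped in. This is a minimal sketch, assuming it is run from the Kafka installation directory; the file name mytest001.json is a made-up name for illustration.

```shell
# Write the five sample events to a file, one JSON object per line,
# which is the layout the JSONEachRow format expects.
# The file name mytest001.json is an assumption, not part of the original setup.
cat > mytest001.json <<'EOF'
{"timestamp":1542424134, "level":"high", "message":"hehe"}
{"timestamp":1542424132, "level":"high", "message":"hehe"}
{"timestamp":1542424133, "level":"mid", "message":"hehe"}
{"timestamp":1542424134, "level":"low", "message":"hehe"}
{"timestamp":1542434134, "level":"high", "message":"hehe"}
EOF

# Feed the file to the console producer; each line becomes one Kafka message.
if [ -x ./bin/kafka-console-producer.sh ]; then
  ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest001 < mytest001.json \
    || echo "producer failed; is the Kafka broker running?" >&2
else
  echo "kafka-console-producer.sh not found; run this from the Kafka install directory" >&2
fi
```

Keeping the records in a file makes it easy to replay the same test data after recreating the topic or the ClickHouse tables.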

 

(1) Create the stream table for the Kafka topic:
CREATE TABLE mytest001_kafka (
  timestamp UInt64,
  level String,
  message String
) ENGINE = Kafka('localhost:9092', 'mytest001', 'mytest001', 'JSONEachRow');


(2) Create the table that stores the data, together with the corresponding materialized view:
 
CREATE TABLE mytest001_data (
  timestamp UInt64,
  level String,
  message String
) ENGINE = MergeTree()
 order by (timestamp);

 
CREATE MATERIALIZED VIEW mytest001_data_view TO mytest001_data
  AS SELECT timestamp, level, message
  FROM mytest001_kafka;

 

ClickHouse is now connected to Kafka.

From now on, the data can be queried directly with the following statement:

select * from mytest001_data;
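As a quick sanity check that messages are flowing from the topic into mytest001_data, a per-level count can be run from the command line. This is only a sketch: it assumes clickhouse-client is on the PATH and connects to the local server with default settings, and the QUERY variable is introduced here for illustration. Rows appear with a short delay after they are produced, so an empty result right after sending data is not necessarily an error.

```shell
# Count the ingested rows per level in the MergeTree target table.
QUERY='SELECT level, count() AS events FROM mytest001_data GROUP BY level ORDER BY level'

if command -v clickhouse-client >/dev/null 2>&1; then
  clickhouse-client --query "$QUERY" \
    || echo "query failed; is the ClickHouse server running?" >&2
else
  echo "clickhouse-client not found on PATH" >&2
fi
```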
