Reference: https://blog.csdn.net/zhongwumao/article/details/81171143
Start ZooKeeper: nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka: nohup ./bin/kafka-server-start.sh ./config/server.properties > ./logs/kafka.log &
1. Create the Kafka topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic mytest001
2. Start the producer: ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest001
3. Produce data
Prepare the data:
{"timestamp":1542424134, "level":"high", "message":"hehe"}
{"timestamp":1542424132, "level":"high", "message":"hehe"}
{"timestamp":1542424133, "level":"mid", "message":"hehe"}
{"timestamp":1542424134, "level":"low", "message":"hehe"}
{"timestamp":1542434134, "level":"high", "message":"hehe"}
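Instead of typing the records into the producer console by hand, they can be saved to a file and redirected into the console producer, which reads messages from standard input. The following is a minimal sketch: the file name mytest001.jsonl and the DATA_FILE variable are assumptions made for this example, and the producer is only invoked when the script is run from the Kafka installation directory.

```shell
#!/bin/sh
# Save the sample records as one JSON object per line (the JSONEachRow layout).
# DATA_FILE is a hypothetical path chosen for this sketch.
DATA_FILE="${DATA_FILE:-./mytest001.jsonl}"

cat > "$DATA_FILE" <<'EOF'
{"timestamp":1542424134, "level":"high", "message":"hehe"}
{"timestamp":1542424132, "level":"high", "message":"hehe"}
{"timestamp":1542424133, "level":"mid", "message":"hehe"}
{"timestamp":1542424134, "level":"low", "message":"hehe"}
{"timestamp":1542434134, "level":"high", "message":"hehe"}
EOF

# Count the records that were written.
RECORD_COUNT=$(wc -l < "$DATA_FILE" | tr -d ' ')
echo "wrote $RECORD_COUNT records to $DATA_FILE"

# Send the file to the topic only when the Kafka scripts are present,
# i.e. when this is run from the Kafka installation directory.
if [ -x ./bin/kafka-console-producer.sh ]; then
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest001 < "$DATA_FILE"
fi
```

Each line of the file becomes one Kafka message, which matches the one-object-per-line layout that the JSONEachRow format expects.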
(1) Create the stream table for the Kafka topic:
CREATE TABLE mytest001_kafka (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'mytest001', 'mytest001', 'JSONEachRow');
(2) Create the table that stores the data, along with the corresponding materialized view:
CREATE TABLE mytest001_data (
timestamp UInt64,
level String,
message String
) ENGINE = MergeTree()
order by (timestamp);
CREATE MATERIALIZED VIEW mytest001_data_view TO mytest001_data
AS SELECT timestamp, level, message
FROM mytest001_kafka;
ClickHouse is now connected to Kafka.
From now on, query the data directly with the following statement:
select * from mytest001_data;
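As a quick check that the materialized view is filling mytest001_data, the rows can be counted per level from the command line. This is a sketch that assumes clickhouse-client is installed and can reach the server with its default connection settings; the QUERY variable is a name introduced only for this example. Note that the query reads the MergeTree table rather than mytest001_kafka, since selecting from a Kafka engine table consumes the messages it returns.

```shell
#!/bin/sh
# Count stored rows per level. The query only reads mytest001_data,
# the MergeTree table that the materialized view writes into.
QUERY="SELECT level, count() AS cnt FROM mytest001_data GROUP BY level ORDER BY level"

# Run the query only if clickhouse-client is available on this machine.
if command -v clickhouse-client >/dev/null 2>&1; then
    clickhouse-client --query "$QUERY"
else
    echo "clickhouse-client not found; query was: $QUERY"
fi
```

If all five sample records above were ingested, this should report 3 rows for high and 1 each for low and mid.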