{{ v.name }}
{{ v.cls }}类
{{ v.price }} ¥{{ v.price }}
版本和集群环境
kafka版本:2.11
集群环境:两台虚拟机(我这里是一台当producer,一台当consumer)
官网:http://kafka.apache.org/官方文档:http://kafka.apache.org/documentation.html#quickstart
参考:https://kafka.apache.org/08/quickstart.html
基础配置:已安装jdk和zookeeper(zookeeper安装参照http://blog.csdn.net/unix21/article/details/18990123)
step1:下载kafka
到官网上下载稳定版本的kafka,解压
tar-zxvfkafka_2.11-0.8.2.2.tgz
step2:修改配置文件
修改config下的server.properties就可以了。需要配置的属性有:broker.id(标示当前server在集群中的id,从0开始),port,host.name(当前的serverhostname),zookeeper.connect(连接的zookeeper集群),log.dirs(log的存储目录,记得对应的去建立这个目录)等,其他的一些配置可以看相应的注释:(截图不全,仅供参考)
step3:把配置好的kafka复制到其它server上
scp-rkafka_2.11-0.8.2.2slave1:~
step4:修改每台server的配置文件,主要是broker.id和host.name
step5:先启动zookeeper集群,再启动kafka集群
启动zookeeper集群:./zkserver.shstart
在每台server上启动kafka:./kafka-server-start.sh../config/server.properties
step6:创建topic
./kafka-topics.sh--zookeepermaster:2181--topictest--replication-factor2--partitions3--create
step7:查看topic
查看所有topic:./kafka-topics.sh--list--zookeepermaster:2181
查看一个topic详情:./kafka-topics.sh--describe--zookeepermaster:2181--topictest
partiton:partionid从0开始leader:当前负责读写的leadbrokeridrelicas:当前partition的所有replicationbrokerlistisr:relicas的子集,只包含出于活动状态的broker
step8:创建producer
./kafka-console-producer.sh--broker-listmaster:9092,slave1:9092--topictest
thisisamessage
注意,如果上述命令不能发送接收消息,报一下错误:
errorfailedtocollatemessagesbytopic,partitiondueto:fetchingtopicmetadatafortopics[set(test)]frombroker[arraybuffer(id:1,host:slave1,port:9092,id:0,host:master,port:9092)]failed(kafka.producer.async.defaulteventhandler)
则说明broker的host.name属性配置有问题,可以用ip或者是hosts里配好的域名
step9:创建consumer
./kafka-console-consumer.sh--zookeepermaster:2181--from-beginning--topictest
就可以看到消息了。
如果要最新的数据,可以不带--from-beginning参数即可。最后测试容错能力:如果broker0作为leader运行,现在我们杀掉broker0,在查看topic的详细信息,发现leader都是broker1了,并且consumer依然可以消费消息
maven,springmvcmybatisshiro,druid,restful,dubbo,zookeeper,redis,fastdfs,activemq,nginx1.项目核心代码结构截图
项目模块依赖
特别提醒:开发人员在开发的时候可以将自己的业务rest服务化或者dubbo服务化