Linux运维知识之Linux下单机安装部署kafka及代码实现
小标 2018-10-26 来源 : 阅读 964 评论 0

摘要:本文主要向大家介绍了Linux运维知识之Linux下单机安装部署kafka及代码实现,通过具体的内容向大家展现,希望对大家学习Linux运维知识有所帮助。

本文主要向大家介绍了Linux运维知识之Linux下单机安装部署kafka及代码实现,通过具体的内容向大家展现,希望对大家学习Linux运维知识有所帮助。


这几天研究了kafka的安装及使用,在网上找了很多教程但是均以失败告终,直到最后想起网络方面的问题最终才安装部署成功,下面就介绍一下kafka的安装部署及代码实现
一、关闭防火墙
重要的事情说100遍,关闭防火墙...(如果不关闭防火墙就会出现Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.等各种奇葩的问题)
1、关闭firewall:systemctl stop firewalld.service                             #停止firewallsystemctl disable firewalld.service                        #禁止firewall开机启动firewall -cmd --state                                              #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)
2、关闭iptables
service iptables stop                                           #停止iptableschkconfig iptables off                                          #永久关闭防火墙
service iptables status                                        #查看防火墙关闭状态
以上提供了关闭两种防火墙的命令,可以选择性操作
二、kafka安装测试
1、安装JRE/JDK,(kafka的运行要依赖于jdk,这里就省略了jdk的安装,需要注意的是jdk的版本一定要支持所下载的kafka版本,否则就会报错,这里我安装的是jdk1.7)
2、下载地址://kafka.apache.org/downloads.html(我下载的版本是kafka_2.11-0.11.0.1)
3、解压:
tar -xzvf kafka_2.11-0.11.0.1.tgz  
rm kafka_2.11-0.11.0.1.tgz  (这里一定要删除压缩包,不然会出现zk或kafka启动不起来的问题)
cd kafka_2.11-0.11.0.1
4、在kafka_2.11-0.11.0.1目录下
/bin       启动和停止命令等。 /config  配置文件 /libs      类库 
5、修改配置
在config下修改zookeeper.properties为如下配置
maxClientCnxns=100tickTime=2000initLimit=10syncLimit=5
在server.properties添加如下配置
port=9092host.name=10.61.8.6
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
(以上配置没有的就需要添加)
 6、启动、测试、停止
(1)、启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &    (&是为了能退出命令行)
(2)、启动kafka
bin/kafka-server-start.sh config/server.properties &
(3)、查看kafka和zk是否启动
ps -ef|grep kafka
(4)、创建topic(topic的名字叫abc)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 8 --replication-factor 2 --topic abc
(5)、删除topic
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic abc --zookeeper localhost:2181
(6)、查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
(7)、producter推送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc
(8)、consumer消费消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic abc --from-beginning
(9)、停止kafka
bin/kafka-server-stop.sh 
(10)、停止zookeeper
bin/zookeeper-server-stop.sh  
(11)、杀死服务
kill -9 123     (123是进程号)
三、java代码实现
producter

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;


/**
 * Created by Administrator on 2017/10/23 0023.
 */
public class KafkaProducter {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducter.class);
    private final Producer producer;
    public final static String TOPIC = "abc";

    public static void main(String[] args) {
        new KafkaProducter().produce();
    }

    private KafkaProducter() {
        Properties props = new Properties();
        //此处配置的是kafka的端口
        props.put("metadata.broker.list", "10.61.8.6:9092");
        //配置value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //配置key的序列化类
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        //0、这意味着生产者从不等待来自代理的确认(与0.7相同的行为)。这个选项提供了最低的延迟,但是最弱的持久性保证(当服务器失败时,一些数据将丢失)。
        //1、这意味着在主副本接收到数据后,生产者得到确认。这个选项提供了更好的持久性,因为客户机一直等待直到服务器确认请求成功(只有消息被写入到已死的领导人,但尚未被复制的消息将会丢失)。
        //-1、这意味着在所有同步副本都接收到数据之后,生产者得到确认。这个选项提供了最好的持久性,我们保证只要至少有一个同步副本,就不会丢失任何消息。
        props.put("request.required.acks", "-1");
        producer = new Producer(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1;
        final int COUNT = 10;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka" + key;
            producer.send(new KeyedMessage(TOPIC, key, data));
            log.info("",data);
            messageNo++;
        }
    }
}

consumer
 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Administrator on 2017/10/25 0025.
 */
public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    private final ConsumerConnector consumer;
    public final static String TOPIC = "abc";


    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }

    private KafkaConsumer() {
        Properties props = new Properties();
        //zookeeper 配置
        props.put("zookeeper.connect", "10.61.8.6:2181");
        //group 代表一个消费组
        props.put("group.id", "jd-group");
        //zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map topicCountMap = new HashMap();
        topicCountMap.put(TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator it = stream.iterator();
        while (it.hasNext()) {
            log.info("kafka监听到的消息:{}", it.next().message());
        }
        log.info("kafka监听完毕");
    }

}

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注系统运维Linux频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved