Kafka Pitfall Notes

Problems Encountered

For an introduction to Kafka, see this tutorial. Below are the main problems I ran into during installation:

  • Because Kafka's brokers, producers, and consumers all depend heavily on ZooKeeper, all three must be able to reach it. During installation and testing I used dnsmasq for unified internal domain-name management and pointed the DNS servers of the Kafka broker machines at dnsmasq; the zookeeper.connect setting in server.properties also used internal domain names, so the brokers connected to ZK without trouble. The machines running the producer and consumer, however, did not have their DNS server changed, so they could not connect to ZK. Either of the following fixes it:
    • Point the DNS server of the machines running the producer and consumer at the machine hosting dnsmasq
    • Use IPs rather than internal domain names in zookeeper.connect (see the sketch after this list)
  • The listeners property in server.properties specifies the address the broker listens on. If it is not configured, the broker takes its hostname from java.net.InetAddress.getCanonicalHostName() (the default port is 9092), and the IP that hostname resolves to may well be 127.0.0.1, which leaves the brokers unable to talk to each other. The fix:
    • Specify listeners explicitly instead of relying on the default (see the sketch after this list)
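
Combining the two fixes, here is a minimal server.properties sketch. The 192.168.100.x addresses mirror the hosts used in the demo below and are assumptions; substitute your own:

# Use IPs (or names that every client machine can resolve) for the ZK ensemble
zookeeper.connect=192.168.100.101:2181,192.168.100.102:2181,192.168.100.103:2181
# Listen explicitly on this broker's own reachable address, not the default canonical hostname
listeners=PLAINTEXT://192.168.100.101:9092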

Java Demo

  • pom.xml
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
  • producer
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer
{
    public static void main( String[] args )
    {
        Properties props = new Properties();
        // Broker list; per the pitfalls above, use addresses the client machine can actually resolve and reach
        props.put("bootstrap.servers", "192.168.100.101:9092,192.168.100.102:9092");
        // Wait for the full set of in-sync replicas to acknowledge each write
        props.put("acks", "all");
        props.put("retries", 0);
        // A deliberately tiny batch (in bytes) so the demo sends promptly
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> r =
                    new ProducerRecord<String, String>("test.message", "key-" + i, "value-" + i);
            System.out.println(r);
            producer.send(r);  // asynchronous; see the note after this block
        }

        // close() flushes any buffered records before shutting down
        producer.close();
    }
}
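
Note that producer.send(r) is asynchronous: it returns a Future<RecordMetadata> without waiting for the broker. To confirm each write (handy when debugging the connectivity problems above), you can block on that Future. A minimal sketch of what the loop body would become; it additionally needs import org.apache.kafka.clients.producer.RecordMetadata; and a throws Exception on main (or a try/catch):

// Block until the broker acknowledges this record, then report where it landed
RecordMetadata meta = producer.send(r).get();
System.out.println("partition=" + meta.partition() + ", offset=" + meta.offset());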
  • consumer
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.100.102:9092,192.168.100.103:9092,192.168.100.101:9092");
        // Consumers sharing a group.id split the topic's partitions between them
        props.put("group.id", "TestGroupA");
        // Commit offsets automatically, once per second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test.message"));

        while (true) {
            // Wait up to 10 ms for new records, then print whatever arrived
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + ",--key=" + record.key() + ",--value=" + record.value());
            }
        }
    }
}
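
Both programs assume the topic test.message already exists (or that the brokers have auto.create.topics.enable left at its default of true). To create it explicitly, a sketch using the CLI shipped with Kafka 0.10.x; the ZK address and the partition/replica counts here are assumptions:

bin/kafka-topics.sh --create --zookeeper 192.168.100.101:2181 \
    --replication-factor 2 --partitions 2 --topic test.message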
