目录
[TOC]
1 利用Kafka提供的命令测试
1.1 创建主题
1 2
   |  [root@hopestation kafka_2.13-2.8.0]
 
  | 
 
| 参数 | 
描述 | 
| –zookeeper | 
指定了Kafka所连接的Zookeeper服务地址 | 
| –create | 
创建主题的命令 | 
| –topic | 
指定了所要创建主题的名称 | 
| –replication-factor | 
指定了副本银子 | 
| –partitions | 
指定了分区个数 | 
1.2 消费者
1 2 3 4 5
   | 
 
  [root@hopestation kafka_2.13-2.8.0]
 
 
  | 
 
1.3 生产者
1 2 3 4 5 6 7 8
   | 
 
 
  [root@hopestation kafka_2.13-2.8.0] >hello,kafka >NICE >
 
  | 
 
1.4 测试
然后就会发现消费者窗口变成了
1 2 3 4
   | [root@hopestation kafka_2.13-2.8.0] hello,kafka NICE
 
   | 
 
2 Java客户端测试
使用本地Java程序连接服务器kafka进行简单消费
创建普通的Java程序即可
 
POM
1 2 3 4
   | <dependency>     <groupId>org.apache.kafka</groupId>     <artifactId>kafka-clients</artifactId> </dependency>
   | 
 
2.1 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
   | package top.hopestation.simplekafka;
  import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;
  import java.util.Properties;
 
 
 
 
  public class ProducerFastStart {          public static final String brokerList = "yourHostIp:9092";     public static final String topic = "topic-demo";
      public static void main(String[] args) {         Properties properties = new Properties();         properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");         properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");         properties.put("bootstrap.servers",brokerList);                  KafkaProducer<String,String> producer = new KafkaProducer<>(properties);                  ProducerRecord<String,String> record = new ProducerRecord<>(topic,"hello,Kafka!");                  try{             producer.send(record);         }catch (Exception e){             e.printStackTrace();         }                  producer.close();     } }
 
   | 
 
2.2 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
   | package top.hopestation.simplekafka;
  import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
  import java.time.Duration; import java.util.Collections; import java.util.Properties;
 
 
 
 
  public class ConsumerFastStart {          public static final String brokerList = "yourHostIp:9092";     public static final String topic = "topic-demo";     public static final String groupId = "group.demo";
      public static void main(String[] args) {         Properties properties = new Properties();         properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");         properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");         properties.put("bootstrap.servers",brokerList);                  properties.put("group.id",groupId);                  KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);                  consumer.subscribe(Collections.singletonList(topic));                  while (true){             ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));             for(ConsumerRecord<String,String> record : records){                 System.err.println("record.value() = " + record.value());             }         }     } }
 
   | 
 
2.3 测试
先运行消费者,再运行生产者
1 2
   |  record.value() = hello,Kafka!
 
  | 
 
2.4 异常以及解决
若出现异常:Connection refused: no further information
注意服务器放行9092端口
1 2 3
   |  telnet IP Port 比如:telnet 192.168.1.1 9092
 
  | 
 
 
修改Kafka安装目录的config/server.properties 
1 2 3 4
   |  listeners = PLAINTEXT://0.0.0.0:9092
  advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092
 
  | 
 
参考文献