目录
[TOC]
1 利用Kafka提供的命令测试
1.1 创建主题
1 2
| [root@hopestation kafka_2.13-2.8.0]
|
参数 |
描述 |
–zookeeper |
指定了Kafka所连接的Zookeeper服务地址 |
–create |
创建主题的命令 |
–topic |
指定了所要创建主题的名称 |
–replication-factor |
指定了副本银子 |
–partitions |
指定了分区个数 |
1.2 消费者
1 2 3 4 5
|
[root@hopestation kafka_2.13-2.8.0]
|
1.3 生产者
1 2 3 4 5 6 7 8
|
[root@hopestation kafka_2.13-2.8.0] >hello,kafka >NICE >
|
1.4 测试
然后就会发现消费者窗口变成了
1 2 3 4
| [root@hopestation kafka_2.13-2.8.0] hello,kafka NICE
|
2 Java客户端测试
使用本地Java程序连接服务器kafka进行简单消费
创建普通的Java程序即可
POM
1 2 3 4
| <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
|
2.1 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package top.hopestation.simplekafka;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerFastStart { public static final String brokerList = "yourHostIp:9092"; public static final String topic = "topic-demo";
public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("bootstrap.servers",brokerList); KafkaProducer<String,String> producer = new KafkaProducer<>(properties); ProducerRecord<String,String> record = new ProducerRecord<>(topic,"hello,Kafka!"); try{ producer.send(record); }catch (Exception e){ e.printStackTrace(); } producer.close(); } }
|
2.2 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package top.hopestation.simplekafka;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration; import java.util.Collections; import java.util.Properties;
public class ConsumerFastStart { public static final String brokerList = "yourHostIp:9092"; public static final String topic = "topic-demo"; public static final String groupId = "group.demo";
public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("bootstrap.servers",brokerList); properties.put("group.id",groupId); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList(topic)); while (true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord<String,String> record : records){ System.err.println("record.value() = " + record.value()); } } } }
|
2.3 测试
先运行消费者,再运行生产者
1 2
| record.value() = hello,Kafka!
|
2.4 异常以及解决
若出现异常:Connection refused: no further information
注意服务器放行9092端口
1 2 3
| telnet IP Port 比如:telnet 192.168.1.1 9092
|
修改Kafka安装目录的config/server.properties
1 2 3 4
| listeners = PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092
|
参考文献