Kafka之简单的生产、消费 (三)
2022-09-22 22:45:09

目录
[TOC]

1 利用Kafka提供的命令测试

1.1 创建主题

1
2
#由于之前没有创建Kafka集群这里,先把副本和分区设置为1了
[root@hopestation kafka_2.13-2.8.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 1 --partitions 1
参数 描述
–zookeeper 指定了Kafka所连接的Zookeeper服务地址
–create 创建主题的命令
–topic 指定了所要创建主题的名称
–replication-factor 指定了副本银子
–partitions 指定了分区个数

1.2 消费者

1
2
3
4
5
#参数描述
#--bootstrap-server指定了连接的Kafka集群地址
#--topic指定了消费者订阅的主题
[root@hopestation kafka_2.13-2.8.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
#监听topic-demo主题,等待生产消息

1.3 生产者

1
2
3
4
5
6
7
8
#上一个消费者窗口不要动,打开一个新的Shell创建生产者
#参数描述
#--broker-list 指定了连接的Kafka集群地址
#--topic指定了消费者订阅的主题
[root@hopestation kafka_2.13-2.8.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo
>hello,kafka
>NICE
>

1.4 测试

然后就会发现消费者窗口变成了

1
2
3
4
[root@hopestation kafka_2.13-2.8.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo
hello,kafka
NICE
#说明消费成功

2 Java客户端测试

使用本地Java程序连接服务器kafka进行简单消费

  • 创建普通的Java程序即可

  • POM

    1
    2
    3
    4
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    </dependency>

2.1 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package top.hopestation.simplekafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
* 生产者客户端
* Created by HopeStation on 2021/4/27
*/
public class ProducerFastStart {
//brokerList = 你的Kafka服务器IP:9092
public static final String brokerList = "yourHostIp:9092";
public static final String topic = "topic-demo";

public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers",brokerList);
//配置生产者客户端参数并创建KafkaProducer实例
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
//构建所需要发送的消息
ProducerRecord<String,String> record = new ProducerRecord<>(topic,"hello,Kafka!");
//发送消息
try{
producer.send(record);
}catch (Exception e){
e.printStackTrace();
}
//关闭生产者客户端实例
producer.close();
}
}

2.2 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package top.hopestation.simplekafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
* 消费者客户端
* Created by HopeStation on 2021/4/27
*/
public class ConsumerFastStart {
//brokerList = 你的Kafka服务器IP:9092
public static final String brokerList = "yourHostIp:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";

public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers",brokerList);
//设置消费组的名称
properties.put("group.id",groupId);
//创建一个消费者客户端实例
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList(topic));
//循环消费消息
while (true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String,String> record : records){
System.err.println("record.value() = " + record.value());
}
}
}
}

2.3 测试

先运行消费者,再运行生产者

1
2
# ConsumerFastStart消费者的控制台输出
record.value() = hello,Kafka!

2.4 异常以及解决

若出现异常:Connection refused: no further information

  1. 注意服务器放行9092端口

    1
    2
    3
    #可以用WIndows命令行来测试 端口是否被放行
    telnet IP Port
    比如:telnet 192.168.1.1 9092
  2. 修改Kafka安装目录的config/server.properties

    1
    2
    3
    4
    #listeners==>固定0.0.0.0:9092
    listeners = PLAINTEXT://0.0.0.0:9092
    #advertised.listeners==>你的服务器IP:9092
    advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092

参考文献

Prev
2022-09-22 22:45:09
Next