——麻辣火锅版 🍲

1. 项目层级

像火锅店的分工:点单员、传菜员、食客清清楚楚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kafka/
├── pom.xml # 根 POM(BOM对齐)
├── provider/ # 点单:生产者
│ ├── pom.xml # 子模块 POM
│ └── src/main/java/org/example/provider/
│ ├── ProviderApplication.java
│ ├── conf/KafkaTopicsConfig.java
│ ├── controller/ProviderController.java
│ └── service/KafkaProducerService.java
│ └── src/main/resources/application.yaml
└── consumer/ # 上桌:消费者
├── pom.xml # 子模块 POM
└── src/main/java/org/example/consumer/
├── ConsumerApplication.java
└── listener/KafkaConsumerListener.java
└── src/main/resources/application.yaml

2. 根 POM(大厨的调料表)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<modules>
<module>provider</module>
<module>consumer</module>
</modules>


<properties>
<java.version>17</java.version>
<spring.boot.version>3.4.3</spring.boot.version>
<spring.cloud.version>2024.0.2</spring.cloud.version>
</properties>

<!-- 关键:用 BOM 管理依赖版本(不用 parent 也行) -->
<dependencyManagement>
<dependencies>
<!-- Spring Boot 依赖版本对齐(含 starter、lombok 等) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Spring Cloud 依赖版本对齐 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>


<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- hutool工具类 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-ai</artifactId>
<version>5.8.38</version>
</dependency>

</dependencies>

👉 全局版本对齐,避免“锅底和食材不搭”。


3. 子模块 POM

3.1 provider/pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<parent>
<groupId>org.example</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>org.example</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>provider</name>
<description>provider</description>
<packaging>jar</packaging>

<properties>
<java.version>17</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

3.2 consumer/pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<parent>
<groupId>org.example</groupId>
<artifactId>kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>org.example</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>consumer</description>
<packaging>jar</packaging>

<properties>
<java.version>17</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.58</version>
</dependency>
</dependencies>

4. 配置(菜单写清楚)

4.1 Provider(application.yaml - 生产者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
server:
port: 1003 # 本模块 HTTP 端口

app:
kafka:
topic: demo.topic.v1 # 要发送/创建的主题名
auto-create-topic: true # 开启后,会注册 NewTopic bean 从而在启动时创建主题(见 KafkaTopicsConfig)

spring:
kafka:
bootstrap-servers: yiqiquhuxi.cn:9092
# 数据网络IO 序列化方式
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 发送字符串
# 可靠性
acks: all
retries: 3

4.2 Consumer(application.yaml - 消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server:
port: 1004 # 本模块端口(通常只看日志)

app:
kafka:
topic: demo.topic.v1 # 要订阅的主题名(与 provider 保持一致)


spring:
kafka:
bootstrap-servers: yiqiquhuxi.cn:9092
consumer:
group-id: demo-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 接收字符串

# 说明:
# - JsonDeserializer 的默认类型键为 spring.json.value.default.type(源码常量 VALUE_DEFAULT_TYPE)。:contentReference[oaicite:7]{index=7}
# - @KafkaListener 支持使用 ${...} 占位符读取上述配置。:contentReference[oaicite:8]{index=8}

5. 核心代码(厨师上阵)

5.1 入口

1
2
3
4
5
6
7
8
@SpringBootApplication
public class ProviderApplication {
public static void main(String[] args) { SpringApplication.run(ProviderApplication.class, args); }
}
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
}

5.2 消息模型

1
public record MessagePayload(String id, String content, long ts) {}

5.3 Provider(点菜 + 上菜)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
public class KafkaProducerService {

@Autowired
private KafkaTemplate<String, String> kafka;

@Value("${app.kafka.topic}")
private String topic;


public void send(String content) {
MessagePayload payload = new MessagePayload(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
//序列化
String jsonStr = JSONUtil.toJsonStr(payload);
kafka.send(topic, jsonStr);
}
}

@RestController
@RequestMapping("/provider")
public class ProviderController {
@Autowired private KafkaProducerService producer;
@GetMapping("/done") public String done() { producer.send("done"); return "done"; }
}

5.4 Provider(创建 Topic)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class KafkaTopicsConfig {

@Value("${app.kafka.topic}")
private String topic;

// 只有当 app.kafka.auto-create-topic=true(或缺省并 matchIfMissing=true)才注册 NewTopic
@Bean
@ConditionalOnProperty(name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true)
public NewTopic demoTopic() {
// 分区/副本按你的集群实际调整;单 Broker 可用 (3,1)
return new NewTopic(topic, 3, (short) 1);
}
}

👉 有了它,就不用手动 kafka-topics.sh --create,Spring Boot 启动时就能帮你“先起锅烧水”。

5.5 Consumer(开吃)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

@Slf4j
@Component
public class KafkaConsumerListener {


@KafkaListener(
topics = "${app.kafka.topic}",
groupId = "${spring.kafka.consumer.group-id}"
)
public void onMessage(String msg) {
try {
// json反序列化成对象
MessagePayload payload = JSON.parseObject(msg, MessagePayload.class);
log.info("✅ received: id={}, content={}, ts={}",
payload.id(), payload.content(), payload.ts());
} catch (Exception e) {
log.error("❌ JSON解析失败,原始消息: {}", msg, e);
}
}
}

6. 运行流程

  1. 点火:Kafka Broker 先启动
  2. 开店:先跑 consumer,再跑 provider
  3. 点单GET http://localhost:1003/provider/done
  4. 吃菜:consumer 日志里出现 🍜 → 成功!

7. 常见坑

  • 锅点不着bootstrap-servers 不通,先查网络
  • 没菜:topic 不存在?开 auto-create-topic
  • 吃不到:改 group-id 或加 auto-offset-reset=earliest
  • 串味了:序列化不匹配 Producer 默认用 StringSerializer,Consumer 却用 JsonDeserializer,两边火候不对,消息就“夹生”了。
    • 建议:
      • 如果传字符串,就都用 StringSerializer / StringDeserializer
      • 如果传对象,就统一用 JsonSerializer / JsonDeserializer,并在 application.yaml 里显式声明 spring.json.value.default.type

8. 总结

Spring Boot + Kafka 的套路:
👉 Provider 点单,Kafka 传菜,Consumer 开吃。

就像火锅:食材扔进去,涮一涮,人人都能分到一口热乎的。