The Spicy Hotpot Edition 🍲
1. Project layout

The layout works like the division of labor in a hotpot restaurant: the order taker, the food runner, and the diners each have a clearly separate job.

```
kafka/
├── pom.xml                                  # root POM (BOM alignment)
├── provider/                                # taking orders: the producer
│   ├── pom.xml                              # module POM
│   ├── src/main/java/org/example/provider/
│   │   ├── ProviderApplication.java
│   │   ├── conf/KafkaTopicsConfig.java
│   │   ├── controller/ProviderController.java
│   │   └── service/KafkaProducerService.java
│   └── src/main/resources/application.yaml
└── consumer/                                # serving the table: the consumer
    ├── pom.xml                              # module POM
    ├── src/main/java/org/example/consumer/
    │   ├── ConsumerApplication.java
    │   └── listener/KafkaConsumerListener.java
    └── src/main/resources/application.yaml
```
2. Root POM (the head chef's seasoning chart)

```xml
<modules>
    <module>provider</module>
    <module>consumer</module>
</modules>

<properties>
    <java.version>17</java.version>
    <!-- java.version is only read by spring-boot-starter-parent; without that parent, set the compiler level directly -->
    <maven.compiler.release>17</maven.compiler.release>
    <spring.boot.version>3.4.3</spring.boot.version>
    <spring.cloud.version>2024.0.2</spring.cloud.version>
</properties>

<!-- Key point: manage dependency versions through BOMs (this works without a Spring Boot parent) -->
<dependencyManagement>
    <dependencies>
        <!-- Spring Boot version alignment (covers starters, Lombok, etc.) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- Spring Cloud version alignment -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Hutool utilities; JSONUtil (used by the provider) lives in hutool-json, which hutool-all also bundles -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-ai</artifactId>
        <version>5.8.38</version>
    </dependency>
</dependencies>
```
👉 Aligning versions globally keeps the "broth and ingredients" from clashing.
3. Module POMs

3.1 provider/pom.xml

```xml
<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <relativePath/> <!-- resolve the parent from the repository; remove this line to use the default ../pom.xml -->
</parent>

<groupId>org.example</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>provider</name>
<description>provider</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
3.2 consumer/pom.xml

```xml
<parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <relativePath/> <!-- resolve the parent from the repository; remove this line to use the default ../pom.xml -->
</parent>

<groupId>org.example</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>consumer</description>
<packaging>jar</packaging>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- fastjson, used by the listener to parse the JSON payload -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.58</version>
    </dependency>
</dependencies>
```
4. Configuration (write the menu clearly)

4.1 Provider (application.yaml, producer side)

```yaml
server:
  port: 1003 # HTTP port of this module

app:
  kafka:
    topic: demo.topic.v1 # topic to send to / create
    auto-create-topic: true # when enabled, a NewTopic bean is registered so the topic is created at startup (see KafkaTopicsConfig)

spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    # serialization used for network I/O
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # sends strings
      # reliability
      acks: all
      retries: 3
```
4.2 Consumer (application.yaml, consumer side)

```yaml
server:
  port: 1004 # port of this module (normally you only watch the logs)

app:
  kafka:
    topic: demo.topic.v1 # topic to subscribe to (keep it identical to the provider's)

spring:
  kafka:
    bootstrap-servers: yiqiquhuxi.cn:9092
    consumer:
      group-id: demo-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # receives strings

# Notes:
# - JsonDeserializer's default-type key is spring.json.value.default.type (source constant VALUE_DEFAULT_TYPE).
# - @KafkaListener can read the settings above through ${...} placeholders.
```
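For readers more used to the plain Kafka client than to Spring Boot, the two YAML files above correspond roughly to the client properties below. This is a minimal sketch using only `java.util.Properties`. The class name `KafkaClientProps` is made up for illustration, the keys are the standard Kafka client configuration names, and Spring Boot does this translation itself, so the project does not need any of this code.

```java
import java.util.Properties;

// Hypothetical helper (not part of the project): shows which raw Kafka client
// keys the spring.kafka.* settings in the two application.yaml files map to.
final class KafkaClientProps {

    static final String BOOTSTRAP = "yiqiquhuxi.cn:9092";

    // Mirrors spring.kafka.producer.* from the provider's application.yaml.
    static Properties producer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", BOOTSTRAP);
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("acks", "all");   // wait for all in-sync replicas to acknowledge
        p.setProperty("retries", "3");  // retry transient send failures up to 3 times
        return p;
    }

    // Mirrors spring.kafka.consumer.* from the consumer's application.yaml.
    static Properties consumer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", BOOTSTRAP);
        p.setProperty("group.id", "demo-group");
        p.setProperty("auto.offset.reset", "earliest");
        p.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        producer().forEach((k, v) -> System.out.println("producer " + k + " = " + v));
        consumer().forEach((k, v) -> System.out.println("consumer " + k + " = " + v));
    }
}
```

Seeing the two sides next to each other makes it easier to check that every serializer on the producer has a matching deserializer on the consumer.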
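5. Core code (chefs to their stations)

5.1 Entry points

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// provider module: ProviderApplication.java
@SpringBootApplication
public class ProviderApplication {
    public static void main(String[] args) { SpringApplication.run(ProviderApplication.class, args); }
}

// consumer module: ConsumerApplication.java (same imports)
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
}
```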
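5.2 Message model

Both modules reference this record and there is no shared module, so each module needs its own copy.

```java
public record MessagePayload(String id, String content, long ts) {}
```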
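5.3 Provider (take the order and send it out)

```java
import java.util.UUID;

import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafka;

    @Value("${app.kafka.topic}")
    private String topic;

    public void send(String content) {
        MessagePayload payload = new MessagePayload(
                UUID.randomUUID().toString(),
                content,
                System.currentTimeMillis()
        );
        // Serialize to a JSON string; the producer is configured with StringSerializer
        String jsonStr = JSONUtil.toJsonStr(payload);
        kafka.send(topic, jsonStr);
    }
}
```

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/provider")
public class ProviderController {

    @Autowired
    private KafkaProducerService producer;

    // GET /provider/done publishes one message whose content is "done"
    @GetMapping("/done")
    public String done() {
        producer.send("done");
        return "done";
    }
}
```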
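To make the wire format concrete: the producer sends each `MessagePayload` as a JSON string with the fields `id`, `content`, and `ts`. The sketch below builds that string by hand so its shape is visible without any library. `WireFormatSketch`, `toJson`, and `escape` are illustrative names, this is not how Hutool works internally, and the field order Hutool emits may differ.

```java
// Hand-rolled illustration of the JSON text that travels through Kafka.
// Hypothetical helper; the real project delegates this to JSONUtil.toJsonStr.
final class WireFormatSketch {

    // Same shape as the record in section 5.2, nested here to keep the sketch self-contained.
    record MessagePayload(String id, String content, long ts) {}

    // Escapes the characters that would otherwise break a JSON string literal.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.toString();
    }

    // Strings are quoted and escaped; the long timestamp is written as a bare number.
    static String toJson(MessagePayload p) {
        return "{\"id\":\"" + escape(p.id()) + "\","
                + "\"content\":\"" + escape(p.content()) + "\","
                + "\"ts\":" + p.ts() + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJson(new MessagePayload("42", "done", 1700000000000L)));
    }
}
```

Because the value is an ordinary string, `StringSerializer` and `StringDeserializer` are enough on the Kafka side; JSON handling stays entirely in application code.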
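5.4 Provider (topic creation)

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicsConfig {

    @Value("${app.kafka.topic}")
    private String topic;

    // The NewTopic bean is registered only when app.kafka.auto-create-topic=true,
    // or when the property is absent (matchIfMissing = true)
    @Bean
    @ConditionalOnProperty(name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true)
    public NewTopic demoTopic() {
        // Adjust partitions/replicas to your cluster; a single broker can use (3, 1)
        return new NewTopic(topic, 3, (short) 1);
    }
}
```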
👉 With this in place there is no need to run kafka-topics.sh --create by hand; Spring Boot "gets the pot boiling" for you at startup.
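The `matchIfMissing = true` part of the condition is easy to misread, so here is a stdlib-only sketch of the decision it makes. It imitates the rule and does not call Spring. `TopicConditionSketch` and `shouldRegisterTopic` are hypothetical names, and the value comparison is assumed to be case-insensitive.

```java
import java.util.Map;

// Imitation of the @ConditionalOnProperty rule used on demoTopic():
// name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true.
final class TopicConditionSketch {

    static final String KEY = "app.kafka.auto-create-topic";

    static boolean shouldRegisterTopic(Map<String, String> props) {
        String value = props.get(KEY);
        if (value == null) {
            return true; // property absent: matchIfMissing = true lets the bean through
        }
        return "true".equalsIgnoreCase(value); // property present: it must equal havingValue
    }

    public static void main(String[] args) {
        System.out.println("absent -> " + shouldRegisterTopic(Map.of()));
        System.out.println("true   -> " + shouldRegisterTopic(Map.of(KEY, "true")));
        System.out.println("false  -> " + shouldRegisterTopic(Map.of(KEY, "false")));
    }
}
```

The practical consequence: deleting the property from application.yaml does not switch topic creation off. Only an explicit value other than true does.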
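5.5 Consumer (dig in)

```java
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumerListener {

    @KafkaListener(
            topics = "${app.kafka.topic}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void onMessage(String msg) {
        try {
            // Deserialize the JSON string back into the record
            MessagePayload payload = JSON.parseObject(msg, MessagePayload.class);
            log.info("✅ received: id={}, content={}, ts={}",
                    payload.id(), payload.content(), payload.ts());
        } catch (Exception e) {
            // Log the raw message so a malformed payload can be inspected
            log.error("❌ JSON parsing failed, raw message: {}", msg, e);
        }
    }
}
```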
6. Run sequence
Light the fire: start the Kafka broker first
Open the restaurant: start consumer, then provider
Place an order: GET http://localhost:1003/provider/done
Eat: the consumer log shows "✅ received: ..." → success!
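The GET request in the steps above can be sent from a browser or curl. For completeness, here is a sketch that sends it from Java with the JDK's built-in `java.net.http` client. `PlaceOrder` and `buildRequest` are illustrative names, and the sketch assumes the provider is already running locally on port 1003.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical smoke test for the provider endpoint; not part of the project.
final class PlaceOrder {

    // Builds the request separately so it can be inspected without any network access.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create("http://localhost:1003/provider/done"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(), HttpResponse.BodyHandlers.ofString());
        // Print status and body; the controller returns the string "done" on success
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Each call publishes one message, so running it a few times while watching the consumer log is a quick end-to-end check.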
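7. Common pitfalls
The pot won't light: bootstrap-servers is unreachable; check the network first
No food: the topic doesn't exist? Turn on auto-create-topic
Can't get a bite: change group-id or set auto-offset-reset=earliest (the reset policy only applies while the group has no committed offset, which is why a fresh group-id helps)
Flavors got mixed: serializer mismatch. The producer defaults to StringSerializer but the consumer uses JsonDeserializer; the two sides are cooked differently, so messages come out "half-raw".
Recommendations:
If you send strings, use StringSerializer / StringDeserializer on both sides.
If you send objects, use JsonSerializer / JsonDeserializer on both sides and declare spring.json.value.default.type explicitly in application.yaml.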
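The group-id / auto-offset-reset advice in the list above is easier to follow with the rule written out. The sketch below is a simplified model of where a consumer group starts reading a partition. `OffsetResetSketch` and `startingOffset` are hypothetical names, and the model deliberately ignores details such as committed offsets that have fallen out of range.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer picks its starting offset for one partition.
final class OffsetResetSketch {

    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // the group has history: the reset policy is not consulted
        }
        return switch (autoOffsetReset) {
            case "earliest" -> logStart;  // new group: read everything still in the log
            case "latest" -> logEnd;      // new group: only messages produced from now on
            default -> throw new IllegalStateException(
                    "no committed offset and auto.offset.reset=" + autoOffsetReset);
        };
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..99 (the next offset to be written is 100)
        System.out.println("existing group: " + startingOffset(OptionalLong.of(100), "earliest", 0, 100));
        System.out.println("new group, earliest: " + startingOffset(OptionalLong.empty(), "earliest", 0, 100));
        System.out.println("new group, latest: " + startingOffset(OptionalLong.empty(), "latest", 0, 100));
    }
}
```

This is why setting earliest alone may appear to do nothing: a group such as demo-group that has already committed offsets keeps resuming from them, while a new group-id starts with no history and picks up the policy.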
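8. Summary

The Spring Boot + Kafka routine in one line: 👉 the Provider takes the order, Kafka runs the food, the Consumer eats.
It's just like hotpot: toss the ingredients in, give them a swish, and everyone gets a hot bite.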