初始化git 基础代码
This commit is contained in:
33
heima-leadnews-test/kafka-demo/.gitignore
vendored
Normal file
33
heima-leadnews-test/kafka-demo/.gitignore
vendored
Normal file
@@ -0,0 +1,33 @@
|
||||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
59
heima-leadnews-test/kafka-demo/pom.xml
Normal file
59
heima-leadnews-test/kafka-demo/pom.xml
Normal file
@@ -0,0 +1,59 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.bao</groupId>
|
||||
<artifactId>kafka-demo</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>kafka-demo</name>
|
||||
<description>kafka-demo</description>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<spring-boot.version>2.6.13</spring-boot.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.83</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-dependencies</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.bao.kafkademo;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class KafkaDemoApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KafkaDemoApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.bao.kafkademo.controller;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.bao.kafkademo.pojo.User;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @author Redmi G Pro
|
||||
*/
|
||||
|
||||
@RestController
|
||||
public class HelloController {
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate <String,String> kafkaTemplate;
|
||||
|
||||
@GetMapping("/hello")
|
||||
public String hello(){
|
||||
User user = new User();
|
||||
user.setName("xiaowang");
|
||||
user.setAge(18);
|
||||
|
||||
kafkaTemplate.send("user-topic", JSON.toJSONString(user));
|
||||
|
||||
return "ok";
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.bao.kafkademo.listener;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.bao.kafkademo.pojo.User;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@Component
|
||||
public class HelloListener {
|
||||
|
||||
@KafkaListener(topics = "user-topic")
|
||||
public void onMessage(String message){
|
||||
if(!StringUtils.isEmpty(message)){
|
||||
User user = JSON.parseObject(message, User.class);
|
||||
System.out.println(user);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.bao.kafkademo.pojo;
|
||||
|
||||
|
||||
public class User {
|
||||
|
||||
private String name;
|
||||
private int age;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public int getAge() {
|
||||
return age;
|
||||
}
|
||||
|
||||
public void setAge(int age) {
|
||||
this.age = age;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "User{" +
|
||||
"name='" + name + '\'' +
|
||||
", age=" + age +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
package com.bao.kafkademo.sample;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 消费者
|
||||
*/
|
||||
public class ConsumerQuickStart {
|
||||
public static void main(String[] args) {
|
||||
// kafka配置信息
|
||||
Properties prop = new Properties();
|
||||
|
||||
// Kafka服务器连接信息
|
||||
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
|
||||
|
||||
// key和value的反序列化器
|
||||
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
|
||||
// 设置消费者组
|
||||
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
|
||||
|
||||
// 手动提交偏移量
|
||||
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
|
||||
// 创建消费者对象
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
|
||||
// 订阅主题
|
||||
consumer.subscribe(Collections.singleton("topic-first"));
|
||||
|
||||
// 同步提交和异步提交偏移量
|
||||
try {
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
|
||||
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
|
||||
System.out.println(stringStringConsumerRecord.key());
|
||||
System.out.println(stringStringConsumerRecord.value());
|
||||
System.out.println(stringStringConsumerRecord.offset());
|
||||
System.out.println(stringStringConsumerRecord.partition());
|
||||
}
|
||||
// 异步提交偏移量
|
||||
consumer.commitAsync();
|
||||
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
System.out.println("记录错误的信息:" + e);
|
||||
}finally {
|
||||
// 同步提交
|
||||
consumer.commitSync();
|
||||
}
|
||||
|
||||
// 拉取消息
|
||||
/*while (true) {
|
||||
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
|
||||
for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
|
||||
System.out.println(stringStringConsumerRecord.key());
|
||||
System.out.println(stringStringConsumerRecord.value());
|
||||
System.out.println(stringStringConsumerRecord.offset());
|
||||
System.out.println(stringStringConsumerRecord.partition());
|
||||
|
||||
*//* try{
|
||||
// 同步提交偏移量
|
||||
consumer.commitSync();
|
||||
}catch (CommitFailedException e){
|
||||
System.out.println("记录提交失败的异常:"+e);
|
||||
}*//*
|
||||
}
|
||||
// 异步方式提交偏移量
|
||||
*//* consumer.commitAsync(new OffsetCommitCallback() {
|
||||
@Override
|
||||
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
|
||||
if (e != null){
|
||||
System.out.println("记录错误的提交的偏移量:" + map + ",异常信息为:" + e);
|
||||
}
|
||||
}
|
||||
});*//*
|
||||
|
||||
|
||||
}*/
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.bao.kafkademo.sample;
|
||||
|
||||
import org.apache.kafka.clients.producer.*;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* 生产者
|
||||
*/
|
||||
public class ProducerQuickStart {
|
||||
public static void main(String[] args) throws ExecutionException, InterruptedException {
|
||||
|
||||
// 1.kafka连接配置信息
|
||||
Properties prop = new Properties();
|
||||
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
|
||||
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
|
||||
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
|
||||
|
||||
// ack配置 消息确认
|
||||
prop.put(ProducerConfig.ACKS_CONFIG, "all");
|
||||
|
||||
// 重试次数
|
||||
prop.put(ProducerConfig.RETRIES_CONFIG,10);
|
||||
|
||||
// 数据压缩
|
||||
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
|
||||
|
||||
// 2.创建kafka生产者对象
|
||||
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);
|
||||
|
||||
// 3.发送消息
|
||||
ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<String, String>("topic-first", "key-001", "hello kafka");
|
||||
// 同步发送
|
||||
/*RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
|
||||
System.out.println(recordMetadata.offset());*/
|
||||
|
||||
// 异步发送
|
||||
producer.send(kvProducerRecord, new Callback() {
|
||||
@Override
|
||||
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
|
||||
if (e != null) {
|
||||
System.out.println("记录添加到日志中");
|
||||
}
|
||||
System.out.println(recordMetadata.offset());
|
||||
}
|
||||
});
|
||||
|
||||
// 4.关闭通道
|
||||
producer.close();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
server:
|
||||
port: 9991
|
||||
spring:
|
||||
application:
|
||||
name: kafka-demo
|
||||
kafka:
|
||||
bootstrap-servers: 192.168.200.130:9092
|
||||
producer:
|
||||
retries: 10
|
||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
value-serializer: org.apache.kafka.common.serialization.StringSerializer
|
||||
consumer:
|
||||
group-id: ${spring.application.name}-text
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.bao.kafkademo;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
@SpringBootTest
|
||||
class KafkaDemoApplicationTests {
|
||||
|
||||
@Test
|
||||
void contextLoads() {
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user