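There are many ways to connect microservices, and one of the most reliable is a message queue. Apache Kafka is one of the many message queue systems you can use freely, and it offers plenty of features that we can customize as needed.
In this blog post, I will show you how to connect two Spring Boot microservices using Apache Kafka.
Project Architecture
What This Project Does
We will follow the architecture shown in the diagram above. Here is what happens in this project:
- We create a project that fetches news from a public news API and stores the data in our Redis database.
- We search for news by publication date (yyyy-MM-dd).
- If the data already exists in our Redis database, it is read directly from there. Otherwise, a request is sent to the news API and the result is stored in Redis.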
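Microservice Composition
Our project will consist of 2 microservices:
- user-api
  - Provides a REST API that client applications can access.
  - Acts as the Kafka publisher.
  - Checks whether the data exists in Redis. If it does not, it publishes a message to a Kafka topic, which the worker consumes later.
- worker
  - Consumes the messages that the publisher (user-api) sends through Kafka.
  - Fetches the data from the news API and stores it in Redis.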
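The split between the two services can be walked through in plain Java before any infrastructure is involved. This is only a minimal sketch under stated assumptions: a HashMap stands in for Redis, an ArrayDeque stands in for the Kafka topic, and the class NewsFlowSketch with its newsApi function is hypothetical and not part of the original project.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Function;

/** In-memory walk-through of the user-api / worker split (no Kafka or Redis involved). */
public class NewsFlowSketch {
    // Stand-in for the Redis cache: publication date -> raw news payload.
    private final Map<String, String> cache = new HashMap<>();
    // Stand-in for the Kafka "news" topic: each message is a requested date.
    private final Queue<String> newsTopic = new ArrayDeque<>();
    // Stand-in for the call to the public news API (hypothetical for this sketch).
    private final Function<String, String> newsApi;

    public NewsFlowSketch(Function<String, String> newsApi) {
        this.newsApi = newsApi;
    }

    /** user-api side: return cached news, or publish the date and return empty. */
    public Optional<String> getNews(String date) {
        String cached = cache.get(date);
        if (cached == null) {
            newsTopic.add(date); // cache miss: hand the work to the worker
            return Optional.empty();
        }
        return Optional.of(cached);
    }

    /** worker side: consume every pending date, fetch the news, store it in the cache. */
    public int drainTopic() {
        int processed = 0;
        String date;
        while ((date = newsTopic.poll()) != null) {
            cache.put(date, newsApi.apply(date));
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        NewsFlowSketch flow = new NewsFlowSketch(date -> "{\"date\":\"" + date + "\"}");
        System.out.println(flow.getNews("2023-05-11").isPresent()); // false: miss, date published
        flow.drainTopic();                                          // worker fills the cache
        System.out.println(flow.getNews("2023-05-11").isPresent()); // true: served from the cache
    }
}
```

The first request for a date comes back empty and the second one succeeds once the worker has run, which is the same behavior the real services show.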
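Other Requirements
To complete this project, we need a few things installed on our system:
- Docker (to run the containers)
- Zookeeper (containerized)
- Apache Kafka (containerized)
- Redis (containerized)
Setting Up the Kafka Message Broker + Redis
To set up the Kafka message broker (plus Redis), we will take the simplest route and use Docker. To create the containers, you can use this .yml script.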
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "news:1:3"
      KAFKA_OFFSETS_RETENTION_MINUTES: 60
  redis:
    image: redis:latest
    restart: always
    ports:
      - 6379:6379
    environment:
      REDIS_PASSWORD: <change to your password>
    volumes:
      - redis-data:/data
volumes:
  redis-data:
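Important notes:
- We set up the Kafka topic with the KAFKA_CREATE_TOPICS environment variable. In this case, we name the topic news.
- KAFKA_OFFSETS_RETENTION_MINUTES sets how long Kafka retains committed consumer offsets. We set it to 60 minutes.
- The Redis password is optional. If you do not want a password on your Redis server, you can remove REDIS_PASSWORD.
Save this file as docker-compose.yml. Then open a terminal in the directory where you placed the docker-compose.yml file and run this command to install:
docker-compose up -d
Once the command finishes, you can check the installation with this command:
docker ps
If the installation succeeded, you will see 3 new containers in the container list: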
CONTAINER ID   IMAGE                              COMMAND                  CREATED        STATUS        PORTS                                         NAMES
e5e5245248ed   confluentinc/cp-kafka:latest       "/etc/confluent/dock…"   47 hours ago   Up 11 hours   9092/tcp, 0.0.0.0:29092->29092/tcp            monorepo-kafka-1
bf3a4892effa   confluentinc/cp-zookeeper:latest   "/etc/confluent/dock…"   47 hours ago   Up 11 hours   2888/tcp, 3888/tcp, 0.0.0.0:22181->2181/tcp   monorepo-zookeeper-1
3b0f8606ad00   redis:latest                       "docker-entrypoint.s…"   47 hours ago   Up 12 hours   0.0.0.0:6379->6379/tcp                        monorepo-redis-1
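Setting Up the News API
We will use MediaStack for this project. They offer a free tier that is enough for it. To set up your MediaStack account, you can follow their documentation page. You need an access key to use the API, which you can generate after registering successfully.
For this project, we will use only one of the APIs MediaStack provides: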
http://api.mediastack.com/v1/news?access_key=<your_access_key>&countries=us&date=2023-05-11&limit=25
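The request URL above is made of four query parameters: access_key, countries, date and limit. As a rough sketch of how such a URL could be assembled in plain Java, the hypothetical helper below (MediaStackUrlSketch and newsUri are names made up for illustration) encodes each value and joins them in the same order:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class MediaStackUrlSketch {
    private static final String BASE = "http://api.mediastack.com/v1/news";

    /** Builds the news URL from the four query parameters used in this tutorial. */
    public static URI newsUri(String accessKey, String countries, String date, int limit) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("access_key", accessKey);
        params.put("countries", countries);
        params.put("date", date);
        params.put("limit", String.valueOf(limit));

        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return URI.create(BASE + "?" + query);
    }

    public static void main(String[] args) {
        // The access key here is a placeholder, not a real key.
        System.out.println(newsUri("YOUR_ACCESS_KEY", "us", "2023-05-11", 25));
    }
}
```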
Here is a sample response:
{
  "pagination": {
    "limit": 25,
    "offset": 0,
    "count": 25,
    "total": 10000
  },
  "data": [
    {
      "author": "Central Oregon Daily News Sources",
      "title": "Bend-La Pine school bus driver job fairs begin Thursday",
      "description": "People interested in becoming a school bus driver ...",
      "url": "https://centraloregondaily.com/bend-la-pine-school-bus-driver-job-fair/",
      "source": "kohd",
      "image": null,
      "category": "general",
      "language": "en",
      "country": "us",
      "published_at": "2023-05-11T00:02:02+00:00"
    },
    {
      "author": "Norni Mahadi",
      "title": "Miri City Council embarks on ‘international links’ to lure tourists to the Resort City",
      "description": "MIRI (May 11): The Miri City Council (MCC) is ready to do more ...",
      "url": "https://www.theborneopost.com/2023/05/11/miri-city-council-embarks-on-international-links-to-lure-tourists-to-the-resort-city/",
      "source": "theborneopost",
      "image": "https://www.theborneopost.com/newsimages/2023/05/myy-100523-nm-MiriCityDay-p1.jpeg",
      "category": "general",
      "language": "en",
      "country": "us",
      "published_at": "2023-05-11T00:00:19+00:00"
    },
    ...
  ]
}
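We will save the data to the Redis database as-is, because our focus is only on demonstrating how Kafka works with microservices.
Publisher Service (user-api)
Project Details
You can generate the project template with Spring Initializr. A few notes for following along:
- Use Gradle - Groovy as the build tool (you can also use Maven if you are more comfortable with it; just adjust accordingly).
- Use Java as the programming language.
Dependency list:
- Lombok
- Kafka
- Reactive Redis
- WebFlux
Project Structure
You can use this project structure as a reference to make this tutorial easier to follow.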
├───.gradle
│   └─── ...
├───gradle
│   └───wrapper
└───src
    ├───main
    │   ├───java
    │   │   └───com
    │   │       └───justahmed99
    │   │           └───userapi
    │   │               ├───UserApiApplication.java
    │   │               │
    │   │               ├───config
    │   │               │   ├───KafkaProducerConfig.java
    │   │               │   ├───KafkaTopicConfig.java
    │   │               │   └───RedisConfig.java
    │   │               │
    │   │               ├───controller
    │   │               │   └───MessageController.java
    │   │               │
    │   │               ├───dto
    │   │               │   ├───request
    │   │               │   │   └───MessageRequest.java
    │   │               │   └───response
    │   │               │       └───DataResponse.java
    │   │               │
    │   │               ├───repository
    │   │               │   ├───NewsRepository.java
    │   │               │   └───impl
    │   │               │       └───NewsRepositoryImpl.java
    │   │               │
    │   │               └───service
    │   │                   ├───MessageService.java
    │   │                   └───impl
    │   │                       └───MessageServiceImpl.java
    │   │
    │   └───resources
    │       └───application.yml
    │
    └───test
        └─── ...
Code
UserApiApplication.java
package com.justahmed99.userapi;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class UserApiApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApiApplication.class, args);
    }
}
config/KafkaProducerConfig.java
package com.justahmed99.userapi.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Producer settings: broker address plus String serializers for key and value.
    @Bean
    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    // The template the service uses to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
config/KafkaTopicConfig.java
package com.justahmed99.userapi.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    // Declares the "news" topic used by both services.
    @Bean
    public NewTopic newsTopic() {
        return TopicBuilder.name("news").build();
    }
}
config/RedisConfig.java
package com.justahmed99.userapi.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Objects;

@Configuration
public class RedisConfig {
    @Value("${spring.data.redis.host}")
    private String host;

    @Value("${spring.data.redis.port}")
    private String port;

    // Defaults to an empty string so the property can be left out when Redis has no password.
    @Value("${spring.data.redis.password:}")
    private String password;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(Objects.requireNonNull(host));
        redisStandaloneConfiguration.setPort(Integer.parseInt(Objects.requireNonNull(port)));
        redisStandaloneConfiguration.setPassword(RedisPassword.of(Objects.requireNonNull(password)));
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    // String keys, JSON-serialized values.
    @Bean
    public ReactiveRedisOperations<String, Object> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory
    ) {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =
                RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<String, Object> context = builder.value(serializer).hashValue(serializer)
                .hashKey(serializer).build();
        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, context);
    }
}
dto/request/MessageRequest.java
package com.justahmed99.userapi.dto.request;

import lombok.Data;

@Data
public class MessageRequest {
    private String message;
}
dto/response/DataResponse.java
package com.justahmed99.userapi.dto.response;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class DataResponse<T> {
    private String message;
    private Boolean status;

    // Omitted from the JSON output when there is no data.
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private T data;
}
repository/NewsRepository.java
package com.justahmed99.userapi.repository;

import reactor.core.publisher.Mono;

public interface NewsRepository {
    Mono<Object> getNews(String date);
}
repository/impl/NewsRepositoryImpl.java
package com.justahmed99.userapi.repository.impl;

import com.justahmed99.userapi.repository.NewsRepository;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

@Repository
public class NewsRepositoryImpl implements NewsRepository {
    private final ReactiveRedisOperations<String, Object> redisOperations;

    public NewsRepositoryImpl(
            ReactiveRedisOperations<String, Object> redisOperations
    ) {
        this.redisOperations = redisOperations;
    }

    // The date is the Redis key; the result is empty when nothing is cached for it.
    @Override
    public Mono<Object> getNews(String date) {
        return redisOperations.opsForValue().get(date);
    }
}
service/MessageService.java
package com.justahmed99.userapi.service;

import reactor.core.publisher.Mono;

public interface MessageService {
    Mono<Void> publishToMessageBroker(String date);

    Mono<Object> getNews(String date);
}
service/impl/MessageServiceImpl.java
package com.justahmed99.userapi.service.impl;

import com.justahmed99.userapi.repository.NewsRepository;
import com.justahmed99.userapi.service.MessageService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class MessageServiceImpl implements MessageService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final NewsRepository newsRepository;

    public MessageServiceImpl(
            KafkaTemplate<String, String> kafkaTemplate,
            NewsRepository newsRepository
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.newsRepository = newsRepository;
    }

    // Sends the requested date to the "news" topic with no key.
    @Override
    public Mono<Void> publishToMessageBroker(String date) {
        ProducerRecord<String, String> record = new ProducerRecord<>("news", null, date);
        return Mono.fromFuture(kafkaTemplate.send(record))
                .then();
    }

    // Returns the cached news; on a cache miss, publishes the date and completes empty.
    @Override
    public Mono<Object> getNews(String date) {
        return newsRepository.getNews(date)
                .switchIfEmpty(Mono.defer(() -> publishToMessageBroker(date)));
    }
}
controller/MessageController.java
package com.justahmed99.userapi.controller;

import com.justahmed99.userapi.dto.request.MessageRequest;
import com.justahmed99.userapi.dto.response.DataResponse;
import com.justahmed99.userapi.service.MessageService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(value = "/message")
public class MessageController {
    private final MessageService service;

    public MessageController(
            MessageService service
    ) {
        this.service = service;
    }

    // GET /message?date=yyyy-MM-dd: 200 with the cached news, or 404 while the worker fetches it.
    @GetMapping("")
    public Mono<ResponseEntity<DataResponse<Object>>> getNews(@RequestParam(name = "date") String date) {
        return service.getNews(date)
                .flatMap(data -> Mono.just(
                        ResponseEntity.status(HttpStatus.OK)
                                .body(new DataResponse<>
                                        ("data found", true, data))))
                .switchIfEmpty(Mono.defer(() -> Mono.just(
                        ResponseEntity.status(HttpStatus.NOT_FOUND).
                                body(new DataResponse<>
                                        ("data not found, sending request to broker", false, null)))));
    }
}
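The controller passes the date request parameter straight through to the service, so a malformed value would also end up on the Kafka topic. A minimal sketch of a check that could run first, assuming you want to accept only valid yyyy-MM-dd calendar dates; DateParamSketch and normalize are hypothetical names and not part of the original project:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

public class DateParamSketch {
    /** Returns the date in yyyy-MM-dd form if the input is a valid calendar date, else empty. */
    public static Optional<String> normalize(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            // ISO_LOCAL_DATE is the strict yyyy-MM-dd format.
            LocalDate parsed = LocalDate.parse(raw.trim(), DateTimeFormatter.ISO_LOCAL_DATE);
            return Optional.of(parsed.toString());
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(normalize("2023-05-11").isPresent()); // true
        System.out.println(normalize("11-05-2023").isPresent()); // false
    }
}
```

In the controller, an empty result could map to a 400 response before the service is called.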
application.yml
server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: localhost:29092
  data:
    redis:
      host: localhost
      port: 6379
      password: <your_redis_password>
Replace <your_redis_password> with the password of your Redis server. If you did not set one, simply remove it.
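Consumer Service (worker)
Project Details
To initialize this project, follow the same steps as for the user-api project.
Project Structure
You can use this project structure as a reference to make this tutorial easier to follow.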
├───.gradle
│   └─── ...
├───gradle
│   └───wrapper
└───src
    ├───main
    │   ├───java
    │   │   └───com
    │   │       └───justahmed99
    │   │           └───worker
    │   │               ├───WorkerApplication.java
    │   │               │
    │   │               ├───config
    │   │               │   ├───KafkaConsumerConfig.java
    │   │               │   ├───KafkaTopicConfig.java
    │   │               │   ├───RedisConfig.java
    │   │               │   └───WebClientConfig.java
    │   │               │
    │   │               ├───listener
    │   │               │   └───KafkaListeners.java
    │   │               │
    │   │               ├───repository
    │   │               │   ├───NewsRepository.java
    │   │               │   └───impl
    │   │               │       └───NewsRepositoryImpl.java
    │   │               │
    │   │               └───service
    │   │                   ├───WebClientService.java
    │   │                   └───impl
    │   │                       └───WebClientServiceImpl.java
    │   │
    │   └───resources
    │       └───application.yml
    │
    └───test
        └─── ...
Code
config/KafkaConsumerConfig.java
package com.justahmed99.worker.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Consumer settings: broker address, consumer group and String deserializers.
    public Map<String, Object> consumerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "message-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    // Container factory picked up by @KafkaListener methods.
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
config/KafkaTopicConfig.java
package com.justahmed99.worker.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic newsTopic() {
        return TopicBuilder.name("news").build();
    }
}
config/RedisConfig.java
package com.justahmed99.worker.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Objects;

@Configuration
public class RedisConfig {
    @Value("${spring.data.redis.host}")
    private String host;

    @Value("${spring.data.redis.port}")
    private String port;

    // Defaults to an empty string so the property can be left out when Redis has no password.
    @Value("${spring.data.redis.password:}")
    private String password;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(Objects.requireNonNull(host));
        redisStandaloneConfiguration.setPort(Integer.parseInt(Objects.requireNonNull(port)));
        redisStandaloneConfiguration.setPassword(RedisPassword.of(Objects.requireNonNull(password)));
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    @Bean
    public ReactiveRedisOperations<String, Object> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory
    ) {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =
                RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<String, Object> context = builder.value(serializer).hashValue(serializer)
                .hashKey(serializer).build();
        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, context);
    }
}
config/WebClientConfig.java
package com.justahmed99.worker.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class WebClientConfig {
    @Value("${mediastack.uri}")
    private String apiUri;

    // WebClient whose base URL is the MediaStack news endpoint.
    @Bean
    public WebClient webClient() {
        return WebClient.create(apiUri);
    }
}
listener/KafkaListeners.java
package com.justahmed99.worker.listener;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.justahmed99.worker.repository.NewsRepository;
import com.justahmed99.worker.service.WebClientService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
public class KafkaListeners {
    private final WebClientService webClientService;
    private final NewsRepository newsRepository;

    public KafkaListeners(
            WebClientService webClientService,
            NewsRepository newsRepository
    ) {
        this.webClientService = webClientService;
        this.newsRepository = newsRepository;
    }

    // Each message on the "news" topic is a date; fetch that day's news and cache it.
    @KafkaListener(topics = "news", groupId = "message-group")
    void listener(String date) {
        System.out.printf("Listener received: %s%n", date);
        Mono<ResponseEntity<String>> responseEntity = webClientService.sendRequest(date);
        responseEntity.subscribe(response -> {
            // Compare the numeric status code rather than casting to HttpStatus.
            int status = response.getStatusCode().value();
            if (status == HttpStatus.OK.value()) {
                try {
                    newsRepository.saveNews(date, response.getBody())
                            .subscribe(isSaved -> {
                                if (isSaved)
                                    System.out.println("Data successfully saved in cache");
                                else
                                    System.out.println("Data save failed");
                            });
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
            }
            System.out.println(status);
        });
    }
}
repository/NewsRepository.java
package com.justahmed99.worker.repository;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

@Repository
public interface NewsRepository {
    Mono<Boolean> saveNews(String date, Object newsObject) throws JsonProcessingException;
}
repository/impl/NewsRepositoryImpl.java
package com.justahmed99.worker.repository.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.justahmed99.worker.repository.NewsRepository;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

import java.time.Duration;

@Repository
public class NewsRepositoryImpl implements NewsRepository {
    private final ReactiveRedisOperations<String, Object> redisOperations;

    public NewsRepositoryImpl(
            ReactiveRedisOperations<String, Object> redisOperations
    ) {
        this.redisOperations = redisOperations;
    }

    // Stores the parsed JSON under the date key; the entry expires after one hour.
    @Override
    public Mono<Boolean> saveNews(String date, Object newsObject) throws JsonProcessingException {
        Duration ttl = Duration.ofHours(1);
        ObjectMapper objectMapper = new ObjectMapper();
        return redisOperations.opsForValue().set(date, objectMapper.readTree(newsObject.toString()), ttl);
    }
}
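Because saveNews stores each entry with a one-hour TTL, a cached date disappears after an hour and the next request for it goes through Kafka again. The sketch below imitates that expiry in plain Java so the effect can be seen without Redis. It is only an illustration: TtlCacheSketch, its advance helper and the injected Clock are hypothetical, and Redis implements expiry differently.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TtlCacheSketch {
    /** A cached value together with the instant at which it stops being valid. */
    private static final class Entry {
        final String value;
        final Instant expiresAt;

        Entry(String value, Instant expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private Clock clock;

    public TtlCacheSketch(Clock clock) {
        this.clock = clock;
    }

    /** Stores the value and records when it expires. */
    public void set(String key, String value, Duration ttl) {
        entries.put(key, new Entry(value, clock.instant().plus(ttl)));
    }

    /** Returns the value only while its TTL has not elapsed; expired entries are dropped. */
    public Optional<String> get(String key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return Optional.empty();
        }
        if (!clock.instant().isBefore(entry.expiresAt)) {
            entries.remove(key);
            return Optional.empty();
        }
        return Optional.of(entry.value);
    }

    /** Moves the sketch's clock forward, to simulate time passing. */
    public void advance(Duration duration) {
        clock = Clock.offset(clock, duration);
    }

    public static void main(String[] args) {
        TtlCacheSketch cache = new TtlCacheSketch(
                Clock.fixed(Instant.parse("2023-05-11T00:00:00Z"), ZoneOffset.UTC));
        cache.set("2023-05-11", "{...}", Duration.ofHours(1));
        System.out.println(cache.get("2023-05-11").isPresent()); // true
        cache.advance(Duration.ofMinutes(61));
        System.out.println(cache.get("2023-05-11").isPresent()); // false
    }
}
```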
service/WebClientService.java
package com.justahmed99.worker.service;

import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;

public interface WebClientService {
    Mono<ResponseEntity<String>> sendRequest(String date);
}
service/impl/WebClientServiceImpl.java
package com.justahmed99.worker.service.impl;

import com.justahmed99.worker.service.WebClientService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Service
public class WebClientServiceImpl implements WebClientService {
    @Value("${mediastack.api-key}")
    private String apiKey;

    @Value("${mediastack.countries}")
    private String countries;

    @Value("${mediastack.limit}")
    private String limit;

    private final WebClient webClient;

    public WebClientServiceImpl(WebClient webClient) {
        this.webClient = webClient;
    }

    // Calls the MediaStack news endpoint for the given date and returns the raw JSON body.
    @Override
    public Mono<ResponseEntity<String>> sendRequest(String date) {
        return webClient.get()
                .uri(uriBuilder -> uriBuilder
                        .queryParam("access_key", apiKey)
                        .queryParam("countries", countries)
                        .queryParam("limit", limit)
                        .queryParam("date", date)
                        .build())
                .retrieve()
                .toEntity(String.class);
    }
}
application.yml
server:
  port: 9090
spring:
  kafka:
    bootstrap-servers: localhost:29092
  data:
    redis:
      host: localhost
      port: 6379
      password: <your_redis_password>
mediastack:
  uri: http://api.mediastack.com/v1/news
  api-key: <your-api-key>
  countries: us
  limit: 25
Notes:
- Replace <your_redis_password> with the password of your Redis server. If you did not set one, simply remove it.
- Replace <your-api-key> with the API key you got from MediaStack.
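Endpoint
Before making a request:
- Make sure the Zookeeper container is running
- Make sure the Kafka container is running
- Make sure the Redis container is running
- Run the user-api service
- Run the worker service
To make a request, you can hit:
http://localhost:8080/message?date=<yyyy-MM-dd>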
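MessageController maps GET /message with a date request parameter, and it answers 404 until the worker has stored the data in Redis. One way to try this from code is a small client that asks a few times. This is a rough sketch using the JDK's built-in HttpClient; the class name UserApiClientSketch, the three attempts and the two-second pause are arbitrary choices for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UserApiClientSketch {
    /** Builds the GET request for the user-api endpoint (port 8080 as in application.yml). */
    public static HttpRequest newsRequest(String date) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/message?date=" + date))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // The first call is expected to miss the cache; later calls succeed once the worker is done.
        for (int attempt = 1; attempt <= 3; attempt++) {
            HttpResponse<String> response =
                    client.send(newsRequest("2023-05-11"), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
            if (response.statusCode() == 200) {
                break;
            }
            Thread.sleep(2000); // give the worker time to fetch and store the news
        }
    }
}
```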
Conclusion
You can now connect two Spring Boot microservices with Kafka. I hope this helps.
Here is the GitHub repo.