Connecting Spring Boot Microservices with Kafka
#java #springboot #kafka

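There are many ways to connect microservices, but one of the most reliable is a message queue. Apache Kafka is one of the many message queue systems you can use for free, and it offers many features that we can customize to our needs.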

In this blog post, I will show you how to connect two Spring Boot microservices using Apache Kafka.

Project architecture

[Image: project architecture diagram]

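What this project does

We will follow the architecture shown in the diagram above. This is what happens in the project:

  • We create a project that fetches news from a public news API and then stores the data in our Redis database.

  • We search for news by publication date (yyyy-MM-dd).

  • If the data already exists in our Redis database, it is served directly from the database. Otherwise, a request is sent to the news API and the result is stored in the Redis database.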
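The lookup flow above can be sketched in plain Java. This is only an in-memory illustration, not the project's code: a Map stands in for Redis, a Queue stands in for the Kafka topic, and the names NewsLookupSketch, lookup and workerStep are hypothetical.

```java
import java.time.LocalDate;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

// Illustrative in-memory model of the flow; not the project's actual code.
class NewsLookupSketch {
    // Stand-in for the Redis database: publication date -> raw news JSON.
    private final Map<String, String> cache = new HashMap<>();
    // Stand-in for the Kafka topic: dates waiting to be fetched by the worker.
    private final Queue<String> pendingDates = new ArrayDeque<>();

    // Returns the cached news for the date, or queues a fetch request and returns empty.
    Optional<String> lookup(String date) {
        LocalDate.parse(date); // throws DateTimeParseException unless the date is yyyy-MM-dd
        String cached = cache.get(date);
        if (cached != null) {
            return Optional.of(cached);
        }
        pendingDates.add(date);
        return Optional.empty();
    }

    // Simulates the worker: takes one queued date and stores the fetched payload for it.
    void workerStep(String fetchedPayload) {
        String date = pendingDates.poll();
        if (date != null) {
            cache.put(date, fetchedPayload);
        }
    }

    public static void main(String[] args) {
        NewsLookupSketch sketch = new NewsLookupSketch();
        System.out.println(sketch.lookup("2023-05-11").isPresent()); // first call: cache miss
        sketch.workerStep("{\"data\": []}");
        System.out.println(sketch.lookup("2023-05-11").isPresent()); // second call: cache hit
    }
}
```

In the real project the two halves run in separate services, so the caller does not wait for the worker; it simply retries the same date later.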

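Microservices

Our project consists of 2 microservices:

  • user-api

    • Provides a REST API accessible to client applications.
    • Acts as the publisher to the Kafka topic.
    • Checks whether the data exists. If not, it sends a message to the Kafka topic, which is later consumed by the worker.
  • worker

    • Consumes the messages that the publisher (user-api) sends to the Kafka topic.
    • Fetches data from the news API and stores it in Redis.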

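Other requirements

To complete this project, we need a few things installed on our system:

  • Docker (to run the containers)

  • Zookeeper (containerized)

  • Apache Kafka (containerized)

  • Redis (containerized)

Setting up the Kafka message broker + Redis

To set up the Kafka message broker (plus Redis), we will take the easiest route and use Docker. To install the containers into Docker, you can use this .yml script.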

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "news:1:3"
      KAFKA_OFFSETS_RETENTION_MINUTES: 60

  redis:
    image: redis:latest
    restart: always
    ports:
      - 6379:6379
    environment:
      REDIS_PASSWORD: <change to your password>
    volumes: 
      - redis-data:/data
volumes:
  redis-data:

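Important notes:

  • We set up the Kafka topic with the KAFKA_CREATE_TOPICS environment variable. In this case, we name the topic news.

  • KAFKA_OFFSETS_RETENTION_MINUTES controls how long Kafka retains committed consumer offsets. We set it to 60 minutes.

  • The Redis password is optional. If you don't want a password on your Redis server, you can remove REDIS_PASSWORD.

Save this file as docker-compose.yml. Then open a terminal in the directory that contains docker-compose.yml and run this command to install: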

docker-compose up -d

Once the command has finished, you can check the installation with this command:

docker ps

If the installation succeeded, you will see 3 new containers in the container list:

CONTAINER ID   IMAGE                              COMMAND                  CREATED         STATUS        PORTS                                         NAMES
e5e5245248ed   confluentinc/cp-kafka:latest       "/etc/confluent/dock…"   47 hours ago    Up 11 hours   9092/tcp, 0.0.0.0:29092->29092/tcp            monorepo-kafka-1
bf3a4892effa   confluentinc/cp-zookeeper:latest   "/etc/confluent/dock…"   47 hours ago    Up 11 hours   2888/tcp, 3888/tcp, 0.0.0.0:22181->2181/tcp   monorepo-zookeeper-1
3b0f8606ad00   redis:latest                       "docker-entrypoint.s…"   47 hours ago    Up 12 hours   0.0.0.0:6379->6379/tcp                        monorepo-redis-1

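Setting up the news API

We will use MediaStack for this project. They offer a free tier that is sufficient here. To set up your MediaStack account, follow their documentation page. You need an access key to use the API; you can generate one after registering successfully.

For this project, we will use only one of the APIs MediaStack provides: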

http://api.mediastack.com/v1/news?access_key=<your_access_key>&countries=us&date=2023-05-11&limit=25
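As a small illustration of how that URL is put together, the sketch below assembles the same four query parameters with the JDK only. The class and method names (MediaStackUriSketch, newsUri) are hypothetical, and YOUR_ACCESS_KEY is a placeholder rather than a real key.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative helper; the class and method names are hypothetical.
class MediaStackUriSketch {
    private static final String BASE_URL = "http://api.mediastack.com/v1/news";

    // Builds the request URI from the same four query parameters used in the URL above.
    static URI newsUri(String accessKey, String countries, String date, int limit) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("access_key", accessKey);
        params.put("countries", countries);
        params.put("date", date);
        params.put("limit", Integer.toString(limit));

        // URL-encode each value and join the pairs with '&'.
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return URI.create(BASE_URL + "?" + query);
    }

    public static void main(String[] args) {
        // "YOUR_ACCESS_KEY" is a placeholder, not a real key.
        System.out.println(newsUri("YOUR_ACCESS_KEY", "us", "2023-05-11", 25));
    }
}
```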

Here is sample data:

{
    "pagination": {
        "limit": 25,
        "offset": 0,
        "count": 25,
        "total": 10000
    },
    "data": [
        {
            "author": "Central Oregon Daily News Sources",
            "title": "Bend-La Pine school bus driver job fairs begin Thursday",
            "description": "People interested in becoming a school bus driver ...",
            "url": "https://centraloregondaily.com/bend-la-pine-school-bus-driver-job-fair/",
            "source": "kohd",
            "image": null,
            "category": "general",
            "language": "en",
            "country": "us",
            "published_at": "2023-05-11T00:02:02+00:00"
        },
        {
            "author": "Norni Mahadi",
            "title": "Miri City Council embarks on ‘international links’ to lure tourists to the Resort City",
            "description": "MIRI (May 11): The Miri City Council (MCC) is ready to do more ...",
            "url": "https://www.theborneopost.com/2023/05/11/miri-city-council-embarks-on-international-links-to-lure-tourists-to-the-resort-city/",
            "source": "theborneopost",
            "image": "https://www.theborneopost.com/newsimages/2023/05/myy-100523-nm-MiriCityDay-p1.jpeg",
            "category": "general",
            "language": "en",
            "country": "us",
            "published_at": "2023-05-11T00:00:19+00:00"
        },
    ...
    ]
}

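We will save the data directly to the Redis database, because our focus is only on demonstrating how Kafka works with microservices.

Publisher service (user-api)

Project details

You can generate the project template with Spring Initializr. A few notes to follow for this project:

  • Use Gradle - Groovy as the project manager (you can also use Maven if you are more comfortable with it; just adjust accordingly).

  • Use Java as the programming language.

Dependencies:

  • Lombok

  • Kafka

  • Redis Reactive

  • WebFlux

Project structure

You can use this project structure as a reference to make this tutorial easier to follow.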


├───.gradle
│   └─── ...
├───gradle
│   └───wrapper
└───src
    ├───main
    │   ├───java
    │   │   └───com
    │   │       └───justahmed99
    │   │           └───userapi
    │   │               ├───UserApiApplication.java
    │   │               │
    │   │               ├───config
    │   │               │   ├───KafkaProducerConfig.java
    │   │               │   ├───KafkaTopicConfig.java
    │   │               │   └───RedisConfig.java
    │   │               │
    │   │               ├───controller
    │   │               │   └───MessageController.java
    │   │               │
    │   │               ├───dto
    │   │               │   ├───request
    │   │               │   │   └───MessageRequest.java
    │   │               │   └───response
    │   │               │       └───DataResponse.java
    │   │               │
    │   │               ├───repository
    │   │               │   ├───NewsRepository.java
    │   │               │   └───impl
    │   │               │       └───NewsRepositoryImpl.java
    │   │               │
    │   │               └───service
    │   │                   │   MessageService.java
    │   │                   │
    │   │                   └───impl
    │   │                           MessageServiceImpl.java
    │   │
    │   └───resources
    │       └───application.yml
    │
    └───test
        └─── ...

Code

UserApiApplication.java

package com.justahmed99.userapi;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class UserApiApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApiApplication.class, args);
    }
}

config/KafkaProducerConfig.java

package com.justahmed99.userapi.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

config/KafkaTopicConfig.java

package com.justahmed99.userapi.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic newsTopic() {
        return TopicBuilder.name("news").build();
    }
}

config/RedisConfig.java

package com.justahmed99.userapi.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Objects;

@Configuration
public class RedisConfig {
    @Value("${spring.data.redis.host}")
    private String host;

    @Value("${spring.data.redis.port}")
    private String port;

    // Empty default so the application still starts when no password is configured.
    @Value("${spring.data.redis.password:}")
    private String password;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(Objects.requireNonNull(host));
        redisStandaloneConfiguration.setPort(Integer.parseInt(Objects.requireNonNull(port)));
        redisStandaloneConfiguration.setPassword(RedisPassword.of(Objects.requireNonNull(password)));
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    @Bean
    public ReactiveRedisOperations<String, Object> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory
    ) {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);

        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =
                RedisSerializationContext.newSerializationContext(new StringRedisSerializer());

        RedisSerializationContext<String, Object> context = builder.value(serializer).hashValue(serializer)
                .hashKey(serializer).build();

        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, context);
    }
}

dto/request/MessageRequest.java

package com.justahmed99.userapi.dto.request;

import lombok.Data;

@Data
public class MessageRequest {
    private String message;
}

dto/response/DataResponse.java

package com.justahmed99.userapi.dto.response;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class DataResponse<T> {
    private String message;
    private Boolean status;
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private T data;
}

repository/NewsRepository.java

package com.justahmed99.userapi.repository;

import reactor.core.publisher.Mono;

public interface NewsRepository {
    Mono<Object> getNews(String date);
}

repository/impl/NewsRepositoryImpl.java

package com.justahmed99.userapi.repository.impl;

import com.justahmed99.userapi.repository.NewsRepository;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

@Repository
public class NewsRepositoryImpl implements NewsRepository {

    private final ReactiveRedisOperations<String, Object> redisOperations;

    public NewsRepositoryImpl(
            ReactiveRedisOperations<String, Object> redisOperations
    ) {
        this.redisOperations = redisOperations;
    }

    @Override
    public Mono<Object> getNews(String date) {
        return redisOperations.opsForValue().get(date);
    }
}

service/MessageService.java

package com.justahmed99.userapi.service;

import reactor.core.publisher.Mono;

public interface MessageService {
    Mono<Void> publishToMessageBroker(String date);
    Mono<Object> getNews(String date);
}

service/impl/MessageServiceImpl.java

package com.justahmed99.userapi.service.impl;

import com.justahmed99.userapi.repository.NewsRepository;
import com.justahmed99.userapi.service.MessageService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class MessageServiceImpl implements MessageService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final NewsRepository newsRepository;

    public MessageServiceImpl(
            KafkaTemplate<String, String> kafkaTemplate,
            NewsRepository newsRepository
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.newsRepository = newsRepository;
    }

    @Override
    public Mono<Void> publishToMessageBroker(String date) {
        ProducerRecord<String, String> record = new ProducerRecord<>("news", null, date);
        return Mono.fromFuture(kafkaTemplate.send(record))
                .then();
    }

    @Override
    public Mono<Object> getNews(String date) {
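        // Return the cached value if present; otherwise publish the date to Kafka.
        // publishToMessageBroker completes empty, so a cache miss yields an empty Mono.
        return newsRepository.getNews(date)
                .switchIfEmpty(Mono.defer(() -> publishToMessageBroker(date)));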
    }
}

controller/MessageController.java

package com.justahmed99.userapi.controller;

import com.justahmed99.userapi.dto.request.MessageRequest;
import com.justahmed99.userapi.dto.response.DataResponse;
import com.justahmed99.userapi.service.MessageService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(value = "/message")
public class MessageController {
    private final MessageService service;

    public MessageController(
            MessageService service
    ) {
        this.service = service;
    }

    @GetMapping("")
    public Mono<ResponseEntity<DataResponse<Object>>> getNews(@RequestParam(name = "date") String date) {
        return service.getNews(date)
                .flatMap(data -> Mono.just(
                        ResponseEntity.status(HttpStatus.OK)
                                .body(new DataResponse<>
                                        ("data found", true, data))))
                .switchIfEmpty(Mono.defer(() -> Mono.just(
                        ResponseEntity.status(HttpStatus.NOT_FOUND).
                                body(new DataResponse<>
                                        ("data not found, sending request to broker", false, null)))));
    }
}

application.yml

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: localhost:29092
  data:
    redis:
      host: localhost
      port: 6379
      password: <your_redis_password>

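Replace <your_redis_password> with the password of your Redis server. If your server has no password, just remove it.

Consumer service (worker)

Project details

To initialize this project, do the same as for the user-api project.

Project structure

You can use this project structure as a reference to make this tutorial easier to follow.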

├───.gradle
│   └─── ...
├───gradle
│   └───wrapper
└───src
    ├───main
    │   ├───java
    │   │   └───com
    │   │       └───justahmed99
    │   │           └───worker
    │   │               ├───WorkerApplication.java
    │   │               │
    │   │               ├───config
    │   │               │   ├───KafkaConsumerConfig.java
    │   │               │   ├───KafkaTopicConfig.java
    │   │               │   ├───RedisConfig.java
    │   │               │   └───WebClientConfig.java
    │   │               │
    │   │               ├───listener
    │   │               │   └───KafkaListeners.java
    │   │               │
    │   │               ├───repository
    │   │               │   ├───NewsRepository.java
    │   │               │   └───impl
    │   │               │       └───NewsRepositoryImpl.java
    │   │               │
    │   │               └───service
    │   │                   │   WebClientService.java
    │   │                   │
    │   │                   └───impl
    │   │                           WebClientServiceImpl.java
    │   │
    │   └───resources
    │       └───application.yml
    │
    └───test
        └─── ...

Code

config/KafkaConsumerConfig.java

package com.justahmed99.worker.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> consumerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "message-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

config/KafkaTopicConfig.java

package com.justahmed99.worker.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic newsTopic() {
        return TopicBuilder.name("news").build();
    }
}

config/RedisConfig.java

package com.justahmed99.worker.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Objects;

@Configuration
public class RedisConfig {
    @Value("${spring.data.redis.host}")
    private String host;

    @Value("${spring.data.redis.port}")
    private String port;

    // Empty default so the application still starts when no password is configured.
    @Value("${spring.data.redis.password:}")
    private String password;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(Objects.requireNonNull(host));
        redisStandaloneConfiguration.setPort(Integer.parseInt(Objects.requireNonNull(port)));
        redisStandaloneConfiguration.setPassword(RedisPassword.of(Objects.requireNonNull(password)));
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    @Bean
    public ReactiveRedisOperations<String, Object> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory
    ) {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);

        RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder =
                RedisSerializationContext.newSerializationContext(new StringRedisSerializer());

        RedisSerializationContext<String, Object> context = builder.value(serializer).hashValue(serializer)
                .hashKey(serializer).build();

        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, context);
    }
}

config/WebClientConfig.java

package com.justahmed99.worker.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class WebClientConfig {

    @Value("${mediastack.uri}")
    private String apiUri;

    @Bean
    public WebClient webClient() {
        return WebClient.create(apiUri);
    }
}

listener/KafkaListeners.java

package com.justahmed99.worker.listener;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.justahmed99.worker.repository.NewsRepository;
import com.justahmed99.worker.service.WebClientService;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;


@Component
public class KafkaListeners {

    private final WebClientService webClientService;
    private final NewsRepository newsRepository;

    public KafkaListeners (
            WebClientService webClientService,
            NewsRepository newsRepository
    ) {
        this.webClientService = webClientService;
        this.newsRepository = newsRepository;
    }

    @KafkaListener(topics = "news", groupId = "message-group")
    void listener(String date) {
        System.out.printf("Listener received: %s%n", date);

        Mono<ResponseEntity<String>> responseEntity = webClientService.sendRequest(date);

        responseEntity.subscribe(response -> {
            HttpStatus status = (HttpStatus) response.getStatusCode();
            if (status.equals(HttpStatus.OK)) {
                try {
                    newsRepository.saveNews(date, response.getBody())
                            .subscribe(isSaved -> {
                                if (isSaved)
                                    System.out.println("Data successfully saved in cache");
                                else
                                    System.out.println("Data save failed");
                            });
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
            }
            System.out.println(status.value());
        });
    }
}

repository/NewsRepository.java

package com.justahmed99.worker.repository;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

@Repository
public interface NewsRepository {
    Mono<Boolean> saveNews(String date, Object newsObject) throws JsonProcessingException;
}

repository/impl/NewsRepositoryImpl.java

package com.justahmed99.worker.repository.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.justahmed99.worker.repository.NewsRepository;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

import java.time.Duration;

@Repository
public class NewsRepositoryImpl implements NewsRepository {
    private final ReactiveRedisOperations<String, Object> redisOperations;

    public NewsRepositoryImpl(
            ReactiveRedisOperations<String, Object> redisOperations
    ) {
        this.redisOperations = redisOperations;
    }

    @Override
    public Mono<Boolean> saveNews(String date, Object newsObject) throws JsonProcessingException {
        Duration ttl = Duration.ofHours(1);
        ObjectMapper objectMapper = new ObjectMapper();
        return redisOperations.opsForValue().set(date, objectMapper.readTree(newsObject.toString()), ttl);
    }
}

service/WebClientService.java

package com.justahmed99.worker.service;

import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;

public interface WebClientService {
    Mono<ResponseEntity<String>> sendRequest(String date);
}

service/impl/WebClientServiceImpl.java

package com.justahmed99.worker.service.impl;

import com.justahmed99.worker.service.WebClientService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Service
public class WebClientServiceImpl implements WebClientService {

    @Value("${mediastack.api-key}")
    private String apiKey;

    @Value("${mediastack.countries}")
    private String countries;

    @Value("${mediastack.limit}")
    private String limit;

    private final WebClient webClient;

    public WebClientServiceImpl(WebClient webClient) {
        this.webClient = webClient;
    }

    @Override
    public Mono<ResponseEntity<String>> sendRequest(String date) {
        return webClient.get()
                .uri(uriBuilder -> uriBuilder
                        .queryParam("access_key", apiKey)
                        .queryParam("countries", countries)
                        .queryParam("limit", limit)
                        .queryParam("date", date)
                        .build())
                .retrieve()
                .toEntity(String.class);
    }
}

application.yml

server:
  port: 9090

spring:
  kafka:
    bootstrap-servers: localhost:29092
  data:
    redis:
      host: localhost
      port: 6379
      password: <your_redis_password>

mediastack:
  uri: http://api.mediastack.com/v1/news
  api-key: <your-api-key>
  countries: us
  limit: 25

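Notes:

  • Replace the spring.data.redis.password value with the password of your Redis server. If your server has no password, just remove it.

  • Replace <your-api-key> with the API key you obtained from MediaStack.

Endpoint

Before making a request:

  • Make sure the Zookeeper container is running

  • Make sure the Kafka container is running

  • Make sure the Redis container is running

  • Run the user-api service

  • Run the worker service

To make a request, you can hit: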

http://localhost:8080/message?date=<yyyy-MM-dd>
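If you prefer to call the endpoint from code, here is a minimal sketch using the JDK's built-in HTTP client. It assumes user-api is running locally on port 8080 and targets the GET /message mapping with the date request parameter defined in MessageController; the class and method names (UserApiClientSketch, newsRequest) are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative client; the class and method names are hypothetical.
class UserApiClientSketch {
    // Builds a GET request for the /message endpoint exposed by MessageController.
    static HttpRequest newsRequest(String date) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/message?date=" + date))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Requires user-api to be running locally on port 8080.
        HttpResponse<String> response =
                client.send(newsRequest("2023-05-11"), HttpResponse.BodyHandlers.ofString());
        // Print the status code followed by the JSON body returned by the controller.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Going by the controller code, the first request for a date that is not cached yet should answer 404 with "data not found, sending request to broker". Repeating the request shortly afterwards should answer 200, provided the worker was able to fetch and store the news.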

Conclusion

You can now connect two Spring Boot microservices with Kafka. I hope this helps.

Here is the GitHub repo