Binding & Binders
关于 Spring Cloud 流,有三个重要的概念 -
-
外部消息传递系统- 这是外部管理的组件,负责存储应用程序生成的事件/消息,这些事件/消息可由其订阅者/消费者读取。请注意,这不在 app/Spring 中管理。几个例子是 Apache Kafka、RabbitMQ
-
Binders − 这是提供与消息系统集成的组件,例如,由消息系统的 IP 地址、身份验证等组成。
-
Bindings - 该组件使用绑定器向消息传递系统生成消息或使用来自特定主题/队列的消息。
以上所有属性都定义在 application properties file.
例如
让我们使用我们之前使用过的 Restaurant 案例。因此,让我们假设每当向客户服务添加新服务时,我们都希望将客户信息通知给附近的餐馆关于他/她的信息。
为此,让我们首先更新我们的客户服务以包含和使用 Kafka。请注意,我们将使用客户服务作为数据的生产者。也就是说,每当我们通过 API 添加 Customer 时,它也会被添加到 Kafka。
spring:
application:
name: customer-service
cloud:
stream:
source: customerBinding-out-0
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-out-0:
destination: customer
producer:
partitionCount: 3
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: http://localhost:8900/eureka
注意点 −
-
我们已经定义了一个带有本地 Kafka 实例地址的绑定器。
-
我们还定义了绑定“customerBinding-out-0”,它使用“customer”主题来输出消息。
-
我们还在 stream.source 这样我们就可以在我们的代码中强制使用它。
完成后,让我们现在通过添加一个新方法“addCustomer”来更新我们的控制器,该方法负责为 POST 请求提供服务。然后,从post 请求,我们将数据发送到 Kafka Broker。
package com.jc2182;
import java.util.HashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
class RestaurantCustomerInstancesController {
@Autowired
private StreamBridge streamBridge;
static HashMap<Long, Customer> mockCustomerData = new HashMap();
static{
mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
}
@RequestMapping("/customer/{id}")
public Customer getCustomerInfo(@PathVariable("id") Long id) {
System.out.println("Querying customer for id with: " + id);
return mockCustomerData.get(id);
}
@RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
public Customer addCustomer(@PathVariable("id") Long id) {
// add default name
Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
streamBridge.send("customerBinding-out-0", defaultCustomer);
return defaultCustomer;
}
}
注意点
现在让我们更新我们的餐厅服务以包含和订阅“客户”主题。请注意,我们将使用 Restaurant Service 作为数据的消费者。也就是说,每当我们通过 API 添加客户时,餐厅服务就会通过 Kafka 知道它。
首先,让我们更新 application properties 文件。
spring:
application:
name: restaurant-service
cloud:
function:
definition: customerBinding
stream:
kafka:
binder:
brokers: localhost:9092
replicationFactor: 1
bindings:
customerBinding-in-0:
destination: customer
server:
port: ${app_port}
eureka:
client:
serviceURL:
defaultZone: http://localhost:8900/eureka
完成后,让我们现在通过添加一个新方法“customerBinding”来更新我们的控制器,该方法负责获取请求并提供一个函数,该函数将打印请求及其元数据详细信息。
package com.jc2182;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
class RestaurantController {
@Autowired
CustomerService customerService;
@Autowired
private StreamBridge streamBridge;
static HashMap<Long, Restaurant> mockRestaurantData = new HashMap();
static{
mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY"));
}
@RequestMapping("/restaurant/customer/{id}")
public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
System.out.println("Got request for customer with id: " + id);
String customerCity = customerService.getCustomerById(id).getCity();
return mockRestaurantData.entrySet().stream().filter(
entry -> entry.getValue().getCity().equals(customerCity))
.map(entry -> entry.getValue())
.collect(Collectors.toList());
}
@RequestMapping("/restaurant/cust/{id}")
public void getRestaurantForCust(@PathVariable("id") Long id) {
streamBridge.send("ordersBinding-out-0", id);
}
@Bean
public Consumer<Message<Customer>> customerBinding() {
return msg -> {
System.out.println(msg);
};
}
}
Points to note −
现在,让我们像往常一样执行上面的代码,启动 Eureka Server。请注意,这不是硬性要求,为了完整起见,在此列出。
然后,让我们使用以下命令编译并开始更新客户服务 -
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
1.0.jar --spring.config.location=classpath:application-kafka.yml
然后,让我们使用以下命令编译并开始更新餐厅服务 -
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
1.0.jar --spring.config.location=classpath:application-kafka.yml
我们准备好了,现在让我们通过点击 API 来测试我们的代码片段 -
curl -X POST http://localhost:8083/customer/1
这是我们将为此 API 获得的输出 -
{
"id": 1,
"name": "Dwayne",
"city": "NY"
}
现在,让我们检查餐厅服务的日志 -
GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
headers={kafka_offset=1,...
因此,实际上,您会看到使用 Kafka Broker,Restaurant Service 收到了有关新添加的 Customer 的通知。