Skip to content

Commit e55fda1

Browse files
committed
Added ProductPriceChangedEventHandler and IntgLogEntry Fixes
1 parent 0881f24 commit e55fda1

File tree

10 files changed

+174
-135
lines changed

10 files changed

+174
-135
lines changed

src/BuildingBlocks/EventBus/eventbus-rabbitmq/src/main/java/com/eshoponcontainers/EventBusRabbitMQ.java

Lines changed: 74 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -30,85 +30,79 @@
3030
@Component
3131
public class EventBusRabbitMQ implements EventBus {
3232

33-
private static final String EXCHANGE_TYPE_DIRECT = "direct";
34-
private static final String BROKER_NAME = "eshop_event_bus_exchange";
35-
private static final String QUEUE_NAME = "Catalog";
36-
37-
private final Sender sender;
38-
private final InMemoryEventBusSubscriptionManager subscriptionManager;
39-
private final Receiver receiver;
40-
41-
@Override
42-
public Mono<Void> publish(IntegrationEvent event) {
43-
44-
ObjectMapper mapper = new ObjectMapper();
45-
String eventName = event.getClass().getSimpleName();
46-
BasicProperties basicProperties = new BasicProperties().builder().deliveryMode(2).build();
47-
48-
try {
49-
50-
byte[] body = mapper.writeValueAsBytes(event);
51-
OutboundMessage message = new OutboundMessage(BROKER_NAME, eventName, basicProperties, body);
52-
53-
return sender.declare(ExchangeSpecification.exchange(BROKER_NAME).type(EXCHANGE_TYPE_DIRECT))
54-
.then(sender.send(Mono.just(message)));
55-
56-
} catch (JsonProcessingException e) {
57-
return Mono.error(e);
58-
}
59-
}
60-
61-
@Override
62-
public <T extends IntegrationEvent, TH extends IntegrationEventHandler<T>> Mono<Void> subscribe(Class<T> eventType,
63-
Class<TH> eventHandler) {
64-
65-
subscriptionManager.addSubscription(eventType, eventHandler);
66-
log.info("Entered subscribe method");
67-
68-
Mono<Void> mono = Mono.fromRunnable(() -> {
69-
receiver.consumeAutoAck(QUEUE_NAME).subscribe(message -> {
70-
71-
String routingKey = message.getEnvelope().getRoutingKey();
72-
// Class eventType = subscriptionManager.getEventTypeByName(routingKey);
73-
byte[] messageBody = message.getBody();
74-
try {
75-
Object event = new ObjectMapper().readValue(messageBody, eventType);
76-
log.info(event.toString());
77-
List<SubscriptionInfo> subscriptions = subscriptionManager.getHandlersForEvent(eventType);
78-
if (subscriptions != null && !subscriptions.isEmpty()) {
79-
for (SubscriptionInfo subscription : subscriptions) {
80-
Class handler = subscription.getHandler();
81-
Method methodInfo = null;
82-
try {
83-
methodInfo = handler.getDeclaredMethod("handle", IntegrationEvent.class);
84-
} catch (NoSuchMethodException e) {
85-
// TODO Auto-generated catch block
86-
e.printStackTrace();
87-
} catch (SecurityException e) {
88-
// TODO Auto-generated catch block
89-
e.printStackTrace();
90-
}
91-
92-
if(methodInfo != null)
93-
try {
94-
methodInfo.invoke(handler.newInstance(), event);
95-
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException | InstantiationException e) {
96-
// TODO Auto-generated catch block
97-
e.printStackTrace();
98-
}
99-
}
100-
101-
}
102-
} catch (IOException e) {
103-
e.printStackTrace();
104-
}
105-
});
106-
});
107-
return sender.declare(ExchangeSpecification.exchange(BROKER_NAME))
108-
.then(sender.declareQueue(QueueSpecification.queue(QUEUE_NAME)))
109-
.then(sender.bind(BindingSpecification.binding(BROKER_NAME, eventType.getSimpleName(), QUEUE_NAME)))
110-
.then(mono);
111-
112-
}
33+
private static final String EXCHANGE_TYPE_DIRECT = "direct";
34+
private static final String BROKER_NAME = "eshop_event_bus_exchange";
35+
private static final String QUEUE_NAME = "Catalog";
36+
37+
private final Sender sender;
38+
private final InMemoryEventBusSubscriptionManager subscriptionManager;
39+
private final Receiver receiver;
40+
41+
@Override
42+
public Mono<Void> publish(IntegrationEvent event) {
43+
ObjectMapper mapper = new ObjectMapper();
44+
String eventName = event.getClass().getSimpleName();
45+
System.out.println("Event Name : " + eventName);
46+
BasicProperties basicProperties = new BasicProperties().builder().deliveryMode(2).build();
47+
try {
48+
byte[] body = mapper.writeValueAsBytes(event);
49+
OutboundMessage message = new OutboundMessage(BROKER_NAME, eventName, basicProperties, body);
50+
return sender.declare(ExchangeSpecification.exchange(BROKER_NAME).type(EXCHANGE_TYPE_DIRECT))
51+
.then(sender.send(Mono.just(message)));
52+
} catch (JsonProcessingException e) {
53+
return Mono.error(e);
54+
}
55+
}
56+
57+
@Override
58+
public <T extends IntegrationEvent, TH extends IntegrationEventHandler<T>> Mono<Void> subscribe(Class<T> eventType,
59+
Class<TH> eventHandler) {
60+
subscriptionManager.addSubscription(eventType, eventHandler);
61+
log.info("Entered subscribe method");
62+
Mono<Void> mono = Mono.fromRunnable(() -> {
63+
receiver.consumeAutoAck(QUEUE_NAME).subscribe(message -> {
64+
String routingKey = message.getEnvelope().getRoutingKey();
65+
// Class eventType = subscriptionManager.getEventTypeByName(routingKey);
66+
byte[] messageBody = message.getBody();
67+
try {
68+
Object event = new ObjectMapper().readValue(messageBody, eventType);
69+
log.info(event.toString());
70+
List<SubscriptionInfo> subscriptions = subscriptionManager.getHandlersForEvent(eventType);
71+
if (subscriptions != null && !subscriptions.isEmpty()) {
72+
for (SubscriptionInfo subscription : subscriptions) {
73+
Class handler = subscription.getHandler();
74+
Method methodInfo = null;
75+
try {
76+
methodInfo = handler.getDeclaredMethod("handle", IntegrationEvent.class);
77+
} catch (NoSuchMethodException e) {
78+
// TODO Auto-generated catch block
79+
e.printStackTrace();
80+
} catch (SecurityException e) {
81+
// TODO Auto-generated catch block
82+
e.printStackTrace();
83+
}
84+
85+
if (methodInfo != null)
86+
try {
87+
methodInfo.invoke(handler.newInstance(), event);
88+
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
89+
| InstantiationException e) {
90+
// TODO Auto-generated catch block
91+
e.printStackTrace();
92+
}
93+
}
94+
95+
}
96+
} catch (IOException e) {
97+
e.printStackTrace();
98+
}
99+
});
100+
});
101+
return sender.declare(ExchangeSpecification.exchange(BROKER_NAME))
102+
.then(sender.declareQueue(QueueSpecification.queue(QUEUE_NAME)))
103+
.then(sender.bind(BindingSpecification.binding(BROKER_NAME, eventType.getSimpleName(), QUEUE_NAME)))
104+
.then(mono);
105+
106+
}
113107

114108
}

src/BuildingBlocks/EventBus/eventbus/src/main/java/com/eshoponcontainers/EventStateEnum.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,7 @@ public enum EventStateEnum {
1010
private EventStateEnum(int value) {
1111
this.value = value;
1212
}
13+
public int getValue() {
14+
return value;
15+
}
1316
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.eshoponcontainers.entities;
22

3-
import java.util.Date;
3+
import java.time.LocalDateTime;
4+
import java.time.ZoneId;
45
import java.util.UUID;
56

67
import org.springframework.data.annotation.Id;
8+
import org.springframework.data.annotation.PersistenceConstructor;
79
import org.springframework.data.annotation.Transient;
10+
import org.springframework.data.domain.Persistable;
811
import org.springframework.data.relational.core.mapping.Column;
912
import org.springframework.data.relational.core.mapping.Table;
1013

@@ -19,23 +22,23 @@
1922

2023
@Table(name = "IntegrationEventLog")
2124
@Data
22-
public class IntegrationEventLogEntry {
25+
public class IntegrationEventLogEntry implements Persistable<UUID> {
2326

24-
@Id
27+
@Id
2528
@Column(value = "EventId")
2629
private UUID eventId;
2730

2831
@Column(value = "Content")
2932
private String content;
3033

3134
@Column(value = "CreationTime")
32-
private Date creationTime;
35+
private LocalDateTime creationTime;
3336

3437
@Column(value = "EventTypeName")
3538
private String eventTypeName;
3639

3740
@Column(value = "State")
38-
private EventStateEnum state;
41+
private Integer state;
3942

4043
@Column(value = "TimesSent")
4144
private Integer timesSent;
@@ -46,31 +49,35 @@ public class IntegrationEventLogEntry {
4649
@Transient
4750
private IntegrationEvent event;
4851

49-
//Added this as a constructor to avoid the error during save of entity. Seems hibernate needs a default constructor, but does't care if its public:
50-
private IntegrationEventLogEntry() {
51-
52-
}
52+
// //Added this as a constructor to avoid the error during save of entity. Seems hibernate needs a default constructor, but does't care if its public:
53+
// private IntegrationEventLogEntry() {
54+
//
55+
// }
5356

5457
public IntegrationEventLogEntry(IntegrationEvent event, UUID transId) {
5558
eventId = event.getId();
56-
creationTime = event.getCreationDate();
59+
creationTime = LocalDateTime.ofInstant(event.getCreationDate().toInstant(), ZoneId.systemDefault());
5760
eventTypeName = event.getClass().getName();
5861

5962
try {
6063
content = new ObjectMapper().writeValueAsString(event);
6164
} catch (JsonProcessingException e) {
6265
e.printStackTrace();
6366
}
64-
state = EventStateEnum.NOT_PUBLISHED;
67+
state = EventStateEnum.NOT_PUBLISHED.getValue();
6568
timesSent = 0;
6669
transactionId = transId.toString();
70+
newIntegrationEventLogEntry = true;
71+
this.event = null;
72+
6773
}
74+
6875

69-
public Date getCreationTime() {
76+
public LocalDateTime getCreationTime() {
7077
return creationTime;
7178
}
7279

73-
public EventStateEnum getState() {
80+
public Integer getState() {
7481
return state;
7582
}
7683

@@ -82,12 +89,40 @@ public void setTimesSent(Integer timesSent) {
8289
this.timesSent = timesSent;
8390
}
8491

85-
public void setState(EventStateEnum state) {
92+
public void setState(Integer state) {
8693
this.state = state;
8794
}
8895

8996
public void deserializeEventContent() {
9097
this.event = new ObjectMapper().convertValue(this.content, IntegrationEvent.class);
9198
}
9299

100+
@Transient
101+
private boolean newIntegrationEventLogEntry;
102+
103+
@Override
104+
@Transient
105+
public boolean isNew() {
106+
return newIntegrationEventLogEntry || eventId == null;
107+
}
108+
109+
@Override
110+
public UUID getId() {
111+
return eventId;
112+
}
113+
114+
@PersistenceConstructor
115+
public IntegrationEventLogEntry(UUID eventId, String content, LocalDateTime creationTime, String eventTypeName,
116+
Integer state, Integer timesSent, String transactionId) {
117+
this.eventId = eventId;
118+
this.content = content;
119+
this.creationTime = creationTime;
120+
this.eventTypeName = eventTypeName;
121+
this.state = state;
122+
this.timesSent = timesSent;
123+
this.transactionId = transactionId;
124+
this.event = null;
125+
this.newIntegrationEventLogEntry = false;
126+
}
127+
93128
}

src/BuildingBlocks/EventBus/eventbus/src/main/java/com/eshoponcontainers/eventbus/events/IntegrationEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.eshoponcontainers.eventbus.events;
22

3+
import java.time.LocalDateTime;
34
import java.util.Date;
45
import java.util.UUID;
56

src/BuildingBlocks/EventBus/eventbus/src/main/java/com/eshoponcontainers/services/impl/DefaultIntegrationEventLogService.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,35 +35,35 @@ public Flux<IntegrationEventLogEntry> retrieveEventLogsPendingToPublish(UUID tra
3535
@Override
3636
public Mono<Void> saveEvent(IntegrationEvent event, UUID transactionId) {
3737
IntegrationEventLogEntry logEntry = new IntegrationEventLogEntry(event, transactionId);
38-
eventLogRepository.save(logEntry);
39-
return Mono.empty();
38+
System.out.println("logEntry : " + logEntry);
39+
return Mono.just(eventLogRepository.save(logEntry).subscribe()).then();
4040
}
4141

4242
@Override
4343
public Mono<Void> markEventAsPublished(UUID eventId) {
44-
45-
return updateEventStatus(eventId, EventStateEnum.PUBLISHED);
44+
return updateEventStatus(eventId, EventStateEnum.PUBLISHED.getValue());
4645
}
4746

4847
@Override
4948
public Mono<Void> markEventAsInProgress(UUID eventId) {
50-
return updateEventStatus(eventId, EventStateEnum.IN_PROGRESS);
49+
return updateEventStatus(eventId, EventStateEnum.IN_PROGRESS.getValue());
5150
}
5251

5352
@Override
5453
public Mono<Void> markEventAsFailed(UUID eventId) {
55-
return updateEventStatus(eventId, EventStateEnum.PUBLISH_FAILED);
56-
54+
return updateEventStatus(eventId, EventStateEnum.PUBLISH_FAILED.getValue());
5755
}
5856

59-
private Mono<Void> updateEventStatus(UUID eventId, EventStateEnum status) {
60-
eventLogRepository.findById(eventId).map(event -> {
57+
private Mono<Void> updateEventStatus(UUID eventId, Integer status) {
58+
return Mono.just(eventLogRepository.findById(eventId)
59+
.map(event -> {
6160
event.setState(status);
62-
if (event.getState() == EventStateEnum.IN_PROGRESS) {
61+
System.out.println("eventId : " + eventId + " Status : " + status);
62+
if (event.getState() == EventStateEnum.IN_PROGRESS.getValue()) {
6363
event.setTimesSent(event.getTimesSent() + 1);
6464
}
65-
return eventLogRepository.save(event);
66-
});
67-
return Mono.empty();
65+
event.setNewIntegrationEventLogEntry(false);
66+
return eventLogRepository.save(event).subscribe();
67+
}).subscribe()).then();
6868
}
6969
}

0 commit comments

Comments
 (0)