Skip to content

Commit 46abbfb

Browse files
committed
Fixed Kafka bug & added a controller to send message using API
1 parent a7cc2c3 commit 46abbfb

File tree

7 files changed

+36
-10
lines changed

7 files changed

+36
-10
lines changed

Diff for: polling-publisher/polling-service/src/main/java/com/iluwatar/polling/DataRepository.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,21 @@
2727

2828
import java.util.HashMap;
2929
import java.util.Map;
30-
import org.springframework.stereotype.Repository;
3130
import javax.annotation.PostConstruct;
31+
import org.springframework.stereotype.Repository;
3232

3333

3434
/**
3535
* Data repository to keep or store data.
3636
*/
3737
@Repository
38-
public final class DataRepository {
38+
public class DataRepository {
3939

4040
private final Map<Integer, String> dataStorage = new HashMap<>();
4141

42+
/**
43+
* init after map creation ... to put dummy data.
44+
*/
4245
@PostConstruct
4346
public void init() {
4447
// Injecting dummy data at startup

Diff for: polling-publisher/polling-service/src/main/java/com/iluwatar/polling/DataSourceService.java

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public class DataSourceService {
3838

3939
private final DataRepository repository;
4040

41+
/**
42+
* Constructor & Scheduler to push random data.
43+
*/
4144
public DataSourceService(DataRepository repository) {
4245
this.repository = repository;
4346

Diff for: polling-publisher/polling-service/src/main/java/com/iluwatar/polling/KafkaProducer.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@
2727

2828
import org.springframework.beans.factory.annotation.Autowired;
2929
import org.springframework.kafka.core.KafkaTemplate;
30-
import org.springframework.stereotype.Service;
30+
import org.springframework.stereotype.Component;
3131

3232

3333
/**
3434
* This class is responsible for sending messages to Kafka.
3535
*/
36-
@Service
36+
@Component
3737
public class KafkaProducer {
3838

39+
@Autowired
3940
private final KafkaTemplate<String, String> kafkaTemplate;
4041

4142
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
@@ -47,7 +48,7 @@ public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
4748
*
4849
* @param message The message to send.
4950
*/
50-
public void sendMessage(String message) {
51-
kafkaTemplate.send("updates", message);
51+
public void sendMessage(String topic, String message) {
52+
kafkaTemplate.send(topic, message);
5253
}
5354
}

Diff for: polling-publisher/polling-service/src/main/java/com/iluwatar/polling/PollingController.java

+13
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525

2626
package com.iluwatar.polling;
2727

28+
import org.springframework.beans.factory.annotation.Autowired;
2829
import org.springframework.web.bind.annotation.GetMapping;
30+
import org.springframework.web.bind.annotation.PostMapping;
31+
import org.springframework.web.bind.annotation.RequestParam;
2932
import org.springframework.web.bind.annotation.RestController;
3033

3134
/**
@@ -38,4 +41,14 @@ public class PollingController {
3841
public String healthCheck() {
3942
return "Polling Service is up and running!";
4043
}
44+
45+
46+
@Autowired
47+
private KafkaProducer kafkaProducer;
48+
49+
@PostMapping("/send")
50+
public String sendMessage(@RequestParam("message") String message) {
51+
kafkaProducer.sendMessage("api-message", message);
52+
return "Message sent: " + message;
53+
}
4154
}

Diff for: polling-publisher/polling-service/src/main/java/com/iluwatar/polling/PollingScheduler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525

2626
package com.iluwatar.polling;
2727

28+
import java.util.Random;
2829
import org.springframework.beans.factory.annotation.Autowired;
2930
import org.springframework.scheduling.annotation.Scheduled;
3031
import org.springframework.stereotype.Component;
31-
import java.util.Random;
3232

3333
/**
3434
* This class is responsible for scheduling polling tasks.
@@ -52,7 +52,7 @@ public void pollDataSource() {
5252

5353
if (data != null) {
5454
System.out.println("🟢 Publishing Data: " + data);
55-
kafkaProducer.sendMessage(data);
55+
kafkaProducer.sendMessage("update", data);
5656
} else {
5757
System.out.println("🔴 No Data Found for ID: " + id);
5858
}

Diff for: polling-publisher/polling-service/src/main/resources/application.yml

+6-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@
2323

2424
server:
2525
port: 8081
26+
2627
spring:
2728
kafka:
29+
bootstrap-servers: localhost:9092 # Kafka broker address(es)
2830
consumer:
29-
group-id: subscriber-group
31+
group-id: subscriber-group # Consumer group ID
32+
auto-offset-reset: earliest # How to reset offset on consumer startup
33+
# Add other consumer properties here
34+

Diff for: polling-publisher/polling-service/src/test/java/com/iluwatar/polling/AppTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@
2424
*/
2525
package com.iluwatar.polling;
2626

27+
import org.junit.jupiter.api.Disabled;
2728
import org.junit.jupiter.api.Test;
2829
import org.springframework.boot.test.context.SpringBootTest;
2930

3031
import static org.junit.jupiter.api.Assertions.*;
3132

3233
@SpringBootTest
33-
class AppTest {
34+
public class AppTest {
3435

3536
@Test
3637
void polling() {

0 commit comments

Comments
 (0)