Skip to content

Commit 8b4b6ca

Browse files
committed
Calling the right function upon invocation
1 parent 705e463 commit 8b4b6ca

File tree

1 file changed

+52
-13
lines changed

1 file changed

+52
-13
lines changed

internal/stream/stream.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,37 @@ package stream
22

33
import (
44
"github.com/confluentinc/confluent-kafka-go/kafka"
5+
"gostreambridge/internal/database/dsprocessors"
56
"gostreambridge/internal/queue/processors"
67
"log"
78
"os"
89
"os/signal"
910
"syscall"
10-
"time"
1111
)
1212

1313
// StartStreamBridge is the entry point of the stream bridge. It hands the
// bridge configuration to ConsumeMessages, which reads the "upstreamApp" and
// "downstreamApp" keys from bridgeDetails.
func StartStreamBridge(bridgeDetails map[string]string) {
14-
ConsumeMessages(bridgeDetails["upstreamApp"])
14+
ConsumeMessages(bridgeDetails)
1515
}
1616

17-
func ConsumeMessages(upstreamQueueType string) {
17+
func ConsumeMessages(bridgeDetails map[string]string) {
1818

19-
switch upstreamQueueType {
19+
switch bridgeDetails["upstreamApp"] {
2020
case "kafka":
2121
// Create a Kafka message consumer
2222
messages := processors.ConsumeKafkaMessages()
2323
// Process each message
2424
for msg := range messages {
25-
ProcessKafkaMessage(msg)
25+
ProcessKafkaMessage(bridgeDetails["downstreamApp"], msg)
2626
}
2727
case "rabbitmq":
2828
// Create an AMQP (RabbitMQ) message consumer
2929
messages := processors.ConsumeAMQPMessages()
3030
// Process each message
3131
for msg := range messages {
32-
ProcessAMQPMessage(msg)
32+
ProcessAMQPMessage(bridgeDetails["downstreamApp"], msg)
3333
}
3434
default:
35-
log.Fatalf("Unsupported message queue type: %s", upstreamQueueType)
35+
log.Fatalf("Unsupported message queue type: %s", bridgeDetails["upstreamApp"])
3636
}
3737

3838
// Wait for termination signal
@@ -42,15 +42,54 @@ func ConsumeMessages(upstreamQueueType string) {
4242
}
4343

4444
// ProcessAMQPMessage Simulating message processing
45-
func ProcessAMQPMessage(msg []byte) {
46-
// Simulating processing time
47-
time.Sleep(2 * time.Second)
45+
func ProcessAMQPMessage(downstreamAppName string, msg []byte) {
46+
// Calling the appropriate downstream flow
47+
message := string(msg)
48+
err := DetermineDownstreamFlow(downstreamAppName, message)
49+
if err != nil {
50+
return
51+
}
4852
log.Printf("Processed message: %s\n", string(msg))
4953
}
5054

5155
// ProcessKafkaMessage Simulating message processing
52-
func ProcessKafkaMessage(msg *kafka.Message) {
53-
// Simulating processing time
54-
time.Sleep(2 * time.Second)
56+
func ProcessKafkaMessage(downstreamAppName string, msg *kafka.Message) {
57+
// Calling the appropriate downstream flow
58+
err := DetermineDownstreamFlow(downstreamAppName, string(msg.Value))
59+
if err != nil {
60+
return
61+
}
5562
log.Printf("Processed message: %s\n", string(msg.Value))
5663
}
64+
65+
// DetermineDownstreamFlow to determine which function to call downstream
66+
func DetermineDownstreamFlow(downstreamAppName string, message string) error {
67+
switch downstreamAppName {
68+
case "mysql":
69+
err := dsprocessors.WriteToMySQL(message)
70+
if err != nil {
71+
return err
72+
}
73+
return nil
74+
case "sqlserver":
75+
err := dsprocessors.WriteToSQLServer(message)
76+
if err != nil {
77+
return err
78+
}
79+
return nil
80+
case "elastic":
81+
err := dsprocessors.WriteToElastic(message)
82+
if err != nil {
83+
return err
84+
}
85+
return nil
86+
case "kafka":
87+
err := processors.WriteToKafka(message)
88+
if err != nil {
89+
return err
90+
}
91+
return nil
92+
default:
93+
return nil
94+
}
95+
}

0 commit comments

Comments
 (0)