@@ -2,37 +2,37 @@ package stream
22
33import (
44 "github.com/confluentinc/confluent-kafka-go/kafka"
5+ "gostreambridge/internal/database/dsprocessors"
56 "gostreambridge/internal/queue/processors"
67 "log"
78 "os"
89 "os/signal"
910 "syscall"
10- "time"
1111)
1212
1313func StartStreamBridge (bridgeDetails map [string ]string ) {
14- ConsumeMessages (bridgeDetails [ "upstreamApp" ] )
14+ ConsumeMessages (bridgeDetails )
1515}
1616
17- func ConsumeMessages (upstreamQueueType string ) {
17+ func ConsumeMessages (bridgeDetails map [ string ] string ) {
1818
19- switch upstreamQueueType {
19+ switch bridgeDetails [ "upstreamApp" ] {
2020 case "kafka" :
2121 // Create a Kafka message consumer
2222 messages := processors .ConsumeKafkaMessages ()
2323 // Process each message
2424 for msg := range messages {
25- ProcessKafkaMessage (msg )
25+ ProcessKafkaMessage (bridgeDetails [ "downstreamApp" ], msg )
2626 }
2727 case "rabbitmq" :
2828 // Create a Kafka message consumer
2929 messages := processors .ConsumeAMQPMessages ()
3030 // Process each message
3131 for msg := range messages {
32- ProcessAMQPMessage (msg )
32+ ProcessAMQPMessage (bridgeDetails [ "downstreamApp" ], msg )
3333 }
3434 default :
35- log .Fatalf ("Unsupported message queue type: %s" , upstreamQueueType )
35+ log .Fatalf ("Unsupported message queue type: %s" , bridgeDetails [ "upstreamApp" ] )
3636 }
3737
3838 // Wait for termination signal
@@ -42,15 +42,54 @@ func ConsumeMessages(upstreamQueueType string) {
4242}
4343
4444// ProcessAMQPMessage Simulating message processing
45- func ProcessAMQPMessage (msg []byte ) {
46- // Simulating processing time
47- time .Sleep (2 * time .Second )
45+ func ProcessAMQPMessage (downstreamAppName string , msg []byte ) {
46+ // Calling the appropriate downstream flow
47+ message := string (msg )
48+ err := DetermineDownstreamFlow (downstreamAppName , message )
49+ if err != nil {
50+ return
51+ }
4852 log .Printf ("Processed message: %s\n " , string (msg ))
4953}
5054
5155// ProcessKafkaMessage Simulating message processing
52- func ProcessKafkaMessage (msg * kafka.Message ) {
53- // Simulating processing time
54- time .Sleep (2 * time .Second )
56+ func ProcessKafkaMessage (downstreamAppName string , msg * kafka.Message ) {
57+ // Calling the appropriate downstream flow
58+ err := DetermineDownstreamFlow (downstreamAppName , string (msg .Value ))
59+ if err != nil {
60+ return
61+ }
5562 log .Printf ("Processed message: %s\n " , string (msg .Value ))
5663}
64+
65+ // DetermineDownstreamFlow to determine which function to call downstream
66+ func DetermineDownstreamFlow (downstreamAppName string , message string ) error {
67+ switch downstreamAppName {
68+ case "mysql" :
69+ err := dsprocessors .WriteToMySQL (message )
70+ if err != nil {
71+ return err
72+ }
73+ return nil
74+ case "sqlserver" :
75+ err := dsprocessors .WriteToSQLServer (message )
76+ if err != nil {
77+ return err
78+ }
79+ return nil
80+ case "elastic" :
81+ err := dsprocessors .WriteToElastic (message )
82+ if err != nil {
83+ return err
84+ }
85+ return nil
86+ case "kafka" :
87+ err := processors .WriteToKafka (message )
88+ if err != nil {
89+ return err
90+ }
91+ return nil
92+ default :
93+ return nil
94+ }
95+ }
0 commit comments