
@rokatyy (Contributor) commented Sep 25, 2025

📝 Description

Adds a test with a configurable number of draining cycles, each triggered by deploying a new function. The test verifies that all messages were processed and committed.


🛠️ Changes Made

  • Add a test for a combination of different streaming features.
  • Fixed bug https://iguazio.atlassian.net/browse/NUC-609. The local platform used to terminate functions with docker rm -f, which sends SIGKILL and does not allow graceful termination. Now, when deleting a function, Nuclio runs docker stop -t 30, which gives the function 30 seconds to terminate (matching Kubernetes' default termination grace period). It then runs docker rm -f to make sure the container is removed no matter what.
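Nuclio's local platform does this in Go; the following is only a Python sketch of the two-step deletion sequence described above, not Nuclio's implementation. The helper names build_delete_commands and delete_container are hypothetical, and the command runner is injectable so the sequence can be checked without Docker installed.

```python
import subprocess
from typing import Callable, List, Sequence

# Grace period from the description above (also Kubernetes' default).
DEFAULT_STOP_TIMEOUT_SECONDS = 30


def build_delete_commands(container_id: str,
                          timeout: int = DEFAULT_STOP_TIMEOUT_SECONDS) -> List[List[str]]:
    """Return the docker CLI invocations for a graceful delete (hypothetical helper)."""
    return [
        # Sends the stop signal (SIGTERM by default); docker escalates to
        # SIGKILL only after `timeout` seconds.
        ["docker", "stop", "-t", str(timeout), container_id],
        # Force-remove afterwards so the container is gone even if stop failed.
        ["docker", "rm", "-f", container_id],
    ]


def delete_container(container_id: str,
                     run: Callable[[Sequence[str]], object] = subprocess.run) -> None:
    """Stop gracefully, then force-remove (hypothetical helper)."""
    stop_cmd, rm_cmd = build_delete_commands(container_id)
    # subprocess.run does not raise on a non-zero exit code by default, so a
    # failed stop (e.g. the container already exited) still reaches rm -f.
    run(stop_cmd)
    run(rm_cmd)
```

Usage: delete_container("my-function-container") runs the stop and then the forced removal, in that order.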

✅ Checklist

  • I updated the documentation (if applicable)
  • I have tested the changes in this PR

🔗 References


🚨 Breaking Changes?

  • Yes (explain below)
  • No

@TomerShor (Contributor) left a comment

Pretty good overall!
Left some comments and suggestions

events_log_file.write(serialized_record + ', ')

# mark offset
context.last_processed_offsets.set_last_processed_offset(topic=event.topic, partition= event.shard_id, offset=event.offset)
Contributor

Suggested change
context.last_processed_offsets.set_last_processed_offset(topic=event.topic, partition= event.shard_id, offset=event.offset)
context.last_processed_offsets.set_last_processed_offset(topic=event.topic, partition=event.shard_id, offset=event.offset)
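The offset-marking call above stores the last processed offset per (topic, partition) pair. A minimal sketch of such a tracker, assuming only the set_last_processed_offset signature and dictionary shown in the diff; the class name LastProcessedOffsets and the get_last_processed_offset accessor are hypothetical.

```python
from typing import Dict, Optional, Tuple


class LastProcessedOffsets:
    """Hypothetical tracker mirroring set_last_processed_offset from the diff."""

    def __init__(self) -> None:
        # Keyed by (topic, partition): offsets are only meaningful per partition.
        self.last_processed_offset_of_topic: Dict[Tuple[str, int], int] = {}

    def set_last_processed_offset(self, topic: str, partition: int, offset: int) -> None:
        self.last_processed_offset_of_topic[(topic, partition)] = offset

    def get_last_processed_offset(self, topic: str, partition: int) -> Optional[int]:
        # None means nothing has been processed yet for this partition.
        return self.last_processed_offset_of_topic.get((topic, partition))
```

Keying by the pair rather than by topic alone keeps offsets from different partitions of the same topic from overwriting each other.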


def set_last_processed_offset(self, topic, partition, offset):
    self.last_processed_offset_of_topic[(topic, partition)] = offset

Contributor

Suggested change

Comment on lines 81 to 85
def drain_callback():
    print('Drain callback called!')
    sleep_interval = os.getenv("DRAIN_SLEEP_INTERVAL", "1")
    time.sleep(float(sleep_interval))
    print('Slept for {0} seconds'.format(sleep_interval))
Contributor

This is not used at all and can be removed.

Comment on lines +96 to +97
def drain(self):
    return self._drain()
Contributor

Why is this needed, instead of just async def drain(self)?

Contributor Author

@TomerShor, that's a good question. Currently drain can only be a sync function: if we pass an async function, it won't be awaited. But if a sync function returns an awaitable (a coroutine), it will be awaited:

result = self._call_drain_handler()
if asyncio.iscoroutine(result):
    await result
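The pattern described above can be sketched as follows, assuming the wrapper always calls the drain handler synchronously and then awaits the returned value only if it is a coroutine. Consumer and call_drain_handler are hypothetical names; the point is that a plain def drain that returns self._drain() still gets its async work awaited.

```python
import asyncio


class Consumer:
    """Hypothetical function-side object with an async drain implementation."""

    def __init__(self) -> None:
        self.drained = False

    async def _drain(self) -> None:
        # Stand-in for real async cleanup, e.g. committing offsets.
        await asyncio.sleep(0)
        self.drained = True

    def drain(self):
        # Sync method: calling it only creates the coroutine; the caller awaits it.
        return self._drain()


async def call_drain_handler(handler) -> None:
    """Sketch of the wrapper-side logic quoted above."""
    result = handler()
    if asyncio.iscoroutine(result):
        await result
```

Usage: asyncio.run(call_drain_handler(Consumer().drain)) runs the async cleanup, while a plain sync handler that returns None is simply called and nothing is awaited.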

Contributor Author

I think it originally wasn't meant to be an async function, but this part was later added as a workaround by the MM team. The same logic is used for handling async handlers, and we had to keep backward compatibility (BC) here: #3810

.PHONY: python-runtime-tests
python-runtime-tests:
	@make list-all-dirs-with-tests | grep "pkg/processor/runtime/python/test"
	@make list-all-dirs-with-tests | grep "pkg/processor/trigger/kafka/test"
Contributor

Sure about that?
python-runtime-tests should run the Python runtime tests, not the Kafka ones.

Contributor Author

@TomerShor, this was temporary for running the tests; I'll change it back. It was needed to run the Kafka tests separately and avoid a timeout, because I couldn't change the CI workflow itself, since it always runs the development

suite.Require().NoError(err, "Failed to close shared file")

// remove the file at the end of the test
// since the fine can be quite big, do not rely on autodelete of temp dir
Contributor

Suggested change
// since the fine can be quite big, do not rely on autodelete of temp dir
// since the file can be quite big, do not rely on auto-deletion of temp dir
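The suggestion above is about removing a potentially large shared file explicitly instead of waiting for the temp directory to be cleaned up. A Python unittest sketch of the same idea follows (the Go suite itself is not shown here); SharedFileTest and the file contents are hypothetical. Registering removal with addCleanup means it runs even if the test body fails.

```python
import os
import tempfile
import unittest


class SharedFileTest(unittest.TestCase):
    """Hypothetical test that removes a large shared file explicitly."""

    def setUp(self) -> None:
        fd, self.shared_path = tempfile.mkstemp(suffix=".log")
        os.close(fd)
        # Cleanups run after the test even on failure, so the file does not
        # linger until the temp directory is eventually deleted.
        self.addCleanup(self._remove_shared_file)

    def _remove_shared_file(self) -> None:
        if os.path.exists(self.shared_path):
            os.remove(self.shared_path)

    def test_write(self) -> None:
        with open(self.shared_path, "a") as f:
            f.write("record, ")
        self.assertGreater(os.path.getsize(self.shared_path), 0)
```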

Comment on lines 604 to 605
// wait a bit for the function to start
time.Sleep(5 * time.Second)
Contributor

Can you add a comment on why this is needed?
Also, sleeping is usually unreliable in CI, so is there another way to achieve the same thing?
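One common alternative to a fixed sleep is polling a readiness condition with a deadline, similar in spirit to the common.RetryUntilSuccessful call used elsewhere in this test. A minimal Python sketch, assuming the caller can express "the function is ready" as a boolean check (for example a health probe, which is an assumption here); retry_until_successful is a hypothetical name for this sketch.

```python
import time
from typing import Callable


def retry_until_successful(timeout: float, interval: float,
                           condition: Callable[[], bool]) -> None:
    """Poll `condition` every `interval` seconds until it returns True.

    Raises TimeoutError if it has not succeeded within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)
```

Unlike a fixed 5-second sleep, this returns as soon as the function is ready and fails with a clear error if it never becomes ready.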

Contributor Author

I'll add a comment. Unfortunately, I didn't find a proper way to do it, and it hasn't reproduced in CI, so maybe it's just a local setup issue.

Comment on lines 655 to 671
for partitionIdx := 0; partitionIdx < testCase.partitionNum; partitionIdx++ {
	for i := 0; i < messagesPerPartitionPerCycle; i++ {
		body := fmt.Sprintf("cycle-%d-msg-%d-part-%d-round-2", cycle, i, partitionIdx)
		err := suite.publishMessageToTopicOnSpecificShard(topic, body, int32(partitionIdx))
		messageHistory = append(messageHistory, body)
		suite.Require().NoError(err, "Failed to publish message")
	}
}

err := common.RetryUntilSuccessful(30*time.Second,
	2*time.Second,
	func() bool {
		receivedBodies := suite.resolveReceivedEventBodies(deployResult)
		// wait until we see at least one new message in the original function
		return len(receivedBodies) > 0
	})
suite.Require().NoError(err, "Failed to get initial events")
Contributor

This is duplicated for the third time; move it to a new method.
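A Python sketch of the kind of helper the duplicated publish-then-wait block could be extracted into. The message body format is taken from the snippet above; the publish and received_count callables stand in for suite.publishMessageToTopicOnSpecificShard and suite.resolveReceivedEventBodies, and publish_and_wait_for_events is a hypothetical name. The Go method would follow the same shape, and this version also includes the cycle number in the timeout error.

```python
import time
from typing import Callable, List


def publish_and_wait_for_events(
    cycle: int,
    round_num: int,
    partition_num: int,
    messages_per_partition: int,
    publish: Callable[[str, int], None],
    received_count: Callable[[], int],
    timeout: float = 30.0,
    interval: float = 2.0,
) -> List[str]:
    """Publish to every partition, then wait until at least one event arrives.

    Returns the published bodies so the caller can extend its message history.
    """
    published: List[str] = []
    for partition in range(partition_num):
        for i in range(messages_per_partition):
            body = f"cycle-{cycle}-msg-{i}-part-{partition}-round-{round_num}"
            publish(body, partition)  # expected to raise on failure
            published.append(body)

    deadline = time.monotonic() + timeout
    while received_count() == 0:
        if time.monotonic() >= deadline:
            # Name the cycle so a failure points at the right iteration.
            raise TimeoutError(f"no events received in cycle {cycle}")
        time.sleep(interval)
    return published
```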

		// wait until we see at least one new message in the original function
		return len(receivedBodies) > 0
	})
suite.Require().NoError(err, "Failed to get initial events")
Contributor

These aren't initial events here; add the cycle number to the error message.

Contributor Author

@rokatyy Oct 2, 2025

@TomerShor, they are initial for this function, but I'll add the cycle number.

// return true
// })
//}
func (suite *testSuite) TestFeatureCombinations() {
Contributor

Add a comment describing what this test aims to check.

rokatyy added a commit that referenced this pull request Oct 1, 2025
### 📝 Description
The bug was found while writing tests in
#3821

We used `docker rm -f` instead of `docker stop`, which didn’t allow the
termination callback to run properly.

Docker stop docs: [docker container
stop](https://docs.docker.com/reference/cli/docker/container/stop/)
docker rm docs: [docker container
rm](https://docs.docker.com/reference/cli/docker/container/rm/)

---

### ✅ Checklist
- [ ] I updated the documentation (if applicable)
- [x] I have tested the changes in this PR

---

### 🧪 Testing
Tested as part of #3821

---

### 🔗 References
- Ticket link: https://iguazio.atlassian.net/browse/NUC-609
- Design docs links:
- External links:

---

### 🚨 Breaking Changes?

- [ ] Yes (explain below)
- [x] No

<!-- If yes, describe what needs to be changed downstream: -->

---
# Conflicts:
#	pkg/dockerclient/shell.go
#	pkg/platform/local/platform.go
#	pkg/platform/local/test/platform_test.go
#	pkg/processor/test/suite/suite.go
@rokatyy marked this pull request as ready for review October 3, 2025 15:59

2 participants