[Tests] Improve kafka trigger tests #3821
base: development
Pretty good overall!
Left some comments and suggestions.
```python
events_log_file.write(serialized_record + ', ')

# mark offset
context.last_processed_offsets.set_last_processed_offset(topic=event.topic, partition= event.shard_id, offset=event.offset)
```
Suggested change:

```diff
- context.last_processed_offsets.set_last_processed_offset(topic=event.topic, partition= event.shard_id, offset=event.offset)
+ context.last_processed_offsets.set_last_processed_offset(topic=event.topic, partition=event.shard_id, offset=event.offset)
```
```python
def set_last_processed_offset(self, topic, partition, offset):
    self.last_processed_offset_of_topic[(topic, partition)] = offset
```
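The handler keys the last processed offset by `(topic, partition)`. A minimal, self-contained sketch of such a tracker is below; the class name `LastProcessedOffsets` and the `get_last_processed_offset` method are hypothetical and only illustrate the idea, they are not the test's actual code.

```python
from typing import Dict, Optional, Tuple


class LastProcessedOffsets:
    """Hypothetical tracker: last processed offset per (topic, partition)."""

    def __init__(self) -> None:
        self.last_processed_offset_of_topic: Dict[Tuple[str, int], int] = {}

    def set_last_processed_offset(self, topic: str, partition: int, offset: int) -> None:
        # Overwrite unconditionally, as the reviewed snippet does
        self.last_processed_offset_of_topic[(topic, partition)] = offset

    def get_last_processed_offset(self, topic: str, partition: int) -> Optional[int]:
        # None means no event from this partition has been processed yet
        return self.last_processed_offset_of_topic.get((topic, partition))


offsets = LastProcessedOffsets()
offsets.set_last_processed_offset(topic="test-topic", partition=0, offset=41)
offsets.set_last_processed_offset(topic="test-topic", partition=0, offset=42)
print(offsets.get_last_processed_offset("test-topic", 0))  # 42
print(offsets.get_last_processed_offset("test-topic", 1))  # None
```

Keying by the pair rather than by topic alone matters because offsets are only meaningful within a single partition.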
```python
def drain_callback():
    print('Drain callback called!')
    sleep_interval = os.getenv("DRAIN_SLEEP_INTERVAL", "1")
    time.sleep(float(sleep_interval))
    print('Slept for {0} seconds'.format(sleep_interval))
```
This is not used at all; it can be removed.
```python
def drain(self):
    return self._drain()
```
Why is this needed, rather than just `async def drain(self)`?
@TomerShor, that's a good question. Currently the drain handler can only be a sync function: if we make `drain` itself an async function, it won't be awaited, but if a sync function returns an awaitable (a coroutine), it will be awaited:
nuclio/pkg/processor/runtime/python/py/_nuclio_async_wrapper.py
Lines 196 to 198 in 3ef6f63
```python
result = self._call_drain_handler()
if asyncio.iscoroutine(result):
    await result
```
I think it originally wasn't meant to be an async function, but this part was later added as a workaround by the MM team. The same logic is used for handling async handlers, and we had to keep backward compatibility here (#3810).
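To illustrate the pattern discussed in this thread: the wrapper calls the drain handler synchronously and awaits the result only if it is a coroutine, so a sync `drain()` that returns `self._drain()` still gets awaited, while a plain sync callback keeps working. This is a minimal sketch using only `asyncio`; `Handler` and `call_drain` are hypothetical names, not nuclio's actual classes.

```python
import asyncio


class Handler:
    def __init__(self) -> None:
        self.drained = False

    async def _drain(self) -> None:
        # Placeholder for the real async draining work
        await asyncio.sleep(0)
        self.drained = True

    def drain(self):
        # Sync method that returns a coroutine object without awaiting it
        return self._drain()


async def call_drain(drain_handler) -> None:
    # Mirrors the wrapper logic: call synchronously, await only if needed
    result = drain_handler()
    if asyncio.iscoroutine(result):
        await result


handler = Handler()
asyncio.run(call_drain(handler.drain))
print(handler.drained)  # True
```

The same `call_drain` also accepts a sync callback that returns `None`, which is what keeps older sync drain handlers backward compatible.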
```makefile
.PHONY: python-runtime-tests
python-runtime-tests:
	@make list-all-dirs-with-tests | grep "pkg/processor/runtime/python/test"
	@make list-all-dirs-with-tests | grep "pkg/processor/trigger/kafka/test"
```
Sure about that? `python-runtime-tests` should run the Python runtime tests, not the Kafka ones.
@TomerShor, this was temporary, for running the tests; I'll change it back. It was needed to run the Kafka tests separately and avoid a timeout, because I couldn't change the CI workflow itself: it always runs the one from `development`.
```go
suite.Require().NoError(err, "Failed to close shared file")

// remove the file at the end of the test
// since the fine can be quite big, do not rely on autodelete of temp dir
```
Suggested change:

```diff
- // since the fine can be quite big, do not rely on autodelete of temp dir
+ // since the file can be quite big, do not rely on auto-deletion of temp dir
```
```go
// wait a bit for the function to start
time.Sleep(5 * time.Second)
```
Can you add a comment on why this is needed?
Also, sleeping is usually not stable in CI, so maybe there's another way to achieve the same thing?
I'll add a comment. Unfortunately, I didn't find a proper way to do this, and it hasn't reproduced in CI, so maybe it's just a local setup issue.
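One common alternative to a fixed sleep is to poll a readiness condition until a deadline, similar in spirit to the `common.RetryUntilSuccessful` call that appears later in the test. This is a minimal Python sketch for illustration only (the suite itself is Go); `retry_until_successful` and `function_is_ready` are hypothetical, and what to poll (a health check, a log line, a first consumed message) depends on what the function actually needs before it is "started".

```python
import time
from typing import Callable


def retry_until_successful(timeout: float, interval: float, check: Callable[[], bool]) -> None:
    """Call check() every `interval` seconds until it returns True.

    Raises TimeoutError if `timeout` seconds elapse first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)


# Hypothetical readiness check that succeeds on the third call
attempts = {"n": 0}


def function_is_ready() -> bool:
    attempts["n"] += 1
    return attempts["n"] >= 3


retry_until_successful(timeout=5.0, interval=0.01, check=function_is_ready)
print(attempts["n"])  # 3
```

Unlike a fixed sleep, this returns as soon as the condition holds and fails loudly when it never does.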
```go
for partitionIdx := 0; partitionIdx < testCase.partitionNum; partitionIdx++ {
	for i := 0; i < messagesPerPartitionPerCycle; i++ {
		body := fmt.Sprintf("cycle-%d-msg-%d-part-%d-round-2", cycle, i, partitionIdx)
		err := suite.publishMessageToTopicOnSpecificShard(topic, body, int32(partitionIdx))
		messageHistory = append(messageHistory, body)
		suite.Require().NoError(err, "Failed to publish message")
	}
}

err := common.RetryUntilSuccessful(30*time.Second,
	2*time.Second,
	func() bool {
		receivedBodies := suite.resolveReceivedEventBodies(deployResult)
		// wait until we see at least one new message in the original function
		return len(receivedBodies) > 0
	})
suite.Require().NoError(err, "Failed to get initial events")
```
This is duplicated for the third time; move it to a new method.
```go
		// wait until we see at least one new message in the original function
		return len(receivedBodies) > 0
	})
suite.Require().NoError(err, "Failed to get initial events")
```
There are no initial events here; add the cycle number to the error.
@TomerShor, they are the initial events for this function; I'll just add the cycle number.
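The repeated wait blocks could be folded into one helper that also puts the cycle number into the error, addressing both review comments in this thread. This is a language-neutral sketch in Python (the suite is Go); `wait_for_new_events` and `resolve_bodies` are hypothetical stand-ins for a suite method wrapping `suite.resolveReceivedEventBodies(deployResult)`, and the 30s/2s defaults copy the values used in the test.

```python
import time
from typing import Callable, List


def wait_for_new_events(resolve_bodies: Callable[[], List[str]], cycle: int,
                        timeout: float = 30.0, interval: float = 2.0) -> List[str]:
    """Poll until at least one event body is received, then return the bodies.

    The error names the cycle, so a failure points at the draining round.
    """
    deadline = time.monotonic() + timeout
    while True:
        bodies = resolve_bodies()
        if bodies:
            return bodies
        if time.monotonic() >= deadline:
            raise TimeoutError(f"cycle {cycle}: no events received within {timeout} seconds")
        time.sleep(interval)


received = wait_for_new_events(lambda: ["cycle-1-msg-0-part-0-round-2"], cycle=1)
print(received)
```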
```go
// return true
// })
//}
func (suite *testSuite) TestFeatureCombinations() {
```
Add a comment on what this test aims to check.
### 📝 Description
A bug was found while writing tests in #3821: we did `docker rm -f` instead of `docker stop`, which didn't allow the termination callback to run properly.

- Docker stop docs: [docker container stop](https://docs.docker.com/reference/cli/docker/container/stop/)
- Docker rm docs: [docker container rm](https://docs.docker.com/reference/cli/docker/container/rm/)

---

### ✅ Checklist
- [ ] I updated the documentation (if applicable)
- [x] I have tested the changes in this PR

---

### 🧪 Testing
Tested as part of #3821.

---

### 🔗 References
- Ticket link: https://iguazio.atlassian.net/browse/NUC-609
- Design docs links:
- External links:

---

### 🚨 Breaking Changes?
- [ ] Yes (explain below)
- [x] No

---
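The fixed deletion sequence, a graceful `docker stop -t 30` (the container's stop signal, SIGTERM by default, then SIGKILL after 30 seconds) followed by `docker rm -f` as a safety net, can be sketched as below. This is a hypothetical Python illustration, not nuclio's Go implementation; `delete_container` is an invented name, and its `run` parameter exists only so the command sequence can be inspected without Docker.

```python
import subprocess
from typing import Callable, List


def _run(cmd: List[str]) -> None:
    # check=False: a failed stop must not prevent the forced removal
    subprocess.run(cmd, check=False)


def delete_container(container_id: str, stop_timeout: int = 30,
                     run: Callable[[List[str]], None] = _run) -> List[List[str]]:
    """Stop gracefully, then force-remove; return the commands issued."""
    commands = [
        # Stop signal first; Docker sends SIGKILL if still running after stop_timeout seconds
        ["docker", "stop", "-t", str(stop_timeout), container_id],
        # Make sure the container is removed no matter what
        ["docker", "rm", "-f", container_id],
    ]
    for cmd in commands:
        run(cmd)
    return commands


issued: List[List[str]] = []
delete_container("my-function", run=issued.append)
for cmd in issued:
    print(" ".join(cmd))
# docker stop -t 30 my-function
# docker rm -f my-function
```

Running `docker rm -f` alone skips the graceful phase entirely, which is why the termination callback never got a chance to run.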
# Conflicts:
# pkg/dockerclient/shell.go
# pkg/platform/local/platform.go
# pkg/platform/local/test/platform_test.go
# pkg/processor/test/suite/suite.go
📝 Description
Adds a test with a configurable number of draining cycles, each created by deploying a new function. The test ensures that all messages were processed and committed.
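The "all messages were processed" guarantee can be checked by comparing the publish history against the bodies received across all cycles. This is a minimal sketch, assuming bodies are unique strings like the `cycle-%d-msg-%d-part-%d-round-2` pattern the test publishes; `find_missing_messages` is a hypothetical helper, and it checks delivery only, not commit state.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def find_missing_messages(published: Iterable[str],
                          received: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (missing, duplicated) message bodies.

    missing: published but never received.
    duplicated: received more than once (possible with at-least-once delivery).
    """
    received_counts = Counter(received)
    missing = [body for body in published if received_counts[body] == 0]
    duplicated = sorted(body for body, count in received_counts.items() if count > 1)
    return missing, duplicated


history = [f"cycle-{c}-msg-{i}-part-{p}-round-2"
           for c in range(2) for p in range(2) for i in range(2)]
received = history[:-1] + [history[0]]  # last message lost, first redelivered
missing, duplicated = find_missing_messages(history, received)
print(missing)     # ['cycle-1-msg-1-part-1-round-2']
print(duplicated)  # ['cycle-0-msg-0-part-0-round-2']
```

Reporting duplicates separately from losses is useful here, since a redelivery after a drain points at an uncommitted offset, while a missing message is a real data-loss bug.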
🛠️ Changes Made
Previously, nuclio deleted functions with `docker rm -f`, which is basically a `SIGKILL` and didn't allow for graceful termination. Now, when deleting a function, nuclio does `docker stop -t 30`, which gives the function 30 seconds to terminate (the same as Kubernetes), and then does `docker rm -f` just to make sure the function is removed no matter what.

✅ Checklist
🔗 References
🚨 Breaking Changes?