Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/processor/runtime/python/py/requirements/dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pytest
memory_profiler
six

62 changes: 36 additions & 26 deletions pkg/processor/runtime/python/py/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import base64
import functools
import http.client
import json
Expand Down Expand Up @@ -129,6 +130,18 @@ async def append_event(_event):
self.assertEqual(recorded_event_index, recorded_event.id)
self.assertEqual('e{}'.format(recorded_event_index), self._ensure_str(recorded_event.body))

def test_sync_handler_that_returns_None(self):
    """A sync entrypoint may return a coroutine; the wrapper must await it.

    NOTE(review): despite the test name, the handler returns a coroutine,
    not None — the wrapper is expected to detect and resolve it.
    """
    async def do_async_work():
        # Stand-in for real I/O-bound or async computation
        await asyncio.sleep(0.01)
        return "result_from_async"

    def handler(context, event):
        # Deliberately NOT awaited here — returns a bare coroutine object
        return do_async_work()

    self._wrapper._entrypoint = handler
    result = asyncio.run(self._wrapper._call_entrypoint(event=nuclio_sdk.Event(_id=1)))
    self.assertEqual('result_from_async', result)

def test_non_utf8_headers(self):
"""
This test validates the expected behavior for a non-utf8 event field contents
Expand Down Expand Up @@ -292,8 +305,7 @@ def event_recorder(ctx, event):
self.assertEqual(recorded_event_index, recorded_event.id)
self.assertEqual('e{}'.format(recorded_event_index), self._ensure_str(recorded_event.body))


async def test_encode_streaming_entrypoint_output(self):
def test_encode_streaming_entrypoint_output(self):
# Simulated streaming output (e.g., async generator)
async def streaming_handler_output():
yield "chunk1"
Expand All @@ -303,10 +315,7 @@ async def streaming_handler_output():
entrypoint_output = streaming_handler_output()

# Collect packets from the async generator
packets = [
(prefix, payload)
async for prefix, payload in self._wrapper._generate_processor_packets(entrypoint_output, start_time=0)
]
packets = asyncio.run(self._collect_packets_async(entrypoint_output))

# Extract prefix sequence for ordering check
prefixes = [prefix for prefix, _ in packets]
Expand All @@ -322,25 +331,20 @@ async def streaming_handler_output():
prefix: payload for prefix, payload in packets
}

self.assertEqual(payload_by_prefix[PacketType.STREAM_START], json.dumps("chunk1"))
self.assertEqual(payload_by_prefix[PacketType.BODY_CHUNK], json.dumps("chunk2"))
self.assertEqual(json.loads(payload_by_prefix[PacketType.STREAM_START])["body"], "chunk1")
self.assertEqual(payload_by_prefix[PacketType.BODY_CHUNK], base64.b64encode(b"chunk2").decode())
self.assertIn(PacketType.END_OF_STREAM, prefixes)
self.assertIn(PacketType.METRICS, prefixes)
self.assertNotIn(PacketType.SINGLE_RESPONSE, prefixes)

async def test_encode_single_value_entrypoint_output(self):
def test_encode_single_value_entrypoint_output(self):
# Simulate regular async function returning a single value
async def single_value_handler_output():
return "ok"

# Call the function and await the result
entrypoint_output = await single_value_handler_output()

# Pass it into the packet generator
packets = [
(prefix, payload)
async for prefix, payload in self._wrapper._generate_processor_packets(entrypoint_output, start_time=0)
]
entrypoint_output = asyncio.run(single_value_handler_output())
packets = asyncio.run(self._collect_packets_async(entrypoint_output))
self.assertEqual(len(packets), 2)

prefixes = [prefix for prefix, _ in packets]
Expand All @@ -358,10 +362,16 @@ async def single_value_handler_output():

self.assertEqual(
payload_by_prefix[PacketType.SINGLE_RESPONSE],
json.dumps({"body": "ok", "status_code": 200})
json.dumps({
"body": "ok",
"content_type": "text/plain",
"headers": {},
"status_code": 200,
"body_encoding": "text",
})
)

async def test_encode_batched_entrypoint_output(self):
def test_encode_batched_entrypoint_output(self):
single_response = nuclio_sdk.Response(
body=str(123),
headers={},
Expand All @@ -370,16 +380,10 @@ async def test_encode_batched_entrypoint_output(self):
)

# Consume single response packets
single_packets = [
(prefix, payload) async for prefix, payload in
self._wrapper._generate_processor_packets(single_response, start_time=0)
]
single_packets = asyncio.run(self._collect_packets_async(single_response))

# Consume batch response packets
batch_packets = [
(prefix, payload) async for prefix, payload in
self._wrapper._generate_processor_packets([single_response, single_response], start_time=0)
]
batch_packets = asyncio.run(self._collect_packets_async([single_response, single_response]))

# Extract the actual payloads for comparison
single_payload = next((payload for prefix, payload in single_packets if prefix == "r"), None)
Expand Down Expand Up @@ -425,6 +429,12 @@ async def test_encode_batched_entrypoint_output(self):
# profiled_serve_requests_func(num_requests=num_of_events)
# self.assertEqual(num_of_events, self._wrapper._entrypoint.call_count, 'Received unexpected number of events')

async def _collect_packets_async(self, entrypoint_output):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add newline before function

return [
(prefix, payload)
async for prefix, payload in self._wrapper._generate_processor_packets(entrypoint_output, start_time=0)
]

def _send_events(self, events):
self._wait_for_socket_creation()
for event in events:
Expand Down
8 changes: 6 additions & 2 deletions pkg/processor/runtime/python/py/wrapper_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,17 @@ def _connect_to_processor(self, socket_path, timeout=60):
async def _handle_event(self, event, sock):
# take call time
start_time = time.time()
entrypoint_output = await self._call_entrypoint(event)
await self._handle_entrypoint_output(entrypoint_output, start_time, sock)

async def _call_entrypoint(self, event):
if self._should_await_entrypoint:
entrypoint_output = await self._entrypoint(self._context, event)
else:
entrypoint_output = self._entrypoint(self._context, event)

await self._handle_entrypoint_output(entrypoint_output, start_time, sock)
if asyncio.iscoroutine(entrypoint_output):
entrypoint_output = await entrypoint_output
return entrypoint_output

async def _handle_entrypoint_output(self, entrypoint_output, start_time, sock):
async for prefix, payload in self._generate_processor_packets(entrypoint_output, start_time):
Expand Down
Loading