Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pulsar/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ async def close(self) -> None:
self._consumer.close_async(functools.partial(_set_future, future, value=None))
await future

async def get_last_message_id(self) -> _pulsar.MessageId:
"""
Asynchronously get the last message id.
"""
future = asyncio.get_running_loop().create_future()
self._consumer.get_last_message_id_async(functools.partial(_set_future, future))
id = await future
return id

def topic(self) -> str:
"""
Return the topic this consumer is subscribed to.
Expand Down
8 changes: 7 additions & 1 deletion src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) {
return msgId;
}

void Consumer_get_last_message_id_async(Consumer& consumer, GetLastMessageIdCallback callback) {
py::gil_scoped_release release;
consumer.getLastMessageIdAsync(callback);
}

void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
py::gil_scoped_release release;
consumer.receiveAsync(callback);
Expand Down Expand Up @@ -194,7 +199,8 @@ void export_consumer(py::module_& m) {
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
.def("get_last_message_id_async", &Consumer_get_last_message_id_async)
.def("close_async", &Consumer_closeAsync)
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
.def("seek_async", &Consumer_seekAsync)
Expand Down
16 changes: 16 additions & 0 deletions tests/asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)

import pulsar # pylint: disable=import-error
import _pulsar # pylint: disable=import-error
from pulsar.asyncio import ( # pylint: disable=import-error
Client,
Consumer,
Expand Down Expand Up @@ -267,6 +268,21 @@ async def verify_receive(consumer: Consumer):
await verify_receive(consumer)
await consumer.close()

async def test_consumer_get_last_message_id(self):
topic = f'asyncio-test-get-last-message-id-{time.time()}'
sub = 'sub'
consumer = await self._client.subscribe(topic, sub,
consumer_type=pulsar.ConsumerType.Shared)
producer = await self._client.create_producer(topic)
for i in range(5):
msg = f'msg-{i}'.encode()
await producer.send(msg)
last_msg_id = await consumer.get_last_message_id()
assert isinstance(last_msg_id, _pulsar.MessageId)
assert last_msg_id.entry_id() == i
await consumer.acknowledge(last_msg_id)
await consumer.close()

async def test_unsubscribe(self):
topic = f'asyncio-test-unsubscribe-{time.time()}'
sub = 'sub'
Expand Down