From 4f19a12f9a899a87abf26674e51af02a6f3e1b15 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Fri, 9 Jan 2026 14:17:48 +0100 Subject: [PATCH 1/6] feat: add batch updates support --- .../async_chat/channel_batch_updater.py | 239 ++++++++++++++ stream_chat/async_chat/client.py | 28 ++ stream_chat/base/client.py | 21 ++ stream_chat/channel_batch_updater.py | 239 ++++++++++++++ stream_chat/client.py | 29 +- .../async_chat/test_channel_batch_updater.py | 310 ++++++++++++++++++ .../tests/test_channel_batch_updater.py | 304 +++++++++++++++++ stream_chat/types/channel_batch.py | 93 ++++++ 8 files changed, 1262 insertions(+), 1 deletion(-) create mode 100644 stream_chat/async_chat/channel_batch_updater.py create mode 100644 stream_chat/channel_batch_updater.py create mode 100644 stream_chat/tests/async_chat/test_channel_batch_updater.py create mode 100644 stream_chat/tests/test_channel_batch_updater.py create mode 100644 stream_chat/types/channel_batch.py diff --git a/stream_chat/async_chat/channel_batch_updater.py b/stream_chat/async_chat/channel_batch_updater.py new file mode 100644 index 0000000..b544a24 --- /dev/null +++ b/stream_chat/async_chat/channel_batch_updater.py @@ -0,0 +1,239 @@ +from typing import List + +from stream_chat.types.channel_batch import ( + ChannelBatchMemberRequest, + ChannelDataUpdate, + ChannelsBatchFilters, + ChannelsBatchOptions, +) +from stream_chat.types.stream_response import StreamResponse + + +class ChannelBatchUpdater: + """ + Provides convenience methods for batch channel operations (async). + """ + + def __init__(self, client): + self.client = client + + async def add_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def remove_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes members from channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeMembers", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def invite_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Invites members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to invite. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "inviteMembers", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def add_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds moderators to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add as moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addModerators", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def demote_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes moderator role from members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to demote from moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "demoteModerators", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def assign_roles( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Assigns roles to members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members with roles to assign. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "assignRoles", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def hide( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Hides channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to hide channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "hide", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def show( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Shows channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to show channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "show", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def archive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Archives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to archive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "archive", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def unarchive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Unarchives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to unarchive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "unarchive", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def update_data( + self, filter: ChannelsBatchFilters, data: ChannelDataUpdate + ) -> StreamResponse: + """ + Updates data on channels matching the filter. + + :param filter: The filter to match channels. + :param data: Channel data to update. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "updateData", + "filter": filter, + "data": data, + } + return await self.client.update_channels_batch(options) + + async def add_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Adds filter tags to channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return await self.client.update_channels_batch(options) + + async def remove_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Removes filter tags from channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return await self.client.update_channels_batch(options) diff --git a/stream_chat/async_chat/client.py b/stream_chat/async_chat/client.py index 75b073b..2874dfb 100644 --- a/stream_chat/async_chat/client.py +++ b/stream_chat/async_chat/client.py @@ -4,6 +4,7 @@ import warnings from types import TracebackType from typing import ( + TYPE_CHECKING, Any, AsyncContextManager, Callable, @@ -15,6 +16,9 @@ Union, cast, ) + +if TYPE_CHECKING: + from stream_chat.types.channel_batch import ChannelsBatchOptions from urllib.parse import urlparse from stream_chat.async_chat.campaign import Campaign @@ -1028,6 +1032,30 @@ async def mark_delivered_simple( return await self.mark_delivered(data) + async def update_channels_batch( + self, options: "ChannelsBatchOptions" + ) -> StreamResponse: + """ + Updates channels in batch based on the provided options. + + :param options: ChannelsBatchOptions containing operation, filter, and operation-specific data. + :return: StreamResponse containing task_id. + """ + if options is None: + raise ValueError("options must not be None") + + return await self.put("channels/batch", data=options) + + def channel_batch_updater(self) -> "ChannelBatchUpdater": + """ + Returns a ChannelBatchUpdater instance for batch channel operations. + + :return: ChannelBatchUpdater instance. + """ + from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater + + return ChannelBatchUpdater(self) + async def close(self) -> None: await self.session.close() diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index d64c6f9..b9c9827 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -1302,6 +1302,27 @@ def get_task( """ pass + @abc.abstractmethod + def update_channels_batch( + self, options: Any + ) -> Union[StreamResponse, Awaitable[StreamResponse]]: + """ + Updates channels in batch based on the provided options. + + :param options: ChannelsBatchOptions containing operation, filter, and operation-specific data. + :return: StreamResponse containing task_id. + """ + pass + + @abc.abstractmethod + def channel_batch_updater(self) -> Any: + """ + Returns a ChannelBatchUpdater instance for batch channel operations. + + :return: ChannelBatchUpdater instance. + """ + pass + @abc.abstractmethod def send_user_custom_event( self, user_id: str, event: Dict diff --git a/stream_chat/channel_batch_updater.py b/stream_chat/channel_batch_updater.py new file mode 100644 index 0000000..5007d92 --- /dev/null +++ b/stream_chat/channel_batch_updater.py @@ -0,0 +1,239 @@ +from typing import List, Optional + +from stream_chat.types.channel_batch import ( + ChannelBatchMemberRequest, + ChannelDataUpdate, + ChannelsBatchFilters, + ChannelsBatchOptions, +) +from stream_chat.types.stream_response import StreamResponse + + +class ChannelBatchUpdater: + """ + Provides convenience methods for batch channel operations. + """ + + def __init__(self, client): + self.client = client + + def add_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def remove_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes members from channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeMembers", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def invite_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Invites members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to invite. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "inviteMembers", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def add_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds moderators to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add as moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addModerators", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def demote_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes moderator role from members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to demote from moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "demoteModerators", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def assign_roles( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Assigns roles to members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members with roles to assign. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "assignRoles", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def hide( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Hides channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to hide channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "hide", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def show( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Shows channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to show channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "show", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def archive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Archives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to archive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "archive", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def unarchive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Unarchives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to unarchive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "unarchive", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def update_data( + self, filter: ChannelsBatchFilters, data: ChannelDataUpdate + ) -> StreamResponse: + """ + Updates data on channels matching the filter. + + :param filter: The filter to match channels. + :param data: Channel data to update. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "updateData", + "filter": filter, + "data": data, + } + return self.client.update_channels_batch(options) + + def add_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Adds filter tags to channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return self.client.update_channels_batch(options) + + def remove_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Removes filter tags from channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return self.client.update_channels_batch(options) diff --git a/stream_chat/client.py b/stream_chat/client.py index 20681bf..3a7e268 100644 --- a/stream_chat/client.py +++ b/stream_chat/client.py @@ -2,10 +2,13 @@ import json import sys import warnings -from typing import Any, Callable, Dict, Iterable, List, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Union, cast from urllib.parse import urlparse from urllib.request import Request, urlopen +if TYPE_CHECKING: + from stream_chat.types.channel_batch import ChannelsBatchOptions + from stream_chat.campaign import Campaign from stream_chat.segment import Segment from stream_chat.types.base import SortParam @@ -985,3 +988,27 @@ def mark_delivered_simple( "user_id": user_id, } return self.mark_delivered(data=data) + + def update_channels_batch( + self, options: "ChannelsBatchOptions" + ) -> StreamResponse: + """ + Updates channels in batch based on the provided options. + + :param options: ChannelsBatchOptions containing operation, filter, and operation-specific data. + :return: StreamResponse containing task_id. + """ + if options is None: + raise ValueError("options must not be None") + + return self.put("channels/batch", data=options) + + def channel_batch_updater(self) -> "ChannelBatchUpdater": + """ + Returns a ChannelBatchUpdater instance for batch channel operations. + + :return: ChannelBatchUpdater instance. + """ + from stream_chat.channel_batch_updater import ChannelBatchUpdater + + return ChannelBatchUpdater(self) diff --git a/stream_chat/tests/async_chat/test_channel_batch_updater.py b/stream_chat/tests/async_chat/test_channel_batch_updater.py new file mode 100644 index 0000000..0291ec9 --- /dev/null +++ b/stream_chat/tests/async_chat/test_channel_batch_updater.py @@ -0,0 +1,310 @@ +import asyncio +import time +import uuid +from typing import Dict + +import pytest + +from stream_chat.async_chat import StreamChatAsync +from stream_chat.async_chat.channel import Channel +from stream_chat.base.exceptions import StreamAPIException +from stream_chat.tests.async_chat.conftest import hard_delete_users +from stream_chat.types.channel_batch import ChannelsBatchOptions + + +class TestChannelBatchUpdater: + @pytest.mark.asyncio + async def test_update_channels_batch_none_options(self, client: StreamChatAsync): + """Test that update_channels_batch raises error when options is None""" + with pytest.raises(ValueError, match="options must not be None"): + await client.update_channels_batch(None) + + @pytest.mark.asyncio + async def test_update_channels_batch_valid_options( + self, client: StreamChatAsync, random_user: Dict + ): + """Test batch update channels with valid options""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + + try: + # Create a user to add + user_to_add = {"id": str(uuid.uuid4())} + await client.upsert_user(user_to_add) + + # Perform batch update + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": { + "cids": {"$in": [ch1.cid, ch2.cid]} + }, + "members": [{"user_id": user_to_add["id"]}], + } + + response = await client.update_channels_batch(options) + assert "task_id" in response + assert len(response["task_id"]) > 0 + + # Cleanup + await hard_delete_users(client, [user_to_add["id"]]) + finally: + try: + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + @pytest.mark.asyncio + async def test_channel_batch_updater_add_members( + self, client: StreamChatAsync, random_user: Dict + ): + """Test ChannelBatchUpdater.add_members""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + + try: + # Create users to add + users_to_add = [{"id": str(uuid.uuid4())}, {"id": str(uuid.uuid4())}] + await client.upsert_users(users_to_add) + + updater = client.channel_batch_updater() + + members = [ + {"user_id": user["id"]} + for user in users_to_add + ] + + response = await updater.add_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, members + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + await asyncio.sleep(2) + + # Poll for task completion + for _ in range(120): + task = await client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + await asyncio.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify members were added + for _ in range(120): + await asyncio.sleep(1) + ch1_members = await ch1.query_members({}) + ch2_members = await ch2.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + all_found = all( + user_id in ch1_member_ids + for user_id in [u["id"] for u in users_to_add] + ) + if all_found: + return + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + await asyncio.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + await asyncio.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + await asyncio.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + await hard_delete_users(client, [u["id"] for u in users_to_add]) + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + @pytest.mark.asyncio + async def test_channel_batch_updater_remove_members( + self, client: StreamChatAsync, random_user: Dict + ): + """Test ChannelBatchUpdater.remove_members""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + await client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + for member_id in members_id: + await ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + for member_id in members_id: + await ch2.add_members([member_id]) + + try: + # Verify members are present + ch1_members = await ch1.query_members({}) + ch2_members = await ch2.query_members({}) + assert len(ch1_members) >= 2 + assert len(ch2_members) >= 2 + + ch1_member_ids = [m["user_id"] for m in ch1_members] + ch2_member_ids = [m["user_id"] for m in ch2_members] + assert all(mid in ch1_member_ids for mid in members_id) + assert all(mid in ch2_member_ids for mid in members_id) + + # Remove a member + updater = client.channel_batch_updater() + member_to_remove = members_id[0] + + response = await updater.remove_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": member_to_remove}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + await asyncio.sleep(2) + + # Poll for task completion + for _ in range(120): + task = await client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + await asyncio.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify member was removed + for _ in range(120): + await asyncio.sleep(1) + ch1_members = await ch1.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + if member_to_remove not in ch1_member_ids: + return + + pytest.fail( + f"changes not visible after 2 minutes. Channel 1 still has members: {ch1_member_ids}" + ) + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + await asyncio.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + await asyncio.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + await asyncio.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + await hard_delete_users(client, members_id) + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + @pytest.mark.asyncio + async def test_channel_batch_updater_archive( + self, client: StreamChatAsync, random_user: Dict + ): + """Test ChannelBatchUpdater.archive""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + await client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + for member_id in members_id: + await ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + for member_id in members_id: + await ch2.add_members([member_id]) + + try: + updater = client.channel_batch_updater() + + response = await updater.archive( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": members_id[0]}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + await asyncio.sleep(2) + + # Poll for task completion + for _ in range(120): + task = await client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + await asyncio.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify channel was archived + for _ in range(120): + await asyncio.sleep(1) + ch1_state = await ch1.query() + + # Check archived_at in channel state for the user + if "channel" in ch1_state and "members" in ch1_state["channel"]: + for m in ch1_state["channel"]["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + # Also check top-level members if present + if "members" in ch1_state: + for m in ch1_state["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + await asyncio.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + await asyncio.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + await asyncio.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + await hard_delete_users(client, members_id) + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass diff --git a/stream_chat/tests/test_channel_batch_updater.py b/stream_chat/tests/test_channel_batch_updater.py new file mode 100644 index 0000000..d6cca8d --- /dev/null +++ b/stream_chat/tests/test_channel_batch_updater.py @@ -0,0 +1,304 @@ +import time +import uuid +from typing import Dict + +import pytest + +from stream_chat import StreamChat +from stream_chat.base.exceptions import StreamAPIException +from stream_chat.channel import Channel +from stream_chat.tests.conftest import hard_delete_users +from stream_chat.tests.utils import wait_for +from stream_chat.types.channel_batch import ChannelsBatchOptions + + +class TestChannelBatchUpdater: + def test_update_channels_batch_none_options(self, client: StreamChat): + """Test that update_channels_batch raises error when options is None""" + with pytest.raises(ValueError, match="options must not be None"): + client.update_channels_batch(None) + + def test_update_channels_batch_valid_options( + self, client: StreamChat, random_user: Dict + ): + """Test batch update channels with valid options""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + + try: + # Create a user to add + user_to_add = {"id": str(uuid.uuid4())} + client.upsert_user(user_to_add) + + # Perform batch update + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": { + "cids": {"$in": [ch1.cid, ch2.cid]} + }, + "members": [{"user_id": user_to_add["id"]}], + } + + response = client.update_channels_batch(options) + assert "task_id" in response + assert len(response["task_id"]) > 0 + + # Cleanup + hard_delete_users(client, [user_to_add["id"]]) + finally: + try: + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + def test_channel_batch_updater_add_members( + self, client: StreamChat, random_user: Dict + ): + """Test ChannelBatchUpdater.add_members""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + + try: + # Create users to add + users_to_add = [{"id": str(uuid.uuid4())}, {"id": str(uuid.uuid4())}] + client.upsert_users(users_to_add) + + updater = client.channel_batch_updater() + + members = [ + {"user_id": user["id"]} + for user in users_to_add + ] + + response = updater.add_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, members + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + time.sleep(2) + + # Poll for task completion + for _ in range(120): + task = client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + time.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify members were added + for _ in range(120): + time.sleep(1) + ch1_members = ch1.query_members({}) + ch2_members = ch2.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + all_found = all( + user_id in ch1_member_ids for user_id in [u["id"] for u in users_to_add] + ) + if all_found: + return + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + time.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + time.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + time.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + hard_delete_users(client, [u["id"] for u in users_to_add]) + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + def test_channel_batch_updater_remove_members( + self, client: StreamChat, random_user: Dict + ): + """Test ChannelBatchUpdater.remove_members""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + for member_id in members_id: + ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + for member_id in members_id: + ch2.add_members([member_id]) + + try: + # Verify members are present + ch1_members = ch1.query_members({}) + ch2_members = ch2.query_members({}) + assert len(ch1_members) >= 2 + assert len(ch2_members) >= 2 + + ch1_member_ids = [m["user_id"] for m in ch1_members] + ch2_member_ids = [m["user_id"] for m in ch2_members] + assert all(mid in ch1_member_ids for mid in members_id) + assert all(mid in ch2_member_ids for mid in members_id) + + # Remove a member + updater = client.channel_batch_updater() + member_to_remove = members_id[0] + + response = updater.remove_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": member_to_remove}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + time.sleep(2) + + # Poll for task completion + for _ in range(120): + task = client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + time.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify member was removed + for _ in range(120): + time.sleep(1) + ch1_members = ch1.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + if member_to_remove not in ch1_member_ids: + return + + pytest.fail( + f"changes not visible after 2 minutes. Channel 1 still has members: {ch1_member_ids}" + ) + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + time.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + time.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + time.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + hard_delete_users(client, members_id) + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + def test_channel_batch_updater_archive( + self, client: StreamChat, random_user: Dict + ): + """Test ChannelBatchUpdater.archive""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + for member_id in members_id: + ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + for member_id in members_id: + ch2.add_members([member_id]) + + try: + updater = client.channel_batch_updater() + + response = updater.archive( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": members_id[0]}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + time.sleep(2) + + # Poll for task completion + for _ in range(120): + task = client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + time.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify channel was archived + for _ in range(120): + time.sleep(1) + ch1_state = ch1.query() + + # Check archived_at in channel state for the user + if "channel" in ch1_state and "members" in ch1_state["channel"]: + for m in ch1_state["channel"]["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + # Also check top-level members if present + if "members" in ch1_state: + for m in ch1_state["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + time.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + time.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + time.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + hard_delete_users(client, members_id) + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass diff --git a/stream_chat/types/channel_batch.py b/stream_chat/types/channel_batch.py new file mode 100644 index 0000000..4ec8b84 --- /dev/null +++ b/stream_chat/types/channel_batch.py @@ -0,0 +1,93 @@ +import sys +from typing import Any, Dict, List, Optional + +if sys.version_info >= (3, 8): + from typing import Literal, TypedDict +else: + from typing_extensions import Literal, TypedDict + +ChannelBatchOperation = Literal[ + "addMembers", + "removeMembers", + "inviteMembers", + "assignRoles", + "addModerators", + "demoteModerators", + "hide", + "show", + "archive", + "unarchive", + "updateData", + "addFilterTags", + "removeFilterTags", +] + + +class ChannelBatchMemberRequest(TypedDict, total=False): + """ + Represents a member in batch operations. + + Parameters: + user_id: The ID of the user. + channel_role: The role of the user in the channel (optional). + """ + + user_id: str + channel_role: Optional[str] + + +class ChannelDataUpdate(TypedDict, total=False): + """ + Represents data that can be updated on channels in batch. + + Parameters: + frozen: Whether the channel is frozen. + disabled: Whether the channel is disabled. + custom: Custom data for the channel. + team: The team ID for the channel. + config_overrides: Configuration overrides for the channel. + auto_translation_enabled: Whether auto-translation is enabled. + auto_translation_language: The language for auto-translation. + """ + + frozen: Optional[bool] + disabled: Optional[bool] + custom: Optional[Dict[str, Any]] + team: Optional[str] + config_overrides: Optional[Dict[str, Any]] + auto_translation_enabled: Optional[bool] + auto_translation_language: Optional[str] + + +class ChannelsBatchFilters(TypedDict, total=False): + """ + Represents filters for batch channel updates. + + Parameters: + cids: Filter by channel CIDs (can be a dict with operators like $in). + types: Filter by channel types (can be a dict with operators like $in). + filter_tags: Filter by filter tags (can be a dict with operators like $in). + """ + + cids: Optional[Any] + types: Optional[Any] + filter_tags: Optional[Any] + + +class ChannelsBatchOptions(TypedDict): + """ + Represents options for batch channel updates. + + Parameters: + operation: The batch operation to perform. + filter: The filter to match channels. + members: List of members for member-related operations (optional). + data: Channel data updates for updateData operation (optional). + filter_tags_update: List of filter tags for filter tag operations (optional). + """ + + operation: ChannelBatchOperation + filter: ChannelsBatchFilters + members: Optional[List[ChannelBatchMemberRequest]] + data: Optional[ChannelDataUpdate] + filter_tags_update: Optional[List[str]] From 53ab37f3f0427ef2a0e38df824c6c1987db0fd7a Mon Sep 17 00:00:00 2001 From: javierdfm Date: Fri, 9 Jan 2026 14:33:08 +0100 Subject: [PATCH 2/6] fix: lint fix --- stream_chat/client.py | 16 ++++++++++++---- .../async_chat/test_channel_batch_updater.py | 9 ++------- stream_chat/tests/test_channel_batch_updater.py | 16 +++++----------- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/stream_chat/client.py b/stream_chat/client.py index 3a7e268..3fd765b 100644 --- a/stream_chat/client.py +++ b/stream_chat/client.py @@ -2,7 +2,17 @@ import json import sys import warnings -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Union, + cast, +) from urllib.parse import urlparse from urllib.request import Request, urlopen @@ -989,9 +999,7 @@ def mark_delivered_simple( } return self.mark_delivered(data=data) - def update_channels_batch( - self, options: "ChannelsBatchOptions" - ) -> StreamResponse: + def update_channels_batch(self, options: "ChannelsBatchOptions") -> StreamResponse: """ Updates channels in batch based on the provided options. diff --git a/stream_chat/tests/async_chat/test_channel_batch_updater.py b/stream_chat/tests/async_chat/test_channel_batch_updater.py index 0291ec9..bd86375 100644 --- a/stream_chat/tests/async_chat/test_channel_batch_updater.py +++ b/stream_chat/tests/async_chat/test_channel_batch_updater.py @@ -39,9 +39,7 @@ async def test_update_channels_batch_valid_options( # Perform batch update options: ChannelsBatchOptions = { "operation": "addMembers", - "filter": { - "cids": {"$in": [ch1.cid, ch2.cid]} - }, + "filter": {"cids": {"$in": [ch1.cid, ch2.cid]}}, "members": [{"user_id": user_to_add["id"]}], } @@ -76,10 +74,7 @@ async def test_channel_batch_updater_add_members( updater = client.channel_batch_updater() - members = [ - {"user_id": user["id"]} - for user in users_to_add - ] + members = [{"user_id": user["id"]} for user in users_to_add] response = await updater.add_members( {"cids": {"$in": [ch1.cid, ch2.cid]}}, members diff --git a/stream_chat/tests/test_channel_batch_updater.py b/stream_chat/tests/test_channel_batch_updater.py index d6cca8d..7bb81e7 100644 --- a/stream_chat/tests/test_channel_batch_updater.py +++ b/stream_chat/tests/test_channel_batch_updater.py @@ -37,9 +37,7 @@ def test_update_channels_batch_valid_options( # Perform batch update options: ChannelsBatchOptions = { "operation": "addMembers", - "filter": { - "cids": {"$in": [ch1.cid, ch2.cid]} - }, + "filter": {"cids": {"$in": [ch1.cid, ch2.cid]}}, "members": [{"user_id": user_to_add["id"]}], } @@ -73,10 +71,7 @@ def test_channel_batch_updater_add_members( updater = client.channel_batch_updater() - members = [ - {"user_id": user["id"]} - for user in users_to_add - ] + members = [{"user_id": user["id"]} for user in users_to_add] response = updater.add_members( {"cids": {"$in": [ch1.cid, ch2.cid]}}, members @@ -103,7 +98,8 @@ def test_channel_batch_updater_add_members( ch1_member_ids = [m["user_id"] for m in ch1_members] all_found = all( - user_id in ch1_member_ids for user_id in [u["id"] for u in users_to_add] + user_id in ch1_member_ids + for user_id in [u["id"] for u in users_to_add] ) if all_found: return @@ -219,9 +215,7 @@ def test_channel_batch_updater_remove_members( except Exception: pass - def test_channel_batch_updater_archive( - self, client: StreamChat, random_user: Dict - ): + def test_channel_batch_updater_archive(self, client: StreamChat, random_user: Dict): """Test ChannelBatchUpdater.archive""" # Create users members_id = [str(uuid.uuid4()), str(uuid.uuid4())] From 5029743507f059f596fcb606e834189ce3589409 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Fri, 9 Jan 2026 14:44:12 +0100 Subject: [PATCH 3/6] fix: fix mypy --- stream_chat/async_chat/channel_batch_updater.py | 8 ++++++-- stream_chat/async_chat/client.py | 5 +++-- stream_chat/channel_batch_updater.py | 8 ++++++-- stream_chat/client.py | 5 +++-- .../tests/async_chat/test_channel_batch_updater.py | 11 ++++++++--- stream_chat/tests/test_channel_batch_updater.py | 11 ++++++++--- stream_chat/types/channel_batch.py | 6 +++--- 7 files changed, 37 insertions(+), 17 deletions(-) diff --git a/stream_chat/async_chat/channel_batch_updater.py b/stream_chat/async_chat/channel_batch_updater.py index b544a24..f78ff8c 100644 --- a/stream_chat/async_chat/channel_batch_updater.py +++ b/stream_chat/async_chat/channel_batch_updater.py @@ -1,4 +1,5 @@ -from typing import List +import sys +from typing import TYPE_CHECKING, List from stream_chat.types.channel_batch import ( ChannelBatchMemberRequest, @@ -8,13 +9,16 @@ ) from stream_chat.types.stream_response import StreamResponse +if TYPE_CHECKING: + from stream_chat.async_chat.client import StreamChatAsync + class ChannelBatchUpdater: """ Provides convenience methods for batch channel operations (async). """ - def __init__(self, client): + def __init__(self, client: "StreamChatAsync") -> None: self.client = client async def add_members( diff --git a/stream_chat/async_chat/client.py b/stream_chat/async_chat/client.py index 2874dfb..da8c8df 100644 --- a/stream_chat/async_chat/client.py +++ b/stream_chat/async_chat/client.py @@ -18,6 +18,7 @@ ) if TYPE_CHECKING: + from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater from stream_chat.types.channel_batch import ChannelsBatchOptions from urllib.parse import urlparse @@ -1033,7 +1034,7 @@ async def mark_delivered_simple( return await self.mark_delivered(data) async def update_channels_batch( - self, options: "ChannelsBatchOptions" + self, options: ChannelsBatchOptions ) -> StreamResponse: """ Updates channels in batch based on the provided options. @@ -1046,7 +1047,7 @@ async def update_channels_batch( return await self.put("channels/batch", data=options) - def channel_batch_updater(self) -> "ChannelBatchUpdater": + def channel_batch_updater(self) -> ChannelBatchUpdater: """ Returns a ChannelBatchUpdater instance for batch channel operations. diff --git a/stream_chat/channel_batch_updater.py b/stream_chat/channel_batch_updater.py index 5007d92..8d8d3c3 100644 --- a/stream_chat/channel_batch_updater.py +++ b/stream_chat/channel_batch_updater.py @@ -1,4 +1,5 @@ -from typing import List, Optional +import sys +from typing import TYPE_CHECKING, List, Optional from stream_chat.types.channel_batch import ( ChannelBatchMemberRequest, @@ -8,13 +9,16 @@ ) from stream_chat.types.stream_response import StreamResponse +if TYPE_CHECKING: + from stream_chat.client import StreamChat + class ChannelBatchUpdater: """ Provides convenience methods for batch channel operations. """ - def __init__(self, client): + def __init__(self, client: "StreamChat") -> None: self.client = client def add_members( diff --git a/stream_chat/client.py b/stream_chat/client.py index 3fd765b..0cfe349 100644 --- a/stream_chat/client.py +++ b/stream_chat/client.py @@ -17,6 +17,7 @@ from urllib.request import Request, urlopen if TYPE_CHECKING: + from stream_chat.channel_batch_updater import ChannelBatchUpdater from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.campaign import Campaign @@ -999,7 +1000,7 @@ def mark_delivered_simple( } return self.mark_delivered(data=data) - def update_channels_batch(self, options: "ChannelsBatchOptions") -> StreamResponse: + def update_channels_batch(self, options: ChannelsBatchOptions) -> StreamResponse: """ Updates channels in batch based on the provided options. @@ -1011,7 +1012,7 @@ def update_channels_batch(self, options: "ChannelsBatchOptions") -> StreamRespon return self.put("channels/batch", data=options) - def channel_batch_updater(self) -> "ChannelBatchUpdater": + def channel_batch_updater(self) -> ChannelBatchUpdater: """ Returns a ChannelBatchUpdater instance for batch channel operations. diff --git a/stream_chat/tests/async_chat/test_channel_batch_updater.py b/stream_chat/tests/async_chat/test_channel_batch_updater.py index bd86375..8a7f231 100644 --- a/stream_chat/tests/async_chat/test_channel_batch_updater.py +++ b/stream_chat/tests/async_chat/test_channel_batch_updater.py @@ -93,18 +93,23 @@ async def test_channel_batch_updater_add_members( continue if task["status"] == "completed": - # Wait a bit and verify members were added + # Wait a bit and verify members were added to both channels for _ in range(120): await asyncio.sleep(1) ch1_members = await ch1.query_members({}) ch2_members = await ch2.query_members({}) ch1_member_ids = [m["user_id"] for m in ch1_members] - all_found = all( + ch2_member_ids = [m["user_id"] for m in ch2_members] + all_found_ch1 = all( user_id in ch1_member_ids for user_id in [u["id"] for u in users_to_add] ) - if all_found: + all_found_ch2 = all( + user_id in ch2_member_ids + for user_id in [u["id"] for u in users_to_add] + ) + if all_found_ch1 and all_found_ch2: return pytest.fail("changes not visible after 2 minutes") diff --git a/stream_chat/tests/test_channel_batch_updater.py b/stream_chat/tests/test_channel_batch_updater.py index 7bb81e7..88d1969 100644 --- a/stream_chat/tests/test_channel_batch_updater.py +++ b/stream_chat/tests/test_channel_batch_updater.py @@ -90,18 +90,23 @@ def test_channel_batch_updater_add_members( continue if task["status"] == "completed": - # Wait a bit and verify members were added + # Wait a bit and verify members were added to both channels for _ in range(120): time.sleep(1) ch1_members = ch1.query_members({}) ch2_members = ch2.query_members({}) ch1_member_ids = [m["user_id"] for m in ch1_members] - all_found = all( + ch2_member_ids = [m["user_id"] for m in ch2_members] + all_found_ch1 = all( user_id in ch1_member_ids for user_id in [u["id"] for u in users_to_add] ) - if all_found: + all_found_ch2 = all( + user_id in ch2_member_ids + for user_id in [u["id"] for u in users_to_add] + ) + if all_found_ch1 and all_found_ch2: return pytest.fail("changes not visible after 2 minutes") diff --git a/stream_chat/types/channel_batch.py b/stream_chat/types/channel_batch.py index 4ec8b84..a3db484 100644 --- a/stream_chat/types/channel_batch.py +++ b/stream_chat/types/channel_batch.py @@ -74,13 +74,13 @@ class ChannelsBatchFilters(TypedDict, total=False): filter_tags: Optional[Any] -class ChannelsBatchOptions(TypedDict): +class ChannelsBatchOptions(TypedDict, total=False): """ Represents options for batch channel updates. Parameters: - operation: The batch operation to perform. - filter: The filter to match channels. + operation: The batch operation to perform (required). + filter: The filter to match channels (required). members: List of members for member-related operations (optional). data: Channel data updates for updateData operation (optional). filter_tags_update: List of filter tags for filter tag operations (optional). From 1882971fde9fbe730059746d12998dfb450b44a5 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Fri, 9 Jan 2026 14:51:25 +0100 Subject: [PATCH 4/6] fix: fix imports --- stream_chat/async_chat/channel_batch_updater.py | 1 - stream_chat/channel_batch_updater.py | 3 +-- stream_chat/tests/async_chat/test_channel_batch_updater.py | 3 --- stream_chat/tests/test_channel_batch_updater.py | 3 --- 4 files changed, 1 insertion(+), 9 deletions(-) diff --git a/stream_chat/async_chat/channel_batch_updater.py b/stream_chat/async_chat/channel_batch_updater.py index f78ff8c..ea8ef45 100644 --- a/stream_chat/async_chat/channel_batch_updater.py +++ b/stream_chat/async_chat/channel_batch_updater.py @@ -1,4 +1,3 @@ -import sys from typing import TYPE_CHECKING, List from stream_chat.types.channel_batch import ( diff --git a/stream_chat/channel_batch_updater.py b/stream_chat/channel_batch_updater.py index 8d8d3c3..32cae70 100644 --- a/stream_chat/channel_batch_updater.py +++ b/stream_chat/channel_batch_updater.py @@ -1,5 +1,4 @@ -import sys -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, List from stream_chat.types.channel_batch import ( ChannelBatchMemberRequest, diff --git a/stream_chat/tests/async_chat/test_channel_batch_updater.py b/stream_chat/tests/async_chat/test_channel_batch_updater.py index 8a7f231..8bba749 100644 --- a/stream_chat/tests/async_chat/test_channel_batch_updater.py +++ b/stream_chat/tests/async_chat/test_channel_batch_updater.py @@ -1,13 +1,10 @@ import asyncio -import time import uuid from typing import Dict import pytest from stream_chat.async_chat import StreamChatAsync -from stream_chat.async_chat.channel import Channel -from stream_chat.base.exceptions import StreamAPIException from stream_chat.tests.async_chat.conftest import hard_delete_users from stream_chat.types.channel_batch import ChannelsBatchOptions diff --git a/stream_chat/tests/test_channel_batch_updater.py b/stream_chat/tests/test_channel_batch_updater.py index 88d1969..0687656 100644 --- a/stream_chat/tests/test_channel_batch_updater.py +++ b/stream_chat/tests/test_channel_batch_updater.py @@ -5,10 +5,7 @@ import pytest from stream_chat import StreamChat -from stream_chat.base.exceptions import StreamAPIException -from stream_chat.channel import Channel from stream_chat.tests.conftest import hard_delete_users -from stream_chat.tests.utils import wait_for from stream_chat.types.channel_batch import ChannelsBatchOptions From dfed5de5aabef664edcc23c2772e4243e411a467 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Fri, 9 Jan 2026 15:03:46 +0100 Subject: [PATCH 5/6] fix: updated imports --- stream_chat/async_chat/client.py | 4 ++-- stream_chat/client.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/stream_chat/async_chat/client.py b/stream_chat/async_chat/client.py index da8c8df..d16a4ed 100644 --- a/stream_chat/async_chat/client.py +++ b/stream_chat/async_chat/client.py @@ -19,7 +19,6 @@ if TYPE_CHECKING: from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater - from stream_chat.types.channel_batch import ChannelsBatchOptions from urllib.parse import urlparse from stream_chat.async_chat.campaign import Campaign @@ -34,6 +33,7 @@ SegmentType, SegmentUpdatableFields, ) +from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.shared_locations import SharedLocationsOptions if sys.version_info >= (3, 8): @@ -1047,7 +1047,7 @@ async def update_channels_batch( return await self.put("channels/batch", data=options) - def channel_batch_updater(self) -> ChannelBatchUpdater: + def channel_batch_updater(self) -> "ChannelBatchUpdater": """ Returns a ChannelBatchUpdater instance for batch channel operations. diff --git a/stream_chat/client.py b/stream_chat/client.py index 0cfe349..d0ae0b3 100644 --- a/stream_chat/client.py +++ b/stream_chat/client.py @@ -18,7 +18,6 @@ if TYPE_CHECKING: from stream_chat.channel_batch_updater import ChannelBatchUpdater - from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.campaign import Campaign from stream_chat.segment import Segment @@ -32,6 +31,7 @@ SegmentType, SegmentUpdatableFields, ) +from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.shared_locations import SharedLocationsOptions if sys.version_info >= (3, 8): @@ -1012,7 +1012,7 @@ def update_channels_batch(self, options: ChannelsBatchOptions) -> StreamResponse return self.put("channels/batch", data=options) - def channel_batch_updater(self) -> ChannelBatchUpdater: + def channel_batch_updater(self) -> "ChannelBatchUpdater": """ Returns a ChannelBatchUpdater instance for batch channel operations. From 36f41c2b982e16d50ff7ac8025aa6325abaf9b6d Mon Sep 17 00:00:00 2001 From: javierdfm Date: Fri, 9 Jan 2026 15:06:03 +0100 Subject: [PATCH 6/6] fix: lint fix --- stream_chat/async_chat/client.py | 3 ++- stream_chat/client.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/stream_chat/async_chat/client.py b/stream_chat/async_chat/client.py index d16a4ed..ebecf7c 100644 --- a/stream_chat/async_chat/client.py +++ b/stream_chat/async_chat/client.py @@ -19,12 +19,14 @@ if TYPE_CHECKING: from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater + from urllib.parse import urlparse from stream_chat.async_chat.campaign import Campaign from stream_chat.async_chat.segment import Segment from stream_chat.types.base import SortParam from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions +from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.draft import QueryDraftsFilter, QueryDraftsOptions from stream_chat.types.segment import ( QuerySegmentsOptions, @@ -33,7 +35,6 @@ SegmentType, SegmentUpdatableFields, ) -from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.shared_locations import SharedLocationsOptions if sys.version_info >= (3, 8): diff --git a/stream_chat/client.py b/stream_chat/client.py index d0ae0b3..c2a0de8 100644 --- a/stream_chat/client.py +++ b/stream_chat/client.py @@ -23,6 +23,7 @@ from stream_chat.segment import Segment from stream_chat.types.base import SortParam from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions +from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.draft import QueryDraftsFilter, QueryDraftsOptions from stream_chat.types.segment import ( QuerySegmentsOptions, @@ -31,7 +32,6 @@ SegmentType, SegmentUpdatableFields, ) -from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.shared_locations import SharedLocationsOptions if sys.version_info >= (3, 8):