diff --git a/stream_chat/async_chat/channel_batch_updater.py b/stream_chat/async_chat/channel_batch_updater.py new file mode 100644 index 0000000..ea8ef45 --- /dev/null +++ b/stream_chat/async_chat/channel_batch_updater.py @@ -0,0 +1,242 @@ +from typing import TYPE_CHECKING, List + +from stream_chat.types.channel_batch import ( + ChannelBatchMemberRequest, + ChannelDataUpdate, + ChannelsBatchFilters, + ChannelsBatchOptions, +) +from stream_chat.types.stream_response import StreamResponse + +if TYPE_CHECKING: + from stream_chat.async_chat.client import StreamChatAsync + + +class ChannelBatchUpdater: + """ + Provides convenience methods for batch channel operations (async). + """ + + def __init__(self, client: "StreamChatAsync") -> None: + self.client = client + + async def add_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def remove_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes members from channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeMembers", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def invite_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Invites members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to invite. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "inviteMembers", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def add_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds moderators to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add as moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addModerators", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def demote_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes moderator role from members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to demote from moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "demoteModerators", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def assign_roles( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Assigns roles to members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members with roles to assign. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "assignRoles", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def hide( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Hides channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to hide channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "hide", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def show( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Shows channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to show channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "show", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def archive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Archives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to archive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "archive", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def unarchive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Unarchives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to unarchive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "unarchive", + "filter": filter, + "members": members, + } + return await self.client.update_channels_batch(options) + + async def update_data( + self, filter: ChannelsBatchFilters, data: ChannelDataUpdate + ) -> StreamResponse: + """ + Updates data on channels matching the filter. + + :param filter: The filter to match channels. + :param data: Channel data to update. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "updateData", + "filter": filter, + "data": data, + } + return await self.client.update_channels_batch(options) + + async def add_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Adds filter tags to channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return await self.client.update_channels_batch(options) + + async def remove_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Removes filter tags from channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return await self.client.update_channels_batch(options) diff --git a/stream_chat/async_chat/client.py b/stream_chat/async_chat/client.py index 75b073b..ebecf7c 100644 --- a/stream_chat/async_chat/client.py +++ b/stream_chat/async_chat/client.py @@ -4,6 +4,7 @@ import warnings from types import TracebackType from typing import ( + TYPE_CHECKING, Any, AsyncContextManager, Callable, @@ -15,12 +16,17 @@ Union, cast, ) + +if TYPE_CHECKING: + from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater + from urllib.parse import urlparse from stream_chat.async_chat.campaign import Campaign from stream_chat.async_chat.segment import Segment from stream_chat.types.base import SortParam from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions +from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.draft import QueryDraftsFilter, QueryDraftsOptions from stream_chat.types.segment import ( QuerySegmentsOptions, @@ -1028,6 +1034,30 @@ async def mark_delivered_simple( return await self.mark_delivered(data) + async def update_channels_batch( + self, options: ChannelsBatchOptions + ) -> StreamResponse: + """ + Updates channels in batch based on the provided options. + + :param options: ChannelsBatchOptions containing operation, filter, and operation-specific data. + :return: StreamResponse containing task_id. + """ + if options is None: + raise ValueError("options must not be None") + + return await self.put("channels/batch", data=options) + + def channel_batch_updater(self) -> "ChannelBatchUpdater": + """ + Returns a ChannelBatchUpdater instance for batch channel operations. + + :return: ChannelBatchUpdater instance. + """ + from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater + + return ChannelBatchUpdater(self) + async def close(self) -> None: await self.session.close() diff --git a/stream_chat/base/client.py b/stream_chat/base/client.py index d64c6f9..b9c9827 100644 --- a/stream_chat/base/client.py +++ b/stream_chat/base/client.py @@ -1302,6 +1302,27 @@ def get_task( """ pass + @abc.abstractmethod + def update_channels_batch( + self, options: Any + ) -> Union[StreamResponse, Awaitable[StreamResponse]]: + """ + Updates channels in batch based on the provided options. + + :param options: ChannelsBatchOptions containing operation, filter, and operation-specific data. + :return: StreamResponse containing task_id. + """ + pass + + @abc.abstractmethod + def channel_batch_updater(self) -> Any: + """ + Returns a ChannelBatchUpdater instance for batch channel operations. + + :return: ChannelBatchUpdater instance. + """ + pass + @abc.abstractmethod def send_user_custom_event( self, user_id: str, event: Dict diff --git a/stream_chat/channel_batch_updater.py b/stream_chat/channel_batch_updater.py new file mode 100644 index 0000000..32cae70 --- /dev/null +++ b/stream_chat/channel_batch_updater.py @@ -0,0 +1,242 @@ +from typing import TYPE_CHECKING, List + +from stream_chat.types.channel_batch import ( + ChannelBatchMemberRequest, + ChannelDataUpdate, + ChannelsBatchFilters, + ChannelsBatchOptions, +) +from stream_chat.types.stream_response import StreamResponse + +if TYPE_CHECKING: + from stream_chat.client import StreamChat + + +class ChannelBatchUpdater: + """ + Provides convenience methods for batch channel operations. + """ + + def __init__(self, client: "StreamChat") -> None: + self.client = client + + def add_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def remove_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes members from channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeMembers", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def invite_members( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Invites members to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to invite. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "inviteMembers", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def add_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Adds moderators to channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to add as moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addModerators", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def demote_moderators( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Removes moderator role from members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members to demote from moderators. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "demoteModerators", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def assign_roles( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Assigns roles to members in channels matching the filter. + + :param filter: The filter to match channels. + :param members: List of members with roles to assign. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "assignRoles", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def hide( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Hides channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to hide channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "hide", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def show( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Shows channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to show channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "show", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def archive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Archives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to archive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "archive", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def unarchive( + self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest] + ) -> StreamResponse: + """ + Unarchives channels matching the filter for the specified members. + + :param filter: The filter to match channels. + :param members: List of members for whom to unarchive channels. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "unarchive", + "filter": filter, + "members": members, + } + return self.client.update_channels_batch(options) + + def update_data( + self, filter: ChannelsBatchFilters, data: ChannelDataUpdate + ) -> StreamResponse: + """ + Updates data on channels matching the filter. + + :param filter: The filter to match channels. + :param data: Channel data to update. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "updateData", + "filter": filter, + "data": data, + } + return self.client.update_channels_batch(options) + + def add_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Adds filter tags to channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to add. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "addFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return self.client.update_channels_batch(options) + + def remove_filter_tags( + self, filter: ChannelsBatchFilters, tags: List[str] + ) -> StreamResponse: + """ + Removes filter tags from channels matching the filter. + + :param filter: The filter to match channels. + :param tags: List of filter tags to remove. + :return: StreamResponse containing task_id. + """ + options: ChannelsBatchOptions = { + "operation": "removeFilterTags", + "filter": filter, + "filter_tags_update": tags, + } + return self.client.update_channels_batch(options) diff --git a/stream_chat/client.py b/stream_chat/client.py index 20681bf..c2a0de8 100644 --- a/stream_chat/client.py +++ b/stream_chat/client.py @@ -2,14 +2,28 @@ import json import sys import warnings -from typing import Any, Callable, Dict, Iterable, List, Optional, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + List, + Optional, + Union, + cast, +) from urllib.parse import urlparse from urllib.request import Request, urlopen +if TYPE_CHECKING: + from stream_chat.channel_batch_updater import ChannelBatchUpdater + from stream_chat.campaign import Campaign from stream_chat.segment import Segment from stream_chat.types.base import SortParam from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions +from stream_chat.types.channel_batch import ChannelsBatchOptions from stream_chat.types.draft import QueryDraftsFilter, QueryDraftsOptions from stream_chat.types.segment import ( QuerySegmentsOptions, @@ -985,3 +999,25 @@ def mark_delivered_simple( "user_id": user_id, } return self.mark_delivered(data=data) + + def update_channels_batch(self, options: ChannelsBatchOptions) -> StreamResponse: + """ + Updates channels in batch based on the provided options. + + :param options: ChannelsBatchOptions containing operation, filter, and operation-specific data. + :return: StreamResponse containing task_id. + """ + if options is None: + raise ValueError("options must not be None") + + return self.put("channels/batch", data=options) + + def channel_batch_updater(self) -> "ChannelBatchUpdater": + """ + Returns a ChannelBatchUpdater instance for batch channel operations. + + :return: ChannelBatchUpdater instance. + """ + from stream_chat.channel_batch_updater import ChannelBatchUpdater + + return ChannelBatchUpdater(self) diff --git a/stream_chat/tests/async_chat/test_channel_batch_updater.py b/stream_chat/tests/async_chat/test_channel_batch_updater.py new file mode 100644 index 0000000..8bba749 --- /dev/null +++ b/stream_chat/tests/async_chat/test_channel_batch_updater.py @@ -0,0 +1,307 @@ +import asyncio +import uuid +from typing import Dict + +import pytest + +from stream_chat.async_chat import StreamChatAsync +from stream_chat.tests.async_chat.conftest import hard_delete_users +from stream_chat.types.channel_batch import ChannelsBatchOptions + + +class TestChannelBatchUpdater: + @pytest.mark.asyncio + async def test_update_channels_batch_none_options(self, client: StreamChatAsync): + """Test that update_channels_batch raises error when options is None""" + with pytest.raises(ValueError, match="options must not be None"): + await client.update_channels_batch(None) + + @pytest.mark.asyncio + async def test_update_channels_batch_valid_options( + self, client: StreamChatAsync, random_user: Dict + ): + """Test batch update channels with valid options""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + + try: + # Create a user to add + user_to_add = {"id": str(uuid.uuid4())} + await client.upsert_user(user_to_add) + + # Perform batch update + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": {"cids": {"$in": [ch1.cid, ch2.cid]}}, + "members": [{"user_id": user_to_add["id"]}], + } + + response = await client.update_channels_batch(options) + assert "task_id" in response + assert len(response["task_id"]) > 0 + + # Cleanup + await hard_delete_users(client, [user_to_add["id"]]) + finally: + try: + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + @pytest.mark.asyncio + async def test_channel_batch_updater_add_members( + self, client: StreamChatAsync, random_user: Dict + ): + """Test ChannelBatchUpdater.add_members""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + + try: + # Create users to add + users_to_add = [{"id": str(uuid.uuid4())}, {"id": str(uuid.uuid4())}] + await client.upsert_users(users_to_add) + + updater = client.channel_batch_updater() + + members = [{"user_id": user["id"]} for user in users_to_add] + + response = await updater.add_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, members + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + await asyncio.sleep(2) + + # Poll for task completion + for _ in range(120): + task = await client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + await asyncio.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify members were added to both channels + for _ in range(120): + await asyncio.sleep(1) + ch1_members = await ch1.query_members({}) + ch2_members = await ch2.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + ch2_member_ids = [m["user_id"] for m in ch2_members] + all_found_ch1 = all( + user_id in ch1_member_ids + for user_id in [u["id"] for u in users_to_add] + ) + all_found_ch2 = all( + user_id in ch2_member_ids + for user_id in [u["id"] for u in users_to_add] + ) + if all_found_ch1 and all_found_ch2: + return + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + await asyncio.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + await asyncio.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + await asyncio.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + await hard_delete_users(client, [u["id"] for u in users_to_add]) + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + @pytest.mark.asyncio + async def test_channel_batch_updater_remove_members( + self, client: StreamChatAsync, random_user: Dict + ): + """Test ChannelBatchUpdater.remove_members""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + await client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + for member_id in members_id: + await ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + for member_id in members_id: + await ch2.add_members([member_id]) + + try: + # Verify members are present + ch1_members = await ch1.query_members({}) + ch2_members = await ch2.query_members({}) + assert len(ch1_members) >= 2 + assert len(ch2_members) >= 2 + + ch1_member_ids = [m["user_id"] for m in ch1_members] + ch2_member_ids = [m["user_id"] for m in ch2_members] + assert all(mid in ch1_member_ids for mid in members_id) + assert all(mid in ch2_member_ids for mid in members_id) + + # Remove a member + updater = client.channel_batch_updater() + member_to_remove = members_id[0] + + response = await updater.remove_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": member_to_remove}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + await asyncio.sleep(2) + + # Poll for task completion + for _ in range(120): + task = await client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + await asyncio.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify member was removed + for _ in range(120): + await asyncio.sleep(1) + ch1_members = await ch1.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + if member_to_remove not in ch1_member_ids: + return + + pytest.fail( + f"changes not visible after 2 minutes. Channel 1 still has members: {ch1_member_ids}" + ) + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + await asyncio.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + await asyncio.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + await asyncio.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + await hard_delete_users(client, members_id) + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + @pytest.mark.asyncio + async def test_channel_batch_updater_archive( + self, client: StreamChatAsync, random_user: Dict + ): + """Test ChannelBatchUpdater.archive""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + await client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + await ch1.create(random_user["id"]) + for member_id in members_id: + await ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + await ch2.create(random_user["id"]) + for member_id in members_id: + await ch2.add_members([member_id]) + + try: + updater = client.channel_batch_updater() + + response = await updater.archive( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": members_id[0]}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + await asyncio.sleep(2) + + # Poll for task completion + for _ in range(120): + task = await client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + await asyncio.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify channel was archived + for _ in range(120): + await asyncio.sleep(1) + ch1_state = await ch1.query() + + # Check archived_at in channel state for the user + if "channel" in ch1_state and "members" in ch1_state["channel"]: + for m in ch1_state["channel"]["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + # Also check top-level members if present + if "members" in ch1_state: + for m in ch1_state["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + await asyncio.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + await asyncio.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + await asyncio.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + await hard_delete_users(client, members_id) + await client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass diff --git a/stream_chat/tests/test_channel_batch_updater.py b/stream_chat/tests/test_channel_batch_updater.py new file mode 100644 index 0000000..0687656 --- /dev/null +++ b/stream_chat/tests/test_channel_batch_updater.py @@ -0,0 +1,300 @@ +import time +import uuid +from typing import Dict + +import pytest + +from stream_chat import StreamChat +from stream_chat.tests.conftest import hard_delete_users +from stream_chat.types.channel_batch import ChannelsBatchOptions + + +class TestChannelBatchUpdater: + def test_update_channels_batch_none_options(self, client: StreamChat): + """Test that update_channels_batch raises error when options is None""" + with pytest.raises(ValueError, match="options must not be None"): + client.update_channels_batch(None) + + def test_update_channels_batch_valid_options( + self, client: StreamChat, random_user: Dict + ): + """Test batch update channels with valid options""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + + try: + # Create a user to add + user_to_add = {"id": str(uuid.uuid4())} + client.upsert_user(user_to_add) + + # Perform batch update + options: ChannelsBatchOptions = { + "operation": "addMembers", + "filter": {"cids": {"$in": [ch1.cid, ch2.cid]}}, + "members": [{"user_id": user_to_add["id"]}], + } + + response = client.update_channels_batch(options) + assert "task_id" in response + assert len(response["task_id"]) > 0 + + # Cleanup + hard_delete_users(client, [user_to_add["id"]]) + finally: + try: + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + def test_channel_batch_updater_add_members( + self, client: StreamChat, random_user: Dict + ): + """Test ChannelBatchUpdater.add_members""" + # Create two channels + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + + try: + # Create users to add + users_to_add = [{"id": str(uuid.uuid4())}, {"id": str(uuid.uuid4())}] + client.upsert_users(users_to_add) + + updater = client.channel_batch_updater() + + members = [{"user_id": user["id"]} for user in users_to_add] + + response = updater.add_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, members + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + time.sleep(2) + + # Poll for task completion + for _ in range(120): + task = client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + time.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify members were added to both channels + for _ in range(120): + time.sleep(1) + ch1_members = ch1.query_members({}) + ch2_members = ch2.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + ch2_member_ids = [m["user_id"] for m in ch2_members] + all_found_ch1 = all( + user_id in ch1_member_ids + for user_id in [u["id"] for u in users_to_add] + ) + all_found_ch2 = all( + user_id in ch2_member_ids + for user_id in [u["id"] for u in users_to_add] + ) + if all_found_ch1 and all_found_ch2: + return + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + time.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + time.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + time.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + hard_delete_users(client, [u["id"] for u in users_to_add]) + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + def test_channel_batch_updater_remove_members( + self, client: StreamChat, random_user: Dict + ): + """Test ChannelBatchUpdater.remove_members""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + for member_id in members_id: + ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + for member_id in members_id: + ch2.add_members([member_id]) + + try: + # Verify members are present + ch1_members = ch1.query_members({}) + ch2_members = ch2.query_members({}) + assert len(ch1_members) >= 2 + assert len(ch2_members) >= 2 + + ch1_member_ids = [m["user_id"] for m in ch1_members] + ch2_member_ids = [m["user_id"] for m in ch2_members] + assert all(mid in ch1_member_ids for mid in members_id) + assert all(mid in ch2_member_ids for mid in members_id) + + # Remove a member + updater = client.channel_batch_updater() + member_to_remove = members_id[0] + + response = updater.remove_members( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": member_to_remove}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + time.sleep(2) + + # Poll for task completion + for _ in range(120): + task = client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + time.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify member was removed + for _ in range(120): + time.sleep(1) + ch1_members = ch1.query_members({}) + + ch1_member_ids = [m["user_id"] for m in ch1_members] + if member_to_remove not in ch1_member_ids: + return + + pytest.fail( + f"changes not visible after 2 minutes. Channel 1 still has members: {ch1_member_ids}" + ) + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + time.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + time.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + time.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + hard_delete_users(client, members_id) + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass + + def test_channel_batch_updater_archive(self, client: StreamChat, random_user: Dict): + """Test ChannelBatchUpdater.archive""" + # Create users + members_id = [str(uuid.uuid4()), str(uuid.uuid4())] + users = [{"id": uid} for uid in members_id] + client.upsert_users(users) + + # Create channels with members + ch1 = client.channel("messaging", str(uuid.uuid4())) + ch1.create(random_user["id"]) + for member_id in members_id: + ch1.add_members([member_id]) + + ch2 = client.channel("messaging", str(uuid.uuid4())) + ch2.create(random_user["id"]) + for member_id in members_id: + ch2.add_members([member_id]) + + try: + updater = client.channel_batch_updater() + + response = updater.archive( + {"cids": {"$in": [ch1.cid, ch2.cid]}}, + [{"user_id": members_id[0]}], + ) + assert "task_id" in response + task_id = response["task_id"] + + # Wait for task to complete + time.sleep(2) + + # Poll for task completion + for _ in range(120): + task = client.get_task(task_id) + if task["status"] in ["waiting", "pending", "running"]: + time.sleep(1) + continue + + if task["status"] == "completed": + # Wait a bit and verify channel was archived + for _ in range(120): + time.sleep(1) + ch1_state = ch1.query() + + # Check archived_at in channel state for the user + if "channel" in ch1_state and "members" in ch1_state["channel"]: + for m in ch1_state["channel"]["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + # Also check top-level members if present + if "members" in ch1_state: + for m in ch1_state["members"]: + if m["user_id"] == members_id[0]: + if m.get("archived_at") is not None: + return + break + + pytest.fail("changes not visible after 2 minutes") + + if task["status"] == "failed": + if len(task.get("result", {})) == 0: + time.sleep(2) + continue + desc = task.get("result", {}).get("description", "") + if "rate limit" in desc.lower(): + time.sleep(2) + continue + pytest.fail(f"task failed with result: {task.get('result')}") + + time.sleep(1) + + pytest.fail("task did not complete in 2 minutes") + + finally: + try: + hard_delete_users(client, members_id) + client.delete_channels([ch1.cid, ch2.cid], hard_delete=True) + except Exception: + pass diff --git a/stream_chat/types/channel_batch.py b/stream_chat/types/channel_batch.py new file mode 100644 index 0000000..a3db484 --- /dev/null +++ b/stream_chat/types/channel_batch.py @@ -0,0 +1,93 @@ +import sys +from typing import Any, Dict, List, Optional + +if sys.version_info >= (3, 8): + from typing import Literal, TypedDict +else: + from typing_extensions import Literal, TypedDict + +ChannelBatchOperation = Literal[ + "addMembers", + "removeMembers", + "inviteMembers", + "assignRoles", + "addModerators", + "demoteModerators", + "hide", + "show", + "archive", + "unarchive", + "updateData", + "addFilterTags", + "removeFilterTags", +] + + +class ChannelBatchMemberRequest(TypedDict, total=False): + """ + Represents a member in batch operations. + + Parameters: + user_id: The ID of the user. + channel_role: The role of the user in the channel (optional). + """ + + user_id: str + channel_role: Optional[str] + + +class ChannelDataUpdate(TypedDict, total=False): + """ + Represents data that can be updated on channels in batch. + + Parameters: + frozen: Whether the channel is frozen. + disabled: Whether the channel is disabled. + custom: Custom data for the channel. + team: The team ID for the channel. + config_overrides: Configuration overrides for the channel. + auto_translation_enabled: Whether auto-translation is enabled. + auto_translation_language: The language for auto-translation. + """ + + frozen: Optional[bool] + disabled: Optional[bool] + custom: Optional[Dict[str, Any]] + team: Optional[str] + config_overrides: Optional[Dict[str, Any]] + auto_translation_enabled: Optional[bool] + auto_translation_language: Optional[str] + + +class ChannelsBatchFilters(TypedDict, total=False): + """ + Represents filters for batch channel updates. + + Parameters: + cids: Filter by channel CIDs (can be a dict with operators like $in). + types: Filter by channel types (can be a dict with operators like $in). + filter_tags: Filter by filter tags (can be a dict with operators like $in). + """ + + cids: Optional[Any] + types: Optional[Any] + filter_tags: Optional[Any] + + +class ChannelsBatchOptions(TypedDict, total=False): + """ + Represents options for batch channel updates. + + Parameters: + operation: The batch operation to perform (required). + filter: The filter to match channels (required). + members: List of members for member-related operations (optional). + data: Channel data updates for updateData operation (optional). + filter_tags_update: List of filter tags for filter tag operations (optional). + """ + + operation: ChannelBatchOperation + filter: ChannelsBatchFilters + members: Optional[List[ChannelBatchMemberRequest]] + data: Optional[ChannelDataUpdate] + filter_tags_update: Optional[List[str]]