Conversation

@chandra-siri
Collaborator

fix: instance grpc client once per process in benchmarks

@product-auto-label product-auto-label bot added size: m Pull request size is medium. api: storage Issues related to the googleapis/python-storage API. labels Jan 26, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello @chandra-siri, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors the microbenchmarking suite to optimize resource usage by ensuring that gRPC and Google Cloud Storage clients are initialized only once per worker process. This change prevents redundant client creation and destruction, which can improve the accuracy and efficiency of performance measurements, especially in multiprocessing contexts.

Highlights

  • Client Instantiation Optimization: gRPC and Google Cloud Storage clients are now instantiated only once per worker process in multiprocessing benchmarks, preventing redundant client creation and destruction.
  • Worker Initialization Function: A new _worker_init function has been introduced to handle the one-time setup of asyncio event loops and clients for each worker process when the multiprocessing pool starts.
  • Multiprocessing Pool Management: The multiprocessing.Pool is now initialized with the _worker_init function, ensuring proper client setup for each worker, and the pool is explicitly closed and joined after use.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in sharing feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the multi-process benchmarks to instantiate gRPC and JSON clients only once per worker process, which should improve benchmark performance and accuracy by removing client creation overhead from the measured execution time. The use of multiprocessing.Pool's initializer is a good approach for this.

My main feedback is regarding resource cleanup in the worker processes. The newly created clients and event loops are not being closed, which can lead to resource leaks. I've added a suggestion to use the atexit module to ensure graceful cleanup when worker processes terminate.

Overall, this is a good optimization for the benchmarks.

Comment on lines +308 to +318

    def _worker_init(bucket_type):
        """Initializes a persistent event loop and client for each worker process."""
        global worker_loop, worker_client, worker_json_client
        if bucket_type == "zonal":
    -       loop = asyncio.new_event_loop()
    -       asyncio.set_event_loop(loop)
    -       client = loop.run_until_complete(create_client())
    -       try:
    -           # download_files_using_mrd_multi_coro returns max latency of coros
    -           result = download_files_using_mrd_multi_coro(
    -               loop, client, files_to_download, other_params, chunks
    -           )
    -       finally:
    -           tasks = asyncio.all_tasks(loop=loop)
    -           for task in tasks:
    -               task.cancel()
    -           loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    -           loop.close()
    -       return result
    +       worker_loop = asyncio.new_event_loop()
    +       asyncio.set_event_loop(worker_loop)
    +       worker_client = worker_loop.run_until_complete(create_client())
        else:  # regional
            from google.cloud import storage

    -       json_client = storage.Client()
    +       worker_json_client = storage.Client()

medium

The resources created in this initializer (worker_loop, worker_client, worker_json_client) are not being cleaned up. This can lead to resource leaks, such as open network connections.

While terminating worker processes will cause the OS to reclaim these resources, it's better to perform a graceful shutdown. You can use the atexit module to register cleanup functions that will be called when the worker process exits.

I suggest updating _worker_init to register cleanup functions. Please also add import atexit at the top of the file.

def _worker_init(bucket_type):
    """Initializes a persistent event loop and client for each worker process."""
    global worker_loop, worker_client, worker_json_client
    import atexit

    if bucket_type == "zonal":
        worker_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(worker_loop)
        worker_client = worker_loop.run_until_complete(create_client())

        def _cleanup_zonal():
            # Ensure resources are cleaned up when the worker process exits.
            if worker_client and worker_loop and not worker_loop.is_closed():
                try:
                    worker_loop.run_until_complete(worker_client.close())
                finally:
                    worker_loop.close()

        atexit.register(_cleanup_zonal)
    else:  # regional
        from google.cloud import storage

        worker_json_client = storage.Client()

        def _cleanup_regional():
            # Ensure resources are cleaned up when the worker process exits.
            if worker_json_client:
                worker_json_client.close()

        atexit.register(_cleanup_regional)
