Python Threads Querying CouchDB

With a list of 500,000 CouchDB endpoints to hit for info, I wanted to speed up the process. There are multiple ways to do that, but a queue and worker threads worked fine for me.

import queue
import threading
from cloudant.client import Cloudant

# Replace the placeholders with your actual CouchDB credentials
USERNAME = 'joeschmoe'
PASSWORD = 'opensesame'
HOST = 'someendpoint.somedomain.tld'
PORT = 5984

# Connect to the CouchDB server
client = Cloudant(USERNAME, PASSWORD, url=f'http://{HOST}:{PORT}', connect=True)


def build_queue_items():
    items = []
    with open("input.txt") as file:
        for item in file:
            items.append(item.strip())
    return items


def get_db_docs(queue_name):
    # Each worker thread appends to the same output file
    with open("output.txt", "a") as file:
        while True:
            try:
                # Get a database name from the queue without blocking
                db_name = queue_name.get(block=False)
            except queue.Empty:
                # If the queue is empty, break out of the loop
                break
            else:
                db = client[db_name]
                for doc in db:
                    doc_id = doc['_id']
                    if doc_id.startswith('query_datasource'):
                        try:
                            path = str(doc['fields']['path'])
                            file.write(db_name + ' ' + doc_id + ' ' + path + '\n')
                        except KeyError:
                            file.write(db_name + ' ' + doc_id + ' path not found in datasource query\n')


def main():
    queue_items = build_queue_items()
    q = queue.Queue()
    for item in queue_items:
        q.put(item)
    
    # Create a list of 200 worker threads
    parameters = {'queue_name': q}
    threads = [threading.Thread(target=get_db_docs, kwargs=parameters) for _ in range(200)]

    # Start the threads
    for thread in threads:
        thread.start()

    # Wait for all the threads to finish
    for thread in threads:
        thread.join()
    
    print("Threads Finished. Script done.")


if __name__ == '__main__':
    main()

The script reads a list of database names from the file input.txt, then uses each name to retrieve that database's documents from CouchDB. For the docs we are interested in (those whose ID starts with query_datasource), it writes the database name, the doc ID and the path we want to analyse to output.txt.
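The filtering step can be pulled out into a small function that is easy to try without a CouchDB server. This is only a sketch: the function name format_datasource_line and the sample documents are made up for illustration, and the documents are assumed to look like plain dicts with an _id key and an optional fields.path entry.

```python
def format_datasource_line(db_name, doc):
    """Return the output line for one document, or None if we don't care about it."""
    doc_id = doc['_id']
    if not doc_id.startswith('query_datasource'):
        return None
    try:
        path = str(doc['fields']['path'])
    except KeyError:
        # Either 'fields' or 'path' is missing
        return f'{db_name} {doc_id} path not found in datasource query'
    return f'{db_name} {doc_id} {path}'


# Hypothetical sample documents, shaped like the ones the script expects
docs = [
    {'_id': 'query_datasource_1', 'fields': {'path': '/data/a'}},
    {'_id': 'query_datasource_2', 'fields': {}},
    {'_id': 'something_else'},
]
lines = [format_datasource_line('db1', d) for d in docs]
```

Here lines holds one entry per document, with None for the doc that does not match the query_datasource prefix.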

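Since each thread spends most of its time waiting on a network response, using 200 threads made this process much quicker than querying one database at a time.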
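One of the other ways to do this is the standard library's ThreadPoolExecutor, which handles the queue and the start/join bookkeeping for you. The sketch below is not what the script above does; it swaps in a thread pool and adds a lock so that output from different threads cannot interleave. fetch_lines is a hypothetical stand-in for the CouchDB lookup, and the results list stands in for output.txt.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

write_lock = threading.Lock()
results = []  # stands in for output.txt in this sketch


def fetch_lines(db_name):
    # Hypothetical stand-in for the CouchDB lookup; returns the lines for one database
    return [f'{db_name} query_datasource_1 /some/path']


def process(db_name):
    lines = fetch_lines(db_name)
    # Only one thread at a time may append, so lines never interleave
    with write_lock:
        results.extend(lines)


db_names = [f'db{i}' for i in range(1000)]
with ThreadPoolExecutor(max_workers=200) as pool:
    # list() consumes the iterator so an exception raised in a worker surfaces here
    list(pool.map(process, db_names))
```

Leaving the with block waits for all workers to finish, so it plays the same role as the join loop in main().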
