With a list of 500,000 CouchDB endpoints to hit for info, I wanted to speed up the process. There are multiple ways to do that, but queues and threads worked fine for me.
import queue
import threading
from cloudant.client import Cloudant
# Replace the placeholders with your actual CouchDB credentials
USERNAME = 'joeschmoe'
PASSWORD = 'opensesame'
HOST = 'someendpoint.somedomain.tld'
PORT = 5984  # default CouchDB port
# Connect to the CouchDB server.
# NOTE(review): this connects at import time (module-level side effect) —
# every import of this module opens a session. Credentials travel over
# plain http:// — presumably an internal/trusted network; confirm.
client = Cloudant(USERNAME, PASSWORD, url=f'http://{HOST}', connect=True, port=PORT)
def build_queue_items():
    """Read work items (database names) from ``input.txt``.

    Returns:
        list[str]: one entry per line of the file, stripped of surrounding
        whitespace (including the trailing newline). Blank lines become
        empty strings, matching the original behavior.
    """
    with open("input.txt") as file:
        # Comprehension replaces the manual append loop (same result).
        return [line.strip() for line in file]
def get_db_docs(queue_name):
    """Worker: drain database names from *queue_name* and append results
    to ``output.txt``.

    For each database, scans its documents and records every doc whose
    ``_id`` starts with ``query_datasource``, writing the db name, doc id,
    and the doc's ``fields.path`` value (or a "not found" note when that
    key is absent). Returns when the queue is empty.

    Note: many threads run this concurrently, all appending to the same
    file; line ordering across threads is arbitrary.
    """
    with open("output.txt", "a") as file:
        while True:
            try:
                # Non-blocking get: an empty queue means all work is claimed.
                db_name = queue_name.get(block=False)
            except queue.Empty:
                # Queue drained — this worker is done.
                break
            # NOTE(review): client[db_name] can raise for a missing/unreadable
            # db, which would kill this worker thread silently — confirm
            # that's acceptable for this one-off script.
            db = client[db_name]
            for doc in db:
                doc_id = doc['_id']
                if doc_id.startswith('query_datasource'):
                    try:
                        path = str(doc['fields']['path'])
                        file.write(db_name + ' ' + doc_id + ' ' + path + '\n')
                    except KeyError:
                        # Fix: original message ran doc_id and "path" together
                        # with no separating space.
                        file.write(db_name + " " + doc_id + " path not found in datasource query\n")
def main():
    """Load the db-name list, fan it out to 200 worker threads, and wait.

    Workers pull names from a shared queue and write their findings to
    output.txt; the function returns once every worker has exited.
    """
    work = queue.Queue()
    for name in build_queue_items():
        work.put(name)
    # One shared queue feeds the whole pool; each thread drains it until empty.
    workers = [
        threading.Thread(target=get_db_docs, kwargs={'queue_name': work})
        for _ in range(200)
    ]
    for worker in workers:
        worker.start()
    # Block until every worker has finished its share of the queue.
    for worker in workers:
        worker.join()
    print("Threads Finished. Script done.")


if __name__ == '__main__':
    main()
The script reads in a list from the file input.txt, then uses it to retrieve a list of documents from an API endpoint. For the docs we are interested in, it prints out the doc name and the path we want to analyse.
Using 200 threads made this process much quicker.