With the new Data Mode in Clarifai Portal, we've made it easy for anyone to load their data into the Clarifai platform so that they can label, train, search, and predict. Data Mode comes with a ton of useful features for uploading images, videos, and text.
Data Mode is the best place to start for most users. But what happens when you are working with extremely large datasets (ones where you might not want to keep a browser window open), or when you want to integrate data loading directly into your application pipeline? This is where the Clarifai API comes in. Let's take a look at Data Mode first, and then at how you can build your own data ingestion pipeline with the Clarifai API.
Introducing Data Mode
Data Mode makes data ingestion a snap, with features that will be familiar to anyone who has ever uploaded an image, video, or text to the web. You can upload images from a URL if they are already hosted online, or directly from your computer. To get started, visit Portal and click the Data tab on the left-hand side of the screen.
Upload CSV and TSV files in Data Mode
CSV (comma-separated values) and TSV (tab-separated values) files can be a convenient way to work with multiple inputs, since they are easy to manage in your favorite spreadsheet editor. Portal supports both CSV and TSV uploads, and the API example later in this post parses .csv files as well. Please see our documentation for more details on how to set up your CSV and TSV files for ingestion via Data Mode.
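As an illustration of the kind of file the API script in this post reads, here is a minimal sketch that writes a two-row CSV with Python's standard csv module. The column names are the ones the script looks for: a required "url" column, optional "pos_concepts", "neg_concepts", and "metadata" columns, and an "input_id" column that the script's process_one_line function reads. The rows themselves (IDs, URLs, concepts) are hypothetical placeholders, and Portal's own CSV requirements may differ, so treat this as a sketch rather than a spec.

```python
import csv
import io
import json

# Hypothetical placeholder rows; the IDs, URLs, and concept names are made up.
FIELDNAMES = ["input_id", "url", "pos_concepts", "neg_concepts", "metadata"]
rows = [
    {
        "input_id": "dog-001",
        "url": "https://example.com/images/dog.jpg",
        "pos_concepts": "dog|animal",   # multiple concepts separated by a pipe "|"
        "neg_concepts": "cat",
        "metadata": json.dumps({"source": "example", "split": "train"}),
    },
    {
        "input_id": "cat-001",
        "url": "https://example.com/images/cat.jpg",
        "pos_concepts": "cat",
        "neg_concepts": "",             # empty cells are allowed
        "metadata": "",                 # the upload script turns an empty metadata cell into {}
    },
]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=FIELDNAMES)
writer.writeheader()
# csv quotes the JSON cell automatically because it contains commas and double quotes
writer.writerows(rows)
csv_text = buffer.getvalue()
print(csv_text)
```

Letting the csv module do the quoting matters mostly for the metadata column: a hand-written JSON blob with unescaped commas would otherwise be split across several columns. For a TSV file, pass delimiter="\t" to csv.DictWriter.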
Integration with the Clarifai API
When you are working with very large amounts of data, or want to integrate data ingestion directly within your application, the Clarifai API is here to help.
Let's take a look at how data ingestion might be configured using our Python gRPC client when uploading images. In this blog post we will walk through a script developed by one of our own applied machine learning engineers. The script is designed to upload images along with their associated concepts and metadata. The details of your implementation may differ, of course, but we hope this script helps address common issues and jumpstarts your own integration.
Dependencies
Let's begin with the imports used in this example. argparse lets us pass arguments through the command line, json decodes the optional metadata blob, pandas loads and manages our .csv file as a DataFrame for convenient batching, Struct translates Python dictionaries into a format that Google protobufs can read, and ThreadPoolExecutor handles multithreading. tqdm provides an optional progress bar so that we can keep track of how our uploads are going. The rest is standard Clarifai initialization code that you can learn more about in the Clarifai documentation.
```python
import argparse
import json
import pandas as pd
from tqdm import tqdm
from google.protobuf.struct_pb2 import Struct
from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import service_pb2_grpc, service_pb2, resources_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2
from concurrent.futures import ThreadPoolExecutor, as_completed
```
Functions
Instead of iterating through the DataFrame generated from our .csv file line by line, we are going to break it up into batches, or "chunks". This is where the chunker function helps us out.
```python
def chunker(seq, size):
    """
    :param seq: any sequence structure
    :param size: the size of the chunk to return when looping through the generator
    :return: a generator which produces smaller chunks of the seq based on the size parameter
    """
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
```
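To see what chunker produces, here is a small standalone run on a plain Python list standing in for ten CSV rows (the function is repeated so the snippet runs on its own). Slicing a DataFrame the same way, with seq[pos:pos + size], selects rows, so each chunk of the DataFrame is itself a smaller DataFrame.

```python
import math


def chunker(seq, size):
    # same generator as above: yields consecutive slices of at most `size` items
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


rows = list(range(10))           # stand-in for 10 CSV rows
chunks = list(chunker(rows, 4))
print(chunks)                    # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]

# The number of chunks is the ceiling of len(seq) / size.
for n in (10, 64, 65):
    expected = math.ceil(n / 32)
    actual = len(list(chunker(list(range(n)), 32)))
    print(n, expected, actual)

# n // size + 1 overcounts by one when n is an exact multiple of size
print(64 // 32 + 1)              # 3, although only 2 chunks are produced
```

The last line is worth keeping in mind when you size a progress bar from the number of rows: ceiling division gives the exact number of batches, while n // size + 1 is only correct when the last batch is partial.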
Next up, let's parse our .csv file. We begin by loading the .csv file as a DataFrame, replacing any empty values with empty strings (otherwise the DataFrame would treat these values as NaN, a less convenient option in our case), and then pulling up a list of column names from the file.

We then check whether each expected column name exists. For the concept columns, we split each value on the vertical "pipe" or "bar" character (|) and turn the items into a list, using set() to drop duplicates. If values are detected in the metadata column, they are loaded as a Python dictionary using json.loads.
```python
def initial_csv_wrangling(csv_file):
    """
    Takes in a formatted csv, parses concepts into lists, metadata into a dict, then returns a dataframe.
    :param csv_file: input csv with a required "url" column and optional "input_id", "pos_concepts",
                     "neg_concepts", and "metadata" columns
    :return: a Pandas DataFrame
    """
    # read every column as str so that, e.g., a purely numeric concept column is not parsed as ints
    df = pd.read_csv(csv_file, dtype=str)
    # empty cells are read as NaN; replace them with empty strings
    df = df.fillna('')
    columns = list(df.columns)
    # check that "url" column exists (required)
    if 'url' not in columns:
        raise ValueError('Input csv file requires a "url" column, which does not seem to exist. Exiting.')
    # check if "pos_concepts" column exists and parse accordingly (not required)
    if 'pos_concepts' in columns:
        print('Found "pos_concepts" column. Values will be split by pipe/vertical bar "|" into a python list.')
        # set() removes duplicate concepts within a cell
        df['pos_concepts'] = df['pos_concepts'].map(lambda x: list(set(x.split('|'))))
    # check if "neg_concepts" column exists and parse accordingly (not required)
    if 'neg_concepts' in columns:
        print('Found "neg_concepts" column. Values will be split by pipe/vertical bar "|" into a python list.')
        df['neg_concepts'] = df['neg_concepts'].map(lambda x: list(set(x.split('|'))))
    # check if "metadata" column exists and load accordingly (not required)
    if 'metadata' in columns:
        print('Found "metadata" column. Attempting to ingest.')
        try:
            # an empty cell becomes an empty JSON object
            df['metadata'] = df['metadata'].replace('', '{}').map(json.loads)
        except ValueError as err:  # json.JSONDecodeError is a subclass of ValueError
            raise ValueError('Value in "metadata" column does not seem to be a properly JSON formatted str.') from err
    return df
```
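The two per-cell transformations are easy to check in isolation. The sketch below re-implements them as plain functions (split_concepts and parse_metadata are hypothetical names, not part of the script) to make two details visible: set() drops duplicate concepts but does not preserve their order, and an empty concept cell becomes a list containing one empty string, which is why the upload step skips any concept equal to ''.

```python
import json


def split_concepts(cell):
    # mirrors the lambda used above: split on "|" and de-duplicate with set()
    return list(set(cell.split('|')))


def parse_metadata(cell):
    # mirrors .replace('', '{}').map(json.loads): an empty cell becomes an empty dict
    return json.loads(cell or '{}')


print(sorted(split_concepts('dog|animal|dog')))   # ['animal', 'dog']
print(split_concepts(''))                         # [''], one empty string, not an empty list
print(parse_metadata('{"source": "example"}'))    # {'source': 'example'}
print(parse_metadata(''))                         # {}
```

If the order of concepts matters to you, list(dict.fromkeys(cell.split('|'))) removes duplicates while keeping the first-seen order.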
Next, process_and_upload_chunk processes and uploads a single chunk. It creates an empty inputs list and passes each row of the chunk through the nested process_one_line function, which converts the row into an Input proto, the format the API expects when creating inputs. Note that a value of 1 denotes a positive concept and a value of 0 denotes a negative concept.

Each input_proto defines the input itself and carries the URL of the image in question, along with its concepts and metadata. We then make a single PostInputs request with the list of Input protos we created, passing our API key as the required authentication metadata. Finally, the function returns response.status.code so that we can tell whether the request was successful.
```python
def process_and_upload_chunk(stub, api_key, chunk, allow_duplicate_url):
    """
    :param stub: the grpc client stub
    :param api_key: the API key for the app to upload inputs into
    :param chunk: a subset of the dataframe created from the csv file
    :param allow_duplicate_url: boolean - True/False
    :return: the status code of the PostInputs response
    """
    def process_one_line(df_row, allow_duplicate_url):
        """
        :param df_row: an individual dataframe row
        :param allow_duplicate_url: boolean - True/False
        :return: an Input proto
        """
        concepts = []
        metadata = Struct()
        if 'metadata' in df_row.index:
            metadata.update(df_row['metadata'])
        # parse pos_concepts (value=1 marks a positive concept)
        if 'pos_concepts' in df_row.index:
            for concept in df_row['pos_concepts']:
                if concept != '':
                    concepts.append(resources_pb2.Concept(id=concept, name=concept, value=1))
        # parse neg_concepts (value=0 marks a negative concept)
        if 'neg_concepts' in df_row.index:
            for concept in df_row['neg_concepts']:
                if concept != '':
                    concepts.append(resources_pb2.Concept(id=concept, name=concept, value=0))
        # use the "input_id" column if present; an empty id lets the API generate one
        input_id = str(df_row['input_id']) if 'input_id' in df_row.index else ''
        # create Input proto using the url + any concepts and metadata
        input_proto = resources_pb2.Input(
            id=input_id,
            data=resources_pb2.Data(
                image=resources_pb2.Image(
                    url=df_row['url'],
                    allow_duplicate_url=allow_duplicate_url
                ),
                concepts=concepts,
                metadata=metadata))
        return input_proto

    inputs = []
    # iterate through lines and convert into Input protos
    for _, row in chunk.iterrows():
        inputs.append(process_one_line(df_row=row, allow_duplicate_url=allow_duplicate_url))
    # build PostInputsRequest
    request = service_pb2.PostInputsRequest(inputs=inputs)
    auth_metadata = (('authorization', f'Key {api_key}'),)
    # upload the batch of input protos using the PostInputs call
    response = stub.PostInputs(request, metadata=auth_metadata)
    return response.status.code
```
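The positive/negative convention is the part of process_one_line that is easiest to get wrong, so here is a minimal, library-free sketch of the same logic. concept_values is a hypothetical helper that returns plain (concept_id, value) tuples instead of resources_pb2.Concept protos, so it runs without the Clarifai client installed.

```python
def concept_values(pos_concepts, neg_concepts):
    """Hypothetical helper: return (concept_id, value) pairs the way process_one_line builds Concepts."""
    pairs = []
    for concept in pos_concepts:
        if concept != '':               # skip blanks left over from empty cells
            pairs.append((concept, 1))  # 1 = positive: the concept is present
    for concept in neg_concepts:
        if concept != '':
            pairs.append((concept, 0))  # 0 = negative: the concept is explicitly absent
    return pairs


print(concept_values(['dog'], ['cat', '']))   # [('dog', 1), ('cat', 0)]
print(concept_values([''], ['']))             # []
```

Each tuple corresponds to one Concept(id=..., name=..., value=...) in the real script; the concept name doubles as its ID there.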
The main function starts by setting up the command-line arguments we want to be able to pass in with argparse. Next, we construct the stub that allows us to call the API endpoints we will be using, and read in our .csv file as a DataFrame through our initial_csv_wrangling function.

Finally, we create an empty list called threads and a pool of 10 worker threads. Each chunk of the DataFrame (32 rows by default, set with --batch_size) is submitted to the pool as a separate upload task. As each task completes, tqdm advances the optional progress bar by one tick, and we check the status code returned by that task's PostInputs call to catch the main error cases we want to look out for.
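The main function below submits every chunk to a thread pool and then handles the results as they finish. That submit-then-as_completed pattern is independent of Clarifai, so here is a minimal sketch of it on its own, with a hypothetical fake_upload function standing in for process_and_upload_chunk and returning a status string instead of a Clarifai status code.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_upload(batch):
    """Hypothetical stand-in for process_and_upload_chunk: no API call, just a status string."""
    return 'SUCCESS' if None not in batch else 'FAILED'


def upload_all(rows, batch_size=32, max_workers=10):
    batches = [rows[pos:pos + batch_size] for pos in range(0, len(rows), batch_size)]
    results = {'SUCCESS': 0, 'FAILED': 0}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit every batch up front; the pool runs at most max_workers uploads at a time
        futures = [executor.submit(fake_upload, batch) for batch in batches]
        # as_completed yields futures in the order they finish, not the order they were submitted
        for future in as_completed(futures):
            results[future.result()] += 1
    return results


rows = list(range(100))
rows[40] = None           # simulate one bad row so that its batch "fails"
print(upload_all(rows))   # {'SUCCESS': 3, 'FAILED': 1}
```

Because as_completed yields each future as soon as it finishes, a progress bar wrapped around it advances per completed batch, and a slow batch does not hold up reporting on the others.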
```python
def main():
    # the parser lines below are used to take in user arguments through the command line
    parser = argparse.ArgumentParser(
        description='Given an API key and a properly formatted csv file, upload image urls to an application.')
    parser.add_argument(
        '--api_key',
        required=True,
        help='An application\'s API key with PostInput scopes',
        type=str)
    parser.add_argument(
        '--csv_file',
        required=True,
        help='Full pathname to csv file with a "url" header column and optional "input_id", '
             '"pos_concepts", "neg_concepts", and "metadata" header columns',
        type=str)
    parser.add_argument(
        '--batch_size',
        default=32,
        help='The size of the batches to process and upload at once. Batch size 32 is recommended. '
             'This can be scaled up to a max of 128, although that will not necessarily make the uploads go quicker.',
        type=int)
    parser.add_argument(
        '--allow_duplicate_url',
        default=True,
        help='If True, any duplicate urls found will be uploaded as a separate input. '
             'If False, only the first encountered url (and any additional concepts/metadata) will be uploaded.',
        # type=bool would turn any non-empty string (including "False") into True, so parse the text explicitly
        type=lambda s: s.strip().lower() in ('true', '1', 'yes'))
    args = parser.parse_args()
    # construct a client stub (ClarifaiChannel.get_grpc_channel() is the gRPC-transport alternative)
    channel = ClarifaiChannel.get_json_channel()
    stub = service_pb2_grpc.V2Stub(channel)
    # read in csv file as a pandas dataframe and do some initial wrangling
    # specifically: checks that a "url" column exists and splits the pipe-separated list of concepts into python lists
    dataframe = initial_csv_wrangling(args.csv_file)
    # iterate through the dataframe in chunks, process the items, and upload them in batches
    # multi-threaded version: each submitted task uploads one chunk
    threads = []
    # ceiling division: exactly the number of chunks chunker() will produce
    expected_batch_nums = (len(dataframe) + args.batch_size - 1) // args.batch_size
    with ThreadPoolExecutor(max_workers=10) as executor:
        for chunk in chunker(dataframe, args.batch_size):
            threads.append(executor.submit(process_and_upload_chunk, stub, args.api_key, chunk, args.allow_duplicate_url))
        for task in tqdm(as_completed(threads), total=expected_batch_nums):
            status_code = task.result()
            if status_code == status_code_pb2.SUCCESS:
                continue
            elif status_code == status_code_pb2.CONN_INSUFFICIENT_SCOPES:
                raise Exception('The API key provided does not have enough scopes to post inputs/annotations/concepts.')
            elif status_code in (status_code_pb2.CONN_KEY_INVALID, status_code_pb2.CONN_KEY_NOT_FOUND):
                raise Exception('The API key provided is either invalid or does not exist.')
            else:
                # generic catch all
                print(f'Received status code: {status_code}. Attempting to continue. '
                      'Visit https://docs.clarifai.com/api-guide/api-overview/status-codes to learn more.')


if __name__ == '__main__':
    main()
```
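One detail worth checking in any script like this: argparse's type=bool simply calls bool() on the raw command-line string, and every non-empty string, including "False", is truthy. With type=bool, passing --allow_duplicate_url False would therefore still produce True. A minimal sketch of an explicit converter avoids this; str_to_bool is a hypothetical helper name, and the accepted spellings are an assumption you can adjust.

```python
import argparse


def str_to_bool(text):
    """Hypothetical converter: accept common spellings of true/false from the command line."""
    value = text.strip().lower()
    if value in ('true', '1', 'yes', 'y'):
        return True
    if value in ('false', '0', 'no', 'n'):
        return False
    # argparse turns this into a clean usage error instead of a traceback
    raise argparse.ArgumentTypeError(f'expected a boolean value, got {text!r}')


# The pitfall: bool() on any non-empty string is True.
print(bool('False'))   # True

parser = argparse.ArgumentParser()
parser.add_argument('--allow_duplicate_url', default=True, type=str_to_bool)
print(parser.parse_args(['--allow_duplicate_url', 'False']).allow_duplicate_url)   # False
print(parser.parse_args([]).allow_duplicate_url)                                    # True
```

On Python 3.9 and later, action=argparse.BooleanOptionalAction is another option: it generates a paired --allow_duplicate_url / --no-allow_duplicate_url flag instead of parsing a value.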
Please visit this GitHub repository to view the full code for this and other helper scripts.