September 28, 2020

Data Mode and Integration with the Clarifai API

With Clarifai's new Data Mode in Clarifai Portal, we've made it easy for anyone to load their data into the Clarifai platform so that they can label, train, search and predict. Data Mode is available in Portal and comes with a ton of useful features for uploading images, videos and text.

Data Mode is the best place to start for most users, but what happens when you are working with extremely large datasets (ones where you might not want to keep a browser window open), or you want to integrate data loading directly within your application pipeline? This is where the Clarifai API comes in. Let's take a look at Data Mode, and then at how you can build your own data ingestion pipeline with the Clarifai API.

Introducing Data Mode

Data Mode makes data ingestion a snap with features that will be familiar to anyone who has ever uploaded an image, video, or text to the web. You can upload images from a URL if the images are already hosted online, or directly from your computer. To get started with Data Mode, visit Portal and click the data tab on the left-hand side of the screen.

Upload CSV and TSV files in Data Mode

CSV (comma-separated values) and TSV (tab-separated values) files can be a convenient way to work with multiple inputs, since these files can be easily managed in your favorite spreadsheet editor. The API example later in this post parses .csv files, and Portal supports both CSV and TSV file uploads. Please see our documentation for more details on how to set up your CSV and TSV files for ingestion via Data Mode.

 

Integration with the Clarifai API

When you are working with very large amounts of data, or want to integrate data ingestion directly within your application, the Clarifai API is here to help.

Let's take a look at how data ingestion might be configured using our Python gRPC client when uploading images. In this blog post we will walk through a script developed by one of our own applied machine learning engineers. This script is designed to upload images along with their associated concepts and metadata. The details of your implementation may differ, of course, but we hope that this script can help address common issues and jump-start your own integration.

Dependencies

Let's begin with the imports that we are using in this example. Here you will see that argparse is used so that we can pass arguments through the command line, json is used to decode your optional metadata blob, pandas is used to load and manage our .csv file as a DataFrame for convenient batching, Struct is used to translate dictionaries into a format readable by Google protobufs, and ThreadPoolExecutor helps us handle multithreading. tqdm provides us with an optional status bar, so that we can keep track of how our data uploads are going. The rest is standard Clarifai initialization code that you can learn more about here.

import argparse
import json
import pandas as pd
from tqdm import tqdm
from google.protobuf.struct_pb2 import Struct
from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import service_pb2_grpc, service_pb2, resources_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2
from concurrent.futures import ThreadPoolExecutor, as_completed

Functions

Instead of iterating through the DataFrame generated by our .csv file line-by-line, we are going to break it up into batches or "chunks". This is where the chunker function helps us out.

def chunker(seq, size):
    """
    :param seq: any sequence structure
    :param size: the size of the chunk to return when looping through the generator
    :return: a generator which produces smaller chunks of the seq based on the size parameter
    """
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
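
To see what the generator produces, here is a minimal sketch that runs chunker on a plain list. The URLs are hypothetical placeholders, and a list is used only to keep the example free of extra dependencies; slicing a DataFrame with the same positions returns row slices in the same way.

```python
def chunker(seq, size):
    """Return a generator of consecutive slices of `seq`, each at most `size` items long."""
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


# hypothetical sample data: seven placeholder image URLs
urls = [f"https://example.com/image_{i}.jpg" for i in range(7)]

batches = list(chunker(urls, 3))
batch_sizes = [len(batch) for batch in batches]
print(batch_sizes)  # [3, 3, 1]
```

The last batch is simply shorter when the number of rows is not a multiple of the batch size.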

Next up, let's begin to parse our .csv file. We begin by loading the .csv file as a DataFrame, replacing any empty values with empty strings (otherwise the DataFrame would treat these values as NaN, a less convenient option in our case), and then pulling up a list of column names from the .csv file.

We then check whether each expected column name exists. Values in the concept columns are split on the vertical "pipe" or "bar" character and turned into a list. If a metadata column is present, its values are loaded as Python dictionaries using json.loads.

def initial_csv_wrangling(csv_file):
    """
    Takes in a formatted csv, parses concepts into lists, metadata into a dict, then returns a dataframe.
    :param csv_file: input csv with a required "url" column and optional "pos_concepts", "neg_concepts", and "metadata"
    :return: a Pandas DataFrame
    """
    df = pd.read_csv(csv_file)
    df = df.fillna('')
    columns = list(df.columns)
    # check that "url" column exists (required)
    if 'url' not in columns:
        raise Exception('Input csv file requires a "url" column, which does not seem to exist. Exiting.')
    # check if "pos_concepts" column exists and parse accordingly (not required)
    if 'pos_concepts' in columns:
        print('Found "pos_concepts" column. Values will be split by pipe/vertical bar "|" into a python list.')
        df['pos_concepts'] = df['pos_concepts'].map(lambda x: list(set(x.split('|'))))
    # check if "neg_concepts" column exists and parse accordingly (not required)
    if "neg_concepts" in columns:
        print('Found "neg_concepts" column. Values will be split by pipe/vertical bar "|" into a python list.')
        df['neg_concepts'] = df['neg_concepts'].map(lambda x: list(set(x.split('|'))))
    # check if "metadata" column exists and load accordingly (not required)
    if "metadata" in columns:
        print('Found "metadata" column. Attempting to ingest.')
        try:
            # empty cells become empty JSON objects before decoding
            df['metadata'] = df['metadata'].replace('', '{}').map(json.loads)
        except (TypeError, ValueError) as err:
            # json.JSONDecodeError is a subclass of ValueError
            raise Exception('Value in "metadata" column does not seem to be a properly JSON formatted str.') from err
    return df
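
As an illustration of the file layout this function expects, here is a minimal sketch that parses two sample rows. It deliberately swaps pandas for the standard csv module so that it runs without extra dependencies, and it drops empty concept entries while splitting rather than later in the pipeline. The URLs, concept names, and metadata values are hypothetical.

```python
import csv
import io
import json

# hypothetical sample file contents; the URLs and concept names are placeholders
sample_csv = '''url,pos_concepts,neg_concepts,metadata
https://example.com/dog.jpg,dog|animal,cat,"{""source"": ""demo""}"
https://example.com/cat.jpg,cat,,
'''

rows = []
for row in csv.DictReader(io.StringIO(sample_csv)):
    rows.append({
        "url": row["url"],
        # split pipe-separated concepts and drop empty entries
        "pos_concepts": [c for c in row["pos_concepts"].split("|") if c],
        "neg_concepts": [c for c in row["neg_concepts"].split("|") if c],
        # an empty metadata cell becomes an empty dict
        "metadata": json.loads(row["metadata"] or "{}"),
    })

for parsed in rows:
    print(parsed)
```

Note that the metadata cell is wrapped in double quotes with the inner quotes doubled, which is how a CSV field carries a JSON string that itself contains quotes and commas.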

Now we process and upload each chunk. We create an empty inputs list, go through each row in the chunk, and convert it into an Input proto, which is the format the API expects when creating inputs. Each individual row is passed through the process_one_line function and converted into its own Input proto. Note that a value of 1 denotes a positive concept and a value of 0 denotes a negative concept.

The input_proto defines the input itself and passes in the URL of the image in question. We then make our request call to the API and pass in the list of Input protos that we have created. Our authentication metadata is required here. Finally, we return response.status.code so that we know whether our request was successful.

def process_and_upload_chunk(stub, api_key, chunk, allow_duplicate_url):
    """
    :param stub: the grpc client stub
    :param api_key: the API key for the app to upload inputs into
    :param chunk: a subset of the dataframe created from the csv file
    :param allow_duplicate_url: boolean - True/False
    :return: the status code of the PostInputs response
    """
    def process_one_line(df_row, allow_duplicate_url):
        """
        :param df_row: an individual dataframe row
        :param allow_duplicate_url: boolean - True/False
        :return: an Input proto
        """
        concepts = []
        metadata = Struct()
        if 'metadata' in list(df_row.keys()):
            metadata.update(df_row['metadata'])
        # parse pos_concepts
        if 'pos_concepts' in list(df_row.keys()):
            for concept in df_row['pos_concepts']:
                if concept != '':
                    concept_proto = resources_pb2.Concept(
                        id=concept,
                        name=concept,
                        value=1)
                    concepts.append(concept_proto)
        # parse neg_concepts
        if 'neg_concepts' in list(df_row.keys()):
            for concept in df_row['neg_concepts']:
                if concept != '':
                    concept_proto = resources_pb2.Concept(
                        id=concept,
                        name=concept,
                        value=0)
                    concepts.append(concept_proto)
        # create Input proto using the url + any concepts and metadata
        # "input_id" is read only if the csv has that column; otherwise the id is left unset
        input_proto = resources_pb2.Input(
            id=str(df_row.get('input_id', '')),
            data=resources_pb2.Data(
                image=resources_pb2.Image(
                    url=df_row['url'],
                    allow_duplicate_url=allow_duplicate_url
                ),
                concepts=concepts,
                metadata=metadata))
        return input_proto

    inputs = []
    # iterate through lines and convert into Input protos
    for i, each in chunk.iterrows():
        single_input = process_one_line(df_row=each, allow_duplicate_url=allow_duplicate_url)
        inputs.append(single_input)
    # build PostInputsRequest
    request = service_pb2.PostInputsRequest(inputs=inputs)
    auth_metadata = (('authorization', f'Key {api_key}'),)
    # upload the batch of input protos using the PostInputs call
    response = stub.PostInputs(request, metadata=auth_metadata)
    return response.status.code
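
The labeling rule can be tried on its own with a minimal sketch like the one below. Plain (concept, value) tuples stand in for resources_pb2.Concept protos so the example runs without the Clarifai client; the label_concepts helper and the sample concepts are hypothetical.

```python
def label_concepts(pos_concepts, neg_concepts):
    """Return (concept_id, value) pairs: 1 for positive labels, 0 for negative ones."""
    labeled = []
    for concept in pos_concepts:
        if concept != "":
            labeled.append((concept, 1))
    for concept in neg_concepts:
        if concept != "":
            labeled.append((concept, 0))
    return labeled


# hypothetical row: two positive concepts, one negative, one empty entry to skip
pairs = label_concepts(["dog", "animal", ""], ["cat"])
print(pairs)  # [('dog', 1), ('animal', 1), ('cat', 0)]
```

Empty strings appear when a concept cell in the .csv file is blank, which is why they are skipped before any concept is created.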

The main function starts by setting up the various arguments that we want to be able to pass in with argparse. Next we construct the stub that will allow us to call the API endpoints that we will be using. We then read in our .csv file as a DataFrame through our initial_csv_wrangling function.

Finally, we create an empty list called threads, add our optional tqdm call so that we can see a progress bar as our job completes, and use a thread pool to work through our "chunks" in batches of 32 (the default batch size). We then read the status code returned by each PostInputs call, add one tick to our progress bar, and handle the main error cases that we want to look out for.
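
The submit-then-collect pattern described here can be sketched in isolation using only the standard library. In this sketch fake_upload is a hypothetical stand-in for the upload function, the status strings are placeholders rather than real Clarifai status codes, and no network call is made.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_upload(batch):
    """Hypothetical stand-in for the upload function: no network call is made."""
    return "SUCCESS" if batch else "EMPTY_BATCH"


items = list(range(70))
batch_size = 32
batches = [items[pos:pos + batch_size] for pos in range(0, len(items), batch_size)]

# ceiling division gives the exact number of batches (3 for 70 items)
expected_batches = (len(items) + batch_size - 1) // batch_size

results = []
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fake_upload, batch) for batch in batches]
    # as_completed yields each future as soon as its batch finishes,
    # which is not necessarily the order of submission
    for future in as_completed(futures):
        results.append(future.result())

print(expected_batches, results.count("SUCCESS"))  # 3 3
```

Because results arrive in completion order, a progress bar wrapped around as_completed advances whenever any batch finishes, regardless of which one was submitted first.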

def main():
    # the parser lines below are used to take in user arguments through the command line
    parser = argparse.ArgumentParser(
        description='Given an API key and a properly formatted csv file, upload image urls to an application.')
    parser.add_argument(
        '--api_key',
        required=True,
        help='An application\'s API key with PostInput scopes',
        type=str)
    parser.add_argument(
        '--csv_file',
        required=True,
        help='Full pathname to csv file with a "url", "pos_concepts", "neg_concepts", and "metadata" header columns',
        type=str)
    parser.add_argument(
        '--batch_size',
        default=32,
        help='The size of the batches to process and upload at once. Batch size 32 is recommended. '
             'This can be scaled up to a max of 128, although that will not necessarily make the uploads go quicker.',
        type=int)
    parser.add_argument(
        '--allow_duplicate_url',
        default=True,
        help='If True, any duplicate urls found will be uploaded as a separate input. '
             'If False, only the first encountered url (and any additional concepts/metadata) will be uploaded.',
        # type=bool would treat any non-empty string (including "False") as True,
        # so the value is compared against common "true" spellings instead
        type=lambda value: value.strip().lower() in ('true', '1', 'yes'))
    args = parser.parse_args()
    # construct a client stub using the channel
    channel = ClarifaiChannel.get_json_channel()
    stub = service_pb2_grpc.V2Stub(channel)
    # read in csv file as a pandas dataframe and do some initial wrangling
    # specifically: checks that a "url" column exists and splits the pipe-separated list of concepts into python lists
    dataframe = initial_csv_wrangling(args.csv_file)
    # iterate through the dataframe in chunks, process the items, and upload them in batches
    # multi-threaded version
    threads = []
    # ceiling division, so an exact multiple of batch_size does not count an extra batch
    expected_batch_nums = (len(dataframe) + args.batch_size - 1) // args.batch_size
    with ThreadPoolExecutor(max_workers=10) as executor:
        for chunk in chunker(dataframe, args.batch_size):
            threads.append(executor.submit(process_and_upload_chunk, stub, args.api_key, chunk, args.allow_duplicate_url))
        for task in tqdm(as_completed(threads), total=expected_batch_nums):
            status = task.result()
            if status == status_code_pb2.SUCCESS:
                continue
            elif status == status_code_pb2.CONN_INSUFFICIENT_SCOPES:
                raise Exception('The API key provided does not have enough scopes to post inputs/annotations/concepts.')
            elif status in (status_code_pb2.CONN_KEY_INVALID, status_code_pb2.CONN_KEY_NOT_FOUND):
                raise Exception('The API key provided is either invalid or does not exist.')
            else:
                # generic catch all.
                print(f'Received status code: {status}. Attempting to continue. '
                      'Visit https://docs.clarifai.com/api-guide/api-overview/status-codes to learn more.')
                continue


if __name__ == '__main__':
    main()
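
One detail worth checking when passing booleans on the command line: argparse applies the type callable to the raw string, and bool("False") is True because any non-empty string is truthy. A small converter is one common alternative. The sketch below uses a hypothetical str_to_bool helper that is not part of the original script.

```python
import argparse


def str_to_bool(value):
    """Hypothetical helper: map common true/false spellings to a real boolean."""
    lowered = value.strip().lower()
    if lowered in ("true", "t", "yes", "y", "1"):
        return True
    if lowered in ("false", "f", "no", "n", "0"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


parser = argparse.ArgumentParser()
parser.add_argument("--allow_duplicate_url", default=True, type=str_to_bool)

# bool("False") is True because any non-empty string is truthy,
# so type=bool would not turn the flag off here
naive = bool("False")
parsed = parser.parse_args(["--allow_duplicate_url", "False"])
print(naive, parsed.allow_duplicate_url)  # True False
```

Raising argparse.ArgumentTypeError lets argparse report an unrecognized value as a normal usage error instead of silently treating it as True or False.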

 

Please visit this GitHub repository to view the full code for this and other helper scripts.