September 20, 2021

How to Integrate Clarifai AI and Snowflake

Table of Contents:

Call Clarifai AI Models Using External Functions

This tutorial shows you how to integrate Clarifai with Snowflake. Clarifai can automatically recognize, classify, and label unstructured data streams in the form of images, video, text, and audio. You can now link Clarifai's predictive capabilities with Snowflake's recently released support for unstructured data. Bringing these two platforms together allows you to analyze unstructured data and leverage powerful tools for managing data pipelines.


Clarifai provides the most complete, accurate, and user-friendly platform for working with and understanding unstructured data. In essence, this means that by using the most advanced tools in deep learning AI, Clarifai helps you transform unstructured data (the kind you typically see as images, video, text, and audio) into structured data - data that is organized in a way that is useful and meaningful to you.

 

But what can you do with your data once it is structured in the way you want? What if you want to expand your capabilities with a powerful suite of tools that can help you accelerate data analytics, improve data access, and construct data pipelines?

 

This is where integrating Clarifai with a platform like Snowflake can make so much sense. Combining the functionality of these two platforms can provide unprecedented insights and control over your data pipeline. We will be setting up this integration to trigger a call to the Clarifai API, and then format the response in a way that can be used within the Snowflake platform.

 

Configure AWS and Deploy Lambda Function

You will need to set up an external function, backed by an AWS Lambda function, that calls the Clarifai API and returns the results to Snowflake. AWS Lambda functions let you run code for applications and backend services without managing servers. Snowflake offers in-depth instructions on how to do this here. In our case, you will also need to create a Lambda layer to package the clarifai_grpc client library; please reference this documentation for more information. To integrate with Clarifai, you will need the following Lambda function code.

import json
from functools import lru_cache
from logging import INFO, error, getLogger, info
from typing import Sequence, Any, Dict, Iterable, Iterator, NamedTuple

from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel
from clarifai_grpc.grpc.api import resources_pb2, service_pb2, service_pb2_grpc
from clarifai_grpc.grpc.api.status import status_code_pb2

getLogger().setLevel(INFO)

_MAX_WORKFLOW_INPUT = 32


class Row(NamedTuple):
    """
    Represents the list of arguments of the external function.
    The first argument is always a row number.
    It must be present in both the request and the response.
    """

    row_number: int
    input_type: str
    url: str
    workflow_name: str
    app_id: str
    pat: str


class EasyException(Exception):
    pass


def lambda_handler(event, context):
    try:
        info(f"Start event {event}")
        # We need to return the list of rows together with their row ids.
        # For that purpose we create a map from (url, workflow_id) to row_id.
        input_map = {row: row.row_number for row in get_rows(event)}
        rows = tuple(input_map)
        if not rows:
            return make_success_response(context, [])
        info(f"Inputs are: {rows}")
        outputs = predict(*rows)
        snowflake_rows = list(match_input_with_row_number(rows, outputs, input_map))
        return make_success_response(context, snowflake_rows)
    except Exception as err:
        error(err, exc_info=True)
        if isinstance(err, EasyException):
            error_message = str(err)
        else:
            error_message = "An unexpected exception occurred"
        return make_error_response(context, error_message)


def match_input_with_row_number(inputs, outputs, row_id_map):
    """
    Match Clarifai output with the row number.
    """
    for key, prediction in zip(inputs, outputs):
        response_as_variant = make_prediction_response(prediction)
        yield [row_id_map[key], response_as_variant]


def get_rows(event) -> Iterator[Row]:
    """
    Each row is an array, where the first element is a row number and the others are function arguments.
    """
    event_body = event.get("body", None)
    if not event_body:
        raise EasyException("Body not set")
    if not isinstance(event_body, str):
        raise EasyException("Body is expected to be string")
    payload = json.loads(event_body)
    data = payload.get("data", [])
    if not isinstance(data, list):
        raise EasyException("Body data is expected to be a list")
    for row in data:
        yield Row(*row)


def make_chunks(sequence, size):
    # Split the sequence into non-overlapping chunks of at most `size` items.
    start = 0
    while start < len(sequence):
        yield sequence[start : start + size]
        start += size


def get_input_chunk(rows: Sequence[Row]):
    first_row = rows[0]
    workflow_id = first_row.workflow_name
    app_id = first_row.app_id
    pat = first_row.pat
    meta = make_metadata(pat)
    user_app_id = resources_pb2.UserAppIDSet(app_id=app_id)
    for chunk in make_chunks(rows, _MAX_WORKFLOW_INPUT):
        inputs = make_input(chunk)
        yield inputs, meta, workflow_id, user_app_id


def predict(*data: Row, threshold=0.9) -> Sequence[Sequence[Dict[str, str]]]:
    stub = make_connection()
    # This version assumes that all URLs use the same workflow_id.
    image_concepts = []
    for inputs, meta, workflow_id, auth in get_input_chunk(data):
        post_workflow_results_response = stub.PostWorkflowResults(
            service_pb2.PostWorkflowResultsRequest(
                user_app_id=auth,
                workflow_id=workflow_id,
                inputs=inputs,
            ),
            metadata=meta,
        )
        info("Got response status %s", post_workflow_results_response.status)
        if post_workflow_results_response.status.code != status_code_pb2.SUCCESS:
            error_message = (
                f"Failed to get prediction:"
                f" {post_workflow_results_response.status.description}"
                f" {post_workflow_results_response.status.details}"
            )
            for result in post_workflow_results_response.results:
                for output in result.outputs:
                    if (
                        output.status.code != status_code_pb2.SUCCESS
                        and output.status.details
                    ):
                        error_message = output.status.details
                        break
            raise EasyException(error_message)
        for results in post_workflow_results_response.results:
            outputs = results.outputs
            concept_names = []
            for output in outputs:
                for region in output.data.regions:
                    concept_names.extend(
                        {
                            "text": region.data.text.raw,
                            "name": concept.name,
                            "value": f"{concept.value:.5f}",
                        }
                        for concept in region.data.concepts
                        if concept.value >= threshold
                    )
                concept_names.extend(
                    {"name": concept.name, "value": f"{concept.value:.5f}"}
                    for concept in output.data.concepts
                    if concept.value >= threshold
                )
            image_concepts.append(sorted(concept_names, key=lambda item: item["name"]))
    return image_concepts


@lru_cache(maxsize=1)
def make_connection():
    channel = ClarifaiChannel.get_json_channel()
    return service_pb2_grpc.V2Stub(channel)


def make_metadata(pat):
    return (("authorization", f"Key {pat}"),)


def is_image(row: Row) -> bool:
    return row.input_type.lower().startswith(("image", "img"))


def _make_input(row: Row) -> resources_pb2.Input:
    if is_image(row):
        data = resources_pb2.Data(image=resources_pb2.Image(url=row.url))
    else:
        data = resources_pb2.Data(text=resources_pb2.Text(url=row.url))
    return resources_pb2.Input(data=data)


def make_input(rows: Iterable[Row]) -> Sequence[resources_pb2.Input]:
    return [_make_input(row) for row in rows]


def make_success_response(context, data: Any):
    body = {
        "request_id": context.aws_request_id,
        "data": data,
    }
    info("Response to return: %s", body)
    return {
        "statusCode": 200,
        "body": json.dumps(body),
    }


def make_error_response(context, error_message: str):
    info("Error to return: %s", error_message)
    return {
        "statusCode": 400,
        "body": json.dumps(
            {
                "request_id": context.aws_request_id,
                "error": {"message": error_message},
            },
        ),
    }


def make_prediction_response(labels: Sequence) -> Dict[str, Any]:
    """
    Create the response for a prediction.
    For each row we return only a single value, and this value can be a VARIANT or an ARRAY.
    The Python equivalent of a VARIANT is a dict, and of an ARRAY is a list.
    """
    return {"tags": labels}


Calling the Clarifai API

Once we have registered the external function, we can call various Clarifai models and pass arguments to them. In this example, we first call the Clarifai General model, which can identify over 11,000 concepts in images, and then we call the Clarifai Named Entity Recognition (ner_english) model, which can identify and extract named entities from text.

 

In order to use these models, you will need to create an application and then add the desired model to a workflow. In this example, we create two different workflows and add a single model to each (general and ner_english, respectively). You can learn more about working with workflows here.

The following code shows you how to create an external function, and includes a couple of example calls to Clarifai workflows.


USE DEMO_DB;

CREATE OR REPLACE EXTERNAL FUNCTION clarifai_predict(
    type String,
    url String,
    workflowId String,
    appId String,
    personalAccessToken String
)
RETURNS ARRAY
api_integration = <api_integration_name>
as '<resource_invocation_url>';

// https://www.clarifai.com/models/image-recognition-ai
select clarifai_predict(
    'image',
    get_presigned_url(@stage, <image_path>),
    'General', '{YOUR APP NAME}', '{YOUR PERSONAL ACCESS TOKEN (PAT)}'
);

// https://www.clarifai.com/models/named-entity-recognition
// Content: Alfredo likes to read books at the Toronto library after enjoying a nice beverage in Old Town Tallinn, Estonia.
select clarifai_predict(
    'text',
    get_presigned_url(@stage, <text_path>),
    'ner-snowflake', '{YOUR APP NAME}', '{YOUR PERSONAL ACCESS TOKEN (PAT)}'
);

Create a place where we can put our files

Snowflake data pipelines automate many of the manual steps involved in transforming and optimizing continuous data loads. Typically, data is first loaded into a staging table used for interim storage and then transformed using a series of SQL statements before it is inserted into the destination reporting tables. The most efficient workflow for this process involves transforming only data that is new or modified.

 

Streams are typically used as part of this staging pipeline. A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change so that actions can be taken using the changed data. Learn more about streams here.

 

In order for newly uploaded files to appear in the directory table, you need to refresh it; once you do, the stream automatically detects the changes and records them. The script below creates a stream on the directory table, uploads two sample files to the stage, refreshes the directory table, and then calls the external function on each newly added file. It assumes a stage named CLARIFAI_STAGE with a directory table enabled already exists.
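
If you have not set up such a stage yet, a minimal sketch might look like the following (the stage name matches the script below; the encryption setting is an assumption, since presigned URLs on internal stages generally require server-side encryption):

// Sketch only: create an internal stage with a directory table enabled
CREATE OR REPLACE STAGE CLARIFAI_STAGE
    DIRECTORY = (ENABLE = TRUE)
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');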

 

USE DEMO_DB;

// Create a stream on the stage's directory table
CREATE OR REPLACE STREAM documents_stream ON STAGE CLARIFAI_STAGE;

// Select new files from the stream: expect 0 entries
SELECT * FROM documents_stream;

// Put the files (run these commands in the snowsql tool)
// put 'file://~/Downloads/cats.jpg' @CLARIFAI_STAGE/image AUTO_COMPRESS=False;
// put 'file://~/Downloads/sample.txt' @CLARIFAI_STAGE/text/ AUTO_COMPRESS=False;
LIST @CLARIFAI_STAGE;

// Select new files from the stream: still expect 0 entries (the directory table has not been refreshed yet)
SELECT * FROM documents_stream;

// Refresh the directory table so it picks up the newly staged files
ALTER STAGE CLARIFAI_STAGE REFRESH;

// Select new files from the stream: expect 2 entries
SELECT * FROM documents_stream;

SELECT
    RELATIVE_PATH,
    GET_PRESIGNED_URL(@CLARIFAI_STAGE, RELATIVE_PATH) as URL,
    clarifai_predict('image', GET_PRESIGNED_URL(@CLARIFAI_STAGE, RELATIVE_PATH), 'General', '{YOUR APP NAME}', '{YOUR PERSONAL ACCESS TOKEN (PAT)}') as PREDICTIONS
FROM documents_stream
WHERE metadata$action = 'INSERT';
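
The query above only returns the predictions; nothing in this tutorial persists them. As a follow-on sketch (the table name and columns are our own, not part of the original setup), you could insert the stream results into a reporting table; because a DML statement that selects from a stream consumes it, the same files will not be processed again:

// Sketch only: persist predictions into an assumed reporting table
CREATE TABLE IF NOT EXISTS clarifai_predictions (
    relative_path STRING,
    predictions ARRAY,
    predicted_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);

INSERT INTO clarifai_predictions (relative_path, predictions)
SELECT
    RELATIVE_PATH,
    clarifai_predict('image', GET_PRESIGNED_URL(@CLARIFAI_STAGE, RELATIVE_PATH), 'General', '{YOUR APP NAME}', '{YOUR PERSONAL ACCESS TOKEN (PAT)}')
FROM documents_stream
WHERE metadata$action = 'INSERT';

In a production pipeline, you would typically wrap this INSERT in a Snowflake task gated by SYSTEM$STREAM_HAS_DATA('documents_stream') so that new files are processed automatically.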