Skip to content

Using BigQuery with Workers AI

Last reviewed: 3 months ago

The easiest way to get started with Workers AI is to try it out in the Multi-modal Playground and the LLM playground. If you decide that you want to integrate your code with Workers AI, you may then decide to then use its REST API endpoints or via a Worker binding.

But, what about the data? What if what you want these models to ingest data that is stored outside Cloudflare?

In this tutorial, you will learn how to bring data from Google BigQuery to a Cloudflare Worker so that it can be used as input for Workers AI models.

Prerequisites

You will need:

1. Set up your Cloudflare Worker

To ingest the data into Cloudflare and feed it into Workers AI, you will be using a Cloudflare Worker. If you have not created one yet, please feel free to review our tutorial on how to get started.

After following the steps to create a Worker, you should have the following code in your new Worker project:

export default {
async fetch(request, env, ctx) {
return new Response("Hello World!");
},
};

If the Worker project has successfully been created, you should also be able to run npx wrangler dev in a console to run the Worker locally:

Terminal window
[wrangler:inf] Ready on http://localhost:8787

Open a browser tab at http://localhost:8787/ to see your deployed Worker. Please note that the port 8787 may be a different one in your case.

You should be seeing Hello World! in your browser:

Terminal window
Hello World!

If you are running into any issues during this step, please make sure to review Worker's Get Started Guide.

2. Import GCP Service key into the Worker as Secrets

Now that you have verified that the Worker has been created successfully, you will need to reference the Google Cloud Platform service key created in the Prerequisites section of this tutorial.

Your downloaded key JSON file from Google Cloud Platform should have the following format:

{
"type": "service_account",
"project_id": "<your_project_id>",
"private_key_id": "<your_private_key_id>",
"private_key": "<your_private_key>",
"client_email": "<your_service_account_id>@<your_project_id>.iam.gserviceaccount.com",
"client_id": "<your_oauth2_client_id>",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/<your_service_account_id>%40<your_project_id>.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}

For this tutorial, you will only be needing the values of the following fields: client_email, private_key, private_key_id, and project_id.

Instead of storing this information in plain text in the Worker, you will use secrets to make sure its unencrypted content is only accessible via the Worker itself.

Import those three values from the JSON file into Secrets, starting with the field from the JSON key file called client_email, which we will now call BQ_CLIENT_EMAIL (you can use another variable name):

Terminal window
npx wrangler secret put BQ_CLIENT_EMAIL

You will be asked to enter a secret value, which will be the value of the field client_email in the JSON key file.

If the secret was uploaded successfully, the following message will be displayed:

Terminal window
Success! Uploaded secret BQ_CLIENT_EMAIL

Now import the secrets for the three remaining fields; private_key, private_key_id, and project_id as BQ_PRIVATE_KEY, BQ_PRIVATE_KEY_ID, and BQ_PROJECT_ID respectively:

Terminal window
npx wrangler secret put BQ_PRIVATE_KEY
Terminal window
npx wrangler secret put BQ_PRIVATE_KEY_ID
Terminal window
npx wrangler secret put BQ_PROJECT_ID

At this point, you have successfully imported three fields from the JSON key file downloaded from Google Cloud Platform into Cloudflare secrets to be used in a Worker.

Secrets are only made available to Workers once they are deployed. To make them available during development, create a .dev.vars file to locally store these credentials and reference them as environment variables.

Your dev.vars file should look like the following:

BQ_CLIENT_EMAIL="<your_service_account_id>@<your_project_id>.iam.gserviceaccount.com"
BQ_CLIENT_KEY="-----BEGIN PRIVATE KEY-----<content_of_your_private_key>-----END PRIVATE KEY-----\n"
BQ_PRIVATE_KEY_ID="<your_private_key_id>"
BQ_PROJECT_ID="<your_project_id>"

Make sure to include .dev.vars to your .gitignore file in your project to prevent getting your credentials uploaded to a repository if you are using a version control system.

Check that secrets are loaded correctly in src/index.js by logging their values into a console output:

export default {
async fetch(request, env, ctx) {
console.log("BQ_CLIENT_EMAIL: ", env.BQ_CLIENT_EMAIL);
console.log("BQ_PRIVATE_KEY: ", env.BQ_PRIVATE_KEY);
console.log("BQ_PRIVATE_KEY_ID: ", env.BQ_PRIVATE_KEY_ID);
console.log("BQ_PROJECT_ID: ", env.BQ_PROJECT_ID);
return new Response("Hello World!");
},
};

Restart the Worker and run npx wrangler dev. You should see that the server now mentions the newly added variables:

Using vars defined in .dev.vars
Your worker has access to the following bindings:
- Vars:
- BQ_CLIENT_EMAIL: "(hidden)"
- BQ_PRIVATE_KEY: "(hidden)"
- BQ_PRIVATE_KEY_ID: "(hidden)"
- BQ_PROJECT_ID: "(hidden)"
[wrangler:inf] Ready on http://localhost:8787

If you open http://localhost:8787 in your browser, you should see the values of the variables show up in your console where the npx wrangler dev command is running, while still seeing only the Hello World! text in the browser window.

You now have access to the GCP credentials from a Worker. Next, you will install a library to help with the creation of the JSON Web Token needed to interact with GCP's API.

3. Install library to handle JWT operations

To interact with BigQuery's REST API, you will need to generate a JSON Web Token to authenticate your requests using the credentials that you have loaded into Worker secrets in the previous step.

For this tutorial, you will be using the jose library for JWT-related operations. Install it by running the following command in a console:

Terminal window
npm i jose

To verify that the installation succeeded, you can run npm list, which lists all the installed packages and see if the jose dependency has been added:

Terminal window
<project_name>@0.0.0
/<path_to_your_project>/<project_name>
├── @cloudflare/vitest-pool-workers@0.4.29
├── jose@5.9.2
├── vitest@1.5.0
└── wrangler@3.75.0

4. Generate JSON Web Token

Now that you have installed the jose library, it is time to import it and add a function to your code that generates a signed JWT:

import * as jose from 'jose';
...
const generateBQJWT = async (aCryptoKey, env) => {
const algorithm = "RS256";
const audience = "https://bigquery.googleapis.com/";
const expiryAt = (new Date().valueOf() / 1000);
const privateKey = await jose.importPKCS8(env.BQ_PRIVATE_KEY, algorithm);
// Generate signed JSON Web Token (JWT)
return new jose.SignJWT()
.setProtectedHeader({
typ: 'JWT',
alg: algorithm,
kid: env.BQ_PRIVATE_KEY_ID
})
.setIssuer(env.BQ_CLIENT_EMAIL)
.setSubject(env.BQ_CLIENT_EMAIL)
.setAudience(audience)
.setExpirationTime(expiryAt)
.setIssuedAt()
.sign(privateKey)
}
export default {
async fetch(request, env, ctx) {
...
// Create JWT to authenticate the BigQuery API call
let bqJWT;
try {
bqJWT = await generateBQJWT(env);
} catch (e) {
return new Response('An error has ocurred while generating the JWT', { status: 500 })
}
},
...
};

Now that you have created a JWT, it is time to do an API call to BigQuery to fetch some data.

5. Make authenticated requests to Google BigQuery

With the JWT token created in the previous step, issue an API request to BigQuery's API to retrieve data from a table.

You will now query the table that you already have created in BigQuery as part of the prerequisites of this tutorial. This example uses a sampled version of the Hacker News Corpus that was used under its MIT licence and uploaded to BigQuery.

const queryBQ = async (bqJWT, path) => {
const bqEndpoint = `https://bigquery.googleapis.com${path}`
// In this example, text is a field in the BigQuery table that is being queried (hn.news_sampled)
const query = 'SELECT text FROM hn.news_sampled LIMIT 3';
const response = await fetch(bqEndpoint, {
method: "POST",
body: JSON.stringify({
"query": query
}),
headers: {
Authorization: `Bearer ${bqJWT}`
}
})
return response.json()
}
...
export default {
async fetch(request, env, ctx) {
...
let ticketInfo;
try {
ticketInfo = await queryBQ(bqJWT);
} catch (e) {
return new Response('An error has occurred while querying BQ', { status: 500 });
}
...
},
};

Having the raw row data from BigQuery means that you can now format it in a JSON-like style up next.

6. Format results from the query

Now that you have retrieved the data from BigQuery, it is time to note that a BigQuery API response looks something like this:

{
...
"schema": {
"fields": [
{
"name": "title",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "text",
"type": "STRING",
"mode": "NULLABLE"
}
]
},
...
"rows": [
{
"f": [
{
"v": "<some_value>"
},
{
"v": "<some_value>"
}
]
},
{
"f": [
{
"v": "<some_value>"
},
{
"v": "<some_value>"
}
]
},
{
"f": [
{
"v": "<some_value>"
},
{
"v": "<some_value>"
}
]
}
],
...
}

This format may be difficult to read and to work with when iterating through results, which will go on to do later in this tutorial. So you will now implement a function that maps the schema into each individual value, and the resulting output will be easier to read, as shown below. Each row corresponds to an object within an array.

[
{
title: "<some_value>",
text: "<some_value>",
},
{
title: "<some_value>",
text: "<some_value>",
},
{
title: "<some_value>",
text: "<some_value>",
},
];

Create a formatRows function that takes a number of rows and fields returned from the BigQuery response body and returns an array of results as objects with named fields.

const formatRows = (rowsWithoutFieldNames, fields) => {
// Depending on the position of each value, it is known what field you should assign to it.
const fieldsByIndex = new Map();
// Load all fields name and have their index in the array result as their key
fields.forEach((field, index) => {
fieldsByIndex.set(index, field.name)
})
// Iterate through rows
const rowsWithFieldNames = rowsWithoutFieldNames.map(row => {
// Per each row represented by an array f, iterate through the unnamed values and find their field names by searching them in the fieldsByIndex.
let newRow = {}
row.f.forEach((field, index) => {
const fieldName = fieldsByIndex.get(index);
if (fieldName) {
// For every field in a row, add them to newRow
newRow = ({ ...newRow, [fieldName]: field.v });
}
})
return newRow
})
return rowsWithFieldNames
}
export default {
async fetch(request, env, ctx) {
...
// Transform output format into array of objects with named fields
let formattedResults;
if ('rows' in ticketInfo) {
formattedResults = formatRows(ticketInfo.rows, ticketInfo.schema.fields);
console.log(formattedResults)
} else if ('error' in ticketInfo) {
return new Response(ticketInfo.error.message, { status: 500 })
}
...
},
};

7. Feed data into Workers AI

Now that you have converted the response from the BigQuery API into an array of results, generate some tags and attach an associated sentiment score using an LLM via Workers AI:

const generateTags = (data, env) => {
return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
prompt: `Create three one-word tags for the following text. return only these three tags separated by a comma. don't return text that is not a category.Lowercase only. ${JSON.stringify(data)}`,
});
}
const generateSentimentScore = (data, env) => {
return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
prompt: `return a float number between 0 and 1 measuring the sentiment of the following text. 0 being negative and 1 positive. return only the number, no text. ${JSON.stringify(data)}`,
});
}
// Iterates through values, sends them to an AI handler and encapsulates all responses into a single Promise
const getAIGeneratedContent = (data, env, aiHandler) => {
let results = data?.map(dataPoint => {
return aiHandler(dataPoint, env)
})
return Promise.all(results)
}
...
export default {
async fetch(request, env, ctx) {
...
let summaries, sentimentScores;
try {
summaries = await getAIGeneratedContent(formattedResults, env, generateTags);
sentimentScores = await getAIGeneratedContent(formattedResults, env, generateSentimentScore)
} catch {
return new Response('There was an error while generating the text summaries or sentiment scores')
}
},
formattedResults = formattedResults?.map((formattedResult, i) => {
if (sentimentScores[i].response && summaries[i].response) {
return {
...formattedResult,
'sentiment': parseFloat(sentimentScores[i].response).toFixed(2),
'tags': summaries[i].response.split(',').map((result) => result.trim())
}
}
}
};

Uncomment the following lines from the wrangler.toml file in your project:

[ai]
binding = "AI"

Restart the Worker that is running locally, and after doing so, go to your application endpoint:

Terminal window
curl http://localhost:8787

It is likely that you will be asked to log in to your Cloudflare account and grant temporary access to Wrangler (the Cloudflare CLI) to use your account when using Worker AI.

Once you access http://localhost:8787 you should see an output similar to the following:

Terminal window
{
"data": [
{
"text": "You can see a clear spike in submissions right around US Thanksgiving.",
"sentiment": "0.61",
"tags": [
"trends",
"submissions",
"thanksgiving"
]
},
{
"text": "I didn't test the changes before I published them. I basically did development on the running server. In fact for about 30 seconds the comments page was broken due to a bug.",
"sentiment": "0.35",
"tags": [
"software",
"deployment",
"error"
]
},
{
"text": "I second that. As I recall, it's a very enjoyable 700-page brain dump by someone who's really into his subject. The writing has a personal voice; there are lots of asides, dry wit, and typos that suggest restrained editing. The discussion is intelligent and often theoretical (and Bartle is not scared to use mathematical metaphors), but the tone is not academic.",
"sentiment": "0.86",
"tags": [
"review",
"game",
"design"
]
}
]
}

The actual values and fields will mostly depend on the query made in Step 5 that are then fed into the LLMs models.

Final result

All the code shown in the different steps are combined into the following code in src/index.js:

import * as jose from "jose";
const generateBQJWT = async (env) => {
const algorithm = "RS256";
const audience = "https://bigquery.googleapis.com/";
const expiryAt = new Date().valueOf() / 1000;
const privateKey = await jose.importPKCS8(env.BQ_PRIVATE_KEY, algorithm);
// Generate signed JSON Web Token (JWT)
return new jose.SignJWT()
.setProtectedHeader({
typ: "JWT",
alg: algorithm,
kid: env.BQ_PRIVATE_KEY_ID,
})
.setIssuer(env.BQ_CLIENT_EMAIL)
.setSubject(env.BQ_CLIENT_EMAIL)
.setAudience(audience)
.setExpirationTime(expiryAt)
.setIssuedAt()
.sign(privateKey);
};
const queryBQ = async (bgJWT, path) => {
const bqEndpoint = `https://bigquery.googleapis.com${path}`;
const query = "SELECT text FROM hn.news_sampled LIMIT 3";
const response = await fetch(bqEndpoint, {
method: "POST",
body: JSON.stringify({
query: query,
}),
headers: {
Authorization: `Bearer ${bgJWT}`,
},
});
return response.json();
};
const formatRows = (rowsWithoutFieldNames, fields) => {
// Index to fieldName
const fieldsByIndex = new Map();
fields.forEach((field, index) => {
fieldsByIndex.set(index, field.name);
});
const rowsWithFieldNames = rowsWithoutFieldNames.map((row) => {
// Map rows into an array of objects with field names
let newRow = {};
row.f.forEach((field, index) => {
const fieldName = fieldsByIndex.get(index);
if (fieldName) {
newRow = { ...newRow, [fieldName]: field.v };
}
});
return newRow;
});
return rowsWithFieldNames;
};
const generateTags = (data, env) => {
return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
prompt: `Create three one-word tags for the following text. return only these three tags separated by a comma. don't return text that is not a category.Lowercase only. ${JSON.stringify(data)}`,
});
};
const generateSentimentScore = (data, env) => {
return env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
prompt: `return a float number between 0 and 1 measuring the sentiment of the following text. 0 being negative and 1 positive. return only the number, no text. ${JSON.stringify(data)}`,
});
};
const getAIGeneratedContent = (data, env, aiHandler) => {
let results = data?.map((dataPoint) => {
return aiHandler(dataPoint, env);
});
return Promise.all(results);
};
export default {
async fetch(request, env, ctx) {
// Create JWT to authenticate the BigQuery API call
let bqJWT;
try {
bqJWT = await generateBQJWT(env);
} catch (error) {
console.log(error);
return new Response("An error has ocurred while generating the JWT", {
status: 500,
});
}
// Fetch results from BigQuery
let ticketInfo;
try {
ticketInfo = await queryBQ(
bqJWT,
`/bigquery/v2/projects/${env.BQ_PROJECT_ID}/queries`,
);
} catch (error) {
console.log(error);
return new Response("An error has occurred while querying BQ", {
status: 500,
});
}
// Transform output format into array of objects with named fields
let formattedResults;
if ("rows" in ticketInfo) {
formattedResults = formatRows(ticketInfo.rows, ticketInfo.schema.fields);
} else if ("error" in ticketInfo) {
return new Response(ticketInfo.error.message, { status: 500 });
}
// Generate AI summaries and sentiment scores
let summaries, sentimentScores;
try {
summaries = await getAIGeneratedContent(
formattedResults,
env,
generateTags,
);
sentimentScores = await getAIGeneratedContent(
formattedResults,
env,
generateSentimentScore,
);
} catch {
return new Response(
"There was an error while generating the text summaries or sentiment scores",
);
}
// Add AI summaries and sentiment scores to previous results
formattedResults = formattedResults?.map((formattedResult, i) => {
if (sentimentScores[i].response && summaries[i].response) {
return {
...formattedResult,
sentiment: parseFloat(sentimentScores[i].response).toFixed(2),
tags: summaries[i].response.split(",").map((result) => result.trim()),
};
}
});
const response = { data: formattedResults };
return new Response(JSON.stringify(response), {
headers: { "Content-Type": "application/json" },
});
},
};

If you wish to deploy this Worker, you can do so by running npx wrangler deploy:

Terminal window
Total Upload: <size_of_your_worker> KiB / gzip: <compressed_size_of_your_worker> KiB
Uploaded <name_of_your_worker> (x sec)
Deployed <name_of_your_worker> triggers (x sec)
https://<your_public_worker_endpoint>
Current Version ID: <worker_script_version_id>

This will create a public endpoint that you can use to access the Worker globally. Please keep this in mind when using production data, and make sure to include additional access controls in place.

Conclusion

In this tutorial, you have learnt how to integrate Google BigQuery and Cloudflare Workers by creating a GCP service account key and storing part of it as Worker secrets. This was later imported in the code, and by using the jose npm library, you created a JSON Web Token to authenticate the API query to BigQuery.

Once you obtained the results, you formatted them to later be passed to generative AI models via Workers AI to generate tags and to perform sentiment analysis on the extracted data.

Next Steps

If, instead of displaying the results of ingesting the data to the AI model in a browser, your workflow requires fetching and store data (for example in R2 or D1) on regular intervals, you may want to consider adding a scheduled handler for this Worker. It allows triggering the Worker with a predefined cadence via a Cron Trigger. Consider reviewing the Reference Architecture Diagrams on Ingesting BigQuery Data into Workers AI.

A use case to ingest data from other sources, like you did in this tutorial, is to create a RAG system. If this sounds relevant to you, please check out the tutorial Build a Retrieval Augmented Generation (RAG) AI.

To learn more about what other AI models you can use at Cloudflare, please visit the Workers AI section of our docs.