Indexer ETL & OpenAI GPT-3 powered NLP on NEAR by PrimeLab
PrimeLab is proud to announce our new Natural Language Processing (NLP) utility and ETL pipeline with Data Lake.
ETL
We have developed a highly available system for both relational and graph analysis of blockchain data. Our ETL platform allows for real-time processing of NEAR Indexer data. This is not a simple S3 data lake; however, one could be integrated into our modular system quickly.
We leverage the improved PrimeLab NEAR Indexer to create a Debezium-based Kafka message queue for the most critical tables in the Postgres instance: accounts, transactions, receipts, and action_receipt_actions. These tables are streamed in real time to the pipeline, where they are transformed, joined, and stored once again in the Redshift data lake.
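To make the flow concrete, below is a minimal Python sketch of the transform step, assuming the default Debezium JSON envelope (a payload with before/after/op/source fields); the metadata column names are our own illustration, not the production pipeline code.
import json
from typing import Optional
# Tables streamed from the indexer's Postgres instance into Kafka.
STREAMED_TABLES = ["accounts", "transactions", "receipts", "action_receipt_actions"]
def flatten_change_event(raw_message: bytes) -> Optional[dict]:
    """Keep only the new row image ("after") plus a little change metadata."""
    envelope = json.loads(raw_message)["payload"]
    if envelope["op"] == "d":  # deletes carry no "after" image
        return None
    row = dict(envelope["after"])
    row["_source_table"] = envelope["source"]["table"]
    row["_change_ts_ms"] = envelope["ts_ms"]
    return row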
GPT-3 Powered NLP
Indexer data is complex, and each project in the NEAR ecosystem has unique questions it wishes to answer using the data. This leads to long development times and complex queries. Using a simple implementation of NLP, we're able to create a system that allows you to:
- Submit a Question
- AI Magic
- View the Results via:
a. Plotted Graph
b. Table
c. Diagram
It's that simple!
Contributors:
PrimeLab Data Platform Team
GitHub: PrimeLab-ETL-OpenAI-NLP
Alternatively, you may choose to write these to any combination of DynamoDB, Neo4j Aura, Postgres, and Redshift.
Data Structure
PrimeLab's contracts enrich transaction data with JSON arguments that facilitate easier analysis. Below we will outline some concepts to familiarize yourself with.
NearApps Tags
For the contract methods, the tags are always used in the following format:
- nearapps_tags: object - The tags' hash information.
  - app_id: string - App tag hash.
  - action_id: string - Action number hash.
  - user_id: string - User account_id tag hash.
And their standalone json representation (example) is:
{
  "nearapps_tags": {
    "app_id": "fxbf4cd3",
    "action_id": "43x55a4",
    "user_id": "03a0x7d"
  }
}
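For example, a consumer written in Python might pull these tags out of a transaction's args like this (a minimal sketch; the surrounding args structure is assumed from the example above):
import json
def extract_nearapps_tags(args_json: str) -> dict:
    tags = json.loads(args_json).get("nearapps_tags", {})
    return {
        "app_id": tags.get("app_id"),
        "action_id": tags.get("action_id"),
        "user_id": tags.get("user_id"),
    }
example = '{"nearapps_tags": {"app_id": "fxbf4cd3", "action_id": "43x55a4", "user_id": "03a0x7d"}}'
print(extract_nearapps_tags(example))  # {'app_id': 'fxbf4cd3', 'action_id': '43x55a4', 'user_id': '03a0x7d'}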
Components
- Indexer Instance and Postgres DB
- Amazon MSK
- Debezium
- AWS Lambda
- Python 3.9 with the PrimeLab custom Python layer for ETL
- Redshift (or choose your own Database)
- Docker Container for explorer API
Test Drive
The ETL pipeline encompasses several disparate components, each requiring a bit of setup and tinkering to get humming properly. We will start with setting up the Debezium connector. If you haven't already, you must set up an indexer instance to install the Debezium plug-in into.
Debezium and Kafka on Postgres
In order to stream data from Postgres as a source to a target data store, we use open source tooling in the form of Debezium and Apache Kafka (Kafka).
Debezium is a distributed Change Data Capture (CDC) tool built on top of Kafka to stream row-level database changes into event streams.
Kafka is a distributed event store and stream-processing platform.
Postgres Setup
In order for Debezium to work, it needs to use logical decoding with the write-ahead log. The following parameters need to be set:
- wal_level = logical
- max_wal_senders = 1
- max_replication_slots = 1
If using AWS you should also set rds.logical_replication = 1.
The database also must permit replication with the host that runs the Postgres connector. For more details see here.
Note that if you are using a Postgres cluster, Debezium must communicate with the writer/leader node.
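As a quick sanity check, the following Python sketch (assuming psycopg2 and a placeholder connection string) reads these settings back from the running instance; it is illustrative only and not part of the pipeline.
import psycopg2
# Settings described above; the values are the minimums from this guide.
REQUIRED = {"wal_level": "logical", "max_wal_senders": "1", "max_replication_slots": "1"}
conn = psycopg2.connect("postgresql://postgres:secret@localhost:5432/indexer")  # placeholder DSN
with conn, conn.cursor() as cur:
    for name, expected in REQUIRED.items():
        cur.execute("SHOW " + name)  # parameter names come from our own constant, not user input
        print(f"{name} = {cur.fetchone()[0]} (guide expects {expected})")
conn.close()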
Debezium Connector Setup
Debezium provides excellent documentation on how to set up the Postgres connector (and many other database flavors). It is recommended to read the documentation before implementation.
tl;dr:
- Create a debezium role in the source database and grant replication privileges.
- Create a replication group role and grant the debezium user and current table owner access. Assign tables you are interested in capturing to this owner.
- Create debezium heartbeat and signal tables.
- Create a publication for the tables you are interested in capturing.
Details:
- Debezium needs access to the database in order to capture the data changes. It also needs replication privileges in order to add tables to the publication which publishes the changes.
- The debezium user needs to be an owner of the tables it is operating on. As we want to assign least privileges (in a dev environment you can assign superuser), we transfer the tables we are interested in capturing to a shared ownership role between the current table owner and the debezium role. This ensures that there is no effect on current database practices whilst allowing debezium the access it needs.
- A Debezium heartbeat table is a simple table which Debezium updates periodically to stop connector failures / Postgres replication slots getting clogged up. The issue is documented here and here. It is recommended to add the heartbeat table; in our use case we faced the described issues without it. The signal table is used to send messages to Debezium for events such as adding messages to the log or taking ad hoc snapshots.
- The publication is used by Postgres to define which tables' row-level changes are streamed to the replication slot that Debezium reads from. A hedged sketch of these setup steps follows this list.
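The following is a hedged sketch of these steps run from Python with psycopg2; the role, table, heartbeat/signal, and publication names are illustrative assumptions, and the exact grants should follow the Debezium Postgres connector documentation.
import psycopg2
SETUP_STATEMENTS = [
    "CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD 'change-me'",
    "CREATE ROLE replication_group",
    "GRANT replication_group TO debezium",
    "GRANT replication_group TO indexer_owner",  # assumed current table owner
    "ALTER TABLE public.transactions OWNER TO replication_group",  # repeat per captured table
    "CREATE TABLE IF NOT EXISTS public.debezium_heartbeat (id int PRIMARY KEY, ts timestamptz)",
    "CREATE TABLE IF NOT EXISTS public.debezium_signal (id text PRIMARY KEY, type text, data text)",
    "CREATE PUBLICATION debezium_pub FOR TABLE "
    "public.accounts, public.transactions, public.receipts, public.action_receipt_actions, "
    "public.debezium_heartbeat, public.debezium_signal",
]
conn = psycopg2.connect("postgresql://postgres:secret@localhost:5432/indexer")  # placeholder DSN
conn.autocommit = True
with conn.cursor() as cur:
    for statement in SETUP_STATEMENTS:
        cur.execute(statement)
conn.close()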
Kafka Connector Setup
Debezium sits on top of Kafka Connect, and there are multiple options for how this is set up, such as using a VM, Docker, Kubernetes, or a managed Kafka service like AWS MSK. See the deployment documentation for more details.
First you need to create a Kafka cluster. This is a collection of one or more Kafka brokers (or servers) that allows Kafka to function in a distributed manner. It is recommended to have more than one broker to allow for data replication. Brokers can also fail over time or need to be restarted, so with a single broker you can end up in situations where you can't write to a topic because the broker has failed.
Brokers are managed by Apache ZooKeeper.
In order to deploy the Debezium Postgres Connector, you need the following:
- Debezium postgres connector jar files.
- Debezium connector properties file.
- Kafka worker config file.
- Kafka cluster properties file.
The Debezium connector properties file contains all of the information relating to the tables you are capturing, the format you want the data in, and the database connection information. There are many options which can be added here, and it's best to consult the documentation for your particular use case.
The cluster configuration contains information such as how many partitions you wish to use, the maximum size of individual messages and the number of replicas and replication factor you want to use.
The replication factor is the number of times you want to replicate the data in the cluster. For example, if you have a replication factor of 3, then for every message written to the cluster, it will be written 3 times. This is useful for data redundancy in case of broker failure.
The worker configuration contains information such as the number of workers you want to use, the number of tasks per worker, and the connector class name. The connector class name is the name of the connector you are using, in this case the Debezium Postgres connector.
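As an illustration, a connector can be registered by POSTing its configuration to the Kafka Connect REST API; in the hedged sketch below the host names, credentials, topic prefix, and publication name are placeholders, and the full option list is in the Debezium documentation.
import json
import requests
connector = {
    "name": "near-indexer-connector",  # assumed connector name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "indexer-db.example.com",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "change-me",
        "database.dbname": "indexer",
        "topic.prefix": "indexer",
        "table.include.list": "public.accounts,public.transactions,public.receipts,public.action_receipt_actions",
        "publication.name": "debezium_pub",
        "heartbeat.interval.ms": "60000",
    },
}
resp = requests.post(
    "http://kafka-connect.example.com:8083/connectors",  # placeholder Kafka Connect endpoint
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
print(resp.json())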
Testing
Once you have the connector deployed, you can test it by writing to the database and checking that the changes are replicated to the Kafka cluster.
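One way to do this from Python is to tail the relevant topic with kafka-python while you insert or update a row in the source database; the broker address and topic name (topic prefix plus schema.table) below are assumptions.
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "indexer.public.transactions",  # assumed topic name
    bootstrap_servers=["broker-1.example.com:9092"],  # placeholder broker
    auto_offset_reset="latest",
    value_deserializer=lambda v: json.loads(v) if v else None,
)
for message in consumer:
    if message.value is None:  # tombstone records carry no value
        continue
    change = message.value["payload"]
    print(change["op"], change["source"]["table"], change.get("after"))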
AWS Lambda Setup
Prerequisites
In order to follow this guide, you will need the following:
- An AWS account
- A text editor
- Python 3.6 or later
Setting up the Lambda Function
- Log in to the AWS Management Console and navigate to the Lambda service.
- Click on "Create Function". You will need four functions, so pick a naming scheme like primelab-transactions, primelab-accounts, etc.
- Select "Author from scratch".
- Enter a name for the function and select "Python 3.9" as the runtime.
- Create a role scoped with access to the MSK Cluster and your receiving data-lake.
- Click on "Create Function".
Customizing the Lambda Function
- In the "Function code" section, select "Edit code inline".
- Paste the code from the GitHub repo, copying one Lambda handler into each function you created above (a hedged skeleton of such a handler is sketched below).
- Click on "Save" and deploy.
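If you just want to see the shape of an MSK-triggered handler before pulling the repo code, here is a hedged skeleton; the record parsing and the final write step are illustrative assumptions, not the repo's actual handlers.
import base64
import json
def lambda_handler(event, context):
    rows = []
    # MSK events group base64-encoded records under "topic-partition" keys.
    for _topic_partition, records in event.get("records", {}).items():
        for record in records:
            payload = json.loads(base64.b64decode(record["value"]))["payload"]
            if payload.get("after"):  # skip deletes and tombstones
                rows.append(payload["after"])
    # ... transform rows and write them to Redshift (or your chosen store) ...
    print(f"processed {len(rows)} change events")
    return {"processed": len(rows)}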
Creating the Dependency Layer
In Lambda it is not possible to install Python "pip" dependencies within the UI. If you are feeling lazy, you can use the layer published here (arn:aws:lambda:us-east-1:165372248390:layer:pandas:3).
- Start a Cloud9 Linux instance in AWS. Go to the AWS services and search for Cloud9. Click "Create Environment". Name your environment and click next step. Keep the default environment settings and click next step. Click "Create Environment" and you're ready to go.
- Create your Pandas Lambda layer. Type the following code line by line into the terminal at the bottom. The pip install pandas command can be replaced with a package of your choosing, and you can install more than one package.
mkdir folder
cd folder
virtualenv v-env
source ./v-env/bin/activate
pip install pandas
pip install requests
deactivate
Then type the following code line by line to create your layer
mkdir python
cd python
cp -r ../v-env/lib64/python3.7/site-packages/* .
cd ..
zip -r panda_layer.zip python
aws lambda publish-layer-version --layer-name pandas --zip-file fileb://panda_layer.zip --compatible-runtimes python3.7
Deploying
After populating each function with the appropriate code, click deploy and move on to adding a trigger. Add the "MSK" trigger and configure it to your preference; setting a low batch threshold for transactions may be prudent while testing. You should begin seeing transactions, transformed into their new schema, populate your destination database.
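If you prefer to wire the trigger from code rather than the console, a hedged boto3 sketch is below; the cluster ARN, function name, topic, and batch size are placeholders.
import boto3
lambda_client = boto3.client("lambda")
mapping = lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kafka:us-east-1:123456789012:cluster/primelab/abcd1234",  # placeholder MSK ARN
    FunctionName="primelab-transactions",  # assumed function name from the steps above
    Topics=["indexer.public.transactions"],  # assumed topic name
    StartingPosition="LATEST",
    BatchSize=100,  # keep this low while testing
)
print(mapping["UUID"])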
Schema
-------------------------------------------------
-- Tables
-------------------------------------------------
create table primelab.etl_audits (
etl_audit_id integer generated always as identity
,record_count integer
,audit_timestamp timestamp default current_timestamp
);
-------------------------------------------------
-- Constraint
-------------------------------------------------
alter table primelab.etl_audits add constraint etl_audits_pkey primary key (etl_audit_id);
-------------------------------------------------
-- Views
-------------------------------------------------
create or replace view public.finalised_transactions as
select ara.receipt_id
,ara.args
,ara.wallet_id
,ara.slice_id
,t.transaction_hash
,t.included_in_block_hash
,t.block_timestamp
,t.status
from public.stg_transactions t
join public.stg_action_receipt_actions ara on t.converted_into_receipt_id = ara.receipt_id;
-------------------------------------------------
-- Procedures
-------------------------------------------------
create or replace procedure primelab.move_matched_rows()
as
$$
begin
-- Create a temporary table so we get a consistent view of the transactions at this point
create temporary table txn_rec_tmp (
receipt_id text
,args jsonb
,wallet_id text
,slice_id text
,transaction_hash text
,included_in_block_hash numeric(20,0)
,block_timestamp timestamp
,status text
) on commit drop;
insert into txn_rec_tmp (receipt_id, args, wallet_id, slice_id
,transaction_hash, included_in_block_hash, block_timestamp, status)
select receipt_id, args, wallet_id, slice_id
,transaction_hash, included_in_block_hash, to_timestamp(block_timestamp/ 1000000000), status
from public.finalised_transactions;
-- Create receipts
insert into primelab.receipts(receipt_id, block_hash, status, created_at)
select receipt_id, included_in_block_hash, status, block_timestamp
from txn_rec_tmp
on conflict (receipt_id) do nothing;
-- Create wallets
insert into primelab.wallets(wallet_id, persona_id, created_at)
select wallet_id, 1, block_timestamp /* TO DO - change persona to be dynamic once we have personas */
from txn_rec_tmp
where wallet_id is not null
on conflict (wallet_id) do nothing;
-- Create transactions /*TO DO - Add stack_name*/
with txn_ins as (
insert into primelab.transactions(transaction_id, receipt_id, slice_name
,wallet_id, tags_json, created_at, status)
select transaction_hash, receipt_id, slice_id, wallet_id, args, block_timestamp, status
from txn_rec_tmp
on conflict (transaction_id) do nothing
returning transaction_id
)
-- Write to audit table
insert into primelab.etl_audits (record_count)
select count(*)
from txn_ins;
-- Now delete these from the staging tables
-- Commented out for now
-- delete from public.stg_transactions as t
-- using txn_rec_tmp as trt
-- where t.transaction_hash = trt.transaction_hash;
-- delete from public.stg_action_receipt_actions as ara
-- using txn_rec_tmp as trt
-- where ara.receipt_id = trt.receipt_id;
end;
$$ language plpgsql;
Natural Language API
Problem statement
Oftentimes, the business depends on the analytics team for any kind of ad-hoc analysis to get meaningful insights or answers from the data. But this slows down both teams. It builds a dependency of the business on the data analysts to write the code needed to answer their questions, which takes time, and it disrupts the data analysts' long-term work if too many ad-hoc questions start coming in.
What if there was a way that would enable the business to directly ask the questions to the data itself and somehow, automagically get the answers from it?
Solution - An Artificial Intelligence powered app that understands English
How the app works
We have created an AI-powered app (with the help of the GPT-3 engine) where users can ask any questions about the data in simple English. A powerful AI agent runs in the backend that parses the English question, understands its meaning, generates Python code, and executes it to produce the correct answer to the question.
How the AI agent works
GPT-3 is a language model based on the transformer architecture, pre-trained in a generative, unsupervised manner that shows decent performance in zero/one/few-shot multitask settings. It works by predicting the next token given a sequence of tokens and can do so for NLP tasks it hasn't been trained on. After seeing just a few examples, it reached state-of-the-art levels in some benchmarks such as machine translation, Q&A, and also content generation.
We use this powerful pretrained model to parse user questions, generate their corresponding embeddings, and then translate them into the Python code that answers the original question.
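To show the general parse-question, generate-code, execute loop, here is a deliberately simplified sketch using the legacy openai v0.x Completion API rather than PrimeLab's production agent; the model name, prompt, sample CSV, and the exec step are all assumptions.
import openai
import pandas as pd
openai.api_key = "sk-..."  # placeholder key
df = pd.read_csv("near_transactions_sample.csv")  # assumed sample extract
question = "How many unique users are there?"
prompt = (
    f"You are given a pandas DataFrame named df with columns {list(df.columns)}.\n"
    f"Write Python code that answers: {question}\n"
    "Store the answer in a variable named result.\n"
)
completion = openai.Completion.create(
    model="text-davinci-003",
    prompt=prompt,
    max_tokens=200,
    temperature=0,
)
generated_code = completion["choices"][0]["text"]
namespace = {"df": df, "pd": pd}
exec(generated_code, namespace)  # never run generated code on untrusted input without sandboxing
print(namespace.get("result"))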
Running the code
- Clone the repository here https://github.com/NearPrime/dataOps-notebooks/tree/master/sagarnil/fastapi_sql/fastapiapp_v2
- Run pip install -r requirements.txt
- From outside the fastapiapp_v2 folder, run uvicorn fastapiapp_v2.main:app --reload --workers 1 --host 0.0.0.0 --port 8001
- Go to http://localhost:8001/docs
- Here you will see two things you can try out: one is get transactions, where you can skip rows and select how many rows you want to see; the other, given a transaction_hash, returns the whole row to you. A hedged example of calling both endpoints follows below.
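The exact paths and query parameter names in this sketch are assumptions, so check the interactive docs at /docs for the real signatures.
import requests
BASE = "http://localhost:8001"
# 1) Page through transactions (skip N rows, return the next M rows).
page = requests.get(f"{BASE}/transactions", params={"skip": 0, "limit": 10})
print(page.status_code, page.json())
# 2) Fetch the full row for one transaction hash (placeholder value).
tx_hash = "REPLACE_WITH_A_REAL_TRANSACTION_HASH"
row = requests.get(f"{BASE}/transactions/{tx_hash}")
print(row.status_code, row.json())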
App demo
a) Direct Answers
The following 2 min video shows this app in action. We took a sample of NEAR data and tested the app with it. All the user needs to do is ask their questions in simple English, and they get the answer from the data in real time as the AI agent writes the code by itself.
Some examples of questions might be
a) How many transactions happened in total?
b) How many unique users are there?
c) Who are the top 5 users who did the highest number of transactions in March 2022?
etc.
b) Visualization
We are also adding a separate section to this app where the AI can generate visualizations from the data. Currently, this is in the beta stage and we need to feed it more training examples. But here are some initial results.
Examples
The examples below were derived from a small subset of mainnet data used during testing.