[Announcement] Indexer ETL & OpenAI powered NLP on NEAR by PrimeLab

Indexer ETL & OpenAI GPT-3 powered NLP on NEAR by PrimeLab

PrimeLab is proud to announce our new Natural Language Processing (NLP) utility & ETL Pipeline w/Data Lake.


We have developed a highly available system for both relational and graph analysis of blockchain data. Our ETL platform allows for realtime processing of NEAR Indexer data. This is not a simple S3 Data Lake; however, one could be integrated into our modular system quickly.

We leverage the improved PrimeLab NEAR Indexer to create a Debezium-based Kafka message queue for the most critical tables in the Postgres instance: Accounts, Transactions, Receipts, and Action Receipt Actions. These tables are streamed in realtime to the pipeline, where they are transformed, joined and stored once again in the Redshift Data Lake.

GPT-3 Powered NLP

Indexer data is complex, and each project in the NEAR ecosystem has unique questions it wishes to answer using the data. This leads to long development times and complex queries. Using a simple implementation of NLP, we’re able to create a system that allows you to:

  1. Submit a Question
  2. AI Magic
  3. View the Results via:
    a. Plotted Graph
    b. Table
    c. Diagram

It’s that simple!

PrimeLab Data Platform Team

GitHub: PrimeLab-ETL-OpenAI-NLP

:bulb: Alternatively, you may choose to write these to any combination of DynamoDB, Neo4j Aura, Postgres, and Redshift.

Data Structure

PrimeLab’s contracts enrich transaction data with JSON arguments that facilitate easier analysis. Below we will outline some concepts to familiarize yourself with.

NearApps Tags

In the contracts' methods, the tags are always passed in the following format:

  • nearapps_tags: object - The tags’ hash information.
    • app_id: string - App tag hash.
    • action_id: string - Action number hash.
    • user_id: string - User account_id tag hash.

And their standalone json representation (example) is:

    "nearapps_tags": {
        "app_id": "fxbf4cd3",
        "action_id": "43x55a4",
        "user_id": "03a0x7d"
    }
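
As a rough illustration of how these tags might be read during analysis, the sketch below pulls `nearapps_tags` out of a transaction's JSON arguments. The helper name `extract_tags` and the assumption that the tags sit at the top level of the arguments object are made up for this example and are not taken from the PrimeLab contracts.

```python
import json
from typing import Optional

# The three fields documented above.
TAG_KEYS = ("app_id", "action_id", "user_id")


def extract_tags(args_json: str) -> Optional[dict]:
    """Return the nearapps_tags object from a JSON args string, or None."""
    try:
        args = json.loads(args_json)
    except json.JSONDecodeError:
        return None
    if not isinstance(args, dict):
        return None
    tags = args.get("nearapps_tags")
    if not isinstance(tags, dict):
        return None
    # Keep only the documented fields; missing ones become None.
    return {key: tags.get(key) for key in TAG_KEYS}


example = (
    '{"nearapps_tags": {"app_id": "fxbf4cd3", '
    '"action_id": "43x55a4", "user_id": "03a0x7d"}}'
)
print(extract_tags(example))
```

Returning `None` for malformed or untagged arguments lets a pipeline step skip those rows instead of failing.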


  • Indexer Instance and Postgres DB
  • Amazon MSK
  • Debezium
  • AWS Lambda
  • Python 3.9 PrimeLab Custom Python Layer for ETL
  • Redshift (or choose your own Database)
  • Docker Container for explorer API

Test Drive

The ETL pipeline encompasses several disparate components, each requiring a bit of setup and tinkering to get humming properly. We will start with setting up the Debezium Connector. If you haven’t already, you must set up an indexer instance to install the Debezium plug-in into.

Debezium and Kafka on Postgres

In order to stream data from Postgres as a source to a target data store, we use open source tooling in the form of Debezium and Apache Kafka (Kafka).

Debezium is a distributed Change Data Capture (CDC) tool built on top of Kafka to stream row-level database changes into event streams.

Kafka is a distributed event store and stream-processing platform.

Postgres Setup

In order for Debezium to work it needs to use logical decoding with the write-ahead log. The following parameters need to be set.

  • wal_level = logical
  • max_wal_senders = 1
  • max_replication_slots = 1

If using AWS you should also set rds.logical_replication = 1.

The database also must permit replication with the host that runs the Postgres connector. For more details see here.

Note that if you are using a Postgres cluster, Debezium must communicate with the writer / leader node.
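
One way to sanity-check these parameters is to compare the values reported by the server against the requirements listed above. The sketch below assumes the settings have already been fetched into a dictionary (for example with `SHOW wal_level;` and similar queries); the helper name `check_logical_decoding` is hypothetical.

```python
def check_logical_decoding(settings: dict) -> list:
    """Return a list of problems; an empty list means the settings look OK."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    # The connector needs at least one WAL sender and one replication slot.
    for name in ("max_wal_senders", "max_replication_slots"):
        try:
            value = int(settings.get(name, 0))
        except (TypeError, ValueError):
            value = 0
        if value < 1:
            problems.append(f"{name} must be at least 1")
    return problems


reported = {"wal_level": "logical", "max_wal_senders": "1", "max_replication_slots": "1"}
print(check_logical_decoding(reported) or "settings look OK")
```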

Debezium Connector Setup

Debezium provides excellent documentation on how to set up the Postgres connector (and connectors for many other database flavors). It is recommended to read the documentation before implementation.


  1. Create a debezium role in the source database and grant replication privileges.
  2. Create a replication group role and grant the debezium user and current table owner access. Assign tables you are interested in capturing to this owner.
  3. Create debezium heartbeat and signal tables.
  4. Create a publication for the tables you are interested in capturing.


  1. Debezium needs access to the database in order to capture the data changes. It also needs replication privileges in order to add tables to the publication which publishes the changes.
  2. The debezium user needs to be an owner of the tables it is operating on. As we want to assign least privileges (in a dev environment you can assign superuser), we transfer the tables we are interested in capturing to a shared ownership role between the current table owner and the debezium role. This ensures that there is no effect on current database practices whilst allowing debezium the access it needs.
  3. A debezium heartbeat table is a simple table which debezium updates periodically to prevent connector failures and to stop Postgres replication slots from getting clogged up. The issue is documented here and here. It is recommended to add the heartbeat table; in our use case we faced the issues described without it. The signal table is used to send messages to Debezium for events such as adding messages to the log or taking ad hoc snapshots.
  4. The publication is used by Postgres to

Kafka Connector Setup

Debezium sits on top of Kafka Connect, and there are multiple options for how this is set up, such as using a VM, Docker, Kubernetes or a managed Kafka service like AWS MSK. See the deployment documentation for more details.

First you need to create a Kafka cluster. This is a collection of one or more Kafka brokers (or servers) that allows Kafka to function in a distributed manner. It is recommended to have more than one broker to allow for data replication. Brokers can also fail over time or need to be restarted, so with a single broker you can end up in situations where you can’t write to a topic because the broker has failed.

Brokers are managed by Apache ZooKeeper.

In order to deploy the Debezium Postgres Connector, you need the following:

  1. Debezium postgres connector jar files.
  2. Debezium connector properties file.
  3. Kafka worker config file.
  4. Kafka cluster properties file.

The debezium connector properties file contains all of the information relating to the tables you are capturing, the format you want the data in and the database connection information. There are many options which can be added here and it’s best to consult the documentation for your particular use case.
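
To give a feel for what such a properties file holds, here is a minimal sketch that assembles one in Python. The property keys are ones documented for the Debezium Postgres connector, but every value (host, role name, logical server name, heartbeat interval, table list) is a placeholder assumption, and `build_connector_config` / `to_properties` are hypothetical helpers rather than part of the PrimeLab repository.

```python
def build_connector_config(host: str, dbname: str, tables: list) -> dict:
    """Assemble a Debezium Postgres connector config as a flat dict of strings."""
    return {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",          # logical decoding plug-in shipped with Postgres 10+
        "database.hostname": host,
        "database.port": "5432",
        "database.user": "debezium",        # placeholder role name
        "database.password": "<secret>",    # placeholder; load from a secret store
        "database.dbname": dbname,
        # Logical name used to prefix topics; Debezium 2.x renames this key to topic.prefix.
        "database.server.name": "indexer",
        "table.include.list": ",".join(tables),
        "heartbeat.interval.ms": "10000",   # placeholder interval
    }


def to_properties(config: dict) -> str:
    """Render the config in key=value form, one entry per line."""
    return "\n".join(f"{key}={value}" for key, value in config.items())


config = build_connector_config(
    "db.example.internal",
    "indexer",
    ["public.accounts", "public.transactions", "public.receipts", "public.action_receipt_actions"],
)
print(to_properties(config))
```

Consult the connector documentation for the full list of options; this sketch leaves out publication, slot and signal-table settings.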

The cluster configuration contains information such as how many partitions you wish to use, the maximum size of individual messages and the number of replicas and replication factor you want to use.

The replication factor is the number of times you want to replicate the data in the cluster. For example, if you have a replication factor of 3, then for every message written to the cluster, it will be written 3 times. This is useful for data redundancy in case of broker failure.

The worker configuration contains information such as the number of workers you want to use, the number of tasks per worker and the connector class name. The connector class name is the name of the connector you are using, in this case it is the debezium postgres connector.


Once you have the connector deployed, you can test it by writing to the database and checking that the changes are replicated to the kafka cluster.
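
When inspecting the replicated changes, it helps to know the shape of a Debezium change event: the row images sit under `before` and `after`, and `op` records the operation type. The following sketch summarizes one message value; the sample event is invented for illustration and `summarize_change` is a hypothetical helper.

```python
import json

# Operation codes used in Debezium change events.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_change(message_value: str) -> dict:
    """Extract the operation, source table and row image from a change event."""
    event = json.loads(message_value)
    # With schemas enabled in the JSON converter, the change sits under "payload".
    payload = event.get("payload", event)
    return {
        "operation": OPS.get(payload.get("op"), "unknown"),
        "table": (payload.get("source") or {}).get("table"),
        # Deletes carry no "after" image, so fall back to "before".
        "row": payload.get("after") or payload.get("before"),
    }


sample = json.dumps({
    "payload": {
        "op": "c",
        "before": None,
        "after": {"transaction_hash": "abc123"},
        "source": {"table": "transactions"},
    }
})
print(summarize_change(sample))
```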

AWS Lambda Setup


In order to follow this guide, you will need the following:

  • An AWS account
  • A text editor
  • Python 3.6 or later

Setting up the Lambda Function

  1. Log in to the AWS Management Console and navigate to the Lambda service.
  2. Click on “Create Function”. You will need 4 functions, so pick a naming scheme such as primelab-transactions, primelab-accounts, etc.
  3. Select “Author from scratch”.
  4. Enter a name for the function and select “Python 3.9” as the runtime.
  5. Create a role scoped with access to the MSK Cluster and your receiving data-lake.
  6. Click on “Create Function”.

Customizing the Lambda Function

  1. In the “Function code” section, select “Edit code inline”.
  2. Paste the code from the GitHub repo, copying one Lambda handler into each function you created above.
  3. Click on “Save” and Deploy.

Creating the Dependency Layer

Lambda does not let you install Python “pip” dependencies from within the UI, so they must be packaged as a layer. If you are feeling lazy you can use the layer published here (arn:aws:lambda:us-east-1:165372248390:layer:pandas:3). Otherwise, build your own as follows.

  1. Start a Cloud9 Linux instance in AWS. Go to the AWS Services and search for Cloud9. Click ‘Create Environment’. Name your environment and click next step. Keep the environment default settings and click next step. Click ‘Create Environment’ and you’re ready to go.

  2. Create your Pandas Lambda layer. Type the following commands line by line into the terminal at the bottom to create a Pandas Lambda layer. The pip install pandas command can be replaced with a package of your choosing, and you can install more than one package.

mkdir folder
cd folder
virtualenv v-env
source ./v-env/bin/activate
pip install pandas
pip install requests

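Then type the following commands line by line to create your layer. The python3.7 in the site-packages path and in --compatible-runtimes should match the Python version of your virtualenv and of your Lambda functions (for example python3.9 for the runtime selected earlier).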

mkdir python
cd python
cp -r ../v-env/lib64/python3.7/site-packages/* .
cd ..
zip -r panda_layer.zip python
aws lambda publish-layer-version --layer-name pandas --zip-file fileb://panda_layer.zip --compatible-runtimes python3.7


After populating each function with the appropriate code, click Deploy and move on to adding a trigger. Add the “MSK” trigger and configure it to your preference; setting a low threshold for transactions may be prudent while testing. You should begin to see transactions arrive in your destination database, transformed into their new schema.
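
For orientation, an MSK trigger delivers batches whose `records` field maps topic-partition keys to lists of records with base64-encoded values. The handler below is only a sketch of decoding such a batch, not the handler from the repository; the topic name in the usage example is invented, and the transform and load steps are left as a comment.

```python
import base64
import json


def lambda_handler(event, context):
    """Decode every Kafka record delivered by the MSK trigger."""
    rows = []
    # event["records"] maps "topic-partition" keys to lists of records.
    for records in event.get("records", {}).values():
        for record in records:
            value = record.get("value")
            if value is None:
                continue  # skip records that carry no value
            rows.append(json.loads(base64.b64decode(value)))
    # A real handler would transform `rows` and write them to the data lake here.
    return {"processed": len(rows), "rows": rows}


# Local usage example with a hand-built event.
encoded = base64.b64encode(json.dumps({"transaction_hash": "abc"}).encode()).decode()
test_event = {
    "eventSource": "aws:kafka",
    "records": {
        "indexer.public.transactions-0": [
            {"topic": "indexer.public.transactions", "partition": 0, "offset": 1, "value": encoded}
        ]
    },
}
print(lambda_handler(test_event, None))
```

Returning the decoded rows is for demonstration only; in practice the return value of a stream-triggered function is usually unused.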


-- Tables
create table primelab.etl_audits (
    etl_audit_id    integer generated always as identity
   ,record_count    integer
   ,audit_timestamp timestamp default current_timestamp
);

-- Constraint
alter table primelab.etl_audits add constraint etl_audits_pkey primary key (etl_audit_id);

-- Views
create or replace view public.finalised_transactions as
    select ara.receipt_id
      from public.stg_transactions t
      join public.stg_action_receipt_actions ara on t.converted_into_receipt_id = ara.receipt_id;

-- Procedures
create or replace procedure primelab.move_matched_rows()
as $$
begin
    -- Create a temporary table so we get a consistent view of the transactions at this point
    create temporary table txn_rec_tmp (
        receipt_id              text
       ,args                    jsonb
       ,wallet_id               text
       ,slice_id                text
       ,transaction_hash        text
       ,included_in_block_hash  numeric(20,0)
       ,block_timestamp         timestamp
       ,status                  text
    ) on commit drop;
    -- block_timestamp is stored in nanoseconds, so convert it to seconds first
    insert into txn_rec_tmp (receipt_id, args, wallet_id, slice_id
                            ,transaction_hash, included_in_block_hash, block_timestamp, status)
    select receipt_id, args, wallet_id, slice_id
          ,transaction_hash, included_in_block_hash, to_timestamp(block_timestamp / 1000000000), status
      from public.finalised_transactions;

    -- Create receipts
    insert into primelab.receipts(receipt_id, block_hash, status, created_at)
    select receipt_id, included_in_block_hash, status, block_timestamp
      from txn_rec_tmp
        on conflict (receipt_id) do nothing;
    -- Create wallets
    insert into primelab.wallets(wallet_id, persona_id, created_at)
    select wallet_id, 1, block_timestamp /* TO DO - change persona to be dynamic once we have personas */
      from txn_rec_tmp
     where wallet_id is not null
        on conflict (wallet_id) do nothing;
    -- Create transactions /*TO DO - Add stack_name*/
    with txn_ins as (
        insert into primelab.transactions(transaction_id, receipt_id, slice_name
                                         ,wallet_id, tags_json, created_at, status)
        select transaction_hash, receipt_id, slice_id, wallet_id, args, block_timestamp, status
          from txn_rec_tmp
            on conflict (transaction_id) do nothing
        returning transaction_id
    )
    -- Write the number of inserted rows to the audit table
    insert into primelab.etl_audits (record_count)
    select count(*)
      from txn_ins;

    -- Now delete these from the staging tables
    -- Commented out for now
    -- delete from public.stg_transactions as t
    --  using txn_rec_tmp as trt
    --  where t.transaction_hash = trt.transaction_hash;
    -- delete from public.stg_action_receipt_actions as ara
    --  using txn_rec_tmp as trt
    --  where ara.receipt_id = trt.receipt_id;
end;
$$ language plpgsql;
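
The procedure divides `block_timestamp` by 1000000000 because the indexer stores block times in nanoseconds since the Unix epoch. If you need the same conversion on the Python side of the pipeline, a minimal sketch could look like this; the function name is hypothetical, and this version drops the sub-second part for simplicity.

```python
from datetime import datetime, timezone

NANOS_PER_SECOND = 1_000_000_000


def block_timestamp_to_datetime(block_timestamp_ns: int) -> datetime:
    """Convert a nanosecond block timestamp to a UTC datetime (whole seconds)."""
    # Integer division discards the nanosecond remainder.
    seconds = block_timestamp_ns // NANOS_PER_SECOND
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


print(block_timestamp_to_datetime(1_646_092_800_000_000_000).isoformat())
```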

Natural Language API

Problem statement

Oftentimes, the business depends on the analytics team for ad-hoc analysis to get meaningful insights or answers from the data. This slows both teams down. The business has to wait for a data analyst to write the code that answers each question, and the analysts' long-term work suffers if too many ad-hoc questions start coming in.

What if there was a way that would enable the business to directly ask the questions to the data itself and somehow, automagically get the answers from it?

Solution - An Artificial Intelligence powered app that understands English

How the app works

We have created an AI-powered app (with the help of the GPT-3 engine) where users can ask questions about the data in simple English. An AI agent running in the backend parses the English question, interprets its meaning, generates Python code, and runs that code to get the answer to the question.

How the AI agent works

GPT-3 is a language model based on the transformer architecture, pre-trained in a generative, unsupervised manner that shows decent performance in zero/one/few-shot multitask settings. It works by predicting the next token given a sequence of tokens and can do so for NLP tasks it hasn’t been trained on. After seeing just a few examples, it reached state-of-the-art levels in some benchmarks such as machine translation, Q&A, and also content generation.

We use this pretrained model and its embeddings to parse user questions: each question is converted into its corresponding embedding, which is then translated into the Python code that answers the original question.
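
The post does not show the prompt or the model call, so the following is only a sketch of one common way to get code out of a language model: few-shot prompting, which is a simpler stand-in for the embedding-based approach described above. The example question/answer pairs, the column names and the `build_prompt` helper are all hypothetical, and the request to the model itself is deliberately left out.

```python
# Hypothetical question/code pairs shown to the model as examples.
FEW_SHOT_EXAMPLES = [
    ("How many transactions happened in total?", "len(df)"),
    ("How many unique users are there?", "df['signer_account_id'].nunique()"),
]


def build_prompt(question: str, columns: list) -> str:
    """Build a few-shot prompt asking for a pandas expression that answers the question."""
    lines = [
        "Translate the question into a single pandas expression.",
        "The DataFrame is named df and has columns: " + ", ".join(columns),
        "",
    ]
    for example_question, example_code in FEW_SHOT_EXAMPLES:
        lines += [f"Q: {example_question}", f"A: {example_code}", ""]
    # The model is expected to continue the text after the final "A:".
    lines += [f"Q: {question}", "A:"]
    return "\n".join(lines)


print(build_prompt("How many receipts are there?", ["transaction_hash", "signer_account_id"]))
```

Any code returned by a model should be validated before it is executed against real data.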

Running the code

  1. Clone the repository here https://github.com/NearPrime/dataOps-notebooks/tree/master/sagarnil/fastapi_sql/fastapiapp_v2
  2. run pip install -r requirements.txt
  3. From outside the fastapiapp_v2 folder, run uvicorn fastapiapp_v2.main:app --reload --workers 1 --host --port 8001
  4. Go to http://localhost:8001/docs
  5. Here you will see two endpoints you can try out. The first, get transactions, lets you skip rows and choose how many rows you want to see; the second takes a transaction_hash and returns the whole row to you.
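
The behaviour of the two endpoints in step 5 can be sketched with plain functions. This is not the repository's code: the function names, the default `limit` and the in-memory `rows` list are assumptions, and in the FastAPI app `skip` and `limit` would presumably arrive as query parameters.

```python
from typing import Optional


def get_transactions(rows: list, skip: int = 0, limit: int = 10) -> list:
    """Return up to `limit` rows after skipping the first `skip` rows."""
    if skip < 0 or limit < 0:
        raise ValueError("skip and limit must be non-negative")
    return rows[skip:skip + limit]


def get_transaction(rows: list, transaction_hash: str) -> Optional[dict]:
    """Return the whole row for a transaction_hash, or None if it is unknown."""
    return next((row for row in rows if row.get("transaction_hash") == transaction_hash), None)


sample_rows = [{"transaction_hash": f"hash{i}", "signer_account_id": "alice.near"} for i in range(5)]
print(get_transactions(sample_rows, skip=1, limit=2))
print(get_transaction(sample_rows, "hash3"))
```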

App demo

a) Direct Answers

The following 2 min video shows this app in action. We took a sample of NEAR data and tested the app with it. All the user needs to do is ask their questions in simple English, and they get the answer from the data in real time as the AI agent writes the code by itself.

Some examples of questions might be

a) How many transactions happened in total?

b) How many unique users are there?

c) Who are the top 5 users who did the highest number of transactions in March 2022?
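
To make these questions concrete, here is a sketch of the kind of code that would answer them, written by hand with the standard library rather than generated by the app. It assumes each transaction is a dictionary with a `signer_account_id` field, and it omits the March 2022 date filter from question c) for brevity.

```python
from collections import Counter


def answer_questions(transactions: list) -> dict:
    """Compute total transactions, unique users and the five most active users."""
    signers = [t["signer_account_id"] for t in transactions]
    return {
        "total_transactions": len(transactions),
        "unique_users": len(set(signers)),
        # most_common returns (user, count) pairs ordered by count, highest first.
        "top_users": Counter(signers).most_common(5),
    }


sample_transactions = [
    {"signer_account_id": "alice.near"},
    {"signer_account_id": "bob.near"},
    {"signer_account_id": "alice.near"},
]
print(answer_questions(sample_transactions))
```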


b) Visualization

We are also adding a separate section to this app where the AI can also generate visualization from the data. Currently, this is in the beta stage and we need to feed it more training examples. But here are some initial results.


The examples below were derived from a small subset of mainnet data used during testing.

1. Plot the daily time series of new users

2. Plot the daily time series of returning users

3. Plot the top 5 signer_account_id who did the highest numbers of transactions

4. Plot the histogram of receipt_conversion_gas_burnt
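
Behind the first two plots is a simple aggregation. The sketch below computes the daily series without drawing it, under two assumptions that are not from the original post: each transaction carries an ISO-formatted `day` string, and a "returning" user is one who was already active on an earlier day.

```python
from collections import defaultdict


def daily_new_and_returning(transactions: list) -> dict:
    """Map each day to the number of new and returning users active on it."""
    active_by_day = defaultdict(set)
    for t in transactions:
        active_by_day[t["day"]].add(t["signer_account_id"])

    seen = set()
    series = {}
    # ISO dates sort chronologically as plain strings.
    for day in sorted(active_by_day):
        users = active_by_day[day]
        new_users = users - seen
        series[day] = {"new": len(new_users), "returning": len(users) - len(new_users)}
        seen |= users
    return series


sample_activity = [
    {"day": "2022-03-01", "signer_account_id": "alice.near"},
    {"day": "2022-03-01", "signer_account_id": "bob.near"},
    {"day": "2022-03-02", "signer_account_id": "alice.near"},
    {"day": "2022-03-02", "signer_account_id": "carol.near"},
]
print(daily_new_and_returning(sample_activity))
```

The resulting dictionary can be handed to any plotting library to draw the two time series.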

Useful Links

Debezium Postgres Documentation

Debezium Zulip Message Board

Kafka Connect Documentation

AWS Example Walkthrough

AWS MSK Coinbase Article


@WillH This looks like a substantial achievement, and I am surprised no one left any comments.

Having worked as a Cloud Solutions Architect for the last decade, I know for a fact that building scalable AWS Lambda architectures can be a challenging endeavour, especially when it comes to areas like NLP.

But most importantly, the AWS infrastructure bill can be quite eye-watering.

Do you feel you are overpaying Amazon for infrastructure? Are you interested in architectural optimisations that can easily bring down your monthly AWS EC2, AWS Lambda, AWS Redshift utilisation by 25-40%? Are you open to architectural analysis review and consultancy?

If your answer to those questions is YES, do not hesitate to contact me and inquire about the details.

Thanks and hope to hear from you
