
Example: Build Blockchain Indexer

Complete working example of a production-ready blockchain indexer that processes events and stores them in a database.

What You'll Build

A real-time blockchain indexer that:

  • 📡 Monitors blockchain - Subscribes to new blocks and events
  • 💾 Stores data - Persists events to PostgreSQL/SQLite
  • 🔍 Provides queries - Fast lookups via indexed data
  • 🔄 Handles reorgs - Manages chain reorganizations
  • 📊 Tracks progress - Maintains sync state

Prerequisites

  • Node.js 18+ (for TypeScript version)
  • Rust 1.70+ (for Rust version)
  • PostgreSQL or SQLite
  • GLIN Network access

Architecture

Blockchain  →   Indexer    →   Database   →   Query API
 (blocks)     (processing)     (storage)      (clients)

Project Setup

TypeScript Version

mkdir glin-indexer
cd glin-indexer
npm init -y
npm install @glin-ai/sdk @polkadot/api
npm install pg prisma @prisma/client
npm install -D @types/node @types/pg typescript tsx

Rust Version

cargo new glin-indexer
cd glin-indexer
cargo add glin-client subxt tokio --features tokio/macros,tokio/rt-multi-thread
cargo add sqlx --features postgres,runtime-tokio-rustls,chrono,json
cargo add chrono hex serde serde_json anyhow tracing tracing-subscriber

Database Schema

Prisma Schema (TypeScript)

prisma/schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

generator client {
  provider = "prisma-client-js"
}

model Block {
  id         Int      @id @default(autoincrement())
  number     BigInt   @unique
  hash       String   @unique
  parentHash String
  timestamp  DateTime
  createdAt  DateTime @default(now())

  events    Event[]
  transfers Transfer[]

  @@index([number])
  @@index([timestamp])
}

model Event {
  id      Int   @id @default(autoincrement())
  blockId Int
  block   Block @relation(fields: [blockId], references: [id], onDelete: Cascade)

  blockNumber BigInt
  eventIndex  Int
  section     String
  method      String
  data        Json

  createdAt DateTime @default(now())

  @@unique([blockNumber, eventIndex])
  @@index([section, method])
  @@index([blockNumber])
}

model Transfer {
  id      Int   @id @default(autoincrement())
  blockId Int
  block   Block @relation(fields: [blockId], references: [id], onDelete: Cascade)

  blockNumber BigInt
  from        String
  to          String
  amount      String

  createdAt DateTime @default(now())

  @@index([from])
  @@index([to])
  @@index([blockNumber])
}

model SyncState {
  id            Int      @id @default(autoincrement())
  lastBlock     BigInt
  lastBlockHash String
  updatedAt     DateTime @updatedAt
}

SQL Schema (Rust)

migrations/001_create_tables.sql
CREATE TABLE IF NOT EXISTS blocks (
    id SERIAL PRIMARY KEY,
    number BIGINT UNIQUE NOT NULL,
    hash TEXT UNIQUE NOT NULL,
    parent_hash TEXT NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_blocks_number ON blocks(number);
CREATE INDEX idx_blocks_timestamp ON blocks(timestamp);

CREATE TABLE IF NOT EXISTS events (
    id SERIAL PRIMARY KEY,
    block_id INTEGER REFERENCES blocks(id) ON DELETE CASCADE,
    block_number BIGINT NOT NULL,
    event_index INTEGER NOT NULL,
    section TEXT NOT NULL,
    method TEXT NOT NULL,
    data JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(block_number, event_index)
);

CREATE INDEX idx_events_section_method ON events(section, method);
CREATE INDEX idx_events_block_number ON events(block_number);

CREATE TABLE IF NOT EXISTS transfers (
    id SERIAL PRIMARY KEY,
    block_id INTEGER REFERENCES blocks(id) ON DELETE CASCADE,
    block_number BIGINT NOT NULL,
    from_address TEXT NOT NULL,
    to_address TEXT NOT NULL,
    amount TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_transfers_from ON transfers(from_address);
CREATE INDEX idx_transfers_to ON transfers(to_address);
CREATE INDEX idx_transfers_block_number ON transfers(block_number);

CREATE TABLE IF NOT EXISTS sync_state (
    id SERIAL PRIMARY KEY,
    last_block BIGINT NOT NULL,
    last_block_hash TEXT NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

TypeScript Implementation

1. Database Client

src/db.ts
import { PrismaClient } from '@prisma/client';

export const prisma = new PrismaClient();

export async function initDatabase() {
  try {
    await prisma.$connect();
    console.log('✅ Database connected');
  } catch (error) {
    console.error('❌ Database connection failed:', error);
    throw error;
  }
}

export async function getSyncState() {
  return prisma.syncState.findFirst({
    orderBy: { id: 'desc' }
  });
}

export async function updateSyncState(blockNumber: bigint, blockHash: string) {
  // Upsert a single row instead of inserting one row per block,
  // which would grow the sync_state table without bound
  await prisma.syncState.upsert({
    where: { id: 1 },
    update: { lastBlock: blockNumber, lastBlockHash: blockHash },
    create: { id: 1, lastBlock: blockNumber, lastBlockHash: blockHash }
  });
}

2. Block Processor

src/processor.ts
import { GlinClient } from '@glin-ai/sdk';
import { prisma } from './db';
import type { SignedBlock, EventRecord } from '@polkadot/types/interfaces';

export class BlockProcessor {
  constructor(private client: GlinClient) {}

  async processBlock(block: SignedBlock, events: EventRecord[]) {
    const blockNumber = block.block.header.number.toBigInt();
    const blockHash = block.block.header.hash.toHex();
    const parentHash = block.block.header.parentHash.toHex();

    // Extract timestamp from block
    const timestamp = await this.getBlockTimestamp(blockHash);

    console.log(`Processing block #${blockNumber} (${blockHash})`);

    // Use a transaction for atomicity
    await prisma.$transaction(async (tx) => {
      // 1. Insert block
      const dbBlock = await tx.block.create({
        data: {
          number: blockNumber,
          hash: blockHash,
          parentHash: parentHash,
          timestamp: new Date(timestamp)
        }
      });

      // 2. Process events
      for (let i = 0; i < events.length; i++) {
        const event = events[i];

        // Store all events
        await tx.event.create({
          data: {
            blockId: dbBlock.id,
            blockNumber: blockNumber,
            eventIndex: i,
            section: event.event.section,
            method: event.event.method,
            data: event.event.data.toJSON()
          }
        });

        // Extract transfers
        if (
          event.event.section === 'balances' &&
          event.event.method === 'Transfer'
        ) {
          const [from, to, amount] = event.event.data;

          await tx.transfer.create({
            data: {
              blockId: dbBlock.id,
              blockNumber: blockNumber,
              from: from.toString(),
              to: to.toString(),
              amount: amount.toString()
            }
          });
        }
      }
    });

    console.log(`✅ Block #${blockNumber}: ${events.length} events processed`);
  }

  private async getBlockTimestamp(blockHash: string): Promise<number> {
    const apiAt = await this.client.api.at(blockHash);
    const timestamp = await apiAt.query.timestamp.now();
    return timestamp.toNumber();
  }
}
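One create() per event costs a database round trip per row. On busy chains it is usually worth batching the event inserts with Prisma's createMany (supported on PostgreSQL); a sketch of the events loop rewritten that way, using the same models as above and keeping the transfer extraction separate:

// Inside prisma.$transaction: build all rows first, then write them
// in a single statement instead of one create() per event
const eventRows = events.map((event, i) => ({
  blockId: dbBlock.id,
  blockNumber: blockNumber,
  eventIndex: i,
  section: event.event.section,
  method: event.event.method,
  data: event.event.data.toJSON()
}));

await tx.event.createMany({ data: eventRows });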

3. Indexer Service

src/indexer.ts
import { GlinClient } from '@glin-ai/sdk';
import { BlockProcessor } from './processor';
import { getSyncState, updateSyncState } from './db';

export class Indexer {
  private processor: BlockProcessor;
  private isRunning = false;
  private nextBlock = 0n; // cursor: next block to index

  constructor(private client: GlinClient) {
    this.processor = new BlockProcessor(client);
  }

  async start() {
    this.isRunning = true;
    console.log('🚀 Starting indexer...');

    // Resume from the last synced block, if any
    const syncState = await getSyncState();
    this.nextBlock = syncState ? syncState.lastBlock + 1n : 0n;

    console.log(`📍 Starting from block #${this.nextBlock}`);

    // Subscribe to new blocks
    const unsubscribe = await this.client.api.rpc.chain.subscribeNewHeads(
      async (header) => {
        if (!this.isRunning) {
          unsubscribe();
          return;
        }

        // Process everything between the cursor and the new head
        // (the cursor prevents re-processing blocks on each new head)
        const currentBlock = header.number.toBigInt();
        await this.catchUp(currentBlock);
      }
    );

    console.log('✅ Indexer started and listening for blocks');
  }

  private async catchUp(to: bigint) {
    if (this.nextBlock < to) {
      console.log(`⏩ Catching up from #${this.nextBlock} to #${to}`);
    }

    while (this.nextBlock <= to) {
      if (!this.isRunning) break;

      const blockHash = await this.client.api.rpc.chain.getBlockHash(this.nextBlock);
      await this.processNewBlock(blockHash.toHex());

      // Log progress every 100 blocks
      if (this.nextBlock % 100n === 0n) {
        console.log(`   Progress: ${this.nextBlock}/${to}`);
      }

      this.nextBlock += 1n;
    }
  }

  private async processNewBlock(blockHash: string) {
    try {
      // Get the block, and query its events at that block's state
      const apiAt = await this.client.api.at(blockHash);
      const [block, events] = await Promise.all([
        this.client.api.rpc.chain.getBlock(blockHash),
        apiAt.query.system.events()
      ]);

      // Process block
      await this.processor.processBlock(block, events);

      // Update sync state
      const blockNumber = block.block.header.number.toBigInt();
      await updateSyncState(blockNumber, blockHash);
    } catch (error) {
      console.error(`❌ Error processing block ${blockHash}:`, error);
      throw error;
    }
  }

  stop() {
    console.log('🛑 Stopping indexer...');
    this.isRunning = false;
  }
}

4. Main Entry Point

src/index.ts
import { GlinClient } from '@glin-ai/sdk';
import { Indexer } from './indexer';
import { initDatabase } from './db';

async function main() {
  try {
    // 1. Initialize database
    await initDatabase();

    // 2. Connect to blockchain
    const client = await GlinClient.connect(
      process.env.GLIN_RPC || 'wss://testnet.glin.ai'
    );

    console.log('✅ Connected to GLIN Network');

    // 3. Start indexer
    const indexer = new Indexer(client);
    await indexer.start();

    // Handle graceful shutdown
    process.on('SIGINT', () => {
      console.log('\n👋 Shutting down...');
      indexer.stop();
      process.exit(0);
    });
  } catch (error) {
    console.error('❌ Fatal error:', error);
    process.exit(1);
  }
}

main();

5. Query API

src/api.ts
import express from 'express';
import { prisma } from './db';

const app = express();

// Express serializes responses with JSON.stringify, which throws on BigInt;
// convert BigInt values to strings in every JSON response
app.set('json replacer', (_key: string, value: unknown) =>
  typeof value === 'bigint' ? value.toString() : value
);

// Get block by number
app.get('/blocks/:number', async (req, res) => {
  const block = await prisma.block.findUnique({
    where: { number: BigInt(req.params.number) },
    include: {
      events: true,
      transfers: true
    }
  });

  res.json(block);
});

// Get transfers for address
app.get('/transfers/:address', async (req, res) => {
  const { address } = req.params;
  const { limit = 100, offset = 0 } = req.query;

  const transfers = await prisma.transfer.findMany({
    where: {
      OR: [
        { from: address },
        { to: address }
      ]
    },
    orderBy: { blockNumber: 'desc' },
    take: Number(limit),
    skip: Number(offset),
    include: {
      block: true
    }
  });

  res.json(transfers);
});

// Get events by type
app.get('/events/:section/:method', async (req, res) => {
  const { section, method } = req.params;
  const { limit = 100, offset = 0 } = req.query;

  const events = await prisma.event.findMany({
    where: { section, method },
    orderBy: { blockNumber: 'desc' },
    take: Number(limit),
    skip: Number(offset),
    include: {
      block: true
    }
  });

  res.json(events);
});

// Get sync status
app.get('/status', async (req, res) => {
  const syncState = await prisma.syncState.findFirst({
    orderBy: { id: 'desc' }
  });

  res.json({
    lastBlock: syncState?.lastBlock.toString(),
    lastBlockHash: syncState?.lastBlockHash,
    updatedAt: syncState?.updatedAt
  });
});

app.listen(3000, () => {
  console.log('🌐 API server running on http://localhost:3000');
});

Rust Implementation

1. Database Models

src/db/models.rs
use serde::{Deserialize, Serialize};
use sqlx::FromRow;

#[derive(Debug, FromRow, Serialize)]
pub struct Block {
    pub id: i32,
    pub number: i64,
    pub hash: String,
    pub parent_hash: String,
    pub timestamp: chrono::NaiveDateTime,
}

#[derive(Debug, FromRow, Serialize)]
pub struct Event {
    pub id: i32,
    pub block_id: i32,
    pub block_number: i64,
    pub event_index: i32,
    pub section: String,
    pub method: String,
    pub data: serde_json::Value,
}

#[derive(Debug, FromRow, Serialize)]
pub struct Transfer {
    pub id: i32,
    pub block_id: i32,
    pub block_number: i64,
    pub from_address: String,
    pub to_address: String,
    pub amount: String,
}

#[derive(Debug, FromRow)]
pub struct SyncState {
    pub id: i32,
    pub last_block: i64,
    pub last_block_hash: String,
}

2. Database Operations

src/db/mod.rs
use sqlx::{PgPool, postgres::PgPoolOptions};
use anyhow::Result;

pub mod models;
use models::*;

// PgPool is internally reference-counted, so Database can be cloned
// cheaply and shared between the indexer and the processor
#[derive(Clone)]
pub struct Database {
    pool: PgPool,
}

impl Database {
    pub async fn connect(url: &str) -> Result<Self> {
        let pool = PgPoolOptions::new()
            .max_connections(5)
            .connect(url)
            .await?;

        tracing::info!("✅ Database connected");

        Ok(Self { pool })
    }

    pub async fn run_migrations(&self) -> Result<()> {
        sqlx::migrate!("./migrations")
            .run(&self.pool)
            .await?;

        Ok(())
    }

    pub async fn insert_block(
        &self,
        number: i64,
        hash: &str,
        parent_hash: &str,
        timestamp: chrono::NaiveDateTime,
    ) -> Result<i32> {
        let row = sqlx::query!(
            r#"
            INSERT INTO blocks (number, hash, parent_hash, timestamp)
            VALUES ($1, $2, $3, $4)
            RETURNING id
            "#,
            number,
            hash,
            parent_hash,
            timestamp
        )
        .fetch_one(&self.pool)
        .await?;

        Ok(row.id)
    }

    pub async fn insert_event(
        &self,
        block_id: i32,
        block_number: i64,
        event_index: i32,
        section: &str,
        method: &str,
        data: serde_json::Value,
    ) -> Result<()> {
        sqlx::query!(
            r#"
            INSERT INTO events (block_id, block_number, event_index, section, method, data)
            VALUES ($1, $2, $3, $4, $5, $6)
            "#,
            block_id,
            block_number,
            event_index,
            section,
            method,
            data
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    pub async fn insert_transfer(
        &self,
        block_id: i32,
        block_number: i64,
        from: &str,
        to: &str,
        amount: &str,
    ) -> Result<()> {
        sqlx::query!(
            r#"
            INSERT INTO transfers (block_id, block_number, from_address, to_address, amount)
            VALUES ($1, $2, $3, $4, $5)
            "#,
            block_id,
            block_number,
            from,
            to,
            amount
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    pub async fn get_sync_state(&self) -> Result<Option<SyncState>> {
        let state = sqlx::query_as!(
            SyncState,
            "SELECT id, last_block, last_block_hash FROM sync_state ORDER BY id DESC LIMIT 1"
        )
        .fetch_optional(&self.pool)
        .await?;

        Ok(state)
    }

    pub async fn update_sync_state(&self, block_number: i64, block_hash: &str) -> Result<()> {
        // Upsert a single row rather than inserting one row per block,
        // which would grow sync_state without bound
        sqlx::query!(
            r#"
            INSERT INTO sync_state (id, last_block, last_block_hash, updated_at)
            VALUES (1, $1, $2, CURRENT_TIMESTAMP)
            ON CONFLICT (id) DO UPDATE
            SET last_block = EXCLUDED.last_block,
                last_block_hash = EXCLUDED.last_block_hash,
                updated_at = CURRENT_TIMESTAMP
            "#,
            block_number,
            block_hash
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }
}
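Unlike the TypeScript version, the inserts above are not atomic per block: if the process dies mid-block, a partial block remains in the database. A sketch of wrapping one block's writes in a single sqlx transaction, assuming sqlx 0.7's executor API (the method name insert_block_atomic is illustrative, added to the same impl Database block):

pub async fn insert_block_atomic(
    &self,
    number: i64,
    hash: &str,
    parent_hash: &str,
    timestamp: chrono::NaiveDateTime,
) -> Result<i32> {
    // All statements in this scope commit or roll back together
    let mut tx = self.pool.begin().await?;

    let row = sqlx::query!(
        r#"
        INSERT INTO blocks (number, hash, parent_hash, timestamp)
        VALUES ($1, $2, $3, $4)
        RETURNING id
        "#,
        number,
        hash,
        parent_hash,
        timestamp
    )
    .fetch_one(&mut *tx)
    .await?;

    // ... event and transfer inserts go here, also executed on &mut *tx ...

    tx.commit().await?;
    Ok(row.id)
}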

3. Block Processor

src/processor.rs
use anyhow::Result;
use subxt::OnlineClient;
use subxt::config::SubstrateConfig;
use crate::db::Database;

pub struct BlockProcessor {
    db: Database,
}

impl BlockProcessor {
    pub fn new(db: Database) -> Self {
        Self { db }
    }

    pub async fn process_block(
        &self,
        client: &OnlineClient<SubstrateConfig>,
        block_number: u64,
        block_hash: String,
    ) -> Result<()> {
        tracing::info!("Processing block #{}", block_number);

        // Decode the hex hash into a fixed-size H256
        let hash_bytes = hex::decode(block_hash.trim_start_matches("0x"))?;
        let hash: [u8; 32] = hash_bytes
            .try_into()
            .map_err(|_| anyhow::anyhow!("invalid block hash length"))?;
        let block = client.blocks().at(subxt::utils::H256::from(hash)).await?;

        // Get parent hash
        let header = block.header();
        let parent_hash = format!("0x{}", hex::encode(header.parent_hash));

        // Get timestamp (simplified - you'd need to query the timestamp pallet)
        let timestamp = chrono::Utc::now().naive_utc();

        // Insert block
        let block_id = self.db.insert_block(
            block_number as i64,
            &block_hash,
            &parent_hash,
            timestamp,
        ).await?;

        // Process events
        let events = block.events().await?;
        for (idx, event) in events.iter().enumerate() {
            let event = event?;

            // Store event
            self.db.insert_event(
                block_id,
                block_number as i64,
                idx as i32,
                event.pallet_name(),
                event.variant_name(),
                serde_json::json!({
                    "pallet": event.pallet_name(),
                    "event": event.variant_name(),
                }),
            ).await?;

            // Extract transfers
            if event.pallet_name() == "Balances" && event.variant_name() == "Transfer" {
                // Parse transfer data
                // This is simplified - see the decoding sketch below
                tracing::info!("  📤 Transfer event found");
            }
        }

        tracing::info!("✅ Block #{} processed", block_number);
        Ok(())
    }
}
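Real transfer extraction needs event decoding. With generated metadata you would use subxt's typed as_event API; as a fallback, here is a sketch using subxt's dynamic field_values() API. Field names are runtime-dependent, so treat from/to/amount as assumptions to verify against your chain's metadata:

// Replacement for the placeholder branch above (dynamic decoding sketch)
if event.pallet_name() == "Balances" && event.variant_name() == "Transfer" {
    // Decode all event fields into a dynamic value, then into JSON
    let fields = event.field_values()?;
    let json = serde_json::to_value(&fields)?;

    // On standard Substrate runtimes the fields are from/to/amount;
    // inspect `json` for your chain and adjust accordingly
    if let (Some(from), Some(to), Some(amount)) =
        (json.get("from"), json.get("to"), json.get("amount"))
    {
        self.db.insert_transfer(
            block_id,
            block_number as i64,
            &from.to_string(),
            &to.to_string(),
            &amount.to_string(),
        ).await?;
    }
}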

4. Indexer Service

src/indexer.rs
use anyhow::Result;
use glin_client::create_client;
use subxt::OnlineClient;
use subxt::config::SubstrateConfig;
use tokio::time::{sleep, Duration};
use crate::db::Database;
use crate::processor::BlockProcessor;

pub struct Indexer {
    client: OnlineClient<SubstrateConfig>,
    db: Database,
    processor: BlockProcessor,
}

impl Indexer {
    pub async fn new(rpc_url: &str, db_url: &str) -> Result<Self> {
        let client = create_client(rpc_url).await?;
        let db = Database::connect(db_url).await?;

        // Run migrations
        db.run_migrations().await?;

        let processor = BlockProcessor::new(db.clone());

        Ok(Self { client, db, processor })
    }

    pub async fn start(&self) -> Result<()> {
        tracing::info!("🚀 Starting indexer...");

        // Get last synced block
        let start_block = match self.db.get_sync_state().await? {
            Some(state) => (state.last_block + 1) as u64,
            None => 0,
        };

        tracing::info!("📍 Starting from block #{}", start_block);

        let mut current_block = start_block;

        loop {
            // Get latest finalized block
            let latest_hash = self.client.rpc().finalized_head().await?;
            let latest_block = self.client.blocks().at(latest_hash).await?;
            let latest_number = latest_block.number() as u64;

            // Process blocks
            while current_block <= latest_number {
                let block_hash = self.get_block_hash(current_block).await?;

                self.processor.process_block(
                    &self.client,
                    current_block,
                    block_hash.clone(),
                ).await?;

                // Update sync state
                self.db.update_sync_state(current_block as i64, &block_hash).await?;

                current_block += 1;

                // Log progress
                if current_block % 100 == 0 {
                    tracing::info!("⏩ Progress: {}/{}", current_block, latest_number);
                }
            }

            // Wait for new blocks
            sleep(Duration::from_secs(6)).await;
        }
    }

    async fn get_block_hash(&self, block_number: u64) -> Result<String> {
        let hash = self.client.rpc().block_hash(Some(block_number.into())).await?;
        match hash {
            Some(h) => Ok(format!("0x{}", hex::encode(h))),
            None => anyhow::bail!("Block {} not found", block_number),
        }
    }
}

5. Main Entry Point

src/main.rs
mod db;
mod indexer;
mod processor;

use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .init();

    // Load config from environment
    let rpc_url = std::env::var("GLIN_RPC")
        .unwrap_or_else(|_| "wss://testnet.glin.ai".to_string());

    let db_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    // Create and start indexer
    let indexer = indexer::Indexer::new(&rpc_url, &db_url).await?;

    tracing::info!("✅ Connected to GLIN Network");

    indexer.start().await?;

    Ok(())
}

Running the Indexer

TypeScript

# Setup database
npx prisma migrate dev

# Set environment variables
export DATABASE_URL="postgresql://user:password@localhost:5432/glin_indexer"
export GLIN_RPC="wss://testnet.glin.ai"

# Run indexer
tsx src/index.ts

# Run API (separate terminal)
tsx src/api.ts

Rust

# Run migrations
sqlx migrate run

# Set environment variables
export DATABASE_URL="postgresql://user:password@localhost:5432/glin_indexer"
export GLIN_RPC="wss://testnet.glin.ai"

# Run indexer
cargo run --release

Query Examples

Get Block Information

curl http://localhost:3000/blocks/1000

Response:

{
  "id": 1,
  "number": "1000",
  "hash": "0x...",
  "parentHash": "0x...",
  "timestamp": "2024-01-15T10:30:00Z",
  "events": [...],
  "transfers": [...]
}

Get Transfers for Address

curl "http://localhost:3000/transfers/5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY?limit=10"

Get Events by Type

curl http://localhost:3000/events/balances/Transfer

Check Sync Status

curl http://localhost:3000/status

Response:

{
  "lastBlock": "12543",
  "lastBlockHash": "0x...",
  "updatedAt": "2024-01-15T10:35:22Z"
}

Production Optimizations

1. Batch Processing

// Process multiple blocks in parallel; completion order is not guaranteed,
// so only advance the sync state after the whole batch succeeds
async function processBatchParallel(blockNumbers: bigint[]) {
  const promises = blockNumbers.map(num => processBlock(num));
  await Promise.all(promises);
}

2. Database Indexing

-- Add composite indexes for common queries
CREATE INDEX idx_transfers_from_to ON transfers(from_address, to_address);
CREATE INDEX idx_events_section_method_block ON events(section, method, block_number DESC);

-- Partial indexes keep hot data small, but PostgreSQL does not allow
-- subqueries in an index predicate; use a constant threshold and
-- recreate the index periodically as the chain advances
CREATE INDEX idx_recent_transfers ON transfers(block_number DESC)
    WHERE block_number > 1000000;

3. Connection Pooling

// Configure the connection pool via the connection string;
// Prisma reads connection_limit and pool_timeout from the URL
const prisma = new PrismaClient({
  datasources: {
    db: {
      url: `${process.env.DATABASE_URL}?connection_limit=10&pool_timeout=30`
    }
  },
  log: ['error']
});

4. Error Recovery

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

async function processWithRetry(blockNumber: bigint, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      await processBlock(blockNumber);
      return;
    } catch (error) {
      console.error(`Attempt ${i + 1} failed for block ${blockNumber}`);
      if (i === maxRetries - 1) throw error;
      await sleep(1000 * 2 ** i); // Exponential backoff: 1s, 2s, 4s, ...
    }
  }
}

5. Handle Chain Reorgs

async function handleReorg(newHead: string) {
  // Find common ancestor
  const commonBlock = await findCommonAncestor(newHead);

  // Delete blocks after the reorg point
  // (events and transfers are removed via their cascading foreign keys)
  await prisma.block.deleteMany({
    where: {
      number: { gt: commonBlock.number }
    }
  });

  // Re-index from the common ancestor
  await resumeFromBlock(commonBlock.number + 1n);
}
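findCommonAncestor and resumeFromBlock are left undefined above. A minimal sketch of the ancestor search, walking back through our stored blocks until the stored hash matches what the chain now reports at the same height (assumes the client and prisma instances from earlier sections; resuming can reuse the indexer's catch-up path):

async function findCommonAncestor(newHead: string) {
  // Start from our latest indexed block and walk backwards
  let block = await prisma.block.findFirst({ orderBy: { number: 'desc' } });

  while (block) {
    const chainHash = await client.api.rpc.chain.getBlockHash(block.number);
    if (chainHash.toHex() === block.hash) {
      return block; // Common ancestor: our record agrees with the chain
    }
    block = await prisma.block.findFirst({
      where: { number: { lt: block.number } },
      orderBy: { number: 'desc' }
    });
  }

  throw new Error('No common ancestor found; a full re-index is required');
}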

Monitoring

Health Check Endpoint

app.get('/health', async (req, res) => {
  const syncState = await prisma.syncState.findFirst({
    orderBy: { id: 'desc' }
  });

  const latestBlock = await client.api.rpc.chain.getBlock();
  const chainHead = latestBlock.block.header.number.toBigInt();

  const lag = chainHead - (syncState?.lastBlock || 0n);

  res.json({
    status: lag < 10n ? 'healthy' : 'lagging',
    lastIndexedBlock: syncState?.lastBlock.toString(),
    chainHead: chainHead.toString(),
    lag: lag.toString(),
    uptime: process.uptime()
  });
});

Prometheus Metrics

import { register, Counter, Gauge } from 'prom-client';

const blocksProcessed = new Counter({
  name: 'indexer_blocks_processed_total',
  help: 'Total blocks processed'
});

const eventsProcessed = new Counter({
  name: 'indexer_events_processed_total',
  help: 'Total events processed'
});

const indexerLag = new Gauge({
  name: 'indexer_lag_blocks',
  help: 'Number of blocks behind chain head'
});

app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});
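These metrics only move if the processing loop updates them. A minimal wiring sketch, assuming the three instruments above are exported and imported where blocks are processed:

// In the block processor, after a block commits successfully
blocksProcessed.inc();
eventsProcessed.inc(events.length);

// Periodically, e.g. alongside the health check
indexerLag.set(Number(chainHead - lastIndexedBlock));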

Troubleshooting

Indexer Falls Behind

Problem: Indexer can't keep up with block production

Solutions:

  • Increase batch size for historical sync
  • Use multiple indexer instances with partitioning (see the sketch below)
  • Optimize database queries with proper indexes
  • Use read replicas for API queries
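A minimal partitioning sketch: each instance claims a disjoint slice of the block range via its instance id. The INSTANCE_ID/INSTANCE_COUNT environment variables and the skip logic are illustrative, not part of the SDK:

// Each instance indexes only blocks where number % INSTANCE_COUNT === INSTANCE_ID
const instanceId = BigInt(process.env.INSTANCE_ID ?? '0');
const instanceCount = BigInt(process.env.INSTANCE_COUNT ?? '1');

function shouldProcess(blockNumber: bigint): boolean {
  return blockNumber % instanceCount === instanceId;
}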

Database Disk Space

Problem: Database grows too large

Solutions:

  • Implement data retention policies (see the example below)
  • Archive old blocks to cold storage
  • Use table partitioning by block number
  • Compress historical data
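A retention policy can start as a scheduled DELETE; the 90-day window here is an arbitrary example, and events and transfers are removed automatically through their ON DELETE CASCADE foreign keys:

-- Drop indexed data older than ~90 days; run from cron or pg_cron
DELETE FROM blocks
WHERE timestamp < NOW() - INTERVAL '90 days';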

Missing Events

Problem: Some events are not being indexed

Solutions:

  • Check event filters are not too restrictive
  • Verify all event types are handled
  • Check for transaction errors during indexing
  • Review logs for processing errors, and scan for gaps in the indexed data (see the query below)
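A quick integrity check is to look for gaps in the indexed block numbers; each row returned marks the start of a missing range:

-- Find the first block number of each gap in the indexed range
SELECT number + 1 AS gap_start
FROM blocks b
WHERE NOT EXISTS (SELECT 1 FROM blocks WHERE number = b.number + 1)
  AND number < (SELECT MAX(number) FROM blocks);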

Need help? Join our Discord