Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
22c7117
initial scaffolding
bryzettler Aug 21, 2025
53f0012
scaffolding part 2
bryzettler Aug 25, 2025
f992205
add last_block_height to pgsink
bryzettler Aug 25, 2025
f11b4b2
wip
bryzettler Aug 27, 2025
461bdd0
queries
bryzettler Aug 27, 2025
4c4c5a0
pg-sink and asset-ownership updated
bryzettler Aug 27, 2025
77efa39
fix models
bryzettler Aug 27, 2025
463bcce
logging event and constructing data
bryzettler Aug 29, 2025
6dc08eb
proper gitignore
bryzettler Aug 29, 2025
dc5f2fc
updated queries
bryzettler Aug 29, 2025
581b508
working but needs optimization
bryzettler Sep 4, 2025
bc1a97e
good spot to commit
bryzettler Sep 5, 2025
999ed13
minifan out logic back
bryzettler Sep 5, 2025
7708806
tracking if jobs are running
bryzettler Sep 5, 2025
97933a4
remove unused
bryzettler Sep 5, 2025
ef59691
pruned unused logic
bryzettler Sep 5, 2025
e49cbf5
good spot to implment job queue
bryzettler Sep 5, 2025
4ed0789
gets through iot
bryzettler Sep 5, 2025
4116dfd
add simple queue logic for jobs
bryzettler Sep 8, 2025
6b6a523
clean up metrics
bryzettler Sep 8, 2025
95a3328
add batch processign
bryzettler Sep 8, 2025
462890d
process jobs fully before waiting polling period
bryzettler Sep 8, 2025
e37d944
process blocks in chunks
bryzettler Sep 8, 2025
ccdfd4e
move some logic to the bldr fns
bryzettler Sep 8, 2025
05ca9c1
parsing both iot and mobile
bryzettler Sep 8, 2025
3661589
clean up
bryzettler Sep 9, 2025
20fc83a
fmt
bryzettler Sep 9, 2025
f2bf151
shutdown handler
bryzettler Sep 9, 2025
2df55b3
cleanup
bryzettler Sep 9, 2025
72934fe
add ingestor logic
bryzettler Sep 9, 2025
49bfb50
Merge branch 'develop' into feat/atomic-data-publisher
bryzettler Sep 9, 2025
4d60b59
cleanup
bryzettler Sep 11, 2025
4915061
update readme
bryzettler Sep 11, 2025
ed6a658
cleanup
bryzettler Sep 12, 2025
9a6a4fc
fix dryrun config
bryzettler Sep 12, 2025
b6a992a
rm comments in query
bryzettler Sep 12, 2025
ba012ff
cleanup default.toml
bryzettler Sep 12, 2025
18f0eae
turn on iot
bryzettler Sep 12, 2025
63091b9
patch
bryzettler Sep 12, 2025
a26ba23
run off toml and use proper keypair signing
bryzettler Sep 17, 2025
85d2f37
decouple queries
bryzettler Sep 17, 2025
6e0929b
rm comments and use ingest_time_seconds
bryzettler Sep 18, 2025
87e706d
cargo
bryzettler Sep 18, 2025
82979bf
add last_block_height to plugins and remove snakecase
bryzettler Sep 18, 2025
5b415bf
using prom metrics
bryzettler Sep 18, 2025
9812b15
cleanup
bryzettler Sep 18, 2025
e878a77
use blockheight instead of slot, update logs in publisher
bryzettler Sep 18, 2025
72a4efe
cleanup
bryzettler Sep 18, 2025
cd4e052
just use tokio
bryzettler Sep 18, 2025
0c52f3a
Using slot instead of block height
bryzettler Sep 19, 2025
9d7ef39
tweak
bryzettler Sep 20, 2025
f14ec22
tweaks
bryzettler Sep 21, 2025
987655a
add comment
bryzettler Sep 24, 2025
1cd09de
dont refrence block_height
bryzettler Sep 25, 2025
f708f26
pr feedback
bryzettler Sep 25, 2025
b2c7fda
add max block logic
bryzettler Sep 25, 2025
9fc7e61
no need for max_block_occurrences
bryzettler Sep 26, 2025
5dd2bbe
use metrics_exporter_prometheus
bryzettler Sep 26, 2025
d436283
turn on jobs
bryzettler Sep 26, 2025
37ff340
alias block_height
bryzettler Sep 26, 2025
775f382
restructure toml
bryzettler Sep 26, 2025
8e7761a
Bugfixes
ChewingGlass Sep 30, 2025
e0e6c0c
Fix protobuf helium keys
ChewingGlass Sep 30, 2025
f75eef2
use composite index
bryzettler Oct 1, 2025
6895f3b
proper type
bryzettler Oct 1, 2025
c089479
add syncWithViews
bryzettler Oct 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ exclude = [
"utils/generate-test-gateway-txn",
"utils/standardize-hotspot-metadata",
"utils/pyth_solana_receiver_sdk",
"utils/atomic-data-publisher",
]

[workspace.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion packages/account-postgres-sink-service/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export const OMIT_KEYS = ["refreshed_at", "createdAt"];
export const OMIT_KEYS = ["createdAt", "refreshedAt", "lastBlock"];
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ export const EncodeEntityKeyPlugin = ((): IPlugin => {
};
};

const processAccount = async (account: { [key: string]: any }) => {
const processAccount = async (
account: { [key: string]: any },
transaction?: any,
lastBlock?: number | null
) => {
try {
const entityKey = account[camelize(config.field || "entity_key", true)];
const keySerializationRaw = account[camelize("key_serialization", true)];
const keySerializationRaw =
account[camelize("key_serialization", true)];
const keySerialization =
typeof keySerializationRaw === "string"
? keySerializationRaw.trim().toLowerCase()
Expand All @@ -48,7 +53,10 @@ export const EncodeEntityKeyPlugin = ((): IPlugin => {
if (entityKey && keySerialization) {
if (keySerialization === "utf8") {
encodedEntityKey = Buffer.from(entityKey, "utf8").toString("utf8");
} else if (keySerialization === "b58" || keySerialization === "bs58") {
} else if (
keySerialization === "b58" ||
keySerialization === "bs58"
) {
encodedEntityKey = bs58.encode(entityKey);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import BN from "bn.js";
import { cellToLatLng } from "h3-js";
import { camelize } from "inflection";
import _omit from "lodash/omit";
import { DataTypes, Model, QueryTypes } from "sequelize";
import { IPlugin } from "../types";
import { database } from "../utils/database";
import { MapboxService } from "../utils/mapboxService";
import { PublicKey } from "@solana/web3.js";
import sequelize from "sequelize";
import { syncTableWithViews } from "../utils/syncTableWithViews";

export class RewardsRecipient extends Model {
declare asset: string;
Expand All @@ -16,10 +15,10 @@ export class RewardsRecipient extends Model {
declare destination: string;
declare entityKey: string;
declare encodedEntityKey: string;
declare shares: number
declare totalShares: number
declare fixedAmount: number
declare type: 'direct' | 'fanout'
declare shares: number;
declare totalShares: number;
declare fixedAmount: number;
declare type: "direct" | "fanout";
}

RewardsRecipient.init(
Expand Down Expand Up @@ -47,7 +46,7 @@ RewardsRecipient.init(
field: "encoded_entity_key",
},
keySerialization: {
type: DataTypes.STRING,
type: "TEXT",
allowNull: false,
},
shares: {
Expand All @@ -66,6 +65,11 @@ RewardsRecipient.init(
type: DataTypes.STRING,
allowNull: false,
},
lastBlock: {
type: DataTypes.DECIMAL.UNSIGNED,
allowNull: true,
defaultValue: null,
},
},
{
sequelize: database,
Expand All @@ -74,40 +78,46 @@ RewardsRecipient.init(
underscored: true,
timestamps: true,
indexes: [
{
fields: ["asset"],
},
{
fields: ["destination"],
},
{
fields: ["owner"],
},
{
fields: ["type"],
fields: ["last_block"],
},
{
fields: ["encoded_entity_key"],
},
{
fields: ["asset", "last_block"],
},
{
fields: ["type", "last_block"],
},
],
}
);

type MiniFanoutShare = {
wallet: string,
delegate: string,
share: Share,
totalDust: number,
totalOwed: number,
}
wallet: string;
delegate: string;
share: Share;
totalDust: number;
totalOwed: number;
};

type Share = {
share?: { amount: number },
fixed?: { amount: number },
}
share?: { amount: number };
fixed?: { amount: number };
};

export class Recipient extends Model {
declare address: string
declare asset: string
declare destination: string
declare lazyDistributor: string
declare address: string;
declare asset: string;
declare destination: string;
declare lazyDistributor: string;
}

Recipient.init(
Expand All @@ -133,15 +143,15 @@ Recipient.init(
underscored: true,
timestamps: false,
}
)
);

export class KeyToAsset extends Model {
declare address: string
declare asset: string
declare dao: string
declare entityKey: Buffer
declare keySerialization: string
declare encodedEntityKey: string
declare address: string;
declare asset: string;
declare dao: string;
declare entityKey: Buffer;
declare keySerialization: string;
declare encodedEntityKey: string;
}

KeyToAsset.init(
Expand All @@ -152,12 +162,13 @@ KeyToAsset.init(
},
asset: {
type: DataTypes.STRING,
allowNull: false,
},
dao: {
type: DataTypes.STRING,
},
keySerialization: {
type: DataTypes.STRING,
type: DataTypes.JSONB,
},
entityKey: {
type: "BYTEA",
Expand All @@ -177,7 +188,7 @@ KeyToAsset.init(
underscored: true,
timestamps: false,
}
)
);

export class MiniFanout extends Model {
declare owner: string;
Expand Down Expand Up @@ -262,29 +273,44 @@ MiniFanout.init(
}
);

export const HNT_LAZY_DISTRIBUTOR = "6gcZXjHgKUBMedc2V1aZLFPwh8M1rPVRw7kpo2KqNrFq"
export const HNT_LAZY_DISTRIBUTOR =
"6gcZXjHgKUBMedc2V1aZLFPwh8M1rPVRw7kpo2KqNrFq";

export async function handleMiniFanout(asset: string, account: { [key: string]: any }, transaction: any) {
const prevAccount = await MiniFanout.findByPk(account.address, { transaction })
const oldShares = prevAccount?.shares || []
const newShares = (account.shares || []) as MiniFanoutShare[]
export async function handleMiniFanout(
asset: string,
account: { [key: string]: any },
transaction: any,
lastBlock?: number | null
) {
const prevAccount = await MiniFanout.findByPk(account.address, {
transaction,
});
const oldShares = prevAccount?.shares || [];
const newShares = (account.shares || []) as MiniFanoutShare[];

function getEffectiveDestination(share: MiniFanoutShare) {
return (!share.delegate || share.delegate === PublicKey.default.toBase58()) ? share.wallet : share.delegate
return !share.delegate || share.delegate === PublicKey.default.toBase58()
? share.wallet
: share.delegate;
}

function getShareKey(share: MiniFanoutShare) {
return `${asset}-${getEffectiveDestination(share)}-${JSON.stringify(share.share)}`
return `${asset}-${getEffectiveDestination(share)}-${JSON.stringify(
share.share
)}`;
}
// Create a map of wallet+delegate to share for easy lookup
const oldSharesMap = new Map(
oldShares.map(share => [getShareKey(share), share])
)
oldShares.map((share) => [getShareKey(share), share])
);
const newSharesMap = new Map(
newShares.map(share => [getShareKey(share), share])
)
newShares.map((share) => [getShareKey(share), share])
);

const totalShares = newShares.reduce((acc, share) => acc + (share.share?.share?.amount || 0), 0)
const totalShares = newShares.reduce(
(acc, share) => acc + (share.share?.share?.amount || 0),
0
);

// Handle deletions - remove shares that exist in old but not in new
for (const [key, oldShare] of oldSharesMap) {
Expand All @@ -295,20 +321,20 @@ export async function handleMiniFanout(asset: string, account: { [key: string]:
asset,
shares: oldShare.share?.share?.amount || 0,
},
transaction
})
transaction,
});
}
}

// Handle updates and additions
for (const [key, newShare] of newSharesMap) {
const kta = await KeyToAsset.findOne({
where: {
dao: 'BQ3MCuTT5zVBhNfQ4SjMh3NPVhFy73MPV8rjfq5d1zie',
dao: "BQ3MCuTT5zVBhNfQ4SjMh3NPVhFy73MPV8rjfq5d1zie",
asset: asset,
},
transaction
})
transaction,
});

const toCreate = {
asset,
Expand All @@ -320,20 +346,20 @@ export async function handleMiniFanout(asset: string, account: { [key: string]:
entityKey: kta?.entityKey,
encodedEntityKey: kta?.encodedEntityKey,
keySerialization: kta?.keySerialization,
type: 'fanout'
}
type: "fanout",
lastBlock,
};

await RewardsRecipient.upsert(toCreate, { transaction })
await RewardsRecipient.upsert(toCreate, { transaction });
}

return account
return account;
}

export const ExplodeMiniFanoutOwnershipPlugin = ((): IPlugin => {
const name = "ExplodeMiniFanoutOwnership";
const init = async (config: { [key: string]: any }) => {
const updateOnDuplicateFields: string[] = [];

const existingColumns = (
await database.query(
`
Expand All @@ -353,34 +379,48 @@ export const ExplodeMiniFanoutOwnershipPlugin = ((): IPlugin => {
!existingColumns.length ||
!columns.every((col) => existingColumns.includes(col))
) {
await RewardsRecipient.sync({ alter: true });
console.log("Syncing rewards_recipients table");
await syncTableWithViews(
database,
"rewards_recipients",
"public",
async () => {
await RewardsRecipient.sync({ alter: true });
}
);
}

const addFields = () => { };
const addFields = () => {};

const processAccount = async (
account: { [key: string]: any },
transaction?: any
transaction?: any,
lastBlock?: number | null
) => {
try {
const asset = account.preTask?.remoteV0?.url?.replace("https://hnt-rewards.oracle.helium.io/v1/tuktuk/asset/", "").replace("https://hnt-rewards.oracle.test-helium.com/v1/tuktuk/asset/", "")
const asset = account.preTask?.remoteV0?.url
?.replace("https://hnt-rewards.oracle.helium.io/v1/tuktuk/asset/", "")
.replace(
"https://hnt-rewards.oracle.test-helium.com/v1/tuktuk/asset/",
""
);
if (!asset) {
return account
return account;
}
const recipient = await Recipient.findOne({
where: {
destination: account.address,
asset,
lazyDistributor: HNT_LAZY_DISTRIBUTOR
}
})
lazyDistributor: HNT_LAZY_DISTRIBUTOR,
},
});
if (!recipient) {
return account
return account;
}
return handleMiniFanout(asset, account, transaction)
return handleMiniFanout(asset, account, transaction, lastBlock);
} catch (err) {
console.error("Error exploding mini fanout ownership", err)
throw err
console.error("Error exploding mini fanout ownership", err);
throw err;
}
};

Expand Down
Loading
Loading