This commit is contained in:
Philip Ahlqvist 2024-07-23 00:55:51 +02:00
commit bb5b61f3b0
26 changed files with 1631 additions and 0 deletions

11
.dockerignore Normal file
View file

@ -0,0 +1,11 @@
node_modules/
npm-debug.log
Dockerfile
.dockerignore
data/
.env
state.json
config.yml

8
.gitignore vendored Normal file
View file

@ -0,0 +1,8 @@
node_modules/
npm-debug.log
data/
.env
state.json
config.yml

13
Dockerfile Normal file
View file

@ -0,0 +1,13 @@
FROM docker.io/node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
EXPOSE 8080
CMD ["npm", "run", "start"]

15
LICENSE Normal file
View file

@ -0,0 +1,15 @@
The MIT License (MIT)
Copyright (c) 2024 Philip Ahlqvist.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the
following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

56
README.md Normal file
View file

@ -0,0 +1,56 @@
# Lightseeker - A supercharged search engine API for LBRY.
![GitHub License](https://img.shields.io/github/license/LBRYFoundation/lightseeker)
Lightseeker consists of four components that together offer a supercharged search of publications on the LBRY network.
* `MeiliSearch` - as the database backend.
* `Sync` - a service that syncs claims in the MeiliSearch database.
* `Purge` - a service that removes any blocked/filtered content from the database.
* `Lightseeker` - a search API server, which is a drop-in replacement for Lighthouse.
## Roadmap
* Use HUB servers for Sync instead of Chainquery.
## Usage
* To make a simple search by string:
```
https://{LIGHTSEEKER_INSTANCE}/search?s={STRING_TO_SEARCH}
```
* To get autocomplete suggestions:
```
https://{LIGHTSEEKER_INSTANCE}/autocomplete?s={STRING_TO_COMPLETE}
```
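The search endpoint also understands `channel`, `size`, `from`, `nsfw`, `contentType`, `tags` and `resolve` parameters (see `src/utils/processQueries.js`). As a sketch (the instance URL and output are illustrative), each hit is reduced to a claim ID and name unless `resolve` is set:
```bash
# Plain search: returns [{ "claimId": "...", "name": "..." }, ...]
curl "http://localhost:3000/search?s=lbry&size=5"

# Resolved search: hits include title, channel, thumbnail_url, release_time, etc.
curl "http://localhost:3000/search?s=lbry&size=5&resolve=true"
```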
## Installation
### Prerequisites
* [Node](https://nodejs.org/en/download/)
* [MeiliSearch](https://www.meilisearch.com/)
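If you'd rather not install MeiliSearch directly, the bundled `docker-compose.yml` can run it in a container (a sketch; override `MEILI_MASTER_KEY` for anything beyond local testing):
```bash
# Start only the meilisearch service defined in docker-compose.yml
docker compose up -d meilisearch
```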
Once everything above is set up, you are just three simple steps away from a working Lightseeker instance.
> Clone the repo
```bash
git clone https://github.com/LBRYFoundation/lightseeker.git
```
> Install dependencies
```bash
npm install
```
> Start Lightseeker
```bash
npm run start
```
That's it! Lightseeker should now be live at http://localhost:3000, or on whatever port the `PORT` environment variable specifies.
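Lightseeker reads its settings from environment variables or an optional `config.yml` in the working directory, with environment variables taking precedence (see `src/config.js`). A minimal sketch, using the built-in defaults as illustrative values:
```yaml
# config.yml - every key is optional
apiPort: 3000
batchSize: 5000
chainqueryHost: https://chainquery.lbry.com
meilisearchHost: http://127.0.0.1:7700
meilisearchAPIKey: masterKey
purgeLists: []
```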
## Contributing
Contributions to this project are welcome and encouraged. For more details, see https://lbry.tech/contribute.
## License
This project is MIT licensed. For the full license, see [LICENSE](/LICENSE).

41
docker-compose.yml Normal file
View file

@ -0,0 +1,41 @@
services:
# lightseeker:
# build:
# context: .
# dockerfile: ./Dockerfile
# ports:
# - 3000:8080
# volumes:
# - ./data:/data
# environment:
# - PORT=8080
# - DATA_DIR=/data
# - MEILISEARCH_HOST=meilisearch:${MEILI_PORT:-7700}
# - MEILISEARCH_API_KEY=${MEILI_MASTER_KEY:-CYDDKecywVUcW5mNC7LaGil6qxuXuPZ7gGSWWLnz4XE}
# depends_on:
# - meilisearch
meilisearch:
container_name: meilisearch
image: docker.io/getmeili/meilisearch:v1.8.3
environment:
- http_proxy
- https_proxy
- MEILI_MASTER_KEY=${MEILI_MASTER_KEY:-CYDDKecywVUcW5mNC7LaGil6qxuXuPZ7gGSWWLnz4XE}
- MEILI_NO_ANALYTICS=${MEILI_NO_ANALYTICS:-true}
- MEILI_ENV=${MEILI_ENV:-production}
- MEILI_LOG_LEVEL
- MEILI_DB_PATH=${MEILI_DB_PATH:-/data.ms}
ports:
- ${MEILI_PORT:-7700}:7700
volumes:
- ./data:/data.ms
#deploy:
# resources:
# limits:
# memory: 5G
restart: unless-stopped

125
package-lock.json generated Normal file
View file

@ -0,0 +1,125 @@
{
"name": "lightseeker",
"version": "0.0.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "lightseeker",
"version": "0.0.1",
"license": "MIT",
"dependencies": {
"@hono/node-server": "^1.11.3",
"hono": "^4.4.6",
"meilisearch": "^0.40.0",
"yaml": "^2.4.5"
},
"devDependencies": {
"dotenv": "^16.4.5"
}
},
"node_modules/@hono/node-server": {
"version": "1.11.3",
"resolved": "https://registry.npmjs.org/@hono/node-server/-/node-server-1.11.3.tgz",
"integrity": "sha512-mFg3qlKkDtMWSalX5Gyh6Zd3MXay0biGobFlyJ49i6R1smBBS1CYkNZbvwLlw+4sSrHO4ZiH7kj4TcLpl2Jr3g==",
"license": "MIT",
"engines": {
"node": ">=18.14.1"
}
},
"node_modules/cross-fetch": {
"version": "3.1.8",
"resolved": "https://registry.npmjs.org/cross-fetch/-/cross-fetch-3.1.8.tgz",
"integrity": "sha512-cvA+JwZoU0Xq+h6WkMvAUqPEYy92Obet6UdKLfW60qn99ftItKjB5T+BkyWOFWe2pUyfQ+IJHmpOTznqk1M6Kg==",
"license": "MIT",
"dependencies": {
"node-fetch": "^2.6.12"
}
},
"node_modules/dotenv": {
"version": "16.4.5",
"resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.4.5.tgz",
"integrity": "sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg==",
"dev": true,
"license": "BSD-2-Clause",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://dotenvx.com"
}
},
"node_modules/hono": {
"version": "4.4.6",
"resolved": "https://registry.npmjs.org/hono/-/hono-4.4.6.tgz",
"integrity": "sha512-XGRnoH8WONv60+PPvP9Sn067A9r/8JdHDJ5bgon0DVEHeR1cJPkWjv2aT+DBfMH9/mEkYa1+VEVFp1DT1lIwjw==",
"license": "MIT",
"engines": {
"node": ">=16.0.0"
}
},
"node_modules/meilisearch": {
"version": "0.40.0",
"resolved": "https://registry.npmjs.org/meilisearch/-/meilisearch-0.40.0.tgz",
"integrity": "sha512-BoRhQMr2mBFLEeCfsvPluksGb01IaOiWvV3Deu3iEY+yYJ4jdGTu+IQi5FCjKlNQ7/TMWSN2XUToSgvH1tj0BQ==",
"license": "MIT",
"dependencies": {
"cross-fetch": "^3.1.6"
}
},
"node_modules/node-fetch": {
"version": "2.7.0",
"resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz",
"integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==",
"license": "MIT",
"dependencies": {
"whatwg-url": "^5.0.0"
},
"engines": {
"node": "4.x || >=6.0.0"
},
"peerDependencies": {
"encoding": "^0.1.0"
},
"peerDependenciesMeta": {
"encoding": {
"optional": true
}
}
},
"node_modules/tr46": {
"version": "0.0.3",
"resolved": "https://registry.npmjs.org/tr46/-/tr46-0.0.3.tgz",
"integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==",
"license": "MIT"
},
"node_modules/webidl-conversions": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz",
"integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==",
"license": "BSD-2-Clause"
},
"node_modules/whatwg-url": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-5.0.0.tgz",
"integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==",
"license": "MIT",
"dependencies": {
"tr46": "~0.0.3",
"webidl-conversions": "^3.0.0"
}
},
"node_modules/yaml": {
"version": "2.4.5",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.4.5.tgz",
"integrity": "sha512-aBx2bnqDzVOyNKfsysjA2ms5ZlnjSAW2eG3/L5G/CSujfjLJTJsEw1bGw8kCf04KodQWk1pxlGnZ56CRxiawmg==",
"license": "ISC",
"bin": {
"yaml": "bin.mjs"
},
"engines": {
"node": ">= 14"
}
}
}
}

31
package.json Normal file
View file

@ -0,0 +1,31 @@
{
"name": "lightseeker",
"version": "0.0.1",
"description": "Lightseeker is a supercharged search engine for publications on the LBRY network.",
"main": "src/index.js",
"type": "module",
"keywords": [
"lbry",
"search",
"hono",
"rest",
"api",
"meilisearch"
],
"scripts": {
"start": "NODE_ENV=production node src/index.js",
"dev": "NODE_ENV=development node --watch -r dotenv/config src/index.js",
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "MIT",
"dependencies": {
"@hono/node-server": "^1.11.3",
"hono": "^4.4.6",
"meilisearch": "^0.40.0",
"yaml": "^2.4.5"
},
"devDependencies": {
"dotenv": "^16.4.5"
}
}

35
src/api.js Normal file
View file

@ -0,0 +1,35 @@
import { serve } from '@hono/node-server';
import { Hono } from 'hono';
import { getSize } from './utils/common.js';
import config from './config.js';
import { getLogger, style } from './utils/logger.js';
import search from './api/search.js';
import status from './api/status.js';
import autocomplete from './api/autocomplete.js';
const logger = getLogger("API");
export const app = new Hono();
app.get('/', (c) => c.json("Welcome to Lightseeker!"));
// NOTE: this middleware is registered after the '/' route above, so requests to '/' bypass the logger; routes added below are logged
app.use(async (c, next)=> {
    logger.info(`<-- ${style.bright}${c.req.method}${style.reset} ${c.req.path}`);
    const start = new Date().getTime();
    await next();
    // Clone the response before reading it, so the original body can still be sent to the client
    const bytes = (await c.res.clone().arrayBuffer()).byteLength;
    logger.info(`--> ${style.bright}${c.req.method}${style.reset} ${c.req.path} ${c.res.status} ${new Date().getTime() - start}ms ${Object.values(getSize(bytes)).join('') + "B"}`);
});
app.route('/search', search);
app.route('/status', status);
app.route('/autocomplete', autocomplete);
serve({
fetch: app.fetch,
port: config.port,
})
logger.info("Listening on port: " + config.port);

32
src/api/autocomplete.js Normal file
View file

@ -0,0 +1,32 @@
import { Hono } from "hono"
import { Buffer } from 'node:buffer';
import config from "../config.js";
import meilisearch from '../utils/meilisearch.js';
await meilisearch.start();
const index = await meilisearch.getIndex(config.indexName);
// console.log(await index.updateFilterableAttributes(["nsfw"]));
const router = new Hono()
router.get('/', async (c)=>{
// { s, channel, size, from, nsfw, contentType, mediaType, claimType }
const queries = c.req.queries();
const results = await index.search(queries.s[0], {
filter: ['nsfw = 0', 'claim_type != 2'],
// c.req.queries() returns each parameter as an array, so unwrap the first value
hitsPerPage: queries.size ? parseInt(queries.size[0]) : 9,
offset: queries.from ? parseInt(queries.from[0]) : 0
});
const hits = results.hits.map(hit=>{
return hit.name
})
return c.json(hits);
// const claims = await index.search(c.req.query)
});
export default router;

84
src/api/search.js Normal file
View file

@ -0,0 +1,84 @@
import { Hono } from "hono"
import { Buffer } from 'node:buffer';
import config from "../config.js";
import meilisearch from '../utils/meilisearch.js';
import processQueries from "../utils/processQueries.js";
import { getLogger } from "../utils/logger.js";
const logger = getLogger("/search");
await meilisearch.start();
const index = await meilisearch.getIndex(config.indexName);
const router = new Hono()
router.get('/', async (c)=>{
// { s, channel, size, from, nsfw, contentType, mediaType, claimType }
let q;
try {
q = processQueries(c.req.queries());
} catch (err) {
    // Error instances serialize to '{}' in JSON, so send the message instead
    return c.json({ success: false, error: err.message, data: null});
}
if (q.error) return c.json({ success: false, error: q.error, data: null});
const queries = q.queries;
logger.debug(queries)
// Here, we are only interested in the queries that are indexFilters
const filters = Object.fromEntries(Object.entries(queries).
filter(([key, value]) => config.meilisearch.filters.includes(key)));
const results = await index.search(queries.s, {
filter: Object.keys(filters).map(q=> {
switch (typeof filters[q]) {
case 'object':
return filters[q].map(f=> `${q} = "${f}"` ).join(' OR ');
case 'string':
return `${q} = "${filters[q]}"`;
default:
return `${q} = ${filters[q]}`;
}
}),
sort: [
"effective_amount:desc",
"reposts:desc",
"release_time:desc"
],
offset: queries.from,
limit: queries.size
});
const hits = results.hits.map(hit=>{
// return hit;
if (queries.resolve) return {
channel: hit.channel,
channel_claim_id: hit.channel_id,
claimId: hit.claim_id,
duration: hit.duration,
fee: hit.fee,
name: hit.name,
release_time: hit.release_time,
thumbnail_url: hit.thumbnail_url,
title: hit.title,
reposts: hit.reposts,
is_nsfw: hit.is_nsfw
// effective_amount: hit.effective_amount
// tags: hit.tags
}
else return {
claimId: hit.claim_id,
name: hit.name,
};
})
return c.json(hits);
// const claims = await index.search(c.req.query)
});
export default router;

29
src/api/status.js Normal file
View file

@ -0,0 +1,29 @@
import { Hono } from "hono"
import meilisearch from '../utils/meilisearch.js';
import { getSize } from "../utils/common.js";
import config from "../config.js";
import ClaimSync from "../services/Sync.js";
const router = new Hono()
router.get('/', async (c)=>{
const stats = await meilisearch.client.getStats();
const index = await meilisearch.getIndex(config.indexName);
const info = await index.getStats();
return c.json({
spaceUsed: Object.values(getSize(stats.databaseSize)).join("") + "B",
claimsInIndex: stats.indexes[config.indexName].numberOfDocuments,
totalSearches: 0,
isIndexing: info.isIndexing,
synced: ClaimSync.synced,
syncStatus: ClaimSync.status,
purgeLists: config.purgeLists,
database: { ...await meilisearch.client.getVersion(), name: "meilisearch"}
});
// const claims = await index.search(c.req.query)
});
export default router;

45
src/config.js Normal file
View file

@ -0,0 +1,45 @@
import YAML from 'yaml'
import fs from 'fs';
import path from 'path';
import { getLogger } from './utils/logger.js';
const logger = getLogger("Config");
// Grab the config file
let config = {};
// Try to read the config file; CONFIG_FILE, if set, points at the directory containing config.yml
try {
    // YAML.parse returns null for an empty file, so fall back to an empty object
    config = YAML.parse(await fs.promises.readFile(process.env.CONFIG_FILE ? path.join(process.env.CONFIG_FILE, "config.yml") : 'config.yml', { encoding: 'utf-8' }) || "") || {};
logger.info("Loaded configuration file.");
} catch (err) {
logger.info("No configuration file found. Using defaults...");
}
logger.debug(config);
function ensureAddress(host) {
const url = new URL((host.startsWith('https://') || host.startsWith('http://')) ? host : "http://" + host);
return url;
}
export default {
dataDirectory: process.env.DATA_DIR || process.cwd(),
batchSize: config.batchSize || 5000,
maxClaimsToProcessPerIteration: 100000,
indexName: "claims",
checkTaskInterval: 200,
sleepTimeAfterFullSync: 2 * 60 * 1000,
failedChainquerySleep: 5000,
port: process.env.PORT || config.apiPort || 3000,
purgeLists: config.purgeLists || [],
CHAINQUERY: ensureAddress(process.env.CHAINQUERY_HOST || config.chainqueryHost || "https://chainquery.lbry.com"),
meilisearch: {
host: ensureAddress(process.env.MEILISEARCH_HOST || config.meilisearchHost || "http://127.0.0.1:7700"),
apiKey: process.env.MEILISEARCH_API_KEY || config.meilisearchAPIKey || "masterKey",
filters: ["is_nsfw", "claim_id", "channel_id", "content_type", "media_type", "claim_type", "duration", "width", "height", "tags"],
sortableAttributes: ["bid_state", "fee", "release_time", "reposts", "effective_amount", "certificate_amount"],
purgeLists: config.purgeLists || [],
primaryKey: "claim_id"
}
}

15
src/constants.js Normal file
View file

@ -0,0 +1,15 @@
export default {
CLAIM_BID_STATES: {
CONTROLLING: 0,
ACCEPTED: 1,
ACTIVE: 2,
SPENT: 3,
EXPIRED: 4
},
CLAIM_TYPES: {
CHANNEL: 0,
STREAM: 1,
REPOST: 2,
COLLECTION: 3,
}
}

60
src/index.js Normal file
View file

@ -0,0 +1,60 @@
import config from './config.js';
import meilisearch from './utils/meilisearch.js';
await meilisearch.start([config.indexName]); // Make sure MeiliSearch is connected before starting up fully
import { getLogger } from './utils/logger.js';
import state from './utils/state.js';
import Sync from './services/Sync.js';
import { app } from './api.js';
const logger = getLogger("MAIN");
const services = [
Sync
]
function startServices() { return Promise.all(services.map(service => service.start())); }
function stopServices() { return Promise.all(services.map(service => service.stop())); }
startServices();
// logger.debug(config);
// await state.start();
// await meilisearch.start([{
// name: config.indexName,
// primaryKey: "id"
// }]);
// Sync.start(); // Start the claimSync service
// async function gracefulShutdown() {
// logger.info('Shutting down gracefully...');
// try {
// await stopServices();
// console.log('All services stopped.');
// process.exit(0);
// } catch (error) {
// console.error('Error stopping services:', error);
// process.exit(1);
// }
// }
// process.once('beforeExit', async ()=>{
// console.log("ss");
// await stopServices();
// process.exit(0);
// });
// process.once('SIGTERM', gracefulShutdown);
// process.on('SIGTERM', gracefulShutdown);
// process.on('SIGINT', gracefulShutdown);
// process.stdin.resume();

107
src/providers/Chainquery.js Normal file
View file

@ -0,0 +1,107 @@
import generateQuery from "../utils/generateQuery.js";
import state from "../utils/state.js";
import config from "../config.js";
import { getLogger } from "../utils/logger.js";
const logger = getLogger("Chainquery");
// Copy the base URL twice; assigning config.CHAINQUERY directly would make both names point at the same mutable URL object
const statusURL = new URL(config.CHAINQUERY);
const sqlURL = new URL(config.CHAINQUERY);
statusURL.pathname = "/api/status";
sqlURL.pathname = "/api/sql";
class Chainquery {
#enabled;
constructor() {
this.#enabled = false;
}
async start() {
// Handle startup
await state.start();
if (!state.get("lastId")) state.set("lastId", 0);
if (!state.get("lastSyncTime")) state.set("lastSyncTime", "0001-01-01 00:00:00");
if (!state.get("startSyncTime")) state.set("startSyncTime", new Date().toISOString().slice(0, 19).replace('T', ' '));
this.#enable(); // Mark the provider enabled so the status checker's interval keeps running
return await this.#statusChecker();
}
async #statusChecker() {
let resp;
// Check if chainquery is working correctly
const check = async ()=>{
try {
resp = await (await fetch(statusURL.href)).json();
return resp.success;
} catch (err) {
return false;
}
}
const interval = setInterval(async ()=>{
if (!this.enabled) return clearInterval(interval);
if (!(await check())) {
logger.err(`Cannot access chainquery on ${config.CHAINQUERY.hostname}!`);
}
}, 5000);
return check();
}
async fetchClaims(params={}) {
return new Promise((resolve, reject)=>{
let resp;
const interval = setInterval(async ()=>{
try {
resp = await this.query(generateQuery(state.get("lastId"), params));
// If chainquery returns an error, there's something wrong with the request
if (resp.error) return reject(resp.error);
// Guard against an empty batch so we don't read .id off undefined; the caller treats a short batch as fully synced
if (resp.data.length) state.set("lastId", resp.data[resp.data.length - 1].id);
logger.info("Reached id " + state.get("lastId"));
clearInterval(interval);
resolve(resp.data);
} catch (err) {
logger.err(err);
logger.warn(`Failed to fetch from chainquery, trying again in ${config.failedChainquerySleep / 1000}s...`);
}
}, config.failedChainquerySleep);
});
}
query(q) {
return new Promise(async (resolve, reject)=>{
let resp;
try {
sqlURL.searchParams.set("query", q);
resp = await fetch(sqlURL.href);
resolve(resp.json());
} catch (err) {
reject(err);
}
})
}
get enabled() {
return this.#enabled;
}
#enable() {
this.#enabled = true;
}
#disable() {
this.#enabled = false;
}
}
export default new Chainquery();

26
src/providers/Provider.js Normal file
View file

@ -0,0 +1,26 @@
/*
This file contains an example class showing what a Provider should look like and which methods it needs to implement.
*/
export default class Provider {
#enabled;
#start;
#stop;
constructor() {
this.#enabled = false;
}
start() {
if (!this.#start) throw Error("Tried to start a Provider without a '#start()' method!");
this.#enabled = true; // Track state on the private field set up in the constructor
return this.#start();
}
stop() {
if (!this.#stop) throw Error("Tried to stop a Provider without a '#stop()' method!");
this.#enabled = false;
return this.#stop();
}
}

197
src/services/Sync.js Normal file
View file

@ -0,0 +1,197 @@
import config from "../config.js";
// import chainquery from "../utils/chainquery.js";
import provider from "../providers/Chainquery.js";
import state from "../utils/state.js";
import { partition, sleep } from "../utils/common.js";
import meilisearch from "../utils/meilisearch.js";
import { getLogger } from "../utils/logger.js";
import constants from "../constants.js";
const logger = getLogger("Sync");
// The states for ClaimSync
export const syncStates = Object.freeze({
INACTIVE: 0,
STARTING: 1,
RUNNING: 2,
FAILING: 3,
FETCHING: 4,
PUSHING: 5,
SYNCED: 6
});
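// NOTE: this inline query appears to be unused; the Chainquery provider builds its sync query via src/utils/generateQuery.js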
const query = (time, lastID, MaxClaimsInCall) => {
return `SELECT c.id,
c.name,
p.name AS channel,
p.claim_id AS channel_id,
c.bid_state, c.effective_amount,
COALESCE(p.effective_amount,1) AS certificate_amount,
c.claim_id,
c.content_type,
c.is_nsfw as nsfw,
c.title,
c.author,
c.description,
c.claim_type,
c.value_as_json AS value
FROM claim c LEFT JOIN claim p
on p.claim_id = c.publisher_id
WHERE c.id >${lastID} AND
c.modified_at >='${time}'
ORDER BY c.id LIMIT ${MaxClaimsInCall}`;
}
class Sync {
#index;
#status;
#tasks;
#enabled;
constructor() {
this.meilisearch = meilisearch;
this.#enabled = false;
this.#setStatus(syncStates.INACTIVE);
}
get enabled() {
return this.#enabled;
}
async stop() {
if (!this.#enabled) return; // Nothing to do if we're already stopped
logger.info("Shutting down...");
await this.meilisearch.stop();
this.#enabled = false;
this.#setStatus(syncStates.INACTIVE);
}
async start() {
this.#setStatus(syncStates.STARTING);
logger.info("Starting...");
await state.start();
await provider.start();
// Make sure that MeiliSearch is configured correctly before continuing
this.#index = await this.meilisearch.getIndex(config.indexName);
if (!this.#enabled) this.#enabled = true;
this.#setStatus(syncStates.RUNNING);
this.#loop();
}
// async #getTasks(parameters = {}) {
// logger.info(await this.meilisearch.getTasks(parameters));
// }
async #loop() {
logger.info("Starting to sync claims...");
let claims = 0;
let size = 0;
while (this.#enabled) {
while (claims < config.maxClaimsToProcessPerIteration && this.#enabled) {
this.#setStatus(syncStates.RUNNING);
try {
size = await this.#getClaims();
} catch (err) {
continue; // Let's try again
}
claims += size;
// If the size is less than batchSize, we are fully synced
if (size < config.batchSize) break;
}
claims = 0;
if (!this.#enabled) return await state.save();
if (size < config.batchSize) {
this.#setStatus(syncStates.SYNCED);
state.set("lastSyncTime", state.get("startSyncTime"));
state.set("lastId", 0);
logger.info(`We are fully synced! Sleeping for ${config.sleepTimeAfterFullSync / (60 * 1000)} minutes...`);
await sleep(config.sleepTimeAfterFullSync);
}
await state.save();
}
}
async #getClaims() {
let claims;
let size = 0;
this.#setStatus(syncStates.FETCHING);
logger.info(`Fetching new claims...`);
// Ask for new claims from the provider
try {
claims = await provider.fetchClaims();
} catch (err) {
logger.err(err);
logger.err(`Failed to fetch claims, trying again in ${config.failedChainquerySleep / 1000}s...`);
this.#setStatus(syncStates.FAILING);
await sleep(config.failedChainquerySleep);
throw new Error(err);
}
size = claims.length;
// Drop claims that have no value, then strip the raw value field from the kept ones
claims = claims.filter((claim)=> claim.value).map((claim)=>{
delete claim.value;
return claim;
});
// Split tags into array
claims = claims.map(claim=>{
return {...claim, tags: claim.tags ? claim.tags.split(', ') : null};
});
// Separate claims (we need to remove claims that are abandoned, expired or blocked)
const [add, remove] = partition(claims, (claim=>{
return !(claim.bid_state === constants.CLAIM_BID_STATES.SPENT || claim.bid_state === constants.CLAIM_BID_STATES.EXPIRED);
}));
logger.info(`Pushing ${add.length} claims to DB (${remove.length} to remove)...`);
this.#setStatus(syncStates.PUSHING);
// Update the MeiliSearch index
// Map each removed claim to its primary key; map() is synchronous here, so no await is needed
await this.meilisearch.updateDocuments(this.#index, {add, remove: remove.map(claim=>{
    return claim[config.meilisearch.primaryKey];
})});
// state.set("lastId", claims[claims.length - 1].id);
return size;
}
#setStatus(status) {
this.#status = status;
}
get status() {
return Object.keys(syncStates).find(key =>
syncStates[key] === this.#status);
}
get synced() {
// this.status returns the state's name (a string), so compare the raw numeric state instead
return this.#status === syncStates.SYNCED;
}
}
export default new Sync();

17
src/utils/chainquery.js Normal file
View file

@ -0,0 +1,17 @@
import config from "../config.js";
config.CHAINQUERY.pathname = "/api/sql";
export default (query) => {
return new Promise(async (resolve, reject)=>{
let resp;
try {
config.CHAINQUERY.searchParams.set("query", query);
resp = await (await fetch(config.CHAINQUERY.href));
resolve(resp.json());
} catch (err) {
reject(err);
}
})
}

35
src/utils/common.js Normal file
View file

@ -0,0 +1,35 @@
export function isEqual(a, b) {
// Copy before sorting so the callers' arrays are not mutated
return [...a].sort().join() === [...b].sort().join();
}
export function partition(array, callback){
return array.reduce((result, element, i)=>{
callback(element, i, array)
? result[0].push(element)
: result[1].push(element);
return result;
}, [[],[]]
);
};
export function getSize(bytes) {
const PREFIXES = ["", "K", "M", "G", "T"];
let i = 0;
// Use 1024 for both the threshold and the divisor, and stop at the last prefix
while (i < PREFIXES.length - 1 && bytes >= 1024) {
    bytes /= 1024;
    i++;
}
return {
size: bytes.toFixed(1),
prefix: PREFIXES[i]
}
}
export function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

87
src/utils/generateQuery.js Normal file
View file

@ -0,0 +1,87 @@
import config from "../config.js";
// Base query skeleton
const baseQuery = {
table: "claim c",
fields: [
'c.id',
'c.title',
'c.name',
'c.claim_id',
'c.thumbnail_url',
'c.description',
'c.language',
'c.release_time',
'COALESCE(c.audio_duration, c.duration) AS duration',
'c.frame_height',
'c.frame_width',
'c.license',
'c.is_nsfw',
'c.fee',
'c.source_name AS file_name',
`SUBSTRING_INDEX(
COALESCE(c.content_type, c.source_media_type),
'/',
1
) AS media_supertype`,
`SUBSTRING_INDEX(
COALESCE(c.content_type,
c.source_media_type),
'/',
-1
) AS media_subtype`,
'c.effective_amount / 1e8 AS effective_amount',
`(
SELECT COUNT(*) AS reposts
FROM claim
WHERE
type = "claimreference"
AND claim_reference = c.claim_id
) AS reposts`,
`(
SELECT GROUP_CONCAT(tag SEPARATOR ', ') AS tags
FROM claim_tag ct
LEFT JOIN tag t
ON t.id = ct.tag_id
WHERE
claim_id = c.claim_id
) AS tags`,
'c.claim_reference AS reposted_claim_id',
'c.author',
'p.title AS channel_title',
'p.name AS channel_name',
'p.claim_id AS channel_id',
'COALESCE(p.effective_amount / 1e8,1) AS certificate_amount',
'c.value_as_json AS value',
'c.modified_at',
`CASE c.bid_state
WHEN 'Controlling' THEN 0
WHEN 'Accepted' THEN 1
WHEN 'Active' THEN 2
WHEN 'Spent' THEN 3
WHEN 'Expired' THEN 4
END AS bid_state`,
`CASE c.type
WHEN 'channel' THEN 0
WHEN 'stream' THEN 1
WHEN 'claimreference' THEN 2
WHEN 'claimlist' THEN 3
END AS type`
].map(field=> field.replace(/\s+/g, ' ').trim()).join(','), // Collapse whitespace runs so the multiline templates become single-line SQL
joins: [
"LEFT JOIN claim p ON p.claim_id = c.publisher_id",
"LEFT JOIN claim_tag ct ON ct.claim_id = c.claim_id",
"LEFT JOIN tag t ON t.id = ct.tag_id"
].join(" ")
}
export default (lastId, params) => {
const limit = params.limit || config.batchSize;
const where = [];
if (params.claim_id) where.push(`c.claim_id IN (${params.claim_id.map(id=>`'${id}'`).join(',')})`);
if (params.channel_id) where.push(`c.publisher_id IN (${params.channel_id.map(id=>`'${id}'`).join(',')})`);
where.push(`c.id > ${lastId}`);
return `SELECT ${baseQuery.fields} FROM ${baseQuery.table} ${baseQuery.joins} ${where.length ? `WHERE ${where.join(' AND ')}` : ""} ORDER BY c.id LIMIT ${limit}`;
}

50
src/utils/logger.js Normal file
View file

@ -0,0 +1,50 @@
export const style = {
reset: "\x1b[0m",
bright: "\x1b[1m",
dim: "\x1b[2m",
underscore: "\x1b[4m",
blink: "\x1b[5m",
reverse: "\x1b[7m",
hidden: "\x1b[8m",
// Foreground (text) colors
fg: {
black: "\x1b[30m",
red: "\x1b[31m",
green: "\x1b[32m",
yellow: "\x1b[33m",
blue: "\x1b[34m",
magenta: "\x1b[35m",
cyan: "\x1b[36m",
white: "\x1b[37m",
crimson: "\x1b[38m"
},
// Background colors
bg: {
black: "\x1b[40m",
red: "\x1b[41m",
green: "\x1b[42m",
yellow: "\x1b[43m",
blue: "\x1b[44m",
magenta: "\x1b[45m",
cyan: "\x1b[46m",
white: "\x1b[47m",
crimson: "\x1b[48m"
}
};
export const getLogger = (name)=> {
return {
info: logger("info", name, style.fg.cyan),
warn: logger("warn", name, style.fg.yellow),
err: logger("error", name, style.fg.red),
debug: process.env.NODE_ENV === "development" ? logger("debug", name, style.fg.magenta) : ()=>{},
}
}
function logger(type, name, color) {
return (msg)=>{
process.stdout.write(`${color}[${type.toUpperCase()}]${style.reset}\t${name}: `);
return console[type](msg);
}
}

246
src/utils/meilisearch.js Normal file
View file

@ -0,0 +1,246 @@
import { MeiliSearch } from 'meilisearch';
import config from '../config.js';
import { isEqual } from './common.js';
import { getLogger } from './logger.js';
const logger = getLogger("MeiliSearch");
class Meili {
#enabled;
#starting;
#tasks;
constructor() {
this.#enabled = false;
this.#starting = false;
this.#tasks = 0;
}
async stop() {
await new Promise((resolve, reject)=>{
const interval = setInterval(()=>{
if (this.#tasks <= 0) {
    clearInterval(interval);
    return resolve(); // Return so the waiting message isn't logged after resolving
}
logger.info(`Waiting for ${this.#tasks} tasks to finish before shutting down...`);
}, 1000);
});
logger.info(`Shutting down...`);
}
async #started() {
await new Promise((resolve, reject)=>{
const interval = setInterval(()=>{
if (this.#enabled) {
clearInterval(interval);
resolve();
}
}, 100);
});
}
// Make sure that MeiliSearch is configured correctly
// Accept an optional list of indexes (src/index.js passes one); default to the configured index name
async start(indexes = [config.indexName]) {
if (this.#enabled || this.#starting) return await this.#started();
this.#starting = true;
// Make sure we have a healthy connection to MeiliSearch before we continue
await new Promise(async (resolve, reject)=>{
logger.debug(config.meilisearch.apiKey);
const check = async ()=>{
try {
this.client = new MeiliSearch({
host: config.meilisearch.host.href,
apiKey: config.meilisearch.apiKey,
});
if (await this.client.health()) return true;
} catch (err) {
logger.err(`Could not connect to ${config.meilisearch.host.href}`);
}
};
if (await check()) return resolve();
const interval = setInterval(async ()=>{
if (await check()) {
clearInterval(interval);
return resolve();
}
}, 5000)
});
logger.info(`Successfully connected to ${config.meilisearch.host.href}`);
for (let i = 0; i < indexes.length; i++) {
// Make sure that the claims index exists
const index = await this.getIndex(indexes[i]);
// Ensure that we have the correct filters configured
await this.#ensureFilters(index, config.meilisearch.filters);
await this.#ensurePrimaryKey(index, "id");
// Check sortableAttributes
await this.#ensureSortableAttributes(index, config.meilisearch.sortableAttributes);
}
this.#enabled = true;
this.#starting = false;
}
updateDocuments(index, {add, remove}) {
return new Promise(async (resolve, reject)=>{
let operation;
let task;
const indexName = (await index.getRawInfo()).uid;
try {
operation = await index.addDocuments(add);
task = await this.#waitForTask(operation.taskUid, `add ${add.length} documents in '${indexName}'`);
operation = await index.deleteDocuments(remove);
task = await this.#waitForTask(operation.taskUid, `delete ${remove.length} documents in '${indexName}'`);
} catch (err) {
logger.err(err);
reject();
}
resolve();
});
}
getTasks(parameters={}) {
return this.client.getTasks(parameters);
}
async getIndex(indexName) {
let index;
try {
index = await this.client.getIndex(indexName);
} catch {
let task;
// If we got here, the index doesn't exist
logger.info(`Index '${indexName}' does not exist, creating...`);
// Create the index and wait for it to complete
const creation = await this.client.createIndex(indexName);
task = await this.#waitForTask((creation).taskUid, `create index '${indexName}'`);
// We still need to return the index
index = await this.client.getIndex(indexName);
}
return index;
}
async #ensureSortableAttributes(index, sortableAttributes) {
const info = await index.getRawInfo();
const indexName = info.uid;
logger.info(`Checking sortableAttributes for '${indexName}'...`);
if (!isEqual(await index.getSortableAttributes(), sortableAttributes)) {
logger.info(`sortableAttributes for '${indexName}' is not correct, updating...`);
// Wait for the update to finish
let operation = await index.updateSortableAttributes(sortableAttributes);
const task = await this.#waitForTask(operation.taskUid, `update sortableAttributes for '${indexName}'`);
} else {
logger.info(`sortableAttributes for '${indexName}' is correct.`);
}
}
async #ensurePrimaryKey(index, primaryKey) {
const info = await index.getRawInfo();
const indexName = info.uid;
logger.info(`Checking primaryKey for '${indexName}'...`);
if (info.primaryKey !== primaryKey) {
logger.info(`primaryKey for '${indexName}' is not correct, updating...`);
// Wait for the update to finish
let operation = await index.update({primaryKey});
const task = await this.#waitForTask(operation.taskUid, `update primaryKey for '${indexName}'`);
} else {
logger.info(`primaryKey for '${indexName}' is correct.`);
}
}
// Make sure we have the correct filters configured
async #ensureFilters(index, filters) {
const indexName = (await index.getRawInfo()).uid;
logger.info(`Checking filters for '${indexName}'...`);
if (!isEqual(await index.getFilterableAttributes(), filters)) {
logger.info(`Filters for '${indexName}' is not correct, updating...`);
// Wait for the update to finish
let operation = await index.updateFilterableAttributes(filters);
const task = await this.#waitForTask(operation.taskUid, `update filters for '${indexName}'`);
} else {
logger.info(`Filters for '${indexName}' is correct.`);
}
}
#waitForTask(uid, msg="") {
return new Promise((resolve, reject)=>{
let task;
let time = new Date().getTime();
this.#tasks++;
const startTime = time;
const interval = setInterval(async ()=>{
try {
task = await this.client.getTask(uid);
} catch (err) {
    // Stop polling and release the task counter before giving up, otherwise the interval keeps firing after rejection
    this.#tasks--;
    clearInterval(interval);
    return reject(err);
}
// Check the current status
switch (task.status) {
case 'succeeded':
logger.info(`${task.status.toUpperCase()} task ${task.uid}: ${msg || 'no message'}, took ${(new Date().getTime() - startTime)/1000}s`);
this.#tasks--;
clearInterval(interval);
resolve(task);
return;
case 'failed':
case 'canceled':
logger.err(`${task.status.toUpperCase()} task ${task.uid}: ${msg || 'no message'}!`);
this.#tasks--;
clearInterval(interval);
reject(task);
return;
case 'processing':
case 'enqueued':
default:
break;
};
// Log the status of the task every 10 seconds
if (new Date().getTime() >= time + 10000) {
logger.warn(`${task.status.toUpperCase()} task ${task.uid}: ${msg || 'no message'}, ${(new Date().getTime() - startTime)/1000}s elapsed`);
time = new Date().getTime();
}
}, config.checkTaskInterval);
});
};
};
const client = new Meili();
export default client;

90
src/utils/processQueries.js Normal file
View file

@ -0,0 +1,90 @@
import { getLogger } from "./logger.js";
const logger = getLogger("Query");
const BOOLEAN_FALSE = [
    "0", "no", "n", "false"
];
const BOOLEAN_TRUE = [
    "1", "yes", "y", "true"
];
const parseType = {
    boolean: (data)=>{
        if (!data) return true; // A bare flag (e.g. ?nsfw) counts as true
        if (BOOLEAN_TRUE.includes(data)) return true;
        if (BOOLEAN_FALSE.includes(data)) return false;
    },
    list: (data)=>{ return data.split(',') }
};
function getEscapedQuery (query) {
// https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
// The reserved characters are: + - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ /
let badCharacters = ['+', '-', '=', '&&', '||', '>', '<', '!', '(', ')', '{', '}', '[', ']', '^', '"', '~', '*', '?', ':', '\\'];
let escapedQuery = '';
for (var i = 0; i < query.length; i++) {
let char1 = query.charAt(i);
if (badCharacters.includes(char1)) {
escapedQuery = escapedQuery + '\\' + char1;
} else if (i + 1 <= query.length) {
let char2 = query.charAt(i + 1);
if (badCharacters.includes(char1 + char2)) {
escapedQuery = escapedQuery + '\\' + char1 + char2;
i++;
} else {
escapedQuery = escapedQuery + char1;
}
} else {
escapedQuery = escapedQuery + char1;
}
}
return escapedQuery;
}
function err(msg) { return { error: msg } }
export default (input)=>{
// Hono returns multiple querystring parameters as arrays, i.e. /search?s=A&s=B --> s: ['A', 'B']; we only want the first one: 'A'
Object.keys(input).forEach(function(key) {
    input[key] = getEscapedQuery(input[key][0]);
});
const queries = {};
// if (!input.s) return err("s: cannot be blank.");
queries.s = input.s;
// Parse channel
if (input.channel !== undefined) {
queries.channel = input.channel.startsWith('@') ? input.channel : '@' + input.channel;
}
// Parse size
if (input.size === undefined) queries.size = 10;
else if (isNaN(input.size)) return err("size: has to be a number");
else queries.size = parseInt(input.size);
// Parse from
if (input.from === undefined) queries.from = 0;
else if (isNaN(input.from)) return err("from: has to be a number");
else queries.from = parseInt(input.from);
// Parse nsfw
if (input.nsfw !== undefined) queries.is_nsfw = parseType.boolean(input.nsfw) ? 1 : 0;
// Parse contentType
if (input.contentType !== undefined) queries.content_type = input.contentType.split(',');
// Parse tags
if (input.tags !== undefined) queries.tags = parseType.list(input.tags);
// Parse resolve
if (input.resolve !== undefined) queries.resolve = parseType.boolean(input.resolve);
return { queries };
}

70
src/utils/state.js Normal file
View file

@ -0,0 +1,70 @@
import fs from 'fs';
import { getLogger } from './logger.js';
import config from '../config.js';
import path from 'path';
const logger = getLogger("State");
class State {
#store;
#enabled;
constructor() {
this.#enabled = false;
}
get(key) {
return this.#store[key];
}
set(key, value) {
this.#store[key] = value;
}
async save() {
await fs.promises.writeFile(path.join(config.dataDirectory, 'state.json'), JSON.stringify(this.#store));
logger.info("[State] Wrote data to state.json");
}
async stop() {
if (!this.#enabled) return;
this.#enabled = false;
await this.save();
this.#store = {};
}
async start() {
if (this.#enabled) return;
this.#enabled = true;
logger.info("Initializing...");
let data;
// Try read file
try {
data = await fs.promises.readFile(path.join(config.dataDirectory, 'state.json'), {
encoding: 'utf-8'
});
} catch (err) {
// File does not exist yet; create it in the configured data directory (matching save())
await fs.promises.writeFile(path.join(config.dataDirectory, 'state.json'), JSON.stringify({}));
data = "{}";
}
// Try parse
try {
this.#store = JSON.parse(data);
logger.info("Loaded state.json");
logger.debug(this.#store);
} catch (err) {
// Don't continue if the JSON is invalid
logger.err("[State] state.json contains invalid JSON");
process.abort();
}
}
}
export default new State();

106
test.js Normal file
View file

@ -0,0 +1,106 @@
import chainquery from "./src/utils/chainquery.js";
import { Hono } from "hono";
import { serve } from '@hono/node-server';
// Base query skeleton
const baseQuery = {
table: "claim c",
fields: [
'c.id',
'c.title',
'c.name',
'c.claim_id',
'c.thumbnail_url AS thumbnail',
'c.description',
'c.language',
'c.release_time',
'COALESCE(c.audio_duration, c.duration) AS duration',
'c.frame_height AS height',
'c.frame_width AS width',
'c.license',
'c.is_nsfw AS nsfw',
'c.fee',
'c.source_name AS file_name',
"SUBSTRING_INDEX(COALESCE(c.content_type, c.source_media_type), '/', 1) AS media_type",
"SUBSTRING_INDEX(COALESCE(c.content_type, c.source_media_type), '/', -1) AS mime_type",
'c.effective_amount / 1e8 AS effective_amount',
'(SELECT COUNT(*) AS reposts FROM claim WHERE type = "claimreference" AND claim_reference = c.claim_id) AS reposts',
`(SELECT GROUP_CONCAT(tag SEPARATOR ', ') AS tags FROM claim_tag ct LEFT JOIN tag t ON t.id = ct.tag_id WHERE claim_id = c.claim_id
) AS tags`,
'c.claim_reference AS reposted_claim_id',
'c.author',
'p.title AS channel_title',
'p.name AS channel_name',
'p.claim_id AS channel_id',
'COALESCE(p.effective_amount / 1e8,1) AS certificate_amount',
'c.value_as_json AS value',
'c.modified_at',
`CASE
WHEN c.bid_state = 'Controlling' THEN 0
WHEN c.bid_state = 'Accepted' THEN 1
WHEN c.bid_state = 'Active' THEN 2
WHEN c.bid_state = 'Spent' THEN 3
WHEN c.bid_state = 'Expired' THEN 4
END AS bid_state`,
`CASE
WHEN c.type = 'channel' THEN 0
WHEN c.type = 'stream' THEN 1
WHEN c.type = 'claimreference' THEN 2
WHEN c.type = 'claimlist' THEN 3
END AS type`
].map(field=> field.replace(/\s+/g, ' ').trim()).join(','), // Collapse whitespace runs so the multiline templates become single-line SQL
joins: [
"LEFT JOIN claim p ON p.claim_id = c.publisher_id",
// "LEFT JOIN claim_tag ct ON ct.claim_id = c.claim_id",
// "LEFT JOIN tag t ON t.id = ct.tag_id"
].join(" ")
}
function generateQuery(params={}) {
const limit = params.limit || 5000;
const where = [];
if (params.claim_id) where.push(`c.claim_id IN (${params.claim_id.map(id=>`'${id}'`).join(',')})`);
if (params.channel_id) where.push(`c.publisher_id IN (${params.channel_id.map(id=>`'${id}'`).join(',')})`);
// where.push(`t.tag IS NOT NULL`);
return `SELECT ${baseQuery.fields} FROM ${baseQuery.table} ${baseQuery.joins} ${where.length ? `WHERE ${where.join(' AND ')}` : ""} ORDER BY c.id DESC LIMIT ${limit}`;
}
// const app = new Hono();
// app.get('/', async (c) => {
// const params = {};
// Object.keys(c.req.queries()).map(function(key) {
// params[key] = c.req.queries()[key][0].split(',');
// })
// // console.log(params);
// const query = generateQuery(params);
// // console.log(query);
// const resp = await chainquery(query);
// return c.json({ ...resp, results: resp.data ? resp.data.length : 0, limit: params.limit || 5000})
// });
// serve(app);
// const query = `
// SELECT
// ct.*, t.tag
// FROM claim_tag ct
// LEFT JOIN tag t ON t.id = ct.tag_id
// LIMIT 20
// `;
const query = generateQuery();
const resp = await chainquery(query);
// console.log({ ...resp, results: resp.data ? resp.data.length : 0, limit: 5000});
console.log("Query: " + query.replaceAll('\n', ' '));
console.log("done");