6 Commits

Author SHA1 Message Date
Gergő Jedlicska 06e97372f0 WIP: add different implementation sketches 2024-05-10 17:25:53 +02:00
Gergő Jedlicska b806185565 chore: format 2024-02-26 18:48:48 +01:00
Gergő Jedlicska bdb0963c8a chore: format 2024-02-20 17:15:58 +01:00
Gergő Jedlicska d1cdf36ee5 feat: add cert parsing script 2024-02-20 17:10:43 +01:00
Gergő Jedlicska b1e4554d97 add aiven extras docs 2024-02-20 17:06:02 +01:00
Gergő Jedlicska 68804d37c7 feat: rework for soft multi organizations 2024-02-20 16:59:31 +01:00
38 changed files with 1027 additions and 751 deletions
+1
View File
@@ -4,5 +4,6 @@
.envrc .envrc
.swc .swc
node_modules node_modules
ca-cert*
dist dist
+11 -2
View File
@@ -19,6 +19,16 @@ it is done with running the SQL command below, and restating the database server
ALTER SYSTEM SET wal_level = logical; ALTER SYSTEM SET wal_level = logical;
``` ```
When registering a new region on a DigitalOcean postgres server, the default user doesn't have the required roles to set up a subscription.
On DO we can use [aiven-extras](https://github.com/aiven/aiven-extras) to create subs without root access.
The current branch is utilizing just that. But it needs a setup step executed on each database, that is registered as a region
Run this in a `psql` shell
```sql
CREATE EXTENSION aiven_extras;
```
Note: Postgres subscriptions (which we use) in the same db server don't work that easily; easiest way to get things going is to set up multiple db servers locally. Note: Postgres subscriptions (which we use) in the same db server don't work that easily; easiest way to get things going is to set up multiple db servers locally.
## Project description ## Project description
@@ -52,11 +62,10 @@ Organizations may be granted access to any given region. That action creates a n
## Steps to flex this POC ## Steps to flex this POC
Using the exposed graphql explorer, you can go ahead and Using the exposed graphql explorer, you can go ahead and
- create a user - create a user
- create an organisation - create an organisation
- add the user to the organisation - add the user to the organisation
- create regions & associate them with an organisation - create regions & associate them with an organisation
- create a resource in the default organisation, or for a specific organisation & region - create a resource in the default organisation, or for a specific organisation & region
- etc. - etc.
+20 -2
View File
@@ -1,8 +1,26 @@
export default { import { Knex } from 'knex'
import fs from 'fs'
import path from 'path'
console.log(`foobar ${process.env.POSTGRES_CA_CERT_PATH}`)
const config: Knex.Config = {
client: 'pg', client: 'pg',
connection: process.env.POSTGRES_URL, connection: {
connectionString: process.env.POSTGRES_URL,
ssl: process.env.POSTGRES_CA_CERT_PATH
? {
ca: fs.readFileSync(
path.resolve(__dirname, process.env.POSTGRES_CA_CERT_PATH)
),
rejectUnauthorized: true
}
: undefined
},
migrations: { migrations: {
directory: 'src/migrations', directory: 'src/migrations',
extension: 'ts' extension: 'ts'
} }
} }
export default config
+1
View File
@@ -32,6 +32,7 @@
"dependencies": { "dependencies": {
"@apollo/server": "^4.10.0", "@apollo/server": "^4.10.0",
"crypto-random-string": "^3.0.0", "crypto-random-string": "^3.0.0",
"dataloader": "^2.2.2",
"dotenv": "^16.4.1", "dotenv": "^16.4.1",
"graphql": "^16.8.1", "graphql": "^16.8.1",
"graphql-scalars": "^1.22.4", "graphql-scalars": "^1.22.4",
+8
View File
@@ -0,0 +1,8 @@
from pathlib import Path
import json
cert = Path("./ca-cert").read_text()
cert_json = json.dumps({"cert": cert})
Path("./ca-cert.json").write_text(cert_json)
+7
View File
@@ -11,6 +11,9 @@ dependencies:
crypto-random-string: crypto-random-string:
specifier: ^3.0.0 specifier: ^3.0.0
version: 3.3.1 version: 3.3.1
dataloader:
specifier: ^2.2.2
version: 2.2.2
dotenv: dotenv:
specifier: ^16.4.1 specifier: ^16.4.1
version: 16.4.1 version: 16.4.1
@@ -1599,6 +1602,10 @@ packages:
type-fest: 0.8.1 type-fest: 0.8.1
dev: false dev: false
/dataloader@2.2.2:
resolution: {integrity: sha512-8YnDaaf7N3k/q5HnTJVuzSyLETjoZjVmHc4AeKAzOvKHEFQKcn64OKBfzHYtE9zGjctNM7V9I0MfnUVLpi7M5g==}
dev: false
/date-fns@2.30.0: /date-fns@2.30.0:
resolution: {integrity: sha512-fnULvOpxnC5/Vg3NCiWelDsLiUc9bRwAPs/+LfTLNvetFCtCTN+yQz15C/fs4AwX1R9K5GLtLfn8QW+dWisaAw==} resolution: {integrity: sha512-fnULvOpxnC5/Vg3NCiWelDsLiUc9bRwAPs/+LfTLNvetFCtCTN+yQz15C/fs4AwX1R9K5GLtLfn8QW+dWisaAw==}
engines: {node: '>=0.11'} engines: {node: '>=0.11'}
+16 -16
View File
@@ -1,31 +1,31 @@
import { ApolloServer } from "@apollo/server"; import { ApolloServer } from '@apollo/server'
import { resolvers } from "./resolvers"; import { resolvers } from './resolvers'
import { startStandaloneServer } from "@apollo/server/standalone"; import { startStandaloneServer } from '@apollo/server/standalone'
import { readFileSync } from "fs"; import { readFileSync } from 'fs'
import { typeDefs as scalarTypeDefs } from "graphql-scalars"; import { typeDefs as scalarTypeDefs } from 'graphql-scalars'
import { migrateAll } from "./services/databaseManagement"; import { migrateAll } from './services/databaseManagement'
const typeDefs = readFileSync("src/schema.graphql", { encoding: "utf-8" }); const typeDefs = readFileSync('src/schema.graphql', { encoding: 'utf-8' })
// The ApolloServer constructor requires two parameters: your schema // The ApolloServer constructor requires two parameters: your schema
// definition and your set of resolvers. // definition and your set of resolvers.
const server = new ApolloServer({ const server = new ApolloServer({
typeDefs: [typeDefs, ...scalarTypeDefs], typeDefs: [typeDefs, ...scalarTypeDefs],
resolvers, resolvers
}); })
const startServer = async (): Promise<void> => { const startServer = async (): Promise<void> => {
const { url } = await startStandaloneServer(server, { const { url } = await startStandaloneServer(server, {
listen: { port: 4000 }, listen: { port: 4000 }
}); })
await migrateAll(); await migrateAll()
console.log(`🚀 Server ready at: ${url}`); console.log(`🚀 Server ready at: ${url}`)
}; }
startServer() startServer()
.then() .then()
.catch((err: Error) => .catch((err: Error) =>
console.log(`🔥 failed to start server ${err.message}`), console.log(`🔥 failed to start server ${err.message}`)
); )
+3 -4
View File
@@ -2,8 +2,7 @@ import 'dotenv/config'
import { parseEnv } from 'znv' import { parseEnv } from 'znv'
import { z } from 'zod' import { z } from 'zod'
export const { POSTGRES_URL } = parseEnv(process.env, { export const { POSTGRES_URL, POSTGRES_CA_CERT_PATH } = parseEnv(process.env, {
POSTGRES_URL: z.string().min(1) POSTGRES_URL: z.string().min(1),
POSTGRES_CA_CERT_PATH: z.string().min(1).nullish()
}) })
console.log([POSTGRES_URL].join(', '))
@@ -1,14 +1,14 @@
import type { Knex } from "knex"; import type { Knex } from 'knex'
const tableName = "organizations"; const tableName = 'organizations'
export async function up(knex: Knex): Promise<void> { export async function up (knex: Knex): Promise<void> {
return await knex.schema.createTable(tableName, (table) => { return await knex.schema.createTable(tableName, (table) => {
table.text("id").primary(); table.text('id').primary()
table.text("name"); table.text('name')
}); })
} }
export async function down(knex: Knex): Promise<void> { export async function down (knex: Knex): Promise<void> {
return await knex.schema.dropTable(tableName); return await knex.schema.dropTable(tableName)
} }
+33 -33
View File
@@ -1,49 +1,49 @@
import type { Knex } from "knex"; import type { Knex } from 'knex'
const regionsTableName = "regions"; const regionsTableName = 'regions'
export async function up(knex: Knex): Promise<void> { export async function up (knex: Knex): Promise<void> {
await knex.schema.createTable(regionsTableName, (table) => { await knex.schema.createTable(regionsTableName, (table) => {
table.text("id").primary(); table.text('id').primary()
table.text("connectionString"); table.text('connectionString')
}); })
await knex.schema.createTable("organizations_regions", (table) => { await knex.schema.createTable('organizations_regions', (table) => {
table table
.text("organizationId") .text('organizationId')
.references("id") .references('id')
.inTable("organizations") .inTable('organizations')
.notNullable() .notNullable()
.onDelete("cascade"); .onDelete('cascade')
table table
.text("regionId") .text('regionId')
.references("id") .references('id')
.inTable("regions") .inTable('regions')
.notNullable() .notNullable()
.onDelete("cascade"); .onDelete('cascade')
}); })
await knex.schema.createTable("resource_organization_region", (table) => { await knex.schema.createTable('resource_organization_region', (table) => {
table table
.text("resourceId") .text('resourceId')
.references("id") .references('id')
.inTable("resources") .inTable('resources')
.notNullable() .notNullable()
.onDelete("cascade"); .onDelete('cascade')
table table
.text("organizationId") .text('organizationId')
.references("id") .references('id')
.inTable("organizations") .inTable('organizations')
.notNullable() .notNullable()
.onDelete("cascade"); .onDelete('cascade')
table table
.text("regionId") .text('regionId')
.references("id") .references('id')
.inTable("regions") .inTable('regions')
.notNullable() .notNullable()
.onDelete("cascade"); .onDelete('cascade')
}); })
} }
export async function down(knex: Knex): Promise<void> { export async function down (knex: Knex): Promise<void> {
await knex.schema.dropTable(regionsTableName); await knex.schema.dropTable(regionsTableName)
await knex.schema.dropTable("organizations_regions"); await knex.schema.dropTable('organizations_regions')
} }
+8 -8
View File
@@ -1,15 +1,15 @@
import type { Knex } from "knex"; import type { Knex } from 'knex'
const regionsTableName = "regions"; const regionsTableName = 'regions'
export async function up(knex: Knex): Promise<void> { export async function up (knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => { await knex.schema.alterTable(regionsTableName, (table) => {
table.text("name").notNullable().defaultTo("region"); table.text('name').notNullable().defaultTo('region')
}); })
} }
export async function down(knex: Knex): Promise<void> { export async function down (knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => { await knex.schema.alterTable(regionsTableName, (table) => {
table.dropColumn("name"); table.dropColumn('name')
}); })
} }
@@ -1,15 +1,15 @@
import type { Knex } from "knex"; import type { Knex } from 'knex'
const regionsTableName = "regions"; const regionsTableName = 'regions'
export async function up(knex: Knex): Promise<void> { export async function up (knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => { await knex.schema.alterTable(regionsTableName, (table) => {
table.text("maintenanceDb").notNullable().defaultTo("region"); table.text('maintenanceDb').notNullable().defaultTo('region')
}); })
} }
export async function down(knex: Knex): Promise<void> { export async function down (knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => { await knex.schema.alterTable(regionsTableName, (table) => {
table.dropColumn("maintenanceDb"); table.dropColumn('maintenanceDb')
}); })
} }
@@ -1,22 +1,22 @@
import type { Knex } from "knex"; import type { Knex } from 'knex'
const tableName = "organization_acl"; const tableName = 'organization_acl'
export async function up(knex: Knex): Promise<void> { export async function up (knex: Knex): Promise<void> {
return await knex.schema.createTable(tableName, (table) => { return await knex.schema.createTable(tableName, (table) => {
table table
.string("userId") .string('userId')
.references("id") .references('id')
.inTable("users") .inTable('users')
.onDelete("cascade"); .onDelete('cascade')
table table
.string("organizationId") .string('organizationId')
.references("id") .references('id')
.inTable("organizations") .inTable('organizations')
.onDelete("cascade"); .onDelete('cascade')
}); })
} }
export async function down(knex: Knex): Promise<void> { export async function down (knex: Knex): Promise<void> {
return await knex.schema.dropTable(tableName); return await knex.schema.dropTable(tableName)
} }
@@ -1,22 +1,22 @@
import type { Knex } from "knex"; import type { Knex } from 'knex'
const tableName = "organization_resource_acl"; const tableName = 'organization_resource_acl'
export async function up(knex: Knex): Promise<void> { export async function up (knex: Knex): Promise<void> {
return await knex.schema.createTable(tableName, (table) => { return await knex.schema.createTable(tableName, (table) => {
table table
.string("resourceId") .string('resourceId')
.references("id") .references('id')
.inTable("resources") .inTable('resources')
.onDelete("cascade"); .onDelete('cascade')
table table
.string("organizationId") .string('organizationId')
.references("id") .references('id')
.inTable("organizations") .inTable('organizations')
.onDelete("cascade"); .onDelete('cascade')
}); })
} }
export async function down(knex: Knex): Promise<void> { export async function down (knex: Knex): Promise<void> {
return await knex.schema.dropTable(tableName); return await knex.schema.dropTable(tableName)
} }
@@ -1,28 +1,28 @@
import type { Knex } from "knex"; import type { Knex } from 'knex'
const tableName = "resource_region_organization"; const tableName = 'resource_region_organization'
export async function up(knex: Knex): Promise<void> { export async function up (knex: Knex): Promise<void> {
return await knex.schema.createTable(tableName, (table) => { return await knex.schema.createTable(tableName, (table) => {
table table
.string("resourceId") .string('resourceId')
.references("id") .references('id')
.inTable("resources") .inTable('resources')
.onDelete("cascade") .onDelete('cascade')
.primary(); .primary()
table table
.string("regionId") .string('regionId')
.references("id") .references('id')
.inTable("regions") .inTable('regions')
.onDelete("cascade"); .onDelete('cascade')
table table
.string("organizationId") .string('organizationId')
.references("id") .references('id')
.inTable("organizations") .inTable('organizations')
.onDelete("cascade"); .onDelete('cascade')
}); })
} }
export async function down(knex: Knex): Promise<void> { export async function down (knex: Knex): Promise<void> {
return await knex.schema.dropTable(tableName); return await knex.schema.dropTable(tableName)
} }
@@ -0,0 +1,15 @@
import type { Knex } from 'knex'
const regionsTableName = 'regions'
export async function up (knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => {
table.dropColumn('maintenanceDb')
})
}
export async function down (knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => {
table.text('maintenanceDb').notNullable().defaultTo('region')
})
}
@@ -0,0 +1,56 @@
import type { Knex } from 'knex'
const tableName = 'resource_region_organization'
export async function up (knex: Knex): Promise<void> {
await knex.schema.dropTable(tableName)
await knex.schema.createTable('resource_region', (table) => {
table
.string('resourceId')
.references('id')
.inTable('resources')
.onDelete('cascade')
.primary()
table
.string('regionId')
.references('id')
.inTable('regions')
.onDelete('cascade')
})
await knex.schema.createTable('resource_organization', (table) => {
table
.string('resourceId')
.references('id')
.inTable('resources')
.onDelete('cascade')
.primary()
table
.string('organizationId')
.references('id')
.inTable('organizations')
.onDelete('cascade')
})
}
export async function down (knex: Knex): Promise<void> {
await knex.schema.dropTable('resource_organization')
await knex.schema.dropTable('resource_region')
await knex.schema.createTable(tableName, (table) => {
table
.string('resourceId')
.references('id')
.inTable('resources')
.onDelete('cascade')
.primary()
table
.string('regionId')
.references('id')
.inTable('regions')
.onDelete('cascade')
table
.string('organizationId')
.references('id')
.inTable('organizations')
.onDelete('cascade')
})
}
@@ -0,0 +1,13 @@
import type { Knex } from 'knex'
export async function up (knex: Knex): Promise<void> {
await knex.schema.alterTable('regions', (table) => {
table.text('sslCaCert').nullable()
})
}
export async function down (knex: Knex): Promise<void> {
await knex.schema.alterTable('regions', (table) => {
table.dropColumn('sslCaCert')
})
}
@@ -0,0 +1,13 @@
import type { Knex } from 'knex'
export async function up (knex: Knex): Promise<void> {
await knex.schema.alterTable('regions', (table) => {
table.unique('name')
})
}
export async function down (knex: Knex): Promise<void> {
await knex.schema.alterTable('regions', (table) => {
table.dropUnique(['name'])
})
}
+217 -170
View File
@@ -1,5 +1,4 @@
import { Knex } from "knex"; import { Knex } from 'knex'
import { knex } from "./db";
import { import {
UserRecord, UserRecord,
Resource, Resource,
@@ -10,191 +9,239 @@ import {
Organization, Organization,
OrganizationAcl, OrganizationAcl,
OrganizationResourceAcl, OrganizationResourceAcl,
ResourceRegion, ResourceRegion
ResourceRegionOrg, } from './types'
} from "./types";
const Users = () => knex<UserRecord>("users"); export class RegionRepo {
const Resources = () => knex<Resource>("resources"); db: Knex
const ResourceAclRepo = () => knex<ResourceAcl>("resource_acl");
export const queryUser = async (userId: string): Promise<UserRecord | null> => { constructor (db: Knex) {
return (await Users().where("id", "=", userId).first()) ?? null; this.db = db
};
export const getUsersFrom = (db: Knex) => async (): Promise<UserRecord[]> => {
return await db<UserRecord>("users").select();
};
export const saveUserTo =
(db: Knex) =>
async (user: UserRecord): Promise<void> => {
await db<UserRecord>("users").insert(user);
};
export const saveResourceTo =
(db: Knex) =>
async (resource: Resource): Promise<void> => {
await db<Resource>("resources").insert(resource);
};
export const queryResourceFrom =
(db: Knex) =>
async (resourceId: string): Promise<Resource | null> => {
return (
(await db<Resource>("resources").where({ id: resourceId }).first()) ??
null
);
};
export const queryResourceAclFrom =
(db: Knex) =>
async ({ resourceId, userId }: ResourceAcl): Promise<ResourceAcl | null> => {
return (
(await db<ResourceAcl>("resource_acl")
.where({ userId, resourceId })
.first()) ?? null
);
};
export const saveResourceAclTo =
(db: Knex) =>
async (resourceAcl: ResourceAcl): Promise<void> => {
await db<ResourceAcl>("resource_acl").insert(resourceAcl);
};
export const countResources = async (userId: string): Promise<number> => {
const [rawCount] = await ResourceAclRepo().count().where({ userId });
return parseInt(rawCount.count as string);
};
export const queryResources = async ({
userId,
limit,
cursor,
}: {
userId: string;
limit: number;
cursor: string | null;
}) => {
const query = Resources()
.join("resource_acl", "resources.id", "resource_acl.resourceId")
.where({ userId });
if (cursor) {
query.andWhere("createdAt", "<", cursor);
} }
return await query.limit(limit);
};
export const countCommentsIn = async saveResource (resource: Resource): Promise<void> {
(db: Knex) => await this.db<Resource>('resources').insert(resource)
async (resourceId: string): Promise<number> => { }
const [rawCount] = await db<Comment>("comments")
async findResource (resourceId: string): Promise<Resource | null> {
return (
(await this.db<Resource>('resources')
.where({ id: resourceId })
.first()) ?? null
)
}
async saveComment (comment: Comment): Promise<void> {
await this.db<Comment>('comments').insert(comment)
}
async countComments (resourceId: string): Promise<number> {
const [rawCount] = await this.db<Comment>('comments')
.count() .count()
.where({ resourceId }); .where({ resourceId })
return parseInt(rawCount.count as string); return parseInt(rawCount.count as string)
}; }
export const queryCommentsFrom = async queryComments ({
(db: Knex) =>
async ({
resourceId, resourceId,
limit, limit,
cursor, cursor
}: { }: {
resourceId: string; resourceId: string
limit: number; limit: number
cursor: string | null; cursor: string | null
}): Promise<Comment[]> => { }): Promise<Comment[]> {
const query = db<Comment>("comments").where({ resourceId }); const query = this.db<Comment>('comments').where({ resourceId })
if (cursor) { if (cursor) {
query.andWhere("createdAt", "<", cursor); query.andWhere('createdAt', '<', cursor)
} }
return await query.limit(limit); return await query.limit(limit)
}; }
}
export const saveCommentTo = export class MainRepo extends RegionRepo {
(db: Knex) => async findUser (userId: string): Promise<UserRecord | null> {
async (comment: Comment): Promise<void> => {
await db<Comment>("comments").insert(comment);
};
export const getRegionsFrom = (db: Knex) => async (): Promise<Array<Region>> =>
await db<Region>("regions").select();
export const getRegionFrom =
(db: Knex) =>
async (id: string): Promise<Region | null> =>
(await db<Region>("regions").where({ id }).first()) ?? null;
export const getOrganizationRegionsFrom =
(db: Knex) => async (): Promise<Array<OrganizationsRegions>> =>
await db<OrganizationsRegions>("organizations_regions").select();
export const queryOrganizationRegionsFrom =
(db: Knex) =>
async ({
regionId,
organizationId,
}: OrganizationsRegions): Promise<OrganizationsRegions | null> =>
(await db<OrganizationsRegions>("organizations_regions")
.where({ regionId, organizationId })
.first()) ?? null;
export const saveRegionTo = (db: Knex) => async (region: Region) =>
await db<Region>("regions").insert(region);
export const saveOrganizationTo =
(db: Knex) => async (organization: Organization) =>
await db<Organization>("organizations").insert(organization);
export const getOrganizationFrom =
(db: Knex) =>
async (id: string): Promise<Organization | null> => {
return ( return (
(await db<Organization>("organizations").where({ id }).first()) ?? null (await this.db<UserRecord>('users').where('id', '=', userId).first()) ??
); null
}; )
}
export const getOrganizationsFrom = async queryUsers (): Promise<UserRecord[]> {
(db: Knex) => async (): Promise<Organization[]> => return await this.db<UserRecord>('users').select()
await db<Organization>("organizations").select(); }
export const saveOrganizationsRegionsTo = async saveUser (user: UserRecord): Promise<void> {
(db: Knex) => await this.db<UserRecord>('users').insert(user)
async (or: OrganizationsRegions): Promise<void> => }
await db<OrganizationsRegions>("organizations_regions").insert(or);
export const saveOrganizationAclTo = async getUsersResourceAcl ({
(db: Knex) => resourceId,
async (orgAcl: OrganizationAcl): Promise<void> => { userId
await db<OrganizationsRegions>("organization_acl").insert(orgAcl); }: ResourceAcl): Promise<ResourceAcl | null> {
}; return (
(await this.db<ResourceAcl>('resource_acl')
.where({ userId, resourceId })
.first()) ?? null
)
}
export const queryOrganizationAclFrom = async saveResourceAcl (resourceAcl: ResourceAcl): Promise<void> {
(db: Knex) => await this.db<ResourceAcl>('resource_acl').insert(resourceAcl)
async ({ }
async countUsersResources (userId: string): Promise<number> {
const [rawCount] = await this.db<ResourceAcl>('resource_acl')
.count()
.where({ userId })
return parseInt(rawCount.count as string)
}
async findUsersResource ({
resourceId,
userId
}: ResourceAcl): Promise<ResourceAcl | null> {
return (
(await this.db<ResourceAcl>('resource_acl')
.where({ userId, resourceId })
.first()) ?? null
)
}
async queryResources ({
userId, userId,
organizationId, limit,
}: OrganizationAcl): Promise<OrganizationAcl | null> => cursor
(await db<OrganizationAcl>("organization_acl") }: {
.where({ userId, organizationId }) userId: string
.first()) ?? null; limit: number
cursor: string | null
}): Promise<Resource[]> {
let query = this.db<Resource & ResourceAcl>('resources')
.join('resource_acl', 'resources.id', 'resource_acl.resourceId')
.where({ userId })
if (cursor) {
query = query.andWhere('createdAt', '<', cursor)
}
const items = await query.orderBy('createdAt', 'desc').limit(limit)
return items
}
export const saveOrganizationResourceAclTo = async countResourceComments (resourceId: string): Promise<number> {
(db: Knex) => const [rawCount] = await this.db<Comment>('comments')
async (item: OrganizationResourceAcl): Promise<void> => { .count()
await db<OrganizationResourceAcl>("organization_resource_acl").insert(item);
};
export const saveResourceRegionOrganizationTo =
(db: Knex) => async (item: ResourceRegionOrg) => {
await db<ResourceRegionOrg>("resource_region_organization").insert(item);
};
export const queryResourceRegionOrganizationFrom =
(db: Knex) =>
async (resourceId: string): Promise<ResourceRegion | null> =>
(await db<ResourceRegionOrg>("resource_region_organization")
.where({ resourceId }) .where({ resourceId })
.first()) ?? null; return parseInt(rawCount.count as string)
}
async queryComments ({
resourceId,
limit,
cursor
}: {
resourceId: string
limit: number
cursor: string | null
}): Promise<Comment[]> {
let query = this.db<Comment>('comments').where({ resourceId })
if (cursor) {
query = query.andWhere('createdAt', '<', cursor)
}
return await query.orderBy('createdAt', 'desc').limit(limit)
}
async queryRegions (
params:
| {
connectionString?: string | undefined
}
| undefined = undefined
): Promise<Region[]> {
const query = this.db<Region>('regions')
if ((params != null) && params.connectionString) query.where(params)
return await query.select()
}
async findRegion (id: string): Promise<Region | null> {
return (await this.db<Region>('regions').where({ id }).first()) ?? null
}
async queryOrganizationsRegions (): Promise<OrganizationsRegions[]> {
return await this.db<OrganizationsRegions>('organizations_regions').select()
}
async findOrganizationRegion ({
regionId,
organizationId
}: OrganizationsRegions): Promise<OrganizationsRegions | null> {
return (
(await this.db<OrganizationsRegions>('organizations_regions')
.where({ regionId, organizationId })
.first()) ?? null
)
}
async saveRegion (region: Region): Promise<void> {
await this.db<Region>('regions').insert(region)
}
async saveOrganization (organization: Organization) {
await this.db<Organization>('organizations').insert(organization)
}
async findOrganization (id: string): Promise<Organization | null> {
return (
(await this.db<Organization>('organizations').where({ id }).first()) ??
null
)
}
async queryOrganizations (): Promise<Organization[]> {
return await this.db<Organization>('organizations').select()
}
async saveOrganizationRegion (or: OrganizationsRegions): Promise<void> {
return await this.db<OrganizationsRegions>('organizations_regions').insert(
or
)
}
async saveOrganizationAcl (orgAcl: OrganizationAcl): Promise<void> {
await this.db<OrganizationsRegions>('organization_acl').insert(orgAcl)
}
async findOrganizationAcl ({
userId,
organizationId
}: OrganizationAcl): Promise<OrganizationAcl | null> {
return (
(await this.db<OrganizationAcl>('organization_acl')
.where({ userId, organizationId })
.first()) ?? null
)
}
async saveOrganizationResourceAcl (
item: OrganizationResourceAcl
): Promise<void> {
await this.db<OrganizationResourceAcl>('organization_resource_acl').insert(
item
)
}
async findResourceRegion ({
resourceId
}: {
resourceId: string
}): Promise<ResourceRegion | null> {
return (
(await this.db<ResourceRegion>('resource_region')
.where({ resourceId })
.first()) ?? null
)
}
async saveResourceRegion (item: ResourceRegion): Promise<void> {
await this.db<ResourceRegion>('resource_region').insert(item)
}
}
+108 -127
View File
@@ -1,25 +1,7 @@
import { import { RegionRepo, MainRepo } from './repositories'
getOrganizationsFrom, import { getComments } from './services/comments'
getRegionsFrom, import { createResource, getResources } from './services/resources'
queryOrganizationAclFrom, import { GraphQLError } from 'graphql'
queryOrganizationRegionsFrom,
queryResourceAclFrom,
saveOrganizationResourceAclTo,
saveOrganizationAclTo,
saveResourceAclTo,
saveResourceTo,
saveResourceRegionOrganizationTo,
saveCommentTo,
queryResourceFrom,
queryUser,
countCommentsIn,
queryCommentsFrom,
getUsersFrom,
saveUserTo,
} from "./repositories";
import { getComments } from "./services/comments";
import { createResource, getResources } from "./services/resources";
import { GraphQLError } from "graphql";
import { import {
Resource, Resource,
UserRecord, UserRecord,
@@ -29,168 +11,167 @@ import {
OrganizationsRegions, OrganizationsRegions,
OrganizationAcl, OrganizationAcl,
CommentCreateArgs, CommentCreateArgs,
UserCreateArgs, UserCreateArgs
} from "./types"; } from './types'
import { import {
bindRegionToOrganization,
createOrganization, createOrganization,
getDbClient,
getMainDbClient,
getResourceDatabaseConnection,
registerRegion, registerRegion,
} from "./services/databaseManagement"; getMainRepo,
import { authorizeUserOrgRegion } from "./services/authz"; getRegionRepo,
import cryptoRandomString from "crypto-random-string"; getResourceRepo
} from './services/databaseManagement'
import { authorizeUserOrgRegion } from './services/authz'
import cryptoRandomString from 'crypto-random-string'
// Resolvers define how to fetch the types defined in your schema. // Resolvers define how to fetch the types defined in your schema.
// This resolver retrieves books from the "books" array above. // This resolver retrieves books from the "books" array above.
export const resolvers = { export const resolvers = {
Query: { Query: {
async users() { async users () {
return await getUsersFrom(getMainDbClient())(); return await getMainRepo().queryUsers()
}, },
async user(_: unknown, args: { id: string }) { async user (_: unknown, args: { id: string }) {
return await queryUser(args.id); return await getMainRepo().findUser(args.id)
}, },
async resource( async resource (
_: unknown, _: unknown,
args: { id: string; userId: string }, args: { id: string, userId: string }
): Promise<Resource> { ): Promise<Resource> {
const mainDb = getMainDbClient(); const mainRepo = getMainRepo()
const maybeAcl = await queryResourceAclFrom(mainDb)({ const maybeAcl = await mainRepo.getUsersResourceAcl({
userId: args.userId, userId: args.userId,
resourceId: args.id, resourceId: args.id
}); })
if (maybeAcl == null) { if (maybeAcl == null) {
throw new GraphQLError( throw new GraphQLError(
"The user doesn't have access to the given resource", "The user doesn't have access to the given resource",
{ {
extensions: { extensions: {
code: "FORBIDDEN", code: 'FORBIDDEN'
}, }
}, }
); )
} }
const db = await getResourceDatabaseConnection(args.id); const resourceRepo = await getResourceRepo(args.id)
const maybeResource = await queryResourceFrom(db)(args.id); const maybeResource = await resourceRepo.findResource(args.id)
if (maybeResource == null) { if (maybeResource == null) {
throw new GraphQLError("Resource not found", { throw new GraphQLError('Resource not found', {
extensions: { code: "RESOURCE_NOT_FOUND" }, extensions: { code: 'RESOURCE_NOT_FOUND' }
}); })
} }
return maybeResource; return maybeResource
}, },
async organizations() { async organizations () {
return await getOrganizationsFrom(getMainDbClient())(); return await getMainRepo().queryOrganizations()
},
async regions() {
return await getRegionsFrom(getMainDbClient())();
}, },
async regions () {
return await getMainRepo().queryRegions()
}
}, },
User: { User: {
async resources(parent: UserRecord, args: PaginationArgs) { async resources (parent: UserRecord, args: PaginationArgs) {
return await getResources({ userId: parent.id, ...args }); const mainRepo = getMainRepo()
}, return await getResources(
mainRepo.countUsersResources.bind(mainRepo),
mainRepo.queryResources.bind(mainRepo)
)({ userId: parent.id, ...args })
}
}, },
Resource: { Resource: {
async comments( async comments (
parent: Resource, parent: Resource,
{ limit, cursor }: PaginationArgs, { limit, cursor }: PaginationArgs
): Promise<CommentCollection> { ): Promise<CommentCollection> {
const db = await getResourceDatabaseConnection(parent.id); const resourceRepo = await getResourceRepo(parent.id)
return await getComments( return await getComments(
countCommentsIn(db), resourceRepo.countComments.bind(resourceRepo),
queryCommentsFrom(db), resourceRepo.queryComments.bind(resourceRepo)
)({ )({
resourceId: parent.id, resourceId: parent.id,
limit, limit,
cursor, cursor
}); })
}, }
}, },
Mutation: { Mutation: {
async createUser( async createUser (
_: unknown, _: unknown,
{ input: { name } }: { input: UserCreateArgs }, { input: { name } }: { input: UserCreateArgs }
) { ) {
const id = cryptoRandomString({ length: 10 }); const id = cryptoRandomString({ length: 10 })
await saveUserTo(getMainDbClient())({ id, name }); await getMainRepo().saveUser({ id, name })
return id; return id
}, },
async registerRegion( async registerRegion (
_: unknown, _: unknown,
args: { args: {
name: string; name: string
connectionString: string; connectionString: string
maintenanceDb: string; sslCaCert: string | null
}, }
) { ) {
return await registerRegion(args); return await registerRegion(args)
}, },
async createOrganization(_: unknown, args: { name: string }) { async createOrganization (_: unknown, args: { name: string }) {
return await createOrganization(args.name); return await createOrganization(args.name)
}, },
async addRegionToOrganization(_: unknown, args: OrganizationsRegions) { async addRegionToOrganization (_: unknown, args: OrganizationsRegions) {
await bindRegionToOrganization(args); await getMainRepo().saveOrganizationRegion(args)
}, },
async addUserToOrganization( async addUserToOrganization (
_: unknown, _: unknown,
{ input: args }: { input: OrganizationAcl }, { input: args }: { input: OrganizationAcl }
) { ) {
await saveOrganizationAclTo(getMainDbClient())(args); await getMainRepo().saveOrganizationAcl(args)
}, },
async createResource( async createResource (
_: unknown, _: unknown,
{ input: args }: { input: ResourceCreateArgs }, { input: args }: { input: ResourceCreateArgs }
) { ) {
const mainDb = getMainDbClient(); const mainRepo = getMainRepo()
await authorizeUserOrgRegion( await authorizeUserOrgRegion(
queryOrganizationAclFrom(mainDb), mainRepo.findOrganizationAcl.bind(mainRepo),
queryOrganizationRegionsFrom(mainDb), mainRepo.findOrganizationRegion.bind(mainRepo)
)(args); )(args)
const db = const repo = args.regionId
args.regionId && args.organizationId ? await getRegionRepo({ regionId: args.regionId })
? await getDbClient({ : mainRepo
regionId: args.regionId,
organizationId: args.organizationId,
})
: mainDb;
const resourceId = await createResource( const resourceId = await createResource(
saveResourceTo(db), repo.saveResource.bind(repo),
saveResourceAclTo(mainDb), mainRepo.saveResourceAcl.bind(mainRepo)
)(args); )(args)
if (args.organizationId) { if (args.organizationId) {
await saveOrganizationResourceAclTo(mainDb)({ await mainRepo.saveOrganizationResourceAcl({
organizationId: args.organizationId, organizationId: args.organizationId,
resourceId, resourceId
}); })
await saveResourceRegionOrganizationTo(mainDb)({ if (args.regionId) {
resourceId, await mainRepo.saveResourceRegion({
organizationId: args.organizationId, resourceId,
// i know its not null here, the authz function ensures it // i know its not null here, the authz function ensures it
regionId: args.regionId!, regionId: args.regionId
}); })
}
} }
return resourceId; return resourceId
}, },
async addComment( async addComment (
_: unknown, _: unknown,
{ input: args }: { input: CommentCreateArgs }, { input: args }: { input: CommentCreateArgs }
) { ) {
const mainDb = getMainDbClient(); const mainRepo = getMainRepo()
const resourceAcl = await queryResourceAclFrom(mainDb)(args); const resourceAcl = await mainRepo.getUsersResourceAcl(args)
if (!resourceAcl) if (resourceAcl == null) { throw new Error("The user doesn't have access to the given resource") }
throw new Error("The user doesn't have access to the given resource"); // 2. get resource db client
//2. get resource db client const resourceRepo = await getResourceRepo(args.resourceId)
const db = await getResourceDatabaseConnection(args.resourceId); // 3. save comment to db
//3. save comment to db const id = cryptoRandomString({ length: 10 })
const id = cryptoRandomString({ length: 10 }); const createdAt = new Date()
const createdAt = new Date(); await resourceRepo.saveComment({ id, createdAt, ...args })
await saveCommentTo(db)({ id, createdAt, ...args }); return id
return id; }
}, }
}, }
};
+1 -2
View File
@@ -38,7 +38,6 @@ type Organization {
type Region { type Region {
id: String! id: String!
name: String! name: String!
maintenanceDb: String!
} }
type Query { type Query {
@@ -78,7 +77,7 @@ type Mutation {
registerRegion( registerRegion(
name: String! name: String!
connectionString: String! connectionString: String!
maintenanceDb: String! sslCaCert: String
): String! ): String!
createOrganization(name: String!): String! createOrganization(name: String!): String!
addRegionToOrganization(organizationId: String!, regionId: String!): Boolean addRegionToOrganization(organizationId: String!, regionId: String!): Boolean
+12 -15
View File
@@ -1,26 +1,23 @@
import { import {
OrganizationAcl, OrganizationAcl,
OrganizationsRegions, OrganizationsRegions,
UserOrgRegionArgs, UserOrgRegionArgs
} from "../types"; } from '../types'
export const authorizeUserOrgRegion = export const authorizeUserOrgRegion =
( (
orgAclGetter: (params: OrganizationAcl) => Promise<OrganizationAcl | null>, orgAclGetter: (params: OrganizationAcl) => Promise<OrganizationAcl | null>,
orgRegionGetter: ( orgRegionGetter: (
params: OrganizationsRegions, params: OrganizationsRegions,
) => Promise<OrganizationsRegions | null>, ) => Promise<OrganizationsRegions | null>
) => ) =>
async ({ userId, regionId, organizationId }: UserOrgRegionArgs) => { async ({ userId, regionId, organizationId }: UserOrgRegionArgs) => {
if (!organizationId && regionId) if (!organizationId && regionId) { throw new Error("public org doesn't support regions") }
throw new Error("public org doesn't support regions"); if (organizationId) {
if (organizationId) { if (!regionId) throw new Error('organizations can only write to regions')
if (!regionId) throw new Error("organizations can only write to regions"); const orgAcl = await orgAclGetter({ organizationId, userId })
const orgAcl = await orgAclGetter({ organizationId, userId }); if (orgAcl == null) { throw new Error("user doesn't have access to this organization") }
if (!orgAcl) const orgRegion = await orgRegionGetter({ organizationId, regionId })
throw new Error("user doesn't have access to this organization"); if (orgRegion == null) { throw new Error('organization doesnt have access to this region') }
const orgRegion = await orgRegionGetter({ organizationId, regionId }); }
if (!orgRegion)
throw new Error("organization doesnt have access to this region");
} }
};
+15 -15
View File
@@ -1,25 +1,25 @@
import { CommentCollection, PaginationArgs, Comment } from "../types"; import { CommentCollection, PaginationArgs, Comment } from '../types'
interface GetCommentsArgs extends PaginationArgs { interface GetCommentsArgs extends PaginationArgs {
resourceId: string; resourceId: string
} }
export const getComments = export const getComments =
( (
countComments: (resourceId: string) => Promise<number>, countComments: (resourceId: string) => Promise<number>,
queryComments: (params: GetCommentsArgs) => Promise<Comment[]>, queryComments: (params: GetCommentsArgs) => Promise<Comment[]>
) => ) =>
async (params: GetCommentsArgs): Promise<CommentCollection> => { async (params: GetCommentsArgs): Promise<CommentCollection> => {
// yes, i should be doing base64 de and encoding with the cursor... // yes, i should be doing base64 de and encoding with the cursor...
const totalCount = await countComments(params.resourceId); const totalCount = await countComments(params.resourceId)
const items = await queryComments(params); const items = await queryComments(params)
let cursor = null; let cursor = null
if (items.length > 0) { if (items.length > 0) {
cursor = items.slice(-1)[0].createdAt.toISOString(); cursor = items.slice(-1)[0].createdAt.toISOString()
}
return {
totalCount,
items,
cursor
}
} }
return {
totalCount,
items,
cursor,
};
};
+163 -194
View File
@@ -1,243 +1,212 @@
import { POSTGRES_URL } from "../config"; import { POSTGRES_URL } from '../config'
import { import { RegionRepo, MainRepo } from '../repositories'
getOrganizationFrom, import knex, { Knex } from 'knex'
getOrganizationRegionsFrom, import cryptoRandomString from 'crypto-random-string'
getRegionFrom,
queryResourceRegionOrganizationFrom,
saveOrganizationTo,
saveOrganizationsRegionsTo,
saveRegionTo,
} from "../repositories";
import { OrganizationsRegions, Region } from "../types";
import knex, { Knex } from "knex";
import cryptoRandomString from "crypto-random-string";
const migrateToLatest = async (client: Knex): Promise<void> => { const migrateToLatest = async (db: Knex): Promise<void> => {
const plannedMigrations: Array<{ file: string }> = ( const plannedMigrations: Array<{ file: string }> = (
await client.migrate.list() await db.migrate.list()
)[1]; )[1]
if (plannedMigrations.length > 0) { if (plannedMigrations.length > 0) {
console.log( console.log(
`🕰️ planning migrations: ${plannedMigrations `🕰️ planning migrations: ${plannedMigrations
.map((m) => m.file) .map((m) => m.file)
.join(",")}`, .join(',')}`
); )
} else { } else {
console.log("no migrations are planned"); console.log('no migrations are planned')
} }
// TODO: make sure if a migration fails, all migrations are rolled back // TODO: make sure if a migration fails, all migrations are rolled back
await client.migrate.latest(); await db.migrate.latest()
};
export const migrateAll = async (): Promise<void> => {
await migrateToLatest(mainClient);
const databaseSchemas = await getAllDatabaseSchemaConnections();
await Promise.all([
...databaseSchemas.map(async (sc) => await migrateToLatest(sc)),
]);
// 1. get all regions from main DB
// 2. construct region specific knex clients and cache them by
// 3. structure the cache so that it accomodates client creation by resource id
// 4. get all organization regions from main DB
// 5. for in all regions for all organizations, run the migration
// 6. do not forget the migration for the main DB
//
};
const createDatabaseConfig = (connectionString: string): Knex.Config => {
return {
client: "pg",
connection: {
connectionString,
},
// connection: connectionString,
migrations: {
directory: "src/migrations",
extension: "ts",
},
};
};
const mainClient = knex(createDatabaseConfig(POSTGRES_URL));
const _connectionStore: Map<string, Knex> = new Map();
interface RegionWithMaybeOrganization {
regionId: string;
organizationId?: string | undefined;
} }
const _createConnectionKey = ({ export const migrateAll = async (): Promise<void> => {
organizationId, await migrateToLatest(mainRepo.db)
regionId, const repos = await getAllRepositories()
}: RegionWithMaybeOrganization): string => {
return organizationId ? `${organizationId}@${regionId}` : regionId;
};
export const getDbClient = async ({ await Promise.all([
regionId, ...repos.map(async (repo) => await migrateToLatest(repo.db))
organizationId, ])
}: RegionWithMaybeOrganization): Promise<Knex> => { }
const connectionKey = _createConnectionKey({ organizationId, regionId });
const maybeClient = _connectionStore.get(connectionKey);
if (maybeClient) return maybeClient;
const maybeRegion = await mainClient<Region>("regions")
.select()
.where({ id: regionId })
.first();
if (!maybeRegion) throw Error(`region ${regionId} not found`);
const connectionString = organizationId
? `${maybeRegion.connectionString}/${organizationId}`
: `${maybeRegion.connectionString}/${maybeRegion.maintenanceDb}`;
const client = knex(createDatabaseConfig(connectionString));
_connectionStore.set(connectionKey, client);
return client;
};
export const getMainDbClient = (): Knex => mainClient; const createDatabaseConfig = (
connectionString: string,
sslCaCert: string | null
): Knex.Config => {
const config: Knex.Config = {
client: 'pg',
connection: {
connectionString,
ssl: sslCaCert
? {
ca: sslCaCert,
rejectUnauthorized: true
}
: undefined
},
migrations: {
directory: 'src/migrations',
extension: 'ts'
}
}
return config
}
const mainRepo = new MainRepo(knex(createDatabaseConfig(POSTGRES_URL, null)))
const _repoStore: Map<string, RegionRepo> = new Map()
export const getRegionRepo = async ({
regionId
}: {
regionId: string | undefined
}): Promise<RegionRepo> => {
if (!regionId) return mainRepo
const maybeRepo = _repoStore.get(regionId)
if (maybeRepo != null) return maybeRepo
const maybeRegion = await mainRepo.findRegion(regionId)
if (maybeRegion == null) throw Error(`region ${regionId} not found`)
const repo = new RegionRepo(
knex(
createDatabaseConfig(maybeRegion.connectionString, maybeRegion.sslCaCert)
)
)
_repoStore.set(regionId, repo)
return repo
}
export const getMainRepo = (): MainRepo => mainRepo
export const registerRegion = async ({ export const registerRegion = async ({
name, name,
connectionString, connectionString,
maintenanceDb, sslCaCert
}: { }: {
name: string; name: string
connectionString: string; connectionString: string
maintenanceDb: string; sslCaCert: string | null
}): Promise<string> => { }): Promise<string> => {
// TODO: validate the connectionString, so that the knex client can connect to it const regions = await mainRepo.queryRegions({ connectionString })
const id = cryptoRandomString({ length: 10 }); if (regions.length > 0) throw new Error('This region is already registered')
await saveRegionTo(mainClient)({ const id = cryptoRandomString({ length: 10 })
const repo = new RegionRepo(
knex(createDatabaseConfig(connectionString, sslCaCert))
)
await migrateToLatest(repo.db)
_repoStore.set(id, repo)
const sslmode = sslCaCert ? 'require' : 'disable'
await setUpUserReplication({
from: mainRepo.db,
to: repo.db,
regionName: name,
sslmode
})
await setUpResourceReplication({
from: repo.db,
to: mainRepo.db,
regionName: name,
sslmode
})
await mainRepo.saveRegion({
id, id,
name, name,
connectionString, connectionString,
maintenanceDb, sslCaCert
}); })
return id; return id
}; }
export const createOrganization = async (name: string): Promise<string> => { export const createOrganization = async (name: string): Promise<string> => {
const id = cryptoRandomString({ length: 10 }); const id = cryptoRandomString({ length: 10 })
await saveOrganizationTo(mainClient)({ id, name }); await mainRepo.saveOrganization({ id, name })
return id; return id
}; }
const createDb = async (client: Knex, name: string): Promise<void> => { interface ReplicationArgs {
try { from: Knex
await client.raw(`create database "${name}"`); to: Knex
} catch (err) { sslmode: string
if (!(err instanceof Error)) throw err; regionName: string
if (!err.message.includes("already exists")) throw err; }
}
};
const setUpUserReplication = async ({ const setUpUserReplication = async ({
from, from,
to, to,
}: { sslmode,
from: Knex; regionName
to: Knex; }: ReplicationArgs): Promise<void> => {
}): Promise<void> => {
// TODO: ensure its created... // TODO: ensure its created...
const connectionString: string =
from.client.config.connection.connectionString;
try { try {
await from.raw("CREATE PUBLICATION userspub FOR TABLE users;"); await from.raw('CREATE PUBLICATION userspub FOR TABLE users;')
} catch (err) { } catch (err) {
if (!(err instanceof Error)) throw err; if (!(err instanceof Error)) throw err
if (!err.message.includes("already exists")) throw err; if (!err.message.includes('already exists')) throw err
} }
const fromUrl = new URL(from.client.config.connection.connectionString)
const fromDbName = fromUrl.pathname.replace('/', '')
const subName = `userssub_${regionName}`
const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription(
'${subName}',
'dbname=${fromDbName} host=${fromUrl.hostname} port=${fromUrl.port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}',
'userspub',
'${subName}',
TRUE,
TRUE
);`
try { try {
const toUrl = new URL(to.client.config.connection.connectionString); await to.raw(rawSqeel)
await to.raw(
`CREATE SUBSCRIPTION userssub_${toUrl.pathname.replace("/", "")} CONNECTION '${connectionString}' PUBLICATION userspub;`,
);
} catch (err) { } catch (err) {
if (!(err instanceof Error)) throw err; if (!(err instanceof Error)) throw err
if (!err.message.includes("already exists")) throw err; if (!err.message.includes('already exists')) throw err
} }
}; }
const setUpResourceReplication = async ({ const setUpResourceReplication = async ({
from, from,
fromRegionName,
to, to,
}: { regionName,
from: Knex; sslmode
fromRegionName: string; }: ReplicationArgs): Promise<void> => {
to: Knex;
}): Promise<void> => {
// TODO: ensure its created... // TODO: ensure its created...
const connectionString: string =
from.client.config.connection.connectionString;
const connUrl = new URL(connectionString);
try { try {
await from.raw("CREATE PUBLICATION resourcepub FOR TABLE resources;"); await from.raw('CREATE PUBLICATION resourcepub FOR TABLE resources;')
} catch (err) { } catch (err) {
if (!(err instanceof Error)) throw err; if (!(err instanceof Error)) throw err
if (!err.message.includes("already exists")) throw err; if (!err.message.includes('already exists')) throw err
} }
const fromUrl = new URL(from.client.config.connection.connectionString)
const fromDbName = fromUrl.pathname.replace('/', '')
const subName = `resourcesub_${regionName}`
const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription(
'${subName}',
'dbname=${fromDbName} host=${fromUrl.hostname} port=${fromUrl.port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}',
'resourcepub',
'${subName}',
TRUE,
TRUE
);`
try { try {
await to.raw( await to.raw(rawSqeel)
`CREATE SUBSCRIPTION "resroucesub_${fromRegionName.replace(
" ",
"",
)}_${connUrl.pathname.replace(
"/",
"",
)}" CONNECTION '${connectionString}' PUBLICATION resourcepub;`,
);
} catch (err) { } catch (err) {
if (!(err instanceof Error)) throw err; if (!(err instanceof Error)) throw err
if (!err.message.includes("already exists")) throw err; if (!err.message.includes('already exists')) throw err
} }
}; }
export const bindRegionToOrganization = async ({ export const getAllRepositories = async (): Promise<RegionRepo[]> => {
regionId, const regions = await mainRepo.queryRegions({})
organizationId, const regionRepos = await Promise.all(
}: OrganizationsRegions): Promise<void> => { regions.map(async (region) => await getRegionRepo({ regionId: region.id }))
const region = await getRegionFrom(mainClient)(regionId); )
if (!region) throw Error(`region ${regionId} not found`); return [mainRepo, ...regionRepos]
const organization = await getOrganizationFrom(mainClient)(organizationId); }
if (!organization) throw Error(`organization ${organizationId} not found`);
const regionClient = await getDbClient({ regionId }); export const getResourceRepo = async (
resourceId: string
await createDb(regionClient, organizationId); ): Promise<RegionRepo> => {
const resourceRegion = await mainRepo.findResourceRegion({ resourceId })
const client = await getDbClient({ organizationId, regionId }); return (resourceRegion != null) ? await getRegionRepo(resourceRegion) : getMainRepo()
const connectionKey = _createConnectionKey({ organizationId, regionId }); }
await migrateToLatest(client);
await setUpUserReplication({ from: mainClient, to: client });
await setUpResourceReplication({
from: client,
fromRegionName: region.name,
to: mainClient,
});
_connectionStore.set(connectionKey, client);
await saveOrganizationsRegionsTo(mainClient)({ organizationId, regionId });
};
export const getAllDatabaseSchemaConnections = async (): Promise<Knex[]> => {
const organizationRegions = await getOrganizationRegionsFrom(mainClient)();
const clients = await Promise.all(
organizationRegions.map(async (or) => {
const client = await getDbClient(or);
return client;
}),
);
return [mainClient, ...clients];
};
export const getResourceDatabaseConnection = async (
resourceId: string,
): Promise<Knex> => {
const resourceRegionOrg =
await queryResourceRegionOrganizationFrom(mainClient)(resourceId);
return resourceRegionOrg ? await getDbClient(resourceRegionOrg) : mainClient;
};
+34 -32
View File
@@ -1,45 +1,47 @@
import cryptoRandomString from "crypto-random-string"; import cryptoRandomString from 'crypto-random-string'
import { countResources, queryResources } from "../repositories";
import { import {
Resource, Resource,
PaginationArgs, PaginationArgs,
ResourceCollection, ResourceCollection,
ResourceCreateArgs, ResourceCreateArgs,
ResourceAcl, ResourceAcl
} from "../types"; } from '../types'
interface GetResourcesArgs extends PaginationArgs { interface GetResourcesArgs extends PaginationArgs {
userId: string; userId: string
} }
export const getResources = async ( export const getResources =
params: GetResourcesArgs, (
): Promise<ResourceCollection> => { countResources: (userId: string) => Promise<number>,
const totalCount = await countResources(params.userId); queryResources: (params: GetResourcesArgs) => Promise<Resource[]>
const items = await queryResources(params); ) =>
let cursor = null; async (params: GetResourcesArgs): Promise<ResourceCollection> => {
if (items.length > 0) { const totalCount = await countResources(params.userId)
cursor = items.slice(-1)[0].createdAt.toISOString(); const items = await queryResources(params)
} let cursor = null
return { if (items.length > 0) {
totalCount, cursor = items.slice(-1)[0].createdAt.toISOString()
items, }
cursor, return {
}; totalCount,
}; items,
cursor
}
}
export const createResource = export const createResource =
( (
resourceSaver: (resource: Resource) => Promise<void>, resourceSaver: (resource: Resource) => Promise<void>,
resourceAclSaver: (resourceAcl: ResourceAcl) => Promise<void>, resourceAclSaver: (resourceAcl: ResourceAcl) => Promise<void>
) => ) =>
async ({ userId, name }: ResourceCreateArgs): Promise<string> => { async ({ userId, name }: ResourceCreateArgs): Promise<string> => {
//1. if no org, create project in main region, validate that, regionId is null // 1. if no org, create project in main region, validate that, regionId is null
//2. if org, validate if user has access to the org // 2. if org, validate if user has access to the org
//3. if org and region, validate if org has access to region // 3. if org and region, validate if org has access to region
//4. create resource // 4. create resource
const id = cryptoRandomString({ length: 10 }); const id = cryptoRandomString({ length: 10 })
const resource = { id, name, createdAt: new Date() }; const resource = { id, name, createdAt: new Date() }
await resourceSaver(resource); await resourceSaver(resource)
await resourceAclSaver({ resourceId: id, userId }); await resourceAclSaver({ resourceId: id, userId })
return id; return id
}; }
+33
View File
@@ -0,0 +1,33 @@
import { knex } from "../../db";
import { Knex } from "knex";
type Thing = {
id: string;
name: string;
};
// talk to the DB
const repo =
(db: Knex<Thing>) =>
async (id: string): Promise<Thing | null> => {
return (await db.where({ id }).first()) ?? null;
};
// business / domain logic
const service =
(thingGetter: (id: string) => Promise<Thing | null>) =>
async (id: string): Promise<Thing | null> => {
return thingGetter(id);
};
const getThingClient = (id: string | undefined): Knex => {
if (!id) return knex;
return knex;
};
// graphql entry
export const resolver = async (args: { id: string }): Promise<Thing> => {
const thing = await service(repo(getThingClient(args.id)))(args.id);
if (!thing) throw new Error("not found");
return thing;
};
+33
View File
@@ -0,0 +1,33 @@
import { Knex } from "knex";
import { knex } from "../../db";
type Thing = {
id: string;
name: string;
};
type ServedThing = {
foo: number;
} & Thing;
// talk to the DB
const repo =
({ db }: { db: Knex<Thing> }) =>
async (id: string): Promise<Thing | null> => {
return (await db().where({ id }).first()) ?? null;
};
// business / domain logic
const service =
({ thingGetter }: { thingGetter: (id: string) => Promise<Thing | null> }) =>
async (id: string): Promise<ServedThing | null> => {
const thing = await thingGetter(id);
const foo = 123;
return thing ? { ...thing, foo } : null;
};
// graphql entry
export const resolver = async (id: string): Promise<ServedThing> => {
const thing = await service({ thingGetter: repo({ db: knex }) })(id);
if (!thing) throw new Error("not found");
return thing;
};
+8
View File
@@ -0,0 +1,8 @@
export type Thing = {
id: string;
};
export type ThingRepo = {
findThing: (id: string) => Promise<Thing | null>;
queryThing: () => Promise<Thing[]>;
};
+12
View File
@@ -0,0 +1,12 @@
import { Knex } from "knex";
import { Thing } from "./domain";
const findThing =
({ db }: { db: Knex }) =>
async (id: string): Promise<Thing | null> => {
return null;
};
export const thingRepo = ({ db }: { db: Knex }) => ({
findThing: findThing({ db }),
});
+11
View File
@@ -0,0 +1,11 @@
import { ServedThing, service } from "./services/service";
import { thingRepo } from "./repo";
import { knex } from "../../db";
export const resolver = async (id: string): Promise<ServedThing> => {
const thing = await service({
thingRepo: thingRepo({ db: knex }),
})(id);
if (!thing) throw new Error("not found");
return thing;
};
+19
View File
@@ -0,0 +1,19 @@
import { Thing, type ThingRepo } from "../domain";
export type ServedThing = {
foo: number;
} & Thing;
export const service =
({ thingRepo }: { thingRepo: Pick<ThingRepo, "findThing"> }) =>
async (id: string): Promise<ServedThing | null> => {
const thing = await thingRepo.findThing(id);
const foo = 123;
return thing ? { ...thing, foo } : null;
};
export const service2 = ({
thingRepo,
}: {
thingRepo: Pick<ThingRepo, "queryThing">;
}) => {};
View File
+24
View File
@@ -0,0 +1,24 @@
import { knex } from "../../db";
type Thing = {
id: string;
name: string;
};
const Things = () => knex<Thing>("things");
// talk to the DB
const repo = async (id: string): Promise<Thing | null> => {
return (await Things().where({ id }).first()) ?? null;
};
// business / domain logic
const service = async (id: string): Promise<Thing | null> => {
return repo(id);
};
// graphql entry
export const resolver = async (id: string): Promise<Thing> => {
const thing = await service(id);
if (!thing) throw new Error("not found");
return thing;
};
View File
+42 -41
View File
@@ -1,98 +1,99 @@
export interface CommentCreateArgs { export interface CommentCreateArgs {
userId: string; userId: string
content: string; content: string
resourceId: string; resourceId: string
} }
export interface Comment extends CommentCreateArgs { export interface Comment extends CommentCreateArgs {
id: string; id: string
createdAt: Date; createdAt: Date
} }
export interface PaginationArgs { export interface PaginationArgs {
limit: number; limit: number
cursor: string | null; cursor: string | null
} }
interface Collection<T> { interface Collection<T> {
totalCount: number; totalCount: number
cursor: string | null; cursor: string | null
items: T[]; items: T[]
} }
export interface CommentCollection extends Collection<Comment> {} export interface CommentCollection extends Collection<Comment> {}
export interface UserOrgRegionArgs { export interface UserOrgRegionArgs {
userId: string; userId: string
organizationId: string | null; organizationId: string | null
regionId: string | null; regionId: string | null
} }
export interface ResourceCreateArgs extends UserOrgRegionArgs { export interface ResourceCreateArgs extends UserOrgRegionArgs {
name: string; name: string
} }
export interface Resource { export interface Resource {
id: string; id: string
name: string; name: string
createdAt: Date; createdAt: Date
} }
export interface ResourceCollection extends Collection<Resource> {} export interface ResourceCollection extends Collection<Resource> {}
export interface UserCreateArgs { export interface UserCreateArgs {
name: string; name: string
} }
export interface UserRecord extends UserCreateArgs { export interface UserRecord extends UserCreateArgs {
id: string; id: string
} }
export interface User extends UserRecord { export interface User extends UserRecord {
resources: { resources: {
cursor: string | null; cursor: string | null
totalCount: number; totalCount: number
items: Resource[]; items: Resource[]
}; }
} }
export interface ResourceAcl { export interface ResourceAcl {
userId: string; userId: string
resourceId: string; resourceId: string
} }
export interface Region { export interface Region {
id: string; id: string
name: string; name: string
connectionString: string; connectionString: string
maintenanceDb: string; sslCaCert: string | null
} }
export interface Organization { export interface Organization {
id: string; id: string
name: string; name: string
} }
export interface OrganizationAcl { export interface OrganizationAcl {
userId: string; userId: string
organizationId: string; organizationId: string
} }
export interface OrganizationsRegions { export interface OrganizationsRegions {
organizationId: string; organizationId: string
regionId: string; regionId: string
} }
export interface OrganizationResourceAcl { export interface OrganizationResourceAcl {
organizationId: string; organizationId: string
resourceId: string; resourceId: string
} }
export interface ResourceRegion { export interface ResourceRegion {
resourceId: string; resourceId: string
regionId: string; regionId: string
} }
export interface ResourceRegionOrg extends ResourceRegion { export interface ResourceOrganization {
organizationId: string; resourceId: string
organizationId: string
} }
+22 -22
View File
@@ -1,29 +1,29 @@
import { expect, beforeAll, describe, it } from "vitest"; import { expect, beforeAll, describe, it } from 'vitest'
import { import {
getOrganizationRegionsFrom, getOrganizationRegionsFrom,
getRegionsFrom, getRegionsFrom
} from "../../src/repositories"; } from '../../src/repositories'
import { import {
getMainDbClient, getMainDbClient,
migrateAll, migrateAll
} from "../../src/services/databaseManagement"; } from '../../src/services/databaseManagement'
import { Knex } from "knex"; import { Knex } from 'knex'
describe("regions", () => { describe('regions', () => {
let dbClient: Knex; let dbClient: Knex
beforeAll(async () => { beforeAll(async () => {
dbClient = await getMainDbClient(); dbClient = await getMainDbClient()
}); })
it("gets all regions", async () => { it('gets all regions', async () => {
const regions = await getRegionsFrom(dbClient)(); const regions = await getRegionsFrom(dbClient)()
expect(regions.length).toBeGreaterThan(0); expect(regions.length).toBeGreaterThan(0)
}); })
it("gets organizations regions", async () => { it('gets organizations regions', async () => {
const orgRegions = await getOrganizationRegionsFrom(dbClient)(); const orgRegions = await getOrganizationRegionsFrom(dbClient)()
expect(orgRegions.length).toBeGreaterThan(0); expect(orgRegions.length).toBeGreaterThan(0)
}); })
it("migrates all", async () => { it('migrates all', async () => {
await migrateAll(); await migrateAll()
}); })
}); })
+4 -4
View File
@@ -1,7 +1,7 @@
import { defineConfig } from "vitest/config"; import { defineConfig } from 'vitest/config'
export default defineConfig({ export default defineConfig({
test: { test: {
dir: "tests", dir: 'tests'
}, }
}); })