diff --git a/.gitignore b/.gitignore index fc147f2..0435c69 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,6 @@ .envrc .swc node_modules +.ca-cert* dist diff --git a/knexfile.ts b/knexfile.ts index eb752cd..43e2073 100644 --- a/knexfile.ts +++ b/knexfile.ts @@ -1,8 +1,26 @@ -export default { - client: 'pg', - connection: process.env.POSTGRES_URL, +import { Knex } from "knex"; +import fs from "fs"; +import path from "path"; + +console.log(`foobar ${process.env.POSTGRES_CA_CERT_PATH}`); + +const config: Knex.Config = { + client: "pg", + connection: { + connectionString: process.env.POSTGRES_URL, + ssl: process.env.POSTGRES_CA_CERT_PATH + ? { + ca: fs.readFileSync( + path.resolve(__dirname, process.env.POSTGRES_CA_CERT_PATH), + ), + rejectUnauthorized: true, + } + : undefined, + }, migrations: { - directory: 'src/migrations', - extension: 'ts' - } -} + directory: "src/migrations", + extension: "ts", + }, +}; + +export default config; diff --git a/src/config.ts b/src/config.ts index 13aa009..f448228 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,9 +1,8 @@ -import 'dotenv/config' -import { parseEnv } from 'znv' -import { z } from 'zod' +import "dotenv/config"; +import { parseEnv } from "znv"; +import { z } from "zod"; -export const { POSTGRES_URL } = parseEnv(process.env, { - POSTGRES_URL: z.string().min(1) -}) - -console.log([POSTGRES_URL].join(', ')) +export const { POSTGRES_URL, POSTGRES_CA_CERT_PATH } = parseEnv(process.env, { + POSTGRES_URL: z.string().min(1), + POSTGRES_CA_CERT_PATH: z.string().min(1).nullish(), +}); diff --git a/src/migrations/20240213175105_region_pruning.ts b/src/migrations/20240213175105_region_pruning.ts new file mode 100644 index 0000000..3703dd1 --- /dev/null +++ b/src/migrations/20240213175105_region_pruning.ts @@ -0,0 +1,15 @@ +import type { Knex } from "knex"; + +const regionsTableName = "regions"; + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable(regionsTableName, (table) => { + table.dropColumn("maintenanceDb"); + }); +} + +export async function down(knex: Knex): Promise { + await knex.schema.alterTable(regionsTableName, (table) => { + table.text("maintenanceDb").notNullable().defaultTo("region"); + }); +} diff --git a/src/migrations/20240213214212_split_resource_region_org.ts b/src/migrations/20240213214212_split_resource_region_org.ts new file mode 100644 index 0000000..c64e553 --- /dev/null +++ b/src/migrations/20240213214212_split_resource_region_org.ts @@ -0,0 +1,56 @@ +import type { Knex } from "knex"; + +const tableName = "resource_region_organization"; + +export async function up(knex: Knex): Promise { + await knex.schema.dropTable(tableName); + await knex.schema.createTable("resource_region", (table) => { + table + .string("resourceId") + .references("id") + .inTable("resources") + .onDelete("cascade") + .primary(); + table + .string("regionId") + .references("id") + .inTable("regions") + .onDelete("cascade"); + }); + await knex.schema.createTable("resource_organization", (table) => { + table + .string("resourceId") + .references("id") + .inTable("resources") + .onDelete("cascade") + .primary(); + table + .string("organizationId") + .references("id") + .inTable("organizations") + .onDelete("cascade"); + }); +} + +export async function down(knex: Knex): Promise { + await knex.schema.dropTable("resource_organization"); + await knex.schema.dropTable("resource_region"); + await knex.schema.createTable(tableName, (table) => { + table + .string("resourceId") + .references("id") + .inTable("resources") + .onDelete("cascade") + .primary(); + table + .string("regionId") + .references("id") + .inTable("regions") + .onDelete("cascade"); + table + .string("organizationId") + .references("id") + .inTable("organizations") + .onDelete("cascade"); + }); +} diff --git a/src/migrations/20240214151340_region_ssl_cert.ts b/src/migrations/20240214151340_region_ssl_cert.ts new file mode 100644 index 0000000..c502d0c --- /dev/null +++ b/src/migrations/20240214151340_region_ssl_cert.ts @@ -0,0 +1,13 @@ +import type { Knex } from "knex"; + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable("regions", (table) => { + table.text("sslCaCert").nullable(); + }); +} + +export async function down(knex: Knex): Promise { + await knex.schema.alterTable("regions", (table) => { + table.dropColumn("sslCaCert"); + }); +} diff --git a/src/migrations/20240220152312_region_name_unique.ts b/src/migrations/20240220152312_region_name_unique.ts new file mode 100644 index 0000000..c9af1d9 --- /dev/null +++ b/src/migrations/20240220152312_region_name_unique.ts @@ -0,0 +1,13 @@ +import type { Knex } from "knex"; + +export async function up(knex: Knex): Promise { + await knex.schema.alterTable("regions", (table) => { + table.unique("name"); + }); +} + +export async function down(knex: Knex): Promise { + await knex.schema.alterTable("regions", (table) => { + table.dropUnique(["name"]); + }); +} diff --git a/src/repositories.ts b/src/repositories.ts index 8f9136d..15a52ef 100644 --- a/src/repositories.ts +++ b/src/repositories.ts @@ -1,5 +1,4 @@ -import { Knex } from "knex"; -import { knex } from "./db"; +import { Knex } from 'knex' import { UserRecord, Resource, @@ -11,190 +10,235 @@ import { OrganizationAcl, OrganizationResourceAcl, ResourceRegion, - ResourceRegionOrg, -} from "./types"; +} from './types' -const Users = () => knex("users"); -const Resources = () => knex("resources"); -const ResourceAclRepo = () => knex("resource_acl"); +export class RegionRepo { + db: Knex -export const queryUser = async (userId: string): Promise => { - return (await Users().where("id", "=", userId).first()) ?? null; -}; - -export const getUsersFrom = (db: Knex) => async (): Promise => { - return await db("users").select(); -}; - -export const saveUserTo = - (db: Knex) => - async (user: UserRecord): Promise => { - await db("users").insert(user); - }; - -export const saveResourceTo = - (db: Knex) => - async (resource: Resource): Promise => { - await db("resources").insert(resource); - }; - -export const queryResourceFrom = - (db: Knex) => - async (resourceId: string): Promise => { - return ( - (await db("resources").where({ id: resourceId }).first()) ?? - null - ); - }; - -export const queryResourceAclFrom = - (db: Knex) => - async ({ resourceId, userId }: ResourceAcl): Promise => { - return ( - (await db("resource_acl") - .where({ userId, resourceId }) - .first()) ?? null - ); - }; - -export const saveResourceAclTo = - (db: Knex) => - async (resourceAcl: ResourceAcl): Promise => { - await db("resource_acl").insert(resourceAcl); - }; - -export const countResources = async (userId: string): Promise => { - const [rawCount] = await ResourceAclRepo().count().where({ userId }); - return parseInt(rawCount.count as string); -}; - -export const queryResources = async ({ - userId, - limit, - cursor, -}: { - userId: string; - limit: number; - cursor: string | null; -}) => { - const query = Resources() - .join("resource_acl", "resources.id", "resource_acl.resourceId") - .where({ userId }); - if (cursor) { - query.andWhere("createdAt", "<", cursor); + constructor(db: Knex) { + this.db = db } - return await query.limit(limit); -}; -export const countCommentsIn = - (db: Knex) => - async (resourceId: string): Promise => { - const [rawCount] = await db("comments") + async saveResource(resource: Resource): Promise { + await this.db('resources').insert(resource) + } + + async findResource(resourceId: string): Promise { + return ( + (await this.db('resources') + .where({ id: resourceId }) + .first()) ?? null + ) + } + + async saveComment(comment: Comment): Promise { + await this.db('comments').insert(comment) + } + + async countComments(resourceId: string): Promise { + const [rawCount] = await this.db('comments') .count() - .where({ resourceId }); - return parseInt(rawCount.count as string); - }; + .where({ resourceId }) + return parseInt(rawCount.count as string) + } -export const queryCommentsFrom = - (db: Knex) => - async ({ + async queryComments({ resourceId, limit, cursor, }: { - resourceId: string; - limit: number; - cursor: string | null; - }): Promise => { - const query = db("comments").where({ resourceId }); + resourceId: string + limit: number + cursor: string | null + }): Promise { + const query = this.db('comments').where({ resourceId }) if (cursor) { - query.andWhere("createdAt", "<", cursor); + query.andWhere('createdAt', '<', cursor) } - return await query.limit(limit); - }; + return await query.limit(limit) + } +} -export const saveCommentTo = - (db: Knex) => - async (comment: Comment): Promise => { - await db("comments").insert(comment); - }; +export class MainRepo extends RegionRepo { + async findUser(userId: string): Promise { + return ( + (await this.db('users').where('id', '=', userId).first()) ?? + null + ) + } -export const getRegionsFrom = (db: Knex) => async (): Promise> => - await db("regions").select(); + async queryUsers(): Promise { + return await this.db('users').select() + } -export const getRegionFrom = - (db: Knex) => - async (id: string): Promise => - (await db("regions").where({ id }).first()) ?? null; + async saveUser(user: UserRecord): Promise { + await this.db('users').insert(user) + } -export const getOrganizationRegionsFrom = - (db: Knex) => async (): Promise> => - await db("organizations_regions").select(); + async getUsersResourceAcl({ + resourceId, + userId, + }: ResourceAcl): Promise { + return ( + (await this.db('resource_acl') + .where({ userId, resourceId }) + .first()) ?? null + ) + } -export const queryOrganizationRegionsFrom = - (db: Knex) => - async ({ + async saveResourceAcl(resourceAcl: ResourceAcl): Promise { + await this.db('resource_acl').insert(resourceAcl) + } + + async countUsersResources(userId: string): Promise { + const [rawCount] = await this.db('resource_acl') + .count() + .where({ userId }) + return parseInt(rawCount.count as string) + } + + async findUsersResource({ + resourceId, + userId, + }: ResourceAcl): Promise { + return ( + (await this.db('resource_acl') + .where({ userId, resourceId }) + .first()) ?? null + ) + } + + async queryResources({ + userId, + limit, + cursor, + }: { + userId: string + limit: number + cursor: string | null + }): Promise { + let query = this.db('resources') + .join('resource_acl', 'resources.id', 'resource_acl.resourceId') + .where({ userId }) + if (cursor) { + query = query.andWhere('createdAt', '<', cursor) + } + const items = await query.orderBy('createdAt', 'desc').limit(limit) + return items + } + + async countResourceComments(resourceId: string): Promise { + const [rawCount] = await this.db('comments') + .count() + .where({ resourceId }) + return parseInt(rawCount.count as string) + } + + async queryComments({ + resourceId, + limit, + cursor, + }: { + resourceId: string + limit: number + cursor: string | null + }): Promise { + let query = this.db('comments').where({ resourceId }) + if (cursor) { + query = query.andWhere('createdAt', '<', cursor) + } + return await query.orderBy('createdAt', 'desc').limit(limit) + } + + async queryRegions( + params: + | { + connectionString?: string | undefined + } + | undefined = undefined, + ): Promise> { + const query = this.db('regions') + if (params && params.connectionString) query.where(params) + return await query.select() + } + + async findRegion(id: string): Promise { + return (await this.db('regions').where({ id }).first()) ?? null + } + + async queryOrganizationsRegions(): Promise> { + return await this.db('organizations_regions').select() + } + async findOrganizationRegion({ regionId, organizationId, - }: OrganizationsRegions): Promise => - (await db("organizations_regions") - .where({ regionId, organizationId }) - .first()) ?? null; - -export const saveRegionTo = (db: Knex) => async (region: Region) => - await db("regions").insert(region); - -export const saveOrganizationTo = - (db: Knex) => async (organization: Organization) => - await db("organizations").insert(organization); - -export const getOrganizationFrom = - (db: Knex) => - async (id: string): Promise => { + }: OrganizationsRegions): Promise { return ( - (await db("organizations").where({ id }).first()) ?? null - ); - }; + (await this.db('organizations_regions') + .where({ regionId, organizationId }) + .first()) ?? null + ) + } -export const getOrganizationsFrom = - (db: Knex) => async (): Promise => - await db("organizations").select(); + async saveRegion(region: Region): Promise { + await this.db('regions').insert(region) + } + async saveOrganization(organization: Organization) { + await this.db('organizations').insert(organization) + } + async findOrganization(id: string): Promise { + return ( + (await this.db('organizations').where({ id }).first()) ?? + null + ) + } -export const saveOrganizationsRegionsTo = - (db: Knex) => - async (or: OrganizationsRegions): Promise => - await db("organizations_regions").insert(or); + async queryOrganizations(): Promise { + return await this.db('organizations').select() + } -export const saveOrganizationAclTo = - (db: Knex) => - async (orgAcl: OrganizationAcl): Promise => { - await db("organization_acl").insert(orgAcl); - }; + async saveOrganizationRegion(or: OrganizationsRegions): Promise { + return await this.db('organizations_regions').insert( + or, + ) + } -export const queryOrganizationAclFrom = - (db: Knex) => - async ({ + async saveOrganizationAcl(orgAcl: OrganizationAcl): Promise { + await this.db('organization_acl').insert(orgAcl) + } + + async findOrganizationAcl({ userId, organizationId, - }: OrganizationAcl): Promise => - (await db("organization_acl") - .where({ userId, organizationId }) - .first()) ?? null; + }: OrganizationAcl): Promise { + return ( + (await this.db('organization_acl') + .where({ userId, organizationId }) + .first()) ?? null + ) + } -export const saveOrganizationResourceAclTo = - (db: Knex) => - async (item: OrganizationResourceAcl): Promise => { - await db("organization_resource_acl").insert(item); - }; + async saveOrganizationResourceAcl( + item: OrganizationResourceAcl, + ): Promise { + await this.db('organization_resource_acl').insert( + item, + ) + } -export const saveResourceRegionOrganizationTo = - (db: Knex) => async (item: ResourceRegionOrg) => { - await db("resource_region_organization").insert(item); - }; + async findResourceRegion({ + resourceId, + }: { + resourceId: string + }): Promise { + return ( + (await this.db('resource_region') + .where({ resourceId }) + .first()) ?? null + ) + } -export const queryResourceRegionOrganizationFrom = - (db: Knex) => - async (resourceId: string): Promise => - (await db("resource_region_organization") - .where({ resourceId }) - .first()) ?? null; + async saveResourceRegion(item: ResourceRegion): Promise { + await this.db('resource_region').insert(item) + } +} diff --git a/src/resolvers.ts b/src/resolvers.ts index b509b67..2fe6fed 100644 --- a/src/resolvers.ts +++ b/src/resolvers.ts @@ -1,22 +1,4 @@ -import { - getOrganizationsFrom, - getRegionsFrom, - queryOrganizationAclFrom, - queryOrganizationRegionsFrom, - queryResourceAclFrom, - saveOrganizationResourceAclTo, - saveOrganizationAclTo, - saveResourceAclTo, - saveResourceTo, - saveResourceRegionOrganizationTo, - saveCommentTo, - queryResourceFrom, - queryUser, - countCommentsIn, - queryCommentsFrom, - getUsersFrom, - saveUserTo, -} from "./repositories"; +import { RegionRepo, MainRepo } from "./repositories"; import { getComments } from "./services/comments"; import { createResource, getResources } from "./services/resources"; import { GraphQLError } from "graphql"; @@ -32,12 +14,11 @@ import { UserCreateArgs, } from "./types"; import { - bindRegionToOrganization, createOrganization, - getDbClient, - getMainDbClient, - getResourceDatabaseConnection, registerRegion, + getMainRepo, + getRegionRepo, + getResourceRepo, } from "./services/databaseManagement"; import { authorizeUserOrgRegion } from "./services/authz"; import cryptoRandomString from "crypto-random-string"; @@ -47,17 +28,17 @@ import cryptoRandomString from "crypto-random-string"; export const resolvers = { Query: { async users() { - return await getUsersFrom(getMainDbClient())(); + return await getMainRepo().queryUsers(); }, async user(_: unknown, args: { id: string }) { - return await queryUser(args.id); + return await getMainRepo().findUser(args.id); }, async resource( _: unknown, args: { id: string; userId: string }, ): Promise { - const mainDb = getMainDbClient(); - const maybeAcl = await queryResourceAclFrom(mainDb)({ + const mainRepo = getMainRepo(); + const maybeAcl = await mainRepo.getUsersResourceAcl({ userId: args.userId, resourceId: args.id, }); @@ -71,8 +52,8 @@ export const resolvers = { }, ); } - const db = await getResourceDatabaseConnection(args.id); - const maybeResource = await queryResourceFrom(db)(args.id); + const resourceRepo = await getResourceRepo(args.id); + const maybeResource = await resourceRepo.findResource(args.id); if (maybeResource == null) { throw new GraphQLError("Resource not found", { extensions: { code: "RESOURCE_NOT_FOUND" }, @@ -81,15 +62,19 @@ export const resolvers = { return maybeResource; }, async organizations() { - return await getOrganizationsFrom(getMainDbClient())(); + return await getMainRepo().queryOrganizations(); }, async regions() { - return await getRegionsFrom(getMainDbClient())(); + return await getMainRepo().queryRegions(); }, }, User: { async resources(parent: UserRecord, args: PaginationArgs) { - return await getResources({ userId: parent.id, ...args }); + const mainRepo = getMainRepo(); + return await getResources( + mainRepo.countUsersResources.bind(mainRepo), + mainRepo.queryResources.bind(mainRepo), + )({ userId: parent.id, ...args }); }, }, Resource: { @@ -97,10 +82,10 @@ export const resolvers = { parent: Resource, { limit, cursor }: PaginationArgs, ): Promise { - const db = await getResourceDatabaseConnection(parent.id); + const resourceRepo = await getResourceRepo(parent.id); return await getComments( - countCommentsIn(db), - queryCommentsFrom(db), + resourceRepo.countComments.bind(resourceRepo), + resourceRepo.queryComments.bind(resourceRepo), )({ resourceId: parent.id, limit, @@ -114,7 +99,7 @@ export const resolvers = { { input: { name } }: { input: UserCreateArgs }, ) { const id = cryptoRandomString({ length: 10 }); - await saveUserTo(getMainDbClient())({ id, name }); + await getMainRepo().saveUser({ id, name }); return id; }, async registerRegion( @@ -122,7 +107,7 @@ export const resolvers = { args: { name: string; connectionString: string; - maintenanceDb: string; + sslCaCert: string | null; }, ) { return await registerRegion(args); @@ -131,48 +116,44 @@ export const resolvers = { return await createOrganization(args.name); }, async addRegionToOrganization(_: unknown, args: OrganizationsRegions) { - await bindRegionToOrganization(args); + await getMainRepo().saveOrganizationRegion(args); }, async addUserToOrganization( _: unknown, { input: args }: { input: OrganizationAcl }, ) { - await saveOrganizationAclTo(getMainDbClient())(args); + await getMainRepo().saveOrganizationAcl(args); }, async createResource( _: unknown, { input: args }: { input: ResourceCreateArgs }, ) { - const mainDb = getMainDbClient(); + const mainRepo = getMainRepo(); await authorizeUserOrgRegion( - queryOrganizationAclFrom(mainDb), - queryOrganizationRegionsFrom(mainDb), + mainRepo.findOrganizationAcl.bind(mainRepo), + mainRepo.findOrganizationRegion.bind(mainRepo), )(args); - const db = - args.regionId && args.organizationId - ? await getDbClient({ - regionId: args.regionId, - organizationId: args.organizationId, - }) - : mainDb; + const repo = args.regionId + ? await getRegionRepo({ regionId: args.regionId }) + : mainRepo; const resourceId = await createResource( - saveResourceTo(db), - saveResourceAclTo(mainDb), + repo.saveResource.bind(repo), + mainRepo.saveResourceAcl.bind(mainRepo), )(args); if (args.organizationId) { - await saveOrganizationResourceAclTo(mainDb)({ + await mainRepo.saveOrganizationResourceAcl({ organizationId: args.organizationId, resourceId, }); - await saveResourceRegionOrganizationTo(mainDb)({ - resourceId, - organizationId: args.organizationId, - // i know its not null here, the authz function ensures it - regionId: args.regionId!, - }); + if (args.regionId) + await mainRepo.saveResourceRegion({ + resourceId, + // i know its not null here, the authz function ensures it + regionId: args.regionId, + }); } return resourceId; }, @@ -180,16 +161,16 @@ export const resolvers = { _: unknown, { input: args }: { input: CommentCreateArgs }, ) { - const mainDb = getMainDbClient(); - const resourceAcl = await queryResourceAclFrom(mainDb)(args); + const mainRepo = getMainRepo(); + const resourceAcl = await mainRepo.getUsersResourceAcl(args); if (!resourceAcl) throw new Error("The user doesn't have access to the given resource"); //2. get resource db client - const db = await getResourceDatabaseConnection(args.resourceId); + const resourceRepo = await getResourceRepo(args.resourceId); //3. save comment to db const id = cryptoRandomString({ length: 10 }); const createdAt = new Date(); - await saveCommentTo(db)({ id, createdAt, ...args }); + await resourceRepo.saveComment({ id, createdAt, ...args }); return id; }, }, diff --git a/src/schema.graphql b/src/schema.graphql index 86933bd..b1c6430 100644 --- a/src/schema.graphql +++ b/src/schema.graphql @@ -38,7 +38,6 @@ type Organization { type Region { id: String! name: String! - maintenanceDb: String! } type Query { @@ -78,7 +77,7 @@ type Mutation { registerRegion( name: String! connectionString: String! - maintenanceDb: String! + sslCaCert: String ): String! createOrganization(name: String!): String! addRegionToOrganization(organizationId: String!, regionId: String!): Boolean diff --git a/src/services/databaseManagement.ts b/src/services/databaseManagement.ts index 598ac62..691dcde 100644 --- a/src/services/databaseManagement.ts +++ b/src/services/databaseManagement.ts @@ -1,20 +1,11 @@ import { POSTGRES_URL } from "../config"; -import { - getOrganizationFrom, - getOrganizationRegionsFrom, - getRegionFrom, - queryResourceRegionOrganizationFrom, - saveOrganizationTo, - saveOrganizationsRegionsTo, - saveRegionTo, -} from "../repositories"; -import { OrganizationsRegions, Region } from "../types"; +import { RegionRepo, MainRepo } from "../repositories"; import knex, { Knex } from "knex"; import cryptoRandomString from "crypto-random-string"; -const migrateToLatest = async (client: Knex): Promise => { +const migrateToLatest = async (db: Knex): Promise => { const plannedMigrations: Array<{ file: string }> = ( - await client.migrate.list() + await db.migrate.list() )[1]; if (plannedMigrations.length > 0) { console.log( @@ -26,133 +17,146 @@ const migrateToLatest = async (client: Knex): Promise => { console.log("no migrations are planned"); } // TODO: make sure if a migration fails, all migrations are rolled back - await client.migrate.latest(); + await db.migrate.latest(); }; export const migrateAll = async (): Promise => { - await migrateToLatest(mainClient); - const databaseSchemas = await getAllDatabaseSchemaConnections(); + await migrateToLatest(mainRepo.db); + const repos = await getAllRepositories(); await Promise.all([ - ...databaseSchemas.map(async (sc) => await migrateToLatest(sc)), + ...repos.map(async (repo) => await migrateToLatest(repo.db)), ]); - // 1. get all regions from main DB - // 2. construct region specific knex clients and cache them by - // 3. structure the cache so that it accomodates client creation by resource id - // 4. get all organization regions from main DB - // 5. for in all regions for all organizations, run the migration - // 6. do not forget the migration for the main DB - // }; -const createDatabaseConfig = (connectionString: string): Knex.Config => { - return { +const createDatabaseConfig = ( + connectionString: string, + sslCaCert: string | null, +): Knex.Config => { + const config: Knex.Config = { client: "pg", connection: { connectionString, + ssl: sslCaCert + ? { + ca: sslCaCert, + rejectUnauthorized: true, + } + : undefined, }, - // connection: connectionString, migrations: { directory: "src/migrations", extension: "ts", }, }; + return config; }; -const mainClient = knex(createDatabaseConfig(POSTGRES_URL)); +const mainRepo = new MainRepo(knex(createDatabaseConfig(POSTGRES_URL, null))); -const _connectionStore: Map = new Map(); - -interface RegionWithMaybeOrganization { - regionId: string; - organizationId?: string | undefined; -} - -const _createConnectionKey = ({ - organizationId, +const _repoStore: Map = new Map(); +export const getRegionRepo = async ({ regionId, -}: RegionWithMaybeOrganization): string => { - return organizationId ? `${organizationId}@${regionId}` : regionId; -}; - -export const getDbClient = async ({ - regionId, - organizationId, -}: RegionWithMaybeOrganization): Promise => { - const connectionKey = _createConnectionKey({ organizationId, regionId }); - const maybeClient = _connectionStore.get(connectionKey); - if (maybeClient) return maybeClient; - const maybeRegion = await mainClient("regions") - .select() - .where({ id: regionId }) - .first(); +}: { + regionId: string | undefined; +}): Promise => { + if (!regionId) return mainRepo; + const maybeRepo = _repoStore.get(regionId); + if (maybeRepo) return maybeRepo; + const maybeRegion = await mainRepo.findRegion(regionId); if (!maybeRegion) throw Error(`region ${regionId} not found`); - const connectionString = organizationId - ? `${maybeRegion.connectionString}/${organizationId}` - : `${maybeRegion.connectionString}/${maybeRegion.maintenanceDb}`; - const client = knex(createDatabaseConfig(connectionString)); - _connectionStore.set(connectionKey, client); - return client; + const repo = new RegionRepo( + knex( + createDatabaseConfig(maybeRegion.connectionString, maybeRegion.sslCaCert), + ), + ); + _repoStore.set(regionId, repo); + return repo; }; -export const getMainDbClient = (): Knex => mainClient; +export const getMainRepo = (): MainRepo => mainRepo; export const registerRegion = async ({ name, connectionString, - maintenanceDb, + sslCaCert, }: { name: string; connectionString: string; - maintenanceDb: string; + sslCaCert: string | null; }): Promise => { - // TODO: validate the connectionString, so that the knex client can connect to it + const regions = await mainRepo.queryRegions({ connectionString }); + if (regions.length) throw new Error("This region is already registered"); const id = cryptoRandomString({ length: 10 }); - await saveRegionTo(mainClient)({ + const repo = new RegionRepo( + knex(createDatabaseConfig(connectionString, sslCaCert)), + ); + await migrateToLatest(repo.db); + _repoStore.set(id, repo); + + const sslmode = sslCaCert ? "require" : "disable"; + await setUpUserReplication({ + from: mainRepo.db, + to: repo.db, + regionName: name, + sslmode, + }); + await setUpResourceReplication({ + from: repo.db, + to: mainRepo.db, + regionName: name, + sslmode, + }); + + await mainRepo.saveRegion({ id, name, connectionString, - maintenanceDb, + sslCaCert, }); return id; }; export const createOrganization = async (name: string): Promise => { const id = cryptoRandomString({ length: 10 }); - await saveOrganizationTo(mainClient)({ id, name }); + await mainRepo.saveOrganization({ id, name }); return id; }; -const createDb = async (client: Knex, name: string): Promise => { - try { - await client.raw(`create database "${name}"`); - } catch (err) { - if (!(err instanceof Error)) throw err; - if (!err.message.includes("already exists")) throw err; - } +type ReplicationArgs = { + from: Knex; + to: Knex; + sslmode: string; + regionName: string; }; const setUpUserReplication = async ({ from, to, -}: { - from: Knex; - to: Knex; -}): Promise => { + sslmode, + regionName, +}: ReplicationArgs): Promise => { // TODO: ensure its created... - const connectionString: string = - from.client.config.connection.connectionString; try { await from.raw("CREATE PUBLICATION userspub FOR TABLE users;"); } catch (err) { if (!(err instanceof Error)) throw err; if (!err.message.includes("already exists")) throw err; } + + const fromUrl = new URL(from.client.config.connection.connectionString); + const fromDbName = fromUrl.pathname.replace("/", ""); + const subName = `userssub_${regionName}`; + const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription( + '${subName}', + 'dbname=${fromDbName} host=${fromUrl.hostname} port=${fromUrl.port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}', + 'userspub', + '${subName}', + TRUE, + TRUE + );`; try { - const toUrl = new URL(to.client.config.connection.connectionString); - await to.raw( - `CREATE SUBSCRIPTION userssub_${toUrl.pathname.replace("/", "")} CONNECTION '${connectionString}' PUBLICATION userspub;`, - ); + await to.raw(rawSqeel); } catch (err) { if (!(err instanceof Error)) throw err; if (!err.message.includes("already exists")) throw err; @@ -161,83 +165,48 @@ const setUpUserReplication = async ({ const setUpResourceReplication = async ({ from, - fromRegionName, to, -}: { - from: Knex; - fromRegionName: string; - to: Knex; -}): Promise => { + regionName, + sslmode, +}: ReplicationArgs): Promise => { // TODO: ensure its created... - const connectionString: string = - from.client.config.connection.connectionString; - const connUrl = new URL(connectionString); try { await from.raw("CREATE PUBLICATION resourcepub FOR TABLE resources;"); } catch (err) { if (!(err instanceof Error)) throw err; if (!err.message.includes("already exists")) throw err; } + + const fromUrl = new URL(from.client.config.connection.connectionString); + const fromDbName = fromUrl.pathname.replace("/", ""); + const subName = `resourcesub_${regionName}`; + const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription( + '${subName}', + 'dbname=${fromDbName} host=${fromUrl.hostname} port=${fromUrl.port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}', + 'resourcepub', + '${subName}', + TRUE, + TRUE + );`; try { - await to.raw( - `CREATE SUBSCRIPTION "resroucesub_${fromRegionName.replace( - " ", - "", - )}_${connUrl.pathname.replace( - "/", - "", - )}" CONNECTION '${connectionString}' PUBLICATION resourcepub;`, - ); + await to.raw(rawSqeel); } catch (err) { if (!(err instanceof Error)) throw err; if (!err.message.includes("already exists")) throw err; } }; -export const bindRegionToOrganization = async ({ - regionId, - organizationId, -}: OrganizationsRegions): Promise => { - const region = await getRegionFrom(mainClient)(regionId); - if (!region) throw Error(`region ${regionId} not found`); - const organization = await getOrganizationFrom(mainClient)(organizationId); - if (!organization) throw Error(`organization ${organizationId} not found`); - - const regionClient = await getDbClient({ regionId }); - - await createDb(regionClient, organizationId); - - const client = await getDbClient({ organizationId, regionId }); - const connectionKey = _createConnectionKey({ organizationId, regionId }); - - await migrateToLatest(client); - - await setUpUserReplication({ from: mainClient, to: client }); - await setUpResourceReplication({ - from: client, - fromRegionName: region.name, - to: mainClient, - }); - - _connectionStore.set(connectionKey, client); - await saveOrganizationsRegionsTo(mainClient)({ organizationId, regionId }); -}; - -export const getAllDatabaseSchemaConnections = async (): Promise => { - const organizationRegions = await getOrganizationRegionsFrom(mainClient)(); - const clients = await Promise.all( - organizationRegions.map(async (or) => { - const client = await getDbClient(or); - return client; - }), +export const getAllRepositories = async (): Promise => { + const regions = await mainRepo.queryRegions({}); + const regionRepos = await Promise.all( + regions.map(async (region) => await getRegionRepo({ regionId: region.id })), ); - return [mainClient, ...clients]; + return [mainRepo, ...regionRepos]; }; -export const getResourceDatabaseConnection = async ( +export const getResourceRepo = async ( resourceId: string, -): Promise => { - const resourceRegionOrg = - await queryResourceRegionOrganizationFrom(mainClient)(resourceId); - return resourceRegionOrg ? await getDbClient(resourceRegionOrg) : mainClient; +): Promise => { + const resourceRegion = await mainRepo.findResourceRegion({ resourceId }); + return resourceRegion ? await getRegionRepo(resourceRegion) : getMainRepo(); }; diff --git a/src/services/resources.ts b/src/services/resources.ts index c2ebff8..1ce8be3 100644 --- a/src/services/resources.ts +++ b/src/services/resources.ts @@ -1,31 +1,33 @@ -import cryptoRandomString from "crypto-random-string"; -import { countResources, queryResources } from "../repositories"; +import cryptoRandomString from 'crypto-random-string' import { Resource, PaginationArgs, ResourceCollection, ResourceCreateArgs, ResourceAcl, -} from "../types"; +} from '../types' interface GetResourcesArgs extends PaginationArgs { - userId: string; + userId: string } -export const getResources = async ( - params: GetResourcesArgs, -): Promise => { - const totalCount = await countResources(params.userId); - const items = await queryResources(params); - let cursor = null; - if (items.length > 0) { - cursor = items.slice(-1)[0].createdAt.toISOString(); +export const getResources = + ( + countResources: (userId: string) => Promise, + queryResources: (params: GetResourcesArgs) => Promise, + ) => + async (params: GetResourcesArgs): Promise => { + const totalCount = await countResources(params.userId) + const items = await queryResources(params) + let cursor = null + if (items.length > 0) { + cursor = items.slice(-1)[0].createdAt.toISOString() + } + return { + totalCount, + items, + cursor, + } } - return { - totalCount, - items, - cursor, - }; -}; export const createResource = ( @@ -37,9 +39,9 @@ export const createResource = //2. if org, validate if user has access to the org //3. if org and region, validate if org has access to region //4. create resource - const id = cryptoRandomString({ length: 10 }); - const resource = { id, name, createdAt: new Date() }; - await resourceSaver(resource); - await resourceAclSaver({ resourceId: id, userId }); - return id; - }; + const id = cryptoRandomString({ length: 10 }) + const resource = { id, name, createdAt: new Date() } + await resourceSaver(resource) + await resourceAclSaver({ resourceId: id, userId }) + return id + } diff --git a/src/types.ts b/src/types.ts index cfb6843..36898a3 100644 --- a/src/types.ts +++ b/src/types.ts @@ -65,7 +65,7 @@ export interface Region { id: string; name: string; connectionString: string; - maintenanceDb: string; + sslCaCert: string | null; } export interface Organization { @@ -93,6 +93,7 @@ export interface ResourceRegion { regionId: string; } -export interface ResourceRegionOrg extends ResourceRegion { +export interface ResourceOrganization { + resourceId: string; organizationId: string; }