feat: rework for soft multi organizations

This commit is contained in:
Gergő Jedlicska
2024-02-20 16:59:31 +01:00
parent 5e05baef6c
commit 68804d37c7
13 changed files with 517 additions and 406 deletions
+1
View File
@@ -4,5 +4,6 @@
.envrc
.swc
node_modules
.ca-cert*
dist
+25 -7
View File
@@ -1,8 +1,26 @@
export default {
client: 'pg',
connection: process.env.POSTGRES_URL,
import { Knex } from "knex";
import fs from "fs";
import path from "path";
console.log(`foobar ${process.env.POSTGRES_CA_CERT_PATH}`);
const config: Knex.Config = {
client: "pg",
connection: {
connectionString: process.env.POSTGRES_URL,
ssl: process.env.POSTGRES_CA_CERT_PATH
? {
ca: fs.readFileSync(
path.resolve(__dirname, process.env.POSTGRES_CA_CERT_PATH),
),
rejectUnauthorized: true,
}
: undefined,
},
migrations: {
directory: 'src/migrations',
extension: 'ts'
}
}
directory: "src/migrations",
extension: "ts",
},
};
export default config;
+7 -8
View File
@@ -1,9 +1,8 @@
import 'dotenv/config'
import { parseEnv } from 'znv'
import { z } from 'zod'
import "dotenv/config";
import { parseEnv } from "znv";
import { z } from "zod";
export const { POSTGRES_URL } = parseEnv(process.env, {
POSTGRES_URL: z.string().min(1)
})
console.log([POSTGRES_URL].join(', '))
export const { POSTGRES_URL, POSTGRES_CA_CERT_PATH } = parseEnv(process.env, {
POSTGRES_URL: z.string().min(1),
POSTGRES_CA_CERT_PATH: z.string().min(1).nullish(),
});
@@ -0,0 +1,15 @@
import type { Knex } from "knex";
const regionsTableName = "regions";
export async function up(knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => {
table.dropColumn("maintenanceDb");
});
}
export async function down(knex: Knex): Promise<void> {
await knex.schema.alterTable(regionsTableName, (table) => {
table.text("maintenanceDb").notNullable().defaultTo("region");
});
}
@@ -0,0 +1,56 @@
import type { Knex } from "knex";
const tableName = "resource_region_organization";
export async function up(knex: Knex): Promise<void> {
await knex.schema.dropTable(tableName);
await knex.schema.createTable("resource_region", (table) => {
table
.string("resourceId")
.references("id")
.inTable("resources")
.onDelete("cascade")
.primary();
table
.string("regionId")
.references("id")
.inTable("regions")
.onDelete("cascade");
});
await knex.schema.createTable("resource_organization", (table) => {
table
.string("resourceId")
.references("id")
.inTable("resources")
.onDelete("cascade")
.primary();
table
.string("organizationId")
.references("id")
.inTable("organizations")
.onDelete("cascade");
});
}
export async function down(knex: Knex): Promise<void> {
await knex.schema.dropTable("resource_organization");
await knex.schema.dropTable("resource_region");
await knex.schema.createTable(tableName, (table) => {
table
.string("resourceId")
.references("id")
.inTable("resources")
.onDelete("cascade")
.primary();
table
.string("regionId")
.references("id")
.inTable("regions")
.onDelete("cascade");
table
.string("organizationId")
.references("id")
.inTable("organizations")
.onDelete("cascade");
});
}
@@ -0,0 +1,13 @@
import type { Knex } from "knex";
export async function up(knex: Knex): Promise<void> {
await knex.schema.alterTable("regions", (table) => {
table.text("sslCaCert").nullable();
});
}
export async function down(knex: Knex): Promise<void> {
await knex.schema.alterTable("regions", (table) => {
table.dropColumn("sslCaCert");
});
}
@@ -0,0 +1,13 @@
import type { Knex } from "knex";
export async function up(knex: Knex): Promise<void> {
await knex.schema.alterTable("regions", (table) => {
table.unique("name");
});
}
export async function down(knex: Knex): Promise<void> {
await knex.schema.alterTable("regions", (table) => {
table.dropUnique(["name"]);
});
}
+204 -160
View File
@@ -1,5 +1,4 @@
import { Knex } from "knex";
import { knex } from "./db";
import { Knex } from 'knex'
import {
UserRecord,
Resource,
@@ -11,190 +10,235 @@ import {
OrganizationAcl,
OrganizationResourceAcl,
ResourceRegion,
ResourceRegionOrg,
} from "./types";
} from './types'
const Users = () => knex<UserRecord>("users");
const Resources = () => knex<Resource>("resources");
const ResourceAclRepo = () => knex<ResourceAcl>("resource_acl");
export class RegionRepo {
db: Knex
export const queryUser = async (userId: string): Promise<UserRecord | null> => {
return (await Users().where("id", "=", userId).first()) ?? null;
};
export const getUsersFrom = (db: Knex) => async (): Promise<UserRecord[]> => {
return await db<UserRecord>("users").select();
};
export const saveUserTo =
(db: Knex) =>
async (user: UserRecord): Promise<void> => {
await db<UserRecord>("users").insert(user);
};
export const saveResourceTo =
(db: Knex) =>
async (resource: Resource): Promise<void> => {
await db<Resource>("resources").insert(resource);
};
export const queryResourceFrom =
(db: Knex) =>
async (resourceId: string): Promise<Resource | null> => {
return (
(await db<Resource>("resources").where({ id: resourceId }).first()) ??
null
);
};
export const queryResourceAclFrom =
(db: Knex) =>
async ({ resourceId, userId }: ResourceAcl): Promise<ResourceAcl | null> => {
return (
(await db<ResourceAcl>("resource_acl")
.where({ userId, resourceId })
.first()) ?? null
);
};
export const saveResourceAclTo =
(db: Knex) =>
async (resourceAcl: ResourceAcl): Promise<void> => {
await db<ResourceAcl>("resource_acl").insert(resourceAcl);
};
export const countResources = async (userId: string): Promise<number> => {
const [rawCount] = await ResourceAclRepo().count().where({ userId });
return parseInt(rawCount.count as string);
};
export const queryResources = async ({
userId,
limit,
cursor,
}: {
userId: string;
limit: number;
cursor: string | null;
}) => {
const query = Resources()
.join("resource_acl", "resources.id", "resource_acl.resourceId")
.where({ userId });
if (cursor) {
query.andWhere("createdAt", "<", cursor);
constructor(db: Knex) {
this.db = db
}
return await query.limit(limit);
};
export const countCommentsIn =
(db: Knex) =>
async (resourceId: string): Promise<number> => {
const [rawCount] = await db<Comment>("comments")
async saveResource(resource: Resource): Promise<void> {
await this.db<Resource>('resources').insert(resource)
}
async findResource(resourceId: string): Promise<Resource | null> {
return (
(await this.db<Resource>('resources')
.where({ id: resourceId })
.first()) ?? null
)
}
async saveComment(comment: Comment): Promise<void> {
await this.db<Comment>('comments').insert(comment)
}
async countComments(resourceId: string): Promise<number> {
const [rawCount] = await this.db<Comment>('comments')
.count()
.where({ resourceId });
return parseInt(rawCount.count as string);
};
.where({ resourceId })
return parseInt(rawCount.count as string)
}
export const queryCommentsFrom =
(db: Knex) =>
async ({
async queryComments({
resourceId,
limit,
cursor,
}: {
resourceId: string;
limit: number;
cursor: string | null;
}): Promise<Comment[]> => {
const query = db<Comment>("comments").where({ resourceId });
resourceId: string
limit: number
cursor: string | null
}): Promise<Comment[]> {
const query = this.db<Comment>('comments').where({ resourceId })
if (cursor) {
query.andWhere("createdAt", "<", cursor);
query.andWhere('createdAt', '<', cursor)
}
return await query.limit(limit);
};
return await query.limit(limit)
}
}
export const saveCommentTo =
(db: Knex) =>
async (comment: Comment): Promise<void> => {
await db<Comment>("comments").insert(comment);
};
export class MainRepo extends RegionRepo {
async findUser(userId: string): Promise<UserRecord | null> {
return (
(await this.db<UserRecord>('users').where('id', '=', userId).first()) ??
null
)
}
export const getRegionsFrom = (db: Knex) => async (): Promise<Array<Region>> =>
await db<Region>("regions").select();
async queryUsers(): Promise<UserRecord[]> {
return await this.db<UserRecord>('users').select()
}
export const getRegionFrom =
(db: Knex) =>
async (id: string): Promise<Region | null> =>
(await db<Region>("regions").where({ id }).first()) ?? null;
async saveUser(user: UserRecord): Promise<void> {
await this.db<UserRecord>('users').insert(user)
}
export const getOrganizationRegionsFrom =
(db: Knex) => async (): Promise<Array<OrganizationsRegions>> =>
await db<OrganizationsRegions>("organizations_regions").select();
async getUsersResourceAcl({
resourceId,
userId,
}: ResourceAcl): Promise<ResourceAcl | null> {
return (
(await this.db<ResourceAcl>('resource_acl')
.where({ userId, resourceId })
.first()) ?? null
)
}
export const queryOrganizationRegionsFrom =
(db: Knex) =>
async ({
async saveResourceAcl(resourceAcl: ResourceAcl): Promise<void> {
await this.db<ResourceAcl>('resource_acl').insert(resourceAcl)
}
async countUsersResources(userId: string): Promise<number> {
const [rawCount] = await this.db<ResourceAcl>('resource_acl')
.count()
.where({ userId })
return parseInt(rawCount.count as string)
}
async findUsersResource({
resourceId,
userId,
}: ResourceAcl): Promise<ResourceAcl | null> {
return (
(await this.db<ResourceAcl>('resource_acl')
.where({ userId, resourceId })
.first()) ?? null
)
}
async queryResources({
userId,
limit,
cursor,
}: {
userId: string
limit: number
cursor: string | null
}): Promise<Resource[]> {
let query = this.db<Resource & ResourceAcl>('resources')
.join('resource_acl', 'resources.id', 'resource_acl.resourceId')
.where({ userId })
if (cursor) {
query = query.andWhere('createdAt', '<', cursor)
}
const items = await query.orderBy('createdAt', 'desc').limit(limit)
return items
}
async countResourceComments(resourceId: string): Promise<number> {
const [rawCount] = await this.db<Comment>('comments')
.count()
.where({ resourceId })
return parseInt(rawCount.count as string)
}
async queryComments({
resourceId,
limit,
cursor,
}: {
resourceId: string
limit: number
cursor: string | null
}): Promise<Comment[]> {
let query = this.db<Comment>('comments').where({ resourceId })
if (cursor) {
query = query.andWhere('createdAt', '<', cursor)
}
return await query.orderBy('createdAt', 'desc').limit(limit)
}
async queryRegions(
params:
| {
connectionString?: string | undefined
}
| undefined = undefined,
): Promise<Array<Region>> {
const query = this.db<Region>('regions')
if (params && params.connectionString) query.where(params)
return await query.select()
}
async findRegion(id: string): Promise<Region | null> {
return (await this.db<Region>('regions').where({ id }).first()) ?? null
}
async queryOrganizationsRegions(): Promise<Array<OrganizationsRegions>> {
return await this.db<OrganizationsRegions>('organizations_regions').select()
}
async findOrganizationRegion({
regionId,
organizationId,
}: OrganizationsRegions): Promise<OrganizationsRegions | null> =>
(await db<OrganizationsRegions>("organizations_regions")
.where({ regionId, organizationId })
.first()) ?? null;
export const saveRegionTo = (db: Knex) => async (region: Region) =>
await db<Region>("regions").insert(region);
export const saveOrganizationTo =
(db: Knex) => async (organization: Organization) =>
await db<Organization>("organizations").insert(organization);
export const getOrganizationFrom =
(db: Knex) =>
async (id: string): Promise<Organization | null> => {
}: OrganizationsRegions): Promise<OrganizationsRegions | null> {
return (
(await db<Organization>("organizations").where({ id }).first()) ?? null
);
};
(await this.db<OrganizationsRegions>('organizations_regions')
.where({ regionId, organizationId })
.first()) ?? null
)
}
export const getOrganizationsFrom =
(db: Knex) => async (): Promise<Organization[]> =>
await db<Organization>("organizations").select();
async saveRegion(region: Region): Promise<void> {
await this.db<Region>('regions').insert(region)
}
async saveOrganization(organization: Organization) {
await this.db<Organization>('organizations').insert(organization)
}
async findOrganization(id: string): Promise<Organization | null> {
return (
(await this.db<Organization>('organizations').where({ id }).first()) ??
null
)
}
export const saveOrganizationsRegionsTo =
(db: Knex) =>
async (or: OrganizationsRegions): Promise<void> =>
await db<OrganizationsRegions>("organizations_regions").insert(or);
async queryOrganizations(): Promise<Organization[]> {
return await this.db<Organization>('organizations').select()
}
export const saveOrganizationAclTo =
(db: Knex) =>
async (orgAcl: OrganizationAcl): Promise<void> => {
await db<OrganizationsRegions>("organization_acl").insert(orgAcl);
};
async saveOrganizationRegion(or: OrganizationsRegions): Promise<void> {
return await this.db<OrganizationsRegions>('organizations_regions').insert(
or,
)
}
export const queryOrganizationAclFrom =
(db: Knex) =>
async ({
async saveOrganizationAcl(orgAcl: OrganizationAcl): Promise<void> {
await this.db<OrganizationsRegions>('organization_acl').insert(orgAcl)
}
async findOrganizationAcl({
userId,
organizationId,
}: OrganizationAcl): Promise<OrganizationAcl | null> =>
(await db<OrganizationAcl>("organization_acl")
.where({ userId, organizationId })
.first()) ?? null;
}: OrganizationAcl): Promise<OrganizationAcl | null> {
return (
(await this.db<OrganizationAcl>('organization_acl')
.where({ userId, organizationId })
.first()) ?? null
)
}
export const saveOrganizationResourceAclTo =
(db: Knex) =>
async (item: OrganizationResourceAcl): Promise<void> => {
await db<OrganizationResourceAcl>("organization_resource_acl").insert(item);
};
async saveOrganizationResourceAcl(
item: OrganizationResourceAcl,
): Promise<void> {
await this.db<OrganizationResourceAcl>('organization_resource_acl').insert(
item,
)
}
export const saveResourceRegionOrganizationTo =
(db: Knex) => async (item: ResourceRegionOrg) => {
await db<ResourceRegionOrg>("resource_region_organization").insert(item);
};
async findResourceRegion({
resourceId,
}: {
resourceId: string
}): Promise<ResourceRegion | null> {
return (
(await this.db<ResourceRegion>('resource_region')
.where({ resourceId })
.first()) ?? null
)
}
export const queryResourceRegionOrganizationFrom =
(db: Knex) =>
async (resourceId: string): Promise<ResourceRegion | null> =>
(await db<ResourceRegionOrg>("resource_region_organization")
.where({ resourceId })
.first()) ?? null;
async saveResourceRegion(item: ResourceRegion): Promise<void> {
await this.db<ResourceRegion>('resource_region').insert(item)
}
}
+43 -62
View File
@@ -1,22 +1,4 @@
import {
getOrganizationsFrom,
getRegionsFrom,
queryOrganizationAclFrom,
queryOrganizationRegionsFrom,
queryResourceAclFrom,
saveOrganizationResourceAclTo,
saveOrganizationAclTo,
saveResourceAclTo,
saveResourceTo,
saveResourceRegionOrganizationTo,
saveCommentTo,
queryResourceFrom,
queryUser,
countCommentsIn,
queryCommentsFrom,
getUsersFrom,
saveUserTo,
} from "./repositories";
import { RegionRepo, MainRepo } from "./repositories";
import { getComments } from "./services/comments";
import { createResource, getResources } from "./services/resources";
import { GraphQLError } from "graphql";
@@ -32,12 +14,11 @@ import {
UserCreateArgs,
} from "./types";
import {
bindRegionToOrganization,
createOrganization,
getDbClient,
getMainDbClient,
getResourceDatabaseConnection,
registerRegion,
getMainRepo,
getRegionRepo,
getResourceRepo,
} from "./services/databaseManagement";
import { authorizeUserOrgRegion } from "./services/authz";
import cryptoRandomString from "crypto-random-string";
@@ -47,17 +28,17 @@ import cryptoRandomString from "crypto-random-string";
export const resolvers = {
Query: {
async users() {
return await getUsersFrom(getMainDbClient())();
return await getMainRepo().queryUsers();
},
async user(_: unknown, args: { id: string }) {
return await queryUser(args.id);
return await getMainRepo().findUser(args.id);
},
async resource(
_: unknown,
args: { id: string; userId: string },
): Promise<Resource> {
const mainDb = getMainDbClient();
const maybeAcl = await queryResourceAclFrom(mainDb)({
const mainRepo = getMainRepo();
const maybeAcl = await mainRepo.getUsersResourceAcl({
userId: args.userId,
resourceId: args.id,
});
@@ -71,8 +52,8 @@ export const resolvers = {
},
);
}
const db = await getResourceDatabaseConnection(args.id);
const maybeResource = await queryResourceFrom(db)(args.id);
const resourceRepo = await getResourceRepo(args.id);
const maybeResource = await resourceRepo.findResource(args.id);
if (maybeResource == null) {
throw new GraphQLError("Resource not found", {
extensions: { code: "RESOURCE_NOT_FOUND" },
@@ -81,15 +62,19 @@ export const resolvers = {
return maybeResource;
},
async organizations() {
return await getOrganizationsFrom(getMainDbClient())();
return await getMainRepo().queryOrganizations();
},
async regions() {
return await getRegionsFrom(getMainDbClient())();
return await getMainRepo().queryRegions();
},
},
User: {
async resources(parent: UserRecord, args: PaginationArgs) {
return await getResources({ userId: parent.id, ...args });
const mainRepo = getMainRepo();
return await getResources(
mainRepo.countUsersResources.bind(mainRepo),
mainRepo.queryResources.bind(mainRepo),
)({ userId: parent.id, ...args });
},
},
Resource: {
@@ -97,10 +82,10 @@ export const resolvers = {
parent: Resource,
{ limit, cursor }: PaginationArgs,
): Promise<CommentCollection> {
const db = await getResourceDatabaseConnection(parent.id);
const resourceRepo = await getResourceRepo(parent.id);
return await getComments(
countCommentsIn(db),
queryCommentsFrom(db),
resourceRepo.countComments.bind(resourceRepo),
resourceRepo.queryComments.bind(resourceRepo),
)({
resourceId: parent.id,
limit,
@@ -114,7 +99,7 @@ export const resolvers = {
{ input: { name } }: { input: UserCreateArgs },
) {
const id = cryptoRandomString({ length: 10 });
await saveUserTo(getMainDbClient())({ id, name });
await getMainRepo().saveUser({ id, name });
return id;
},
async registerRegion(
@@ -122,7 +107,7 @@ export const resolvers = {
args: {
name: string;
connectionString: string;
maintenanceDb: string;
sslCaCert: string | null;
},
) {
return await registerRegion(args);
@@ -131,48 +116,44 @@ export const resolvers = {
return await createOrganization(args.name);
},
async addRegionToOrganization(_: unknown, args: OrganizationsRegions) {
await bindRegionToOrganization(args);
await getMainRepo().saveOrganizationRegion(args);
},
async addUserToOrganization(
_: unknown,
{ input: args }: { input: OrganizationAcl },
) {
await saveOrganizationAclTo(getMainDbClient())(args);
await getMainRepo().saveOrganizationAcl(args);
},
async createResource(
_: unknown,
{ input: args }: { input: ResourceCreateArgs },
) {
const mainDb = getMainDbClient();
const mainRepo = getMainRepo();
await authorizeUserOrgRegion(
queryOrganizationAclFrom(mainDb),
queryOrganizationRegionsFrom(mainDb),
mainRepo.findOrganizationAcl.bind(mainRepo),
mainRepo.findOrganizationRegion.bind(mainRepo),
)(args);
const db =
args.regionId && args.organizationId
? await getDbClient({
regionId: args.regionId,
organizationId: args.organizationId,
})
: mainDb;
const repo = args.regionId
? await getRegionRepo({ regionId: args.regionId })
: mainRepo;
const resourceId = await createResource(
saveResourceTo(db),
saveResourceAclTo(mainDb),
repo.saveResource.bind(repo),
mainRepo.saveResourceAcl.bind(mainRepo),
)(args);
if (args.organizationId) {
await saveOrganizationResourceAclTo(mainDb)({
await mainRepo.saveOrganizationResourceAcl({
organizationId: args.organizationId,
resourceId,
});
await saveResourceRegionOrganizationTo(mainDb)({
resourceId,
organizationId: args.organizationId,
// i know its not null here, the authz function ensures it
regionId: args.regionId!,
});
if (args.regionId)
await mainRepo.saveResourceRegion({
resourceId,
// i know its not null here, the authz function ensures it
regionId: args.regionId,
});
}
return resourceId;
},
@@ -180,16 +161,16 @@ export const resolvers = {
_: unknown,
{ input: args }: { input: CommentCreateArgs },
) {
const mainDb = getMainDbClient();
const resourceAcl = await queryResourceAclFrom(mainDb)(args);
const mainRepo = getMainRepo();
const resourceAcl = await mainRepo.getUsersResourceAcl(args);
if (!resourceAcl)
throw new Error("The user doesn't have access to the given resource");
//2. get resource db client
const db = await getResourceDatabaseConnection(args.resourceId);
const resourceRepo = await getResourceRepo(args.resourceId);
//3. save comment to db
const id = cryptoRandomString({ length: 10 });
const createdAt = new Date();
await saveCommentTo(db)({ id, createdAt, ...args });
await resourceRepo.saveComment({ id, createdAt, ...args });
return id;
},
},
+1 -2
View File
@@ -38,7 +38,6 @@ type Organization {
type Region {
id: String!
name: String!
maintenanceDb: String!
}
type Query {
@@ -78,7 +77,7 @@ type Mutation {
registerRegion(
name: String!
connectionString: String!
maintenanceDb: String!
sslCaCert: String
): String!
createOrganization(name: String!): String!
addRegionToOrganization(organizationId: String!, regionId: String!): Boolean
+110 -141
View File
@@ -1,20 +1,11 @@
import { POSTGRES_URL } from "../config";
import {
getOrganizationFrom,
getOrganizationRegionsFrom,
getRegionFrom,
queryResourceRegionOrganizationFrom,
saveOrganizationTo,
saveOrganizationsRegionsTo,
saveRegionTo,
} from "../repositories";
import { OrganizationsRegions, Region } from "../types";
import { RegionRepo, MainRepo } from "../repositories";
import knex, { Knex } from "knex";
import cryptoRandomString from "crypto-random-string";
const migrateToLatest = async (client: Knex): Promise<void> => {
const migrateToLatest = async (db: Knex): Promise<void> => {
const plannedMigrations: Array<{ file: string }> = (
await client.migrate.list()
await db.migrate.list()
)[1];
if (plannedMigrations.length > 0) {
console.log(
@@ -26,133 +17,146 @@ const migrateToLatest = async (client: Knex): Promise<void> => {
console.log("no migrations are planned");
}
// TODO: make sure if a migration fails, all migrations are rolled back
await client.migrate.latest();
await db.migrate.latest();
};
export const migrateAll = async (): Promise<void> => {
await migrateToLatest(mainClient);
const databaseSchemas = await getAllDatabaseSchemaConnections();
await migrateToLatest(mainRepo.db);
const repos = await getAllRepositories();
await Promise.all([
...databaseSchemas.map(async (sc) => await migrateToLatest(sc)),
...repos.map(async (repo) => await migrateToLatest(repo.db)),
]);
// 1. get all regions from main DB
// 2. construct region specific knex clients and cache them by
// 3. structure the cache so that it accomodates client creation by resource id
// 4. get all organization regions from main DB
// 5. for in all regions for all organizations, run the migration
// 6. do not forget the migration for the main DB
//
};
const createDatabaseConfig = (connectionString: string): Knex.Config => {
return {
const createDatabaseConfig = (
connectionString: string,
sslCaCert: string | null,
): Knex.Config => {
const config: Knex.Config = {
client: "pg",
connection: {
connectionString,
ssl: sslCaCert
? {
ca: sslCaCert,
rejectUnauthorized: true,
}
: undefined,
},
// connection: connectionString,
migrations: {
directory: "src/migrations",
extension: "ts",
},
};
return config;
};
const mainClient = knex(createDatabaseConfig(POSTGRES_URL));
const mainRepo = new MainRepo(knex(createDatabaseConfig(POSTGRES_URL, null)));
const _connectionStore: Map<string, Knex> = new Map();
interface RegionWithMaybeOrganization {
regionId: string;
organizationId?: string | undefined;
}
const _createConnectionKey = ({
organizationId,
const _repoStore: Map<string, RegionRepo> = new Map();
export const getRegionRepo = async ({
regionId,
}: RegionWithMaybeOrganization): string => {
return organizationId ? `${organizationId}@${regionId}` : regionId;
};
export const getDbClient = async ({
regionId,
organizationId,
}: RegionWithMaybeOrganization): Promise<Knex> => {
const connectionKey = _createConnectionKey({ organizationId, regionId });
const maybeClient = _connectionStore.get(connectionKey);
if (maybeClient) return maybeClient;
const maybeRegion = await mainClient<Region>("regions")
.select()
.where({ id: regionId })
.first();
}: {
regionId: string | undefined;
}): Promise<RegionRepo> => {
if (!regionId) return mainRepo;
const maybeRepo = _repoStore.get(regionId);
if (maybeRepo) return maybeRepo;
const maybeRegion = await mainRepo.findRegion(regionId);
if (!maybeRegion) throw Error(`region ${regionId} not found`);
const connectionString = organizationId
? `${maybeRegion.connectionString}/${organizationId}`
: `${maybeRegion.connectionString}/${maybeRegion.maintenanceDb}`;
const client = knex(createDatabaseConfig(connectionString));
_connectionStore.set(connectionKey, client);
return client;
const repo = new RegionRepo(
knex(
createDatabaseConfig(maybeRegion.connectionString, maybeRegion.sslCaCert),
),
);
_repoStore.set(regionId, repo);
return repo;
};
export const getMainDbClient = (): Knex => mainClient;
export const getMainRepo = (): MainRepo => mainRepo;
export const registerRegion = async ({
name,
connectionString,
maintenanceDb,
sslCaCert,
}: {
name: string;
connectionString: string;
maintenanceDb: string;
sslCaCert: string | null;
}): Promise<string> => {
// TODO: validate the connectionString, so that the knex client can connect to it
const regions = await mainRepo.queryRegions({ connectionString });
if (regions.length) throw new Error("This region is already registered");
const id = cryptoRandomString({ length: 10 });
await saveRegionTo(mainClient)({
const repo = new RegionRepo(
knex(createDatabaseConfig(connectionString, sslCaCert)),
);
await migrateToLatest(repo.db);
_repoStore.set(id, repo);
const sslmode = sslCaCert ? "require" : "disable";
await setUpUserReplication({
from: mainRepo.db,
to: repo.db,
regionName: name,
sslmode,
});
await setUpResourceReplication({
from: repo.db,
to: mainRepo.db,
regionName: name,
sslmode,
});
await mainRepo.saveRegion({
id,
name,
connectionString,
maintenanceDb,
sslCaCert,
});
return id;
};
export const createOrganization = async (name: string): Promise<string> => {
const id = cryptoRandomString({ length: 10 });
await saveOrganizationTo(mainClient)({ id, name });
await mainRepo.saveOrganization({ id, name });
return id;
};
const createDb = async (client: Knex, name: string): Promise<void> => {
try {
await client.raw(`create database "${name}"`);
} catch (err) {
if (!(err instanceof Error)) throw err;
if (!err.message.includes("already exists")) throw err;
}
type ReplicationArgs = {
from: Knex;
to: Knex;
sslmode: string;
regionName: string;
};
const setUpUserReplication = async ({
from,
to,
}: {
from: Knex;
to: Knex;
}): Promise<void> => {
sslmode,
regionName,
}: ReplicationArgs): Promise<void> => {
// TODO: ensure its created...
const connectionString: string =
from.client.config.connection.connectionString;
try {
await from.raw("CREATE PUBLICATION userspub FOR TABLE users;");
} catch (err) {
if (!(err instanceof Error)) throw err;
if (!err.message.includes("already exists")) throw err;
}
const fromUrl = new URL(from.client.config.connection.connectionString);
const fromDbName = fromUrl.pathname.replace("/", "");
const subName = `userssub_${regionName}`;
const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription(
'${subName}',
'dbname=${fromDbName} host=${fromUrl.hostname} port=${fromUrl.port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}',
'userspub',
'${subName}',
TRUE,
TRUE
);`;
try {
const toUrl = new URL(to.client.config.connection.connectionString);
await to.raw(
`CREATE SUBSCRIPTION userssub_${toUrl.pathname.replace("/", "")} CONNECTION '${connectionString}' PUBLICATION userspub;`,
);
await to.raw(rawSqeel);
} catch (err) {
if (!(err instanceof Error)) throw err;
if (!err.message.includes("already exists")) throw err;
@@ -161,83 +165,48 @@ const setUpUserReplication = async ({
const setUpResourceReplication = async ({
from,
fromRegionName,
to,
}: {
from: Knex;
fromRegionName: string;
to: Knex;
}): Promise<void> => {
regionName,
sslmode,
}: ReplicationArgs): Promise<void> => {
// TODO: ensure its created...
const connectionString: string =
from.client.config.connection.connectionString;
const connUrl = new URL(connectionString);
try {
await from.raw("CREATE PUBLICATION resourcepub FOR TABLE resources;");
} catch (err) {
if (!(err instanceof Error)) throw err;
if (!err.message.includes("already exists")) throw err;
}
const fromUrl = new URL(from.client.config.connection.connectionString);
const fromDbName = fromUrl.pathname.replace("/", "");
const subName = `resourcesub_${regionName}`;
const rawSqeel = `SELECT * FROM aiven_extras.pg_create_subscription(
'${subName}',
'dbname=${fromDbName} host=${fromUrl.hostname} port=${fromUrl.port} sslmode=${sslmode} user=${fromUrl.username} password=${fromUrl.password}',
'resourcepub',
'${subName}',
TRUE,
TRUE
);`;
try {
await to.raw(
`CREATE SUBSCRIPTION "resroucesub_${fromRegionName.replace(
" ",
"",
)}_${connUrl.pathname.replace(
"/",
"",
)}" CONNECTION '${connectionString}' PUBLICATION resourcepub;`,
);
await to.raw(rawSqeel);
} catch (err) {
if (!(err instanceof Error)) throw err;
if (!err.message.includes("already exists")) throw err;
}
};
export const bindRegionToOrganization = async ({
regionId,
organizationId,
}: OrganizationsRegions): Promise<void> => {
const region = await getRegionFrom(mainClient)(regionId);
if (!region) throw Error(`region ${regionId} not found`);
const organization = await getOrganizationFrom(mainClient)(organizationId);
if (!organization) throw Error(`organization ${organizationId} not found`);
const regionClient = await getDbClient({ regionId });
await createDb(regionClient, organizationId);
const client = await getDbClient({ organizationId, regionId });
const connectionKey = _createConnectionKey({ organizationId, regionId });
await migrateToLatest(client);
await setUpUserReplication({ from: mainClient, to: client });
await setUpResourceReplication({
from: client,
fromRegionName: region.name,
to: mainClient,
});
_connectionStore.set(connectionKey, client);
await saveOrganizationsRegionsTo(mainClient)({ organizationId, regionId });
};
export const getAllDatabaseSchemaConnections = async (): Promise<Knex[]> => {
const organizationRegions = await getOrganizationRegionsFrom(mainClient)();
const clients = await Promise.all(
organizationRegions.map(async (or) => {
const client = await getDbClient(or);
return client;
}),
export const getAllRepositories = async (): Promise<RegionRepo[]> => {
const regions = await mainRepo.queryRegions({});
const regionRepos = await Promise.all(
regions.map(async (region) => await getRegionRepo({ regionId: region.id })),
);
return [mainClient, ...clients];
return [mainRepo, ...regionRepos];
};
export const getResourceDatabaseConnection = async (
export const getResourceRepo = async (
resourceId: string,
): Promise<Knex> => {
const resourceRegionOrg =
await queryResourceRegionOrganizationFrom(mainClient)(resourceId);
return resourceRegionOrg ? await getDbClient(resourceRegionOrg) : mainClient;
): Promise<RegionRepo> => {
const resourceRegion = await mainRepo.findResourceRegion({ resourceId });
return resourceRegion ? await getRegionRepo(resourceRegion) : getMainRepo();
};
+26 -24
View File
@@ -1,31 +1,33 @@
import cryptoRandomString from "crypto-random-string";
import { countResources, queryResources } from "../repositories";
import cryptoRandomString from 'crypto-random-string'
import {
Resource,
PaginationArgs,
ResourceCollection,
ResourceCreateArgs,
ResourceAcl,
} from "../types";
} from '../types'
interface GetResourcesArgs extends PaginationArgs {
userId: string;
userId: string
}
export const getResources = async (
params: GetResourcesArgs,
): Promise<ResourceCollection> => {
const totalCount = await countResources(params.userId);
const items = await queryResources(params);
let cursor = null;
if (items.length > 0) {
cursor = items.slice(-1)[0].createdAt.toISOString();
export const getResources =
(
countResources: (userId: string) => Promise<number>,
queryResources: (params: GetResourcesArgs) => Promise<Resource[]>,
) =>
async (params: GetResourcesArgs): Promise<ResourceCollection> => {
const totalCount = await countResources(params.userId)
const items = await queryResources(params)
let cursor = null
if (items.length > 0) {
cursor = items.slice(-1)[0].createdAt.toISOString()
}
return {
totalCount,
items,
cursor,
}
}
return {
totalCount,
items,
cursor,
};
};
export const createResource =
(
@@ -37,9 +39,9 @@ export const createResource =
//2. if org, validate if user has access to the org
//3. if org and region, validate if org has access to region
//4. create resource
const id = cryptoRandomString({ length: 10 });
const resource = { id, name, createdAt: new Date() };
await resourceSaver(resource);
await resourceAclSaver({ resourceId: id, userId });
return id;
};
const id = cryptoRandomString({ length: 10 })
const resource = { id, name, createdAt: new Date() }
await resourceSaver(resource)
await resourceAclSaver({ resourceId: id, userId })
return id
}
+3 -2
View File
@@ -65,7 +65,7 @@ export interface Region {
id: string;
name: string;
connectionString: string;
maintenanceDb: string;
sslCaCert: string | null;
}
export interface Organization {
@@ -93,6 +93,7 @@ export interface ResourceRegion {
regionId: string;
}
export interface ResourceRegionOrg extends ResourceRegion {
export interface ResourceOrganization {
resourceId: string;
organizationId: string;
}