Abstracting database transactions with Effect

Composing atomic transactions across resources

A common issue when abstracting a data layer is how to handle database transactions that span multiple resources. We don’t want to leak the underlying implementation details, but we still need to coordinate writes across different modules.

Setting the stage

We will use terminology from Righting Software:

  • Manager: coordinates among the ResourceAccess
  • ResourceAccess: abstracts the data store
  • Resource: wrapper for the SDK used to access the data store

Let’s start with defining the types needed for our use case (one for creating tasks and one for creating a user):

import { Context, Effect, Exit, Layer } from "effect"; // Exit and Layer are used in later snippets

export class TaskAccessRepo extends Context.Tag("TaskAccessRepo")<
  TaskAccessRepo,
  {
    create: (params: { title: string }) => Effect.Effect<void, Error>;
  }
>() {}

export class UserAccessRepo extends Context.Tag("UserAccessRepo")<
  UserAccessRepo,
  {
    create: (params: { name: string }) => Effect.Effect<void, Error>;
  }
>() {}

Then we define the Resource type for the persistent store. We use the pattern below, where a callback receives the typed client. This lets us use non-Effect libraries without having to re-implement every method exposed on the client.

// Resource type
export class DbResourceRepo extends Context.Tag("DbResourceRepo")<
  DbResourceRepo,
  {
    use: <T>(
      fn: (client: DbClient) => Promise<T>
    ) => Effect.Effect<T, DbError, never>;
  }
>() {}
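
The DbClient and DbError types above are assumed to be defined elsewhere. For reference, DbError could be a tagged error along these lines (a sketch, not the exact definition), and DbClient would simply be the type of your database client (e.g. a Kysely instance in the implementation shown later):

// A minimal sketch of the assumed error type
import { Data } from "effect";

export class DbError extends Data.TaggedError("DbError")<{
  message: string;
  cause: unknown;
}> {}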

Now we can implement the ResourceAccess logic by using the underlying Resource.

const TaskAccessLayer = Layer.effect(
  TaskAccessRepo,
  Effect.gen(function* () {
    const dbResource = yield* DbResourceRepo;
    return {
      create: ({ title }) =>
        dbResource.use((db) =>
          db
            .insertInto("task")
            .values({
              title,
            })
            .execute()
        ),
    };
  })
);

const UserAccessLayer = Layer.effect(
  UserAccessRepo,
  Effect.gen(function* () {
    const dbResource = yield* DbResourceRepo;
    return {
      create: ({ name }) =>
        dbResource.use((db) =>
          db
            .insertInto("user")
            .values({
              name,
            })
            .execute()
        ),
    };
  })
);

Now let’s orchestrate with the Manager (assume GtdManagerRepo has an init method that takes a user and an array of tasks as input).
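
For reference, a possible shape for that tag (the exact signature is an assumption, including the addTask method used further down):

export class GtdManagerRepo extends Context.Tag("GtdManagerRepo")<
  GtdManagerRepo,
  {
    init: (params: {
      user: string;
      tasks: ReadonlyArray<{ title: string }>;
    }) => Effect.Effect<void, Error>;
    addTask: (params: { title: string }) => Effect.Effect<void, Error>;
  }
>() {}

With that in place, the Manager layer looks like this: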

export const GtdManagerLayer = Layer.effect(
  GtdManagerRepo,
  Effect.gen(function* () {
    const taskAccess = yield* TaskAccessRepo;
    const userAccess = yield* UserAccessRepo;

    return {
      init: ({ user, tasks }) =>
        Effect.gen(function* () {
          const tasksEffectArr = tasks.map((t) =>
            taskAccess.create({ title: t.title })
          );
          const userEffect = userAccess.create({ name: user });
          yield* Effect.all([...tasksEffectArr, userEffect]);
        }),
    };
  })
);

Note that we still only have an abstract DbResourceRepo type. Now let’s provide the DbResourceRepo implementation using Kysely:

// Resource implementation
import Sqlite from "better-sqlite3";
import { Kysely, SqliteDialect } from "kysely";
// `Database` is your Kysely schema interface, defined elsewhere

export const DbLive = Layer.effect(
  DbResourceRepo,
  Effect.gen(function* () {
    // Best practice: wrap this initialization in an Effect.try in case something fails, but we'll leave it out for brevity
    const db = new Kysely<Database>({
      dialect: new SqliteDialect({
        database: new Sqlite("db.sqlite3"),
      }),
    });

    return {
      use: (fn) =>
        Effect.tryPromise({
          try: () => fn(db),
          catch: (e) =>
            new DbError({
              message: `Failed to use db: ${e}`,
              cause: e,
            }),
        }),
    };
  })
);

And now we inject the dependencies and we have a fully-functioning, properly-abstracted implementation!

export const UserAccessLive = UserAccessLayer.pipe(Layer.provide(DbLive));
export const TaskAccessLive = TaskAccessLayer.pipe(Layer.provide(DbLive));

export const GtdManagerLive = GtdManagerLayer.pipe(
  Layer.provide([UserAccessLive, TaskAccessLive])
);
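
To see it end to end, here is a minimal usage sketch (the program, the sample values, and the runPromise entry point are illustrative, not part of the original wiring):

// Illustrative usage: resolve the manager from context and run the workflow
const program = Effect.gen(function* () {
  const gtdManager = yield* GtdManagerRepo;
  yield* gtdManager.init({
    user: "Ada",
    tasks: [{ title: "Write the intro" }],
  });
});

Effect.runPromise(program.pipe(Effect.provide(GtdManagerLive)));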

Implementing transaction scope

Let’s say a requirement is that creating a user and their first task is an all-or-nothing workflow: if the task doesn’t get inserted into the database, then the creation of the user should also roll back. The manager needs to operate across two ResourceAccess modules in the same database transaction. How do we do that?

Let’s write a helper that does scope management using Effect’s acquireRelease lifecycle util:

const DbTrxScopeLive = Effect.acquireRelease(
  // first argument is the `acquire`
  Effect.gen(function* () {
    const dbResource = yield* DbResourceRepo;
    const trx = yield* dbResource.use((db) => db.startTransaction().execute());

    return trx;
  }),
  // second argument is the `release`
  (trx, exit) =>
    Effect.promise(() =>
      Exit.match(exit, {
        onSuccess: () => trx.commit().execute(),
        onFailure: () => trx.rollback().execute(),
      })
    )
);

// We implement the same DbResourceRepo shape as DbLive but now with our new db transaction implementation instead
export const DbTrxLive = Layer.effect(
  DbResourceRepo,
  Effect.gen(function* () {
    const db = yield* DbTrxScopeLive;

    return {
      use: (fn) =>
        Effect.tryPromise({
          try: () => fn(db),
          catch: (e) =>
            new DbError({
              message: `Failed to use db: ${e}`,
              cause: e,
            }),
        }),
    };
  })
);

We can now pass this through without the underlying logic having any knowledge of whether it’s running inside a transaction or not:

export const UserAccessTrxLive = UserAccessLayer.pipe(Layer.provide(DbTrxLive));
export const TaskAccessTrxLive = TaskAccessLayer.pipe(
  Layer.provide(DbTrxLive)
);

export const GtdManagerLive = GtdManagerLayer.pipe(
  Layer.provide([UserAccessTrxLive, TaskAccessTrxLive])
);
// DbTrxLive introduces a `Scope` requirement, so the effect that uses this manager
// must be wrapped in `Effect.scoped`; that is how Effect knows where the scope
// starts and ends to run the `acquire` + `release` (see the sketch below)
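
A sketch of what that looks like at the call site (the program, the sample values, and the placement of DbLive are assumptions based on the snippets above):

// Illustrative: the transaction is started when the scope opens and is
// committed (or rolled back) when the scope closes
const program = Effect.gen(function* () {
  const gtdManager = yield* GtdManagerRepo;
  yield* gtdManager.init({
    user: "Ada",
    tasks: [{ title: "Write the intro" }],
  });
});

const runnable = program.pipe(
  Effect.provide(GtdManagerLive),
  Effect.provide(DbLive), // DbTrxLive still needs the base DbResourceRepo to start the transaction
  Effect.scoped
);

Effect.runPromise(runnable);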

We can even scope the transaction down to just the method that requires it, with the other methods using the regular (non-transactional) version.

export const GtdManagerLive = Layer.effect(
  GtdManagerRepo,
  Effect.gen(function* () {
    const taskAccess = yield* TaskAccessRepo;
    const userAccess = yield* UserAccessRepo;

    return {
      init: ({ user, tasks }) =>
        Effect.gen(function* () {
          const tasksEffectArr = tasks.map((t) =>
            taskAccess.create({ title: t.title })
          );
          const userEffect = userAccess.create({ name: user });
          yield* Effect.all([...tasksEffectArr, userEffect], {
            concurrency: "unbounded",
          });
        }).pipe(
          // provide override for DbResourceRepo
          Effect.provide(DbTrxLive),
          Effect.scoped
        ),

      // will still use the non-transactional implementation from `TaskAccessLive`
      addTask: ({ title }) => taskAccess.create({ title }),
    };
  })
).pipe(Layer.provide([UserAccessLive, TaskAccessLive]));

Concurrency guardrails

Note that queries within a single database transaction run sequentially over one connection (ref), even if we issue the method calls concurrently. Effect.all is sequential by default, but if we pass the concurrency option, it will run the effects concurrently in separate Effect fibers. If one of those fibers fails, behavior is not guaranteed, e.g. some data might be committed without being rolled back.

Effect.gen(function* () {
  const trxUserAccess = yield* UserAccessRepo;
  const trxTaskAccess = yield* TaskAccessRepo;

  const tasksEffectArr = tasks.map((t) =>
    trxTaskAccess.create({ title: t.title })
  );
  const userEffect = trxUserAccess.create({ name: user });
  yield* Effect.all([userEffect, ...tasksEffectArr], {
    concurrency: "unbounded",
  });
});

We can bake this serial limitation of database transactions into the implementation with an Effect semaphore, which forces fibers to take turns using the resource.

export const DbTrxLive = Layer.effect(
  DbResourceRepo,
  Effect.gen(function* () {
    const trx = yield* DbTrxScopeLive;

    const sem = yield* Effect.makeSemaphore(1);

    return {
      use: (fn) =>
        Effect.tryPromise({
          try: () => fn(trx),
          catch: (e) =>
            new DbError({
              message: `Failed to use db: ${e}`,
              cause: e,
            }),
        }).pipe(sem.withPermits(1)),
    };
  })
);

// even if a developer makes concurrent calls, they are serialized under the hood by the permit count
Effect.all([userEffect, ...tasksEffectArr], {
  concurrency: "unbounded",
});

Takeaways

This approach lets teams coordinate multi-step writes across modules without leaking database details. By scoping a transaction via Effect and sharing it through layers, you get atomicity, simpler testing, and clear seams for change.

  • Fewer incidents: All-or-nothing writes prevent partial state that triggers on-call churn.
  • Lower maintenance cost: the abstraction contains data-layer volatility, so changes to the database layer don’t ripple through the rest of the codebase.
  • Safety under concurrency: Built-in guardrails (semaphore) keep behavior predictable during failures.

Appendix

Notice that we passed the DbTrxLive Layer to two separate Layers.

export const TaskAccessTrxLive = TaskAccessLayer.pipe(
  Layer.provide(DbTrxLive)
);
export const UserAccessTrxLive = UserAccessLayer.pipe(
  Layer.provide(DbTrxLive)
);

How do we ensure that both are sharing the same DbTrxLive instance, and therefore the same underlying transaction?

It turns out that Effect memoizes Layers that are used in the same application. From the Effect docs:

One important feature of an Effect application is that layers are shared by default. This means that if the same layer is used twice, and if we provide the layer globally, the layer will only be allocated a single time. For every layer in our dependency graph, there is only one instance of it that is shared between all the layers that depend on it.
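
That sharing is exactly what we want here: one DbTrxLive instance means one transaction. For completeness, the opposite is also possible; Layer.fresh opts a layer out of memoization, so each consumer would get its own instance and therefore its own transaction (a sketch, not something we want for the all-or-nothing workflow):

// Layer.fresh: each access layer gets its own copy of DbTrxLive,
// i.e. two separate transactions
export const TaskAccessSeparateTrx = TaskAccessLayer.pipe(
  Layer.provide(Layer.fresh(DbTrxLive))
);
export const UserAccessSeparateTrx = UserAccessLayer.pipe(
  Layer.provide(Layer.fresh(DbTrxLive))
);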

See caveats and details here: https://effect.website/docs/requirements-management/layer-memoization/