DMTN-249: Consistency model and division of responsibilities in Butler

  • Jim Bosch

Latest Revision: 2023-10-23

1 Introduction

The current boundaries between the Butler and its registry and datastore components are under strain in a number of different ways. Failure recovery during deletion operations has long been in bad shape, and much of the current “trash”-based system is currently just unused complexity. Butler client/server will require new approaches to atomic operations and managing operation latency (including caching), and RFC-888 has recently shown that we may want to move away from the registry component providing public APIs even outside of the client/server. This technical note will propose a new high-level organization of butler interfaces and responsibilities to address these concerns.

The current boundary between the registry and datastore components was set up with two principles in mind:

  • a dataset has to be added to the registry first, so it can take responsibility for generating a unique dataset ID (at the time, an autoincrement integer);

  • we should use database transactions that are not committed until datastore operations are completed to maintain consistency across the two components.

The first is no longer true - we’ve switched to UUIDs specifically to support writing to datastore first, via BPS database-free schemes like execution butler and quantum-backed butler [3]. Using database transactions has never really worked for deletes, fundamentally because datastore operations cannot reliably be rolled back if the database commit itself fails. Our attempt to work around this with a system that moves datasets to a “trash” table had considerable problems of its own, leaving us with no real attempt to maintain integrity between datastore and registry. Even for put calls, starting a transaction before the datastore write begins is problematic because it keeps database transactions open longer than we’d like.

The upcoming client/server (also referred to as the “remote” or “http” butler) work is the impetus for most of the changes we propose here, even though the consistency issues we are trying to solve are long-standing. We will need to consider the client/server architecture in the design work to fix those issues, and a major piece of this is that we only trust the server to maintain our consistency model. Since any consistency model will necessarily involve both database and datastore content, enforcing consistency will have to be a Butler responsibility, not a Registry or Datastore responsibility. In order to ensure that the right parts of that enforcement occur on the server, we are pushed strongly towards making Butler itself polymorphic (with direct/SQL and client/server implementations) rather than Registry (with Datastore remaining polymorphic for other reasons).

In 2   Component Overview, we describe the planned high-level division of responsibilities for Butler, Registry, and Datastore in the client/server era. 3   Consistency Model describes the new consistency model and introduces the artifact transaction as a new, limited-lifetime butler component that plays an important role in maintaining consistency. In 4   Use Case Details, we work through a few important use cases in detail to show how components interact in both client/server and direct-connection contexts. 5   Prototype Code serves as an appendix of sorts with code listings that together form a thorough prototyping of the changes being proposed here. It is not expected to be read directly, but will be frequently linked to in other sections.

Throughout this technote, links to code objects may refer to the existing ones (e.g. Butler) or, more frequently, the prototypes of their replacements defined here (e.g. Butler). Existing types that are not publicly documented (e.g. SqlRegistry) and future types that were not prototyped in detail (e.g. RemoteButler) are not linked. Unfortunately Sphinx formatting highlights linked vs. unlinked much more strongly than old vs. new, which is the opposite of what we want - but it should not be necessary to follow most linked code entities at all anyway.

In addition, we note that DMTN-271 [1] provides an in-depth description of changes to pipeline execution we expect to occur on a similar timescale, both enabling and benefiting from the lower-level changes described here. DMTN-242 [2] may be updated in the future to provide more detail about how we will actually implement the changes described, which will have to involve providing backwards-compatible access to heavily-used data repositories while standing up a minimal client/server butler as quickly as possible.

2 Component Overview

Our plan for the components of the butler system is shown at a high level in Data repositories and their clients.


Figure 1 Data repositories and their clients

The identities and roles of these components are broadly unchanged: a Butler is still a data repository client, and it still delegates SQL database interaction to a Registry and artifact (e.g. file) storage to a Datastore. Many details are changing, however, including which types are polymorphic:

  • The current Butler will be split into a Butler abstract base class and the DirectButler implementation. Butler will implement much of its public interface itself, while delegating to a few mostly-protected (in the C++/Java sense) abstract methods that must be implemented by derived classes.

  • The current Registry and SqlRegistry classes will be merged into a single concrete final Registry, while RemoteRegistry will be dropped.

  • The new RemoteButler class will provide a new full Butler implementation that uses a Datastore directly for get, put, and transfer operations. It will communicate with the database only indirectly via a new Butler REST Server. It also obtains the signed URLs needed to interact with its Datastore from that server. The Butler REST server will have a Datastore as well, but will use it only to verify and delete artifacts.

  • In this design the Registry is just the database-interaction code shared by DirectButler and the Butler REST Server, and it may ultimately cease to exist in favor of its components being used directly by Butler implementations.

Note

Note that the Butler.registry attribute is already a thin shim that will delegate more and more to public methods on its Butler, until ultimately all butler functionality will be available without it and its continued existence will depend only on our need for backwards compatibility.

A single data repository may have both DirectButler and RemoteButler clients, corresponding to trusted and untrusted users. This means the Butler REST Server may not have persistent state (other than caching) that is not considered part of the data repository itself. This includes locks - we have to rely on SQL database and low-level artifact storage primitives to guarantee consistency in the presence of concurrency. This also implies that a single data repository may interact with multiple Butler REST Servers, which is something we definitely want for scalability.

Datastore will remain an abstract base class with largely the same concrete implementations as today, but instead of being able to fetch and store datastore-specific metadata records in the SQL database itself (currently mediated by a DatastoreRegistryBridge instance provided by Registry), it will return those records to Butler on write and receive them (often as part of a DatasetRef) on read, and its interface will change significantly as a result. By making it unnecessary for a Datastore to communicate with the database we make it possible to use the same Datastore objects in all kinds of Butler implementations, preserving Datastore inheritance as an axis for customizing how datasets are actually stored instead.
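To make that record flow concrete, here is a minimal sketch using the prototype Datastore and Butler methods listed in 5   Prototype Code. The orchestration is greatly simplified (in the real design these steps happen inside an artifact transaction, as described in 3   Consistency Model), and the record-insertion helper is an assumption rather than a prototyped method.

# Hypothetical sketch of the record round-trip between Butler and Datastore.
# `butler` and `datastore` stand in for a DirectButler and its configured
# Datastore; `_insert_datastore_records` is an assumed helper for the database
# insert that the Butler (not the Datastore) now performs.

def write_then_read(butler, datastore, obj, ref):
    # Write the artifact; the Datastore never touches the database.
    uris = datastore.predict_new_uris([ref])
    datastore.put_many([(obj, ref)], paths=butler._get_resource_paths(uris, write=True))
    # The Datastore returns its records to the Butler, which persists them.
    for dataset_id, valid, records in datastore.verify([ref]):
        assert valid, f"artifact(s) for {dataset_id} missing or corrupted"
        butler._insert_datastore_records(records)  # assumed helper
    # On read, the records come back attached to an expanded DatasetRef.
    (expanded,) = butler.expand_existing_dataset_refs([ref])
    paths = butler._get_resource_paths(datastore.extract_existing_uris([expanded]))
    ((_, _, loaded),) = datastore.get_many([(expanded, {})], paths)
    return loaded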

Note

It is not clear that Datastore inheritance is actually usable for customizing how datasets are actually stored - we have repeatedly found it much easier to add new functionality to FileDatastore than to add a new Datastore implementation. And all other concrete datastores are unusual in one sense or another:

  • InMemoryDatastore doesn’t actually correspond to a data repository (and is now slated for removal);

  • SasquatchDatastore only exports; it cannot get datasets back and cannot manage their lifetimes.

  • ChainedDatastore might work better as a layer between Butler and other datastores if it didn’t have to satisfy the Datastore interface itself.

As a result, we may be due for a larger rethink of the Datastore concept and its relationship with Butler as well, but we will consider that out of scope for this technote, as it isn’t necessary for either RemoteButler development or establishing a data repository consistency model.

Data repositories and their clients also includes workspaces, a new concept introduced here that will be expanded upon in DMTN-271 [1]. Workspaces formalize and generalize our use of QuantumBackedButler to provide a limited butler interface to PipelineTask execution that does not require continuous access to the central SQL database [3], by using (in this case) a QuantumGraph stored in a file to provide metadata instead. An internal workspace writes processing outputs directly to locations managed by a data repository, and at a low level should be considered an extension of that data repository that defers and batches up database access. An external workspace has similar high-level behavior, but since it does not write directly to the central data repository, it is more like an independent satellite repository that remembers its origin and can (when its work is done) transfer ownership of its datasets back to the central repository. An external workspace can also be converted into a complete standalone data repository in-place, by creating a SQL database (typically SQLite) from the metadata it holds. Internal workspaces can only interact with a DirectButler, because they are also a trusted entity that requires unsigned URI access to artifact storage. External workspaces can be used with any Butler. Workspaces are expected to have lifetimes up to days or perhaps weeks, and cease to exist when their outputs are committed to a data repository. Workspaces that use something other than a persisted QuantumGraph for dataset metadata will be supported, but no other concrete workspace implementations are currently planned.

3 Consistency Model

3.1 Definitions and Overview

A full data repository has both a SQL database and artifact storage that are expected to remain consistent at all times. A dataset is considered registered in the repository if its UUID is associated with a dataset type, data ID, and RUN collection in the database. It is considered stored in the repository if its UUID is associated with one or more datastore records in the database and all artifacts (e.g. files) necessary to fully read it are present.

Datastore records are rows in special database tables whose schemas are defined by the datastore configured with the repository. These must have the dataset ID as at least part of their primary key. They typically contain information like the formatter class used to read and write the dataset and a URI that points to the artifact, but aside from the dataset ID, the schema is fully datastore-dependent.
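For illustration only, a file-based datastore's record for a single dataset might look roughly like the following; everything except dataset_id is an assumption about one possible datastore's schema, not something this model requires.

# Hypothetical contents of one datastore-record row for a FileDatastore-like
# datastore; only dataset_id is required by the consistency model.
example_record = {
    "dataset_id": "6d2c6a2e-6f0b-4b9e-9a9a-7c1d7c0e8b11",  # dataset UUID (required)
    "path": "u/someone/my_run/calexp/calexp_r_visit_0012345_det042.fits",  # relative to the datastore root
    "formatter": "some.package.SomeFormatter",  # import path used to read/write the artifact
    "file_size": 123456,  # optional integrity metadata
    "checksum": None,     # optional integrity metadata
}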

Each dataset in a data repository must be in one of the following states at all times:

  1. both registered and stored;

  2. registered but not stored;

  3. managed by an artifact transaction.

An artifact transaction is a limited-duration but persistent manifest of changes to be made to both the database and storage. All open artifact transactions are registered in the database and are closed by committing, reverting, or abandoning them (see 3.3   Artifact Transaction Details). A dataset that is managed by an artifact transaction:

  • may not have any datastore records associated with its UUID;

  • may or may not be registered;

  • may or may not have associated artifacts present.

An artifact transaction does not correspond to a database transaction - there will actually be one database transaction used to open each artifact transaction and another used to close it.

While most artifact transactions will have very brief durations, and are persisted only for fault-tolerance, internal workspaces open an artifact transaction when created, and they commit, revert, or abandon that transaction only when the workspace itself is committed, reverted, or abandoned; this is what gives an internal workspace “permission” to write processing-output artifacts directly to data repository locations while deferring the associated database inserts. External workspaces create (and commit) an artifact transaction only when the processing is complete and the workspace is committed by transferring artifacts back to the data repository. From the perspective of data repository consistency, this is no different from any other transfer operation.

The artifact transaction system relies on low-level database and artifact storage having their own mechanisms to guard against internal corruption and data loss (e.g. backups, replication, etc.), and it assumes that the data committed by successful database transactions and successful artifact writes can always be restored by those low-level mechanisms. The role of the artifact transaction system is to provide synchronization between two independently fault-tolerant persistent storage systems.

3.2 Storage tables

The current butler database schema includes dataset_location and dataset_location_trash tables that this proposal has no need for.

The former was intended as a way to make it possible to query (using the database alone) for whether a dataset is stored by a particular datastore. The ability to query this table was never implemented, and it is not clear that users should actually care which of several chained datastores actually stores a dataset. Going forward, we intend for the query system to test whether a dataset is stored (in both results and where expressions) by checking for the presence of the dataset’s UUID in any datastore-record table. The set of tables to include in that query could be restricted at query-construction time by asking the datastore whether it would ever store a particular dataset type, but at present this would probably be a premature optimization.

The dataset_location_trash table was intended to aid with consistency when deleting datasets, but it never worked and no longer serves any real purpose.

3.3 Artifact Transaction Details

3.3.1 Opening a transaction

Artifact transactions are opened by calling Butler.begin_transaction() with an ArtifactTransaction instance. This will frequently happen inside other butler methods, but begin_transaction() is a public method precisely so external code (such as the pipeline execution middleware) can define specialized transaction types - though this may only ever happen in practice in DirectButler, since RemoteButler will only support transaction types that have been vetted in advance.
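The shape of such an externally-defined transaction type might look roughly like the sketch below. Only get_operation_name(), is_workspace, and make_workspace_client() appear in the prototype listings in 5   Prototype Code; the other method names and signatures are taken from the prose in this section and should be read as assumptions.

import uuid

# Assumed import location for the prototype types discussed in this technote:
# from lsst.daf.butler import ArtifactTransaction, DatasetRef


class SpecializedTransaction(ArtifactTransaction):
    """Hypothetical transaction type defined outside daf_butler (realistically
    usable only with DirectButler).  ArtifactTransaction is a pydantic model
    (see 3.3.1), so declaring fields is enough to make the state serializable.
    """

    refs: dict[uuid.UUID, DatasetRef]  # state persisted in artifact_transaction.data

    def get_operation_name(self) -> str:
        return "specialized"

    def get_modified_runs(self):  # return type assumed
        return set()

    def get_insert_only_runs(self):  # return type assumed
        # Assumes DatasetRef exposes its RUN collection as .run.
        return {ref.run for ref in self.refs.values()}

    def begin(self, context) -> None:
        # Register new datasets early for fine-grained locking (see 3.3.4).
        context.insert_new_datasets(self.refs.values())

    # commit(), revert(), and abandon() would follow the patterns shown in
    # 4 Use Case Details; their exact signatures are not reproduced here.

Opening it is then just a matter of calling Butler.begin_transaction() with an instance, exactly as the put_many() prototype in 5   Prototype Code does with PutTransaction.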

Open artifact transactions are represented in the repository database primarily by the artifact_transaction table:

CREATE TABLE artifact_transaction (
   name VARCHAR PRIMARY KEY,
   data JSON NOT NULL
);

The artifact_transaction table has one entry for each open transaction. In addition to the transaction name, it holds a serialized description of the transaction (ArtifactTransaction instances are subclasses of pydantic.BaseModel) directly in its data column. Additional tables that provide locking for concurrency are described in 3.3.4   Concurrency and locking.
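Since the transactions are pydantic models, round-tripping one through the data column could look roughly like the sketch below; the type-dispatch table is purely an assumption, included only to illustrate how a butler or server might know which concrete class to instantiate.

import json

import pydantic

# Assumed dispatch table from a stored type name to a concrete transaction class.
_TRANSACTION_TYPES: dict[str, type[pydantic.BaseModel]] = {}


def serialize_transaction(transaction: pydantic.BaseModel) -> str:
    """Produce the JSON stored in the artifact_transaction.data column."""
    return json.dumps(
        {"type": type(transaction).__name__, "data": transaction.model_dump(mode="json")}
    )


def deserialize_transaction(data: str) -> pydantic.BaseModel:
    """Reconstruct the transaction object when closing or inspecting it."""
    payload = json.loads(data)
    transaction_cls = _TRANSACTION_TYPES[payload["type"]]
    return transaction_cls.model_validate(payload["data"])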

The database inserts that open an artifact transaction occur within a single database transaction, and ArtifactTransaction.begin is first given an opportunity to execute certain database operations in this transaction as well (see 3.3.5   Database-only operations).

The RemoteButler implementation of Butler.begin_transaction() will serialize the transaction on the client and send this serialized form of the transaction to the Butler REST Server, which performs all other interactions with the transaction. This includes checking whether the user is permitted to run this transaction.

3.3.2 Closing a transaction

A transaction can only be closed by calling Butler.commit_transaction(), revert_transaction(), or abandon_transaction().

Butler.commit_transaction() delegates to ArtifactTransaction.commit(), and it always attempts to accomplish the original goals of the transaction. It raises (keeping the transaction open and performing no database operations) if it cannot fully succeed after doing as much as it can. Commit implementations are given an opportunity to perform additional database-only operations in the same database transaction that deletes the artifact_transaction rows (see 3.3.5   Database-only operations).

Butler.revert_transaction() (delegating to ArtifactTransaction.revert()) is the opposite - it attempts to undo any changes made by the transaction (including any changes made when opening it), and it also raises if this is impossible, preferably after undoing all that it can first. Revert implementations are also given an opportunity to perform additional database-only operations in the same database transaction that deletes the artifact_transaction rows, but the set of database operations supported here is limited to those that invert operations that can be performed in ArtifactTransaction.begin().

Butler.abandon_transaction() (delegating to ArtifactTransaction.abandon()) reconciles database and artifact state while minimizing the chance of failure; its goal is to only fail if low-level database or artifact storage operations fail. This means:

  • inserting datastore records for datasets that are already registered and whose artifacts are all present;

  • deleting artifacts that do not comprise a complete and valid dataset.

In RemoteButler the ArtifactTransaction closing methods are always run on the server, since this is the only place consistency guarantees can be safely verified. The transaction definitions are also read by the server. The server may need to re-check user permission to run the transaction if there’s a chance that may have changed, but we may also be able to handle this possibility by requiring the user to have no open artifact transactions when changing their access to various entities.
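A rough sketch of the reconciliation that abandon() implementations share, written against the Datastore.verify() contract in 5   Prototype Code; the method signature and the context's record-insertion method name are assumptions.

def abandon(self, datastore, context) -> None:
    # Hypothetical sketch; the real signature and context API are not
    # reproduced in the prototype listings in section 5.
    records_to_insert = {}
    incomplete_refs = []
    for dataset_id, valid, records in datastore.verify(self.refs.values()):
        if valid:
            # All artifacts present: mark the dataset as stored.
            records_to_insert[dataset_id] = records
        elif valid is False:
            # Partially present or corrupted: remove what is there.
            incomplete_refs.append(self.refs[dataset_id])
        # valid is None: nothing was ever written, so nothing to do.
    datastore.unstore(incomplete_refs)
    context.insert_datastore_records(records_to_insert)  # assumed method name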

3.3.3 Workspace transactions

The set of datasets and related artifacts managed by an artifact transaction is usually fixed when the transaction is opened, allowing all dataset metadata needed to implement the transaction to be serialized to an artifact_transaction row at that time. A transaction can also indicate that new datasets will be added to the transaction over its lifetime by overriding ArtifactTransaction.is_workspace to True. This causes the transaction to be assigned a workspace root, a directory or directory-like location where the transaction can write files that describe these new datasets before the artifacts for those datasets are actually written. The driving use case is PipelineTask execution, for which these files will include the serialized QuantumGraph. At present we expect only DirectButler to support workspace transactions - using signed URLs for workspace files is a complication we’d prefer to avoid, and we want to limit the set of concrete artifact transaction types supported by RemoteButler to a few hopefully-simple critical ones anyway.

A workspace transaction may also provide access to a transaction-defined client object by implementing ArtifactTransaction.make_workspace_client(); this can be used to provide a higher-level interface for adding datasets (like building and executing quantum graphs). User code should obtain a client instance by calling Butler.make_workspace_client() with the transaction name.

When a workspace transaction is opened, the serialized transaction is written to a JSON file in the workspace as well as to the artifact_transaction database table. This allows Butler.make_workspace_client() to almost always avoid any database or server calls (it is a classmethod(), so even the server calls made during Butler startup are unnecessary). If the transaction JSON file does not exist, Butler.make_workspace_client() will have to query the artifact_transaction table to see whether the transaction still exists, and recreate the file if it does. This guards against two rare failure modes in workspace construction:

  • When a workspace transaction is opened, we register the transaction with the database before creating the workspace root and the transaction JSON file there; this lets us detect concurrent attempts to open the same transaction and ensure only one of those attempts tries to perform the workspace writes. But it also makes it possible that a failure will occur after the transaction has already been registered, leaving the workspace root missing.

  • When a workspace transaction is closed, we delete the transaction JSON file just before removing the transaction from the database. This ordering prevents calls to Butler.make_workspace_client() from succeeding via a stale file after the transaction has been closed (which could otherwise happen, since deleting the transaction JSON file can fail), while the database fallback still covers the case where the closing database transaction fails after the file has already been deleted.

This scheme does not protect against concurrency issues occurring within a single workspace, which are left to transaction and workspace client implementations and the higher-level code that uses them. For example, a workspace client obtained before the transaction is closed can still write new workspace files and datastore artifacts without any way of knowing that the transaction has closed. This is another reason internal workspaces will not be supported by RemoteButler.

The workspace root will be recursively deleted by the Butler after its transaction closes, with the expectation that its contents will have already been translated into database content or artifacts (or are intentionally being dropped). This can only be done after the closing database transaction concludes, since we need to preserve the workspace state in case the database transaction fails. In the rare case that workspace root deletion fails after the artifact transaction has been removed from the database, we still consider the transaction closed, and we provide Butler.vacuum_workspaces() as a way to scan for and remove those orphaned workspace roots.
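A sketch of the Butler.make_workspace_client() logic described above; the file layout and the helper functions are assumptions, and only the overall control flow is intended to be accurate.

@classmethod
def make_workspace_client(cls, config, name):
    # Hypothetical sketch of the prototype classmethod (see 5 Prototype Code).
    workspace_root = _workspace_root_from_config(config, name)   # assumed helper
    transaction_file = workspace_root.join("transaction.json")   # assumed layout
    if not transaction_file.exists():
        # Rare path: the file is missing, so check the database (or server)
        # and recreate the file if the transaction is still open.
        butler = cls(config)                                     # assumed constructor
        transaction = butler._fetch_open_transaction(name)       # assumed helper
        if transaction is None:
            raise LookupError(f"Artifact transaction {name!r} has been closed.")
        transaction_file.write(transaction.model_dump_json().encode())
    else:
        transaction = _read_transaction_file(transaction_file)   # assumed helper
    datastore = _make_datastore(config)                          # assumed helper
    return transaction.make_workspace_client(datastore)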

3.3.4 Concurrency and locking

The artifact transaction system described in previous sections is sufficient to maintain repository consistency only when the changes made by concurrent transactions are disjoint. To guard against race conditions, we need to introduce some locking. Database tables that associate datasets or RUN collections with a single transaction (enforced by unique constraints) are an obvious choice.

Per-dataset locks would be ideal for maximizing parallelism, but they would be expensive to implement in the database: to prevent competing writes to the same artifacts, the lock tables would need to replicate the full complexity of the dataset_tags_* tables in order to prevent {dataset type, data ID, run} conflicts as well as UUID conflicts, since the former are what provide uniqueness to artifact URIs. Coarse per-RUN locking is much cheaper, but it poses a major challenge for at least one important use case and possibly a few others:

  • Prompt Processing needs each worker to be able to transfer its own outputs back to the same RUN collection in parallel.

  • Service-driven raw-ingest processes may need to ingest each file independently and in parallel, and modify a single, long-lived RUN.

  • Transfers between data facilities triggered by Rucio events may also need to perform multiple ingests into the same RUN in parallel.

It is important to note that what is missing from Prompt Processing (and possibly the others) is a sequence-point hook that could run Butler.commit_transaction() to modify the database, close the transaction, and then possibly open a new one. When such a sequence-point hook is available, a single transaction could be used to wrap parallel artifact transfers that do the vast majority of the work, with only the database operations and artifact verification run in serial. This is what we expect batch- and user-driven ingest/import/transfer operations to do (to the extent those need parallelism at all).

To address these use cases we propose using two tables to represent locks:

CREATE TABLE artifact_transaction_modified_run (
   transaction_name VARCHAR NOT NULL REFERENCES artifact_transaction (name),
   run_name VARCHAR PRIMARY KEY
);

CREATE TABLE artifact_transaction_insert_only_run (
   transaction_name VARCHAR NOT NULL REFERENCES artifact_transaction (name),
   run_name VARCHAR NOT NULL,
   PRIMARY KEY (transaction_name, run_name)
);

The artifact_transaction_modified_run table provides simple locking that associates a RUN with at most one artifact transaction. It would be populated from the contents of ArtifactTransaction.get_modified_runs() when the transaction is opened, preventing the opening database transaction from succeeding if there are any competing artifact transactions already open.

The artifact_transaction_insert_only_run table is populated by ArtifactTransaction.get_insert_only_runs(), which should include only RUN collections whose datasets are inserted via calls to the ArtifactTransactionOpenContext.insert_new_datasets method, and not modified by the transaction in any other way. Inserting new datasets in exactly this way will also cause the opening database transaction to fail (due to a unique constraint violation) if any dataset already exists with the same {dataset type, data ID, run} combination, and it happens to be exactly what the challenging use cases would naturally do. This allows us to drop the unique constraint on run_name alone in this table, and permit multiple artifact transactions writing to the same run to coexist.

We do still need to track the affected RUN collections to ensure they do not appear in artifact_transaction_modified_run, which is why artifact_transaction_insert_only_run still needs to exist. When an artifact transaction is opened, the butler should verify that run_name values in those two tables are disjoint. This may require the opening database transaction to be performed with SERIALIZABLE isolation.
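A sketch of how the opening database transaction might populate and cross-check the two lock tables, using SQLAlchemy constructs against tables mirroring the definitions above; the helper itself and its error handling are assumptions.

import sqlalchemy


def acquire_run_locks(
    connection: sqlalchemy.Connection,
    metadata: sqlalchemy.MetaData,
    transaction_name: str,
    modified_runs: set[str],
    insert_only_runs: set[str],
) -> None:
    """Hypothetical helper run inside the (SERIALIZABLE) opening transaction."""
    modified = metadata.tables["artifact_transaction_modified_run"]
    insert_only = metadata.tables["artifact_transaction_insert_only_run"]
    # The unique constraint on run_name rejects a second modifying transaction.
    if modified_runs:
        connection.execute(
            modified.insert(),
            [{"transaction_name": transaction_name, "run_name": r} for r in modified_runs],
        )
    if insert_only_runs:
        connection.execute(
            insert_only.insert(),
            [{"transaction_name": transaction_name, "run_name": r} for r in insert_only_runs],
        )
    # Cross-check: RUN names in the two tables must remain disjoint.
    overlap = connection.execute(
        sqlalchemy.select(modified.c.run_name).where(
            modified.c.run_name.in_(sqlalchemy.select(insert_only.c.run_name))
        )
    ).scalars().all()
    if overlap:
        raise RuntimeError(f"RUN collections locked by conflicting transactions: {overlap}")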

3.3.5 Database-only operations

Artifact transactions are given an opportunity to perform certain database-only operations both in begin() and in their closing methods, to make high-level operations that include both artifact and database-only modifications atomic. The set of operations permitted when opening and closing artifact transactions reflects an attempt to balance a few competing priorities:

  • Inserting datasets early is necessary for our limited per-dataset locking scheme, but datasets can only be inserted if their RUN collection and all dimension records for their data IDs already exist.

  • Performing likely-to-fail database operations early causes those failures to prevent the artifact transaction from being opened, before any expensive and hard-to-revert artifact writes are performed.

  • Performing database operations early makes it a challenge (at best) to implement revert(). Idempotent database operations like INSERT ... ON CONFLICT IGNORE and DELETE FROM ... WHERE ... cannot know which rows they actually affected and hence which modifications to undo - at least not until after the initial database transaction is committed, which is too late to modify the serialized artifact transaction. This defeats the purpose of including these operations with the artifact transaction at all.

  • Even non-idempotent database operations performed early must reckon with the possibility of another artifact transaction (or database-only butler method) performing overlapping writes before the artifact transaction is closed, unless we prohibit them with our own locking.

Note

Dataset type registration is never included as part of an artifact transaction because it can require new database tables to be created, and that sometimes needs to be done in its own database transaction.

ArtifactTransactionOpenContext defines the operations available to begin() to be limited to non-idempotent dataset and collection registration (i.e. raising if those entities already exist) and datastore record removal. Dataset insertion sometimes has to occur early for fine-grained locking, and in other cases we want it to run early because its typical failure modes - foreign key violations (invalid data IDs) and {dataset type, data ID, run} unique constraint violations - are exactly the problems we want to catch before any artifacts are written. In order to insert datasets early, we also need to provide the ability to add RUN collections early. Dataset insertion can also depend on dimension record presence, but since these usually require idempotent inserts and are problematic to remove, we require dimension-record insertion to occur in a separate database transaction before the artifact transaction begins. Removing datastore records at the start of an artifact transaction is not really a “database-only” operation; it is required in order to remove the associated artifacts during that transaction.

ArtifactTransactionCloseContext supports only datastore record insertion, since that is all abandon() is permitted to do. It also provides a convenience method for calling Datastore.verify() on a mapping of datasets, since that proved quite useful in prototyping.

ArtifactTransactionRevertContext extends the options available to revert() to removing dataset registrations and removing collection registrations; these are the inverses of the only operations supported by begin().

ArtifactTransactionCommitContext extends this further to also allow commit() to create and modify CHAINED, TAGGED, and CALIBRATION collections, and to register new datasets. The only reason we might want to perform those collection operations early would be to fail early if they violate constraints, but this is outweighed by the fact that they are impossible to manually undo safely (most are idempotent) and (unlike datasets) are not protected from concurrent modifications by locking. And unlike dataset-insertion constraint violations, errors in these operations rarely suggest problems that need to block artifacts from being written. Inserting new datasets when committing rather than when opening an artifact transaction is supported specifically for internal workspaces, which generally do not have a complete list of datasets whose artifacts they will write when their transaction is opened. Ensuring consistency in CHAINED collection operations in particular may require the closing database transaction to use SERIALIZABLE isolation.

Adding further support for dimension-record insertion in commit() would not be problematic, but it’s not obviously useful, since dimension records usually need to be present before datasets are inserted.

4 Use Case Details

4.1 Butler.put

Note

Almost all put calls today happen in the context of pipeline execution, but our intent is to ultimately make all task execution go through the workspace concept introduced in 2   Component Overview (and described more fully in DMTN-271 [1]). The remaining use cases for put include RubinTV’s best-effort image processing (which should probably use workspaces as well), certain curated-calibration ingests, and users puttering around in notebooks.

The prototype implementation of Butler.put_many (a new API we envision put delegating to) begins by expanding the data IDs of all of the given DatasetRef objects. A dictionary mapping UUID to DatasetRef is then used to construct a PutTransaction instance to pass to Butler.begin_transaction().

The transaction state is just that mapping of DatasetRef objects, keyed by UUID. The begin() implementation for this transaction registers the new datasets. This provides fine-grained locking (as described in 3.3.4   Concurrency and locking) on success, and forces the operation to fail early (preventing the transaction from ever being opened) if it violates a constraint, such as an invalid data ID value or a {dataset type, data ID, run} uniqueness failure. If the artifact transaction is opened successfully, the new datasets appear registered but unstored to queries throughout the transaction’s lifetime.

After the transaction has been opened, Butler.put_many calls Datastore.predict_new_uris() and Butler._get_resource_paths() to obtain all signed URLs needed for the writes. In DirectButler, _get_resource_paths() just concatenates the datastore root with the relative path instead. These URLs are passed to Datastore.put_many() to write the actual artifacts directly to their permanent locations. If these datastore operations succeed, Butler.commit_transaction() is called. This calls the transaction’s commit() method (on the server for RemoteButler, in the client for DirectButler), which calls Datastore.verify on all datasets in the transaction. Because these DatasetRef objects do not have datastore records attached, Datastore.verify is responsible for generating them (e.g. regenerating URIs, computing checksums and sizes) as well as checking that the artifacts all exist. The datastore records are inserted into the database as the artifact transaction is closed, with no additional database operations performed.

All operations after the transaction’s opening occur in a try block that calls Butler.revert_transaction if an exception is raised. The revert() implementation calls Datastore.unstore() to remove any artifacts that may have been written and then removes the datasets that were registered when the transaction was opened. If this succeeds, it provides strong exception safety; the repository is left in the same condition it was in before the transaction was opened. If it fails - as would occur if the database or server became unavailable or artifact storage became unwriteable - a special exception is raised (chained to the original error) notifying the user that the transaction has been left open and must be cleaned up manually.

In the case of PutTransaction, a revert should always be possible as long as the database and artifact storage systems are working normally, and the new datasets have not been added to any TAGGED or CALIBRATION collections.

Note

This technote assumes we will actually implement DM-33635 and make it an error to attempt to remove a dataset while it is still referenced by a TAGGED or CALIBRATION collection.

As always, abandoning the failed transaction is another option. The abandon() implementation for put is quite similar to the commit() implementation; it differs only in that it exits without error instead of raising an exception when some artifacts are missing. It still inserts datastore records only for the datasets whose artifacts are already present (as is necessary for consistency guarantees), and it deletes any remaining, incomplete artifacts. This leaves all dataset registrations in place (stored or unstored as appropriate), ensuring that abandon() can succeed even when those datasets have already been referenced in TAGGED or CALIBRATION collections.
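A heavily condensed sketch of PutTransaction's closing methods follows; the prose above is authoritative, the context's method names are assumptions, and the real prototype lives in the package accompanying this technote.

def commit(self, datastore, context) -> None:
    # Hypothetical sketch: verify every artifact (generating datastore records
    # in the process) and refuse to close the transaction if any are missing.
    records, bad = {}, []
    for dataset_id, valid, records_for_ref in datastore.verify(self.refs.values()):
        if valid:
            records[dataset_id] = records_for_ref
        else:
            bad.append(dataset_id)
    if bad:
        raise RuntimeError(f"Artifacts missing or invalid for {bad}; transaction left open.")
    context.insert_datastore_records(records)  # assumed method name


def revert(self, datastore, context) -> None:
    # Hypothetical sketch: delete anything that was written, then remove the
    # dataset registrations added when the transaction was opened.
    datastore.unstore(self.refs.values())
    context.discard_datasets(self.refs.values())  # assumed method name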

4.2 Removing artifacts

The prototype includes a Butler.remove_datasets() method that can either fully remove datasets (purge=True) or merely unstore them (purge=False). This method begins by expanding all given DatasetRef objects, which includes both expanding their data IDs and attaching existing datastore records. These are used to construct and begin a RemovalTransaction. The state for this transaction is again a mapping of DatasetRef objects, along with the boolean purge flag.

The RemovalTransaction begin() implementation removes all datastore records for its artifacts, as required for datasets managed by an artifact transaction. As in PutTransaction, we want the datasets managed by the artifact transaction to appear as registered but unstored while the artifact transaction is open. Because RemovalTransaction performs modifications other than dataset insertions, it must use coarse RUN locking, and it implements ArtifactTransaction.get_modified_runs() to return all RUN collections that hold any of the datasets it intends to delete.

In this case there is nothing to do with the transaction after it has been opened besides commit it. The commit() implementation delegates to Datastore.unstore() to actually remove all artifacts, and if purge=True it also fully removes those datasets from the database. In addition to the ever-present possibility of low-level failures, Butler.commit_transaction() can also fail if purge=True and any dataset is part of a TAGGED or CALIBRATION collection.

If the commit operation fails, a try block in Butler.remove_datasets() attempts a revert() in order to try to provide strong exception safety, but this will frequently fail, since it requires all artifacts to still be present, and hence works only if the error occurred quite early and the Datastore.verify() calls in revert() still succeed. More frequently we expect failures in removal that occur after the transaction is opened to result in the transaction being left open and resolution left to the user, again with a special exception raised to indicate this state.

Removals that fail due to low-level problems can be retried by calling Butler.commit_transaction(); this can also be used to complete a purge after removing references to the dataset in TAGGED or CALIBRATION collections.

The abandon() implementation for removals is almost identical to the one for put: Datastore.verify() is used to identify which datasets still exist and which have been removed, and the datastore records for those still present are returned so they can be inserted into the database when the transaction is closed. When abandoning a removal we leave datasets as registered but unstored when their artifacts are missing, since this is closer to the state of the repository when the transaction was opened and avoids any chance of failure due to TAGGED or CALIBRATION associations.

A subtler difference between put and removal is that the DatasetRef objects held by RemovalTransaction include their original datastore records, allowing Datastore.verify() (in both abandon() and revert()) to guard against unexpected changes (e.g. by comparing checksums), while in PutTransaction all Datastore.verify() can do is generate new records.
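For completeness, the corresponding RemovalTransaction commit logic might look roughly like this (again with assumed context method names):

def commit(self, datastore, context) -> None:
    # Hypothetical sketch of RemovalTransaction.commit().
    datastore.unstore(self.refs.values())
    if self.purge:
        # Fails if any dataset is still referenced by a TAGGED or CALIBRATION
        # collection (assuming DM-33635, as discussed in 4.1).
        context.discard_datasets(self.refs.values())  # assumed method name
    # With purge=False we simply do not re-insert datastore records, leaving
    # the datasets registered but unstored.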

4.3 Transfers

Transfers, ingests, and imports are not fully prototyped here because they’re broadly similar to put from the perspective of the transaction system - a transaction is opened, artifacts are written by code outside the transaction system by direct calls to Datastore methods, and then the transaction is committed with revert and abandon also behaving similarly. In particularly simple cases involving new-dataset transfers only, the PutTransaction implementation prototyped here may even be usable as-is, with a datastore ingest operation swapped in for the call to Datastore.put_many that occurs within the transaction lifetime but outside the ArtifactTransaction object itself.

5 Prototype Code

class LimitedButler
class LimitedButler(ABC):
    """Minimal butler interface.

    This interface will be sufficient for `~lsst.pipe.base.PipelineTask`
    execution, in that it can fully back a `~lsst.pipe.base.QuantumContext`
    implementation.  It doesn't have the existence-check and removal
    capabilities of the current `LimitedButler` (which are needed by
    `lsst.ctrl.mpexec.SingleQuantumExecutor`) because DMTN-271's workspace
    concept will handle clobbering and existence-checks differently.
    """

    @property
    @abstractmethod
    def dimensions(self) -> DimensionUniverse:
        """Definitions for all dimensions."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def is_writeable(self) -> bool:
        """Whether `put` and other write methods are supposed."""
        raise NotImplementedError()

    def get(
        self,
        ref: DatasetRef,
        *,
        parameters: Mapping[GetParameter, Any] | None = None,
    ) -> InMemoryDataset:
        """Load a dataset, given a `DatasetRef` with a fully-expanded data ID
        and datastore records.

        Notes
        -----
        The full `Butler` will have an overload of this method that takes
        dataset type and data ID, and its version of this overload will not
        require the `DatasetRef` to be fully expanded.
        """
        ((_, _, result),) = self.get_many([(ref, parameters if parameters is not None else {})])
        return result

    @abstractmethod
    def get_many(
        self,
        arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]],
        /,
    ) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], InMemoryDataset]]:
        """Vectorized implementation of `get`."""
        raise NotImplementedError()

    def get_deferred(
        self,
        ref: DatasetRef,
        *,
        parameters: Mapping[GetParameter, Any] | None = None,
    ) -> DeferredDatasetHandle:
        """Return a handle that can fetch a dataset into memory later.

        Notes
        -----
        The full `Butler` will have an overload of this method that takes
        dataset type and data ID, and its version of this overload will not
        require the `DatasetRef` to be fully expanded.
        """
        ((_, _, handle),) = self.get_many_deferred([(ref, parameters if parameters is not None else {})])
        return handle

    def get_many_deferred(
        self,
        arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]],
        /,
    ) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], DeferredDatasetHandle]]:
        """Vectorized implementation of `get_deferred`."""
        return [
            (ref, parameters_for_ref, DeferredDatasetHandle(self, ref, parameters_for_ref))  # type: ignore
            for ref, parameters_for_ref in arg
        ]

    def get_uri(self, ref: DatasetRef) -> StorageURI:
        """Return the relative URI for the artifact that holds this dataset.

        The given dataset must have fully-expanded data IDs and datastore
        records.

        This method raises if there is no single URI for the dataset; use
        `get_many_uris` for the general case.

        Notes
        -----
        The full `Butler` will have an overload of this method that takes
        dataset type and data ID, and its version of this overload will not
        require the `DatasetRef` to be fully expanded.
        """
        ((_, uri),) = self.get_many_uris([ref])
        return uri

    @abstractmethod
    def get_many_uris(self, refs: Iterable[DatasetRef]) -> Iterable[tuple[DatasetRef, StorageURI]]:
        """Return all relative URIs associated with the given datasets.

        The given datasets must have fully-expanded data IDs and datastore
        records.

        Each `DatasetRef` may be associated with multiple URIs.

        Notes
        -----
        The full `Butler` version of this method will not require the given
        `DatasetRef` instances to be fully expanded.
        """
        raise NotImplementedError()

    def put(self, obj: InMemoryDataset, ref: DatasetRef) -> None:
        """Write a dataset given a DatasetRef with a fully-expanded data ID
        (but no Datastore records).

        Notes
        -----
        The full Butler will have an overload of this method that takes dataset
        type and data ID, and its version of this overload will not require the
        `DatasetRef` to be fully expanded.
        """
        self.put_many([(obj, ref)])

    @abstractmethod
    def put_many(self, arg: Iterable[tuple[InMemoryDataset, DatasetRef]], /) -> None:
        """Vectorized implementation of `put`."""
        raise NotImplementedError()
class Butler
get_many()
    @final
    def get_many(
        self,
        arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]],
        /,
    ) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], InMemoryDataset]]:
        # Signature is inherited, but here it accepts not-expanded refs.
        parameters = []
        refs = []
        for ref, parameters_for_ref in arg:
            parameters.append(parameters_for_ref)
            refs.append(ref)
        expanded_refs = list(self.expand_existing_dataset_refs(refs))
        paths = self._get_resource_paths(self._datastore.extract_existing_uris(expanded_refs))
        return self._datastore.get_many(zip(expanded_refs, parameters), paths)
put_many()
    @final
    def put_many(self, arg: Iterable[tuple[InMemoryDataset, DatasetRef]]) -> None:
        # Signature is inherited, but here it accepts not-expanded refs.
        from .put_transaction import PutTransaction

        refs: dict[DatasetId, DatasetRef] = {}
        objs: list[InMemoryDataset] = []
        data_ids: dict[tuple[DataIdValue, ...], DataCoordinate] = {}
        for obj, ref in arg:
            refs[ref.id] = ref
            data_ids[ref.dataId.values_tuple()] = ref.dataId
            objs.append(obj)
        # Expand the data IDs associated with all refs.
        data_ids = {
            data_id.values_tuple(): data_id for data_id in self.expand_data_coordinates(data_ids.values())
        }
        for ref in refs.values():
            refs[ref.id] = dataclasses.replace(ref, dataId=data_ids[ref.dataId.values_tuple()])
        # Open a transaction.
        transaction = PutTransaction(refs=refs)
        transaction_name, _ = self.begin_transaction(transaction)
        try:
            uris = self._datastore.predict_new_uris(refs.values())
            self._datastore.put_many(zip(objs, refs.values()), paths=self._get_resource_paths(uris))
            self.commit_transaction(transaction_name)
        except BaseException as main_err:
            try:
                self.revert_transaction(transaction_name)
            except BaseException:
                raise UnfinishedTransactionError(
                    "Dataset write failed (see chained exception for details) and could not be reverted. "
                    f"Artifact transaction {transaction_name!r} must be manually committed, reverted, "
                    "or abandoned after any low-level problems with the repository have been addressed."
                ) from main_err
            raise
expand_existing_dataset_refs()
    @final
    def expand_existing_dataset_refs(self, refs: Iterable[DatasetRef]) -> Iterable[DatasetRef]:
        """Expand DatasetRefs to include all relevant dimension records and
        datastore records.
        """
        raise NotImplementedError("TODO: implement here by delegating to query() and _cache.")
expand_data_coordinates()
    @final
    def expand_data_coordinates(self, data_coordinates: Iterable[DataCoordinate]) -> Iterable[DataCoordinate]:
        """Expand data IDs to include all relevant dimension records."""
        raise NotImplementedError("TODO: implement here by delegating to query() and _cache.")
remove_datasets()
    @final
    def remove_datasets(
        self,
        refs: Iterable[DatasetRef],
        purge: bool = False,
    ) -> None:
        """Remove datasets.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets whose artifacts should be deleted.
        purge : `bool`, optional
            Whether to un-register datasets in addition to removing their
            stored artifacts.
        """
        from .removal_transaction import RemovalTransaction

        transaction = RemovalTransaction(
            refs={ref.id: ref for ref in self.expand_existing_dataset_refs(refs)}, purge=purge
        )
        transaction_name, _ = self.begin_transaction(transaction)
        try:
            self.commit_transaction(transaction_name)
        except BaseException as main_err:
            try:
                self.revert_transaction(transaction_name)
            except BaseException:
                raise UnfinishedTransactionError(
                    "Dataset removal failed (see chained exception for details) and could not be reverted. "
                    f"Artifact transaction {transaction_name!r} must be manually committed, reverted, "
                    "or abandoned after any low-level problems with the repository have been addressed."
                ) from main_err
            raise
begin_transaction()
    @abstractmethod
    def begin_transaction(
        self,
        transaction: ArtifactTransaction,
        name: ArtifactTransactionName | None = None,
    ) -> tuple[ArtifactTransactionName, bool]:
        """Open a persistent transaction in this repository.

        Parameters
        ----------
        transaction
            Object that knows how to commit, revert, or abandon the
            transaction, can [de]serialize itself to and from JSON data, and
            can provide information about what it will modify.
        name
            Name of the transaction.  Should be prefixed with ``u/$USER`` for
            regular users.  If not provided, defaults to something that is
            guaranteed to be universally unique with that prefix.

        Returns
        -------
        name
            Name of the transaction.
        just_opened
            Whether this process actually opened the transaction (`True`) vs.
            finding an already-open identical one (`False`).
        """
        raise NotImplementedError()
commit_transaction()
    @abstractmethod
    def commit_transaction(self, name: ArtifactTransactionName) -> None:
        """Commit an existing transaction.

        Most users should only need to call this method when a previous (often
        implicit) commit failed and they want to retry.

        This method will raise an exception and leave the transaction open if
        it cannot fully perform all of the operations originally included in
        the transaction.
        """
        raise NotImplementedError()
revert_transaction()
    @abstractmethod
    def revert_transaction(self, name: ArtifactTransactionName) -> bool:
        """Revert an existing transaction.

        Most users should only need to call this method when a previous (often
        implicit) commit failed and they want to undo the transaction's changes
        instead of retrying.

        This method will raise an exception and leave the transaction open if
        it cannot fully undo any modifications made by the transaction since it
        was opened.
        """
        raise NotImplementedError()
abandon_transaction()
    @abstractmethod
    def abandon_transaction(self, name: ArtifactTransactionName) -> None:
        """Abandon an existing transaction.

        Most users should only need to call this method when a previous (often
        implicit) commit failed and cannot be fully completed or reverted.

        This method will only raise (and leave the transaction open) if it
        encounters a low-level error, but it may leave the transaction's
        operations incomplete (with warnings logged).  Repository invariants
        will still be satisfied.
        """
        raise NotImplementedError()
list_transactions()
    @abstractmethod
    def list_transactions(self) -> list[ArtifactTransactionName]:
        """Return the names of all active transactions that the current user
        has write access to.

        Notes
        -----
        Administrators are expected to use this to check that there are no
        active artifact transactions before any migration or change to central
        datastore configuration.  Active transactions are permitted to write to
        datastore locations without having the associated datastore records
        saved anywhere in advance, so we need to ensure the datastore
        configuration used to predict artifact locations is not changed while
        they are active.
        """
        raise NotImplementedError()
make_workspace_client()
    @classmethod
    @abstractmethod
    def make_workspace_client(
        cls, config: ButlerConfig | ResourcePathExpression, name: ArtifactTransactionName
    ) -> Any:
        """Make a workspace client for the given transaction.

        Notes
        -----
        This is a `classmethod` to allow it to avoid the need to connect to a
        database or REST server (as `Butler` construction typically does):
        workspace transactions are serialized to a JSON file whose path should
        be deterministic given butler config and the transaction name, and the
        `Datastore` should also be constructable from butler config alone.

        If the serialized transaction does not exist at the expected location,
        this method will have to connect to a server to see if the transaction
        exists there; if it does, the serialized transaction will be written to
        the file location for future calls.  If it does not, the transaction
        has been closed and an exception is raised.  This behavior guards
        against unexpected failures in either opening or closing a workspace
        transaction.
        """
        raise NotImplementedError()
vacuum_workspaces()
    @abstractmethod
    def vacuum_workspaces(self) -> None:
        """Clean up any workspace directories not associated with an active
        transaction.

        This method is only needed because:

         - we have to write transaction headers to workspace roots before
           inserting the transaction into the database when it is opened;
         - we have to delete workspace roots after deleting the transaction
           from the database on commit or abandon;

        and in both cases we can't guarantee we won't be interrupted.  But
        vacuums should only be needed extremely rarely.
        """
        raise NotImplementedError()
_get_resource_paths()
    def _get_resource_paths(
        self, uris: Iterable[StorageURI], write: bool = False
    ) -> Mapping[StorageURI, ResourcePath]:
        """Return `ResourcePath` objects usable for direct Datastore operations
        for the given possibly-relative URIs.

        This turns URIs into absolute URLs and will sign them if needed for
        this repository.
        """
        return PathMapping(frozenset(uris))
class Datastore
tables
    @property
    @abstractmethod
    def tables(self) -> Mapping[DatastoreTableName, DatastoreTableDefinition]:
        """Database tables needed to store this Datastore's records."""
        raise NotImplementedError()
extract_existing_uris()
    @abstractmethod
    def extract_existing_uris(self, refs: Iterable[DatasetRef]) -> list[StorageURI]:
        """Extract URIs from the records attached to datasets.

        These are possibly-relative URIs that need to be made absolute and
        possibly signed in order to be used.
        """
        raise NotImplementedError()
predict_new_uris()
    @abstractmethod
    def predict_new_uris(self, refs: Iterable[DatasetRef]) -> list[StorageURI]:
        """Predict URIs for new datasets to be written.

        These are possibly-relative URIs that need to be made absolute and
        possibly signed in order to be used.
        """
        raise NotImplementedError()
get_many()
    @abstractmethod
    def get_many(
        self,
        arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]],
        /,
        paths: Mapping[StorageURI, ResourcePath],
    ) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], InMemoryDataset]]:
        """Load datasets into memory.

        Incoming `DatasetRef` objects will have already been fully expanded to
        include both expanded data IDs and all possibly-relevant datastore
        table records.

        Notes
        -----
        The datasets are not necessarily returned in the order they are passed
        in, to better permit async implementations with lazy first-received
        iterator returns.  Implementations that can guarantee consistent
        ordering might want to explicitly avoid it, to avoid allowing callers
        to grow dependent on that behavior instead of checking the returned
        `DatasetRef` objects themselves.
        """
        raise NotImplementedError()
put_many()
    @abstractmethod
    def put_many(
        self, arg: Iterable[tuple[InMemoryDataset, DatasetRef]], /, paths: Mapping[StorageURI, ResourcePath]
    ) -> None:
        """Write an in-memory object to this datastore.

        Parameters
        ----------
        arg
            Objects to write and the `DatasetRef` objects that should identify
            them.
        paths
            Mapping from possibly-relative URI to definitely-absolute,
            signed-if-needed URL.

        Notes
        -----
        This method is not responsible for generating records; in this
        prototype we delegate that to `verify` so we can always do that work on
        the Butler REST server when there is one, since `put_many` can only be
        called on the client.
        """
        raise NotImplementedError()
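The note above implies a particular client-side write flow; the sketch below spells it out under stated assumptions (the ``write_all`` helper and the ``butler`` argument are illustrative, not prototype API): URIs are predicted by the datastore, absolutized and signed by the butler, and only then passed to `put_many`, with record generation deferred to `verify`:

    # Illustrative sketch only: the client-side write path.
    def write_all(
        butler, datastore: Datastore, datasets_and_refs: list[tuple[InMemoryDataset, DatasetRef]]
    ) -> None:
        refs = [ref for _, ref in datasets_and_refs]
        # Ask the datastore where the new artifacts should go...
        uris = datastore.predict_new_uris(refs)
        # ...and let the butler turn those into absolute, signed locations.
        paths = butler._get_resource_paths(uris, write=True)
        datastore.put_many(datasets_and_refs, paths=paths)
        # No datastore records are generated here; verify() produces them
        # later (on the server, when there is one) at transaction commit.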
verify()
    @abstractmethod
    def verify(
        self, refs: Iterable[DatasetRef]
    ) -> Iterable[tuple[DatasetId, bool | None, dict[DatastoreTableName, list[StoredDatastoreItemInfo]]]]:
        """Test whether all artifacts are present for a dataset and return
        datastore records that represent them.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Datasets to verify.  May or may not have datastore records attached;
            if records are attached, they must be consistent with the artifact
            content (e.g. if checksums are present, they should be checked).

        Returns
        -------
        results : `~collections.abc.Iterable`
            Result 3-tuples, each of which contains:

            - ``dataset_id``: the ID of one of the given datasets.
            - ``valid``: if all artifacts for this dataset are present,
              `True`.  If no artifacts are present, `None`.  If only some
              of the needed artifacts are present or any artifact is corrupted,
              `False`.
            - ``records``: datastore records that should be attached to this
              `DatasetRef`. These must be created from scratch if none were
              passed in (the datastore may assume its configuration has not
            changed since the artifact(s) were written) and may be augmented
              if incomplete (e.g. if sizes or checksums were absent and have
              now been calculated).
        """
        raise NotImplementedError()
unstore()
    @abstractmethod
    def unstore(self, refs: Iterable[DatasetRef]) -> None:
        """Remove all stored artifacts associated with the given datasets.

        Notes
        -----
        Artifacts that do not exist should be silently ignored - that allows
        this method to be called to clean up after an interrupted operation has
        left artifacts in an unexpected state.

        The given `DatasetRef` object may or may not have datastore records
        attached.  If no records are attached the datastore may assume its
        configuration has not changed since the artifact(s) were written.
        """
        raise NotImplementedError()
class ArtifactTransaction
is_workspace
    @property
    def is_workspace(self) -> bool:
        """Whether this transaction is associated with a workspace that allows
        content to be added incrementally.
        """
        return False
make_workspace_client()
    def make_workspace_client(self, datastore: Datastore) -> Any:
        """Return a client object that can be used to add content to this
        transaction's workspace, or `None` (the default) if this transaction
        is not associated with a workspace.
        """
        return None
get_operation_name()
    @abstractmethod
    def get_operation_name(self) -> str:
        """Return a human-readable name for the type of transaction.

        This is used to form the user-visible transaction name when no name is
        provided.
        """
        raise NotImplementedError()
get_insert_only_runs()
    @abstractmethod
    def get_insert_only_runs(self) -> Set[CollectionName]:
        """Return the `~CollectionType.RUN` collections that this transaction
        inserts new datasets into only.
        """
        raise NotImplementedError()
get_modified_runs()
    @abstractmethod
    def get_modified_runs(self) -> Set[CollectionName]:
        """Return the `~CollectionType.RUN` collections that this transaction
        modifies in any way other than inserting new datasets.

        This includes datasets added to the database via
        `ArtifactTransactionOpenContext.ensure_datasets`.
        """
        raise NotImplementedError()
begin()
    @abstractmethod
    def begin(self, context: ArtifactTransactionOpenContext) -> None:
        """Open the transaction.

        This method is called within a database transaction with at least
        ``READ COMMITTED`` isolation.  Exceptions raised by this method will
        cause that database transaction to be rolled back, and exceptions that
        originate in a database constraint failure must be re-raised or allowed
        to propagate; these indicate that a rollback is already in progress.

        After this method exits but before the database transaction is
        committed, the database rows representing this transaction will be
        inserted.
        """
        raise NotImplementedError()
commit()
    @abstractmethod
    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        """Commit this transaction.

        This method is called within a database transaction with
        ``SERIALIZABLE`` isolation.  Exceptions raised by this method will
        cause that database transaction to be rolled back, and exceptions that
        originate in a database constraint failure must be re-raised or allowed
        to propagate; these indicate that a rollback is already in progress.

        After this method exits but before the database transaction is
        committed, the database rows representing this transaction will be
        deleted.
        """
        raise NotImplementedError()
revert()
    @abstractmethod
    def revert(self, context: ArtifactTransactionRevertContext) -> None:
        """Revert this transaction.

        This method is called within a database transaction with
        ``READ COMMITTED`` isolation.  Exceptions raised by this method will
        cause that database transaction to be rolled back, and exceptions that
        originate in a database constraint failure must be re-raised or allowed
        to propagate; these indicate that a rollback is already in progress.

        After this method exits but before the database transaction is
        committed, the database rows representing this transaction will be
        deleted.
        """
        raise NotImplementedError()
abandon()
    @abstractmethod
    def abandon(self, context: ArtifactTransactionCloseContext) -> None:
        """Abandon this transaction.

        This method is called within a database transaction with at least
        ``READ COMMITTED`` isolation.  Exceptions raised by this method will
        cause that database transaction to be rolled back, and exceptions that
        originate in a database constraint failure must be re-raised or allowed
        to propagate; these indicate that a rollback is already in progress.

        After this method exits but before the database transaction is
        committed, the database rows representing this transaction will be
        deleted.
        """
        raise NotImplementedError()
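The begin/commit/revert/abandon contracts above all rely on the same ordering of database work; the schematic below restates that ordering as code. It is a sketch under stated assumptions: the ``db`` object, its ``transaction`` context manager, and the row insert/delete helpers are hypothetical stand-ins for whatever the enclosing butler actually uses:

    # Schematic driver only; `db` and its methods are hypothetical.
    def open_artifact_transaction(db, transaction: ArtifactTransaction, context) -> None:
        with db.transaction(isolation="READ COMMITTED"):
            transaction.begin(context)
            # Rows representing the transaction are inserted *after* begin()
            # returns, inside the same database transaction.
            db.insert_transaction_rows(transaction)

    def close_artifact_transaction(db, transaction: ArtifactTransaction, context, mode: str) -> None:
        isolation = "SERIALIZABLE" if mode == "commit" else "READ COMMITTED"
        with db.transaction(isolation=isolation):
            getattr(transaction, mode)(context)  # commit, revert, or abandon
            # Rows representing the transaction are deleted *after* the
            # method returns, inside the same database transaction.
            db.delete_transaction_rows(transaction)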
class ArtifactTransactionOpenContext
class ArtifactTransactionOpenContext(ABC):
    """The object passed to `ArtifactTransaction.begin` to allow it to perform
    limited database operations.
    """

    @abstractmethod
    def insert_new_run(self, name: CollectionName, doc: CollectionDocumentation | None = None) -> None:
        """Insert the given `~CollectionType.RUN` collection and raise if it
        already exists.
        """
        raise NotImplementedError()

    @abstractmethod
    def insert_new_datasets(self, refs: Iterable[DatasetRef]) -> None:
        """Insert new datasets, raising if any already exist."""
        raise NotImplementedError()

    @abstractmethod
    def discard_datastore_records(self, dataset_ids: Iterable[DatasetId]) -> None:
        """Discard any datastore records associated with the given datasets,
        ignoring any datasets that do not exist or already do not have
        datastore records.
        """
        raise NotImplementedError()
class ArtifactTransactionCloseContext
class ArtifactTransactionCloseContext(ABC):
    """The object passed to `ArtifactTransaction.abandon` to allow it to modify
    the repository.
    """

    @property
    @abstractmethod
    def datastore(self) -> Datastore:
        """The datastore to use for all artifact operations.

        Typically only `Datastore.verify` and `Datastore.unstore` should need
        to be called when a transaction is closed.
        """
        raise NotImplementedError()

    @abstractmethod
    def insert_datastore_records(
        self, records: Mapping[DatastoreTableName, Iterable[StoredDatastoreItemInfo]]
    ) -> None:
        """Insert records into a database table, marking a dataset as being
        stored.
        """
        raise NotImplementedError()

    def verify_artifacts(
        self, refs: Mapping[DatasetId, DatasetRef]
    ) -> tuple[
        dict[DatastoreTableName, list[StoredDatastoreItemInfo]],
        dict[DatasetId, DatasetRef],
        dict[DatasetId, DatasetRef],
        dict[DatasetId, DatasetRef],
    ]:
        """Verify dataset refs using a datastore and classify them.

        This is a convenience method typically called by at least two of
        {`ArtifactTransaction.commit`, `ArtifactTransaction.revert`,
        `ArtifactTransaction.abandon`}.

        Parameters
        ----------
        refs : `~collections.abc.Mapping` [ `~lsst.daf.butler.DatasetId`, \
                `DatasetRef` ]
            Datasets to verify.

        Returns
        -------
        records
            Mapping from datastore table name to the records for that table,
            for all datasets in ``present``.
        present
            Datasets whose artifacts were verified successfully.
        missing
            Datasets whose artifacts were fully absent.
        corrupted
            Datasets whose artifacts were incomplete or invalid.
        """
        records: dict[DatastoreTableName, list[StoredDatastoreItemInfo]] = {
            table_name: [] for table_name in self.datastore.tables.keys()
        }
        present = {}
        missing = {}
        corrupted = {}
        for dataset_id, valid, records_for_dataset in self.datastore.verify(refs.values()):
            ref = refs[dataset_id]
            if valid:
                for table_name, records_for_table in records_for_dataset.items():
                    records[table_name].extend(records_for_table)
                present[ref.id] = ref
            elif valid is None:
                missing[ref.id] = ref
            else:
                corrupted[ref.id] = ref
        return records, present, missing, corrupted
class ArtifactTransactionRevertContext
class ArtifactTransactionRevertContext(ArtifactTransactionCloseContext):
    """The object passed to `ArtifactTransaction.revert` to allow it to modify
    the repository.
    """

    @abstractmethod
    def discard_collection(self, name: CollectionName, *, force: bool = False) -> None:
        """Delete any existing collection with the given name.

        If a collection with this name does not exist, do nothing.

        If ``force=False``, the collection must already have no associated
        datasets, parent collections, or child collections.
        If ``force=True``,

        - all datasets in `~CollectionType.RUN` collections are discarded;
        - all dataset-collection associations in `~CollectionType.TAGGED` or
          `~CollectionType.CALIBRATION` collections are discarded (the datasets
          themselves are not discarded);
        - parent and child collection associations are discarded (the parent
          and child collections themselves are not discarded).
        """
        raise NotImplementedError()

    @abstractmethod
    def discard_datasets(self, dataset_ids: Iterable[DatasetId]) -> None:
        """Discard datasets.

        Datasets that do not exist are ignored.

        Datasets must not have any associated datastore records or associations
        with `~CollectionType.TAGGED` or `~CollectionType.CALIBRATION`
        collections.
        """
        raise NotImplementedError()
class ArtifactTransactionCommitContext
class ArtifactTransactionCommitContext(ArtifactTransactionRevertContext):
    """The object passed to `ArtifactTransaction.commit` to allow it to modify
    the repository.
    """

    @abstractmethod
    def ensure_collection(
        self, name: CollectionName, type: CollectionType, doc: CollectionDocumentation | None = None
    ) -> None:
        """Insert the given collection if it does not exist.

        If the collection exists with a different type, raise
        `CollectionTypeError`.  Documentation is only used when inserting the
        new collection; documentation conflicts are not checked.
        """
        raise NotImplementedError()

    @abstractmethod
    def insert_new_datasets(self, refs: Iterable[DatasetRef]) -> None:
        """Insert new datasets, raising if any already exist."""
        raise NotImplementedError()

    @abstractmethod
    def set_collection_chain(
        self, parent: CollectionName, children: Sequence[CollectionName], *, flatten: bool = False
    ) -> None:
        """Set the childen of a `~CollectionType.CHAINED` collection."""
        raise NotImplementedError()

    @abstractmethod
    def get_collection_chain(self, parent: CollectionName) -> list[CollectionName]:
        """Get the childen of a `~CollectionType.CHAINED` collection."""
        raise NotImplementedError()

    @abstractmethod
    def ensure_associations(
        self,
        collection: CollectionName,
        dataset_ids: Iterable[DatasetId],
        *,
        replace: bool = False,
        timespan: Timespan | None = None,
    ) -> None:
        """Associate datasets with a `~CollectionType.TAGGED` or
        `~CollectionType.CALIBRATION` collection.
        """
        raise NotImplementedError()

    @abstractmethod
    def discard_associations(
        self,
        collection: CollectionName,
        dataset_ids: Iterable[DatasetId],
        *,
        replace: bool = False,
        timespan: Timespan | None = None,
    ) -> None:
        """Discard associations between datasets and a `~CollectionType.TAGGED`
        or `~CollectionType.CALIBRATION` collection.

        Associations that do not exist are ignored.
        """
        raise NotImplementedError()
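As a purely hypothetical illustration of how a transaction's ``commit`` might use this context beyond the datastore-record operations shown below, a tagging-style transaction could look like the following (the ``collection`` and ``dataset_ids`` attributes are assumptions, not part of any prototype transaction in this technote):

    # Hypothetical commit body for a transaction that tags existing datasets.
    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        # Create the TAGGED collection if it does not already exist.
        context.ensure_collection(self.collection, CollectionType.TAGGED)
        # Associate the datasets; existing associations are left alone.
        context.ensure_associations(self.collection, self.dataset_ids)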
class PutTransaction
class PutTransaction(ArtifactTransaction):
    """An artifact transaction implementation that writes several in-memory
    datasets to the repository.
    """

    refs: dict[uuid.UUID, DatasetRef]

    def get_operation_name(self) -> str:
        return "put"

    def get_insert_only_runs(self) -> Set[CollectionName]:
        return frozenset(ref.run for ref in self.refs.values())

    def get_modified_runs(self) -> Set[CollectionName]:
        return frozenset()

    def begin(self, context: ArtifactTransactionOpenContext) -> None:
        context.insert_new_datasets(self.refs.values())

    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        records, _, missing, corrupted = context.verify_artifacts(self.refs)
        if missing or corrupted:
            raise RuntimeError(
                f"{len(missing)} dataset(s) were not written and {len(corrupted)} had missing or "
                f"invalid artifacts; transaction must be reverted or abandoned."
            )
        context.insert_datastore_records(records)

    def revert(self, context: ArtifactTransactionRevertContext) -> None:
        context.datastore.unstore(self.refs.values())
        context.discard_datasets(self.refs.keys())

    def abandon(self, context: ArtifactTransactionCloseContext) -> None:
        records, _, missing, corrupted = context.verify_artifacts(self.refs)
        for ref in missing.values():
            warnings.warn(f"{ref} was not written and will be unstored.")
        for ref in corrupted.values():
            warnings.warn(f"{ref} was not fully written and will be unstored.")
        # Remove partially-written artifacts so only fully-verified datasets
        # remain stored.
        context.datastore.unstore(corrupted.values())
        context.insert_datastore_records(records)
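A sketch of how a ``put``-like operation might drive this transaction end to end, assuming hypothetical butler entry points (``begin_transaction``, ``commit_transaction``, ``revert_transaction``) and keyword construction of the transaction object:

    # Illustrative only; the butler-level entry points are assumed.
    def put_many_via_transaction(
        butler, datastore: Datastore, datasets_and_refs: list[tuple[InMemoryDataset, DatasetRef]]
    ) -> None:
        refs = {ref.id: ref for _, ref in datasets_and_refs}
        name = butler.begin_transaction(PutTransaction(refs=refs))  # runs begin() in the database
        try:
            paths = butler._get_resource_paths(datastore.predict_new_uris(refs.values()), write=True)
            datastore.put_many(datasets_and_refs, paths=paths)
        except BaseException:
            butler.revert_transaction(name)  # unstores artifacts and discards the new dataset rows
            raise
        butler.commit_transaction(name)  # verifies artifacts and inserts datastore records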
class RemovalTransaction
class RemovalTransaction(ArtifactTransaction):
    """An artifact transaction implementation that removes datasets."""

    refs: dict[uuid.UUID, DatasetRef]
    purge: bool

    def get_operation_name(self) -> str:
        return "remove"

    def begin(self, context: ArtifactTransactionOpenContext) -> None:
        context.discard_datastore_records(self.refs.keys())

    def get_insert_only_runs(self) -> Set[CollectionName]:
        return frozenset()

    def get_modified_runs(self) -> Set[CollectionName]:
        return frozenset(ref.run for ref in self.refs.values())

    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        context.datastore.unstore(self.refs.values())
        if self.purge:
            context.discard_datasets(self.refs.keys())

    def revert(self, context: ArtifactTransactionRevertContext) -> None:
        records, _, missing, corrupted = context.verify_artifacts(self.refs)
        if missing or corrupted:
            raise RuntimeError(
                f"{len(missing)} dataset(s) have already been deleted and {len(corrupted)} had missing or "
                f"invalid artifacts; transaction must be committed or abandoned."
            )
        context.insert_datastore_records(records)

    def abandon(self, context: ArtifactTransactionCloseContext) -> None:
        records, present, _, corrupted = context.verify_artifacts(self.refs)
        for ref in present.values():
            warnings.warn(f"{ref} was not deleted and will remain stored.")
        for ref in corrupted.values():
            warnings.warn(f"{ref} was partially unstored and will be fully unstored.")
        context.datastore.unstore(corrupted.values())
        context.insert_datastore_records(records)
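A matching sketch for removal, with the same hypothetical butler entry points and constructor-keyword assumptions; note that ``begin`` already discards the datastore records, so the datasets read as unstored while the transaction is open, and ``purge`` controls whether the database rows are deleted on commit:

    # Illustrative only; the butler-level entry points are assumed.
    def remove_datasets(butler, refs: list[DatasetRef], *, purge: bool = False) -> None:
        transaction = RemovalTransaction(refs={ref.id: ref for ref in refs}, purge=purge)
        name = butler.begin_transaction(transaction)  # discards datastore records up front
        butler.commit_transaction(name)  # unstores artifacts; also deletes dataset rows if purge=True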

References

[1] [DMTN-271]. Jim Bosch. Butler management of quantum graph storage and execution. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-271.lsst.io/

[2] [DMTN-242]. Tim Jenness. Butler Client/Server Revisited. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-242.lsst.io/

[3] [DMTN-177]. Tim Jenness. Limiting Registry Access During Workflow Execution. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-177.lsst.io/