1 Introduction¶
The current boundaries between the Butler and its registry and datastore components are under strain in a number of different ways. Failure recovery during deletion operations has long been in bad shape, and much of the current “trash”-based system is now just unused complexity. Butler client/server will require new approaches to atomic operations and managing operation latency (including caching), and RFC-888 has recently shown that we may want to move away from the registry component providing public APIs even outside of the client/server. This technical note proposes a new high-level organization of butler interfaces and responsibilities to address these concerns.
The current boundary between the registry and datastore components was set up with two principles in mind:
a dataset has to be added to the registry first, so it can take responsibility for generating a unique dataset ID (at the time, an autoincrement integer);
we should use database transactions that are not committed until datastore operations are completed to maintain consistency across the two components.
The first is no longer true - we’ve switched to UUIDs specifically to support writing to datastore first, via BPS database-free schemes like execution butler and quantum-backed butler [3].
Using database transactions has never really worked for deletes, fundamentally because datastore operations cannot reliably be rolled back if the database commit itself fails.
Our attempt to work around this with a system that moves datasets to a “trash” table had considerable problems of its own, leaving us with no real attempt to maintain integrity between datastore and registry.
Even for put calls, starting a transaction before the datastore write begins is problematic because it keeps database transactions open longer than we’d like.
The upcoming client/server (also referred to as the “remote” or “http” butler) work is the impetus for most of the changes we propose here, even though the consistency issues we are trying to solve are long-standing.
We will need to consider the client/server architecture in the design work to fix those issues, and a major piece of this is that we only trust the server to maintain our consistency model.
Since any consistency model will necessarily involve both database and datastore content, enforcing consistency will have to be a Butler responsibility, not a Registry or Datastore responsibility.
In order to ensure that the right parts of that enforcement occur on the server, we are pushed strongly towards making Butler itself polymorphic (with direct/SQL and client/server implementations) rather than Registry (with Datastore remaining polymorphic for other reasons).
In 2 Component Overview, we describe the planned high-level division of responsibilities for Butler, Registry, and Datastore in the client/server era.
3 Consistency Model describes the new consistency model and introduces the artifact transaction as a new, limited-lifetime butler component that plays an important role in maintaining consistency.
In 4 Use Case Details, we work through a few important use cases in detail to show how components interact in both client/server and direct-connection contexts.
5 Prototype Code serves as an appendix of sorts with code listings that together form a thorough prototyping of the changes being proposed here.
It is not expected to be read directly, but will be frequently linked to in other sections.
Throughout this technote, links to code objects may refer to the existing ones (e.g. Butler) or, more frequently, the prototypes of their replacements defined here (e.g. Butler).
Existing types that are not publicly documented (e.g. SqlRegistry) and future types that were not prototyped in detail (e.g. RemoteButler) are not linked.
Unfortunately Sphinx formatting highlights linked vs. unlinked much more strongly than old vs. new, which is the opposite of what we want - but it should not be necessary to follow most linked code entities at all anyway.
In addition, we note that DMTN-271 [1] provides an in-depth description of changes to pipeline execution we expect to occur on a similar timescale, both enabling and benefiting from the lower-level changes described here. DMTN-242 [2] may be updated in the future to provide more detail about how we will actually implement the changes described, which will have to involve providing backwards-compatible access to heavily-used data repositories while standing up a minimal client/server butler as quickly as possible.
2 Component Overview¶
Our plan for the components of the butler system is shown at a high level in Data repositories and their clients.
The identities and roles of these components are broadly unchanged: a Butler is still a data repository client, and it still delegates SQL database interaction to a Registry and artifact (e.g. file) storage to a Datastore.
Many details are changing, however, including which types are polymorphic:
The current Butler will be split into a Butler abstract base class and the DirectButler implementation. Butler will implement much of its public interface itself, while delegating to a few mostly-protected (in the C++/Java sense) abstract methods that must be implemented by derived classes.
The current Registry and SqlRegistry classes will be merged into a single concrete final Registry, while RemoteRegistry will be dropped.
The new RemoteButler class will provide a new full Butler implementation that uses a Datastore directly for get, put, and transfer operations. It will communicate with the database only indirectly via a new Butler REST Server. It also obtains the signed URLs needed to interact with its Datastore from that server. The Butler REST Server will have a Datastore as well, but will use it only to verify and delete artifacts.
In this design the Registry is just the database-interaction code shared by DirectButler and the Butler REST Server, and it may ultimately cease to exist in favor of its components being used directly by Butler implementations.
Note
Note that the Butler.registry attribute is already a thin shim that will increasingly delegate more and more to public methods on its Butler, until ultimately all butler functionality will be available without it and its continued existence will depend only on our need for backwards compatibility.
A single data repository may have both DirectButler and RemoteButler clients, corresponding to trusted and untrusted users.
This means the Butler REST Server may not have persistent state (other than caching) that is not considered part of the data repository itself.
This includes locks - we have to rely on SQL database and low-level artifact storage primitives to guarantee consistency in the presence of concurrency.
This also implies that a single data repository may interact with multiple Butler REST Servers, which is something we definitely want for scalability.
Datastore will remain an abstract base class with largely the same concrete implementations as today, but instead of being able to fetch and store datastore-specific metadata records in the SQL database itself (currently mediated by a DatastoreRegistryBridge instance provided by Registry), it will return those records to Butler on write and receive them (often as part of a DatasetRef) on read, and its interface will change significantly as a result.
By making it unnecessary for a Datastore to communicate with the database we make it possible to use the same Datastore objects in all kinds of Butler implementations, preserving Datastore inheritance as an axis for customizing how datasets are actually stored instead.
Note
It is not clear that Datastore inheritance is actually usable for customizing how datasets are stored - we have repeatedly found it much easier to add new functionality to FileDatastore than to add a new Datastore implementation.
And all other concrete datastores are unusual in one sense or another:
InMemoryDatastore doesn’t actually correspond to a data repository (and is now slated for removal);
SasquatchDatastore only exports; it cannot get datasets back and cannot manage their lifetimes;
ChainedDatastore might work better as a layer between Butler and other datastores if it didn’t have to satisfy the Datastore interface itself.
As a result, we may be due for a larger rethink of the Datastore concept and its relationship with Butler as well, but we will consider that out of scope for this technote, as it isn’t necessary for either RemoteButler development or establishing a data repository consistency model.
Data repositories and their clients also includes workspaces, a new concept introduced here that will be expanded upon in DMTN-271 [1].
Workspaces formalize and generalize our use of QuantumBackedButler to provide a limited butler interface to PipelineTask execution that does not require continuous access to the central SQL database [3], by using (in this case) a QuantumGraph stored in a file to provide metadata instead.
An internal workspace writes processing outputs directly to locations managed by a data repository, and at a low level should be considered an extension of that data repository that defers and batches up database access.
An external workspace has similar high-level behavior, but since it does not write directly to the central data repository, it is more like an independent satellite repository that remembers its origin and can (when its work is done) transfer ownership of its datasets back to the central repository.
An external workspace can also be converted into a complete standalone data repository in-place, by creating a SQL database (typically SQLite) from the metadata it holds.
Internal workspaces can only interact with a DirectButler, because they are also trusted entities that require unsigned URI access to artifact storage.
External workspaces can be used with any Butler.
Workspaces are expected to have lifetimes up to days or perhaps weeks, and cease to exist when their outputs are committed to a data repository.
Workspaces that use something other than a persisted QuantumGraph for dataset metadata will be supported, but no other concrete workspace implementations are currently planned.
3 Consistency Model¶
3.1 Definitions and Overview¶
A full data repository has both a SQL database and artifact storage that are expected to remain consistent at all times.
A dataset is considered registered in the repository if its UUID is associated with a dataset type, data ID, and RUN collection in the database.
It is considered stored in the repository if its UUID is associated with one or more datastore records in the database and all artifacts (e.g. files) necessary to fully read it are present.
Datastore records are rows in special database tables whose schemas are defined by the datastore configured with the repository. These must have the dataset ID as at least part of their primary key. They typically contain information like the formatter class used to read and write the dataset and a URI that points to the artifact, but aside from the dataset ID, the schema is fully datastore-dependent.
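As a purely illustrative sketch (the table and column names here are hypothetical, and a real datastore schema would differ in detail), a file-based datastore’s record table might look something like:

-- Illustrative only: the actual schema is defined by the configured datastore.
CREATE TABLE example_file_datastore_records (
    dataset_id UUID NOT NULL,
    component VARCHAR NOT NULL,   -- empty string for the composite dataset itself
    path VARCHAR NOT NULL,        -- URI relative to the datastore root
    formatter VARCHAR NOT NULL,   -- fully-qualified formatter class name
    checksum VARCHAR,
    file_size BIGINT,
    PRIMARY KEY (dataset_id, component)
);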
Each dataset in a data repository must be in one of the following states at all times:
both registered and stored;
registered but not stored;
managed by an artifact transaction.
An artifact transaction is a limited-duration but persistent manifest of changes to be made to both the database and storage. All open artifact transactions are registered in the database and are closed by committing, reverting, or abandoning them (see 3.3 Artifact Transaction Details). A dataset that is managed by an artifact transaction:
may not have any datastore records associated with its UUID;
may or may not be registered;
may or may not have associated artifacts present.
An artifact transaction does not correspond to a database transaction - there will actually be one database transaction used to open each artifact transaction and another used to close it.
While most artifact transactions will have very brief durations, and are persisted only for fault-tolerance, internal workspaces open an artifact transaction when created, and they commit, revert, or abandon that transaction only when the workspace itself is committed, reverted, or abandoned; this is what gives an internal workspace “permission” to write processing-output artifacts directly to data repository locations while deferring the associated database inserts. External workspaces create (and commit) an artifact transaction only when the processing is complete and the workspace is committed by transferring artifacts back to the data repository. From the perspective of data repository consistency, this is no different from any other transfer operation.
The artifact transaction system relies on low-level database and artifact storage having their own mechanisms to guard against internal corruption and data loss (e.g. backups, replication, etc.), and it assumes that the data committed by successful database transactions and successful artifact writes can always be restored by those low-level mechanisms. The role of the artifact transaction system is to provide synchronization between two independently fault-tolerant persistent storage systems.
3.2 Storage tables¶
The current butler database schema includes dataset_location and dataset_location_trash tables that this proposal has no need for.
The former was intended as a way to make it possible to query (using the database alone) for whether a dataset is stored by a particular datastore.
The ability to query this table was never implemented, and it is not clear that users should actually care which of several chained datastores actually store a dataset.
Going forward, we intend for the query system to test whether a dataset is stored (in both results and where expressions) by checking for the presence of the dataset’s UUID in any datastore-record table.
The set of which tables to include in that query could be restricted at query-construction time by asking the datastore whether it would ever store a particular dataset type, but at present this would probably be a premature optimization.
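As a minimal sketch of what such a check could look like (assuming a single hypothetical datastore-record table named example_file_datastore_records; a real query would combine one EXISTS clause per datastore table):

-- Sketch only: report whether each dataset has any datastore records.
SELECT
    d.id,
    EXISTS (
        SELECT 1 FROM example_file_datastore_records r WHERE r.dataset_id = d.id
    ) AS is_stored
FROM dataset d;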
The dataset_location_trash table was intended to aid with consistency when deleting datasets, but it never worked and no longer serves any real purpose.
3.3 Artifact Transaction Details¶
3.3.1 Opening a transaction¶
Artifact transactions are opened by calling Butler.begin_transaction() with an ArtifactTransaction instance.
This will frequently happen inside other butler methods, but begin_transaction() is a public method precisely so external code (such as the pipeline execution middleware) can define specialized transaction types - though this may only ever happen in practice in DirectButler, since RemoteButler will only support transaction types that have been vetted in advance.
Open artifact transactions are represented in the repository database primarily by the artifact_transaction table:
CREATE TABLE artifact_transaction (
name VARCHAR PRIMARY KEY,
data JSON NOT NULL
);
The artifact_transaction table has one entry for each open transaction.
In addition to the transaction name, it holds a serialized description of the transaction (ArtifactTransaction instances are subclasses of pydantic.BaseModel) directly in its data column.
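Purely as an illustration (each concrete ArtifactTransaction subclass defines its own pydantic model, so none of the field names below are prescribed by this proposal, and the butler must also record which subclass to instantiate), the data column for a small put transaction might deserialize to something like the following Python dict:

# Hypothetical contents of artifact_transaction.data for a put of one dataset;
# field names are illustrative only.
example_transaction_data = {
    "transaction_type": "put",
    "refs": {
        "2c8a1b52-9f3e-4d0a-8c1e-0e6b7a5d4f21": {
            "dataset_type": "calexp",
            "data_id": {"instrument": "LSSTCam", "visit": 943296, "detector": 42},
            "run": "u/someone/some-run",
        },
    },
}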
Additional tables that provide locking for concurrency are described in 3.3.4 Concurrency and locking.
The database inserts that open an artifact transaction occur within a single database transaction, and ArtifactTransaction.begin is first given an opportunity to execute certain database operations in this transaction as well (see 3.3.5 Database-only operations).
The RemoteButler implementation of Butler.begin_transaction() will serialize the transaction on the client and send it to the Butler REST Server, which performs all other interactions with the transaction.
This includes checking whether the user is permitted to run this transaction.
3.3.2 Closing a transaction¶
A transaction can only be closed by calling Butler.commit_transaction(), revert_transaction(), or abandon_transaction().
Butler.commit_transaction() delegates to ArtifactTransaction.commit(), and it always attempts to accomplish the original goals of the transaction.
It raises (keeping the transaction open and performing no database operations) if it cannot fully succeed after doing as much as it can.
Commit implementations are given an opportunity to perform additional database-only operations in the same database transaction that deletes the artifact_transaction rows (see 3.3.5 Database-only operations).
Butler.revert_transaction() (delegating to ArtifactTransaction.revert()) is the opposite - it attempts to undo any changes made by the transaction (including any changes made when opening it), and it also raises if this is impossible, preferably after undoing all that it can first.
Revert implementations are also given an opportunity to perform additional database-only operations in the same database transaction that deletes the artifact_transaction rows, but the set of database operations supported here is limited to those that invert operations that can be performed in ArtifactTransaction.begin().
Butler.abandon_transaction() (delegating to ArtifactTransaction.abandon()) reconciles database and artifact state while minimizing the chance of failure; its goal is to fail only if low-level database or artifact storage operations fail.
This means:
inserting datastore records for datasets that are already registered and whose artifacts are all present;
deleting artifacts that do not comprise a complete and valid dataset.
In RemoteButler the ArtifactTransaction closing methods are always run on the server, since this is the only place consistency guarantees can be safely verified.
The transaction definitions are also read by the server.
The server may need to re-check user permission to run the transaction if there’s a chance that may have changed, but we may also be able to handle this possibility by requiring the user to have no open artifact transactions when changing their access to various entities.
3.3.3 Workspace transactions¶
The set of datasets and related artifacts managed by an artifact transaction is usually fixed when the transaction is opened, allowing all dataset metadata needed to implement the transaction to be serialized to an artifact_transaction row at that time.
A transaction can also indicate that new datasets will be added to the transaction over its lifetime by overriding ArtifactTransaction.is_workspace to True.
This causes the transaction to be assigned a workspace root, a directory or directory-like location where the transaction can write files that describe these new datasets before the artifacts for those datasets are actually written.
The driving use case is PipelineTask execution, for which these files will include the serialized QuantumGraph.
At present we expect only DirectButler to support workspace transactions - using signed URLs for workspace files is a complication we’d prefer to avoid, and we want to limit the set of concrete artifact transaction types supported by RemoteButler to a few hopefully-simple critical ones anyway.
A workspace transaction may also provide access to a transaction-defined client object by implementing ArtifactTransaction.make_workspace_client(); this can be used to provide a higher-level interface for adding datasets (like building and executing quantum graphs).
User code should obtain a client instance by calling Butler.make_workspace_client() with the transaction name.
When a workspace transaction is opened, the serialized transaction is written to a JSON file in the workspace as well as to the artifact_transaction database table.
This allows Butler.make_workspace_client() to almost always avoid any database or server calls (it is a classmethod(), so even the server calls normally made when constructing a Butler are unnecessary).
If the transaction JSON file does not exist, Butler.make_workspace_client() will have to query the artifact_transaction table to see whether the transaction exists there, and recreate the file if it does (see the sketch after the list below).
This guards against two rare failure modes in workspace construction:
When a workspace transaction is opened, we register the transaction with the database before creating the workspace root and the transaction JSON file there; this lets us detect concurrent attempts to open the same transaction and ensure only one of those attempts tries to perform the workspace writes. But it also makes it possible that a failure will occur after the transaction has already been registered, leaving the workspace root missing.
When a workspace transaction is closed, we delete the transaction JSON file just before removing the transaction from the database. This prevents calls to Butler.make_workspace_client() from succeeding during or after its deletion (since deleting the transaction JSON file can fail).
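The fallback logic described above might look roughly like the following sketch; the helper functions used here (everything other than Butler.make_workspace_client itself, the transaction JSON file, and the artifact_transaction table) are hypothetical stand-ins for butler-internal code, and error handling is elided.

# Sketch of the make_workspace_client fallback; helper names are hypothetical.
def make_workspace_client_sketch(config, transaction_name):
    header_path = workspace_transaction_file(config, transaction_name)  # hypothetical
    if not header_path.exists():
        # Rare case: opening the transaction was interrupted before the file
        # was written, or the transaction has already been closed.
        data = query_artifact_transaction_table(config, transaction_name)  # hypothetical
        if data is None:
            raise LookupError(f"Transaction {transaction_name!r} is not open.")
        # Recreate the file so later calls can skip the database entirely.
        header_path.write(data)
    transaction = deserialize_transaction(header_path.read())  # hypothetical
    return transaction.make_workspace_client(make_datastore(config))  # hypothetical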
This scheme does not protect against concurrency issues occurring within a single workspace, which are left to transaction and workspace client implementations and the higher-level code that uses them.
For example, a workspace client obtained before the transaction is closed can still write new workspace files and datastore artifacts without any way of knowing that the transaction has closed.
This is another reason internal workspaces will not be supported by RemoteButler.
The workspace root will be recursively deleted by the Butler after its transaction closes, with the expectation that its contents will have already been translated into database content or artifacts (or are intentionally being dropped).
This can only be done after the closing database transaction concludes, since we need to preserve the workspace state in case the database transaction fails.
In the rare case that workspace root deletion fails after the artifact transaction has been removed from the database, we still consider the transaction closed, and we provide Butler.vacuum_workspaces() as a way to scan for and remove those orphaned workspace roots.
3.3.4 Concurrency and locking¶
The artifact transaction system described in previous sections is sufficient to maintain repository consistency only when the changes made by concurrent transactions are disjoint.
To guard against race conditions, we need to introduce some locking.
Database tables that associate datasets or RUN collections with a single transaction (enforced by unique constraints) are an obvious choice.
Per-dataset locks would be ideal for maximizing parallelism, but expensive to implement in the database - to prevent competing writes to the same artifacts, we would need the lock tables to implement the full complexity of the dataset_tags_* tables to prevent {dataset type, data ID, run} conflicts as well as UUID conflicts, since the former are what provide uniqueness to artifact URIs.
Coarse per-RUN locking is much cheaper, but poses a major challenge for at least one important use case and possibly a few others:
Prompt Processing needs each worker to be able to transfer its own outputs back to the same RUN collection in parallel.
Service-driven raw-ingest processes may need to ingest each file independently and in parallel, and modify a single, long-lived RUN.
Transfers between data facilities triggered by Rucio events may also need to perform multiple ingests into the same RUN in parallel.
It is important to note that what is missing from Prompt Processing (and possibly the others) is a sequence-point hook that could run Butler.commit_transaction() to modify the database, close the transaction, and then possibly open a new one.
When such a sequence-point hook is available, a single transaction could be used to wrap parallel artifact transfers that do the vast majority of the work, with only the database operations and artifact verification run in serial.
This is what we expect batch- and user-driven ingest/import/transfer operations to do (to the extent those need parallelism at all).
To address these use cases we propose using two tables to represent locks:
CREATE TABLE artifact_transaction_modified_run (
    transaction_name VARCHAR NOT NULL REFERENCES artifact_transaction (name),
    run_name VARCHAR PRIMARY KEY
);

CREATE TABLE artifact_transaction_insert_only_run (
    transaction_name VARCHAR NOT NULL REFERENCES artifact_transaction (name),
    run_name VARCHAR NOT NULL,
    PRIMARY KEY (transaction_name, run_name)
);
The artifact_transaction_modified_run table provides simple locking that associates a RUN with at most one artifact transaction.
It would be populated from the contents of ArtifactTransaction.get_modified_runs() when the transaction is opened, preventing the opening database transaction from succeeding if there are any competing artifact transactions already open.
The artifact_transaction_insert_only_run table is populated by ArtifactTransaction.get_insert_only_runs(), which should include only RUN collections whose datasets are inserted via a call to the ArtifactTransactionOpenContext.insert_new_datasets method, and not modified by the transaction in any other way.
Inserting new datasets in exactly this way will also cause the opening database transaction to fail (due to a unique constraint violation) if any dataset already exists with the same {dataset type, data ID, run} combination, and it happens to be exactly what the challenging use cases would naturally do.
This allows us to drop the unique constraint on run_name alone in this table, and permit multiple artifact transactions writing to the same run to coexist.
We do still need to track the affected RUN collections to ensure they do not appear in artifact_transaction_modified_run, which is why artifact_transaction_insert_only_run still needs to exist.
When an artifact transaction is opened, the butler should verify that run_name values in those two tables are disjoint.
This may require the opening database transaction to be performed with SERIALIZABLE isolation.
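A sketch of that check (illustrative SQL; in practice the butler would issue it inside the opening database transaction, after inserting the new transaction’s lock rows):

-- Any row returned here indicates a conflict and the open must be aborted.
SELECT m.run_name
FROM artifact_transaction_modified_run m
JOIN artifact_transaction_insert_only_run i ON i.run_name = m.run_name;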
3.3.5 Database-only operations¶
Artifact transactions are given an opportunity to perform certain database-only operations both in begin() and in their closing methods, to make high-level operations that include both artifact and database-only modifications atomic.
The set of operations permitted when opening and closing artifact transactions reflects an attempt to balance a few competing priorities:
Inserting datasets early is necessary for our limited per-dataset locking scheme, but datasets can only be inserted if their RUN collection and all dimension records for their data IDs already exist.
Performing likely-to-fail database operations early causes those failures to prevent the artifact transaction from being opened, before any expensive and hard-to-revert artifact writes are performed.
Performing database operations early makes it a challenge (at best) to implement revert(). Idempotent database operations like INSERT ... ON CONFLICT IGNORE and DELETE FROM ... WHERE ... cannot know which rows they actually affected and hence which modifications to undo - at least not until after the initial database transaction is committed, which is too late to modify the serialized artifact transaction. This defeats the purpose of including these operations with the artifact transaction at all.
Even non-idempotent database operations performed early must reckon with the possibility of another artifact transaction (or database-only butler method) performing overlapping writes before the artifact transaction is closed, unless we prohibit them with our own locking.
Note
Dataset type registration is never included as part of an artifact transaction because it can require new database tables to be created, and that sometimes needs to be done in its own database transaction.
ArtifactTransactionOpenContext defines the operations available to begin() to be limited to non-idempotent dataset and collection registration (i.e. raising if those entities already exist) and datastore record removal.
Dataset insertion sometimes has to occur early for fine-grained locking, and in other cases we want it to run early because its typical failure modes - foreign key violations (invalid data IDs) and {dataset type, data ID, run} unique constraint violations - indicate problems that should prevent us from writing artifacts, and we want to detect them as early as possible.
In order to insert datasets early, we also need to provide the ability to add RUN collections early.
Dataset insertion can also depend on dimension record presence, but since these usually require idempotent inserts and are problematic to remove, we require dimension-record insertion to occur in a separate database transaction before the artifact transaction begins.
Removing datastore records at the start of an artifact transaction is not really a “database-only” operation; it is required in order to remove the associated artifacts during that transaction.
ArtifactTransactionCloseContext supports only datastore record insertion, since that is all abandon() is permitted to do.
It also provides a convenience method for calling Datastore.verify() on a mapping of datasets, since that proved quite useful in prototyping.
ArtifactTransactionRevertContext extends the options available to revert() to removing dataset registrations and removing collection registrations; these are the inverses of the only operations supported by begin().
ArtifactTransactionCommitContext extends this further to also allow commit() to create and modify CHAINED, TAGGED, and CALIBRATION collections, and to register new datasets.
The only reason we might want to perform those collection operations early would be to fail early if they violate constraints, but this is outweighed by the fact that they are impossible to manually undo safely (most are idempotent) and (unlike datasets) are not protected from concurrent modifications by locking.
And unlike dataset-insertion constraint violations, errors in these operations rarely suggest problems that need to block artifacts from being written.
Inserting new datasets when committing rather than when opening an artifact transaction is supported specifically for internal workspaces, which generally do not have a complete list of datasets whose artifacts they will write when their transaction is opened.
Ensuring consistency in CHAINED collection operations in particular may require the closing database transaction to use SERIALIZABLE isolation.
Adding further support for dimension-record insertion in commit() would not be problematic, but it’s not obviously useful, since dimension records usually need to be present before datasets are inserted.
4 Use Case Details¶
4.1 Butler.put¶
Note
Almost all put calls today happen in the context of pipeline execution, but our intent is to ultimately make all task execution go through the workspace concept introduced in 2 Component Overview (and described more fully in DMTN-271 [1]).
The remaining use cases for put include RubinTV’s best-effort image processing (which should probably use workspaces as well), certain curated-calibration ingests, and users puttering around in notebooks.
The prototype implementation of Butler.put_many (a new API we envision put delegating to) begins by expanding the data IDs of all of the DatasetRef objects it is given.
A dictionary mapping UUID to DatasetRef is then used to construct a PutTransaction instance to pass to Butler.begin_transaction().
The transaction state is just a mapping of DatasetRef objects.
The begin() implementation for this transaction registers the new datasets.
This provides fine-grained locking (as described in 3.3.4 Concurrency and locking) on success and forces the operation to fail early and prevent the transaction from ever being opened if this violates a constraint, such as an invalid data ID value or a {dataset type, data ID, collection} uniqueness failure.
If the artifact transaction is opened successfully, the new datasets appear registered but unstored to queries throughout the transaction’s lifetime.
After the transaction has been opened, Butler.put_many calls Datastore.predict_new_uris() and Butler._get_resource_paths() to obtain all signed URLs needed for the writes.
In DirectButler, _get_resource_paths() just concatenates the datastore root with the relative path instead.
These URLs are passed to Datastore.put_many() to write the actual artifacts directly to their permanent locations.
If these datastore operations succeed, Butler.commit_transaction() is called.
This calls the transaction’s commit() method (on the server for RemoteButler, in the client for DirectButler), which calls Datastore.verify on all datasets in the transaction.
Because these DatasetRef objects do not have datastore records attached, Datastore.verify is responsible for generating them (e.g. regenerating URIs, computing checksums and sizes) as well as checking that these artifacts all exist.
The datastore records are inserted into the database as the artifact transaction is closed, with no additional database operations performed.
All operations after the transaction’s opening occur in a try block that calls Butler.revert_transaction if an exception is raised.
The revert() implementation calls Datastore.unstore() to remove any artifacts that may have been written.
If this succeeds, it provides strong exception safety; the repository is left in the same condition it was before the transaction was opened.
If it fails - as would occur if the database or server became unavailable or artifact storage became unwriteable - a special exception is raised (chained to the original error) notifying the user that the transaction has been left open and must be cleaned up manually.
The datasets registered when the transaction was opened are then removed.
In the case of PutTransaction, a revert should always be possible as long as the database and artifact storage systems are working normally, and the new datasets have not been added to any TAGGED or CALIBRATION collections.
Note
This technote assumes we will actually implement DM-33635 and make it an error to attempt to remove a dataset while it is still referenced by a TAGGED or CALIBRATION collection.
As always, abandoning the failed transaction is another option.
The abandon() implementation for put is quite similar to the commit() implementation; it differs only in that it exits without error instead of raising an exception when some artifacts are missing.
It still inserts datastore records only for the datasets whose artifacts are already present (as is necessary for consistency guarantees), and it deletes the remaining artifacts completely.
This leaves all dataset registrations in place (stored or unstored as appropriate), ensuring that abandon() can succeed even when those datasets have already been referenced in TAGGED or CALIBRATION collections.
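A condensed sketch of what a put transaction could look like is shown below; it is consistent with the description above but is not the actual prototype listing, the context methods marked as hypothetical are not in the prototype reproduced in 5 Prototype Code, and error handling is elided.

# Condensed, illustrative sketch of a put transaction.
class PutTransactionSketch(ArtifactTransaction):
    refs: dict[DatasetId, DatasetRef]  # pydantic model field: UUID -> expanded ref

    def get_operation_name(self) -> str:
        return "put"

    def get_insert_only_runs(self) -> Set[CollectionName]:
        return {ref.run for ref in self.refs.values()}

    def get_modified_runs(self) -> Set[CollectionName]:
        return set()

    def begin(self, context: ArtifactTransactionOpenContext) -> None:
        # Register the new datasets up front: this provides fine-grained
        # locking and fails early on constraint violations.
        context.insert_new_datasets(self.refs.values())

    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        records, present, missing, corrupted = context.verify_artifacts(self.refs)
        if missing or corrupted:
            raise RuntimeError("Not all artifacts were written successfully.")
        context.insert_datastore_records(records)

    def revert(self, context: ArtifactTransactionRevertContext) -> None:
        context.datastore.unstore(self.refs.values())
        context.discard_datasets(self.refs.keys())  # hypothetical inverse of begin()

    def abandon(self, context: ArtifactTransactionCloseContext) -> None:
        # Keep whatever was fully written as stored, drop partial artifacts,
        # and leave everything else registered but unstored.
        records, present, missing, corrupted = context.verify_artifacts(self.refs)
        context.datastore.unstore(corrupted.values())
        context.insert_datastore_records(records)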
4.2 Removing artifacts¶
The prototype includes a Butler.remove_datasets() method that can either fully remove datasets (purge=True) or merely unstore them (purge=False).
This method begins by expanding all given DatasetRef objects, which includes both expanding their data IDs and attaching existing datastore records.
These are used to construct and begin a RemovalTransaction.
The state for this transaction is again a mapping of DatasetRef objects, along with the boolean purge flag.
The RemovalTransaction begin() implementation removes all datastore records for its artifacts, as required for datasets managed by an artifact transaction.
As in PutTransaction, we want the datasets managed by the artifact transaction to appear as registered but unstored while the artifact transaction is open.
Because RemovalTransaction performs modifications other than dataset insertions, it must use coarse RUN locking and implements ArtifactTransaction.get_modified_runs() to return all RUN collections that hold any of the datasets it intends to delete.
In this case there is nothing to do with the transaction after it has been opened besides commit it.
The commit() implementation delegates to Datastore.unstore() to actually remove all artifacts, and if purge=True it also fully removes those datasets from the database.
In addition to the ever-present possibility of low-level failures, Butler.commit_transaction() can also fail if purge=True and any dataset is part of a TAGGED or CALIBRATION collection.
If the commit operation fails, a try block in Butler.remove_datasets() attempts a revert() in order to try to provide strong exception safety, but this will frequently fail, since it requires all artifacts to still be present, and hence works only if the error occurred quite early and the Datastore.verify() calls in revert() still succeed.
More frequently we expect failures in removal that occur after the transaction is opened to result in the transaction being left open and resolution left to the user, again with a special exception raised to indicate this state.
Removals that fail due to low-level problems can be retried by calling Butler.commit_transaction(); this can also be used after removing references to the dataset in TAGGED or CALIBRATION collections to complete a purge.
The abandon() implementation for removals is almost identical to the one for put: Datastore.verify() is used to identify which datasets still exist and which have been removed, and the datastore records for those still present are returned so they can be inserted into the database when the transaction is closed.
When abandoning a removal we leave datasets as registered but unstored when their artifacts are missing, since this is closer to the state of the repository when the transaction was opened and avoids any chance of failure due to TAGGED or CALIBRATION associations.
A subtler difference between put and removal is that the DatasetRef objects held by RemovalTransaction include their original datastore records, allowing Datastore.verify() (in both abandon() and revert()) to guard against unexpected changes (e.g. by comparing checksums), while in PutTransaction all Datastore.verify() can do is generate new records.
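A similarly condensed sketch of a removal transaction (again illustrative rather than the actual prototype, with the hypothetical context method for un-registering datasets called out explicitly):

# Condensed, illustrative sketch of a removal transaction.
class RemovalTransactionSketch(ArtifactTransaction):
    refs: dict[DatasetId, DatasetRef]  # expanded refs, with datastore records attached
    purge: bool

    def get_modified_runs(self) -> Set[CollectionName]:
        # Coarse locking: every RUN holding a dataset we intend to delete.
        return {ref.run for ref in self.refs.values()}

    def begin(self, context: ArtifactTransactionOpenContext) -> None:
        # Managed datasets may not have datastore records while the
        # transaction is open, so drop them up front.
        context.discard_datastore_records(self.refs.keys())

    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        context.datastore.unstore(self.refs.values())
        if self.purge:
            context.discard_datasets(self.refs.keys())  # hypothetical

    def abandon(self, context: ArtifactTransactionCloseContext) -> None:
        # Re-store whatever is still fully present; leave the rest
        # registered but unstored.
        records, present, missing, corrupted = context.verify_artifacts(self.refs)
        context.insert_datastore_records(records)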
4.3 Transfers¶
Transfers, ingests, and imports are not fully prototyped here because they’re broadly similar to put from the perspective of the transaction system - a transaction is opened, artifacts are written outside the transaction system via direct calls to Datastore methods, and then the transaction is committed, with revert and abandon also behaving similarly.
In particularly simple cases involving new-dataset transfers only, the PutTransaction implementation prototyped here may even be usable as-is, with a datastore ingest operation swapped in for the call to Datastore.put_many that occurs within the transaction lifetime but outside the ArtifactTransaction object itself.
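A sketch of how such a transfer might reuse the put flow (Datastore.ingest_external and anything else not shown in 5 Prototype Code is hypothetical, and error handling is simplified relative to the prototype):

# Illustrative only: reuse PutTransaction for a simple new-dataset transfer.
def transfer_new_datasets_sketch(butler, refs, source_uris):
    transaction = PutTransaction(refs={ref.id: ref for ref in refs})
    name, _ = butler.begin_transaction(transaction)
    try:
        uris = butler._datastore.predict_new_uris(refs)
        paths = butler._get_resource_paths(uris, write=True)
        # Hypothetical ingest call standing in for Datastore.put_many:
        butler._datastore.ingest_external(zip(source_uris, refs), paths=paths)
        butler.commit_transaction(name)
    except BaseException:
        butler.revert_transaction(name)
        raise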
5 Prototype Code¶
- class LimitedButler¶
class LimitedButler(ABC): """Minimal butler interface. This interface will be sufficient for `~lsst.pipe.base.PipelineTask` execution, in that it can fully back a `~lsst.pipe.base.QuantumContext` implementation. It doesn't have the existence-check and removal capabilities of the current `LimitedButler` (which are needed by `lsst.ctrl.mpexec.SingleQuantumExecutor`) because DMTN-271's workspace concept will handle clobbering and existence-checks differently. """ @property @abstractmethod def dimensions(self) -> DimensionUniverse: """Definitions for all dimensions.""" raise NotImplementedError() @property @abstractmethod def is_writeable(self) -> bool: """Whether `put` and other write methods are supposed.""" raise NotImplementedError() def get( self, ref: DatasetRef, *, parameters: Mapping[GetParameter, Any] | None = None, ) -> InMemoryDataset: """Load a dataset, given a `DatasetRef` with a fully-expanded data ID and datastore records. Notes ----- The full `Butler` will have an overload of this method that takes dataset type and data ID, and its version of this overload will not require the `DatasetRef` to be fully expanded. """ ((_, _, result),) = self.get_many([(ref, parameters if parameters is not None else {})]) return result @abstractmethod def get_many( self, arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]], /, ) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], InMemoryDataset]]: """Vectorized implementation of `get`.""" raise NotImplementedError() def get_deferred( self, ref: DatasetRef, *, parameters: Mapping[GetParameter, Any] | None = None, ) -> DeferredDatasetHandle: """Return a handle that can fetch a dataset into memory later. Notes ----- The full `Butler` will have an overload of this method that takes dataset type and data ID, and its version of this overload will not require the `DatasetRef` to be fully expanded. """ ((_, _, handle),) = self.get_many_deferred([(ref, parameters if parameters is not None else {})]) return handle def get_many_deferred( self, arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]], /, ) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], DeferredDatasetHandle]]: """Vectorized implementation of `get_deferred`.""" return [ (ref, parameters_for_ref, DeferredDatasetHandle(self, ref, parameters_for_ref)) # type: ignore for ref, parameters_for_ref in arg ] def get_uri(self, ref: DatasetRef) -> StorageURI: """Return the relative URI for the artifact that holds this dataset. The given dataset must have fully-expanded data IDs and datastore records. This method raises if there is no single URI for the dataset; use `get_many_uris` for the general case. Notes ----- The full `Butler` will have an overload of this method that takes dataset type and data ID, and its version of this overload will not require the `DatasetRef` to be fully expanded. """ ((_, uri),) = self.get_many_uris([ref]) return uri @abstractmethod def get_many_uris(self, refs: Iterable[DatasetRef]) -> Iterable[tuple[DatasetRef, StorageURI]]: """Return all relative URIs associated with the given datasets. The given datasets must have fully-expanded data IDs and datastore records. Each `DatasetRef` may be associated with multiple URIs. Notes ----- The full `Butler` version of this method will not require the given `DatasetRef` instances to be fully expanded. """ raise NotImplementedError() def put(self, obj: InMemoryDataset, ref: DatasetRef) -> None: """Write a dataset given a DatasetRef with a fully-expanded data ID (but no Datastore records). 
Notes ----- The full Butler will have an overload of this method that takes dataset type and data ID, and its version of this overload will not require the `DatasetRef` to be fully expanded. """ self.put_many([(obj, ref)]) @abstractmethod def put_many(self, arg: Iterable[tuple[InMemoryDataset, DatasetRef]], /) -> None: """Vectorized implementation of `put`.""" raise NotImplementedError()
- class Butler¶
- get_many()¶
@final
def get_many(
    self,
    arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]],
    /,
) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], InMemoryDataset]]:
    # Signature is inherited, but here it accepts not-expanded refs.
    parameters = []
    refs = []
    for ref, parameters_for_ref in arg:
        parameters.append(parameters_for_ref)
        refs.append(ref)
    expanded_refs = self.expand_existing_dataset_refs(refs)
    paths = self._get_resource_paths(self._datastore.extract_existing_uris(expanded_refs))
    return self._datastore.get_many(zip(list(expanded_refs), parameters), paths)
- put_many()¶
@final
def put_many(self, arg: Iterable[tuple[InMemoryDataset, DatasetRef]]) -> None:
    # Signature is inherited, but here it accepts not-expanded refs.
    from .put_transaction import PutTransaction

    refs: dict[DatasetId, DatasetRef] = {}
    objs: list[InMemoryDataset] = []
    data_ids: dict[tuple[DataIdValue, ...], DataCoordinate] = {}
    for obj, ref in arg:
        refs[ref.id] = ref
        data_ids[ref.dataId.values_tuple()] = ref.dataId
        objs.append(obj)
    # Expand the data IDs associated with all refs.
    data_ids = {
        data_id.values_tuple(): data_id
        for data_id in self.expand_data_coordinates(data_ids.values())
    }
    for ref in refs.values():
        refs[ref.id] = dataclasses.replace(ref, dataId=data_ids[ref.dataId.values_tuple()])
    # Open a transaction.
    transaction = PutTransaction(refs=refs)
    transaction_name, _ = self.begin_transaction(transaction)
    try:
        uris = self._datastore.predict_new_uris(refs.values())
        self._datastore.put_many(zip(objs, refs.values()), paths=self._get_resource_paths(uris))
        self.commit_transaction(transaction_name)
    except BaseException as main_err:
        try:
            self.revert_transaction(transaction_name)
        except BaseException:
            raise UnfinishedTransactionError(
                "Dataset write failed (see chained exception for details) and could not be reverted. "
                f"Artifact transaction {transaction_name!r} must be manually committed, reverted, "
                "or abandoned after any low-level problems with the repository have been addressed."
            ) from main_err
        raise
- expand_existing_dataset_refs()¶
@final
def expand_existing_dataset_refs(self, refs: Iterable[DatasetRef]) -> Iterable[DatasetRef]:
    """Expand DatasetRefs to include all relevant dimension records and
    datastore records.
    """
    raise NotImplementedError("TODO: implement here by delegating to query() and _cache.")
- expand_data_coordinates()¶
@final
def expand_data_coordinates(self, data_coordinates: Iterable[DataCoordinate]) -> Iterable[DataCoordinate]:
    """Expand data IDs to include all relevant dimension records."""
    raise NotImplementedError("TODO: implement here by delegating to query() and _cache.")
- remove_datasets()¶
@final
def remove_datasets(
    self,
    refs: Iterable[DatasetRef],
    purge: bool = False,
) -> None:
    """Remove datasets.

    Parameters
    ----------
    refs : `~collections.abc.Iterable` [ `DatasetRef` ]
        Datasets whose artifacts should be deleted.
    purge : `bool`, optional
        Whether to un-register datasets in addition to removing their
        stored artifacts.
    """
    from .removal_transaction import RemovalTransaction

    transaction = RemovalTransaction(
        refs={ref.id: ref for ref in self.expand_existing_dataset_refs(refs)}, purge=purge
    )
    transaction_name, _ = self.begin_transaction(transaction)
    try:
        self.commit_transaction(transaction_name)
    except BaseException as main_err:
        try:
            self.revert_transaction(transaction_name)
        except BaseException:
            raise UnfinishedTransactionError(
                "Dataset removal failed (see chained exception for details) and could not be reverted. "
                f"Artifact transaction {transaction_name!r} must be manually committed, reverted, "
                "or abandoned after any low-level problems with the repository have been addressed."
            ) from main_err
        raise
- begin_transaction()¶
@abstractmethod
def begin_transaction(
    self,
    transaction: ArtifactTransaction,
    name: ArtifactTransactionName | None = None,
) -> tuple[ArtifactTransactionName, bool]:
    """Open a persistent transaction in this repository.

    Parameters
    ----------
    transaction
        Object that knows how to commit, revert, or abandon the
        transaction, can [de]serialize itself from JSON data, and can
        provide information about what it will modify.
    name
        Name of the transaction.  Should be prefixed with ``u/$USER``
        for regular users.  If not provided, defaults to something that
        is guaranteed to be universally unique with that prefix.

    Returns
    -------
    name
        Name of the transaction.
    just_opened
        Whether this process actually opened the transaction (`True`)
        vs. finding an already-open identical one (`False`).
    """
    raise NotImplementedError()
- commit_transaction()¶
@abstractmethod
def commit_transaction(self, name: ArtifactTransactionName) -> None:
    """Commit an existing transaction.

    Most users should only need to call this method when a previous
    (often implicit) commit failed and they want to retry.

    This method will raise an exception and leave the transaction open
    if it cannot fully perform all of the operations originally included
    in the transaction.
    """
    raise NotImplementedError()
- revert_transaction()¶
@abstractmethod
def revert_transaction(self, name: ArtifactTransactionName) -> bool:
    """Revert an existing transaction.

    Most users should only need to call this method when a previous
    (often implicit) commit failed and they want to undo the transaction
    rather than retry it.

    This method will raise an exception and leave the transaction open
    if it cannot fully undo any modifications made by the transaction
    since it was opened.
    """
    raise NotImplementedError()
- abandon_transaction()¶
@abstractmethod
def abandon_transaction(self, name: ArtifactTransactionName) -> None:
    """Abandon an existing transaction.

    Most users should only need to call this method when a previous
    (often implicit) commit failed and they do not want to retry it.

    This method will only raise (and leave the transaction open) if it
    encounters a low-level error, but it may leave the transaction's
    operations incomplete (with warnings logged).  Repository invariants
    will still be satisfied.
    """
    raise NotImplementedError()
- list_transactions()¶
@abstractmethod
def list_transactions(self) -> list[ArtifactTransactionName]:
    """Return the names of all active transactions that the current user
    has write access to.

    Notes
    -----
    Administrators are expected to use this to check that there are no
    active artifact transactions before any migration or change to
    central datastore configuration.  Active transactions are permitted
    to write to datastore locations without having the associated
    datastore records saved anywhere in advance, so we need to ensure
    the datastore configuration used to predict artifact locations is
    not changed while they are active.
    """
    raise NotImplementedError()
- make_workspace_client()¶
@classmethod
@abstractmethod
def make_workspace_client(
    cls, config: ButlerConfig | ResourcePathExpression, name: ArtifactTransactionName
) -> Any:
    """Make a workspace client for the given transaction.

    Notes
    -----
    This is a `classmethod` to allow it to avoid the need to connect to
    a database or REST server (as `Butler` construction typically does):
    workspace transactions are serialized to a JSON file whose path
    should be deterministic given butler config and the transaction
    name, and the `Datastore` should also be constructable from butler
    config alone.

    If the serialized transaction does not exist at the expected
    location, this method will have to connect to a server to see if the
    transaction exists there; if it does, the serialized transaction
    will be written to the file location for future calls.  If it does
    not, the transaction has been closed and an exception is raised.
    This behavior guards against unexpected failures in either opening
    or closing a workspace transaction.
    """
    raise NotImplementedError()
- vacuum_workspaces()¶
@abstractmethod
def vacuum_workspaces(self) -> None:
    """Clean up any workspace directories not associated with an active
    transaction.

    This method is only needed because:

    - we have to write transaction headers to workspace roots before
      inserting the transaction into the database when it is opened;
    - we have to delete workspace roots after deleting the transaction
      from the database on commit or abandon;

    and in both cases we can't guarantee we won't be interrupted.  But
    vacuums should only be needed extremely rarely.
    """
    raise NotImplementedError()
- _get_resource_paths()¶
def _get_resource_paths(
    self, uris: Iterable[StorageURI], write: bool = False
) -> Mapping[StorageURI, ResourcePath]:
    """Return `ResourcePath` objects usable for direct Datastore
    operations for the given possibly-relative URIs.

    This turns URIs into absolute URLs and will sign them if needed for
    this repository.
    """
    return PathMapping(frozenset(uris))
- class Datastore¶
- tables¶
@property
@abstractmethod
def tables(self) -> Mapping[DatastoreTableName, DatastoreTableDefinition]:
    """Database tables needed to store this Datastore's records."""
    raise NotImplementedError()
- extract_existing_uris()¶
@abstractmethod
def extract_existing_uris(self, refs: Iterable[DatasetRef]) -> list[StorageURI]:
    """Extract URIs from the records attached to datasets.

    These are possibly-relative URIs that need to be made absolute and
    possibly signed in order to be used.
    """
    raise NotImplementedError()
- predict_new_uris()¶
@abstractmethod
def predict_new_uris(self, refs: Iterable[DatasetRef]) -> list[StorageURI]:
    """Predict URIs for new datasets to be written.

    These are possibly-relative URIs that need to be made absolute and
    possibly signed in order to be used.
    """
    raise NotImplementedError()
- get_many()¶
@abstractmethod
def get_many(
    self,
    arg: Iterable[tuple[DatasetRef, Mapping[GetParameter, Any]]],
    /,
    paths: Mapping[StorageURI, ResourcePath],
) -> Iterable[tuple[DatasetRef, Mapping[GetParameter, Any], InMemoryDataset]]:
    """Load datasets into memory.

    Incoming `DatasetRef` objects will have already been fully expanded
    to include both expanded data IDs and all possibly-relevant
    datastore table records.

    Notes
    -----
    The datasets are not necessarily returned in the order they are
    passed in, to better permit async implementations with lazy
    first-received iterator returns.  Implementations that can guarantee
    consistent ordering might want to explicitly avoid it, to avoid
    allowing callers to grow dependent on that behavior instead of
    checking the returned `DatasetRef` objects themselves.
    """
    raise NotImplementedError()
- put_many()¶
@abstractmethod
def put_many(
    self, arg: Iterable[tuple[InMemoryDataset, DatasetRef]], /, paths: Mapping[StorageURI, ResourcePath]
) -> None:
    """Write an in-memory object to this datastore.

    Parameters
    ----------
    arg
        Objects to write and the `DatasetRef` objects that should
        identify them.
    paths
        Mapping from possibly-relative URI to definitely-absolute,
        signed-if-needed URL.

    Notes
    -----
    This method is not responsible for generating records; in this
    prototype we delegate that to `verify` so we can always do that work
    on the Butler REST server when there is one, since `put_many` can
    only be called on the client.
    """
    raise NotImplementedError()
- verify()¶
@abstractmethod
def verify(
    self, refs: Iterable[DatasetRef]
) -> Iterable[tuple[DatasetId, bool | None, dict[DatastoreTableName, list[StoredDatastoreItemInfo]]]]:
    """Test whether all artifacts are present for a dataset and return
    datastore records that represent them.

    Parameters
    ----------
    refs : `~collections.abc.Iterable` [ `DatasetRef` ]
        Datasets to verify.  May or may not have datastore records
        attached; if records are attached, they must be consistent with
        the artifact content (e.g. if checksums are present, they should
        be checked).

    Returns
    -------
    results : `~collections.abc.Iterable`
        Result 3-tuples, each of which contains:

        - ``dataset_id``: the ID of one of the given datasets.
        - ``valid``: if all artifacts for this dataset are present,
          `True`.  If no artifacts are present, `None`.  If only some of
          the needed artifacts are present or any artifact is corrupted,
          `False`.
        - ``records``: datastore records that should be attached to this
          `DatasetRef`.  These must be created from scratch if none were
          passed in (the datastore may assume its configuration has not
          changed since the artifact(s) were written) and may be
          augmented if incomplete (e.g. if sizes or checksums were
          absent and have now been calculated).
    """
    raise NotImplementedError()
- unstore()¶
@abstractmethod
def unstore(self, refs: Iterable[DatasetRef]) -> None:
    """Remove all stored artifacts associated with the given datasets.

    Notes
    -----
    Artifacts that do not exist should be silently ignored - that allows
    this method to be called to clean up after an interrupted operation
    has left artifacts in an unexpected state.

    The given `DatasetRef` object may or may not have datastore records
    attached.  If no records are attached the datastore may assume its
    configuration has not changed since the artifact(s) were written.
    """
    raise NotImplementedError()
- class ArtifactTransaction¶
- is_workspace¶
@property
def is_workspace(self) -> bool:
    """Whether this transaction is associated with a workspace that
    allows content to be added incrementally.
    """
    return False
- make_workspace_client()¶
def make_workspace_client(self, datastore: Datastore) -> Any:
    return None
- get_operation_name()¶
@abstractmethod
def get_operation_name(self) -> str:
    """Return a human-readable name for the type of transaction.

    This is used to form the user-visible transaction name when no name
    is provided.
    """
    raise NotImplementedError()
- get_insert_only_runs()¶
@abstractmethod
def get_insert_only_runs(self) -> Set[CollectionName]:
    """Return the `~CollectionType.RUN` collections that this transaction
    inserts new datasets into only.
    """
    raise NotImplementedError()
- get_modified_runs()¶
@abstractmethod
def get_modified_runs(self) -> Set[CollectionName]:
    """Return the `~CollectionType.RUN` collections that this transaction
    modifies in any way other than inserting new datasets.

    This includes datasets added to the database via
    `ArtifactTransactionOpenContext.ensure_datasets`.
    """
    raise NotImplementedError()
- begin()¶
@abstractmethod
def begin(self, context: ArtifactTransactionOpenContext) -> None:
    """Open the transaction.

    This method is called within a database transaction with at least
    ``READ COMMITTED`` isolation.  Exceptions raised by this method will
    cause that database transaction to be rolled back, and exceptions that
    originate in a database constraint failure must be re-raised or allowed
    to propagate; these indicate that a rollback is already in progress.

    After this method exits but before the database transaction is
    committed, the database rows representing this transaction will be
    inserted.
    """
    raise NotImplementedError()
- commit()¶
@abstractmethod
def commit(self, context: ArtifactTransactionCommitContext) -> None:
    """Commit this transaction.

    This method is called within a database transaction with
    ``SERIALIZABLE`` isolation.  Exceptions raised by this method will cause
    that database transaction to be rolled back, and exceptions that
    originate in a database constraint failure must be re-raised or allowed
    to propagate; these indicate that a rollback is already in progress.

    After this method exits but before the database transaction is
    committed, the database rows representing this transaction will be
    deleted.
    """
    raise NotImplementedError()
- revert()¶
@abstractmethod
def revert(self, context: ArtifactTransactionRevertContext) -> None:
    """Revert this transaction.

    This method is called within a database transaction with
    ``READ COMMITTED`` isolation.  Exceptions raised by this method will
    cause that database transaction to be rolled back, and exceptions that
    originate in a database constraint failure must be re-raised or allowed
    to propagate; these indicate that a rollback is already in progress.

    After this method exits but before the database transaction is
    committed, the database rows representing this transaction will be
    deleted.
    """
    raise NotImplementedError()
- abandon()¶
@abstractmethod
def abandon(self, context: ArtifactTransactionCloseContext) -> None:
    """Abandon this transaction.

    This method is called within a database transaction with at least
    ``READ COMMITTED`` isolation.  Exceptions raised by this method will
    cause that database transaction to be rolled back, and exceptions that
    originate in a database constraint failure must be re-raised or allowed
    to propagate; these indicate that a rollback is already in progress.

    After this method exits but before the database transaction is
    committed, the database rows representing this transaction will be
    deleted.
    """
    raise NotImplementedError()
- class ArtifactTransactionOpenContext¶
class ArtifactTransactionOpenContext(ABC):
    """The object passed to `ArtifactTransaction.begin` to allow it to
    perform limited database operations.
    """

    @abstractmethod
    def insert_new_run(self, name: CollectionName, doc: CollectionDocumentation | None = None) -> None:
        """Insert the given `~CollectionType.RUN` collection and raise if it
        already exists.
        """
        raise NotImplementedError()

    @abstractmethod
    def insert_new_datasets(self, refs: Iterable[DatasetRef]) -> None:
        """Insert new datasets, raising if any already exist."""
        raise NotImplementedError()

    @abstractmethod
    def discard_datastore_records(self, dataset_ids: Iterable[DatasetId]) -> None:
        """Discard any datastore records associated with the given datasets,
        ignoring any datasets that do not exist or already do not have
        datastore records.
        """
        raise NotImplementedError()
- class ArtifactTransactionCloseContext¶
class ArtifactTransactionCloseContext(ABC):
    """The object passed to `ArtifactTransaction.abandon` to allow it to
    modify the repository.
    """

    @property
    @abstractmethod
    def datastore(self) -> Datastore:
        """The datastore to use for all artifact operations.

        Typically only `Datastore.verify` and `Datastore.unstore` should
        need to be called when a transaction is closed.
        """
        raise NotImplementedError()

    @abstractmethod
    def insert_datastore_records(
        self, records: Mapping[DatastoreTableName, Iterable[StoredDatastoreItemInfo]]
    ) -> None:
        """Insert records into a database table, marking a dataset as being
        stored.
        """
        raise NotImplementedError()

    def verify_artifacts(
        self, refs: Mapping[DatasetId, DatasetRef]
    ) -> tuple[
        dict[DatastoreTableName, list[StoredDatastoreItemInfo]],
        dict[DatasetId, DatasetRef],
        dict[DatasetId, DatasetRef],
        dict[DatasetId, DatasetRef],
    ]:
        """Verify dataset refs using a datastore and classify them.

        This is a convenience method typically called by at least two of
        {`ArtifactTransaction.commit`, `ArtifactTransaction.revert`,
        `ArtifactTransaction.abandon`}.

        Parameters
        ----------
        refs : `~collections.abc.Mapping` [ `~lsst.daf.butler.DatasetId`, \
                `DatasetRef` ]
            Datasets to verify.

        Returns
        -------
        records
            Mapping from datastore table name to the records for that table,
            for all datasets in ``present``.
        present
            Datasets whose artifacts were verified successfully.
        missing
            Datasets whose artifacts were fully absent.
        corrupted
            Datasets whose artifacts were incomplete or invalid.
        """
        records: dict[DatastoreTableName, list[StoredDatastoreItemInfo]] = {
            table_name: [] for table_name in self.datastore.tables.keys()
        }
        present = {}
        missing = {}
        corrupted = {}
        for dataset_id, valid, records_for_dataset in self.datastore.verify(refs.values()):
            ref = refs[dataset_id]
            if valid:
                for table_name, records_for_table in records_for_dataset.items():
                    records[table_name].extend(records_for_table)
                present[ref.id] = ref
            elif valid is None:
                missing[ref.id] = ref
            else:
                corrupted[ref.id] = ref
        return records, present, missing, corrupted
- class ArtifactTransactionRevertContext¶
class ArtifactTransactionRevertContext(ArtifactTransactionCloseContext):
    """The object passed to `ArtifactTransaction.revert` to allow it to
    modify the repository.
    """

    @abstractmethod
    def discard_collection(self, name: CollectionName, *, force: bool = False) -> None:
        """Delete any existing collection with the given name.

        If a collection with this name does not exist, do nothing.

        If ``force=False``, the collection must already have no associated
        datasets, parent collections, or child collections.  If
        ``force=True``:

        - all datasets in `~CollectionType.RUN` collections are discarded;
        - all dataset-collection associations in `~CollectionType.TAGGED` or
          `~CollectionType.CALIBRATION` collections are discarded (the
          datasets themselves are not discarded);
        - parent and child collection associations are discarded (the parent
          and child collections themselves are not discarded).
        """
        raise NotImplementedError()

    @abstractmethod
    def discard_datasets(self, dataset_ids: Iterable[DatasetId]) -> None:
        """Discard datasets.

        Datasets that do not exist are ignored.  Datasets must not have any
        associated datastore records or associations with
        `~CollectionType.TAGGED` or `~CollectionType.CALIBRATION`
        collections.
        """
        raise NotImplementedError()
- class ArtifactTransactionCommitContext¶
class ArtifactTransactionCommitContext(ArtifactTransactionRevertContext):
    """The object passed to `ArtifactTransaction.commit` to allow it to
    modify the repository.
    """

    @abstractmethod
    def ensure_collection(
        self, name: CollectionName, type: CollectionType, doc: CollectionDocumentation | None = None
    ) -> None:
        """Insert the given collection if it does not exist.

        If the collection exists with a different type, raise
        `CollectionTypeError`.  Documentation is only used when inserting the
        new collection; documentation conflicts are not checked.
        """
        raise NotImplementedError()

    @abstractmethod
    def insert_new_datasets(self, refs: Iterable[DatasetRef]) -> None:
        """Insert new datasets, raising if any already exist."""
        raise NotImplementedError()

    @abstractmethod
    def set_collection_chain(
        self, parent: CollectionName, children: Sequence[CollectionName], *, flatten: bool = False
    ) -> None:
        """Set the children of a `~CollectionType.CHAINED` collection."""
        raise NotImplementedError()

    @abstractmethod
    def get_collection_chain(self, parent: CollectionName) -> list[CollectionName]:
        """Get the children of a `~CollectionType.CHAINED` collection."""
        raise NotImplementedError()

    @abstractmethod
    def ensure_associations(
        self,
        collection: CollectionName,
        dataset_ids: Iterable[DatasetId],
        *,
        replace: bool = False,
        timespan: Timespan | None = None,
    ) -> None:
        """Associate datasets with a `~CollectionType.TAGGED` or
        `~CollectionType.CALIBRATION` collection.
        """
        raise NotImplementedError()

    @abstractmethod
    def discard_associations(
        self,
        collection: CollectionName,
        dataset_ids: Iterable[DatasetId],
        *,
        replace: bool = False,
        timespan: Timespan | None = None,
    ) -> None:
        """Discard associations between datasets and a
        `~CollectionType.TAGGED` or `~CollectionType.CALIBRATION` collection.

        Associations that do not exist are ignored.
        """
        raise NotImplementedError()
- class PutTransaction¶
class PutTransaction(ArtifactTransaction):
    """An artifact transaction implementation that writes several in-memory
    datasets to the repository.
    """

    refs: dict[uuid.UUID, DatasetRef]

    def get_operation_name(self) -> str:
        return "put"

    def get_insert_only_runs(self) -> Set[CollectionName]:
        return frozenset(ref.run for ref in self.refs.values())

    def get_modified_runs(self) -> Set[CollectionName]:
        return frozenset()

    def begin(self, context: ArtifactTransactionOpenContext) -> None:
        context.insert_new_datasets(self.refs.values())

    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        records, _, missing, corrupted = context.verify_artifacts(self.refs)
        if missing or corrupted:
            raise RuntimeError(
                f"{len(missing)} dataset(s) were not written and {len(corrupted)} had missing or "
                f"invalid artifacts; transaction must be reverted or abandoned."
            )
        context.insert_datastore_records(records)

    def revert(self, context: ArtifactTransactionRevertContext) -> None:
        context.datastore.unstore(self.refs.values())
        context.discard_datasets(self.refs.keys())

    def abandon(self, context: ArtifactTransactionCloseContext) -> None:
        records, _, missing, corrupted = context.verify_artifacts(self.refs)
        for ref in missing.values():
            warnings.warn(f"{ref} was not written and will be unstored.")
        for ref in corrupted.values():
            warnings.warn(f"{ref} was not fully written and will be unstored.")
        context.insert_datastore_records(records)
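The order in which these hooks fire is easiest to see in code.  The driver below is only an illustration, not part of the prototype: in practice the Butler implementations construct the context objects and enforce the database-transaction boundaries.  What it shows is the relative ordering for a successful put: `begin` inside a database transaction, datastore writes outside any database transaction, and `commit` inside a ``SERIALIZABLE`` transaction.

# Illustrative driver only -- the Butler implementations own this sequencing
# and build the context objects; this sketch just shows hook ordering.
def run_put(
    txn: PutTransaction,
    open_context: ArtifactTransactionOpenContext,
    commit_context: ArtifactTransactionCommitContext,
    datastore: Datastore,
    objs_and_refs: list[tuple[InMemoryDataset, DatasetRef]],
    paths: Mapping[StorageURI, ResourcePath],
) -> None:
    # 1. Inside a READ COMMITTED database transaction: register the new
    #    (not-yet-stored) datasets and the transaction's own rows.
    txn.begin(open_context)
    # 2. Outside any database transaction: write the artifacts (on the
    #    client in the client/server butler).
    datastore.put_many(objs_and_refs, paths)
    # 3. Inside a SERIALIZABLE database transaction: verify the artifacts,
    #    attach their datastore records, and delete the transaction rows.
    txn.commit(commit_context)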
- class RemovalTransaction¶
class RemovalTransaction(ArtifactTransaction):
    """An artifact transaction implementation that removes datasets."""

    refs: dict[uuid.UUID, DatasetRef]
    purge: bool

    def get_operation_name(self) -> str:
        return "remove"

    def get_insert_only_runs(self) -> Set[CollectionName]:
        return frozenset()

    def get_modified_runs(self) -> Set[CollectionName]:
        return frozenset(ref.run for ref in self.refs.values())

    def begin(self, context: ArtifactTransactionOpenContext) -> None:
        context.discard_datastore_records(self.refs.keys())

    def commit(self, context: ArtifactTransactionCommitContext) -> None:
        context.datastore.unstore(self.refs.values())
        if self.purge:
            context.discard_datasets(self.refs.keys())

    def revert(self, context: ArtifactTransactionRevertContext) -> None:
        records, _, missing, corrupted = context.verify_artifacts(self.refs)
        if missing or corrupted:
            raise RuntimeError(
                f"{len(missing)} dataset(s) have already been deleted and {len(corrupted)} had missing or "
                f"invalid artifacts; transaction must be committed or abandoned."
            )
        context.insert_datastore_records(records)

    def abandon(self, context: ArtifactTransactionCloseContext) -> None:
        records, present, _, corrupted = context.verify_artifacts(self.refs)
        for ref in present.values():
            warnings.warn(f"{ref} was not deleted and will remain stored.")
        for ref in corrupted.values():
            warnings.warn(f"{ref} was partially unstored and will be fully unstored.")
        context.datastore.unstore(corrupted.values())
        context.insert_datastore_records(records)
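As a usage note, the difference between merely unstoring datasets and fully purging them is carried entirely by the ``purge`` flag.  The sketch below assumes these transaction classes can be constructed with keyword arguments (as pydantic models or dataclasses would allow); that construction style, and the pre-existing ``refs`` iterable, are assumptions of this illustration rather than something shown in the listing.

# Illustration only: constructing RemovalTransaction for the two removal
# modes.  Keyword construction is assumed; ``refs`` is an existing iterable
# of DatasetRef objects to remove.
to_remove = {ref.id: ref for ref in refs}

# Unstore only: artifacts and datastore records go away on commit, but the
# datasets remain registered in their RUN collections.
unstore_only = RemovalTransaction(refs=to_remove, purge=False)

# Full removal: artifacts are deleted and the datasets themselves are also
# discarded from the database on commit.
full_purge = RemovalTransaction(refs=to_remove, purge=True)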
References
[DMTN-271]. Jim Bosch. Butler management of quantum graph storage and execution. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-271.lsst.io/
[DMTN-242]. Tim Jenness. Butler Client/Server Revisited. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-242.lsst.io/
[DMTN-177]. Tim Jenness. Limiting Registry Access During Workflow Execution. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-177.lsst.io/