@@ -21,6 +21,48 @@
 logger = logging.getLogger(__name__)


+@attr.s
+class BulkTransactionsClient(BaseBulkTransactionsClient):
+    """Postgres bulk transactions."""
+
+    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
+    debug: bool = attr.ib(default=False)
+    item_table: Type[database.Item] = attr.ib(default=database.Item)
+    item_serializer: Type[serializers.Serializer] = attr.ib(
+        default=serializers.ItemSerializer
+    )
+
+    def __attrs_post_init__(self):
+        """Create sqlalchemy engine."""
+        self.engine = self.session.writer.cached_engine
+
+    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
+        """Preprocess items to match data model.
+
+        # TODO: dedup with GetterDict logic (ref #58)
+        """
+        db_model = self.item_serializer.stac_to_db(item)
+        return self.item_serializer.row_to_dict(db_model)
+
+    def bulk_item_insert(
+        self, items: Items, chunk_size: Optional[int] = None, **kwargs
+    ) -> str:
+        """Bulk item insertion using sqlalchemy core.
+
+        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
+        """
+        # Use items.items because schemas.Items is a model with an items key
+        processed_items = [self._preprocess_item(item) for item in items]
+        return_msg = f"Successfully added {len(processed_items)} items."
+        if chunk_size:
+            for chunk in self._chunks(processed_items, chunk_size):
+                self.engine.execute(self.item_table.__table__.insert(), chunk)
+            return return_msg
+
+        self.engine.execute(self.item_table.__table__.insert(), processed_items)
+        return return_msg
+
+
 @attr.s
 class TransactionsClient(BaseTransactionsClient):
     """Transactions extension specific CRUD operations."""
@@ -34,6 +76,7 @@ class TransactionsClient(BaseTransactionsClient):
     collection_serializer: Type[serializers.Serializer] = attr.ib(
         default=serializers.CollectionSerializer
     )
+    bulk_client_cls = attr.ib(default=BulkTransactionsClient)

     def create_item(
         self,
@@ -46,7 +89,7 @@ def create_item(

         # If a feature collection is posted
         if item["type"] == "FeatureCollection":
-            bulk_client = BulkTransactionsClient(session=self.session)
+            bulk_client = self.bulk_client_cls(session=self.session)
             bulk_client.bulk_item_insert(items=item["features"])
             return None

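The new `bulk_client_cls` attr turns the bulk client into an injection point: `create_item` instantiates whatever class is configured instead of hard-coding `BulkTransactionsClient`. A hedged sketch of how a caller might use that (the subclass and its logging are hypothetical):

```python
# Hypothetical override that logs batch sizes before delegating to the stock
# implementation; only the bulk_client_cls wiring comes from this change.
@attr.s
class LoggingBulkClient(BulkTransactionsClient):
    def bulk_item_insert(self, items, chunk_size=None, **kwargs):
        logger.info("bulk inserting %d items", len(items))
        return super().bulk_item_insert(items, chunk_size=chunk_size, **kwargs)

# FeatureCollections posted to create_item now go through LoggingBulkClient.
client = TransactionsClient(bulk_client_cls=LoggingBulkClient)
```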
@@ -158,44 +201,3 @@ def delete_collection(
         query.delete()
         return self.collection_serializer.db_to_stac(data, base_url=base_url)

-
-@attr.s
-class BulkTransactionsClient(BaseBulkTransactionsClient):
-    """Postgres bulk transactions."""
-
-    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
-    debug: bool = attr.ib(default=False)
-    item_table: Type[database.Item] = attr.ib(default=database.Item)
-    item_serializer: Type[serializers.Serializer] = attr.ib(
-        default=serializers.ItemSerializer
-    )
-
-    def __attrs_post_init__(self):
-        """Create sqlalchemy engine."""
-        self.engine = self.session.writer.cached_engine
-
-    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
-        """Preprocess items to match data model.
-
-        # TODO: dedup with GetterDict logic (ref #58)
-        """
-        db_model = self.item_serializer.stac_to_db(item)
-        return self.item_serializer.row_to_dict(db_model)
-
-    def bulk_item_insert(
-        self, items: Items, chunk_size: Optional[int] = None, **kwargs
-    ) -> str:
-        """Bulk item insertion using sqlalchemy core.
-
-        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
-        """
-        # Use items.items because schemas.Items is a model with an items key
-        processed_items = [self._preprocess_item(item) for item in items]
-        return_msg = f"Successfully added {len(processed_items)} items."
-        if chunk_size:
-            for chunk in self._chunks(processed_items, chunk_size):
-                self.engine.execute(self.item_table.__table__.insert(), chunk)
-            return return_msg
-
-        self.engine.execute(self.item_table.__table__.insert(), processed_items)
-        return return_msg