44from datetime import datetime , timedelta
55
66from .bootstrap import (
7+ _generate_sql_copy_commands ,
78 _get_time_offsets ,
9+ _suffix ,
10+ _trigger_column_copies ,
11+ _override_config_to_map_data ,
12+ _plan_partitions_for_time_offsets ,
813 calculate_sql_alters_from_state_info ,
914 write_state_info ,
1015)
1116from .cli import Config
12- from .types import DatabaseCommand , Table , SqlInput
17+ from .types import (
18+ DatabaseCommand ,
19+ Table ,
20+ SqlInput ,
21+ MaxValuePartition ,
22+ ChangePlannedPartition ,
23+ NewPlannedPartition ,
24+ )
1325
1426
1527class MockDatabase (DatabaseCommand ):
1628 def __init__ (self ):
17- self .response = []
29+ self ._response = list ()
30+ self ._select_response = [[{"id" : 150 }]]
1831 self .num_queries = 0
1932
2033 def run (self , cmd ):
@@ -36,8 +49,9 @@ def run(self, cmd):
3649 ]
3750
3851 if "SELECT" in cmd :
39- return [{"id" : 150 }]
40- return self .response
52+ return self ._select_response .pop ()
53+
54+ return self ._response .pop ()
4155
4256 def db_name (self ):
4357 return SqlInput ("the-database" )
@@ -81,6 +95,7 @@ def test_get_time_offsets(self):
8195 )
8296
8397 def test_read_state_info (self ):
98+ self .maxDiff = None
8499 conf_past = Config ()
85100 conf_past .curtime = datetime (2021 , 3 , 1 )
86101 conf_past .dbcmd = MockDatabase ()
@@ -96,6 +111,12 @@ def test_read_state_info(self):
96111 conf_now = Config ()
97112 conf_now .curtime = datetime (2021 , 3 , 3 )
98113 conf_now .dbcmd = MockDatabase ()
114+ conf_now .dbcmd ._response = [
115+ [
116+ {"Field" : "id" , "Type" : "bigint UNSIGNED" },
117+ {"Field" : "serial" , "Type" : "varchar" },
118+ ]
119+ ]
99120 conf_now .tables = [Table ("test" ).set_partition_period (timedelta (days = 30 ))]
100121
101122 state_fs .seek (0 )
@@ -104,10 +125,148 @@ def test_read_state_info(self):
104125 x ,
105126 {
106127 "test" : [
107- "ALTER TABLE `test` REORGANIZE PARTITION `p_start` INTO "
108- "(PARTITION `p_20210303` VALUES LESS THAN (156), "
109- "PARTITION `p_20210402` VALUES LESS THAN (2406), "
110- "PARTITION `p_20210502` VALUES LESS THAN MAXVALUE);"
128+ "DROP TABLE IF EXISTS test_new_20210303;" ,
129+ "CREATE TABLE test_new_20210303 LIKE test;" ,
130+ "ALTER TABLE test_new_20210303 REMOVE PARTITIONING;" ,
131+ "ALTER TABLE test_new_20210303 PARTITION BY RANGE(id) (" ,
132+ "\t PARTITION p_start VALUES LESS THAN MAXVALUE" ,
133+ ");" ,
134+ "ALTER TABLE `test_new_20210303` REORGANIZE PARTITION `p_start` "
135+ + "INTO (PARTITION `p_20210303` VALUES LESS THAN (156), "
136+ + "PARTITION `p_20210402` VALUES LESS THAN (2406), PARTITION "
137+ + "`p_20210502` VALUES LESS THAN MAXVALUE);" ,
138+ "CREATE OR REPLACE TRIGGER copy_inserts_from_test_to_test_new_20210303" ,
139+ "\t AFTER INSERT ON test FOR EACH ROW" ,
140+ "\t \t INSERT INTO test_new_20210303 SET" ,
141+ "\t \t \t `id` = NEW.`id`," ,
142+ "\t \t \t `serial` = NEW.`serial`;" ,
143+ "CREATE OR REPLACE TRIGGER copy_updates_from_test_to_test_new_20210303" ,
144+ "\t AFTER UPDATE ON test FOR EACH ROW" ,
145+ "\t \t UPDATE test_new_20210303 SET" ,
146+ "\t \t \t `serial` = NEW.`serial`" ,
147+ "\t \t WHERE `id` = NEW.`id`;" ,
111148 ]
112149 },
113150 )
151+
152+ def test_read_state_info_map_table (self ):
153+ self .maxDiff = None
154+ conf = Config ()
155+ conf .assume_partitioned_on = ["order" , "auth" ]
156+ conf .curtime = datetime (2021 , 3 , 3 )
157+ conf .dbcmd = MockDatabase ()
158+ conf .dbcmd ._select_response = [[{"auth" : 22 }], [{"order" : 11 }]]
159+ conf .dbcmd ._response = [
160+ [
161+ {"Field" : "order" , "Type" : "bigint UNSIGNED" },
162+ {"Field" : "auth" , "Type" : "bigint UNSIGNED" },
163+ ]
164+ ]
165+ conf .tables = [Table ("map_table" ).set_partition_period (timedelta (days = 30 ))]
166+
167+ state_fs = io .StringIO ()
168+ yaml .dump (
169+ {
170+ "tables" : {"map_table" : {"order" : 11 , "auth" : 22 }},
171+ "time" : (conf .curtime - timedelta (days = 1 )),
172+ },
173+ state_fs ,
174+ )
175+ state_fs .seek (0 )
176+
177+ x = calculate_sql_alters_from_state_info (conf , state_fs )
178+ print (x )
179+ self .assertEqual (
180+ x ,
181+ {
182+ "map_table" : [
183+ "DROP TABLE IF EXISTS map_table_new_20210303;" ,
184+ "CREATE TABLE map_table_new_20210303 LIKE map_table;" ,
185+ "ALTER TABLE map_table_new_20210303 REMOVE PARTITIONING;" ,
186+ "ALTER TABLE map_table_new_20210303 PARTITION BY RANGE(order, auth) (" ,
187+ "\t PARTITION p_assumed VALUES LESS THAN MAXVALUE" ,
188+ ");" ,
189+ "ALTER TABLE `map_table_new_20210303` REORGANIZE PARTITION "
190+ + "`p_assumed` INTO (PARTITION `p_20210303` VALUES LESS THAN "
191+ + "(11, 22), PARTITION `p_20210402` VALUES LESS THAN "
192+ + "(11, 22), PARTITION `p_20210502` VALUES LESS THAN "
193+ + "MAXVALUE, MAXVALUE);" ,
194+ "CREATE OR REPLACE TRIGGER copy_inserts_from_map_table_"
195+ + "to_map_table_new_20210303" ,
196+ "\t AFTER INSERT ON map_table FOR EACH ROW" ,
197+ "\t \t INSERT INTO map_table_new_20210303 SET" ,
198+ "\t \t \t `auth` = NEW.`auth`," ,
199+ "\t \t \t `order` = NEW.`order`;" ,
200+ ]
201+ },
202+ )
203+
204+ def test_trigger_column_copies (self ):
205+ self .assertEqual (list (_trigger_column_copies ([])), [])
206+ self .assertEqual (list (_trigger_column_copies (["a" ])), ["`a` = NEW.`a`" ])
207+ self .assertEqual (
208+ list (_trigger_column_copies (["b" , "a" , "c" ])),
209+ ["`b` = NEW.`b`" , "`a` = NEW.`a`" , "`c` = NEW.`c`" ],
210+ )
211+
212+ def test_suffix (self ):
213+ self .assertEqual (list (_suffix (["a" ])), ["a" ])
214+ self .assertEqual (list (_suffix (["a" , "b" ])), ["a" , "b" ])
215+ self .assertEqual (list (_suffix (["a" , "b" ], indent = " " )), [" a" , " b" ])
216+ self .assertEqual (list (_suffix (["a" , "b" ], mid_suffix = "," )), ["a," , "b" ])
217+ self .assertEqual (list (_suffix (["a" , "b" ], final_suffix = ";" )), ["a" , "b;" ])
218+ self .assertEqual (
219+ list (_suffix (["a" , "b" ], mid_suffix = "," , final_suffix = ";" )), ["a," , "b;" ]
220+ )
221+
222+ def test_generate_sql_copy_commands (self ):
223+ conf = Config ()
224+ conf .assume_partitioned_on = ["id" ]
225+ conf .curtime = datetime (2021 , 3 , 3 )
226+ conf .dbcmd = MockDatabase ()
227+ map_data = _override_config_to_map_data (conf )
228+ cmds = list (
229+ _generate_sql_copy_commands (
230+ Table ("old" ),
231+ map_data ,
232+ ["id" , "field" ],
233+ Table ("new" ),
234+ ["STRAIGHT_UP_INSERTED" , "STUFF GOES HERE" ],
235+ )
236+ )
237+
238+ print (cmds )
239+ self .assertEqual (
240+ cmds ,
241+ [
242+ "DROP TABLE IF EXISTS new;" ,
243+ "CREATE TABLE new LIKE old;" ,
244+ "ALTER TABLE new REMOVE PARTITIONING;" ,
245+ "ALTER TABLE new PARTITION BY RANGE(id) (" ,
246+ "\t PARTITION p_assumed VALUES LESS THAN MAXVALUE" ,
247+ ");" ,
248+ "STRAIGHT_UP_INSERTED" ,
249+ "STUFF GOES HERE" ,
250+ "CREATE OR REPLACE TRIGGER copy_inserts_from_old_to_new" ,
251+ "\t AFTER INSERT ON old FOR EACH ROW" ,
252+ "\t \t INSERT INTO new SET" ,
253+ "\t \t \t `field` = NEW.`field`," ,
254+ "\t \t \t `id` = NEW.`id`;" ,
255+ "CREATE OR REPLACE TRIGGER copy_updates_from_old_to_new" ,
256+ "\t AFTER UPDATE ON old FOR EACH ROW" ,
257+ "\t \t UPDATE new SET" ,
258+ "\t \t \t `field` = NEW.`field`" ,
259+ "\t \t WHERE `id` = NEW.`id`;" ,
260+ ],
261+ )
262+
263+ def test_plan_partitions_for_time_offsets (self ):
264+ parts = _plan_partitions_for_time_offsets (
265+ datetime (2021 , 3 , 3 ),
266+ [timedelta (days = 60 ), timedelta (days = 360 )],
267+ [11943234 ],
268+ [16753227640 ],
269+ MaxValuePartition ("p_assumed" , count = 1 ),
270+ )
271+ self .assertIsInstance (parts [0 ], ChangePlannedPartition )
272+ self .assertIsInstance (parts [1 ], NewPlannedPartition )
0 commit comments