Skip to content

Commit c13315d

Browse files
author
Aditya Pratap Singh
committed
addressed PR comments
1 parent 7883009 commit c13315d

File tree

3 files changed

+70
-42
lines changed

3 files changed

+70
-42
lines changed

gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -261,17 +261,11 @@ public static class DagNode<T> {
261261
private T value;
262262
//List of parent Nodes that are dependencies of this Node.
263263
private List<DagNode<T>> parentNodes;
264-
@Setter
265-
private boolean isFailedDag;
266264

267265
//Constructor
268266
public DagNode(T value) {
269267
this.value = value;
270268
}
271-
public DagNode(T value,boolean isFailedDag) {
272-
this.value = value;
273-
this.isFailedDag = isFailedDag;
274-
}
275269

276270

277271
public void addParentNode(DagNode<T> node) {

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java

Lines changed: 21 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -77,18 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
7777
protected final GsonSerDe<List<JobExecutionPlan>> serDe;
7878
private final JobExecutionPlanDagFactory jobExecPlanDagFactory;
7979

80-
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
81-
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH
82-
+ ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
83-
+ ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84-
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
85-
+ "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
86-
+ "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))";
87-
88-
protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
89-
+ "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";
90-
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?";
91-
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?";
80+
protected static final String CREATE_TABLE_STATEMENT =
81+
"CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH
82+
+ ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
83+
+ ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84+
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
85+
+ "is_failed_dag TINYINT(1) DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
86+
+ "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))";
87+
88+
protected static final String INSERT_STATEMENT =
89+
"INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
90+
+ "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";
91+
protected static final String GET_DAG_NODES_STATEMENT =
92+
"SELECT dag_node FROM %s WHERE parent_dag_id = ?";
93+
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
9294
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
9395
private final ContextAwareCounter totalDagCount;
9496

@@ -103,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
103105
DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
104106

105107
try (Connection connection = dataSource.getConnection();
106-
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
108+
PreparedStatement createStatement = connection.prepareStatement(
109+
String.format(CREATE_TABLE_STATEMENT, tableName))) {
107110
createStatement.executeUpdate();
108111
connection.commit();
109112
} catch (SQLException e) {
@@ -165,7 +168,8 @@ public void cleanUp(String dagId) throws IOException {
165168
@Override
166169
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
167170
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
168-
+ "the DagManager that is replaced by DagProcessingEngine"); }
171+
+ "the DagManager that is replaced by DagProcessingEngine");
172+
}
169173

170174
@Override
171175
public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
@@ -189,14 +193,7 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
189193
if (dagNodes.isEmpty()) {
190194
return null;
191195
}
192-
Dag<JobExecutionPlan> dag = jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList()));
193-
194-
// If any node of the dag is failed, the dag has been marked as failed; set the is_failed_dag field of the dag and its nodes to true.
195-
if (dag.getNodes().stream().anyMatch(Dag.DagNode::isFailedDag)) {
196-
dag.setFailedDag(true);
197-
dag.getNodes().forEach(node -> node.setFailedDag(true));
198-
}
199-
return dag;
196+
return jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList()));
200197
}
201198

202199
@Override
@@ -224,7 +221,7 @@ public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDag
224221
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
225222
try (ResultSet rs = getStatement.executeQuery()) {
226223
while (rs.next()) {
227-
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2)));
224+
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
228225
}
229226
return dagNodes;
230227
} catch (SQLException e) {
@@ -239,7 +236,7 @@ public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) t
239236
getStatement.setString(1, dagNodeId.toString());
240237
try (ResultSet rs = getStatement.executeQuery()) {
241238
if (rs.next()) {
242-
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2)));
239+
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
243240
}
244241
return Optional.empty();
245242
} catch (SQLException e) {

gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java

Lines changed: 49 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,18 @@
1717

1818
package org.apache.gobblin.service.modules.orchestration;
1919

20+
import java.io.IOException;
2021
import java.net.URI;
22+
import java.sql.ResultSet;
23+
import java.sql.SQLException;
24+
import java.util.ArrayList;
2125
import java.util.HashMap;
26+
import java.util.HashSet;
27+
import java.util.List;
2228
import java.util.Map;
2329

30+
import lombok.extern.slf4j.Slf4j;
31+
2432
import org.testng.Assert;
2533
import org.testng.annotations.AfterClass;
2634
import org.testng.annotations.BeforeClass;
@@ -36,17 +44,22 @@
3644
import org.apache.gobblin.service.ExecutionStatus;
3745
import org.apache.gobblin.service.modules.flowgraph.Dag;
3846
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
39-
47+
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
48+
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
49+
import org.apache.gobblin.util.DBStatementExecutor;
4050

4151
/**
4252
* Mainly tests functionality related to DagStateStore rather than MySQL-specific components.
4353
*/
54+
@Slf4j
4455
public class MysqlDagStateStoreWithDagNodesTest {
4556

4657
private DagStateStore dagStateStore;
47-
4858
private static final String TEST_USER = "testUser";
4959
private static ITestMetastoreDatabase testDb;
60+
private DBStatementExecutor dbStatementExecutor;
61+
private static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node, is_failed_dag FROM %s WHERE parent_dag_id = ?";
62+
private static final String tableName = "dag_node_state_store";
5063

5164
@BeforeClass
5265
public void setUp() throws Exception {
@@ -63,6 +76,8 @@ public void setUp() throws Exception {
6376
URI specExecURI = new URI(specExecInstance);
6477
topologySpecMap.put(specExecURI, topologySpec);
6578
this.dagStateStore = new MysqlDagStateStoreWithDagNodes(configBuilder.build(), topologySpecMap);
79+
dbStatementExecutor = new DBStatementExecutor(
80+
MysqlDataSourceFactory.get(configBuilder.build(), SharedResourcesBrokerFactory.getImplicitBroker()), log);
6681
}
6782

6883
@AfterClass(alwaysRun = true)
@@ -74,7 +89,7 @@ public void tearDown() throws Exception {
7489
}
7590

7691
@Test
77-
public void testAddGetAndDeleteDag() throws Exception{
92+
public void testAddGetAndDeleteDag() throws Exception {
7893
Dag<JobExecutionPlan> originalDag1 = DagTestUtils.buildDag("random_1", 123L);
7994
Dag<JobExecutionPlan> originalDag2 = DagTestUtils.buildDag("random_2", 456L);
8095
DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1);
@@ -140,26 +155,48 @@ public void testAddGetAndDeleteDag() throws Exception{
140155

141156
@Test
142157
public void testMarkDagAsFailed() throws Exception {
143-
//Set up initial conditions
158+
// Set up initial conditions
144159
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test_dag", 789L);
145160
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
146161

147162
this.dagStateStore.writeCheckpoint(dag);
148-
//Check Initial State
149-
for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
150-
Assert.assertFalse(node.isFailedDag());
163+
164+
// Fetch all initial states into a list
165+
List<Boolean> initialStates = fetchDagNodeStates(dagId.toString());
166+
167+
// Check Initial State
168+
for (Boolean state : initialStates) {
169+
Assert.assertFalse(state);
151170
}
171+
// Set the DAG as failed
152172
dag.setFailedDag(true);
153173
this.dagStateStore.writeCheckpoint(dag);
154174

155-
Dag<JobExecutionPlan> updatedDag = this.dagStateStore.getDag(dagId);
156-
for (Dag.DagNode<JobExecutionPlan> node : updatedDag.getNodes()) {
157-
Assert.assertTrue(node.isFailedDag());
158-
}
175+
// Fetch all states after marking the DAG as failed
176+
List<Boolean> failedStates = fetchDagNodeStates(dagId.toString());
159177

160-
// Cleanup
178+
// Check if all states are now true (indicating failure)
179+
for (Boolean state : failedStates) {
180+
Assert.assertTrue(state);
181+
}
161182
dagStateStore.cleanUp(dagId);
162183
Assert.assertNull(this.dagStateStore.getDag(dagId));
163184
}
164185

186+
private List<Boolean> fetchDagNodeStates(String dagId) throws IOException {
187+
List<Boolean> states = new ArrayList<>();
188+
dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
189+
getStatement.setString(1, dagId.toString());
190+
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
191+
try (ResultSet rs = getStatement.executeQuery()) {
192+
while (rs.next()) {
193+
states.add(rs.getBoolean(2));
194+
}
195+
return dagNodes;
196+
} catch (SQLException e) {
197+
throw new IOException(String.format("Failure get dag nodes for dag %s", dagId), e);
198+
}
199+
}, true);
200+
return states;
201+
}
165202
}

0 commit comments

Comments
 (0)