@@ -77,18 +77,18 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
7777 protected final GsonSerDe <List <JobExecutionPlan >> serDe ;
7878 private final JobExecutionPlanDagFactory jobExecPlanDagFactory ;
7979
80- // todo add a column that tells if it is a running dag or a failed dag
81- protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
82- + "dag_node_id VARCHAR(" + ServiceConfigKeys . MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
83- + "parent_dag_id VARCHAR(" + ServiceConfigKeys .MAX_DAG_ID_LENGTH + ") NOT NULL, "
84- + "dag_node JSON NOT NULL, "
85- + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
86- + "PRIMARY KEY (dag_node_id), "
87- + "UNIQUE INDEX dag_node_index (dag_node_id), "
88- + "INDEX dag_index (parent_dag_id))" ;
89-
90- protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
91- + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node" ;
80+ protected static final String CREATE_TABLE_STATEMENT =
81+ "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys . MAX_DAG_NODE_ID_LENGTH
82+ + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR( "
83+ + ServiceConfigKeys .MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84+ + "is_failed_dag INT NOT NULL DEFAULT 0 , "
85+ + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
86+ + "PRIMARY KEY (dag_node_id), " + "UNIQUE INDEX dag_node_index (dag_node_id), "
87+ + "INDEX dag_index (parent_dag_id))" ;
88+
89+ protected static final String INSERT_STATEMENT =
90+ "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node,is_failed_dag ) "
91+ + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node" ;
9292 protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?" ;
9393 protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?" ;
9494 protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?" ;
@@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
105105 DataSource dataSource = MysqlDataSourceFactory .get (config , SharedResourcesBrokerFactory .getImplicitBroker ());
106106
107107 try (Connection connection = dataSource .getConnection ();
108- PreparedStatement createStatement = connection .prepareStatement (String .format (CREATE_TABLE_STATEMENT , tableName ))) {
108+ PreparedStatement createStatement = connection .prepareStatement (
109+ String .format (CREATE_TABLE_STATEMENT , tableName ))) {
109110 createStatement .executeUpdate ();
110111 connection .commit ();
111112 } catch (SQLException e ) {
@@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
126127 }
127128
128129 @ Override
129- public void writeCheckpoint (Dag <JobExecutionPlan > dag )
130- throws IOException {
130+ public void writeCheckpoint (Dag <JobExecutionPlan > dag ) throws IOException {
131131 DagManager .DagId dagId = DagManagerUtils .generateDagId (dag );
132132 boolean newDag = false ;
133133 for (Dag .DagNode <JobExecutionPlan > dagNode : dag .getNodes ()) {
134- if (updateDagNode (dagId , dagNode ) == 1 ) {
134+ if (updateDagNode (dagId , dagNode , false ) == 1 ) {
135135 newDag = true ;
136136 }
137137 }
@@ -140,6 +140,14 @@ public void writeCheckpoint(Dag<JobExecutionPlan> dag)
140140 }
141141 }
142142
143+ public void markDagAsFailed (Dag <JobExecutionPlan > dag ) throws IOException {
144+ DagManager .DagId dagId = DagManagerUtils .generateDagId (dag );
145+ for (Dag .DagNode <JobExecutionPlan > dagNode : dag .getNodes ()) {
146+ if (updateDagNode (dagId , dagNode , true ) == 1 ) {
147+ }
148+ }
149+ }
150+
143151 @ Override
144152 public void cleanUp (Dag <JobExecutionPlan > dag ) throws IOException {
145153 cleanUp (generateDagId (dag ));
@@ -153,7 +161,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException {
153161 return deleteStatement .executeUpdate () != 0 ;
154162 } catch (SQLException e ) {
155163 throw new IOException (String .format ("Failure deleting dag for %s" , dagId ), e );
156- }}, true );
164+ }
165+ }, true );
157166 this .totalDagCount .dec ();
158167 return true ;
159168 }
@@ -167,7 +176,8 @@ public void cleanUp(String dagId) throws IOException {
167176 @ Override
168177 public List <Dag <JobExecutionPlan >> getDags () throws IOException {
169178 throw new NotSupportedException (getClass ().getSimpleName () + " does not need this legacy API that originated with "
170- + "the DagManager that is replaced by DagProcessingEngine" ); }
179+ + "the DagManager that is replaced by DagProcessingEngine" );
180+ }
171181
172182 @ Override
173183 public Dag <JobExecutionPlan > getDag (DagManager .DagId dagId ) throws IOException {
@@ -195,33 +205,37 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
195205 }
196206
197207 @ Override
198- public int updateDagNode (DagManager .DagId parentDagId , Dag .DagNode <JobExecutionPlan > dagNode ) throws IOException {
208+ public int updateDagNode (DagManager .DagId parentDagId , Dag .DagNode <JobExecutionPlan > dagNode , boolean isFailedDag )
209+ throws IOException {
199210 String dagNodeId = dagNode .getValue ().getId ().toString ();
200211 return dbStatementExecutor .withPreparedStatement (String .format (INSERT_STATEMENT , tableName ), insertStatement -> {
201212 try {
202213 insertStatement .setString (1 , dagNodeId );
203214 insertStatement .setString (2 , parentDagId .toString ());
204215 insertStatement .setString (3 , this .serDe .serialize (Collections .singletonList (dagNode .getValue ())));
216+ insertStatement .setInt (4 , isFailedDag ? 1 : 0 );
205217 return insertStatement .executeUpdate ();
206218 } catch (SQLException e ) {
207219 throw new IOException (String .format ("Failure adding dag node for %s" , dagNodeId ), e );
208- }}, true );
220+ }
221+ }, true );
209222 }
210223
211224 @ Override
212225 public Set <Dag .DagNode <JobExecutionPlan >> getDagNodes (DagManager .DagId parentDagId ) throws IOException {
213- return dbStatementExecutor .withPreparedStatement (String .format (GET_DAG_NODES_STATEMENT , tableName ), getStatement -> {
214- getStatement .setString (1 , parentDagId .toString ());
215- HashSet <Dag .DagNode <JobExecutionPlan >> dagNodes = new HashSet <>();
216- try (ResultSet rs = getStatement .executeQuery ()) {
217- while (rs .next ()) {
218- dagNodes .add (new Dag .DagNode <>(this .serDe .deserialize (rs .getString (1 )).get (0 )));
219- }
220- return dagNodes ;
221- } catch (SQLException e ) {
222- throw new IOException (String .format ("Failure get dag nodes for dag %s" , parentDagId ), e );
223- }
224- }, true );
226+ return dbStatementExecutor .withPreparedStatement (String .format (GET_DAG_NODES_STATEMENT , tableName ),
227+ getStatement -> {
228+ getStatement .setString (1 , parentDagId .toString ());
229+ HashSet <Dag .DagNode <JobExecutionPlan >> dagNodes = new HashSet <>();
230+ try (ResultSet rs = getStatement .executeQuery ()) {
231+ while (rs .next ()) {
232+ dagNodes .add (new Dag .DagNode <>(this .serDe .deserialize (rs .getString (1 )).get (0 )));
233+ }
234+ return dagNodes ;
235+ } catch (SQLException e ) {
236+ throw new IOException (String .format ("Failure get dag nodes for dag %s" , parentDagId ), e );
237+ }
238+ }, true );
225239 }
226240
227241 @ Override
0 commit comments