2525
2626import  org .apache .commons .lang3 .StringUtils ;
2727
28- import  com .google .common .base .Joiner ;
2928import  com .google .common .base .Optional ;
3029import  com .google .common .base .Strings ;
3130import  com .typesafe .config .Config ;
4645import  org .apache .gobblin .runtime .api .SpecExecutor ;
4746import  org .apache .gobblin .service .ExecutionStatus ;
4847import  org .apache .gobblin .service .modules .flowgraph .DagNodeId ;
49- import  org .apache .gobblin .service .modules .flowgraph .DatasetDescriptorConfigKeys ;
5048import  org .apache .gobblin .service .modules .flowgraph .FlowGraphConfigurationKeys ;
5149import  org .apache .gobblin .service .modules .orchestration .DagProcessingEngine ;
5250import  org .apache .gobblin .service .modules .orchestration .DagUtils ;
@@ -111,8 +109,6 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
111109      String  flowName  = ConfigUtils .getString (flowConfig , ConfigurationKeys .FLOW_NAME_KEY , "" );
112110      String  flowGroup  = ConfigUtils .getString (flowConfig , ConfigurationKeys .FLOW_GROUP_KEY , "" );
113111      String  flowFailureOption  = ConfigUtils .getString (flowConfig , ConfigurationKeys .FLOW_FAILURE_OPTION , DagProcessingEngine .DEFAULT_FLOW_FAILURE_OPTION );
114-       String  flowInputPath  = ConfigUtils .getString (flowConfig , DatasetDescriptorConfigKeys .FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX 
115-           + "."  + DatasetDescriptorConfigKeys .PATH_KEY , "" );
116112      Long  flowModTime  = ConfigUtils .getLong (flowConfig , FlowSpec .MODIFICATION_TIME_KEY , 0L );
117113
118114      String  jobName  = ConfigUtils .getString (jobConfig , ConfigurationKeys .JOB_NAME_KEY , "" );
@@ -121,15 +117,9 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
121117      final  int  gaasJobExecutionIdHash  = gaasJobExecutionId .hashCode ();  // Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for backward compatibility 
122118
123119      if  (!ConfigUtils .getBoolean (jobConfig , JOB_MAINTAIN_JOBNAME , false ) || jobName .isEmpty ()) {
124-         // Modify the job name to include the flow group, flow name, edge id, and a random string to avoid collisions since 
125-         // job names are assumed to be unique within a dag. 
126-         int  hash  = flowInputPath .hashCode ();
127-         jobName  = Joiner .on (JOB_NAME_COMPONENT_SEPARATION_CHAR ).join (flowGroup , flowName , jobName , edgeId , hash );
128-         // jobNames are commonly used as a directory name, which is limited to 255 characters 
129-         if  (jobName .length () >= MAX_JOB_NAME_LENGTH ) {
130-           // shorten job length to be 128 characters (flowGroup) + (hashed) flowName, hashCode length 
131-           jobName  = Joiner .on (JOB_NAME_COMPONENT_SEPARATION_CHAR ).join (flowGroup , flowName .hashCode (), hash );
132-         }
120+         jobName  = gaasJobExecutionId ; // Assigning jobName with the value of GaaSJobExecutionId 
121+         // which is a UUID and unique to avoid collisions 
122+ 
133123      }
134124      JobSpec .Builder  jobSpecBuilder  = JobSpec .builder (jobSpecURIGenerator (flowGroup , jobName , flowSpec )).withConfig (jobConfig )
135125          .withDescription (flowSpec .getDescription ()).withVersion (flowSpec .getVersion ());
0 commit comments