
Temporal Graph Support

Christopher Rost edited this page Oct 11, 2019 · 5 revisions

Temporal Property Graph Model (TPGM)

The temporal analysis of evolving graphs is an important requirement in many domains, but it is hardly supported by current graph database and graph processing systems. We have therefore started to extend Gradoop for temporal graph analysis by adding time properties to vertices, edges and graphs and by using them within graph operators.

The key features of our model are:

  • Bi-temporal time dimension support
  • Backward compatible with most EPGM operators
  • Flexible time representation: can be empty, a timestamp or a time-interval

See our publications

Data Model

The data model is an extension of the EPGM data model. Each graph element (i.e., logical graph, vertex and edge) has two additional time intervals to enable bi-temporal time semantics. One interval represents the transaction time, i.e., the period during which the fact is current in the graph. It holds rollback information that is maintained by Gradoop. The other interval represents the valid time (also referred to as application time), i.e., the period during which the information is valid in the real world. It holds historical information. Valid times are typically taken from the application context before the data enters Gradoop.
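The two intervals can be pictured as two pairs of long timestamps per element. The following is a minimal sketch for illustration only, not Gradoop's actual classes: BitemporalElement and its members are hypothetical, and the use of Long.MIN_VALUE and Long.MAX_VALUE as the bounds of an unset interval is an assumption inferred from the CSV example in the Sources and Sinks section.

```java
/** Hypothetical illustration of a bitemporal graph element; not a Gradoop class. */
final class BitemporalElement {
  // Assumed defaults for an unset interval bound.
  static final long DEFAULT_FROM = Long.MIN_VALUE;
  static final long DEFAULT_TO = Long.MAX_VALUE;

  final String label;

  // Transaction time: when the fact is current in the graph (maintained by the system).
  final long txFrom;
  final long txTo;

  // Valid time: when the fact holds in the real world (supplied by the application).
  long validFrom = DEFAULT_FROM;
  long validTo = DEFAULT_TO;

  BitemporalElement(String label, long txFrom) {
    this.label = label;
    this.txFrom = txFrom;
    this.txTo = DEFAULT_TO;
  }

  /** Sets the valid-time interval; the start must not lie after the end. */
  void setValidTime(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("validFrom must be <= validTo");
    }
    this.validFrom = from;
    this.validTo = to;
  }

  /** Returns true if no valid time has been assigned yet. */
  boolean hasDefaultValidTime() {
    return validFrom == DEFAULT_FROM && validTo == DEFAULT_TO;
  }
}
```

Keeping the transaction-time fields final while leaving the valid-time fields mutable mirrors the split described above: the system owns the former, the application owns the latter.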

Operators

We added two new operators to the TPGM that can be applied exclusively to a TemporalGraph instance: Snapshot and Difference. In addition, we extended some EPGM operators such as Transformation, Grouping and Aggregation with features that enable different kinds of temporal analysis.

Snapshot

The TPGM snapshot operator retrieves a valid snapshot of the temporal graph, either the state of the whole graph at a specific point in time or the subgraph that is valid during a given time range. The selection is controlled by a temporal predicate function. Besides the operator itself, several predefined predicate functions are available. They are adopted from SQL:2011, which added support for temporal data.

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// We need a UNIX timestamp in milliseconds since epoch.
long queryTimestamp = LocalDateTime
      .of(2018, 12, 24, 20, 15, 0, 0)
      .toInstant(ZoneOffset.UTC)
      .toEpochMilli();

// Get the graph 'as of' our query timestamp (considers the valid time dimension by default)
TemporalGraph historicalGraph = temporalGraph
      .snapshot(new AsOf(queryTimestamp));

Difference

The evolution of graphs over time can be represented by the difference of two graph snapshots, i.e., by a difference graph that is the union of both snapshots where each graph element is annotated as an added, deleted, or persistent element.

The TPGM diff operator consumes two graph snapshots defined by temporal predicate functions and calculates the difference graph. The annotations are stored as a property _diff on each graph element. The value of the property is a number indicating whether the element is present in both snapshots (0), added in the second snapshot (1) or removed in the second snapshot (-1).

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// We need two UNIX timestamps in milliseconds since epoch that we want to compare
long firstQueryTimestamp = LocalDateTime
      .of(2018, 12, 24, 20, 15, 0, 0)
      .toInstant(ZoneOffset.UTC)
      .toEpochMilli();

long secondQueryTimestamp = LocalDateTime
      .of(2019, 12, 24, 20, 15, 0, 0)
      .toInstant(ZoneOffset.UTC)
      .toEpochMilli();

// Get the difference of both historical graph versions (considers the valid time dimension by default)
TemporalGraph differenceGraph = temporalGraph
      .diff(new AsOf(firstQueryTimestamp), new AsOf(secondQueryTimestamp));
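To make the _diff annotation concrete, the following sketch evaluates two predicates against a single element's validity interval and derives the value 0, 1 or -1 as described above. It is a plain Java illustration under stated assumptions, not Gradoop's implementation: DiffSketch, IntervalPredicate and diff are hypothetical names, and asOf follows the AsOf definition given in the predicate table.

```java
/** Hypothetical sketch of how a _diff value can be derived; not Gradoop code. */
final class DiffSketch {

  /** Hypothetical predicate over an element's validity interval. */
  interface IntervalPredicate {
    boolean test(long elementFrom, long elementTo);
  }

  /** AsOf: the element's interval contains the query timestamp. */
  static IntervalPredicate asOf(long queryTimestamp) {
    return (from, to) -> from <= queryTimestamp && to > queryTimestamp;
  }

  /**
   * Returns 0 if the element is in both snapshots, 1 if it is only in the
   * second (added), -1 if it is only in the first (removed), and null if it
   * is in neither snapshot and therefore not part of the difference graph.
   */
  static Integer diff(long elementFrom, long elementTo,
                      IntervalPredicate first, IntervalPredicate second) {
    boolean inFirst = first.test(elementFrom, elementTo);
    boolean inSecond = second.test(elementFrom, elementTo);
    if (inFirst && inSecond) {
      return 0;
    }
    if (inSecond) {
      return 1;
    }
    if (inFirst) {
      return -1;
    }
    return null;
  }
}
```

For example, with snapshots taken at timestamps 100 and 200, an element valid from 150 to 250 exists only in the second snapshot and is annotated as added.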

Transformation

The transform operator defines a structure-preserving modification of graph, vertex and edge data. User-defined transformation functions can be applied to a temporal graph, which results in a modified output graph. Within the TPGM it is possible to (1) modify the temporal attributes, (2) define the time attributes from information stored in properties or (3) create properties from the temporal information of the time attributes. For example, if the temporal attributes are not yet set, or are only calculated during a workflow, this operator makes it possible to define the valid times at runtime.

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// Assign the valid-from attribute from a property "CreationDate" for all vertices
temporalGraph = temporalGraph.transform(
  // Keep the graph heads
  TransformationFunction.keep(),
  // Extract timestamps for vertices
  (origin, t) -> {
    if (origin.hasProperty("CreationDate") && origin.getPropertyValue("CreationDate").isDateTime()) {
      origin.setValidFrom(
        origin.getPropertyValue("CreationDate")
          .getDateTime()
          .atZone(ZoneId.systemDefault())
          .toInstant()
          .toEpochMilli());
    }
    return origin;
  },
  // Keep the edges
  TransformationFunction.keep()
);
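Note that the snippet above converts the date-time with ZoneId.systemDefault(), whereas the snapshot and difference examples use ZoneOffset.UTC. Since the valid-from value is an epoch timestamp in milliseconds, the chosen zone changes the stored number. The following sketch uses only java.time to show the effect; EpochMillis and toEpochMilli are hypothetical helper names introduced for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical helper showing that the zone affects the resulting timestamp. */
final class EpochMillis {

  /** Interprets the local date-time in the given zone and returns epoch milliseconds. */
  static long toEpochMilli(LocalDateTime dateTime, ZoneId zone) {
    return dateTime.atZone(zone).toInstant().toEpochMilli();
  }
}
```

Using one fixed zone (for example UTC) for both the transformation and the query timestamps keeps the predicates consistent across machines with different default zones.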

Grouping

A structural grouping of vertices and edges is an important task in temporal graph analytics. Since temporal graphs can become very large, a condensation can facilitate deeper insights about structures and patterns hidden in the graph. In the current EPGM implementation of the groupBy operator, a grouping is based on vertex and edge grouping keys as well as vertex and edge aggregation functions.

For temporal grouping, TPGM provides three additional features:

  • time-specific value transformation functions can be applied to compute time values on the desired granularity for grouping
  • support for GROUP BY CUBE and GROUP BY ROLLUP similar to SQL (the output of the operator is a collection where each graph corresponds to a single combination of the given grouping keys)
  • support of aggregations on the temporal properties by user-defined functions and predefined time-specific aggregation functions (e.g., MinFrom or MaxFrom)

Example code snippet:

// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();

// Group vertices by their valid-time duration and edges by their label
TemporalGraph groupedGraph = temporalGraph.groupBy(
  // Vertex grouping keys: Group vertices by their duration in months
  Collections.singletonList(TemporalGroupingKeys.duration(VALID_TIME, ChronoUnit.MONTHS)),
  // Vertex aggregate functions: calculate the average valid time duration of grouped vertices
  Collections.singletonList(new AverageVertexDuration("avgVertexDurValid", VALID_TIME)),
  // Edge grouping keys: Group edges by their type label
  Collections.singletonList(GroupingKeys.label()),
  // Edge aggregate functions: count the grouped edges and save the result in a property "cnt"
  Collections.singletonList(new Count("cnt"))
);
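The time-specific value transformation mentioned above boils down to mapping an epoch timestamp to a calendar field of the desired granularity and then grouping on that value. The following sketch shows the idea with java.time and the Stream API only. It is not Gradoop's grouping implementation: TimeKeys, fieldOf and countBy are hypothetical names, and UTC is an assumed time zone.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical sketch of grouping timestamps by a calendar field. */
final class TimeKeys {

  /** Extracts a calendar field (e.g. MONTH_OF_YEAR) from epoch milliseconds, in UTC. */
  static int fieldOf(long epochMilli, ChronoField field) {
    return Instant.ofEpochMilli(epochMilli).atZone(ZoneOffset.UTC).get(field);
  }

  /** Counts how many timestamps fall into each value of the given field. */
  static Map<Integer, Long> countBy(List<Long> timestamps, ChronoField field) {
    return timestamps.stream()
      .collect(Collectors.groupingBy(
        t -> fieldOf(t, field), TreeMap::new, Collectors.counting()));
  }
}
```

Calling countBy with ChronoField.MONTH_OF_YEAR corresponds to a grouping key on the month combined with a count aggregate.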

Aggregation

The aggregation operator reduces information contained in a graph to a single value by applying the given aggregate functions. The operator is called via the aggregate() function, which is available on the LogicalGraph and TemporalGraph classes. It takes any number of aggregate functions as input and outputs the original graph with the result of each aggregate function stored as a new property at the graph head. Most aggregate functions require a string denoting the name of the property or label they are applied to.

The temporal extension of the aggregation operator introduces new aggregation functions, which are listed under Temporal Aggregations.

Temporal Predicates

The following temporal predicates are available for the snapshot and difference operators.

The placeholders queryTimestamp, queryFrom and queryTo have to be supplied as arguments by the user.

elementFrom and elementTo represent the beginning and end of the element's validity interval.

| Name | Description | Predicate |
|------|-------------|-----------|
| All | A filter that returns all elements. | |
| AsOf | Given a timestamp, this predicate will match all timestamps before or at that time and all time intervals containing that time. | elementFrom <= queryTimestamp && elementTo > queryTimestamp |
| Between | Given a time interval, this predicate will match all intervals that start before or at that interval's end and end after the start of that interval. | elementFrom <= queryTo && elementTo > queryFrom |
| ContainedIn | Given a time interval, this predicate will match all intervals that are a subset of that interval. | queryFrom <= elementFrom && elementTo <= queryTo |
| CreatedIn | Given a time interval, this predicate will match all intervals starting during that interval. | queryFrom <= elementFrom && elementFrom <= queryTo |
| DeletedIn | Given a time interval, this predicate will match all intervals ending during that interval. | queryFrom <= elementTo && elementTo <= queryTo |
| FromTo | Given a time interval, this predicate will match all intervals that were valid during that interval. | elementFrom < queryTo && elementTo > queryFrom |
| ValidDuring | Given a time interval, this predicate will match all intervals that contain that interval. | elementFrom <= queryFrom && elementTo >= queryTo |
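The predicate expressions in the table above translate directly into boolean functions over long timestamps. The following sketch restates them in plain Java so that boundary cases can be checked by hand. TemporalPredicates and its methods are hypothetical stand-ins, not the Gradoop predicate classes of the same names.

```java
/** Hypothetical plain-Java restatement of the predicate table; not Gradoop code. */
final class TemporalPredicates {

  static boolean asOf(long elementFrom, long elementTo, long queryTimestamp) {
    return elementFrom <= queryTimestamp && elementTo > queryTimestamp;
  }

  static boolean between(long elementFrom, long elementTo, long queryFrom, long queryTo) {
    return elementFrom <= queryTo && elementTo > queryFrom;
  }

  static boolean containedIn(long elementFrom, long elementTo, long queryFrom, long queryTo) {
    return queryFrom <= elementFrom && elementTo <= queryTo;
  }

  static boolean createdIn(long elementFrom, long elementTo, long queryFrom, long queryTo) {
    return queryFrom <= elementFrom && elementFrom <= queryTo;
  }

  static boolean deletedIn(long elementFrom, long elementTo, long queryFrom, long queryTo) {
    return queryFrom <= elementTo && elementTo <= queryTo;
  }

  static boolean fromTo(long elementFrom, long elementTo, long queryFrom, long queryTo) {
    return elementFrom < queryTo && elementTo > queryFrom;
  }

  static boolean validDuring(long elementFrom, long elementTo, long queryFrom, long queryTo) {
    return elementFrom <= queryFrom && elementTo >= queryTo;
  }
}
```

Between and FromTo differ only at the upper query bound: an element that starts exactly at queryTo matches Between but not FromTo.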

Temporal Aggregations

The following aggregations are available for temporal grouping and aggregation. The two available time dimensions, TRANSACTION_TIME and VALID_TIME, are specified via the enum TimeDimension. To select the beginning or end of an interval of a time dimension, use the enum TimeDimension.Field.

| Function | Description | Input |
|----------|-------------|-------|
| MinTime | Minimum of a specified time dimension's begin or end over all edges and vertices. | (1) The property key where the aggregated value is stored. (2) The time dimension to consider. (3) The field of the time dimension (begin or end) to consider. |
| MinVertexTime | Minimum of a specified time dimension's begin or end over all vertices. | Same as MinTime |
| MinEdgeTime | Minimum of a specified time dimension's begin or end over all edges. | Same as MinTime |
| MaxTime | Maximum of a specified time dimension's begin or end over all edges and vertices. | Same as MinTime |
| MaxVertexTime | Maximum of a specified time dimension's begin or end over all vertices. | Same as MinTime |
| MaxEdgeTime | Maximum of a specified time dimension's begin or end over all edges. | Same as MinTime |
| AverageDuration | Average duration of temporal elements in a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. | (1) The property key where the aggregated value is stored. (2) The time dimension to consider. |
| AverageVertexDuration | Average duration of vertices in a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. | Same as AverageDuration |
| AverageEdgeDuration | Average duration of edges in a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. | Same as AverageDuration |
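The rule that intervals with a default start or end are ignored can be illustrated with a few lines of plain Java. This is a sketch under assumptions, not the Gradoop aggregate function: AverageDurationSketch is a hypothetical name, intervals are modelled as long[] {from, to}, and the default values are assumed to be Long.MIN_VALUE and Long.MAX_VALUE, as they appear in the CSV example in the Sources and Sinks section.

```java
import java.util.List;
import java.util.OptionalDouble;

/** Hypothetical sketch of an average-duration aggregate over time intervals. */
final class AverageDurationSketch {

  /**
   * Averages (to - from) over all intervals, skipping those whose start or end
   * still holds the assumed default value. Returns an empty result if no
   * interval qualifies.
   */
  static OptionalDouble averageDuration(List<long[]> intervals) {
    return intervals.stream()
      .filter(i -> i[0] != Long.MIN_VALUE && i[1] != Long.MAX_VALUE)
      .mapToLong(i -> i[1] - i[0])
      .average();
  }
}
```

Skipping open-ended intervals avoids both arithmetic overflow and averages that are dominated by placeholder values.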

Sources and Sinks

The TPGM provides a source (TemporalCSVDataSource) and a sink (TemporalCSVDataSink) to read and write temporal graph data. Both work the same way as CSVDataSource and CSVDataSink, except that four additional long values holding the bitemporal time attributes are attached to each graph element.

The following is an example line of an edge.csv file written by a temporal data sink:

5d777236ed08fd369d717ab2;[5d777162ed08fd369d6f2cd5];5d7771d5ed08fd369d6f77d7;5d7771bded08fd369d6f63c0;Owner;;(1568108898709,9223372036854775807),(-9223372036854775808,9223372036854775807)

with the respective signature:

EdgeId;[GraphId(s)];SourceVertexId;TargetVertexId;Label;Properties;(tx-from,tx-to),(val-from,val-to)
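For readers who want to inspect such files outside of Gradoop, the trailing temporal field can be extracted with a regular expression. The following sketch is not the TemporalCSVDataSource parser: TemporalCsvSketch and parseTimes are hypothetical names, and the pattern only assumes the (tx-from,tx-to),(val-from,val-to) layout shown in the signature above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch that reads the four time values at the end of a CSV line. */
final class TemporalCsvSketch {

  // Matches "(a,b),(c,d)" at the end of the line; each value may be negative.
  private static final Pattern INTERVALS = Pattern.compile(
    "\\((-?\\d+),(-?\\d+)\\),\\((-?\\d+),(-?\\d+)\\)$");

  /** Returns {txFrom, txTo, valFrom, valTo} parsed from the end of the line. */
  static long[] parseTimes(String line) {
    Matcher matcher = INTERVALS.matcher(line);
    if (!matcher.find()) {
      throw new IllegalArgumentException("No temporal data found in: " + line);
    }
    return new long[] {
      Long.parseLong(matcher.group(1)),
      Long.parseLong(matcher.group(2)),
      Long.parseLong(matcher.group(3)),
      Long.parseLong(matcher.group(4))
    };
  }
}
```

Applied to the example line, the valid-time values are the smallest and largest long values, which suggests that no valid time was set for this edge.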

Compatibility with EPGM

The TPGM is compatible with most operators of the EPGM, i.e., an operator like Subgraph can be applied directly to a TemporalGraph instance.

Example workflow

Below is an example workflow of a typical temporal graph analysis.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TemporalGradoopConfig temporalGradoopConfig = TemporalGradoopConfig.createConfig(env);
TemporalCSVDataSource dataSource = new TemporalCSVDataSource("path/to/graph", temporalGradoopConfig);

TemporalGraph graph = dataSource.getTemporalGraph();

graph
  // Filter for necessary vertex and edge types
  .subgraph(
    // Vertices: agents, or customers located in Istanbul
    new ByLabel<>("Agent").or(
      new ByLabel<>("Customer").and(new ByProperty<>("city", PropertyValue.create("Istanbul")))),
    // Edges: calls only
    new ByLabel<>("calls"))
  // Extract a snapshot from historical graph information
  .snapshot(new CreatedIn(
    LocalDate.of(2018, Month.JANUARY, 1).atStartOfDay(), 
    LocalDate.of(2018, Month.DECEMBER, 31).atTime(LocalTime.MAX)))
  // Remove dangling edges
  .verify()
  // Apply a grouping with the ROLLUP feature
  .groupEdgesByRollup(
    // Vertex grouping keys
    Arrays.asList(GroupingKeys.label(), GroupingKeys.property("city")),
    // Vertex aggregation functions
    Collections.singletonList(new Count()),
    // Edge grouping keys
    Arrays.asList(
      TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.MONTH_OF_YEAR),
      TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.ALIGNED_WEEK_OF_YEAR),
      TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.DAY_OF_YEAR)),
    // Edge aggregation functions
    Arrays.asList(new Count("cnt"), new AverageEdgeDuration()))
  .writeTo(new TemporalCSVDataSink("path/to/output", temporalGradoopConfig));

env.execute("My temporal analysis.");