GemFire Write-Behind Event Handling with Spring JPA

Juan Jose Ramos June 12, 2020

Introduction

VMware GemFire is an in-memory data grid that provides real-time, consistent access to data-intensive applications throughout widely distributed cloud architectures. One of the most used features of VMware GemFire is the Write-Behind Cache Event Handling which allows the user to keep external databases in sync with the data stored within a VMware GemFire region(s).

Spring Data JPA, part of the larger Spring Data family, allows the user to easily implement JPA based repositories. The module itself deals with enhanced support for JPA based data access layers and makes it easier to build Spring-powered applications that use data access technologies.

Spring Boot For GemFire provides the convenience of Spring Boot’s convention over configuration approach using auto-configuration with the Spring Framework’s powerful abstractions and highly consistent programming model to truly simplify the development of VMware GemFire applications in a Spring context.

Why?

Implementing a data access layer of an application is a cumbersome task, a lot of boilerplate code needs to be written and tested to execute simple queries, perform pagination, auditing, etc. Spring Data JPA allows the user to significantly improve the implementation of data access layers by reducing the effort to the amount that’s actually needed.

An AsyncEventListener is a simple VMware GemFire callback that asynchronously processes batches of events after they have been applied to a VMware GemFire Region. It’s widely used to implement a write-behind cache event handler to synchronize region updates with an external database. It’s, however, cumbersome to configure the asynchronous-event-queue and the region itself using plain VMware GemFire configuration files or Java API. Spring Boot for VMware GemFire makes the process easier, faster, and less error-prone, simplifying the configuration, development, testing, and deployment of the application.

Having those technologies out there in the open, why would we want to spend hours and resources in developing and testing the parts on our own?… Guess what, we don’t!; instead, we’re gonna build a small and simple project to show how a VMware GemFire write-behind event handling can be easily implemented with the help of Spring Data JPA and Spring Boot for VMware GemFire.

How?

We will build an application that stores Employee POJOs (Plain Old Java Objects) within an in-memory-based database. We won’t store those objects manually, though, our application will only use a VMware GemFire Region as the datastore, and we’ll leverage an Asynchronous Event Queue to synchronize our Region with the external database.

Setting Things Up

The fastest and easiest way to set up a Spring Application from scratch is to use Spring Initializr, which offers a fast way to pull in all the dependencies we need for an application and does a lot of the set up automatically.

This example needs the H2 Database, Spring Data JPA, and Spring for VMware GemFire dependencies. The following image shows the Initializr set up for the sample project:

Screenshot of Spring Initializr illustrating dependency selections

Defining the Entity

In this example we store Employee objects, each annotated as a JPA entity, so let’s create that.

package org.apache.geode.examples.model;


import java.io.Serializable;
import java.util.Objects;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class Employee implements Serializable {
  @Id
  private Long id;
  private String name;
  private String surname;

  public Long getId() {
    return id;
  }

  public String getName() {
    return name;
  }

  public String getSurname() {
    return surname;
  }

  protected Employee() {}

  public Employee(Long id, String name, String surname) {
    this.id = id;
    this.name = name;
    this.surname = surname;
  }

  @Override
  public String toString() {
    return "Employee{" +
        "id=" + id +
        ", name='" + name + '\'' +
        ", surname='" + surname + '\'' +
        '}';
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    Employee employee = (Employee) o;

    if (!Objects.equals(id, employee.id)) {
      return false;
    }
    if (!Objects.equals(name, employee.name)) {
      return false;
    }
    return Objects.equals(surname, employee.surname);
  }

  @Override
  public int hashCode() {
    int result = id != null ? id.hashCode() : 0;
    result = 31 * result + (name != null ? name.hashCode() : 0);
    result = 31 * result + (surname != null ? surname.hashCode() : 0);
    return result;
  }
}

Here we have an Employee class with three attributes: id, name, and surname. We also have two constructors: the default exists only for the sake of JPA, we will only use the other one to create instances of the Employee class.

The Employee class is annotated with Entity, indicating that it is a JPA entity. (Because no Table annotation exists, it is assumed that this entity is mapped to a table named Employee). The Employee object’s id property is annotated with Id so that JPA recognizes it as the object’s ID. The other two properties, name, and surname, are left unannotated, it is assumed that they are mapped to columns that share the same names as the properties themselves.

Creating the Repository

Spring Data JPA focuses on using JPA to store data in a relational database, one of its “best features” is the ability to create repository implementations automatically from a repository interface (no boiler-plate code needed on our end, yay!).

We won’t be executing custom queries nor anything like that in our example, so we just need to create an empty repository interface that works with Employee entities and let Spring Data JPA do its magic for us.

package org.apache.geode.examples.repository;

import org.springframework.data.repository.CrudRepository;

import org.apache.geode.examples.model.Employee;

public interface EmployeeRepository extends CrudRepository<Employee, Long> {
}

Our EmployeeRepository extends the CrudRepository interface; the type of entity and ID that it works with, Employee and Long, are specified in the generic parameters on CrudRepository. By extending CrudRepository, EmployeeRepository inherits several methods for working with Employee persistence, including methods for saving, deleting, and finding Employee entities.

Implementing the AsyncEventListener

As stated before, an AsyncEventListener asynchronously processes batches of events after they have been applied to a Region.

package org.apache.geode.examples.config;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.examples.model.Employee;
import org.apache.geode.examples.repository.EmployeeRepository;

@Component
public class EmployeeAsyncEventListener implements AsyncEventListener {
  private static final Logger logger = LoggerFactory.getLogger(EmployeeAsyncEventListener.class);
  private final EmployeeRepository employeeRepository;

  public EmployeeAsyncEventListener(EmployeeRepository employeeRepository) {
    this.employeeRepository = employeeRepository;
  }

  @Override
  public boolean processEvents(List<AsyncEvent> events) {
    events.forEach(asyncEvent -> {
      Employee employee = (Employee) asyncEvent.getDeserializedValue();
      logger.info("Processing Employee {}...", employee.toString());
      employeeRepository.save(employee);
      logger.info("Processing Employee {}... Done!", employee.toString());
    });

    return true;
  }
}

In our case, the Region contains Employee instances, and we just need to delegate to our EmployeeRepository in order to persist the entity in the external database (for the sake of simplicity, we won’t deal with errors nor transactional behavior right now).

The Component annotation indicates that the class should be considered by Spring as candidates for auto-detection when using annotation-based configuration and classpath scanning.

Configuring the VMware GemFire Region

At this point we just need to wire things up, that is, define a GeodeConfiguration class and annotate it with the Configuration annotation to tag it as a source of bean definitions for the application context.

package org.apache.geode.examples.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.gemfire.ReplicatedRegionFactoryBean;
import org.springframework.data.gemfire.wan.AsyncEventQueueFactoryBean;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.examples.model.Employee;

@Configuration
public class GeodeConfiguration {

  @Bean
  AsyncEventQueueFactoryBean employeeAsyncEventQueue(GemFireCache cache, EmployeeAsyncEventListener asyncEventListener) {
    AsyncEventQueueFactoryBean queueFactoryBean = new AsyncEventQueueFactoryBean((Cache) cache);
    queueFactoryBean.setAsyncEventListener(asyncEventListener);

    return queueFactoryBean;
  }

  @Bean("Employees")
  ReplicatedRegionFactoryBean<Long, Employee> employeesRegion(GemFireCache cache, AsyncEventQueue asyncEventQueue) {
    ReplicatedRegionFactoryBean<Long, Employee> regionFactoryBean = new ReplicatedRegionFactoryBean<>();
    regionFactoryBean.setCache(cache);
    regionFactoryBean.setPersistent(false);
    regionFactoryBean.setAsyncEventQueues(new AsyncEventQueue[]{asyncEventQueue});

    return regionFactoryBean;
  }
}

The employeeAsyncEventQueue() method simply defines the VMware GemFire AsyncEventQueue that we are going to use as the container for our EmployeeAsyncEventListener. The listener instance itself will be auto-magically instantiated and passed to our method by Spring.

The employeesRegion() method just defines and configures the VMware GemFire Region, it attaches the AsyncEventQueue and sets the Region as non-persistent Replicate.

Creating The Application Class

package org.apache.geode.examples;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.gemfire.config.annotation.CacheServerApplication;
import org.springframework.data.gemfire.config.annotation.EnableLogging;

@EnableLogging
@SpringBootApplication
@CacheServerApplication
public class JPAAsyncListenerApplication {

  public static void main(String[] args) {
    SpringApplication.run(JPAAsyncListenerApplication.class);
  }
}

The SpringBootApplication annotation is for convenience, it adds all of the following to our application:

  • Configuration: Tags the class as a source of bean definitions for the application context.

  • ComponentScan: Tells Spring to look for other components, configurations, and services in the org.apache.geode.examples package.

  • EnableAutoConfiguration: Tells Spring Boot to start adding beans based on classpath settings, other beans, and various property settings.

We also added some Spring Boot for VMware GemFire annotations to customize the behavior of our VMware GemFire Cache.

  • EnableLogging: Tells Spring Boot for VMware GemFire to configure and enable VMware GemFire system logging.

  • CacheServerApplication: Tells Spring Boot for VMware GemFire to enable an embedded VMware GemFire CacheServer instance. Moreover, this also implies an embedded peer Cache must exist and, therefore, will be configured, constructed, and initialized as a Spring bean in the application context.

Running the Application

Believe it or not, the work is done!… with a handful set of classes and annotations (and, as you’ve noticed, almost no code written by us) we have a fully working and operational application, so let’s test it!.

package org.apache.geode.examples;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.examples.model.Employee;
import org.apache.geode.examples.repository.EmployeeRepository;

@SpringBootTest
public class JPAAsyncListenerApplicationTest {
  private static final Logger logger = LoggerFactory.getLogger(JPAAsyncListenerApplicationTest.class);

  @Autowired
  private EmployeeRepository repository;

  @Autowired
  private AsyncEventQueue asyncEventQueue;

  @Resource(name = "Employees")
  private Region<Long, Employee> employeesRegion;

  @Test
  public void jpaListenerTest() {
    List<Employee> employees = new ArrayList<>();
    employees.add(new Employee(1L, "Zell", "Dincht"));
    employees.add(new Employee(2L, "Quistis", "Trepe"));
    employees.add(new Employee(3L, "Irvine", "Kinneas"));
    employees.add(new Employee(4L, "Rinoa", "Heartilly"));
    employees.add(new Employee(5L, "Squall", "Leonhart"));

    // DB empty at the beginning.
    assertThat(repository.findAll()).isEmpty();
    logger.info("Employees found with findAll():");
    logger.info("-------------------------------");
    repository.findAll().forEach(employee -> logger.info(employee.toString()));
    logger.info("-------------------------------");
    logger.info("");

    // Insert some employees into the GemFire Region
    employees.forEach(employee -> employeesRegion.put(employee.getId(), employee));

    // Wait for queues to drain (AsyncEventListener invoked).
    logger.info("Waiting for AsyncEventQueue to drain...");
    await().untilAsserted(() -> assertThat(asyncEventQueue.size()).isEqualTo(0));
    logger.info("Waiting for AsyncEventQueue to drain... Done!.");
    logger.info("");

    // DB should now have all the employees inserted through the AsyncEventListener.
    Iterable<Employee> employeeList = repository.findAll();
    assertThat(employeeList).hasSize(employees.size());
    employeeList.forEach(employee -> assertThat(employees.contains(employee)));
    logger.info("Employees found with findAll():");
    logger.info("-------------------------------");
    employeeList.forEach(employee -> logger.info(employee.toString()));
    logger.info("-------------------------------");
    logger.info("");
  }
}

The test is straightforward and simple: we check that the database is empty, insert some Employee instances into the VMware GemFire Region, wait for the actual Asynchronous Event Queue to be empty (the AsyncEventListener processed all the events) and, at the very end, verify that the database contains exactly the Employee instances we inserted into the Region.

What’s next?

Check out Spring Boot for VMware GemFire, you can do way more things more easily and quickly, with just some extra annotations!