Creating a PageRank Analytics Platform Using Spring Boot Microservices

Sunday, January 3, 2016

This article introduces you to a sample application that combines multiple microservices with a graph processing platform to rank communities of users on Twitter. We’re going to use a collection of popular tools as a part of this article’s sample application:

  • Spring Boot

  • Spring Cloud

  • Neo4j

  • Apache Spark (GraphX)

  • RabbitMQ

  • Docker

Ranking Twitter Profiles

Let’s start with an overview of the problem we’re going to solve as a part of our sample application: discovering communities of influencers on Twitter using a set of seed profiles as input. Solving this problem without a background in machine learning or social network analytics might seem like a stretch, but we’re going to take a stab at it using a little bit of computer science history.

The PageRank algorithm, created by Google co-founders Larry Page and Sergey Brin, was first used by Google to rank website documents by analyzing the graph of backlinks between sites.

I dug up the original research paper on PageRank from Stanford for some inspiration. In the paper, the authors talk about the notion of approximating the "importance" of an academic publication by weighting the value of its citations.

The reason that PageRank is interesting is that there are many cases where simple citation counting does not correspond to our common sense notion of importance. For example, if a webpage has a link to the Yahoo home page, it may be just one link but it is a very important one. This page should be ranked higher than many pages with more links but from obscure places. PageRank is an attempt to see how good an approximation to "importance" can be obtained just from the link structure.
— Page, Lawrence and Brin, Sergey and Motwani, Rajeev and Winograd, Terry (1999)
The PageRank Citation Ranking: Bringing Order to the Web

Now let’s take the same definition that is described in the paper and apply it to our problem of discovering important profiles on Twitter. Twitter users typically follow other users to track their updates as a part of their stream. We can apply the same reasoning behind using PageRank on citations to approximate the "importance" of profiles on Twitter: it’s not the sheer number of followers that makes a profile important, but how important those followers are.
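The classic simplified form of PageRank makes this precise. Translated to our domain, a profile’s rank is a damped sum of its followers’ ranks, with each follower’s contribution diluted by the number of accounts that follower follows:

PR(u) = (1 - d) + d \sum_{v \in F(u)} \frac{PR(v)}{N_v}

Here F(u) is the set of profiles that follow u, N_v is the number of accounts that profile v follows, and d is the damping factor, typically set to 0.85.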

That’s exactly what we’re going to build in this article, and we’ll end up with something that looks like the following table.

Rank  Profile       Followers  PageRank
1.    @ftrain       31948      7368.2417
2.    @harper       32452      6754.455
3.    @worrydream   37658      6747.585
4.    @lstoll       41067      5976.3555
5.    @katemats     25799      5916.3843
6.    @rands        35079      5888.145
7.    @al3x         41099      5547.4307
8.    @defunkt      45310      4787.9644
9.    @SaraJChipps  29617      4271.676
10.   @leahculver   30723      3852.3728

The first thing we’re going to need to worry about when building this solution is how we’re going to calculate PageRank on potentially millions of users and links. To do this, we’re going to use something called a graph processing platform.

What is a graph processing platform?

A graph processing platform is an application architecture that provides a general-purpose job scheduling interface for analyzing graphs. The application we’ll build will make use of a graph processing platform to analyze and rank communities of users on Twitter. For this we’ll use Neo4j Mazerunner, an open source project that I started that connects Neo4j’s database server to Apache Spark.

The diagram below illustrates a graph processing platform similar to Neo4j Mazerunner.

Graph processing platform diagram

Submitting PageRank Jobs to GraphX

The graph processing platform I’ve described will provide us with a general purpose API for submitting PageRank jobs to Apache Spark’s GraphX module from Neo4j. The PageRank results from GraphX will be automatically applied back to Neo4j without any additional work to manually handle data loading. The workflow for this is extremely simple for our purposes. From a backend service we will only need to make a simple HTTP request to Neo4j to begin a PageRank job.

I’ve also taken care of making sure that the graph processing platform is easily deployable to a cloud provider using Docker containers. In a previous article, I describe how to use Docker Compose to run Mazerunner as a multi-container application. We’ll do the same for this sample application but extend the Docker Compose file to include additional Spring Boot applications that will become our backend microservices.

By default, Docker Compose will orchestrate containers on a single virtual machine. If we were to build a truly fault tolerant and resilient cloud-based application, we’d need to be sure to scale our system to multiple virtual machines using a cloud platform. This is the subject of a later article.

Now that we understand how we will use a graph processing platform, let’s talk about how to build a microservice architecture using Spring Boot and Spring Cloud to rank profiles on Twitter.

Building Microservices

I’ve talked a lot about microservices in past articles. When we talk about microservices we are talking about developing software in the context of continuous delivery. Microservices are not just smaller services that scale horizontally. When we talk about microservices, we are talking about being able to create applications that are the product of many teams delivering continuously in independent release cycles. Josh Long and I describe at length how to untangle the patterns of building and operating JVM-based microservices in O’Reilly’s Cloud Native Java.

In this sample, we’ll build 4 microservices, each as a Spring Boot application. If we were to build this architecture as microservices in an authentic scenario, each microservice would be owned and managed by a different team. This is an important differentiation in this new practice, as there is much confusion around what a microservice is and what it is not. A microservice is not just a distributed system of small services. The practice of building microservices should never be without the discipline of continuous delivery.

For the purposes of this article, we’ll focus on scenarios that help us gain experience and familiarity with building distributed systems that resemble a microservice architecture.

Overview

Now let’s do a quick overview of the concepts we’re going to cover as a part of this sample application. We will apply the same recipe from previous articles on similar topics for building microservices with Spring Boot and Spring Cloud. The key difference from my previous articles is that we are going to create a data service that does both batch processing tasks as well as exposing data as HTTP resources to API consumers.

System Architecture Diagram

The diagram below shows each component and microservice that we will create as a part of this sample application. Notice how we’re connecting the Spring Boot applications to the graph processing platform we looked at earlier. Also, notice the connections between the services; these connections define the communication points between each service and the protocol that is used.

Microservice architecture with Spring Boot

The three applications that are colored in blue are stateless services. Stateless services will not attach a persistent backing service or need to worry about managing state locally. The application that is colored in green is the Twitter Crawler service. Components that are colored in green will typically have an attached backing service. These backing services are responsible for managing state locally, and will either persist state to disk or hold it in memory.

Twitter Crawler

We’ll start by creating a service that is responsible for importing data from Twitter’s API and storing it in Neo4j: the Twitter Crawler service. This service will also schedule PageRank jobs on the data it imports into Neo4j. The Twitter Crawler service will be built using Spring Boot and will attach backing services for Neo4j and RabbitMQ.

We’re going to walk through each of the following concerns as a part of building this service:

  • Creating Spring Data Neo4j repositories

  • Exposing repository APIs using Spring Data REST

  • Connecting to the Twitter API

  • Using AMQP to import profiles from the Twitter API

  • Scheduling new PageRank jobs

  • Discovering new users

  • Registering as a discovery client with Eureka

Creating Spring Data Neo4j repositories

We’ll start building the Twitter Crawler service by creating a set of domain classes and repositories to manage data with the Spring Data Neo4j project. Spring Data Neo4j is a project in the Spring Data ecosystem that implements the Spring Data repository abstraction using an OGM (Object Graph Mapping) library for Neo4j. Spring Data Neo4j allows you to manage data on a Neo4j server using annotated POJOs as entity references in a Spring Data application.

Before we can start managing data in Neo4j, we’ll need to design and construct a graph data model for our application’s domain data. The domain model for this application is rather simple, and we’ll construct it using domain objects described in Twitter’s API documentation. We’ll only have one domain concept, which is a User profile, and we’ll source this resource from profiles that are imported from the Twitter API. We’ll then have a relationship entity with the type named FOLLOWS. The FOLLOWS relationship will connect User profiles together in Neo4j after importing follower data from the Twitter API.

The graph data model that we will end up with looks like the following diagram.

Graph data model for Neo4j

We’ll now use the graph data model illustrated in the diagram to create a POJO that represents a domain class for the User node in Neo4j. We’ll also be sure to add fields for the incoming and outgoing follower connections as members of the User domain class. These fields will be created with the type Set<User>, and give us a way to load profiles that are connected to a User node with a FOLLOWS relationship. These steps are shown in the example code snippet below.

Example 1. User.java
@NodeEntity
public class User implements Serializable {

    @GraphId
    private Long id;

    @Index(unique = true) (1)
    private Long profileId;

    @Relationship(type = "FOLLOWS", direction = "OUTGOING") (2)
    private Set<User> follows = new HashSet<>();

    @Relationship(type = "FOLLOWS", direction = "INCOMING") (3)
    private Set<User> followers = new HashSet<>();

    private String screenName;
    private Float pagerank;

    ...
1 Creates a unique constraint on a user’s profileId property
2 Manages relationships of user nodes that this profile is following
3 Manages relationships of user nodes that this profile is being followed by

Next, we’ll need to create a repository to manage our data that will be mapped to the User domain class. The Spring Data project makes repository-based management of database entities a snap. We’re going to use the GraphRepository<T> interface to create a repository bean that will be created at runtime in our Spring Boot application.

public interface UserRepository extends GraphRepository<User> {

  // Get a User's Neo4j node ID using a Twitter profile ID
  @Query("MATCH (user:User { profileId: {profileId} }) RETURN id(user) as id")
  Long getUserIdByProfileId(@Param("profileId") Long profileId);

  ...
}

Here we create a basic interface that extends GraphRepository<User>. This repository interface will be initialized as a bean at runtime, and provides us with a client to manage transactions on entities for User nodes in Neo4j.
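As a quick sketch of how this repository might be injected and used elsewhere in the application — the UserLookupService class below is hypothetical, for illustration only:

@Service
public class UserLookupService {

  private final UserRepository userRepository;

  @Autowired
  public UserLookupService(UserRepository userRepository) {
    this.userRepository = userRepository;
  }

  public Long resolveNodeId(Long profileId) {
    // Translate a Twitter profile ID into the internal Neo4j node ID
    return userRepository.getUserIdByProfileId(profileId);
  }
}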

Now that we can manage User nodes and FOLLOWS relationships, we need to think about how performant it will be to save potentially thousands of relationships per second when importing user profiles from the Twitter API. We’ll need to be able to batch transactions so that Neo4j can handle the throughput of ingesting writes at a rapid pace. To do this, we need to create another GraphRepository bean for managing the creation of many FOLLOWS relationships between a set of User profiles.

public interface FollowsRepository extends GraphRepository<Follows> {

  // Batches the creation of many FOLLOWS relationships
  @Query("FOREACH(x in {follows} | MERGE (a:User { profileId: x.userA.profileId })\n" +
          "MERGE (b:User { profileId: x.userB.profileId })\n" +
          "MERGE (a)-[:FOLLOWS]->(b))")
  void saveFollows(@Param("follows") Set<Follows> follows);
}

The repository interface definition above is similar to the UserRepository interface. We’ve defined a Cypher query template for a custom repository method that will allow us to save batches of thousands of relationships that will connect User nodes together with a FOLLOWS relationship type in our Neo4j database. The custom saveFollows method takes in a domain class representing a relationship entity for the FOLLOWS relationship type. We’ll also need to create this domain class as a POJO like we did with the User node.

Example 4. Follows.java
@RelationshipEntity(type = "FOLLOWS") (1)
public class Follows {

  @GraphId
  private Long relationshipId;
  @StartNode
  private User userA; (2)
  @EndNode
  private User userB; (3)

  ...
1 Demarcates this class as a relationship entity for the FOLLOWS type
2 This is the user with the outgoing FOLLOWS relationship
3 This is the other user with the incoming FOLLOWS relationship
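To see how we might batch relationships with this entity, here is a minimal sketch of saving a chunk of follower connections in a single round trip. The Follows constructor and the followerIds collection are assumptions for illustration:

// Build a batch of FOLLOWS relationships from a page of imported follower IDs
Set<Follows> batch = new HashSet<>();

for (Long followerId : followerIds) {
  User follower = new User();
  follower.setProfileId(followerId);

  // The follower is the start node; the imported user is the end node
  batch.add(new Follows(follower, user));
}

// Persist the entire batch with a single Cypher statement
followsRepository.saveFollows(batch);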

We now have the necessary data management capabilities to import Twitter profiles and their connections in the native shape of a graph. This is the advantage of using a graph database: having our data stored as a graph makes it easy to run PageRank without the tedious aggregation and transformation that would be necessary if we used a relational database.

Exposing repository APIs using Spring Data REST

Now that we have created our Spring Data Neo4j repositories for managing our Twitter follower graph, we’ll need to expose a REST API interface that allows remote services to manage our domain data over HTTP. Thankfully this is a simple task when using the Spring Data REST project. All that we need to do to enable this as a feature on our Spring Data Neo4j repositories is add the spring-boot-starter-data-rest artifact as a dependency to the project’s pom.xml file.

Example 5. pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>

By adding this artifact as a dependency, the Spring Boot application will automatically find Spring Data repositories and expose a REST API to manage repository data remotely over HTTP. Now if we start our Spring Boot application and navigate to the base HTTP endpoint we’ll see a JSON response in the format of application/hal+json.

{
  "_links" : {
    "following" : {
      "href" : "http://localhost:8080/following{?page,size,sort}",
      "templated" : true
    },
    "users" : {
      "href" : "http://localhost:8080/users{?page,size,sort}",
      "templated" : true
    },
    "profile" : {
      "href" : "http://localhost:8080/profile"
    }
  }
}

The application/hal+json content type is a JSON representation that lists hypermedia resources as embedded links. We can use these embedded links for /users{?page,size,sort} and /following{?page,size,sort} to manage resources of our graph repositories over HTTP.
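If another JVM service needed to walk these hypermedia links programmatically, the Traverson client from Spring HATEOAS is one option. Below is a minimal sketch, assuming the service is reachable at localhost:8080 and that the embedded JSON path shown matches the HAL response:

Traverson traverson = new Traverson(
    URI.create("http://localhost:8080"), MediaTypes.HAL_JSON);

// Follow the "users" link relation and read a field from the embedded resources
String screenName = traverson.follow("users")
    .toObject("$._embedded.users[0].screenName");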

Connecting to the Twitter API

Now that we have everything we need to manage our Twitter profile data in Neo4j, we can import profiles from the Twitter API. To do this, we can use the Spring Social Twitter project, an open source project from the Spring ecosystem that provides a managed Twitter API client. Before we can start using this client, we’ll need to add the spring-social-twitter artifact as one of our project dependencies in the pom.xml, which is shown in the code snippet below.

Example 6. pom.xml
<dependency>
    <groupId>org.springframework.social</groupId>
    <artifactId>spring-social-twitter</artifactId>
    <version>1.1.2.RELEASE</version>
</dependency>

The next step will be to configure the Twitter client that is provided by spring-social-twitter. In order to access operations for importing profiles from Twitter’s API, we will need to provide API tokens and keys that are generated for an application by Twitter. You’ll need to register with Twitter and create a developer app in order to get these keys. Getting API access is a simple process. A step-by-step guide is available from Spring’s website that will show you how to generate Twitter API keys for an application.

In our Spring Boot application we will map configuration properties from the application’s .properties file on the classpath to values in the environment. These values will be the keys and access tokens that we need to authenticate with the Twitter API, and we’ll use them as parameters to configure a new TwitterTemplate bean at runtime.

The TwitterTemplate is provided as a bean from the Spring Social project and provides a client for authenticating and interacting with Twitter’s API.

The example snippet below shows how we can use the @Value annotation to load in configurations defined from the application’s .properties file.

@Value("${spring.social.twitter.appId}")
private String appId;

@Value("${spring.social.twitter.appSecret}")
private String appSecret;

@Value("${spring.social.twitter.accessToken}")
private String accessToken;

@Value("${spring.social.twitter.accessTokenSecret}")
private String accessTokenSecret;

@Bean
Twitter twitter() {
    return new TwitterTemplate(appId, appSecret, accessToken, accessTokenSecret);
}

As we can see from the code example, we use a hierarchy of keys that maps to where the configuration properties will be found in our application.yml file. We’ll need to add our Twitter API keys and tokens to the application.yml file exactly where our TwitterTemplate configuration class expects them. The application.yml file for the Twitter Crawler service will look like the following.

Example 8. application.yml
spring.profiles.active: 'production'
---
server:
  port: 8080
spring:
  social: (1)
    twitter:
      accessTokenSecret: 'replace'
      accessToken: 'replace'
      appSecret: 'replace'
      appId: 'replace'
1 The spring.social.twitter configuration properties map will contain our Twitter API keys and tokens
The Twitter API keys and access tokens for these properties can be overridden using externalized values in the application’s runtime environment, since it’s not a good practice to store sensitive information like this as hardcoded values in the project’s source code repository.

Spring Boot provides an option to use YAML as the format for an application’s configuration .properties file, which I happen to be partial to using. To use YAML instead of the classic .properties format, rename the application.properties file to application.yml, located in the application’s src/main/resources folder.
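Thanks to Spring Boot’s relaxed property binding, each of these keys can be supplied as an uppercase environment variable at deploy time, which is exactly what the Docker Compose file at the end of this article does. For example:

$ export SPRING_SOCIAL_TWITTER_ACCESSTOKEN='your-access-token'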

Now our Spring Boot application for the Twitter Crawler service will be able to use a TwitterTemplate object as a client to interact with the Twitter API. The code snippet below is a simplified example of how we will access a TwitterTemplate bean using a Spring framework technique called constructor-based dependency injection.

Example 9. Receiver.java
@Component
public class Receiver {

  private Twitter twitter;

  @Autowired (1)
  public Receiver(Twitter twitter) {
    this.twitter = twitter;
  }

  private void findFollowers(Long profileId) {
    // Retrieve a profile's followers using the TwitterTemplate
    twitter.friendOperations().getFollowerIds(profileId);
  }

...
1 Causes an instance of the TwitterTemplate object to be provided as a constructor’s parameter

The snippet of code above is meant to illustrate how we’ll be using the TwitterTemplate client throughout the application. We can see that in this class we’re getting a reference to the TwitterTemplate through the constructor, which Spring will call when initializing the bean at runtime.

Using AMQP to import profiles from the Twitter API

The Twitter API has strict rate limiting policies for a single access token. The follower and friend API resources come with a fixed 15 minute window in which to make 15 HTTP requests. If we were to import a single user per request, it would take us roughly two years to import 1 million unique users. The good news is that Twitter has added the ability to grab cursored lists of followers and friends of a user, which allows us to import up to 5000 profiles per request. If you’re crawling the right users, you can import about 1 million unique users per day. That’s not too shabby.
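As a rough back-of-envelope check on those numbers, assuming a single access token and the cursored endpoints:

15 requests per 15-minute window               =  1,440 requests per day
1,440 requests × 5,000 profile ids per request ≈  7.2 million follower ids per day

That figure is an upper bound; duplicate profiles across communities and the additional requests needed for full profile lookups bring the practical number down to the roughly 1 million unique users per day cited above.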

If we get rate limited before we’ve finished importing all of a user’s friends and followers, we need to make sure that we finish importing the rest once the rate limit window resets. We need a complete picture of the follower graph in order for PageRank to be an accurate predictor of a user’s importance in a community. If we lose data during the import process, the crawler algorithm becomes unreliable and our results will be skewed.

To ensure that data is imported reliably while making the most of each rate limit window, we’ll use the Spring AMQP project with RabbitMQ as our message broker.

We’re going to create two queues that we will use to serially import friends and followers of each discovered Twitter profile. We’ll start by configuring two RabbitMQ queues as beans in our Spring Boot application. The queues we need to create will be named twitter.follows and twitter.followers. We can do this by initializing the bean using the @Bean annotation and returning a new instance of the Queue class, shown below.

@Configuration
public class TwitterCrawlerConfig {

  @Bean
  Queue follows() {
      // Arguments: queue name, durable, exclusive, auto-delete
      return new Queue("twitter.follows", true, false, false);
  }

  @Bean
  Queue followers() {
      // Arguments: queue name, durable, exclusive, auto-delete
      return new Queue("twitter.followers", true, false, false);
  }
...

We’ll now create our RabbitMQ listeners for the two queues. The code snippet below uses the @RabbitListener annotation to indicate that the followers(String message) method should process messages that arrive to the twitter.followers queue on the active RabbitMQ connection.

Example 11. Receiver.java
@RabbitListener(queues = {"twitter.followers"})
public void followers(String message) throws InterruptedException, IOException {
  User user = objectMapper.readValue(message, User.class);

  if (user != null) {
    try {
      // Get the first cursor of followers for the user
      CursoredList<Long> followers = twitter.friendOperations()
          .getFollowerIds(user.getProfileId());

      // Import the users to Neo4j
      saveFollowers(user, followers);

      // Now import the users that the profile is following
      amqpTemplate.convertAndSend("twitter.follows", objectMapper.writeValueAsString(user));

    } catch (RateLimitExceededException rateLimitException) {

      // We exceeded the rate limit, redeliver the message to retry after 40 seconds
      Thread.sleep(40000L);
      throw new AmqpIllegalStateException(rateLimitException.getMessage());
      ...
    }
  }
}

We’ll create a similar listener to the one that is shown in the snippet above for the twitter.follows queue. To see both of these methods in full, head over to the source code repository for the sample application.
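A minimal sketch of that twitter.follows listener might look like the following, mirroring the followers listener but swapping in the friend operations. The saveFollows helper shown here is an assumption, and the exact error handling lives in the sample repository:

@RabbitListener(queues = {"twitter.follows"})
public void follows(String message) throws InterruptedException, IOException {
  User user = objectMapper.readValue(message, User.class);

  if (user != null) {
    try {
      // Get the first cursor of profiles that this user is following
      CursoredList<Long> friends = twitter.friendOperations()
          .getFriendIds(user.getProfileId());

      // Import the FOLLOWS relationships to Neo4j
      saveFollows(user, friends);

    } catch (RateLimitExceededException rateLimitException) {
      // Back off and redeliver the message, as in the followers listener
      Thread.sleep(40000L);
      throw new AmqpIllegalStateException(rateLimitException.getMessage());
    }
  }
}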

The reason why we need separate queues for each resource is to prevent wasting API requests if a failure occurs. Since the API resources for "followers" and "friends" have separate rate limiting policies, we could end up wasting duplicate API requests on the first resource if we encountered a fault during operations on the second resource.

Scheduling new PageRank jobs

The last concern we need to address on our Twitter Crawler service is to integrate with the graph processing platform. The graph processing platform is an attached backing service on Neo4j, which makes it easy for us to issue requests for new graph processing jobs. Neo4j exposes an endpoint to an unmanaged extension for Mazerunner on the classpath of the Neo4j database server. This unmanaged extension exposes a REST API for interacting with the graph processing platform’s analysis service that embeds an instance of Apache Spark.

It’s easy enough to make an HTTP GET request to the job scheduling interface on Neo4j, but we will still need to create a trigger that is called on a scheduled time interval from the Twitter Crawler service. To do this, we can use the @Scheduled annotation on a method of a bean in our Spring Boot application. We’ll then provide a fixed value for the annotation’s fixedRate parameter, which is measured in milliseconds. I’ve decided that the PageRank job should run about every 5 minutes, so we’ll set the fixedRate value to 300000 milliseconds.

The snippet of code below is an example of how we will register a method using Spring’s @Scheduled annotation. The method issues an HTTP GET request to the job scheduling interface’s REST API, which resides on Neo4j.

@Scheduled(fixedRate = 300000) (1)
public void schedulePageRank() {

  // Schedule a PageRank job for the Twitter follower graph in Neo4j
  String relativePath = "%s/service/mazerunner/analysis/pagerank/FOLLOWS";
  String analysisEndpoint = String.format(relativePath, neo4jServer.url());

  // Make a HTTP GET request to the analysis endpoint
  new RestTemplate().getForEntity(analysisEndpoint, null);

  logger.info("PageRank scheduled on follows graph " + dateFormat.format(new Date()));
}
1 The @Scheduled annotation registers this method to be called every 5 minutes

Now that we know how to schedule operations using the @Scheduled annotation, we can use the same pattern to create a recurring job that discovers new users to import.

...
/**
 * Every minute, an attempt is made to discover a new user to import. This only succeeds if
 * the API is not restricted by a temporary rate limit, which keeps the crawl focused on
 * relevant users over time.
 */
@Scheduled(fixedRate = 60000)
public void scheduleDiscoverUser() {
  // Only discover users if the rate limit has not been exceeded
  if (!rateLimited) {
    // Uses PageRank to find the next most important user to import
    User user = userRepository.findRankedUserToCrawl();

    // If the user is null, the first PageRank hasn't been applied
    if (user == null) {
      // Uses a mutual follower metric
      user = userRepository.findNextUserToCrawl();
    }

    // If a user has been found, request the user to be imported
    if (user != null) {
      twitterService.discoverUserByProfileId(user.getProfileId());
    }
  } else {
    rateLimited = false;
  }

  // Update user rankings for the web dashboard
  logger.info("Updating last ranks...");
  userRepository.setLastPageRank();
  logger.info("Updating current rank...");
  userRepository.updateUserCurrentRank();
  logger.info("Current ranks updated!");
}
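The findRankedUserToCrawl and findNextUserToCrawl repository methods aren’t shown in this article. One plausible shape for them as custom Cypher queries on the UserRepository is sketched below; these queries are illustrative only (the in-degree ordering stands in for the mutual follower metric), and the real implementations live in the sample repository:

// Illustrative sketches only; Neo4j 2.x-era Cypher
@Query("MATCH (user:User) WHERE has(user.pagerank) AND NOT has(user.discoveredTime)\n" +
        "RETURN user ORDER BY user.pagerank DESC LIMIT 1")
User findRankedUserToCrawl();

@Query("MATCH (user:User)<-[r:FOLLOWS]-(:User) WHERE NOT has(user.discoveredTime)\n" +
        "WITH user, count(r) AS inbound ORDER BY inbound DESC LIMIT 1\n" +
        "RETURN user")
User findNextUserToCrawl();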

Discovering new users

Now we’ll need to implement a service contract that provides methods for crawling and discovering new Twitter profiles to import to Neo4j. To do this we’ll provide a REST API endpoint that takes in a Twitter user’s screen name as input and imports their profile and follower data to Neo4j. We’ll also need to implement a method for discovering a user using the Twitter profileId as input. When we import a user’s followers, we are only importing each follower’s profileId. This is enough for running PageRank on the resulting graph, but to display the user’s information on the web dashboard, we’ll need the rest of the profile information.

The TwitterServiceImpl class is shown below in full, and implements methods for discoverUserByScreenName and discoverUserByProfileId.

...
/**
 * This class implements the service contract for {@link TwitterService} and
 * is responsible for discovering users by screen name or profile ID.
 *
 * @author kbastani
 */
@Service
public class TwitterServiceImpl implements TwitterService {

  private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
  private final Log log = LogFactory.getLog(TwitterService.class);
  private static final String QUEUE_NAME = "twitter.followers";
  private final Twitter twitter;
  private final UserRepository userRepository;
  private final RabbitTemplate rabbitTemplate;
  private final ObjectMapper objectMapper;

  // These two fields are constants that target users below follows/following thresholds
  private static final Integer MAX_FOLLOWS = 50000;
  private static final Integer MAX_FOLLOWERS = 50000;

  @Autowired
  public TwitterServiceImpl(Twitter twitter, UserRepository userRepository,
    RabbitTemplate rabbitTemplate, ObjectMapper objectMapper) {
    this.twitter = twitter;
    this.userRepository = userRepository;
    this.rabbitTemplate = rabbitTemplate;
    this.objectMapper = objectMapper;
  }

  /**
   * Discover a user on Twitter using only their screen name
   *
   * @param screenName is the screen name of the user on Twitter
   * @return a user that has been retrieved from the Twitter API and saved to Neo4j
   */
  public User discoverUserByScreenName(String screenName) {
    User user;

    user = Optional.of(twitter.userOperations().getUserProfile(screenName))
        .map(User::new)
        .get();

    // Set the user's default values
    user.setPagerank(0f);
    user.setImported(true);

    user = getUser(user);

    return user;
  }

  /**
   * Discover a user on Twitter using their profile ID
   *
   * @param profileId is the profile ID of the user on the Twitter API
   * @return a user that has been retrieved from the Twitter API and saved to Neo4j
   */
  public User discoverUserByProfileId(Long profileId) {
    User user;

    user = Optional.of(twitter.userOperations().getUserProfile(profileId))
        .map(User::new)
        .get();

    user = getUser(user);

    log.info(String.format("Discover user: %s", user.getScreenName()));

    return user;
  }

  /**
   * Submit a job to crawl this user only if their follows/follower counts are within limits
   *
   * @param user is the {@link User} that is to potentially be requested for crawling
   * @return the saved {@link User} with full profile information now updated on the Neo4j node
   */
  private User getUser(User user) {
    Long userId = userRepository.getUserIdByProfileId(user.getProfileId());

    if (userId != null) {
      user.setId(userId);
    }

    user = userRepository.save(user, 0);

    try {
      // Only crawl users that have manageable follows/follower counts
      if (user.getFollowerCount() < MAX_FOLLOWERS && user.getFollowsCount() < MAX_FOLLOWS) {
        log.info("Discover user scheduled on follows graph " + dateFormat.format(new Date()));
        user.setDiscoveredTime(new Date().getTime());

        // Update discovery time
        userRepository.save(user, 0);

        // Update the discovery chain
        userRepository.updateDiscoveryChain();

        // Send a new import message to the 'twitter.followers' queue for the user
        rabbitTemplate.convertAndSend(QUEUE_NAME, objectMapper.writeValueAsString(user));
      } else {
        // Retry on a user with a valid number of follows/followers
        User nextUserToCrawl = userRepository.findNextUserToCrawl();

        if (nextUserToCrawl != null) {
          this.discoverUserByProfileId(nextUserToCrawl.getProfileId());
        }
      }
    } catch (JsonProcessingException e) {
      log.error(e);
    }
    return user;
  }
}

The last thing we’ll need to do is to expose the discoverUserByScreenName method from the TwitterService as a REST API method, allowing the web dashboard to add seed profiles. Below, I’ve created a @RestController annotated class named ApiController. This controller will be registered at runtime and allow consumers to issue GET requests to the URL template /v1/user/{screenName}, where screenName is the Twitter profile’s unique handle.

Example 15. ApiController.java
...
/**
 * Provides a single REST endpoint for seeding users to crawl on Twitter. Automated
 * crawling of Twitter users requires three seed users as input.
 *
 * @author kbastani
 */
@RestController
@RequestMapping("v1")
public class ApiController {

  private final TwitterService twitterService;

  @Autowired
  public ApiController(TwitterService twitterService) {
    this.twitterService = twitterService;
  }

  @RequestMapping(path = "user/{screenName}", method = RequestMethod.GET)
  public ResponseEntity<User> discoverProfileByScreenName(@PathVariable("screenName") String screenName) {
    return Optional.ofNullable(twitterService.discoverUserByScreenName(screenName))
        .map(ResponseEntity::ok)
        .orElse(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  }
}

Registering as a discovery client with Eureka

Now that we’re done with the core business logic of the Twitter Crawler service, we need to think about making it operational. I’ve introduced Spring Cloud in previous articles, and we’ll use it here in the form of a Eureka discovery service.

The discovery service is a critical part of a microservice architecture. Eureka acts as a registry of service information that can be downloaded by members in a cluster and used for client-side load balancing, or pushing live configuration changes to all instances of a service that are running in a cluster.
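For the Twitter Crawler service, registering comes down to giving the application a service ID and pointing it at the Eureka server. Below is a minimal sketch of the relevant configuration, using the twitter-rank service ID and the discovery host that appear later in this article; the complete files are in the sample repository:

spring:
  application:
    name: twitter-rank
eureka:
  client:
    serviceUrl:
      defaultZone: http://discovery:8761/eureka/

With that configuration in place, the application class only needs the Spring Cloud annotations shown below.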

@SpringCloudApplication (1)
@EnableZuulProxy (2)
@EnableScheduling (3)
public class TwitterCrawlerApplication extends SpringBootServletInitializer {
    public static void main(String[] args) {
        new SpringApplicationBuilder(TwitterCrawlerApplication.class).web(true).run(args);
    }
}
1 Adds the @SpringBootApplication, @EnableDiscoveryClient, and @EnableCircuitBreaker annotations
2 Enables reverse proxying capabilities to route HTTP requests from other services
3 Enables scheduling, making sure @Scheduled annotations are registered

Ranking Dashboard

We’ve finished creating the backend components of the microservice architecture and can now write a simple client-side web application to interface with the Twitter Crawler service’s REST API. Since we are using Spring Cloud, we are able to take advantage of the Eureka discovery service and the @EnableZuulProxy annotation to automatically inject routes from the Twitter Crawler service into our new Ranking Dashboard service. What this means is that the new Ranking Dashboard service will be able to expose the full REST API of the Twitter Crawler service on its own host, without writing a single line of code.

Configuring the Ranking Dashboard for reverse proxy

The Spring Boot application we’ll create is as simple as it gets. The only application code we’ll create is the Spring Boot application class, shown below.

@SpringCloudApplication
@EnableZuulProxy
public class RankingDashboardApplication {
  public static void main(String[] args) {
    SpringApplication.run(RankingDashboardApplication.class, args);
  }
}

Here we’ve provided the @SpringCloudApplication annotation, which enables the basic Spring Cloud features for connecting to a discovery service. We’ve also added the @EnableZuulProxy annotation, which will enable an embedded Zuul proxy in this service. To get this to work, we do need to set some configuration properties in our application.yml file.

The properties below are configured for the production Spring profile and provide the necessary settings to connect and register with the discovery service. Since the Twitter Crawler service we created earlier uses the same discovery service connection settings, the Ranking Dashboard will automatically create a proxy to the Twitter Crawler’s REST API.

Example 18. application.yml
spring.profiles.active: 'production'
---
server:
  port: 8081
  servletPath: /
spring:
  profiles: 'production'
eureka:
  client:
    serviceUrl:
      defaultZone: http://discovery:8761/eureka/
  instance:
    preferIpAddress: true
    leaseRenewalIntervalInSeconds: 10
    hostname: ${spring.cloud.client.ipAddress:HOSTNAME}
    statusPageUrlPath: /info
    healthCheckUrlPath: /health
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 60000
ribbon:
  ConnectTimeout: 3000
  ReadTimeout: 60000

Now when the Ranking Dashboard service is started, it will contact the Eureka discovery service at http://discovery:8761/eureka/ and embed the request mappings that are exposed by the Twitter Crawler service. The ID that the Twitter Crawler service will use when registering with Eureka will be twitter-rank. This ID will be used as the request path to access the routes of the Twitter Crawler service from the Ranking Dashboard service. All requests to /twitter-rank/** on the Ranking Dashboard service will be forwarded to the Twitter Crawler service.

The next step will be to add static web content to the Ranking Dashboard service that connects to the REST API of the Twitter Crawler service through our newly embedded Zuul proxy.

Adding static web content

I’ve created a simple client-side web application that uses jQuery to make AJAX requests to the Twitter Crawler REST API. Spring Boot makes it easy to map static content by placing it in the resource directory under resources/static. The example below shows a directory tree of the Ranking Dashboard service and the static content I’ve placed in the resource directory.

├── docker
│   └── Dockerfile
├── java
│   └── org
│       └── kbastani
└── resources
    ├── application.yml
    ├── bootstrap.yml
    └── static
        ├── assets
        ├── css
        ├── dist
        ├── fonts
        ├── index.html
        └── js

Now when I run the Spring Boot application, the src/main/resources/static/index.html file will be mapped to the service’s root at /, or accessed directly at /index.html.

Consuming the Twitter Crawler’s REST API

The dashboard is a single page web application which consumes two REST API methods on the Twitter Crawler service. Let’s first review how the dashboard will be used.

The first time the dashboard is loaded, there won’t be any data to display from the Twitter Crawler service.

Twitter discovery dashboard

Before the Twitter Crawler service will begin to automatically discover new profiles, the user must provide a minimum of three screen names of Twitter users as seeds. The goal is to add three seed profiles of users who are members of a community on Twitter. It’s important to make sure that these users follow each other, which will make it likely that there are other mutual profile connections between these users.

Add new Twitter profile to crawl

The seed profiles I’ve chosen for this demonstration are:

When adding new seed profiles through the UI, an AJAX call will be made as an HTTP GET request to the relative path:

/twitter-rank/v1/user/{screenName}

After adding each of these profiles manually through the UI, we’ll end up with the following view.

PageRank on Twitter profile followers

To get the results that are displayed in the ranking table, the UI will make an AJAX call as an HTTP GET request to the relative path:

/twitter-rank/users/search/findRankedUsers?skip=0&limit=100

Now that I’ve added the three seed profiles, each of these users’ connections will be imported into Neo4j by the Twitter Crawler service. After about 5 minutes, the PageRank job will have been scheduled and completed its analysis of the initial users. Once a PageRank value has been assigned to the initial users, you will begin to see other users that the crawling algorithm on the Twitter Crawler service has automatically discovered.

The following screenshot shows users that were discovered automatically by the Twitter Crawler service.

Automated discovery of new Twitter profiles

Docker Demo

The example project uses Docker to build a container image of each of our microservices as a part of the Maven build process.
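The article doesn’t show the build plugin itself, but a minimal sketch of the configuration might look like the following, assuming the Spotify docker-maven-plugin that was a common choice at the time; the version, image name, and paths are illustrative:

<plugin>
    <groupId>com.spotify</groupId>
    <artifactId>docker-maven-plugin</artifactId>
    <version>0.4.13</version>
    <configuration>
        <imageName>kbastani/twitter-rank-crawler</imageName>
        <dockerDirectory>src/main/docker</dockerDirectory>
        <resources>
            <resource>
                <targetPath>/</targetPath>
                <directory>${project.build.directory}</directory>
                <include>${project.build.finalName}.jar</include>
            </resource>
        </resources>
    </configuration>
</plugin>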

Getting Started

To get started, visit the GitHub repository for this example project.

Clone or fork the project and download the repository to your machine. After downloading, you will need to use both Maven and Docker to compile and build the images locally.

Download Docker

First, download Docker if you haven’t already. Follow the instructions found here to get the Docker Toolbox up and running on your development machine.

After you’ve installed the Docker Toolbox, run the following command to initialize a new VirtualBox VM for this sample application.

$ docker-machine create twitter-demo --driver virtualbox --virtualbox-memory "11000" --virtualbox-disk-size "100000"
$ eval "$(docker-machine env twitter-demo)"

Requirements

The requirements for running this demo on your machine are found below.

  • Maven 3

  • Java 8

  • Docker

  • Docker Compose

Building the project

To build the project, from the terminal, run the following command at the root of the project.

$ mvn clean install

The project will then download all of the needed dependencies and compile each of the project artifacts. Each service will be built, and a Maven Docker plugin will automatically build each of the images into your local Docker environment. Docker must be running and available from the command line where you run the mvn clean install command for the build to succeed.

Start the Cluster with Docker Compose

Now that each of the images has been built successfully, we can use Docker Compose to spin up our cluster. I’ve put together a Docker Compose file that will allow you to run the full sample application without needing to run the build. Before being able to run the application, you must provide your Twitter API credentials.

Example 19. docker-compose.yml
---
twitter-rank-crawler:
  image: kbastani/twitter-rank-crawler:latest
  ports:
   - '8080:8080'
  links:
   - config
   - discovery
   - rabbit
   - graphdb
  environment:
    SPRING_SOCIAL_TWITTER_ACCESSTOKENSECRET: 'REPLACE'
    SPRING_SOCIAL_TWITTER_ACCESSTOKEN: 'REPLACE'
    SPRING_SOCIAL_TWITTER_APPSECRET: 'REPLACE'
    SPRING_SOCIAL_TWITTER_APPID: 'REPLACE'
    SPRING_PROFILES_ACTIVE: 'production'
Make sure that you replace the environment values in the Docker Compose file with your own Twitter API keys and access tokens. Also, I highly recommend that you run this sample on a machine with at least 16GB of system memory.

Once you have modified the docker-compose.yml file in the project root, navigate to the spring-boot-graph-processing-example/ directory in your console.

To startup the cluster in detached mode, run the following command:

$ docker-compose up -d

If everything is configured correctly, each of the container images we built earlier will be launched as a container on the Docker machine and networked for automatic service discovery. You will see a flurry of log output from each of the services as they begin their startup sequence. This might take a few minutes to complete, depending on the performance of the machine you’re running the demo on.

To see the log output from the cluster, you can run the following command.

$ docker-compose logs

Once the startup sequence is completed, you can navigate to the Eureka host and see which services have registered with the discovery service.

Copy and paste the following command into the terminal where Docker can be accessed using the $DOCKER_HOST environment variable.

$ open $(echo "$DOCKER_HOST" |
    sed 's/tcp:\/\//http:\/\//g' |
    sed 's/[0-9]\{4,\}/8761/g')

When the user interface successfully loads for Eureka, you’ll see the list of services that have registered as a Eureka client instance.

Eureka discovery service UI

Conclusion

In this article we built a ranking engine and crawler for discovering influential Twitter profiles. As a part of this article we covered the following Spring concepts:

  • Spring Data Neo4j

  • Spring Social Twitter

  • Spring Boot: Schedulers

  • Spring Boot: AMQP

  • Spring Cloud: Eureka client registration

  • Spring Cloud: Zuul Reverse Proxy

We covered a lot of ground in this one! This sample project is near and dear to my heart. I hope this article showed you how incredibly simple Spring Boot makes developing these kinds of architectures. While there are some missing pieces, such as securing REST API access and mapping volumes to the containers, these important topics will be covered in future articles.

Please feel free to leave your questions and comments below.

If you found this article useful, please share it on Twitter — preferably with the influencers that you discover using the sample application!
