• 8 min. reading time

Contract Testing with Pact and Kafka

December 11, 2023

As applications get more complex and distributed, testing communication between services and modules becomes increasingly important. One way to make sure that different parts of a system can talk to each other is to use contract testing. Contract testing is a technique used in software testing to ensure that communication between different parts of a system works correctly and consistently. Instead of testing the entire system at once, contract testing focuses on testing the interactions between specific components, which are defined via contracts that describe what interaction is expected. We had introduced our approach in our previous post “Bridge Testing: A Holistic Approach to Testing a Micro-Service Architecture” In this post, we'll focus on using Pact to test message-based interactions between the modules that make up VoloIQ and integrate via Kafka. But first, we briefly cover the high-level VoloIQ architecture to provide some context and motivation.

VoloIQ Architecture

VoloIQ is structured based on a domain-driven design and therefore split into several sub-domains such as booking flights, aircraft and types, flight operations, planning of routes and a few others. Each domain is implemented by a so-called module, which essentially is a self-contained system that has a micro-frontend, one or more services backends and backing services for persistence, queues etc.

While the parts that make up a module are rather tightly integrated, the individual modules themselves are only loosely connected via an event-based architecture for which we decided to use Apache Kafka.

VoloIQ's architecture is designed to give our development teams maximum autonomy and flexibility in choosing the best technologies and libraries to solve specific problems. By using an event-based architecture with Apache Kafka to loosely connect the modules, we can maintain high availability and resilience even in the face of failures. This approach also allows us to deploy different configurations of VoloIQ with some modules but not others, as needed.

Customers and users, however, rightly think of VoloIQ as one cohesive application and therefore all the modules need to work together seamlessly. However, the sub-domains of the various modules are not entirely disjoint. For instance, information about our aircraft is used by many other modules. Similarly, for booked flights. Changes to those business entities are broadcast by events and stored in the consuming module's database. This, for instance, allows the Flight Operation module to maintain a list of aircraft without having to directly communicate with the aircraft module, which greatly improves the overall availability of the system.

So, how do we guarantee that the different modules can exchange messages or events that are understood by the others?

Contract Testing with Pact

Classical end-to-end integration tests rely on an entirely deployed system with a consistent and well-defined set of data that can be used for testing. This is difficult to set up in an automated reproducible and fast way. Tests running in parallel can also have unpredictable outcomes and need extra work. Hence, these tests are long running and can be flaky and don’t scale very well.

Pact takes a different approach to integration testing by establishing a so-called contract between both communication parties. A contract is made up of messages that are expected to be sent and received, and the data that they contain. You can also define the context in which the interaction takes place, such as the state of the system before the interaction occurs. Once the contract is defined, Pact can automatically generate tests that verify whether the interactions between services match the contract. It can be used with many languages and frameworks and can be applied to both HTTP and message-based interactions.

Moreover, Pact contract tests are more like unit-tests in the sense that they do not need a database or a deployed version of the software under tests. It is easy to integrate those tests in our pull-request pipelines and the provider and consumer teams can work independently against the contract and hence are less dependent on each other.

Design Goals

To be confident that our modules work correctly together, we want to test as much of our stack as possible in particular:

For producers we wanted to ensure that …

  • the message is published on the correct topic,
  • the correct headers are set,
  • the message is serialized correctly, and
  • the message is sent under the right circumstances.

For consumers we wanted to ensure that …

  • the consumer is subscribing to the right topic,
  • the messages are deserialized correctly, and
  • the message is handed over to the right piece of business logic.

Moreover, as VoloIQ is a polyglot system consisting of modules written in Kotlin and Python we need to support at least those two languages and technology stacks. Therefore, existing client libraries had to be carefully checked as not all of them offer the same features. For instance, the JVM libraries expose the metadata part, whereas the Python library does not. Therefore, we settled for including Kafka message headers, message key and the value, i.e., the actual payload in the content section of the message pacts.

Message-based Pacts with Kafka

Based on the above design goals, we decided to implement the following setup. Note, that the below code examples are simplified and shortened for brevity and readability.

For Kotlin, or Java for that matter, we decided to go with a setup based on Spring's embedded Kafka, so we can exercise the whole stack: sending/receiving messages, serialization/deserialization, routing to the right method and our whole Spring-Kafka configuration.
As there is no built-in support for Kafka, we must translate from Pact's V4Interaction.AsynchronousMessage to Spring Kafka's ProducerRecord and vice versa from Spring Kafka's ConsumerRecord to Pact's MessageAndMetadata. For this purpose, we have the following Converter class:

object Converter {
  private data class Envelope(
    val topic: String,
    val key: String,
    val headers: HashMap<String, String>,
    val value: JsonNode
  ) {}

  /**
   * Extract topic, key, headers, and value from 
   * AsynchronousMessage's content.
   */
  fun convert(message: V4Interaction.AsynchronousMessage): 
      ProducerRecord<String, String?> 
  {
    val contents = message.contents.contents
    val envelope = jacksonObjectMapper()
        .readValue(contents.valueAsString(), Envelope::class.java)
    val recordHeaders = envelope.headers
        .map { (k, v) ->
            RecordHeader(k, v.toByteArray("UTF-8"))
        }.toList()
    val partition = 0
    val bytes = jacksonObjectMapper()
        .writeValueAsString(envelope.value)
    ProducerRecord<String, String?>(envelope.topic, 
         partition, envelope.key, bytes, recordHeaders)
  }

  /**
   * Takes topic, key, headers, and value and writes it into 
   * AsynchronousMessage's content.
   */
  fun convert(rec: ConsumerRecord<String, String>): 
      MessageAndMetadata 
  {
    val headers = HashMap<String, String>()
    rec.headers().forEach { h -> 
        headers[h.key()] = h.value().decodeToString() 
    }
    val value = jacksonObjectMapper().readTree(rec.value())
    val envelope = Envelope(rec.topic(), rec.key(), headers, value)
    val payload = jacksonObjectMapper()
        .writeValueAsBytes(envelope)
    return MessageAndMetadata(payload, emptyMap())
  }
}

As mentioned before, we are adding all message details, i.e., headers, key, payload to the content section of the interaction this is done by (de)-serializing Envelope.

Consumer Tests

Pact is consumer-driven and, therefore, we start with the tests that establish the contract and at the same time check whether the consumer lives up to it. To this end, we need to be able to send an AsynchronousMessage by converting it to a ProducerRecord and send it to Kafka, so the consumer can consume it. This is facilitated by the following class.

class KafkaAsyncMessageProvider(private val template: 
    KafkaTemplate<String, String?>) 
{
  fun sendMessage(message: V4Interaction.AsynchronousMessage) {
    val record = Converter.convert(message)
    template.send(record).get()
  }
}

With that, we have everything in place to write our actual Consumer Tests:

@PactDirectory("build/pacts")
@ExtendWith(PactConsumerTestExt::class, SpringExtension::class)
@PactTestFor(providerName = "aircraft-management", 
    providerType = ProviderType.ASYNCH,
    pactVersion = PactSpecVersion.V3
)
@EmbeddedKafka(ports = [9092], topics = ["Aircraft"])
@SpringBootTest(classes = [ConsumerApp::class])
class AircraftConsumerTest {

  @Autowired
  lateinit var aircraftManagement: KafkaAsyncMessageProvider

  @MockBean
  lateinit var repository: AircraftRepository

  @Pact(provider="aircraft-management",consumer="aircraft-consumer")
  fun aircraftCreatedPact(builder: MessagePactBuilder): MessagePact
  {
    return builder
      .given("Aircraft A1 was created")
      .expectsToReceive("An Aircraft created message")
      .withContent(
         EnvelopeBuilder.message(
           topic = "Aircraft",
           key = "A1",
           type = "AircraftV1",
           value = PactDslJsonBody()
             .stringValue("id", "A1")
             // more properties here
       )
    ).toPact()
  }

  @Test
  @PactTestFor(pactMethod = "aircraftCreatedPact")
  fun `should update aircraft in repository`
      (aircraftCreatedMessage: V4Interaction.AsynchronousMessage) 
  { 
    // make it look like the message was sent
    // from aircraft management 
    aircraftManagement.sendMessage(aircraftCreatedMessage)
    Mockito.verify(repository, Mockito.timeout(1000))
      .upsertAircraft(Aircraft("A1", … )))
  }
}

There is quite a bit to unpack here. Let's start at the top and work our way to the bottom. @EmbeddedKafka(ports = [9092], topics = ["Aircraft"]) starts the embedded Kafka. It is important to create the topic "Aircraft" when it starts, otherwise the subscription performed by the consumer will not work and it will not consume any messages.

@MockBean
lateinit var repository: AircraftRepository

Here, we mock the AircraftRepository which is responsible for storing our aircraft in the database. Later we will verify that the correct methods with the right parameters were called. This will tell us whether the Kafka messages were consumed properly.

Next, we define the actual contract which uses our EnvelopeBuilder that helps with creating contracts that match our standard headers and message format. Value represents the expected payload, i.e., the aircraft in our case.

@Pact(provider="aircraft-management", consumer="aircraft-consumer")
fun aircraftCreatedPact(builder: MessagePactBuilder): MessagePact {
    return builder
        .given("Aircraft A1 was created")
        .expectsToReceive("An Aircraft created message")
        .withContent(
            EnvelopeBuilder.message(
                topic = "Aircraft",
                key = "A1",
                type = "AircraftV1",
                value = PactDslJsonBody()
                    .stringValue("id", "A1")
                    // more properties here
            )
        ).toPact()
    }

The EnvelopBuilder from above produces output that is equivalent to this:

PactDslJsonBody()
   .stringValue("topic", "Aircraft")
   .stringValue("key", "A1")
   .`object`("headers", PactDslJsonBody()
      .stringValue("type", "AircraftV1")
   )
   .`object`("value", value)

With this structure, topic, key, headers, and value are part of the contract and we can ensure that consumers and produces are using them correctly by publishing to or consuming from the right topic. The above type header tells the consumer what payload to expect. We also use it for versioning message payloads.

After that, we can move on to the actual test:

@Test
@PactTestFor(pactMethod = "aircraftCreatedPact")
fun `should update aircraft in repository`
    (aircraftCreatedMessage: V4Interaction.AsynchronousMessage)
{
  // make it look like the message was sent from aircraft management
  aircraftManagement.sendMessage(aircraftCreatedMessage)
  Mockito.verify(repository, Mockito.timeout(1000))
    .upsertAircraft(Aircraft("A1", … )))
}

Here, an aircraftCreatedMessage matching the above pact is passed and published by our aircraftManagement stub. The verification part of the test is a Mockito.verify() that checks whether upsertAircraft() has been called with Aircraft("A1", …). As this will happen asynchronously, we must use it with a timeout.

When the above test is executed successfully, we know that events signifying updates to aircraft, are consumed from the right topic, i.e. Aircraft, deserialised properly and then handed over correctly to the AircraftRepository. This shows that the consumer correctly implemented the contract.

Provider Test

With the above consumer tests in place, we now have a contract against which we can test the producer, i.e., AircraftManagement in our case, where we will create an aircraft and then verify that the correct message was published on the correct topic. To this end we have the following KafkaAsyncMessageConsumer, which subscribes to all topics via @KafkaListener(topicPattern = ".*") and converts the message using the above Converter. Via its waitForMessage() method we can wait until a message is received and retrieve it.

@Component
class KafkaAsyncMessageConsumer() {
  private var message: MessageAndMetadata? = null
  private var expectedMessage = CountDownLatch(1)

  @KafkaListener(topicPattern = ".*", groupId = "PACT-test")
  fun listener(@Payload rec: ConsumerRecord<String, String>) {
    this.message = Converter.convert(rec)
    logger.info("Message received headers={} payload={}", 
      this.message!!.metadata, rec.value())
    this.expectedMessage.countDown()
  }

  fun waitForMessage(): MessageAndMetadata {
    if (!expectedMessage.await(1, TimeUnit.SECONDS)) {
      throw RuntimeException("No Message received before timeout")
    }
    return message!!
  }

  fun reset() {
    message = null
    expectedMessage = CountDownLatch(1)
  }
}

With this generic consumer in place, we can now write our provider tests:

@Provider("aircraft-management")
@PactFolder("build/pacts")
@ExtendWith(SpringExtension::class)
@EmbeddedKafka(ports = [9092], topics = ["Aircraft"])
@SpringBootTest(classes = [Configuration::class])
class AircraftProviderTest {

  @Autowired
  private lateinit var aircraftService: AircraftService

  @Autowired
  private lateinit var consumer: KafkaAsyncMessageConsumer

  @TestTemplate
  @ExtendWith(PactVerificationInvocationContextProvider::class)
  fun pactVerificationTestTemplate(context: PactVerificationContext)
  {
    context.verifyInteraction()
  }

  @PactVerifyProvider("An Aircraft created message")
  fun capturedCreateAircraftMessage(): MessageAndMetadata {
    return consumer.waitForMessage()
  }

  @State("Aircraft A1 was created")
  fun createAircraft() {
    aircraftService.createAircraft("A1", /* more properties here*/)
  }
}

There are a few things to unpack here. Again, we start the embedded Kafka and create the required topic with @EmbeddedKafka(ports = [9092], topics = ["Aircraft"])
It is important to create the topics upfront, otherwise the KafkaAsyncMessageConsumer subscription will not work and no messages will be received.

@Autowired
private lateinit var aircraftService: AircraftService

This is our production code where aircraft are created, updated, and deleted. When asked to create a provider state "Aircraft A1 was created", we exercise our production code and actually create a matching Aircraft.

@State("Aircraft A1 was created")
fun createAircraft() {
    aircraftService.createAircraft("A1", /* more properties here*/)
}

This will then trigger the sending of the Kafka message, which will be captured by our KafkaAsyncMessageConsumer and returned to Pact for validation.

@PactVerifyProvider("An Aircraft created message")
fun capturedCreateAircraftMessage(): MessageAndMetadata {
    return consumer.waitForMessage()
}

With the above, we have successfully validated that the provider sends the right message on the right topic via Kafka and that it can be deserialized by the consumer listening to the right topic and executing the right business logic. We are now very confident that if we deployed both producers and consumers into the same environment that they actually work together and provide reliable business functionality to our users.

Summary

With Pact's generic support for message-based contracts and a few Kafka specific classes that help us convert Pact's abstract Messages to Spring Kafka's Messages and vice versa we can validate that messages are sent correctly by Providers and can be received and correctly processed by Consumers.

These tests are a cornerstone of our architecture that allows our development teams to work independently at great speed. And with the above tests passing, we are confident that our Modules can actually work together and form the cohesive whole that is VoloIQ and that our users expect.

Featured image by Cytonn Photography on Unsplash.

Authors

Thomas Hettel

iconbutton-90px iconbutton-90px iconbutton-90px copy iconbutton-90px Pagination/mouse-follow/left Pagination/mouse-follow/right Pagination/mouse-follow/right-up