Microscopic Look at the States Inside Apache Beam Stateful Pipeline (Part two of three)

Antonio Si
Apache Beam State Processing
Jan 27, 2023


— coauthored with Prema Kuppuswamy and harish nagu sana

Photo by Mark Hang Fung So on Unsplash

In our previous article, we described several use cases in which the ability to read the states of a Beam/Flink pipeline would be beneficial. In this second part of our three-part series, we will go through the StateReader of our StateProcessor tool. To recap, at Intuit, we use Apache Beam with FlinkRunner (or Beam/Flink for short) as a real-time data processing platform for our data pipelines. A typical pipeline consumes data from one or more Kafka topics, runs several transformations, and outputs to another Kafka topic.

The StateReader uses Flink’s State Processing API to extract states from a Flink savepoint file. The same mechanism is applicable for extracting states from checkpoint files as well.

Context

We begin by revisiting the sample pipeline that we use throughout this article series. The pipeline definition contains multiple stages (ParDos) as follows:

The pipeline counts the number of sales for each specific model of Apple products. It begins by consuming data from a Kafka topic and converting it to key-value pairs. Sample input messages for the sale event are shown below:

{"item":"Watch", "quantity":2, "model":["Apple Watch series 7", "Apple Watch series 8"]}
{"item":"Watch", "quantity":2, "model":["Apple Watch series 8"]}
{"item":"iPhone", "quantity":10, "model":["iPhone 14"]}
{"item":"iPhone", "quantity":20, "model":["iPhone 14 Pro Max"]}
{"item":"Mac", "quantity":5, "model":["Macbook air"]}

One of the pipeline’s transformations manages the cumulative quantities and the models of each item using Beam state variables.

import com.intuit.spp.example.model.Purchase;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class StatefulTransformation extends DoFn<KV<String, Purchase>, KV<String, ResultContainerCumQuantity>> {
    private static final long serialVersionUID = 1234567L;

    @StateId("quantities")
    private final StateSpec<ValueState<Long>> quantitiesSpec = StateSpecs.value();
    @StateId("models")
    private final StateSpec<BagState<String>> modelsSpec = StateSpecs.bag(StringUtf8Coder.of());

    private static final Logger LOG = LoggerFactory.getLogger(StatefulTransformation.class);

    @ProcessElement
    public void format(ProcessContext context, @Element KV<String, Purchase> input, @StateId("quantities") ValueState<Long> quantities, @StateId("models") BagState<String> models) {
        Long currentQuantity = quantities.read() != null ? quantities.read() : 0L;
        List<String> currentModels = new ArrayList<>();

        if (models.read() != null) {
            Iterator<String> i = models.read().iterator();
            while (i.hasNext()) {
                currentModels.add(i.next());
            }
        }

        List<String> newModels = input.getValue().getModels();

        newModels.forEach(model -> {
            if (!(currentModels.contains(model))) {
                models.add(model);
                currentModels.add(model);
            }
        });

        Long newQuantity = currentQuantity + input.getValue().getQuantity();
        quantities.write(newQuantity);
        context.output(KV.of(input.getValue().getItem(), new ResultContainerCumQuantity(input.getValue().getItem(), input.getValue().getQuantity(), newQuantity, input.getValue().getTime(), currentModels)));
    }
}

In our example, we define a transformation that accepts a key/value pair of <String, Purchase> and returns a key/value pair of <String, ResultContainerCumQuantity>. ResultContainerCumQuantity is a simple container object with several fields:

public ResultContainerCumQuantity(String item, long deltaQuantity, long cumulativeQuantity, String eventTime, List<String> models) {
    this.item = item;
    this.deltaQuantity = deltaQuantity;
    this.cumulativeQuantity = cumulativeQuantity;
    this.eventTime = eventTime;
    this.models = models;
}

In this transformation, we also define two state variables — ‘quantities’ as a Long object storing cumulative sales information for each product, and ‘models’ as a bag of String objects. Our pipeline processes the input events by grouping them by key (i.e., item) using a ParDo before passing them to another ParDo that utilizes these state variables. This means that each state defined by these state variables is associated with the key item.
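To make the per-key behavior concrete, here is a minimal plain-Java sketch of the same bookkeeping. It does not use the Beam API: the class name KeyedStateSketch and its methods are hypothetical, and two in-memory maps stand in for the 'quantities' and 'models' state variables, each scoped by item.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key state (hypothetical, not the Beam API).
class KeyedStateSketch {
    // One "quantities" value and one "models" bag per key (item).
    private final Map<String, Long> quantities = new HashMap<>();
    private final Map<String, List<String>> models = new HashMap<>();

    // Mirrors the @ProcessElement logic: append unseen models, add the quantity,
    // and return the new cumulative quantity for this key.
    long process(String item, long quantity, List<String> newModels) {
        List<String> bag = models.computeIfAbsent(item, k -> new ArrayList<>());
        for (String model : newModels) {
            if (!bag.contains(model)) {
                bag.add(model);
            }
        }
        return quantities.merge(item, quantity, Long::sum);
    }

    long quantityOf(String item) {
        return quantities.getOrDefault(item, 0L);
    }

    List<String> modelsOf(String item) {
        return models.getOrDefault(item, new ArrayList<>());
    }

    public static void main(String[] args) {
        KeyedStateSketch state = new KeyedStateSketch();
        state.process("Watch", 2, List.of("Apple Watch series 7", "Apple Watch series 8"));
        state.process("Watch", 2, List.of("Apple Watch series 8"));
        state.process("iPhone", 10, List.of("iPhone 14"));
        System.out.println(state.quantityOf("Watch") + " " + state.modelsOf("Watch"));
    }
}
```

Feeding it the first two sample Watch events leaves the Watch key with a cumulative quantity of 4 and two distinct models, while the iPhone key keeps its own independent state.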

StateReader Design

The design of the StateReader is simple. It models a typical Flink pipeline definition, consuming data from a data source, and outputting data via a data sink:

As shown in the above diagram, a DataSource object models a data source, which in our context is simply a savepoint file. With the help of an InputFormat object, it reads the content of the binary savepoint file and emits an intermediate data structure. A DataSink object models a data sink. With the help of an OutputFormat object, it consumes the intermediate data structure and writes the state data to a data store such as S3 or DynamoDB.

InputFormat

An InputFormat object is used to provide logic about how to read a savepoint file and what intermediate data structures are generated.

An InputFormat object is defined by three pieces of information:

  1. Application State Definitions and Coders: Since state variables are defined in an application, we need the state definitions in order to know what to extract from the savepoint file. An application may also provide custom coders for encoding and decoding state data, and the InputFormat needs those custom coders as well.
  2. Beam Coder: Some state variables simply use built-in data types. Since our pipeline is defined using Apache Beam, such state data is encoded and decoded using the built-in Beam coders.
  3. State Processing API: Flink's State Processing API is used to read the savepoint file and obtain the monitored states.

Application State Definitions

One of the inputs to the InputFormat is the definition of the state data that needs to be looked up. This information is supplied as a configuration to our state reader as follows:

"monitorStates" : {
    "quantities": {                ← StateId
        "type": "LongType"         ← Datatype
    },
    "models": {
        "type": "BagCustomType",
        "coder": "org.apache.beam.sdk.coders.StringUtf8Coder"   ← Coder
    }
},

The configuration includes the state id, data type, and the coder that the pipeline uses in its state variable definition. The coder is optional and is only required if a custom coder needs to be specified explicitly.

As another example, the following illustrates the configuration for the state definition with a custom coder:

"monitorStates" : {
    "event_buffer" : {
        "type" : "BagCustomType",
        "coder" : "com.intuit.data.platform.process.sessionization.model.ClickstreamEvent$ClickstreamEventCoder"
    },
    "current_session" : {
        "type" : "CustomType",
        "coder" : "com.intuit.data.platform.process.sessionization.model.Session$SessionCoder"
    }
},

In this configuration, each state variable has a custom data type, so the configured coder must be the same coder that the Beam pipeline uses for that state variable.

Beam Coder

Based on the data type specified in the configuration, StateReader determines the proper coder to use. Apache Beam built-in coders are used by default, depending on the data type in the state definition. Here is a list of built-in coders:

In our sample pipeline, since the quantities state variable is of type Long, it will use the VarLongCoder. Apache Beam allows a user to define their own coder. For example, in our sample pipeline, the state variable models is a Bag. The configuration will need to specify the coder of each item in the bag, which is simply a StringUtf8Coder.
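The coder selection described above can be sketched roughly as follows. This is only an illustration under stated assumptions: MonitoredStateSketch and resolveCoder() are hypothetical names rather than the StateReader's actual classes, coders are represented by their class names so the snippet runs without Beam on the classpath, and the only default mapping included is the one mentioned above (LongType to VarLongCoder).

```java
import java.util.Optional;

// Hypothetical model of one "monitorStates" entry; not the StateReader's real class.
class MonitoredStateSketch {
    final String stateId;
    final String type;
    final Optional<String> coderClass;

    // Pass null for coderClass when the configuration omits the "coder" field.
    MonitoredStateSketch(String stateId, String type, String coderClass) {
        this.stateId = stateId;
        this.type = type;
        this.coderClass = Optional.ofNullable(coderClass);
    }

    // Returns the class name of the coder to use for this state.
    String resolveCoder() {
        // An explicitly configured coder always wins.
        if (coderClass.isPresent()) {
            return coderClass.get();
        }
        // Otherwise fall back to a built-in Beam coder chosen by data type.
        if ("LongType".equals(type)) {
            return "org.apache.beam.sdk.coders.VarLongCoder";
        }
        throw new IllegalArgumentException(
                "No default coder known for type " + type + " (state " + stateId + ")");
    }
}
```

With this sketch, the quantities entry resolves to VarLongCoder without any coder in the configuration, while the models entry resolves to the StringUtf8Coder that the configuration names explicitly.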

State Processing API

The Flink State Processing API supplies several pieces of information to the InputFormat. To illustrate its role, the following code snippet instantiates an InputFormat object:

public <WindowT extends BoundedWindow, OutputT> DataSource<OutputT> readKeyedState(
        OperatorState operatorState,
        BeamRocksDBKeyedStateReaderFunction<String, Object, OutputT> readerFunction,
        Coder<String> keyCoder,
        Coder<WindowT> windowCoder,
        TypeInformation<OutputT> outputType)
        throws IOException {
    final KeyedStateInputFormat<ByteBuffer, String, OutputT> inputFormat =
            new KeyedStateInputFormat<>(
                    operatorState,
                    stateBackend,
                    environment.getConfiguration(),
                    new BeamSavepointForByteKey.BeamStateReaderOperator<>(
                            readerFunction,
                            keyCoder,
                            windowCoder,
                            Types.STRING.createSerializer(environment.getConfig())));

    return environment.createInput(inputFormat, outputType);
}

Since our states are associated with a key, we instantiate a KeyedStateInputFormat object. The constructor of this class takes in several parameters:

  1. OperatorState
  2. StateBackend
  3. Configuration
  4. StateReaderOperator

We will go through each of these parameters below.

OperatorState

Every savepoint file contains metadata that includes a list of OperatorStates. Each OperatorState is identified by an operator id, a uuid that corresponds to a pipeline operator name, and the states of the pipeline are mapped by that operator id. Therefore, to extract the states from the savepoint file, we either need to know the name of the operator that manages the states in the pipeline, or we can loop through all OperatorStates in the metadata and attempt the lookup against each one. If an OperatorState does not manage the states we are looking for, the lookup returns an empty object.

The following provides a code snippet demonstrating how to obtain the metadata from a savepoint file.

CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(path);
int maxParallelism =
        metadata.getOperatorStates().stream()
                .map(OperatorState::getMaxParallelism)
                .max(Comparator.naturalOrder())
                .orElseThrow(
                        () -> new RuntimeException("Savepoint must contain at least one operator state."));
SavepointMetadata savepointMetadata =
        new SavepointMetadata(
                maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());

The main reason the code instantiates a SavepointMetadata rather than using the CheckpointMetadata directly is that SavepointMetadata allows one to look up an OperatorState by operatorId.

StateBackend

Apache Flink supports two types of internal stores for holding the state data: Memory and RocksDB. We will need to provide the proper state backend to the InputFormat object, which needs to match the state backend used to create the savepoint of the data pipeline. In our context, since we use RocksDB in our data pipeline, we will be using an instance of org.apache.flink.contrib.streaming.state.RocksDBStateBackend.

Configuration

The Configuration parameter is simply an instance of org.apache.flink.configuration.Configuration. It holds a set of key-value pairs, through which one can supply any Flink-specific configuration parameters.

StateReaderOperator

The StateReaderOperator class provides the logic for extracting the state. It acts as a callback object for the Flink engine. This abstract class declares two functions:

public abstract void processElement(KEY var1, N var2, Collector<OUT> var3) throws Exception;
public abstract CloseableIterator<Tuple2<KEY, N>> getKeysAndNamespaces(SavepointRuntimeContext var1) throws Exception;

The function getKeysAndNamespaces is called once when the InputFormat is initialized. It should return an iterator over the keys and namespaces for the given state id.

public CloseableIterator<Tuple2<ByteBuffer, String>> getKeysAndNamespaces(
        SavepointRuntimeContext ctx) throws Exception {
    final Collection<? extends StateTag<?>> tags = readFunction.getTags();
    IteratorWithRemove<Tuple2<ByteBuffer, String>>[] iterators = new IteratorWithRemove[tags.size()];
    int i = 0;
    // One <key, namespace> iterator per monitored state id.
    for (StateTag<?> stateTag : tags) {
        iterators[i++] = new IteratorWithRemove<>(getKeyedStateBackend().getKeysAndNamespaces(stateTag.getId()));
    }
    return new ChainedIteratorWithRemove<>(iterators);
}

In the above code snippet, tags is a collection of StateTag objects. One can think of a StateTag as a wrapper around the id, data type, and coder of a state variable. The function getKeyedStateBackend() returns a RocksDBKeyedStateBackend object, whose getKeysAndNamespaces() function returns an iterator of <key, namespace> tuples for a given stateId. Flink internally organizes state into namespaces and uses a composite of <key, namespace> as the database key when writing to RocksDB.
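The idea of chaining one iterator per state id into a single iterator can be illustrated with a small plain-Java sketch. ChainedIteratorSketch is a hypothetical, simplified stand-in: it is not the ChainedIteratorWithRemove class used above, it omits remove() and close(), and it iterates over plain strings instead of <key, namespace> tuples.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified, hypothetical stand-in for the idea behind ChainedIteratorWithRemove.
class ChainedIteratorSketch<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> sources;
    private Iterator<T> current;

    ChainedIteratorSketch(List<Iterator<T>> iterators) {
        this.sources = iterators.iterator();
    }

    @Override
    public boolean hasNext() {
        // Skip past exhausted (or empty) iterators until one has an element.
        while ((current == null || !current.hasNext()) && sources.hasNext()) {
            current = sources.next();
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    public static void main(String[] args) {
        // One key iterator per state id, as in getKeysAndNamespaces().
        Iterator<String> quantitiesKeys = Arrays.asList("Watch", "iPhone").iterator();
        Iterator<String> modelsKeys = Arrays.asList("Watch", "iPhone", "Mac").iterator();
        Iterator<String> all =
                new ChainedIteratorSketch<>(Arrays.asList(quantitiesKeys, modelsKeys));
        while (all.hasNext()) {
            System.out.println(all.next());
        }
    }
}
```

Note that this sketch does not deduplicate: a key that has entries under several state ids is emitted once per state id.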

For every <key, namespace> returned from getKeysAndNamespaces(), the Flink engine will call the processElement() function of StateReaderOperator. This function implements the logic to lookup the state that is associated with the key:

public void processElement(
        ByteBuffer encodedKey, String namespace, Collector<OutputT> collector) throws Exception {
    LOG.debug("encodeKey={}, namespace={}", encodedKey, namespace);
    final StateNamespace stateNamespace = StateNamespaces.fromString(namespace, windowCoder);
    final String key = FlinkKeyUtils.decodeKey(encodedKey, keyCoder);
    final Collection<? extends StateTag<?>> tags = readFunction.getTags();
    tags.forEach(tag -> {
        // Look up the state for the current key under this namespace and state tag.
        State vs = (State) stateInternals.state(stateNamespace, tag);
        Object value = getState(vs);
        LOG.debug("key={}, namespace={}, value={}", key, namespace, value);

        try {
            this.readFunction.readKey(tag.getId(), key, value, context, collector);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}

Beam provides a helper class, FlinkKeyUtils, which contains a helper function to decode a key using a keyCoder. In our pipeline, our key is a simple string; therefore, we use the Beam StringUtf8Coder.of() for the keyCoder. In the above code snippet, stateInternals is a FlinkStateInternals object. It is another Beam helper class. It provides a state() function which will return the states that are associated with the key, namespace, and stateId.

One may wonder how the state() function can look up the states associated with a key without taking the key as input. The FlinkStateInternals object holds a reference to the RocksDBKeyedStateBackend object, which is also accessible by the Flink engine. The RocksDBKeyedStateBackend object has a setCurrentKey() function, which the Flink engine invokes before calling the processElement() function. The state() function then uses the same RocksDBKeyedStateBackend object to look up the states associated with that current key.
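The "current key" mechanism can be modeled with a toy backend. The sketch below is a deliberately simplified assumption of ours: CurrentKeyBackendSketch is a hypothetical class, it ignores namespaces, and it keeps everything in memory, whereas the real RocksDBKeyedStateBackend is far more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a keyed backend with an implicit "current key" (not Flink's class).
class CurrentKeyBackendSketch {
    // stateId -> (key -> value)
    private final Map<String, Map<String, Object>> store = new HashMap<>();
    private String currentKey;

    // The engine sets the key before handing control to the callback.
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // Writes a value for the given state id under the current key.
    void put(String stateId, Object value) {
        store.computeIfAbsent(stateId, id -> new HashMap<>()).put(currentKey, value);
    }

    // No key parameter: the lookup is scoped to whichever key was set last.
    Object state(String stateId) {
        Map<String, Object> byKey = store.get(stateId);
        return byKey == null ? null : byKey.get(currentKey);
    }
}
```

In this model, calling setCurrentKey("Watch") followed by state("quantities") returns the Watch value only, which is why the callback never has to pass the key explicitly.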

Intermediate Data Structure

We use a simple data structure to hold the key as well as the state associated with the key:

public class RestoredState<T> implements Serializable {
    private static final long serialVersionUID = -1888667633L;

    public String tag;
    public String key;
    public T value;

    public RestoredState() {
    }

    public RestoredState(String tag, String key, T value) {
        this.tag = tag;
        this.key = key;
        this.value = value;
    }

    public boolean isStringValue() {
        return value instanceof String;
    }

    public boolean isMapValue() {
        return value instanceof Map;
    }

    public boolean isBoolValue() {
        return value instanceof Boolean;
    }
}

OutputFormat

OutputFormat provides the logic to process the intermediate data structure returned by the DataSource. It defines four functions that need to be implemented. Our implementation mainly focuses on the writeRecord() and close() functions:

@Override
public void configure(Configuration parameters) {
    LOG.info("Configuring {}", RestoredStateOutputFormat.class.getName());
}

@Override
public void open(int taskNumber, int numTasks) {
    this.restoredStates = new ListAccumulator<>();
    this.stateSerializer = StateSerializer.getSerializer(serializerConfig);
}

@Override
public void writeRecord(RestoredState record) {
    restoredStates.add(record);
}

public List<RestoredState> getStates() {
    return Collections.unmodifiableList(restoredStates.getLocalValue());
}

@Override
public void close() {
    if (restoredStates != null) {
        List<RestoredState> restoredStates = getStates();
        if (restoredStates.isEmpty()) {
            return;
        }
        String taskName = getRuntimeContext().getTaskName();
        LOG.info("taskName={}, outputFormat={}", taskName, System.identityHashCode(this));
        serializerConfig.setContextName(String.valueOf(System.identityHashCode(this)));
        try (StateSerializer stateSerializer = StateSerializer.getSerializer(serializerConfig)) {
            stateSerializer.output(serializerConfig.getPath(), restoredStates);
        } catch (Exception e) {
            throw new StateRestoreException("[Internal Error]: Unable to serialize restored states.", e);
        }
    }
}

In the above code snippet, the writeRecord() implementation simply adds each record to an accumulator. In the close() function, we output all accumulated records via a StateSerializer, which is our proxy to the external storage that holds the states. It could refer to a local file system, S3, DynamoDB, or any other storage medium.
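Stripped of the Flink specifics, the buffer-then-flush pattern looks roughly like the sketch below. All names here are hypothetical: StateSinkSketch merely stands in for the role that StateSerializer plays, records are plain strings, and the path is an arbitrary label.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical storage proxy, standing in for the role StateSerializer plays.
interface StateSinkSketch {
    void output(String path, List<String> records);
}

// Buffers records on write and flushes them to the sink once on close.
class BufferingOutputSketch {
    private final List<String> buffer = new ArrayList<>();
    private final StateSinkSketch sink;
    private final String path;

    BufferingOutputSketch(StateSinkSketch sink, String path) {
        this.sink = sink;
        this.path = path;
    }

    void writeRecord(String record) {
        buffer.add(record);
    }

    void close() {
        if (buffer.isEmpty()) {
            return; // nothing buffered, so the sink is never touched
        }
        // Hand the sink a copy so clearing the buffer does not affect it.
        sink.output(path, new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Because the sink is an interface, the same buffering logic can target a local file, an object store, or an in-memory list in tests without changing the write path.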

Output States

The output of the StateReader looks like the following, as demonstrated in the previous blog. This format makes it easy for customers to debug the states or port them to an external store.

{"iPhone":{"models":["iPhone Plus 14"],"quantities":6430}}
{"Watch":{"models":["Apple Watch series 7","Apple Watch series 8"],"quantities":1929}}
{"Mac":{"models":["Macbook air 1","Macbook air 2"],"quantities":1929}}
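One way to arrive at this per-key layout from a stream of (tag, key, value) records is to group by key and then by state id. The sketch below is an assumption about how such grouping could be done, not a description of our serializer: RestoredStateSketch is a minimal stand-in for RestoredState, and StateGroupingSketch is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal stand-in for RestoredState with only the fields used here.
class RestoredStateSketch {
    final String tag;
    final String key;
    final Object value;

    RestoredStateSketch(String tag, String key, Object value) {
        this.tag = tag;
        this.key = key;
        this.value = value;
    }
}

class StateGroupingSketch {
    // Groups (tag, key, value) records into key -> (tag -> value),
    // preserving the order in which keys and tags are first seen.
    static Map<String, Map<String, Object>> groupByKey(Iterable<RestoredStateSketch> records) {
        Map<String, Map<String, Object>> grouped = new LinkedHashMap<>();
        for (RestoredStateSketch r : records) {
            grouped.computeIfAbsent(r.key, k -> new LinkedHashMap<>()).put(r.tag, r.value);
        }
        return grouped;
    }
}
```

Each entry of the resulting map corresponds to one output line above, with the item as the outer key and the state ids as the inner keys.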

To Be Continued …

In this blog, we went over the design and implementation of the StateReader, which extracts the state data of a Beam pipeline from a Flink savepoint file. The same mechanism is applicable to a Flink checkpoint file as well. We ran this reader on a 24 GB savepoint, and it took about 32 minutes to extract the states and save them in our S3 bucket.

In the last article of this three-part series, we will go over our StateBootstrapper which will generate a Flink savepoint for Beam/Flink from an external data source. We can then use the savepoint to bootstrap the states of the Beam/Flink pipeline.

Special thanks to my colleague David Farr at Intuit for reviewing and editing an early draft of this article.
