Solution Source Code¶
A solution is a domain-specific configuration of Drive that is customized for a particular customer in that domain. Different solutions can target different domains, e.g., telco, banking, retail, or healthcare. They can also be tailored to different customers within a domain, e.g., with different data sources, communication services, and fulfillment services.
The source code for custom solutions built to a customer's requirements resides here. All development of feed applications, their parsers, and event consumers happens here. Enrichment helper logic is also written here, after the helpers are declared in the enrichment_functions.json file. Let's dive into each topic.
Feed Applications¶
A feed application consists of a flow with a defined set of operators in a certain structure. Operators are individual components designed to perform a certain set of tasks.
A typical feed application has the following components, chained together into a single flow (sketched after this list):
A Source: receives the input data from an external data source
A Parser: parses the input data into tuple format
An Enricher: enriches the tuple data with profile information stored in PinPoint, and the aggregate data stored in FastPast
An Aggregator: updates the aggregate data stored in FastPast
A Trigger Evaluator: detects trigger conditions of interest
One or more Sinks: send trigger results to the campaign executor via a Kafka queue
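The real composition uses the >> operator, as the create_flow example later in this section shows. A toy, framework-free Python sketch of the chaining idea (the Operator class and the stage functions here are illustrative stand-ins, not actual Drive classes):

class Operator:
    """Toy operator: wraps a per-tuple function and chains via >>."""
    def __init__(self, fn):
        self.fn = fn

    def __rshift__(self, downstream):
        # Compose: apply this operator first, then the downstream one
        return Operator(lambda t: downstream.fn(self.fn(t)))

# Illustrative stages mirroring the component list above
source = Operator(lambda _: {"raw": "C123,$12.50"})            # receives input
parser = Operator(lambda t: dict(t, amount=12.50))             # raw -> tuple
enricher = Operator(lambda t: dict(t, segment="gold"))         # add profile data
trigger = Operator(lambda t: dict(t, fired=t["amount"] > 10))  # detect condition

flow = source >> parser >> enricher >> trigger
print(flow.fn(None))  # {'raw': 'C123,$12.50', 'amount': 12.5, 'segment': 'gold', 'fired': True}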
A feed application is typically configured via parameters specified in drive.json. A typical entry would look something like this:
{
  "drive": {
    ...
    "feedApplications": [
      {
        "logLevel": "INFO",
        "name": "CC Usage"
      }
    ]
    ...
  }
}
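How Drive itself loads this file is internal to the platform; purely for illustration, a minimal sketch of reading such an entry with Python's standard json module (assuming a drive.json on disk with the ... placeholders removed):

import json

# Assumes a drive.json in the working directory, with the "..." placeholders removed
with open("drive.json") as f:
    config = json.load(f)

for app in config["drive"]["feedApplications"]:
    print("feed application:", app["name"], "log level:", app["logLevel"])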
The source data for a feed application commonly comes from files or Kafka queues. Custom source operators can be written to connect to other kinds of sources, as the sketch below illustrates.
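For instance, a custom source wrapping a Kafka topic would poll messages much like the framework-free sketch below. This uses the third-party kafka-python package with a hypothetical topic name and broker address; it is not Drive's source-operator API:

from kafka import KafkaConsumer  # third-party kafka-python package, not part of Drive

# Hypothetical topic and broker; in a real source operator each message
# would be emitted downstream to the parser
consumer = KafkaConsumer(
    "cc-usage-events",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda raw: raw.decode("utf-8"))

for message in consumer:
    print(message.value)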
An example of a simple feed application that scans a directory for CSV files every 10 seconds and ingests the data would look like the code below:
class CCUsageFeedApplication(FeedApplication):
    """Implements the Credit Card Usage feed application"""

    def create_flow(self):
        """Creates the Credit Card Usage flow"""
        # Schema emitted by the directory scanner: file name + last-modified time
        scan_schema = Schema(
            {
                "filename": String,
                "modTime": Int64
            },
            name="scan_schema")
        # Schema populated by the parser from each row of the input file
        parser_schema = Schema(
            {
                drive_constants.CA_EPOCH_ATTRIBUTE: Int64,
                drive_constants.CA_TIMESTAMP_ATTRIBUTE: Int64,
                "departmentName": String,
                "description": String,
                "cardHolderName": String,
                "customerID": String,
                "merchant": String,
                "category": String,
                "mcc": String,
                "transactionAmount": Double,
                "transactionType": String
            },
            name="parser_schema")
        # Scan the feed directory for new .csv files every 10 seconds
        scanner = self.instantiate_directory_scanner(
            file_name_filter=PatternBasedFileNameFilter(r".*\.csv"),
            output_schema=scan_schema,
            period_in_seconds=10)
        parser = self.instantiate_operator(
            class_=CCUsageReader,
            input_schema=scan_schema,
            move_path=self.feed_data_move_dir(),
            name=drive_constants.DRIVE_APPLICATION_PARSER_OPERATOR_NAME,
            output_schema=parser_schema)
        # compose_flow appends the standard enricher/aggregator/sink operators
        return self.compose_flow(ingress_flow=scanner >> parser)
The class CCUsageFeedApplication inherits from the FeedApplication base class. create_flow is an abstract method of the FeedApplication class which is implemented here; this method defines the flow of operators for the feed application.
Every operator by default requires three parameters: a name, an input schema, and an output schema.
A Schema defines the name and data type of the attributes that an operator receives or sends.
The input schema describes the tuples entering the operator.
The output schema describes the tuples the operator sends onward.
scan_schema and parser_schema are two such schemas defined in the example above. scan_schema contains the attributes sent by the directory scanner operator: the file name and the last modified timestamp of the file. parser_schema contains the attributes that are populated after reading the input file from the source directory.
The CCUsageReader operator class parses the contents of the file, raises errors if any, populates the output tuple, and emits it to the next operator.
compose_flow appends the standard predefined operators, such as the aggregator, enricher, and Kafka sink, after the ingress flow.
Parsers¶
Parser operators parse data from the input source and set it into tuple format. Here the data read from the source is converted to a format that is easier for the feed application to consume and process. A sample parser operator class would look like the snippet below:
class CCUsageReader(CSVParser):
    """A sample operator that parses cc_usage related data"""

    CURRENCY_SIGN = "$"

    @oxygen_operator()
    def __init__(self, batch_size=None, delimiter=",", move_path=None):
        super(CCUsageReader, self).__init__(
            batch_size=batch_size, date_format=None, delimiter=delimiter,
            epoch_attribute=CA_EPOCH_ATTRIBUTE, move_path=move_path,
            row_processor=self._parse_row)

    def set_up(self):
        # No additional set-up beyond the base class
        super(CCUsageReader, self).set_up()

    def _parse_row(self, fields, out_tuple):
        """Parses the fields in a row"""
        out_tuple.departmentName = fields[3].upper()
        out_tuple.cardHolderName = fields[4].upper()
        out_tuple.customerID = fields[2]
        out_tuple.description = ""
        out_tuple.merchant = fields[5].upper()
        out_tuple.category = fields[6].upper()
        setattr(out_tuple, CA_TIMESTAMP_ATTRIBUTE,
                int(self._datetime_parser.utime(fields[7])))
        # Amounts arrive prefixed with the currency sign, e.g. "$125.50"
        out_tuple.transactionAmount = float(
            fields[8].split(CCUsageReader.CURRENCY_SIGN)[1])
        out_tuple.transactionType = fields[9]
The class CCUsageReader inherits from the CSVParser base class.
The delimiter passed to the constructor is used to split each line in the file; the resulting list of fields is then handed to the _parse_row method.
The method _parse_row is passed as the row_processor to the constructor of the CSVParser class.
The method receives two parameters: fields is the list of raw values from a single line read from the file, created by splitting on the delimiter, and out_tuple is the operator's output tuple, based on the output schema set in the feed application class.
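To make the index mapping concrete, here is a self-contained re-implementation of the same row logic on a hypothetical input line. Fields 0 and 1 are not referenced by _parse_row, so their values below are placeholders, and the timestamp format is an assumption standing in for self._datetime_parser.utime:

import datetime

# Hypothetical CSV line; only fields 2-9 are read by _parse_row above
row = "1001,2023Q1,C12345,finance,jane doe,acme corp,travel,2023-01-15 10:30:00,$125.50,DEBIT"
fields = row.split(",")

parsed = {
    "customerID": fields[2],
    "departmentName": fields[3].upper(),
    "cardHolderName": fields[4].upper(),
    "merchant": fields[5].upper(),
    "category": fields[6].upper(),
    # stand-in for self._datetime_parser.utime(fields[7]); the format is assumed
    "timestamp": int(datetime.datetime.strptime(fields[7], "%Y-%m-%d %H:%M:%S").timestamp()),
    "transactionAmount": float(fields[8].split("$")[1]),  # "$125.50" -> 125.5
    "transactionType": fields[9],
}
print(parsed)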