Интерфейс программирования
В этом подразделе мы опишем интерфейс программирования. На рис. 6 показан Java-класс, реализующий SQL/MR-функцию sessionize (для нашего сквозного примера формирования пользовательских сессий).
public class Sessionize implements PartitionFunction { // Constructor (called at initialization)
public Sessionize (RuntimeContract contract) { InputInfo inputInfo = contract.get InputInfo() ;
// Determine time column
String timeColumnName = contract.useArgumentClause(”timecolumn”).getSingleValue(); timeColumnIndex_ = inputInfo.getColumnIndex(timeColumnName);
// Determine timeout
String timeoutValue = contract.useArgumentClause(”timeout”).getSingleValue(); timeout_ = Integer.parseInt(timeoutValue);
// Define output columns
List outputColumns = new ArrayList(); outputColumns.addAll(inputInfo.getColumns()); outputColumns.add(new ColumnDefinition(”sessionid”, SqlType.integer()));
// Complete the contract
contract.setOutputInfo( new OutputInfo(outputColumns)); contract.complete(); }
// Operate method ( called at runtime, for each partition)
public void operateOnPartition( PartitionDefinition partition, RowIterator inputIterator, // Iterates over all rows in the partition
RowEmitter outputEmitter // Used to emit output rows
) { int currentSessionId = 0 ; int lastTime = Integer.MIN_VALUE;
// Advance through each row in partition
while (inputIterator.advanceToNextRow()) { // Determine if time of this click is more than timeout after the last
int currentTime = inputIterator.getIntAt(timeColumnIndex_); if(currentTime > lastTime + timeout_) ++currentSessionId; // Emit ouput row with all input columns, plus current session id
outputEmitter.addFromRow(inputIterator); outputEmitter.addInt(currentSessionId); outputEmitter.emitRow(); lastTime = currentTime; } }
// State saved at initialization, used during runtime
private int timeColumnIndex_; private int timeout_; };
Рис. 6. Реализация повторно используемой функции sessionize с использованием Java API SQL/MR.