Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.examples.sql.streaming;
0018 
0019 import org.apache.spark.api.java.function.FlatMapFunction;
0020 import org.apache.spark.api.java.function.MapFunction;
0021 import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
0022 import org.apache.spark.sql.*;
0023 import org.apache.spark.sql.streaming.GroupState;
0024 import org.apache.spark.sql.streaming.GroupStateTimeout;
0025 import org.apache.spark.sql.streaming.StreamingQuery;
0026 
0027 import java.io.Serializable;
0028 import java.sql.Timestamp;
0029 import java.util.*;
0030 
0031 /**
0032  * Sessionizes words in UTF8 encoded, '\n' delimited text received from the network.
0033  * <p>
0034  * Usage: JavaStructuredSessionization <hostname> <port>
0035  * <hostname> and <port> describe the TCP server that Structured Streaming
0036  * would connect to receive data.
0037  * <p>
0038  * To run this on your local machine, you need to first run a Netcat server
0039  * `$ nc -lk 9999`
0040  * and then run the example
0041  * `$ bin/run-example sql.streaming.JavaStructuredSessionization
0042  * localhost 9999`
0043  */
0044 public final class JavaStructuredSessionization {
0045 
0046   public static void main(String[] args) throws Exception {
0047     if (args.length < 2) {
0048       System.err.println("Usage: JavaStructuredSessionization <hostname> <port>");
0049       System.exit(1);
0050     }
0051 
0052     String host = args[0];
0053     int port = Integer.parseInt(args[1]);
0054 
0055     SparkSession spark = SparkSession
0056         .builder()
0057         .appName("JavaStructuredSessionization")
0058         .getOrCreate();
0059 
0060     // Create DataFrame representing the stream of input lines from connection to host:port
0061     Dataset<Row> lines = spark
0062         .readStream()
0063         .format("socket")
0064         .option("host", host)
0065         .option("port", port)
0066         .option("includeTimestamp", true)
0067         .load();
0068 
0069     FlatMapFunction<LineWithTimestamp, Event> linesToEvents =
0070       new FlatMapFunction<LineWithTimestamp, Event>() {
0071         @Override
0072         public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) {
0073           ArrayList<Event> eventList = new ArrayList<>();
0074           for (String word : lineWithTimestamp.getLine().split(" ")) {
0075             eventList.add(new Event(word, lineWithTimestamp.getTimestamp()));
0076           }
0077           return eventList.iterator();
0078         }
0079       };
0080 
0081     // Split the lines into words, treat words as sessionId of events
0082     Dataset<Event> events = lines
0083         .withColumnRenamed("value", "line")
0084         .as(Encoders.bean(LineWithTimestamp.class))
0085         .flatMap(linesToEvents, Encoders.bean(Event.class));
0086 
0087     // Sessionize the events. Track number of events, start and end timestamps of session, and
0088     // and report session updates.
0089     //
0090     // Step 1: Define the state update function
0091     MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate> stateUpdateFunc =
0092       new MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate>() {
0093         @Override public SessionUpdate call(
0094             String sessionId, Iterator<Event> events, GroupState<SessionInfo> state) {
0095           // If timed out, then remove session and send final update
0096           if (state.hasTimedOut()) {
0097             SessionUpdate finalUpdate = new SessionUpdate(
0098                 sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true);
0099             state.remove();
0100             return finalUpdate;
0101 
0102           } else {
0103             // Find max and min timestamps in events
0104             long maxTimestampMs = Long.MIN_VALUE;
0105             long minTimestampMs = Long.MAX_VALUE;
0106             int numNewEvents = 0;
0107             while (events.hasNext()) {
0108               Event e = events.next();
0109               long timestampMs = e.getTimestamp().getTime();
0110               maxTimestampMs = Math.max(timestampMs, maxTimestampMs);
0111               minTimestampMs = Math.min(timestampMs, minTimestampMs);
0112               numNewEvents += 1;
0113             }
0114             SessionInfo updatedSession = new SessionInfo();
0115 
0116             // Update start and end timestamps in session
0117             if (state.exists()) {
0118               SessionInfo oldSession = state.get();
0119               updatedSession.setNumEvents(oldSession.numEvents + numNewEvents);
0120               updatedSession.setStartTimestampMs(oldSession.startTimestampMs);
0121               updatedSession.setEndTimestampMs(Math.max(oldSession.endTimestampMs, maxTimestampMs));
0122             } else {
0123               updatedSession.setNumEvents(numNewEvents);
0124               updatedSession.setStartTimestampMs(minTimestampMs);
0125               updatedSession.setEndTimestampMs(maxTimestampMs);
0126             }
0127             state.update(updatedSession);
0128             // Set timeout such that the session will be expired if no data received for 10 seconds
0129             state.setTimeoutDuration("10 seconds");
0130             return new SessionUpdate(
0131                 sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false);
0132           }
0133         }
0134       };
0135 
0136     // Step 2: Apply the state update function to the events streaming Dataset grouped by sessionId
0137     Dataset<SessionUpdate> sessionUpdates = events
0138         .groupByKey(
0139             new MapFunction<Event, String>() {
0140               @Override public String call(Event event) {
0141                 return event.getSessionId();
0142               }
0143             }, Encoders.STRING())
0144         .mapGroupsWithState(
0145             stateUpdateFunc,
0146             Encoders.bean(SessionInfo.class),
0147             Encoders.bean(SessionUpdate.class),
0148             GroupStateTimeout.ProcessingTimeTimeout());
0149 
0150     // Start running the query that prints the session updates to the console
0151     StreamingQuery query = sessionUpdates
0152         .writeStream()
0153         .outputMode("update")
0154         .format("console")
0155         .start();
0156 
0157     query.awaitTermination();
0158   }
0159 
0160   /**
0161    * User-defined data type representing the raw lines with timestamps.
0162    */
0163   public static class LineWithTimestamp implements Serializable {
0164     private String line;
0165     private Timestamp timestamp;
0166 
0167     public Timestamp getTimestamp() { return timestamp; }
0168     public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
0169 
0170     public String getLine() { return line; }
0171     public void setLine(String sessionId) { this.line = sessionId; }
0172   }
0173 
0174   /**
0175    * User-defined data type representing the input events
0176    */
0177   public static class Event implements Serializable {
0178     private String sessionId;
0179     private Timestamp timestamp;
0180 
0181     public Event() { }
0182     public Event(String sessionId, Timestamp timestamp) {
0183       this.sessionId = sessionId;
0184       this.timestamp = timestamp;
0185     }
0186 
0187     public Timestamp getTimestamp() { return timestamp; }
0188     public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
0189 
0190     public String getSessionId() { return sessionId; }
0191     public void setSessionId(String sessionId) { this.sessionId = sessionId; }
0192   }
0193 
0194   /**
0195    * User-defined data type for storing a session information as state in mapGroupsWithState.
0196    */
0197   public static class SessionInfo implements Serializable {
0198     private int numEvents = 0;
0199     private long startTimestampMs = -1;
0200     private long endTimestampMs = -1;
0201 
0202     public int getNumEvents() { return numEvents; }
0203     public void setNumEvents(int numEvents) { this.numEvents = numEvents; }
0204 
0205     public long getStartTimestampMs() { return startTimestampMs; }
0206     public void setStartTimestampMs(long startTimestampMs) {
0207       this.startTimestampMs = startTimestampMs;
0208     }
0209 
0210     public long getEndTimestampMs() { return endTimestampMs; }
0211     public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; }
0212 
0213     public long calculateDuration() { return endTimestampMs - startTimestampMs; }
0214 
0215     @Override public String toString() {
0216       return "SessionInfo(numEvents = " + numEvents +
0217           ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";
0218     }
0219   }
0220 
0221   /**
0222    * User-defined data type representing the update information returned by mapGroupsWithState.
0223    */
0224   public static class SessionUpdate implements Serializable {
0225     private String id;
0226     private long durationMs;
0227     private int numEvents;
0228     private boolean expired;
0229 
0230     public SessionUpdate() { }
0231 
0232     public SessionUpdate(String id, long durationMs, int numEvents, boolean expired) {
0233       this.id = id;
0234       this.durationMs = durationMs;
0235       this.numEvents = numEvents;
0236       this.expired = expired;
0237     }
0238 
0239     public String getId() { return id; }
0240     public void setId(String id) { this.id = id; }
0241 
0242     public long getDurationMs() { return durationMs; }
0243     public void setDurationMs(long durationMs) { this.durationMs = durationMs; }
0244 
0245     public int getNumEvents() { return numEvents; }
0246     public void setNumEvents(int numEvents) { this.numEvents = numEvents; }
0247 
0248     public boolean isExpired() { return expired; }
0249     public void setExpired(boolean expired) { this.expired = expired; }
0250   }
0251 }