0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.examples.sql.streaming;
0018
0019 import org.apache.spark.api.java.function.FlatMapFunction;
0020 import org.apache.spark.api.java.function.MapFunction;
0021 import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
0022 import org.apache.spark.sql.*;
0023 import org.apache.spark.sql.streaming.GroupState;
0024 import org.apache.spark.sql.streaming.GroupStateTimeout;
0025 import org.apache.spark.sql.streaming.StreamingQuery;
0026
0027 import java.io.Serializable;
0028 import java.sql.Timestamp;
0029 import java.util.*;
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044 public final class JavaStructuredSessionization {
0045
0046 public static void main(String[] args) throws Exception {
0047 if (args.length < 2) {
0048 System.err.println("Usage: JavaStructuredSessionization <hostname> <port>");
0049 System.exit(1);
0050 }
0051
0052 String host = args[0];
0053 int port = Integer.parseInt(args[1]);
0054
0055 SparkSession spark = SparkSession
0056 .builder()
0057 .appName("JavaStructuredSessionization")
0058 .getOrCreate();
0059
0060
0061 Dataset<Row> lines = spark
0062 .readStream()
0063 .format("socket")
0064 .option("host", host)
0065 .option("port", port)
0066 .option("includeTimestamp", true)
0067 .load();
0068
0069 FlatMapFunction<LineWithTimestamp, Event> linesToEvents =
0070 new FlatMapFunction<LineWithTimestamp, Event>() {
0071 @Override
0072 public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) {
0073 ArrayList<Event> eventList = new ArrayList<>();
0074 for (String word : lineWithTimestamp.getLine().split(" ")) {
0075 eventList.add(new Event(word, lineWithTimestamp.getTimestamp()));
0076 }
0077 return eventList.iterator();
0078 }
0079 };
0080
0081
0082 Dataset<Event> events = lines
0083 .withColumnRenamed("value", "line")
0084 .as(Encoders.bean(LineWithTimestamp.class))
0085 .flatMap(linesToEvents, Encoders.bean(Event.class));
0086
0087
0088
0089
0090
0091 MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate> stateUpdateFunc =
0092 new MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate>() {
0093 @Override public SessionUpdate call(
0094 String sessionId, Iterator<Event> events, GroupState<SessionInfo> state) {
0095
0096 if (state.hasTimedOut()) {
0097 SessionUpdate finalUpdate = new SessionUpdate(
0098 sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true);
0099 state.remove();
0100 return finalUpdate;
0101
0102 } else {
0103
0104 long maxTimestampMs = Long.MIN_VALUE;
0105 long minTimestampMs = Long.MAX_VALUE;
0106 int numNewEvents = 0;
0107 while (events.hasNext()) {
0108 Event e = events.next();
0109 long timestampMs = e.getTimestamp().getTime();
0110 maxTimestampMs = Math.max(timestampMs, maxTimestampMs);
0111 minTimestampMs = Math.min(timestampMs, minTimestampMs);
0112 numNewEvents += 1;
0113 }
0114 SessionInfo updatedSession = new SessionInfo();
0115
0116
0117 if (state.exists()) {
0118 SessionInfo oldSession = state.get();
0119 updatedSession.setNumEvents(oldSession.numEvents + numNewEvents);
0120 updatedSession.setStartTimestampMs(oldSession.startTimestampMs);
0121 updatedSession.setEndTimestampMs(Math.max(oldSession.endTimestampMs, maxTimestampMs));
0122 } else {
0123 updatedSession.setNumEvents(numNewEvents);
0124 updatedSession.setStartTimestampMs(minTimestampMs);
0125 updatedSession.setEndTimestampMs(maxTimestampMs);
0126 }
0127 state.update(updatedSession);
0128
0129 state.setTimeoutDuration("10 seconds");
0130 return new SessionUpdate(
0131 sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false);
0132 }
0133 }
0134 };
0135
0136
0137 Dataset<SessionUpdate> sessionUpdates = events
0138 .groupByKey(
0139 new MapFunction<Event, String>() {
0140 @Override public String call(Event event) {
0141 return event.getSessionId();
0142 }
0143 }, Encoders.STRING())
0144 .mapGroupsWithState(
0145 stateUpdateFunc,
0146 Encoders.bean(SessionInfo.class),
0147 Encoders.bean(SessionUpdate.class),
0148 GroupStateTimeout.ProcessingTimeTimeout());
0149
0150
0151 StreamingQuery query = sessionUpdates
0152 .writeStream()
0153 .outputMode("update")
0154 .format("console")
0155 .start();
0156
0157 query.awaitTermination();
0158 }
0159
0160
0161
0162
0163 public static class LineWithTimestamp implements Serializable {
0164 private String line;
0165 private Timestamp timestamp;
0166
0167 public Timestamp getTimestamp() { return timestamp; }
0168 public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
0169
0170 public String getLine() { return line; }
0171 public void setLine(String sessionId) { this.line = sessionId; }
0172 }
0173
0174
0175
0176
0177 public static class Event implements Serializable {
0178 private String sessionId;
0179 private Timestamp timestamp;
0180
0181 public Event() { }
0182 public Event(String sessionId, Timestamp timestamp) {
0183 this.sessionId = sessionId;
0184 this.timestamp = timestamp;
0185 }
0186
0187 public Timestamp getTimestamp() { return timestamp; }
0188 public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
0189
0190 public String getSessionId() { return sessionId; }
0191 public void setSessionId(String sessionId) { this.sessionId = sessionId; }
0192 }
0193
0194
0195
0196
0197 public static class SessionInfo implements Serializable {
0198 private int numEvents = 0;
0199 private long startTimestampMs = -1;
0200 private long endTimestampMs = -1;
0201
0202 public int getNumEvents() { return numEvents; }
0203 public void setNumEvents(int numEvents) { this.numEvents = numEvents; }
0204
0205 public long getStartTimestampMs() { return startTimestampMs; }
0206 public void setStartTimestampMs(long startTimestampMs) {
0207 this.startTimestampMs = startTimestampMs;
0208 }
0209
0210 public long getEndTimestampMs() { return endTimestampMs; }
0211 public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; }
0212
0213 public long calculateDuration() { return endTimestampMs - startTimestampMs; }
0214
0215 @Override public String toString() {
0216 return "SessionInfo(numEvents = " + numEvents +
0217 ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")";
0218 }
0219 }
0220
0221
0222
0223
0224 public static class SessionUpdate implements Serializable {
0225 private String id;
0226 private long durationMs;
0227 private int numEvents;
0228 private boolean expired;
0229
0230 public SessionUpdate() { }
0231
0232 public SessionUpdate(String id, long durationMs, int numEvents, boolean expired) {
0233 this.id = id;
0234 this.durationMs = durationMs;
0235 this.numEvents = numEvents;
0236 this.expired = expired;
0237 }
0238
0239 public String getId() { return id; }
0240 public void setId(String id) { this.id = id; }
0241
0242 public long getDurationMs() { return durationMs; }
0243 public void setDurationMs(long durationMs) { this.durationMs = durationMs; }
0244
0245 public int getNumEvents() { return numEvents; }
0246 public void setNumEvents(int numEvents) { this.numEvents = numEvents; }
0247
0248 public boolean isExpired() { return expired; }
0249 public void setExpired(boolean expired) { this.expired = expired; }
0250 }
0251 }