From: Janus Varmarken Date: Fri, 4 May 2018 08:00:44 +0000 (-0700) Subject: Major revamp of FlowPatternFinder to ensure thread safety. Added generic structure... X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=29b7ce5956be756276d451980aecbf7f4db6af55;p=pingpong.git Major revamp of FlowPatternFinder to ensure thread safety. Added generic structure for comparison code which should allow for easy plugin of more sophisticated comparison algorithm later on. --- diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java index 9bbd99c..9dba468 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java @@ -1,138 +1,169 @@ package edu.uci.iotproject; +import edu.uci.iotproject.comparison.ComparisonFunctions; +import edu.uci.iotproject.comparison.CompleteMatchPatternComparisonResult; +import edu.uci.iotproject.comparison.PatternComparisonTask; import org.pcap4j.core.NotOpenException; import org.pcap4j.core.PcapHandle; import org.pcap4j.core.PcapNativeException; import org.pcap4j.core.PcapPacket; +import org.pcap4j.packet.DnsPacket; import org.pcap4j.packet.IpV4Packet; -import org.pcap4j.packet.Packet; import org.pcap4j.packet.TcpPacket; -import org.pcap4j.packet.DnsPacket; import java.io.EOFException; import java.net.UnknownHostException; -import java.time.Instant; import java.util.*; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.*; + /** - * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace. - * We use 2 threads: - * 1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure. - * 2) The second thread (checker thread) checks the collected conversation. + *

Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.

* - * @author Janus Varmarken - * @author Rahmadi Trimananda + *

+ * The (entire) PCAP trace is traversed and parsed on one thread (specifically, the thread that calls + * {@link #findFlowPattern()}). This thread builds a {@link DnsMap} using the DNS packets present in the trace and uses + * that {@code DnsMap} to reassemble {@link Conversation}s that potentially match the provided + * {@link FlowPattern} (in that one end/party of said conversations matches the hostname(s) specified by the given + * {@code FlowPattern}). + * These potential matches are then examined on background worker thread(s) to determine if they are indeed a (complete) + * matches of the provided {@code FlowPattern}. + *

+ * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } */ public class FlowPatternFinder { - /* Class properties */ - private Map> connections; - private Queue conversations; - private DnsMap dnsMap; - private PcapHandle pcap; - private FlowPattern pattern; - private AtomicBoolean isEoF; - - - /* Constructor */ - public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) { + /* Begin class properties */ + /** + * {@link ExecutorService} responsible for parallelizing pattern searches. + * Declared as static to allow for reuse of threads across different instances of {@code FlowPatternFinder} and to + * avoid the overhead of initializing a new thread pool for each {@code FlowPatternFinder} instance. + */ + private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(); + /* End class properties */ - this.connections = new ConcurrentHashMap>(); - this.conversations = new ConcurrentLinkedQueue(); - this.dnsMap = new DnsMap(); - this.isEoF = new AtomicBoolean(false); + /* Begin instance properties */ + /** + * Holds a set of {@link Conversation}s that potentially match {@link #mPattern} since each individual + * {@code Conversation} is communication with the hostname identified by {@code mPattern.getHostname()}. + * Note that due to limitations of the {@link Set} interface (specifically, there is no {@code get(T t)} method), + * we have to resort to a {@link Map} (in which keys map to themselves) to "mimic" a set with {@code get(T t)} + * functionality. + * + * @see this question on StackOverflow.com + */ + private final Map mConversations; - // Get input parameters - this.pcap = _pcap; - this.pattern = _pattern; - } - - - public void start() { - - // Spawn the main thread - Thread mainThread = new Thread(new Runnable() { - public void run() { - findFlowPattern(); - } - }); - mainThread.start(); + private final DnsMap mDnsMap; + private final PcapHandle mPcap; + private final FlowPattern mPattern; - // Spawn the checker thread - Thread checkerThread = new Thread(new Runnable() { - public void run() { - find(); - } - }); - checkerThread.start(); + private final List> mPendingComparisons = new ArrayList<>(); + /* End instance properties */ - /* TODO: Join the threads if we want it to be blocking - try { - mainThread.join(); - checkerThread.join(); - } catch(InterruptedException ex) { - ex.printStackTrace(); - }*/ - System.out.println("[ start ] Main and checker threads started!"); + /** + * Constructs a new {@code FlowPatternFinder}. + * @param pcap an open {@link PcapHandle} that provides access to the trace that is to be examined. + * @param pattern the {@link FlowPattern} to search for. + */ + public FlowPatternFinder(PcapHandle pcap, FlowPattern pattern) { + this.mConversations = new HashMap<>(); + this.mDnsMap = new DnsMap(); + this.mPcap = Objects.requireNonNull(pcap, + String.format("Argument of type '%s' cannot be null", PcapHandle.class.getSimpleName())); + this.mPattern = Objects.requireNonNull(pattern, + String.format("Argument of type '%s' cannot be null", FlowPattern.class.getSimpleName())); } + /** + * Starts the pattern search. + */ + public void start() { + findFlowPattern(); + } /** * Find patterns based on the FlowPattern object (run by a thread) */ private void findFlowPattern() { - int counter = 0; try { PcapPacket packet; - Set seqNumberSet = new HashSet(); - int patternLength = pattern.getLength(); - while ((packet = pcap.getNextPacketEx()) != null) { - - // Check if this is a valid DNS packet - dnsMap.validateAndAddNewEntry(packet); + int patternLength = mPattern.getLength(); + while ((packet = mPcap.getNextPacketEx()) != null) { + // Let DnsMap handle DNS packets. + if (packet.get(DnsPacket.class) != null) { + // Check if this is a valid DNS packet + mDnsMap.validateAndAddNewEntry(packet); + continue; + } // For now, we only work support pattern search in TCP over IPv4. IpV4Packet ipPacket = packet.get(IpV4Packet.class); TcpPacket tcpPacket = packet.get(TcpPacket.class); - if (ipPacket == null || tcpPacket == null) + if (ipPacket == null || tcpPacket == null) { continue; + } String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress(); String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress(); int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt(); int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt(); - // Is this packet related to the pattern and coming to/from the cloud server? - boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname()); - boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname()); - if (!fromServer && !fromClient) // Packet not related to pattern, skip it. + // Is this packet related to the pattern; i.e. is it going to (or coming from) the cloud server? + boolean fromServer = mDnsMap.isRelatedToCloudServer(srcAddress, mPattern.getHostname()); + boolean fromClient = mDnsMap.isRelatedToCloudServer(dstAddress, mPattern.getHostname()); + if (!fromServer && !fromClient) { + // Packet not related to pattern, skip it. continue; - if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable - continue; - // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort). - // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now. + } + if (tcpPacket.getPayload() == null) { + // We skip non-payload control packets as these are less predictable + continue; + } + // Conversations (connections/sessions) are identified by the four-tuple + // (clientIp, clientPort, serverIp, serverPort) (see Conversation Javadoc). + // Create "dummy" conversation for looking up an existing entry. Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) : new Conversation(dstAddress, dstPort, srcAddress, srcPort); - // Create new conversation entry, or append packet to existing. - List listPcapPacket = connections.get(conversation); - if (listPcapPacket == null) { - listPcapPacket = new ArrayList(); - connections.put(conversation, listPcapPacket); - } - int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber(); - boolean retransmission = seqNumberSet.contains(seqNumber); - if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow - listPcapPacket.add(packet); - // End of conversation -> trigger thread to check - if (listPcapPacket.size() == patternLength) - conversations.add(conversation); - seqNumberSet.add(seqNumber); + // Add the packet so that the "dummy" conversation can be immediately added to the map if no entry + // exists for the conversation that the current packet belongs to. + conversation.addPacket(packet, true); + // Add the new conversation to the map if an equal entry is not already present. + // If an existing entry is already present, the current packet is simply added to that conversation. + mConversations.merge(conversation, conversation, (existingEntry, toMerge) -> { + // toMerge only has a single packet, which is the same as referred to by 'packet' variable, but need + // this hack as 'packet' is not final and hence cannot be referred to in a lambda. + existingEntry.addPacket(toMerge.getPackets().get(0), true); + return existingEntry; + }); + // Refresh reference to point to entry in map (in case packet was added to existing entry). + conversation = mConversations.get(conversation); + if (conversation.getPackets().size() == mPattern.getLength()) { + // Conversation reached a size that matches the expected size. + // Remove the Conversation from the map and start the analysis. + // Any future packets identified by the same four tuple will be tied to a new Conversation instance. + // This might, for example, occur if the same conversation is reused for multiple events. + mConversations.remove(conversation); + // Create comparison task and send to executor service. + PatternComparisonTask comparisonTask = + new PatternComparisonTask<>(conversation, mPattern, ComparisonFunctions.COMPLETE_MATCH); + mPendingComparisons.add(EXECUTOR_SERVICE.submit(comparisonTask)); } } } catch (EOFException eofe) { - while (isEoF.compareAndSet(false, true) == false); // Try to signal EoF! System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!"); + System.out.println("[ findFlowPattern ] Now waiting for comparisons to finish..."); + // Wait for all comparisons to finish, then output their results to std.out. + for(Future comparisonTask : mPendingComparisons) { + try { + // Blocks until result is ready. + CompleteMatchPatternComparisonResult comparisonResult = comparisonTask.get(); + if (comparisonResult.getResult()) { + System.out.println(comparisonResult.getTextualDescription()); + } + } catch (InterruptedException|ExecutionException e) { + e.printStackTrace(); + } + } } catch (UnknownHostException | PcapNativeException | NotOpenException | @@ -140,42 +171,46 @@ public class FlowPatternFinder { ex.printStackTrace(); } } - - - /** - * Checker to match collected patterns (run by a thread) - */ - private void find() { - while (isEoF.get() == false) { // Continue until EoF - // Get the object from the queue - while(conversations.peek() == null) { // Wait until queue is not empty - if (isEoF.get() == true) // Return if EoF - return; - } - Conversation conversation = conversations.poll(); - // Get the object and remove it from the Map (housekeeping) - List packets = connections.remove(conversation); - boolean completeMatch = true; - for (int i = 0; i < packets.size(); i++) { - TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class); - if (tcpPacket.getPayload().length() != pattern.getPacketOrder().get(i)) { - completeMatch = false; - break; - } - } - if (completeMatch) { - PcapPacket firstPacketInFlow = packets.get(0); - System.out.println( - String.format("[ find ] Detected a complete match of pattern '%s' at %s!", - pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString())); - } /*else { - PcapPacket firstPacketInFlow = packets.get(0); - System.out.println( - String.format("[ detected a mismatch of pattern '%s' at %s]", - pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString())); - }*/ - } - } +// private static class PatternComparisonTask implements Runnable { +// +// private final Conversation mConversation; +// private final FlowPattern mFlowPattern; +// +// private PatternComparisonTask(Conversation conversation, FlowPattern pattern) { +// this.mConversation = conversation; +// this.mFlowPattern = pattern; +// } +// +// @Override +// public void run() { +// if(isCompleteMatch()) { +// PcapPacket firstPacketInFlow = mConversation.getPackets().get(0); +// System.out.println( +// String.format("[ find ] Detected a complete match of pattern '%s' at %s!", +// mFlowPattern.getPatternId(), firstPacketInFlow.getTimestamp().toString())); +// } +// } +// +// /** +// * Compares the order of packet lengths present in {@link #mConversation} with those found in +// * {@link #mFlowPattern}. +// * @return {@code true} if the packet lengths matches pairwise for all indices, {@code false} otherwise. +// */ +// private boolean isCompleteMatch() { +// List convPackets = mConversation.getPackets(); +// if (convPackets.size() != mFlowPattern.getLength()) { +// return false; +// } +// for (int i = 0; i < convPackets.size(); i++) { +// TcpPacket tcpPacket = convPackets.get(i).get(TcpPacket.class); +// if (tcpPacket.getPayload().length() != mFlowPattern.getPacketOrder().get(i)) { +// return false; +// } +// } +// return true; +// } +// +// } } diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java index d727683..0b01c2c 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java @@ -1,14 +1,9 @@ package edu.uci.iotproject; import org.pcap4j.core.*; -import org.pcap4j.packet.*; -import org.pcap4j.packet.DnsPacket; -import org.pcap4j.packet.namednumber.DnsResourceRecordType; import java.io.EOFException; -import java.net.Inet4Address; import java.net.UnknownHostException; -import java.util.*; import java.util.concurrent.TimeoutException; /** @@ -25,8 +20,8 @@ public class Main { public static void main(String[] args) throws PcapNativeException, NotOpenException, EOFException, TimeoutException, UnknownHostException { - //final String fileName = "/users/varmarken/Desktop/wlan1.local.dns.pcap"; - final String fileName = "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.remote.dns.pcap"; + final String fileName = "/users/varmarken/Desktop/wlan1.local.dns.pcap"; +// final String fileName = "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.remote.dns.pcap"; // ====== Debug code ====== PcapHandle handle; diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/AbstractPatternComparisonResult.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/AbstractPatternComparisonResult.java new file mode 100644 index 0000000..3b82573 --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/AbstractPatternComparisonResult.java @@ -0,0 +1,49 @@ +package edu.uci.iotproject.comparison; + +import edu.uci.iotproject.Conversation; +import edu.uci.iotproject.FlowPattern; + +/** + * Models the result of comparing a {@link Conversation} and a {@link FlowPattern}. + * + * @param The type of the result; can be something as simple as a {@code Boolean} for a complete match comparison or + * or a complex data type for more sophisticated comparisons. + */ +public abstract class AbstractPatternComparisonResult { + + /** + * The result of the comparison. + */ + private final T mResult; + + /** + * The {@code Conversation} that was compared against {@link #mFlowPattern}. + */ + protected final Conversation mConversation; + + /** + * The {@code FlowPattern} that {@link #mConversation} was compared against. + */ + protected final FlowPattern mFlowPattern; + + public AbstractPatternComparisonResult(Conversation conversation, FlowPattern flowPattern, T result) { + this.mResult = result; + this.mConversation = conversation; + this.mFlowPattern = flowPattern; + } + + /** + * Gets the result of the comparison. + * @return the result of the comparison + */ + public T getResult() { + return mResult; + } + + /** + * Get a textual description of the comparison result suitable for output on std.out. + * @returna a textual description of the comparison result suitable for output on std.out. + */ + abstract public String getTextualDescription(); + +} \ No newline at end of file diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/ComparisonFunctions.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/ComparisonFunctions.java new file mode 100644 index 0000000..2f44f3b --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/ComparisonFunctions.java @@ -0,0 +1,41 @@ +package edu.uci.iotproject.comparison; + +import edu.uci.iotproject.Conversation; +import edu.uci.iotproject.FlowPattern; +import org.pcap4j.core.PcapPacket; +import org.pcap4j.packet.TcpPacket; + +import java.util.List; +import java.util.function.BiFunction; + +/** + * Contains concrete implementations of comparison functions that compare a {@link Conversation} and a {@link FlowPattern}. + * These functions are supplied to {@link PatternComparisonTask} which in turn facilitates comparison on a background thread. + * This design provides plugability: currently, we only support complete match comparison, but further down the road we + * can simply introduce more sophisticated comparison functions here (e.g. Least Common Substring) simply replace the + * argument passed to the {@link PatternComparisonTask} constructor to switch between the different implementations. + * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } + */ +public class ComparisonFunctions { + + /** + * Comparison function that checks for a complete match, i.e. a match in which every packet in the + * {@link Conversation} has the same length as the corresponding packet in the {@link FlowPattern}. + */ + public static final BiFunction COMPLETE_MATCH = (conversation, flowPattern) -> { + List convPackets = conversation.getPackets(); + if (convPackets.size() != flowPattern.getLength()) { + return new CompleteMatchPatternComparisonResult(conversation, flowPattern, false); + } + for (int i = 0; i < convPackets.size(); i++) { + TcpPacket tcpPacket = convPackets.get(i).get(TcpPacket.class); + if (tcpPacket.getPayload().length() != flowPattern.getPacketOrder().get(i)) { + return new CompleteMatchPatternComparisonResult(conversation, flowPattern, false); + } + } + return new CompleteMatchPatternComparisonResult(conversation, flowPattern, true); + }; + +} diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/CompleteMatchPatternComparisonResult.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/CompleteMatchPatternComparisonResult.java new file mode 100644 index 0000000..3ddd884 --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/CompleteMatchPatternComparisonResult.java @@ -0,0 +1,30 @@ +package edu.uci.iotproject.comparison; + +import edu.uci.iotproject.Conversation; +import edu.uci.iotproject.FlowPattern; + +/** + * The result of a search for a complete match. Serves as an example implementation of + * {@link AbstractPatternComparisonResult}. + * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } + */ +public class CompleteMatchPatternComparisonResult extends AbstractPatternComparisonResult { + + public CompleteMatchPatternComparisonResult(Conversation conversation, FlowPattern flowPattern, Boolean result) { + super(conversation, flowPattern, result); + } + + @Override + public String getTextualDescription() { + if (getResult()) { + return String.format("[ find ] Detected a COMPLETE MATCH of pattern '%s' at %s!", + mFlowPattern.getPatternId(), mConversation.getPackets().get(0).getTimestamp().toString()); + } else { + return String.format("[ miss ] flow starting at %s was **not** a complete match of pattern '%s'", + mConversation.getPackets().get(0).getTimestamp().toString(), mFlowPattern.getPatternId()); + } + } + +} \ No newline at end of file diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/PatternComparisonTask.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/PatternComparisonTask.java new file mode 100644 index 0000000..705646c --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/PatternComparisonTask.java @@ -0,0 +1,40 @@ +package edu.uci.iotproject.comparison; + +import edu.uci.iotproject.Conversation; +import edu.uci.iotproject.FlowPattern; + +import java.util.concurrent.Callable; +import java.util.function.BiFunction; + +/** + * A task that compares a given {@link Conversation} and {@link FlowPattern} using a provided comparison function. + * The task implements {@link Callable} and can hence be executed on a background thread. + * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } + */ +public class PatternComparisonTask> implements Callable { + + private final Conversation mConversation; + private final FlowPattern mFlowPattern; + private final BiFunction mComparitor; + + /** + * Create a new {@code PatternComparisonTask}. + * + * @param conversation The conversation to compare against {@code pattern}. + * @param pattern The pattern to compare against {@code conversation}. + * @param comparisonFunction The function that compares {@code pattern} and {@code conversation}. + */ + public PatternComparisonTask(Conversation conversation, FlowPattern pattern, BiFunction comparisonFunction) { + this.mConversation = conversation; + this.mFlowPattern = pattern; + this.mComparitor = comparisonFunction; + } + + @Override + public R call() throws Exception { + return mComparitor.apply(mConversation, mFlowPattern); + } + +} \ No newline at end of file