Major revamp of FlowPatternFinder to ensure thread safety. Added generic structure...
authorJanus Varmarken <varmarken@gmail.com>
Fri, 4 May 2018 08:00:44 +0000 (01:00 -0700)
committerJanus Varmarken <varmarken@gmail.com>
Fri, 4 May 2018 08:00:44 +0000 (01:00 -0700)
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/AbstractPatternComparisonResult.java [new file with mode: 0644]
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/ComparisonFunctions.java [new file with mode: 0644]
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/CompleteMatchPatternComparisonResult.java [new file with mode: 0644]
Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/PatternComparisonTask.java [new file with mode: 0644]

index 9bbd99ce985a329270ab12cdef38d01b68350240..9dba468538fbeee813aae64396f893e2e60ac5ec 100644 (file)
 package edu.uci.iotproject;
 
+import edu.uci.iotproject.comparison.ComparisonFunctions;
+import edu.uci.iotproject.comparison.CompleteMatchPatternComparisonResult;
+import edu.uci.iotproject.comparison.PatternComparisonTask;
 import org.pcap4j.core.NotOpenException;
 import org.pcap4j.core.PcapHandle;
 import org.pcap4j.core.PcapNativeException;
 import org.pcap4j.core.PcapPacket;
+import org.pcap4j.packet.DnsPacket;
 import org.pcap4j.packet.IpV4Packet;
-import org.pcap4j.packet.Packet;
 import org.pcap4j.packet.TcpPacket;
-import org.pcap4j.packet.DnsPacket;
 
 import java.io.EOFException;
 import java.net.UnknownHostException;
-import java.time.Instant;
 import java.util.*;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.*;
+
 
 /**
- * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.
- * We use 2 threads:
- *  1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure.
- *  2) The second thread (checker thread) checks the collected conversation.
+ * <p>Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace.</p>
  *
- * @author Janus Varmarken
- * @author Rahmadi Trimananda
+ * <p>
+ * The (entire) PCAP trace is traversed and parsed on one thread (specifically, the thread that calls
+ * {@link #findFlowPattern()}). This thread builds a {@link DnsMap} using the DNS packets present in the trace and uses
+ * that {@code DnsMap} to reassemble {@link Conversation}s that <em>potentially</em> match the provided
+ * {@link FlowPattern} (in that one end/party of said conversations matches the hostname(s) specified by the given
+ * {@code FlowPattern}).
+ * These potential matches are then examined on background worker thread(s) to determine if they are indeed a (complete)
+ * matches of the provided {@code FlowPattern}.
+ * </p>
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
  */
 public class FlowPatternFinder {
 
-    /* Class properties */
-    private Map<Conversation, List<PcapPacket>> connections;
-    private Queue<Conversation> conversations;
-    private DnsMap dnsMap;
-    private PcapHandle pcap;
-    private FlowPattern pattern;
-    private AtomicBoolean isEoF;
-    
-    
-    /* Constructor */
-    public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) {
+    /* Begin class properties */
+    /**
+     * {@link ExecutorService} responsible for parallelizing pattern searches.
+     * Declared as static to allow for reuse of threads across different instances of {@code FlowPatternFinder} and to
+     * avoid the overhead of initializing a new thread pool for each {@code FlowPatternFinder} instance.
+     */
+    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
+    /* End class properties */
 
-        this.connections = new ConcurrentHashMap<Conversation, List<PcapPacket>>();
-        this.conversations = new ConcurrentLinkedQueue<Conversation>();
-        this.dnsMap = new DnsMap();
-        this.isEoF = new AtomicBoolean(false);
+    /* Begin instance properties */
+    /**
+     * Holds a set of {@link Conversation}s that <em>potentially</em> match {@link #mPattern} since each individual
+     * {@code Conversation} is communication with the hostname identified by {@code mPattern.getHostname()}.
+     * Note that due to limitations of the {@link Set} interface (specifically, there is no {@code get(T t)} method),
+     * we have to resort to a {@link Map} (in which keys map to themselves) to "mimic" a set with {@code get(T t)}
+     * functionality.
+     *
+     * @see <a href="https://stackoverflow.com/questions/7283338/getting-an-element-from-a-set">this question on StackOverflow.com</a>
+     */
+    private final Map<Conversation, Conversation> mConversations;
 
-        // Get input parameters
-        this.pcap = _pcap;
-        this.pattern = _pattern;
-    }
-    
-    
-    public void start() {
-    
-        // Spawn the main thread
-        Thread mainThread = new Thread(new Runnable() {
-            public void run() {
-                findFlowPattern();
-            }
-        });
-        mainThread.start();
+    private final DnsMap mDnsMap;
+    private final PcapHandle mPcap;
+    private final FlowPattern mPattern;
 
-        // Spawn the checker thread
-        Thread checkerThread = new Thread(new Runnable() {
-            public void run() {
-                find();
-            }
-        });
-        checkerThread.start();
+    private final List<Future<CompleteMatchPatternComparisonResult>> mPendingComparisons = new ArrayList<>();
+    /* End instance properties */
 
-        /* TODO: Join the threads if we want it to be blocking
-        try {
-            mainThread.join();
-            checkerThread.join();
-        } catch(InterruptedException ex) {
-            ex.printStackTrace();
-        }*/
-        System.out.println("[ start ] Main and checker threads started!");
+    /**
+     * Constructs a new {@code FlowPatternFinder}.
+     * @param pcap an <em>open</em> {@link PcapHandle} that provides access to the trace that is to be examined.
+     * @param pattern the {@link FlowPattern} to search for.
+     */
+    public FlowPatternFinder(PcapHandle pcap, FlowPattern pattern) {
+        this.mConversations = new HashMap<>();
+        this.mDnsMap = new DnsMap();
+        this.mPcap = Objects.requireNonNull(pcap,
+                String.format("Argument of type '%s' cannot be null", PcapHandle.class.getSimpleName()));
+        this.mPattern = Objects.requireNonNull(pattern,
+                String.format("Argument of type '%s' cannot be null", FlowPattern.class.getSimpleName()));
     }
 
+    /**
+     * Starts the pattern search.
+     */
+    public void start() {
+        findFlowPattern();
+    }
 
     /**
      * Find patterns based on the FlowPattern object (run by a thread)
      */
     private void findFlowPattern() {
-        int counter = 0;
         try {
             PcapPacket packet;
-            Set<Integer> seqNumberSet = new HashSet<Integer>();
-            int patternLength = pattern.getLength();
-            while ((packet = pcap.getNextPacketEx()) != null) {
-
-                // Check if this is a valid DNS packet
-                dnsMap.validateAndAddNewEntry(packet);
+            int patternLength = mPattern.getLength();
+            while ((packet = mPcap.getNextPacketEx()) != null) {
+                // Let DnsMap handle DNS packets.
+                if (packet.get(DnsPacket.class) != null) {
+                    // Check if this is a valid DNS packet
+                    mDnsMap.validateAndAddNewEntry(packet);
+                    continue;
+                }
                 // For now, we only work support pattern search in TCP over IPv4.
                 IpV4Packet ipPacket = packet.get(IpV4Packet.class);
                 TcpPacket tcpPacket = packet.get(TcpPacket.class);
-                if (ipPacket == null || tcpPacket == null)
+                if (ipPacket == null || tcpPacket == null) {
                     continue;
+                }
                 String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress();
                 String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress();
                 int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt();
                 int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt();
-                // Is this packet related to the pattern and coming to/from the cloud server?
-                boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname());
-                boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname());
-                if (!fromServer && !fromClient)  // Packet not related to pattern, skip it.
+                // Is this packet related to the pattern; i.e. is it going to (or coming from) the cloud server?
+                boolean fromServer = mDnsMap.isRelatedToCloudServer(srcAddress, mPattern.getHostname());
+                boolean fromClient = mDnsMap.isRelatedToCloudServer(dstAddress, mPattern.getHostname());
+                if (!fromServer && !fromClient) {
+                    // Packet not related to pattern, skip it.
                     continue;
-                if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable
-                    continue; 
-                // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort).
-                // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now.
+                }
+                if (tcpPacket.getPayload() == null) {
+                    // We skip non-payload control packets as these are less predictable
+                    continue;
+                }
+                // Conversations (connections/sessions) are identified by the four-tuple
+                // (clientIp, clientPort, serverIp, serverPort) (see Conversation Javadoc).
+                // Create "dummy" conversation for looking up an existing entry.
                 Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) :
                         new Conversation(dstAddress, dstPort, srcAddress, srcPort);
-                // Create new conversation entry, or append packet to existing.
-                List<PcapPacket> listPcapPacket = connections.get(conversation);
-                if (listPcapPacket == null) {
-                    listPcapPacket = new ArrayList<PcapPacket>();
-                    connections.put(conversation, listPcapPacket);
-                }
-                int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber();
-                boolean retransmission = seqNumberSet.contains(seqNumber);
-                if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow
-                    listPcapPacket.add(packet);
-                    // End of conversation -> trigger thread to check
-                    if (listPcapPacket.size() == patternLength)
-                        conversations.add(conversation);
-                    seqNumberSet.add(seqNumber);
+                // Add the packet so that the "dummy" conversation can be immediately added to the map if no entry
+                // exists for the conversation that the current packet belongs to.
+                conversation.addPacket(packet, true);
+                // Add the new conversation to the map if an equal entry is not already present.
+                // If an existing entry is already present, the current packet is simply added to that conversation.
+                mConversations.merge(conversation, conversation, (existingEntry, toMerge) -> {
+                    // toMerge only has a single packet, which is the same as referred to by 'packet' variable, but need
+                    // this hack as 'packet' is not final and hence cannot be referred to in a lambda.
+                    existingEntry.addPacket(toMerge.getPackets().get(0), true);
+                    return existingEntry;
+                });
+                // Refresh reference to point to entry in map (in case packet was added to existing entry).
+                conversation = mConversations.get(conversation);
+                if (conversation.getPackets().size() == mPattern.getLength()) {
+                    // Conversation reached a size that matches the expected size.
+                    // Remove the Conversation from the map and start the analysis.
+                    // Any future packets identified by the same four tuple will be tied to a new Conversation instance.
+                    // This might, for example, occur if the same conversation is reused for multiple events.
+                    mConversations.remove(conversation);
+                    // Create comparison task and send to executor service.
+                    PatternComparisonTask<CompleteMatchPatternComparisonResult> comparisonTask =
+                            new PatternComparisonTask<>(conversation, mPattern, ComparisonFunctions.COMPLETE_MATCH);
+                    mPendingComparisons.add(EXECUTOR_SERVICE.submit(comparisonTask));
                 }
             }
         } catch (EOFException eofe) {
-            while (isEoF.compareAndSet(false, true) == false);  // Try to signal EoF!
             System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!");
+            System.out.println("[ findFlowPattern ] Now waiting for comparisons to finish...");
+            // Wait for all comparisons to finish, then output their results to std.out.
+            for(Future<CompleteMatchPatternComparisonResult> comparisonTask : mPendingComparisons) {
+                try {
+                    // Blocks until result is ready.
+                    CompleteMatchPatternComparisonResult comparisonResult = comparisonTask.get();
+                    if (comparisonResult.getResult()) {
+                        System.out.println(comparisonResult.getTextualDescription());
+                    }
+                } catch (InterruptedException|ExecutionException e) {
+                    e.printStackTrace();
+                }
+            }
         } catch (UnknownHostException |
                  PcapNativeException  |
                  NotOpenException     |
@@ -140,42 +171,46 @@ public class FlowPatternFinder {
             ex.printStackTrace();
         }
     }
-    
-
-    /**
-     * Checker to match collected patterns (run by a thread)
-     */
-    private void find() {
 
-        while (isEoF.get() == false) {  // Continue until EoF
-            // Get the object from the queue
-            while(conversations.peek() == null) {  // Wait until queue is not empty
-                if (isEoF.get() == true)    // Return if EoF
-                    return;
-            }            
-            Conversation conversation = conversations.poll();
-            // Get the object and remove it from the Map (housekeeping)
-            List<PcapPacket> packets = connections.remove(conversation);
-            boolean completeMatch = true;
-            for (int i = 0; i < packets.size(); i++) {
-                TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class);
-                if (tcpPacket.getPayload().length() != pattern.getPacketOrder().get(i)) {
-                    completeMatch = false;
-                    break;
-                }
-            }
-            if (completeMatch) {
-                PcapPacket firstPacketInFlow = packets.get(0);
-                System.out.println(
-                        String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
-                                pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
-            } /*else {
-                PcapPacket firstPacketInFlow = packets.get(0);
-                System.out.println(
-                        String.format("[ detected a mismatch of pattern '%s' at %s]",
-                                pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
-            }*/
-        }
-    }
+//    private static class PatternComparisonTask implements Runnable {
+//
+//        private final Conversation mConversation;
+//        private final FlowPattern mFlowPattern;
+//
+//        private PatternComparisonTask(Conversation conversation, FlowPattern pattern) {
+//            this.mConversation = conversation;
+//            this.mFlowPattern = pattern;
+//        }
+//
+//        @Override
+//        public void run() {
+//            if(isCompleteMatch()) {
+//                PcapPacket firstPacketInFlow = mConversation.getPackets().get(0);
+//                System.out.println(
+//                        String.format("[ find ] Detected a complete match of pattern '%s' at %s!",
+//                                mFlowPattern.getPatternId(), firstPacketInFlow.getTimestamp().toString()));
+//            }
+//        }
+//
+//        /**
+//         * Compares the order of packet lengths present in {@link #mConversation} with those found in
+//         * {@link #mFlowPattern}.
+//         * @return {@code true} if the packet lengths matches pairwise for all indices, {@code false} otherwise.
+//         */
+//        private boolean isCompleteMatch() {
+//            List<PcapPacket> convPackets = mConversation.getPackets();
+//            if (convPackets.size() != mFlowPattern.getLength()) {
+//                return false;
+//            }
+//            for (int i = 0; i < convPackets.size(); i++) {
+//                TcpPacket tcpPacket = convPackets.get(i).get(TcpPacket.class);
+//                if (tcpPacket.getPayload().length() != mFlowPattern.getPacketOrder().get(i)) {
+//                    return false;
+//                }
+//            }
+//            return true;
+//        }
+//
+//    }
 
 }
index d72768325b9f79b0d1782d321b01da6722232e29..0b01c2cfd9cacdee83af44c0f83c1dd6b252ac5e 100644 (file)
@@ -1,14 +1,9 @@
 package edu.uci.iotproject;
 
 import org.pcap4j.core.*;
-import org.pcap4j.packet.*;
-import org.pcap4j.packet.DnsPacket;
-import org.pcap4j.packet.namednumber.DnsResourceRecordType;
 
 import java.io.EOFException;
-import java.net.Inet4Address;
 import java.net.UnknownHostException;
-import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -25,8 +20,8 @@ public class Main {
 
 
     public static void main(String[] args) throws PcapNativeException, NotOpenException, EOFException, TimeoutException, UnknownHostException {
-        //final String fileName = "/users/varmarken/Desktop/wlan1.local.dns.pcap";
-        final String fileName = "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.remote.dns.pcap";
+        final String fileName = "/users/varmarken/Desktop/wlan1.local.dns.pcap";
+//        final String fileName = "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.remote.dns.pcap";
 
         // ====== Debug code ======
         PcapHandle handle;
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/AbstractPatternComparisonResult.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/AbstractPatternComparisonResult.java
new file mode 100644 (file)
index 0000000..3b82573
--- /dev/null
@@ -0,0 +1,49 @@
+package edu.uci.iotproject.comparison;
+
+import edu.uci.iotproject.Conversation;
+import edu.uci.iotproject.FlowPattern;
+
+/**
+ * Models the result of comparing a {@link Conversation} and a {@link FlowPattern}.
+ *
+ * @param <T> The type of the result; can be something as simple as a {@code Boolean} for a complete match comparison or
+ *           or a complex data type for more sophisticated comparisons.
+ */
+public abstract class AbstractPatternComparisonResult<T> {
+
+    /**
+     * The result of the comparison.
+     */
+    private final T mResult;
+
+    /**
+     * The {@code Conversation} that was compared against {@link #mFlowPattern}.
+     */
+    protected final Conversation mConversation;
+
+    /**
+     * The {@code FlowPattern} that {@link #mConversation} was compared against.
+     */
+    protected final FlowPattern mFlowPattern;
+
+    public AbstractPatternComparisonResult(Conversation conversation, FlowPattern flowPattern, T result) {
+        this.mResult = result;
+        this.mConversation = conversation;
+        this.mFlowPattern = flowPattern;
+    }
+
+    /**
+     * Gets the result of the comparison.
+     * @return the result of the comparison
+     */
+    public T getResult() {
+        return mResult;
+    }
+
+    /**
+     * Get a textual description of the comparison result suitable for output on std.out.
+     * @returna a textual description of the comparison result suitable for output on std.out.
+     */
+    abstract public String getTextualDescription();
+
+}
\ No newline at end of file
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/ComparisonFunctions.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/ComparisonFunctions.java
new file mode 100644 (file)
index 0000000..2f44f3b
--- /dev/null
@@ -0,0 +1,41 @@
+package edu.uci.iotproject.comparison;
+
+import edu.uci.iotproject.Conversation;
+import edu.uci.iotproject.FlowPattern;
+import org.pcap4j.core.PcapPacket;
+import org.pcap4j.packet.TcpPacket;
+
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * Contains concrete implementations of comparison functions that compare a {@link Conversation} and a {@link FlowPattern}.
+ * These functions are supplied to {@link PatternComparisonTask} which in turn facilitates comparison on a background thread.
+ * This design provides plugability: currently, we only support complete match comparison, but further down the road we
+ * can simply introduce more sophisticated comparison functions here (e.g. Least Common Substring) simply replace the
+ * argument passed to the {@link PatternComparisonTask} constructor to switch between the different implementations.
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
+ */
+public class ComparisonFunctions {
+
+    /**
+     * Comparison function that checks for a <em>complete</em> match, i.e. a match in which every packet in the
+     * {@link Conversation} has the same length as the corresponding packet in the {@link FlowPattern}.
+     */
+    public static final BiFunction<Conversation, FlowPattern, CompleteMatchPatternComparisonResult> COMPLETE_MATCH = (conversation, flowPattern) -> {
+        List<PcapPacket> convPackets = conversation.getPackets();
+        if (convPackets.size() != flowPattern.getLength()) {
+            return new CompleteMatchPatternComparisonResult(conversation, flowPattern, false);
+        }
+        for (int i = 0; i < convPackets.size(); i++) {
+            TcpPacket tcpPacket = convPackets.get(i).get(TcpPacket.class);
+            if (tcpPacket.getPayload().length() != flowPattern.getPacketOrder().get(i)) {
+                return new CompleteMatchPatternComparisonResult(conversation, flowPattern, false);
+            }
+        }
+        return new CompleteMatchPatternComparisonResult(conversation, flowPattern, true);
+    };
+
+}
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/CompleteMatchPatternComparisonResult.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/CompleteMatchPatternComparisonResult.java
new file mode 100644 (file)
index 0000000..3ddd884
--- /dev/null
@@ -0,0 +1,30 @@
+package edu.uci.iotproject.comparison;
+
+import edu.uci.iotproject.Conversation;
+import edu.uci.iotproject.FlowPattern;
+
+/**
+ * The result of a search for a complete match. Serves as an example implementation of
+ * {@link AbstractPatternComparisonResult}.
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
+ */
+public class CompleteMatchPatternComparisonResult extends AbstractPatternComparisonResult<Boolean> {
+
+    public CompleteMatchPatternComparisonResult(Conversation conversation, FlowPattern flowPattern, Boolean result) {
+        super(conversation, flowPattern, result);
+    }
+
+    @Override
+    public String getTextualDescription() {
+        if (getResult()) {
+            return String.format("[ find ] Detected a COMPLETE MATCH of pattern '%s' at %s!",
+                    mFlowPattern.getPatternId(), mConversation.getPackets().get(0).getTimestamp().toString());
+        } else {
+            return String.format("[ miss ] flow starting at %s was **not** a complete match of pattern '%s'",
+                    mConversation.getPackets().get(0).getTimestamp().toString(), mFlowPattern.getPatternId());
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/PatternComparisonTask.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/comparison/PatternComparisonTask.java
new file mode 100644 (file)
index 0000000..705646c
--- /dev/null
@@ -0,0 +1,40 @@
+package edu.uci.iotproject.comparison;
+
+import edu.uci.iotproject.Conversation;
+import edu.uci.iotproject.FlowPattern;
+
+import java.util.concurrent.Callable;
+import java.util.function.BiFunction;
+
+/**
+ * A task that compares a given {@link Conversation} and {@link FlowPattern} using a provided comparison function.
+ * The task implements {@link Callable} and can hence be executed on a background thread.
+ *
+ * @author Janus Varmarken {@literal <jvarmark@uci.edu>}
+ * @author Rahmadi Trimananda {@literal <rtrimana@uci.edu>}
+ */
+public class PatternComparisonTask<R extends AbstractPatternComparisonResult<?>> implements Callable<R> {
+
+    private final Conversation mConversation;
+    private final FlowPattern mFlowPattern;
+    private final BiFunction<Conversation, FlowPattern, R> mComparitor;
+
+    /**
+     * Create a new {@code PatternComparisonTask}.
+     *
+     * @param conversation The conversation to compare against {@code pattern}.
+     * @param pattern The pattern to compare against {@code conversation}.
+     * @param comparisonFunction The function that compares {@code pattern} and {@code conversation}.
+     */
+    public PatternComparisonTask(Conversation conversation, FlowPattern pattern, BiFunction<Conversation, FlowPattern, R> comparisonFunction) {
+        this.mConversation = conversation;
+        this.mFlowPattern = pattern;
+        this.mComparitor = comparisonFunction;
+    }
+
+    @Override
+    public R call() throws Exception {
+        return mComparitor.apply(mConversation, mFlowPattern);
+    }
+
+}
\ No newline at end of file