From e1746e804c54cabf304e4d589dec8c862cf75b16 Mon Sep 17 00:00:00 2001 From: rtrimana Date: Wed, 2 May 2018 16:04:43 -0700 Subject: [PATCH] Separating pattern collection and analysis into 2 different threads. --- .../java/edu/uci/iotproject/FlowPattern.java | 10 +- .../edu/uci/iotproject/FlowPatternFinder.java | 149 ++++++++++++------ .../main/java/edu/uci/iotproject/Main.java | 4 +- 3 files changed, 112 insertions(+), 51 deletions(-) diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPattern.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPattern.java index 7ec2a74..a083e49 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPattern.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPattern.java @@ -54,10 +54,18 @@ public class FlowPattern { /** * Get the the sequence of packet lengths that defines this {@code FlowPattern}. - * @return the the sequence of packet lengths that defines this {@code FlowPattern}. + * @return the sequence of packet lengths that defines this {@code FlowPattern}. */ public List getPacketOrder() { return flowPacketOrder; } + + /** + * Get the length of the List of {@code FlowPattern}. + * @return the length of the List of {@code FlowPattern}. + */ + public int getLength() { + return flowPacketOrder.size(); + } } diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java index cb5a0fd..aff4534 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/FlowPatternFinder.java @@ -14,38 +14,82 @@ import java.net.UnknownHostException; import java.time.Instant; import java.util.*; import java.util.concurrent.TimeoutException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; /** * Provides functionality for searching for the presence of a {@link FlowPattern} in a PCAP trace. + * We use 2 threads: + * 1) The first thread (main thread) collects conversations from the PCAP stream and put them into our data structure. + * 2) The second thread (checker thread) checks the collected conversation. * * @author Janus Varmarken + * @author Rahmadi Trimananda */ public class FlowPatternFinder { /* Class properties */ - private final Map> connections = new HashMap<>(); + private Map> connections; + private Queue conversations; private DnsMap dnsMap; + private PcapHandle pcap; + private FlowPattern pattern; + private AtomicBoolean isEoF; /* Constructor */ - public FlowPatternFinder() { + public FlowPatternFinder(PcapHandle _pcap, FlowPattern _pattern) { + + this.connections = new ConcurrentHashMap>(); + this.conversations = new ConcurrentLinkedQueue(); this.dnsMap = new DnsMap(); + this.isEoF = new AtomicBoolean(false); + + // Get input parameters + this.pcap = _pcap; + this.pattern = _pattern; + } + + + public void start() { + + // Spawn the main thread + Thread mainThread = new Thread(new Runnable() { + public void run() { + findFlowPattern(); + } + }); + mainThread.start(); + + // Spawn the checker thread + Thread checkerThread = new Thread(new Runnable() { + public void run() { + find(); + } + }); + checkerThread.start(); + + /* TODO: Join the threads if we want it to be blocking + try { + mainThread.join(); + checkerThread.join(); + } catch(InterruptedException ex) { + ex.printStackTrace(); + }*/ + System.out.println("[ start ] Main and checker threads started!"); } /** - * Find pattern based on the FlowPattern object - * - * @param pcap PCAP file handler - * @param pattern FlowPattern class object as a comparator + * Find patterns based on the FlowPattern object (run by a thread) */ - // TODO clean up exceptions etc. - public void findFlowPattern(PcapHandle pcap, FlowPattern pattern) - throws PcapNativeException, NotOpenException, TimeoutException { + private void findFlowPattern() { int counter = 0; try { PcapPacket packet; Set seqNumberSet = new HashSet(); + int patternLength = pattern.getLength(); while ((packet = pcap.getNextPacketEx()) != null) { // Check if this is a valid DNS packet @@ -53,62 +97,65 @@ public class FlowPatternFinder { // For now, we only work support pattern search in TCP over IPv4. IpV4Packet ipPacket = packet.get(IpV4Packet.class); TcpPacket tcpPacket = packet.get(TcpPacket.class); - if (ipPacket == null || tcpPacket == null) { + if (ipPacket == null || tcpPacket == null) continue; - } String srcAddress = ipPacket.getHeader().getSrcAddr().getHostAddress(); String dstAddress = ipPacket.getHeader().getDstAddr().getHostAddress(); int srcPort = tcpPacket.getHeader().getSrcPort().valueAsInt(); int dstPort = tcpPacket.getHeader().getDstPort().valueAsInt(); - // Is this packet related to the pattern and coming from the cloud server? + // Is this packet related to the pattern and coming to/from the cloud server? boolean fromServer = dnsMap.isRelatedToCloudServer(srcAddress, pattern.getHostname()); - // Is this packet related to the pattern and going to the cloud server? boolean fromClient = dnsMap.isRelatedToCloudServer(dstAddress, pattern.getHostname()); - if (!fromServer && !fromClient) { - // Packet not related to pattern, skip it. - continue; - } - if (tcpPacket.getPayload() == null) { - // We skip non-payload control packets as these are less predictable and should therefore not be - // part of a signature (e.g. receiver can choose not to ACK immediately) + if (!fromServer && !fromClient) // Packet not related to pattern, skip it. continue; - } - + if (tcpPacket.getPayload() == null) // We skip non-payload control packets as these are less predictable + continue; // Identify conversations (connections/sessions) by the four-tuple (clientIp, clientPort, serverIp, serverPort). // TODO: this is strictly not sufficient to differentiate one TCP session from another, but should suffice for now. Conversation conversation = fromClient ? new Conversation(srcAddress, srcPort, dstAddress, dstPort) : new Conversation(dstAddress, dstPort, srcAddress, srcPort); - List listWrappedPacket = new ArrayList<>(); - listWrappedPacket.add(packet); // Create new conversation entry, or append packet to existing. - connections.merge(conversation, listWrappedPacket, (v1, v2) -> { - int seqNumber = v2.get(0).get(TcpPacket.class).getHeader().getSequenceNumber(); - boolean retransmission = seqNumberSet.contains(seqNumber); - if (!retransmission) { - // Do not add if retransmission -> avoid duplicate packets in flow - v1.addAll(v2); - seqNumberSet.add(seqNumber); - } - return v1; - }); + List listPcapPacket = connections.get(conversation); + if (listPcapPacket == null) { + listPcapPacket = new ArrayList(); + connections.put(conversation, listPcapPacket); + } + int seqNumber = packet.get(TcpPacket.class).getHeader().getSequenceNumber(); + boolean retransmission = seqNumberSet.contains(seqNumber); + if (!retransmission) { // Do not add if retransmission -> avoid duplicate packets in flow + listPcapPacket.add(packet); + // End of conversation -> trigger thread to check + if (listPcapPacket.size() == patternLength) + conversations.add(conversation); + seqNumberSet.add(seqNumber); + } } } catch (EOFException eofe) { - System.out.println("findFlowPattern: finished processing entire file"); - find(pattern); - } catch (UnknownHostException ex) { - System.out.println(); + while (isEoF.compareAndSet(false, true) == false); // Try to signal EoF! + System.out.println("[ findFlowPattern ] Finished processing entire PCAP stream!"); + } catch (UnknownHostException | + PcapNativeException | + NotOpenException | + TimeoutException ex) { ex.printStackTrace(); } } + - - private void find(FlowPattern pattern) { - for (Conversation con : connections.keySet()) { - List packets = connections.get(con); - if (packets.size() != pattern.getPacketOrder().size()) { - // Not a complete match if different number of packets. - continue; - } + /** + * Checker to match collected patterns (run by a thread) + */ + private void find() { + + while (isEoF.get() == false) { // Continue until EoF + // Get the object from the queue + while(conversations.peek() == null) { // Wait until queue is not empty + if (isEoF.get() == true) // Return if EoF + return; + } + Conversation conversation = conversations.poll(); + // Get the object and remove it from the Map (housekeeping) + List packets = connections.remove(conversation); boolean completeMatch = true; for (int i = 0; i < packets.size(); i++) { TcpPacket tcpPacket = packets.get(i).get(TcpPacket.class); @@ -120,9 +167,14 @@ public class FlowPatternFinder { if (completeMatch) { PcapPacket firstPacketInFlow = packets.get(0); System.out.println( - String.format("[ detected a complete match of pattern '%s' at %s]", + String.format("[ find ] Detected a complete match of pattern '%s' at %s!", pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString())); - } + } /*else { + PcapPacket firstPacketInFlow = packets.get(0); + System.out.println( + String.format("[ detected a mismatch of pattern '%s' at %s]", + pattern.getPatternId(), firstPacketInFlow.getTimestamp().toString())); + }*/ } } @@ -153,6 +205,7 @@ public class FlowPatternFinder { public boolean equals(Object obj) { return obj instanceof Conversation && this.toString().equals(obj.toString()); } + @Override public int hashCode() { return toString().hashCode(); diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java index d2867df..e5a685f 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java @@ -35,8 +35,8 @@ public class Main { } catch (PcapNativeException pne) { handle = Pcaps.openOffline(fileName); } - FlowPatternFinder fpf = new FlowPatternFinder(); - fpf.findFlowPattern(handle, FlowPattern.TP_LINK_LOCAL_ON); + FlowPatternFinder fpf = new FlowPatternFinder(handle, FlowPattern.TP_LINK_LOCAL_ON); + fpf.start(); // ======================== } -- 2.34.1