From bb93c630fdcf9e2cb90e42158433673321f14c3d Mon Sep 17 00:00:00 2001 From: Janus Varmarken Date: Fri, 13 Jul 2018 16:52:32 -0700 Subject: [PATCH] Code for reassembling TCP streams. Not thoroughly tested, but seems to work for a simple, small pcap file --- .../java/edu/uci/iotproject/Conversation.java | 27 ++- .../main/java/edu/uci/iotproject/Main.java | 72 +++--- .../uci/iotproject/PcapPacketConsumer.java | 14 ++ .../iotproject/PcapProcessingPipeline.java | 37 +++ .../java/edu/uci/iotproject/PcapReader.java | 50 ++++ .../edu/uci/iotproject/TcpReassembler.java | 218 ++++++++++++++++++ 6 files changed, 385 insertions(+), 33 deletions(-) create mode 100644 Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapPacketConsumer.java create mode 100644 Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapProcessingPipeline.java create mode 100644 Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapReader.java create mode 100644 Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/TcpReassembler.java diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Conversation.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Conversation.java index fdb4a7f..b9f3e07 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Conversation.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Conversation.java @@ -72,6 +72,29 @@ public class Conversation { private List mFinPackets; /* End instance properties */ + /** + * Factory method for creating a {@code Conversation} from a {@link PcapPacket}. + * @param pcapPacket The {@code PcapPacket} that wraps a TCP segment for which a {@code Conversation} is to be initiated. + * @param clientIsSrc If {@code true}, the source address and source port found in the IP datagram and TCP segment + * wrapped in the {@code PcapPacket} are regarded as pertaining to the client, and the destination + * address and destination port are regarded as pertaining to the server---and vice versa if set + * to {@code false}. + * @return A {@code Conversation} initiated with ip:port for client and server according to the direction of the packet. + */ + public static Conversation fromPcapPacket(PcapPacket pcapPacket, boolean clientIsSrc) { + IpV4Packet ipPacket = pcapPacket.get(IpV4Packet.class); + TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class); + String clientIp = clientIsSrc ? ipPacket.getHeader().getSrcAddr().getHostAddress() : + ipPacket.getHeader().getDstAddr().getHostAddress(); + String srvIp = clientIsSrc ? ipPacket.getHeader().getDstAddr().getHostAddress() : + ipPacket.getHeader().getSrcAddr().getHostAddress(); + int clientPort = clientIsSrc ? tcpPacket.getHeader().getSrcPort().valueAsInt() : + tcpPacket.getHeader().getDstPort().valueAsInt(); + int srvPort = clientIsSrc ? tcpPacket.getHeader().getDstPort().valueAsInt() : + tcpPacket.getHeader().getSrcPort().valueAsInt(); + return new Conversation(clientIp, clientPort, srvIp, srvPort); + } + /** * Constructs a new {@code Conversation}. * @param clientIp The IP of the host that is considered the client (i.e. the host that initiates the conversation) @@ -213,6 +236,8 @@ public class Conversation { public void addFinPacket(PcapPacket finPacket) { // Precondition: verify that packet does indeed pertain to conversation. onAddPrecondition(finPacket); + // TODO: should call addSeqNumber here? + addSeqNumber(finPacket); mFinPackets.add(new FinAckPair(finPacket)); } @@ -327,7 +352,7 @@ public class Conversation { * @param packet The packet. * @return {@code true} if {@code packet} was determined to be a retransmission, {@code false} otherwise. */ - private boolean isRetransmission(PcapPacket packet) { + public boolean isRetransmission(PcapPacket packet) { // Extract sequence number. int seqNo = packet.get(TcpPacket.class).getHeader().getSequenceNumber(); switch (getDirection(packet)) { diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java index f3b8a00..94b7820 100644 --- a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/Main.java @@ -37,38 +37,46 @@ public class Main { // MacLayerFlowPatternFinder finder = new MacLayerFlowPatternFinder(handle, pattern); // finder.findFlowPattern(); // ------------------------------------------------------------------------------------------------------------- - - //final String fileName = args.length > 0 ? args[0] : "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.dns.pcap"; - final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/TPLink/wlan1/tplink.wlan1.local.pcap"; - //final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/DLink/wlan1/dlink.wlan1.local.pcap"; - final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON_SUBSET.pcap"; -// final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON.pcap"; -// -// // ====== Debug code ====== - PcapHandle handle; - PcapHandle trainingPcap; - try { - handle = Pcaps.openOffline(fileName, PcapHandle.TimestampPrecision.NANO); - trainingPcap = Pcaps.openOffline(trainingFileName, PcapHandle.TimestampPrecision.NANO); - } catch (PcapNativeException pne) { - handle = Pcaps.openOffline(fileName); - trainingPcap = Pcaps.openOffline(trainingFileName); - } -// -// // TODO: The followings are the way to extract multiple hostnames and their associated packet lengths lists -// //List list = new ArrayList<>(); -// //list.add("events.tplinkra.com"); -// //FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", list, trainingPcap); -// //List list2 = new ArrayList<>(); -// //list2.add("devs.tplinkcloud.com"); -// //list2.add("events.tplinkra.com"); -// //FlowPattern fp3 = new FlowPattern("TP_LINK_REMOTE_ON", list2, trainingPcap); // - FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", "events.tplinkra.com", trainingPcap); - //FlowPattern fp = new FlowPattern("DLINK_LOCAL_ON", "rfe-us-west-1.dch.dlink.com", trainingPcap); - FlowPatternFinder fpf = new FlowPatternFinder(handle, fp); - fpf.start(); -// -// // ======================== +// //final String fileName = args.length > 0 ? args[0] : "/home/rtrimana/pcap_processing/smart_home_traffic/Code/Projects/SmartPlugDetector/pcap/wlan1.local.dns.pcap"; +// final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/TPLink/wlan1/tplink.wlan1.local.pcap"; +// //final String fileName = args.length > 0 ? args[0] : "/scratch/June-2018/DLink/wlan1/dlink.wlan1.local.pcap"; +// final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON_SUBSET.pcap"; +//// final String trainingFileName = "./pcap/TP_LINK_LOCAL_ON.pcap"; +//// +//// // ====== Debug code ====== +// PcapHandle handle; +// PcapHandle trainingPcap; +// try { +// handle = Pcaps.openOffline(fileName, PcapHandle.TimestampPrecision.NANO); +// trainingPcap = Pcaps.openOffline(trainingFileName, PcapHandle.TimestampPrecision.NANO); +// } catch (PcapNativeException pne) { +// handle = Pcaps.openOffline(fileName); +// trainingPcap = Pcaps.openOffline(trainingFileName); +// } +//// +//// // TODO: The followings are the way to extract multiple hostnames and their associated packet lengths lists +//// //List list = new ArrayList<>(); +//// //list.add("events.tplinkra.com"); +//// //FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", list, trainingPcap); +//// //List list2 = new ArrayList<>(); +//// //list2.add("devs.tplinkcloud.com"); +//// //list2.add("events.tplinkra.com"); +//// //FlowPattern fp3 = new FlowPattern("TP_LINK_REMOTE_ON", list2, trainingPcap); +//// +// FlowPattern fp = new FlowPattern("TP_LINK_LOCAL_ON", "events.tplinkra.com", trainingPcap); +// //FlowPattern fp = new FlowPattern("DLINK_LOCAL_ON", "rfe-us-west-1.dch.dlink.com", trainingPcap); +// FlowPatternFinder fpf = new FlowPatternFinder(handle, fp); +// fpf.start(); +//// +//// // ======================== + + + PcapReader pcapReader = new PcapReader(args[0]); + PcapProcessingPipeline pipeline = new PcapProcessingPipeline(pcapReader); + TcpReassembler tcpReassembler = new TcpReassembler(); + pipeline.addPcapPacketConsumer(tcpReassembler); + pipeline.executePipeline(); + System.out.println("Pipeline terminated"); } } diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapPacketConsumer.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapPacketConsumer.java new file mode 100644 index 0000000..fb78eb7 --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapPacketConsumer.java @@ -0,0 +1,14 @@ +package edu.uci.iotproject; + +import org.pcap4j.core.PcapPacket; + +/** + * TODO add class documentation. + * + * @author Janus Varmarken + */ +public interface PcapPacketConsumer { + + void consumePacket(PcapPacket pcapPacket); + +} diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapProcessingPipeline.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapProcessingPipeline.java new file mode 100644 index 0000000..5b3830c --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapProcessingPipeline.java @@ -0,0 +1,37 @@ +package edu.uci.iotproject; + +import org.pcap4j.core.PcapPacket; + +import java.util.ArrayList; +import java.util.List; + +/** + * TODO add class documentation. + * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } + */ +public class PcapProcessingPipeline { + + private final PcapReader mPcapReader; + private final List mPacketConsumers; + + public PcapProcessingPipeline(PcapReader pcapReader) { + mPcapReader = pcapReader; + mPacketConsumers = new ArrayList<>(); + } + + public void addPcapPacketConsumer(PcapPacketConsumer packetConsumer) { + mPacketConsumers.add(packetConsumer); + } + + public void executePipeline() { + PcapPacket packet; + while ((packet = mPcapReader.readNextPacket()) != null) { + for (PcapPacketConsumer consumer : mPacketConsumers) { + consumer.consumePacket(packet); + } + } + } + +} diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapReader.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapReader.java new file mode 100644 index 0000000..c8106b2 --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/PcapReader.java @@ -0,0 +1,50 @@ +package edu.uci.iotproject; + +import org.pcap4j.core.*; + +import java.io.EOFException; +import java.util.concurrent.TimeoutException; +/** + * Opens and reads from a pcap file. + * This class is nothing but a simple wrapper around some functionality in {@link Pcaps} and {@link PcapHandle} which + * serves to simplify client code. + * Note that the file is read in offline mode, i.e., this class does not support live processing of packets. + * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } + */ +public class PcapReader { + + private final PcapHandle mHandle; + + /** + * Create a new {@code PcapReader} that reads the file specified by the absolute path {@code fileName}. + * @param fileName The absolute path to the pcap file to be read. + * @throws PcapNativeException If the pcap file cannot be opened. + */ + public PcapReader(String fileName) throws PcapNativeException { + PcapHandle handle; + try { + handle = Pcaps.openOffline(fileName, PcapHandle.TimestampPrecision.NANO); + } catch (PcapNativeException pne) { + handle = Pcaps.openOffline(fileName); + } + mHandle = handle; + } + + /** + * Reads the next packet in the pcap file. + * @return The next packet in the pcap file, or {@code null} if all packets have been read. + */ + public PcapPacket readNextPacket() { + try { + return mHandle.getNextPacketEx(); + } catch (EOFException eofe) { + return null; + } catch (PcapNativeException|TimeoutException|NotOpenException e) { + // Wrap checked exceptions in unchecked exceptions to simplify client code. + throw new RuntimeException(e); + } + } + +} diff --git a/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/TcpReassembler.java b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/TcpReassembler.java new file mode 100644 index 0000000..0a66934 --- /dev/null +++ b/Code/Projects/SmartPlugDetector/src/main/java/edu/uci/iotproject/TcpReassembler.java @@ -0,0 +1,218 @@ +package edu.uci.iotproject; + +import org.pcap4j.core.PcapPacket; +import org.pcap4j.packet.IpV4Packet; +import org.pcap4j.packet.TcpPacket; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * TODO add class documentation. + * + * @author Janus Varmarken {@literal } + * @author Rahmadi Trimananda {@literal } + */ +public class TcpReassembler implements PcapPacketConsumer { + + /** + * Holds open {@link Conversation}s, i.e., {@code Conversation}s that have not been detected as + * (gracefully) terminated based on the set of packets observed thus far. + * A {@link Conversation} is moved to {@link #mTerminatedConversations} if it can be determined that it is has + * terminated. Termination can be detected by a) observing two {@link FinAckPair}s, one in each direction, (graceful + * termination, see {@link Conversation#isGracefullyShutdown()}) or b) by observing a SYN packet that matches the + * four tuple of an existing {@code Conversation}, but which holds a different sequence number than the + * same-direction SYN packet recorded for the {@code Conversation}. + *

+ * Note that due to limitations of the {@link Set} interface (specifically, there is no {@code get(T t)} method), + * we have to resort to a {@link Map} (in which keys map to themselves) to "mimic" a set with {@code get(T t)} + * functionality. + * + * @see this question on StackOverflow.com + */ + private final Map mOpenConversations = new HashMap<>(); + + /** + * Holds terminated {@link Conversation}s. + */ + private final Map mTerminatedConversations = new HashMap<>(); + + @Override + public void consumePacket(PcapPacket pcapPacket) { + TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class); + if (tcpPacket == null) { + return; + } + // ... TODO? + processPacket(pcapPacket); + } + + private void processPacket(PcapPacket pcapPacket) { + TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class); + // Handle client connection initiation attempts. + if (tcpPacket.getHeader().getSyn() && !tcpPacket.getHeader().getAck()) { + // A segment with the SYN flag set, but no ACK flag indicates that a client is attempting to initiate a new + // connection. + processNewConnectionRequest(pcapPacket); + return; + } + // Handle server connection initiation acknowledgement + if (tcpPacket.getHeader().getSyn() && tcpPacket.getHeader().getAck()) { + // A segment with both the SYN and ACK flags set indicates that the server has accepted the client's request + // to initiate a new connection. + processNewConnectionAck(pcapPacket); + return; + } + // Handle resets + if (tcpPacket.getHeader().getRst()) { + processRstPacket(pcapPacket); + return; + } + // Handle FINs + if (tcpPacket.getHeader().getFin()) { + // Handle FIN packet. + processFinPacket(pcapPacket); + } + // Handle ACKs (currently only ACKs of FINS) + if (tcpPacket.getHeader().getAck()) { + processAck(pcapPacket); + } + // Handle packets that carry payload (application data). + if (tcpPacket.getPayload() != null) { + processPayloadPacket(pcapPacket); + } + } + + private void processNewConnectionRequest(PcapPacket clientSynPacket) { + // A SYN w/o ACK always originates from the client. + Conversation conv = Conversation.fromPcapPacket(clientSynPacket, true); + conv.addSynPacket(clientSynPacket); + // Is there an ongoing conversation for the same four tuple (clientIp, clientPort, serverIp, serverPort) as + // found in the new SYN packet? + Conversation ongoingConv = mOpenConversations.get(conv); + if (ongoingConv != null) { + if (ongoingConv.isRetransmission(clientSynPacket)) { + // SYN retransmission detected, do nothing. + return; + // TODO: the way retransmission detection is implemented may cause a bug for connections where we have + // not recorded the initial SYN, but only the SYN ACK, as retransmission is determined by comparing the + // sequence numbers of initial SYNs -- and if no initial SYN is present for the Conversation, the new + // SYN will be interpreted as a retransmission. Possible fix: let isRentransmission ALWAYS return false + // when presented with a SYN packet when the Conversation already holds a SYN ACK packet? + } else { + // New SYN has different sequence number than SYN recorded for ongoingConv, so this must be an attempt + // to establish a new conversation with the same four tuple as ongoingConv. + // Mark existing connection as terminated. + // TODO: is this 100% theoretically correct, e.g., if many connection attempts are made back to back? And RST packets? + mTerminatedConversations.put(ongoingConv, ongoingConv); + mOpenConversations.remove(ongoingConv); + } + } + // Finally, update the map of open connections with the new connection. + mOpenConversations.put(conv, conv); + } + + + /* + * TODO a problem across the board for all processXPacket methods below: + * if we start the capture in the middle of a TCP connection, we will not have an entry for the conversation in the + * map as we have not seen the initial SYN packet. + * Two ways we can address this: + * a) Perform null-checks and ignore packets for which we have not seen SYN + * + easy to get correct + * - we discard data (issue for long-lived connections!) + * b) Add a corresponding conversation entry whenever we encounter a packet that does not map to a conversation + * + we consider all data + * - not immediately clear if this will introduce bugs (incorrectly mapping packets to wrong conversations?) + * + * [[[ I went with option b) for now; see getOngoingConversationOrCreateNew(PcapPacket pcapPacket). ]]] + */ + + private void processNewConnectionAck(PcapPacket srvSynPacket) { + // Find the corresponding ongoing connection, if any (if we start the capture just *after* the initial SYN, no + // ongoing conversation entry will exist, so it must be created in that case). +// Conversation conv = mOpenConversations.get(Conversation.fromPcapPacket(srvSynPacket, false)); + Conversation conv = getOngoingConversationOrCreateNew(srvSynPacket); + // Note: exploits &&'s short-circuit operation: only attempts to add non-retransmissions. + if (!conv.isRetransmission(srvSynPacket) && !conv.addSynPacket(srvSynPacket)) { + // For safety/debugging: if NOT a retransmission and add fails, + // something has gone terribly wrong/invariant is broken. + throw new IllegalStateException("Attempt to add SYN ACK packet that was NOT a retransmission failed." + + Conversation.class.getSimpleName() + " invariant broken."); + } + } + + private void processRstPacket(PcapPacket rstPacket) { + Conversation conv = getOngoingConversationOrCreateNew(rstPacket); + // Move conversation to set of terminated conversations. + mTerminatedConversations.put(conv, conv); + mOpenConversations.remove(conv, conv); + } + + private void processFinPacket(PcapPacket finPacket) { +// getOngoingConversationForPacket(finPacket).addFinPacket(finPacket); + getOngoingConversationOrCreateNew(finPacket).addFinPacket(finPacket); + } + + private void processAck(PcapPacket ackPacket) { +// getOngoingConversationForPacket(ackPacket).attemptAcknowledgementOfFin(ackPacket); + // Note that unlike the style for SYN, FIN, and payload packets, for "ACK only" packets, we want to avoid + // creating a new conversation. + Conversation conv = getOngoingConversationForPacket(ackPacket); + if (conv != null) { + // The ACK may be an ACK of a FIN, so attempt to mark the FIN as ack'ed. + conv.attemptAcknowledgementOfFin(ackPacket); + if (conv.isGracefullyShutdown()) { + // Move conversation to set of terminated conversations. + mTerminatedConversations.put(conv, conv); + mOpenConversations.remove(conv); + } + } + // Note: add (additional) processing of ACKs (that are not ACKs of FINs) as necessary here... + } + + private void processPayloadPacket(PcapPacket pcapPacket) { +// getOngoingConversationForPacket(pcapPacket).addPacket(pcapPacket, true); + getOngoingConversationOrCreateNew(pcapPacket).addPacket(pcapPacket, true); + } + + /** + * Locates an ongoing conversation (if any) that {@code pcapPacket} pertains to. + * @param pcapPacket The packet that is to be mapped to an ongoing {@code Conversation}. + * @return The {@code Conversation} matching {@code pcapPacket} or {@code null} if there is no match. + */ + private Conversation getOngoingConversationForPacket(PcapPacket pcapPacket) { + // We cannot know if this is a client-to-server or server-to-client packet without trying both options... + Conversation conv = mOpenConversations.get(Conversation.fromPcapPacket(pcapPacket, true)); + if (conv == null) { + conv = mOpenConversations.get(Conversation.fromPcapPacket(pcapPacket, false)); + } + return conv; + } + + /** + * Like {@link #getOngoingConversationForPacket(PcapPacket)}, but creates and inserts a new {@code Conversation} + * into {@link #mOpenConversations} if no open conversation is found (i.e., in the case that + * {@link #getOngoingConversationForPacket(PcapPacket)} returns {@code null}). + * + * @param pcapPacket The packet that is to be mapped to an ongoing {@code Conversation}. + * @return The existing, ongoing {@code Conversation} matching {@code pcapPacket} or the newly created one in case + * no match was found. + */ + private Conversation getOngoingConversationOrCreateNew(PcapPacket pcapPacket) { + Conversation conv = getOngoingConversationForPacket(pcapPacket); + if (conv == null) { + TcpPacket tcpPacket = pcapPacket.get(TcpPacket.class); + if (tcpPacket.getHeader().getSyn() && tcpPacket.getHeader().getAck()) { + // A SYN ACK packet always originates from the server (it is a reply to the initial SYN packet from the client) + conv = Conversation.fromPcapPacket(pcapPacket, false); + } else { + // TODO: can we do anything else but arbitrarily select who is designated as the server in this case? + conv = Conversation.fromPcapPacket(pcapPacket, false); + } + mOpenConversations.put(conv, conv); + } + return conv; + } +} -- 2.34.1