4 import unicodecsv as csv
\r
6 from scapy.all import *
\r
def full_duplex(p):
    """
    For reassembling bidirectional sessions (streams). By default, Scapy only groups packets in one direction. That is,
    bidirectional sessions are split into two sessions, one with client-to-server packets, and one with server-to-client
    packets. Sorting the endpoint fields (below) makes both directions of a conversation produce the same session key.

    Note that this is simplified session reassembly as it does not consider TCP FIN/RST packets --- packets are mapped
    to their respective session based solely on the (src_ip, src_port, dst_ip, dst_port) four-tuple. If the client (or
    server) closes a TCP stream and the client by chance selects the same ephemeral port number when contacting the same
    server again, the two DIFFERENT TCP streams will be identified as a single stream.

    Code courtesy of: https://pen-testing.sans.org/blog/2017/10/13/scapy-full-duplex-stream-reassembly

    Also note that this assumes Ethernet as layer-2 wrapper for everything. This assumption holds for our TP-Link trace,
    but will not hold in general. See discussion at:
    https://gist.github.com/MarkBaggett/d8933453f431c111169158ce7f4e2222#file-scapy_helper-py

    :param p: A Scapy packet object.
    :return: Session identifier for the packet.
    """
    # NOTE(review): the function header, the branch conditions and the "Other" fallback were missing from the
    # damaged listing and are restored from the recipe cited above -- confirm against the upstream file.
    if 'Ether' not in p:
        return "Other"
    if 'IP' in p:
        if 'TCP' in p:
            fields = ["TCP", p[IP].src, p[TCP].sport, p[IP].dst, p[TCP].dport]
        elif 'UDP' in p:
            fields = ["UDP", p[IP].src, p[UDP].sport, p[IP].dst, p[UDP].dport]
        elif 'ICMP' in p:
            fields = ["ICMP", p[IP].src, p[IP].dst, p[ICMP].code, p[ICMP].type, p[ICMP].id]
        else:
            fields = ["IP", p[IP].src, p[IP].dst, p[IP].proto]
        # key=str lets the mixed str/int fields be ordered; sorting discards the direction of the packet.
        return str(sorted(fields, key=str))
    if 'ARP' in p:
        return str(sorted(["ARP", p[ARP].psrc, p[ARP].pdst], key=str))
    return p.sprintf("Ethernet type=%04xr,Ether.type%")
\r
# TLS record content type for "application_data" (RFC 5246, section 6.2.1).
_TLS_CONTENT_TYPE_APPLICATION_DATA = 23


def get_tls_app_data_pkts(session):
    """
    Extract the TLS Application Data packets from a (TCP) stream.

    NOTE(review): relies on the name ``TLS`` being in scope; where the TLS layer is loaded is not visible in this
    part of the file -- confirm.

    :param session: The (TCP) stream; must provide a ``filter(predicate)`` method.
    :return: The (ordered) list of TLS application data packets in session.
    """
    return session.filter(lambda pkt: TLS in pkt and pkt[TLS].type == _TLS_CONTENT_TYPE_APPLICATION_DATA)
\r
def find_matches(pcap_file, device_ip, sig_duration):
    """
    Find all matches of [C-->S, S-->C] signatures in TLS conversations involving the device with IP=device_ip. Packet
    lengths are not considered, only directions and timing (packet lengths are assumed unavailable due to TLS padding).

    :param pcap_file: The pcap file that is the target of the signature matching.
    :param device_ip: IP of the device whose TLS sessions are to be examined for matches.
    :param sig_duration: Maximum duration between request and response packets.
    :return: A list of (request_packet, reply_packets) tuples, where reply_packets is a list of reply packets that
             satisfy the signature match conditions (i.e., that they are within sig_duration after the request packet
             and that no other request packet interleaves the request_packet and the reply packet).
    """
    # NOTE(review): the initialisation of `matches`, the `continue` guards, the `replies`/`idx` bookkeeping and the
    # final `return` were missing from the damaged listing and are reconstructed from context -- confirm. As
    # reconstructed, a request with no qualifying reply still yields an entry with an empty reply list.
    matches = []

    # Read all packets into memory (stored as a list).
    # This is slow and consumes lots of memory.
    # There are more efficient ways to read the pcap (which clear each packet from memory after it's been processed).
    # However, to simplify the detection implementation we stick with the quick-and-dirty approach.
    pkts = rdpcap(pcap_file)

    # Group packets into sessions (streams)
    sessions_dict = pkts.sessions(full_duplex)
    for session in sessions_dict.values():
        tls_app_data_pkts = get_tls_app_data_pkts(session)
        if len(tls_app_data_pkts) == 0:
            # Session w/o any TLS traffic, not relevant.
            continue
        first_pkt = tls_app_data_pkts[0]
        if IP not in first_pkt:
            # Only consider IPv4 traffic.
            continue
        if first_pkt[IP].src != device_ip and first_pkt[IP].dst != device_ip:
            # Traffic from some other device; ignore -- not relevant to us.
            continue
        if ipaddress.ip_address(first_pkt[IP].src).is_multicast or ipaddress.ip_address(first_pkt[IP].dst).is_multicast:
            # Don't include multicast traffic in the results.
            # (Should never occur as TLS is not used for multicast?)
            continue

        # Now let's find all the potential matches for the current TLS session.
        for i, request_pkt in enumerate(tls_app_data_pkts):
            if request_pkt[IP].src != device_ip:
                # We are trying to find matches for a simple [C->S, S->C] signature, so we want to first identify an
                # outbound (device-to-cloud) packet and then subsequently find all potential reply packets
                # (cloud-to-device). If this is a cloud-to-device packet, it is of no interest to us at this stage, so
                # we skip it.
                continue
            # All subsequent cloud-to-device packets (replies) in this TLS session that lie within the signature
            # duration after this packet AND that are not preceded by a device-to-cloud packet that is later than the
            # current packet can be paired with the current packet to constitute a potential signature match.
            replies = []
            idx = i + 1
            while idx < len(tls_app_data_pkts) and tls_app_data_pkts[idx][IP].dst == device_ip:
                reply_pkt = tls_app_data_pkts[idx]
                if reply_pkt.time - request_pkt.time <= sig_duration:
                    # Could have this check in the loop condition as well. But sometimes packet order != timestamp
                    # order, so a late packet must not end the scan early.
                    replies.append(reply_pkt)
                idx += 1
            matches.append((request_pkt, replies))
    return matches
\r
def get_pkt_key(pkt):
    """
    Get a string representation of a packet that can be used as a key in a dictionary.

    The key is built from the packet's ``src``, ``dst`` and ``time`` attributes only, so two packets that share all
    three values produce the same key.

    :param pkt: A Scapy packet.
    :return: A string representation of a packet that can be used as a key in a dictionary.
    """
    return f'src={pkt.src} dst={pkt.dst} timestamp={pkt.time}'
\r
def build_pkt_number_dict(pcap_file):
    """
    Create a dictionary mapping packets to their packet number in pcap_file.
    The keys are generated by passing each packet to get_pkt_key(pkt). Packet numbers are 1-based, i.e., the first
    packet in the file has number 1.

    :param pcap_file: The pcap file for which a packet number dictionary is desired.
    :return: A dictionary mapping packet keys (obtainable from get_pkt_key(pkt)) to the packet's packet number.
    :raises ValueError: If two packets in pcap_file produce the same key, since the mapping would be ambiguous.
    """
    # NOTE(review): the dictionary initialisation, the insertion and the `return` were missing from the damaged
    # listing and are reconstructed from context -- confirm.
    pkt_numbers = {}
    # The file is read once; the earlier second pass only re-verified the numbering that was just computed.
    for pkt_num, pkt in enumerate(rdpcap(pcap_file), start=1):
        key = get_pkt_key(pkt)
        if key in pkt_numbers:
            # An explicit raise (rather than assert) keeps this check active under `python -O`.
            raise ValueError(f'Packets {pkt_numbers[key]} and {pkt_num} in {pcap_file} share the key "{key}"')
        pkt_numbers[key] = pkt_num
    return pkt_numbers
\r
def add_pkt_numbers_to_matches(pcap_file, matches):
    """
    Hacky way to augment the matches with packet numbers. Assumes the same device does not send or receive more than
    one packet at a given timestamp.

    :param pcap_file: The pcap file where the matches were found in.
    :param matches: The matches, as a list of (request_packet, reply_packets) tuples.
    :return: matches augmented with packet numbers; each packet is converted to a (pkt, pkt_number) tuple.
    """
    # NOTE(review): the result-list initialisation and the `return` were missing from the damaged listing and are
    # reconstructed from context -- confirm.
    pkt_nums_dict = build_pkt_number_dict(pcap_file)

    result = []
    for req_pkt, replies in matches:
        numbered_req_pkt = (req_pkt, pkt_nums_dict[get_pkt_key(req_pkt)])
        numbered_reply_pkts = [(reply_pkt, pkt_nums_dict[get_pkt_key(reply_pkt)]) for reply_pkt in replies]
        result.append((numbered_req_pkt, numbered_reply_pkts))
    return result
\r
def write_matches_to_csv(matches, csv_filename):
    """
    Output matches to a .csv file.
    matches argument is expected to be in the format returned by add_pkt_numbers_to_matches(pcap_file, matches).

    One row is written per match: the request packet number, the reply packet numbers joined by '; ', the number of
    reply packets, and the two "ip:port" endpoints of the conversation (taken from the request packet).

    :param matches: A list of matches w/ packet numbers, as returned by add_pkt_numbers_to_matches(pcap_file, matches).
    :param csv_filename: Path to the .csv file where the output is to be written.
    """
    # NOTE(review): the loop header over `matches` was missing from the damaged listing and is reconstructed from
    # the `m[0]` / `m[1]` accesses in the loop body -- confirm.
    key_req_pkt = 'request_pkt'
    key_reply_pkts = 'reply_pkts'
    key_reply_pkts_count = 'number_of_reply_pkts'
    key_conversation_info = 'tls_conversation_between'
    columns = [key_req_pkt, key_reply_pkts, key_reply_pkts_count, key_conversation_info]
    # Binary mode: `csv` here is unicodecsv, which presumably writes encoded bytes -- confirm against its docs.
    with open(csv_filename, 'wb') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=columns)
        writer.writeheader()
        for (request_pkt, request_pkt_num), numbered_reply_pkts in matches:
            reply_pkts_numbers = [reply_pkt_num for _, reply_pkt_num in numbered_reply_pkts]
            info = (f'{request_pkt[IP].src}:{request_pkt[TCP].sport} and '
                    f'{request_pkt[IP].dst}:{request_pkt[TCP].dport}')
            writer.writerow({
                key_req_pkt: request_pkt_num,
                key_reply_pkts: '; '.join(str(pkt_num) for pkt_num in reply_pkts_numbers),
                key_reply_pkts_count: len(reply_pkts_numbers),
                key_conversation_info: info,
            })
\r
if __name__ == '__main__':
    # Command-line entry point: parse the arguments, run the detection and write the results to a CSV file.
    desc = ('Perform detection on padded TLS traffic; '
            'i.e., the detection is entirely based on timing information and packet directions. '
            'NOTE: THIS CODE IS SIMPLIFIED AND ONLY WORKS FOR SIMPLE [Client-to-Server, Server-to-Client] TWO '
            'PACKET SIGNATURES.')
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument('pcap_file', help='Full path to the target pcap file (detection target trace).')
    parser.add_argument('device_ip', help='Perform detection on TLS flows from this device (identified by IP) only.')
    h = ('Duration of the signature '
         '(max time between request and reply packet for the two packets to be considered a match). '
         'Unit: seconds (floating point number expected).')
    parser.add_argument('signature_duration', help=h, type=float)
    parser.add_argument('output_csv', help='Filename of CSV file where results are to be written.')
    args = parser.parse_args()

    pcap_file = args.pcap_file
    device_ip = args.device_ip
    signature_duration = args.signature_duration
    output_csv = args.output_csv

    # Detect, then attach pcap packet numbers to each matched packet, then export.
    matches = find_matches(pcap_file, device_ip, signature_duration)
    matches = add_pkt_numbers_to_matches(pcap_file, matches)
    write_matches_to_csv(matches, output_csv)
\r