1 package iotruntime.master;
4 import iotruntime.slave.IoTAddress;
5 import iotruntime.slave.IoTDeviceAddress;
6 import iotruntime.messages.*;
9 import org.objectweb.asm.ClassReader;
10 import org.objectweb.asm.ClassWriter;
11 import org.objectweb.asm.ClassVisitor;
16 import java.io.BufferedReader;
17 import java.io.InputStream;
18 import java.io.InputStreamReader;
20 import java.io.FileInputStream;
21 import java.io.FileOutputStream;
22 import java.io.ObjectInputStream;
23 import java.io.ObjectOutputStream;
24 import java.io.IOException;
25 import java.lang.ClassNotFoundException;
26 import java.lang.Class;
27 import java.lang.reflect.*;
28 import java.net.Socket;
29 import java.net.ServerSocket;
31 import static java.lang.Math.toIntExact;
/** Class IoTMaster uses ClassRuntimeInstrumenterMaster
 * to instrument the controller/device bytecode and starts multiple
 * IoTSlave instances running on different JVMs in a distributed fashion.
 *
 * @author Rahmadi Trimananda <rahmadi.trimananda @ uci.edu>
 */
41 public class IoTMaster {
44 * IoTMaster class properties
46 * CommunicationHandler maintains the data structure for hostnames and ports
47 * LoadBalancer assigns a job onto a host based on certain metrics
49 private CommunicationHandler commHan;
50 private LoadBalancer lbIoT;
51 private RouterConfig routerConfig;
52 private ObjectInitHandler objInitHand;
53 private ObjectAddressInitHandler objAddInitHand;
54 private String[] strObjectNames;
55 private Map<String,ClassRuntimeInstrumenterMaster> mapClassNameToCrim;
57 * These properties hold information of a certain object
60 private String strObjName;
61 private String strObjClassName;
62 private String strObjClassInterfaceName;
63 private String strObjStubClsIntfaceName;
64 private String strIoTMasterHostAdd;
65 private String strIoTSlaveControllerHostAdd;
66 private String strIoTSlaveObjectHostAdd;
67 private Class[] arrFieldClasses;
68 private Object[] arrFieldValues;
69 private Socket filesocket;
71 // Constants that are to be extracted from config file
72 private static String STR_MASTER_MAC_ADD;
73 private static String STR_IOT_CODE_PATH;
74 private static String STR_CONT_PATH;
75 private static String STR_RUNTIME_DIR;
76 private static String STR_CLS_PATH;
77 private static String STR_RMI_PATH;
78 private static String STR_RMI_HOSTNAME;
79 private static String STR_LOG_FILE_PATH;
80 private static String STR_SSH_USERNAME;
81 private static String STR_ROUTER_ADD;
82 private static String STR_MONITORING_HOST;
83 private static String STR_ZB_GATEWAY_ADDRESS;
84 private static String STR_ZB_GATEWAY_PORT;
85 private static String STR_ZB_IOTMASTER_PORT;
86 private static String STR_NUM_CALLBACK_PORTS;
87 private static boolean BOOL_VERBOSE;
90 * IoTMaster class constants
92 * Name constants - not to be configured by users
94 private static final String STR_IOT_MASTER_NAME = "IoTMaster";
95 private static final String STR_CFG_FILE_EXT = ".config";
96 private static final String STR_CLS_FILE_EXT = ".class";
97 private static final String STR_JAR_FILE_EXT = ".jar";
98 private static final String STR_ZIP_FILE_EXT = ".zip";
99 private static final String STR_TCP_PROTOCOL = "tcp";
100 private static final String STR_UDP_PROTOCOL = "udp";
101 private static final String STR_TCPGW_PROTOCOL = "tcpgw";
102 private static final String STR_NO_PROTOCOL = "nopro";
103 private static final String STR_SELF_MAC_ADD = "00:00:00:00:00:00";
104 private static final String STR_INTERFACE_CLS_CFG = "INTERFACE_CLASS";
105 private static final String STR_INT_STUB_CLS_CFG = "INTERFACE_STUB_CLASS";
106 private static final String STR_FILE_TRF_CFG = "ADDITIONAL_ZIP_FILE";
107 private static final String STR_YES = "Yes";
108 private static final String STR_NO = "No";
111 * Runtime class name constants - not to be configured by users
113 private static final String STR_REL_INSTRUMENTER_CLS = "iotruntime.master.RelationInstrumenter";
114 private static final String STR_SET_INSTRUMENTER_CLS = "iotruntime.master.SetInstrumenter";
115 private static final String STR_IOT_SLAVE_CLS = "iotruntime.slave.IoTSlave";
116 private static final String STR_IOT_DEV_ADD_CLS = "IoTDeviceAddress";
117 private static final String STR_IOT_ZB_ADD_CLS = "IoTZigbeeAddress";
118 private static final String STR_IOT_ADD_CLS = "IoTAddress";
/**
 * Class constructor.
 * <p>
 * Stores the list of object names and resets every other instance field and
 * every configuration static to its "not yet loaded" value. The configuration
 * statics are populated later by {@code parseIoTMasterConfigFile()} and the
 * handler objects by {@code initLiveDataStructure()}.
 *
 * @param argObjNms Array of object names handed to this master
 */
public IoTMaster(String[] argObjNms) {

    // Handlers and live data structures - created in initLiveDataStructure()
    commHan = null;
    lbIoT = null;
    routerConfig = null;
    objInitHand = null;
    objAddInitHand = null;
    mapClassNameToCrim = null;

    // Objects this master is responsible for
    strObjectNames = argObjNms;

    // Per-object working state
    strObjName = null;
    strObjClassName = null;
    strObjClassInterfaceName = null;
    strObjStubClsIntfaceName = null;
    strIoTMasterHostAdd = null;
    strIoTSlaveControllerHostAdd = null;
    strIoTSlaveObjectHostAdd = null;
    arrFieldClasses = null;
    arrFieldValues = null;
    filesocket = null;

    // Configuration values - every one of them is reset so that no value
    // from a previously constructed master survives in the statics
    STR_MASTER_MAC_ADD = null;
    STR_IOT_CODE_PATH = null;
    STR_CONT_PATH = null;
    STR_RUNTIME_DIR = null;
    STR_CLS_PATH = null;
    STR_RMI_PATH = null;
    STR_RMI_HOSTNAME = null;
    STR_LOG_FILE_PATH = null;
    STR_SSH_USERNAME = null;
    STR_ROUTER_ADD = null;
    STR_MONITORING_HOST = null;
    STR_ZB_GATEWAY_ADDRESS = null;
    STR_ZB_GATEWAY_PORT = null;
    STR_ZB_IOTMASTER_PORT = null;
    STR_NUM_CALLBACK_PORTS = null;
    BOOL_VERBOSE = false;
}
163 * A method to initialize CommunicationHandler, LoadBalancer, RouterConfig and ObjectInitHandler
167 private void initLiveDataStructure() {
169 commHan = new CommunicationHandler(BOOL_VERBOSE);
170 lbIoT = new LoadBalancer(BOOL_VERBOSE);
171 lbIoT.setupLoadBalancer();
172 routerConfig = new RouterConfig();
173 routerConfig.getAddressList(STR_ROUTER_ADD);
174 objInitHand = new ObjectInitHandler(BOOL_VERBOSE);
175 objAddInitHand = new ObjectAddressInitHandler(BOOL_VERBOSE);
176 mapClassNameToCrim = new HashMap<String,ClassRuntimeInstrumenterMaster>();
180 * A method to initialize constants from config file
184 private void parseIoTMasterConfigFile() {
185 // Parse configuration file
186 Properties prop = new Properties();
187 String strCfgFileName = STR_IOT_MASTER_NAME + STR_CFG_FILE_EXT;
188 File file = new File(strCfgFileName);
189 FileInputStream fis = null;
191 fis = new FileInputStream(file);
194 } catch (IOException ex) {
195 System.out.println("IoTMaster: Error reading config file: " + strCfgFileName);
196 ex.printStackTrace();
198 // Initialize constants from config file
199 STR_MASTER_MAC_ADD = prop.getProperty("MAC_ADDRESS");
200 STR_IOT_CODE_PATH = prop.getProperty("IOT_CODE_PATH");
201 STR_CONT_PATH = prop.getProperty("CONTROLLERS_CODE_PATH");
202 STR_RUNTIME_DIR = prop.getProperty("RUNTIME_DIR");
203 STR_CLS_PATH = prop.getProperty("CLASS_PATH");
204 STR_RMI_PATH = prop.getProperty("RMI_PATH");
205 STR_RMI_HOSTNAME = prop.getProperty("RMI_HOSTNAME");
206 STR_LOG_FILE_PATH = prop.getProperty("LOG_FILE_PATH");
207 STR_SSH_USERNAME = prop.getProperty("SSH_USERNAME");
208 STR_ROUTER_ADD = prop.getProperty("ROUTER_ADD");
209 STR_MONITORING_HOST = prop.getProperty("MONITORING_HOST");
210 STR_ZB_GATEWAY_ADDRESS = prop.getProperty("ZIGBEE_GATEWAY_ADDRESS");
211 STR_ZB_GATEWAY_PORT = prop.getProperty("ZIGBEE_GATEWAY_PORT");
212 STR_ZB_IOTMASTER_PORT = prop.getProperty("ZIGBEE_IOTMASTER_PORT");
213 STR_NUM_CALLBACK_PORTS = prop.getProperty("NUMBER_CALLBACK_PORTS");
214 if(prop.getProperty("VERBOSE").equals(STR_YES)) {
218 RuntimeOutput.print("IoTMaster: Extracting information from config file: " + strCfgFileName, BOOL_VERBOSE);
219 RuntimeOutput.print("STR_MASTER_MAC_ADD=" + STR_MASTER_MAC_ADD, BOOL_VERBOSE);
220 RuntimeOutput.print("STR_IOT_CODE_PATH=" + STR_IOT_CODE_PATH, BOOL_VERBOSE);
221 RuntimeOutput.print("STR_CONT_PATH=" + STR_CONT_PATH, BOOL_VERBOSE);
222 RuntimeOutput.print("STR_RUNTIME_DIR=" + STR_RUNTIME_DIR, BOOL_VERBOSE);
223 RuntimeOutput.print("STR_CLS_PATH=" + STR_CLS_PATH, BOOL_VERBOSE);
224 RuntimeOutput.print("STR_RMI_PATH=" + STR_RMI_PATH, BOOL_VERBOSE);
225 RuntimeOutput.print("STR_RMI_HOSTNAME=" + STR_RMI_HOSTNAME, BOOL_VERBOSE);
226 RuntimeOutput.print("STR_LOG_FILE_PATH=" + STR_LOG_FILE_PATH, BOOL_VERBOSE);
227 RuntimeOutput.print("STR_SSH_USERNAME=" + STR_SSH_USERNAME, BOOL_VERBOSE);
228 RuntimeOutput.print("STR_ROUTER_ADD=" + STR_ROUTER_ADD, BOOL_VERBOSE);
229 RuntimeOutput.print("STR_MONITORING_HOST=" + STR_MONITORING_HOST, BOOL_VERBOSE);
230 RuntimeOutput.print("STR_ZB_GATEWAY_ADDRESS=" + STR_ZB_GATEWAY_ADDRESS, BOOL_VERBOSE);
231 RuntimeOutput.print("STR_ZB_GATEWAY_PORT=" + STR_ZB_GATEWAY_PORT, BOOL_VERBOSE);
232 RuntimeOutput.print("STR_ZB_IOTMASTER_PORT=" + STR_ZB_IOTMASTER_PORT, BOOL_VERBOSE);
233 RuntimeOutput.print("STR_NUM_CALLBACK_PORTS=" + STR_NUM_CALLBACK_PORTS, BOOL_VERBOSE);
234 RuntimeOutput.print("BOOL_VERBOSE=" + BOOL_VERBOSE, BOOL_VERBOSE);
235 RuntimeOutput.print("IoTMaster: Information extracted successfully!", BOOL_VERBOSE);
239 * A method to parse information from a config file
241 * @param strCfgFileName Config file name
242 * @param strCfgField Config file field name
245 private String parseConfigFile(String strCfgFileName, String strCfgField) {
246 // Parse configuration file
247 Properties prop = new Properties();
248 File file = new File(strCfgFileName);
249 FileInputStream fis = null;
251 fis = new FileInputStream(file);
254 } catch (IOException ex) {
255 System.out.println("IoTMaster: Error reading config file: " + strCfgFileName);
256 ex.printStackTrace();
258 System.out.println("IoTMaster: Reading " + strCfgField +
259 " from config file: " + strCfgFileName + " with value: " +
260 prop.getProperty(strCfgField, null));
261 // NULL is returned if the property isn't found
262 return prop.getProperty(strCfgField, null);
266 * A method to send files from IoTMaster
268 * @param filesocket File socket object
269 * @param sFileName File name
270 * @param lFLength File length
273 private void sendFile(Socket filesocket, String sFileName, long lFLength) throws IOException {
275 File file = new File(sFileName);
276 byte[] bytFile = new byte[toIntExact(lFLength)];
277 InputStream inFileStream = new FileInputStream(file);
279 OutputStream outFileStream = filesocket.getOutputStream();
281 while ((iCount = inFileStream.read(bytFile)) > 0) {
282 outFileStream.write(bytFile, 0, iCount);
285 RuntimeOutput.print("IoTMaster: File sent!", BOOL_VERBOSE);
289 * A method to create a thread
291 * @param sSSHCmd SSH command
294 private void createThread(String sSSHCmd) throws IOException {
296 // Start a new thread to start a new JVM
298 Runtime runtime = Runtime.getRuntime();
299 Process process = runtime.exec(sSSHCmd);
301 RuntimeOutput.print("Executing: " + sSSHCmd, BOOL_VERBOSE);
305 * A method to send command from master and receive reply from slave
307 * @params msgSend Message object
308 * @params strPurpose String that prints purpose message
309 * @params inStream Input stream
310 * @params outStream Output stream
313 private void commMasterToSlave(Message msgSend, String strPurpose,
314 ObjectInputStream inStream, ObjectOutputStream outStream)
315 throws IOException, ClassNotFoundException {
317 // Send message/command from master
318 outStream.writeObject(msgSend);
319 RuntimeOutput.print("IoTMaster: Send message: " + strPurpose, BOOL_VERBOSE);
321 // Get reply from slave as acknowledgment
322 Message msgReply = (Message) inStream.readObject();
323 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
327 * A private method to instrument IoTSet device
329 * @params strFieldIdentifier String field name + object ID
330 * @params strFieldName String field name
331 * @params strIoTSlaveObjectHostAdd String slave host address
332 * @params inStream ObjectInputStream communication
333 * @params inStream ObjectOutputStream communication
336 private void instrumentIoTSetDevice(String strFieldIdentifier, String strFieldName, String strIoTSlaveObjectHostAdd,
337 ObjectInputStream inStream, ObjectOutputStream outStream)
338 throws IOException, ClassNotFoundException, InterruptedException {
340 // Get information from the set
341 List<Object[]> listObject = objAddInitHand.getFields(strFieldIdentifier);
342 // Create a new IoTSet
343 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, strFieldName);
344 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet for IoTDeviceAddress!", inStream, outStream);
345 int iRows = listObject.size();
346 RuntimeOutput.print("IoTMaster: Number of rows for IoTDeviceAddress: " + iRows, BOOL_VERBOSE);
347 // Transfer the address
348 for(int iRow=0; iRow<iRows; iRow++) {
349 arrFieldValues = listObject.get(iRow);
350 // Get device address - if 00:00:00:00:00:00 that means it needs the driver object address (self)
351 String strDeviceAddress = null;
352 if (arrFieldValues[0].equals(STR_SELF_MAC_ADD)) {
353 strDeviceAddress = strIoTSlaveObjectHostAdd;
355 strDeviceAddress = routerConfig.getIPFromMACAddress((String) arrFieldValues[0]);
357 int iDestDeviceDriverPort = (int) arrFieldValues[1];
358 String strProtocol = (String) arrFieldValues[2];
359 // Check for wildcard feature
360 boolean bSrcPortWildCard = false;
361 boolean bDstPortWildCard = false;
362 if (arrFieldValues.length > 3) {
363 bSrcPortWildCard = (boolean) arrFieldValues[3];
364 bDstPortWildCard = (boolean) arrFieldValues[4];
366 // Add the port connection into communication handler - if it's not assigned yet
367 if (commHan.getComPort(strDeviceAddress) == null) {
368 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strDeviceAddress);
370 // Send address one by one
371 Message msgGetIoTSetObj = null;
372 if (bDstPortWildCard) {
373 String strUniqueDev = strDeviceAddress + ":" + iRow;
374 msgGetIoTSetObj = new MessageGetDeviceObject(IoTCommCode.GET_DEVICE_IOTSET_OBJECT,
375 strDeviceAddress, commHan.getAdditionalPort(strUniqueDev), iDestDeviceDriverPort, bSrcPortWildCard, bDstPortWildCard);
377 msgGetIoTSetObj = new MessageGetDeviceObject(IoTCommCode.GET_DEVICE_IOTSET_OBJECT,
378 strDeviceAddress, commHan.getComPort(strDeviceAddress), iDestDeviceDriverPort, bSrcPortWildCard, bDstPortWildCard);
379 commMasterToSlave(msgGetIoTSetObj, "Get IoTSet objects!", inStream, outStream);
381 // Reinitialize IoTSet on device object
382 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD),
383 "Reinitialize IoTSet fields!", inStream, outStream);
388 * A private method to instrument IoTSet Zigbee device
390 * @params Map.Entry<String,Object> Entry of map IoTSet instrumentation
391 * @params strFieldName String field name
392 * @params strIoTSlaveObjectHostAdd String slave host address
393 * @params inStream ObjectInputStream communication
394 * @params inStream ObjectOutputStream communication
397 private void instrumentIoTSetZBDevice(Map.Entry<String,Object> map, String strFieldName, String strIoTSlaveObjectHostAdd,
398 ObjectInputStream inStream, ObjectOutputStream outStream)
399 throws IOException, ClassNotFoundException, InterruptedException {
401 // Get information from the set
402 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
403 // Create a new IoTSet
404 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, strFieldName);
405 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet for IoTZigbeeAddress!", inStream, outStream);
406 // Prepare ZigbeeConfig
407 String strZigbeeGWAddress = routerConfig.getIPFromMACAddress(STR_ZB_GATEWAY_ADDRESS);
408 int iZigbeeGWPort = Integer.parseInt(STR_ZB_GATEWAY_PORT);
409 int iZigbeeIoTMasterPort = Integer.parseInt(STR_ZB_IOTMASTER_PORT);
410 commHan.addDevicePort(iZigbeeIoTMasterPort);
411 ZigbeeConfig zbConfig = new ZigbeeConfig(strZigbeeGWAddress, iZigbeeGWPort, iZigbeeIoTMasterPort,
413 // Add the port connection into communication handler - if it's not assigned yet
414 if (commHan.getComPort(strZigbeeGWAddress) == null) {
415 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strZigbeeGWAddress);
417 int iRows = setInstrumenter.numberOfRows();
418 RuntimeOutput.print("IoTMaster: Number of rows for IoTZigbeeAddress: " + iRows, BOOL_VERBOSE);
419 // Transfer the address
420 for(int iRow=0; iRow<iRows; iRow++) {
421 arrFieldValues = setInstrumenter.fieldValues(iRow);
422 // Get device address
423 String strZBDevAddress = (String) arrFieldValues[0];
424 // Send policy to Zigbee gateway - TODO: Need to clear policy first?
425 zbConfig.setPolicy(strIoTSlaveObjectHostAdd, commHan.getComPort(strZigbeeGWAddress), strZBDevAddress);
426 // Send address one by one
427 Message msgGetIoTSetZBObj = new MessageGetSimpleDeviceObject(IoTCommCode.GET_ZB_DEV_IOTSET_OBJECT,
429 commMasterToSlave(msgGetIoTSetZBObj, "Get IoTSet objects!", inStream, outStream);
431 zbConfig.closeConnection();
432 // Reinitialize IoTSet on device object
433 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD),
434 "Reinitialize IoTSet fields!", inStream, outStream);
439 * A private method to instrument IoTSet of addresses
441 * @params strFieldIdentifier String field name + object ID
442 * @params strFieldName String field name
443 * @params inStream ObjectInputStream communication
444 * @params inStream ObjectOutputStream communication
447 private void instrumentIoTSetAddress(String strFieldIdentifier, String strFieldName,
448 ObjectInputStream inStream, ObjectOutputStream outStream)
449 throws IOException, ClassNotFoundException, InterruptedException {
451 // Get information from the set
452 List<Object[]> listObject = objAddInitHand.getFields(strFieldIdentifier);
453 // Create a new IoTSet
454 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, strFieldName);
455 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet for IoTAddress!", inStream, outStream);
456 int iRows = listObject.size();
457 RuntimeOutput.print("IoTMaster: Number of rows for IoTAddress: " + iRows, BOOL_VERBOSE);
458 // Transfer the address
459 for(int iRow=0; iRow<iRows; iRow++) {
460 arrFieldValues = listObject.get(iRow);
461 // Get device address
462 String strAddress = (String) arrFieldValues[0];
463 // Send address one by one
464 Message msgGetIoTSetAddObj = new MessageGetSimpleDeviceObject(IoTCommCode.GET_ADD_IOTSET_OBJECT,
466 commMasterToSlave(msgGetIoTSetAddObj, "Get IoTSet objects!", inStream, outStream);
468 // Reinitialize IoTSet on device object
469 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD),
470 "Reinitialize IoTSet fields!", inStream, outStream);
475 * A private method to instrument an object on a specific machine and setting up policies
477 * @params strFieldObjectID String field object ID
480 private void instrumentObject(String strFieldObjectID) throws IOException {
482 // Extract the interface name for RMI
483 // e.g. ProximitySensorInterface, TempSensorInterface, etc.
485 String strObjCfgFile = STR_IOT_CODE_PATH + strObjClassName + "/" + strObjClassName + STR_CFG_FILE_EXT;
486 strObjClassInterfaceName = parseConfigFile(strObjCfgFile, STR_INTERFACE_CLS_CFG);
487 strObjStubClsIntfaceName = parseConfigFile(strObjCfgFile, STR_INT_STUB_CLS_CFG);
488 // Create an object name, e.g. ProximitySensorImplPS1
489 strObjName = strObjClassName + strFieldObjectID;
490 // Check first if host exists
491 if(commHan.objectExists(strObjName)) {
492 // If this object exists already ...
493 // Re-read IoTSlave object hostname for further reference
494 strIoTSlaveObjectHostAdd = commHan.getHostAddress(strObjName);
495 RuntimeOutput.print("IoTMaster: Object with name: " + strObjName + " has existed!", BOOL_VERBOSE);
497 // If this is a new object ... then create one
498 // Get host address for IoTSlave from LoadBalancer
499 //strIoTSlaveObjectHostAdd = lbIoT.selectHost();
500 strIoTSlaveObjectHostAdd = routerConfig.getIPFromMACAddress(lbIoT.selectHost());
501 if (strIoTSlaveControllerHostAdd == null)
502 throw new Error("IoTMaster: Could not translate MAC to IP address! Please check the router's /tmp/dhcp.leases!");
503 RuntimeOutput.print("IoTMaster: Object name: " + strObjName, BOOL_VERBOSE);
504 // Add port connection and get port numbers
505 // Naming for objects ProximitySensor becomes ProximitySensor0, ProximitySensor1, etc.
506 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strObjName);
507 commHan.addActiveControllerObject(strFieldObjectID, strObjName, strObjClassName, strObjClassInterfaceName,
508 strObjStubClsIntfaceName, strIoTSlaveObjectHostAdd, arrFieldValues, arrFieldClasses);
509 // ROUTING POLICY: IoTMaster and device/controller object
510 // Master-slave communication
511 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTMasterHostAdd,
512 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjName));
513 // ROUTING POLICY: Send the same routing policy to both the hosts
514 routerConfig.configureHostMainPolicies(strIoTMasterHostAdd, strIoTMasterHostAdd,
515 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjName));
516 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTMasterHostAdd,
517 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjName));
518 // Need to accommodate callback functions here - open ports for TCP
519 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveControllerHostAdd,
520 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
521 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd,
522 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
523 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd,
524 strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
525 // Instrument the IoTSet declarations inside the class file
526 instrumentObjectIoTSet(strFieldObjectID);
528 // Send routing policy to router for controller object
529 // ROUTING POLICY: RMI communication - RMI registry and stub ports
530 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
531 STR_TCP_PROTOCOL, commHan.getRMIRegPort(strObjName));
532 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
533 STR_TCP_PROTOCOL, commHan.getRMIStubPort(strObjName));
534 // Send the same set of routing policies to compute nodes
535 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
536 STR_TCP_PROTOCOL, commHan.getRMIRegPort(strObjName));
537 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
538 STR_TCP_PROTOCOL, commHan.getRMIRegPort(strObjName));
539 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
540 STR_TCP_PROTOCOL, commHan.getRMIStubPort(strObjName));
541 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
542 STR_TCP_PROTOCOL, commHan.getRMIStubPort(strObjName));
543 // Send the same set of routing policies for callback ports
544 setCallbackPortsPolicy(strObjName, STR_ROUTER_ADD, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
548 * A private method to set router policies for callback ports
550 * @params strRouterAdd String router address
551 * @params strIoTSlaveControllerHostAdd String slave controller host address
552 * @params strIoTSlaveObjectHostAdd String slave object host address
553 * @params strProtocol String protocol
554 * @return iPort Integer port number
556 private void setCallbackPortsPolicy(String strObjName, String strRouterAdd, String strIoTSlaveControllerHostAdd,
557 String strIoTSlaveObjectHostAdd, String strProtocol) {
559 int iNumCallbackPorts = Integer.parseInt(STR_NUM_CALLBACK_PORTS);
560 Integer[] rmiCallbackPorts = commHan.getCallbackPorts(strObjName, iNumCallbackPorts);
562 // Iterate over port numbers and set up policies
563 for (int i=0; i<iNumCallbackPorts; i++) {
564 routerConfig.configureRouterMainPolicies(strRouterAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
565 strProtocol, rmiCallbackPorts[i]);
566 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
567 strProtocol, rmiCallbackPorts[i]);
568 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveControllerHostAdd, strIoTSlaveObjectHostAdd,
569 strProtocol, rmiCallbackPorts[i]);
574 * A private method to set router policies for IoTDeviceAddress objects
576 * @params strFieldIdentifier String field name + object ID
577 * @params Map.Entry<String,Object> Entry of map IoTSet instrumentation
578 * @params strIoTSlaveObjectHostAdd String slave host address
581 private void setRouterPolicyIoTSetDevice(String strFieldIdentifier, Map.Entry<String,Object> map,
582 String strIoTSlaveObjectHostAdd) {
584 // Get information from the set
585 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
586 int iRows = setInstrumenter.numberOfRows();
587 RuntimeOutput.print("IoTMaster: Number of rows for IoTDeviceAddress: " + iRows, BOOL_VERBOSE);
588 // Transfer the address
589 for(int iRow=0; iRow<iRows; iRow++) {
590 arrFieldValues = setInstrumenter.fieldValues(iRow);
591 objAddInitHand.addField(strFieldIdentifier, arrFieldValues); // Save this for object instantiation
592 // Get device address - if 00:00:00:00:00:00 that means it needs the driver object address (self)
593 String strDeviceAddress = null;
594 if (arrFieldValues[0].equals(STR_SELF_MAC_ADD)) {
595 strDeviceAddress = strIoTSlaveObjectHostAdd;
597 strDeviceAddress = routerConfig.getIPFromMACAddress((String) arrFieldValues[0]);
599 int iDestDeviceDriverPort = (int) arrFieldValues[1];
600 String strProtocol = (String) arrFieldValues[2];
601 // Add the port connection into communication handler - if it's not assigned yet
602 if (commHan.getComPort(strDeviceAddress) == null) {
603 commHan.addPortConnection(strIoTSlaveObjectHostAdd, strDeviceAddress);
605 boolean bDstPortWildCard = false;
606 // Recognize this and allocate different ports for it
607 if (arrFieldValues.length > 3) {
608 bDstPortWildCard = (boolean) arrFieldValues[4];
609 if (bDstPortWildCard) { // This needs a unique source port
610 String strUniqueDev = strDeviceAddress + ":" + iRow;
611 commHan.addAdditionalPort(strUniqueDev);
614 // Send routing policy to router for device drivers and devices
615 // ROUTING POLICY: RMI communication - RMI registry and stub ports
616 if((iDestDeviceDriverPort == -1) && (!strProtocol.equals(STR_NO_PROTOCOL))) {
617 // Port number -1 means that we don't set the policy strictly to port number level
618 // "nopro" = no protocol specified for just TCP or just UDP (can be both used as well)
619 // ROUTING POLICY: Device driver and device
620 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress,
622 // ROUTING POLICY: Send to the compute node where the device driver is
623 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd,
624 strDeviceAddress, strProtocol);
625 } else if((iDestDeviceDriverPort == -1) && (strProtocol.equals(STR_NO_PROTOCOL))) {
626 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress);
627 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress);
628 } else if(strProtocol.equals(STR_TCPGW_PROTOCOL)) {
629 // This is a TCP protocol that connects, e.g. a phone to our runtime system
630 // that provides a gateway access (accessed through destination port number)
631 commHan.addDevicePort(iDestDeviceDriverPort);
632 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress,
633 STR_TCP_PROTOCOL, iDestDeviceDriverPort);
634 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress,
635 STR_TCP_PROTOCOL, iDestDeviceDriverPort);
636 routerConfig.configureRouterHTTPPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress);
637 routerConfig.configureHostHTTPPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress);
639 // Other port numbers...
640 commHan.addDevicePort(iDestDeviceDriverPort);
641 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTSlaveObjectHostAdd, strDeviceAddress,
642 strProtocol, commHan.getComPort(strDeviceAddress), iDestDeviceDriverPort);
643 routerConfig.configureHostMainPolicies(strIoTSlaveObjectHostAdd, strIoTSlaveObjectHostAdd, strDeviceAddress,
644 strProtocol, commHan.getComPort(strDeviceAddress), iDestDeviceDriverPort);
650 * A private method to set router policies for IoTAddress objects
652 * @params strFieldIdentifier String field name + object ID
653 * @params Map.Entry<String,Object> Entry of map IoTSet instrumentation
654 * @params strHostAddress String host address
657 private void setRouterPolicyIoTSetAddress(String strFieldIdentifier, Map.Entry<String,Object> map,
658 String strHostAddress) {
660 // Get information from the set
661 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
662 int iRows = setInstrumenter.numberOfRows();
663 RuntimeOutput.print("IoTMaster: Number of rows for IoTAddress: " + iRows, BOOL_VERBOSE);
664 // Transfer the address
665 for(int iRow=0; iRow<iRows; iRow++) {
666 arrFieldValues = setInstrumenter.fieldValues(iRow);
667 objAddInitHand.addField(strFieldIdentifier, arrFieldValues); // Save this for object instantiation
668 // Get device address
669 String strAddress = (String) arrFieldValues[0];
670 // Setting up router policies for HTTP/HTTPs
671 routerConfig.configureRouterHTTPPolicies(STR_ROUTER_ADD, strHostAddress, strAddress);
672 routerConfig.configureHostHTTPPolicies(strHostAddress, strHostAddress, strAddress);
677 * A private method to instrument an object's IoTSet and IoTRelation field to up policies
679 * Mostly the IoTSet fields would contain IoTDeviceAddress objects
681 * @params strFieldObjectID String field object ID
684 private void instrumentObjectIoTSet(String strFieldObjectID) throws IOException {
686 // If this is a new object ... then create one
687 // Instrument the class source code and look for IoTSet for device addresses
688 // e.g. @config private IoTSet<IoTDeviceAddress> lb_addresses;
689 String strObjectClassNamePath = STR_IOT_CODE_PATH + strObjClassName + "/" + strObjClassName + STR_CLS_FILE_EXT;
690 FileInputStream fis = new FileInputStream(strObjectClassNamePath);
691 ClassReader cr = new ClassReader(fis);
692 ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
693 // We need Object ID to instrument IoTDeviceAddress
694 ClassRuntimeInstrumenterMaster crim = new ClassRuntimeInstrumenterMaster(cw, strFieldObjectID, BOOL_VERBOSE);
697 RuntimeOutput.print("IoTMaster: Going to instrument for " + strObjClassName + " with objectID " +
698 strFieldObjectID, BOOL_VERBOSE);
699 // Get the object and the class names
700 // Build objects for IoTSet and IoTRelation fields in the device object classes
701 mapClassNameToCrim.put(strObjClassName + strFieldObjectID, crim);
702 HashMap<String,Object> hmObjectFieldObjects = crim.getFieldObjects();
703 for(Map.Entry<String,Object> map : hmObjectFieldObjects.entrySet()) {
704 RuntimeOutput.print("IoTMaster: Object name: " + map.getValue().getClass().getName(), BOOL_VERBOSE);
705 // Iterate over HashMap and choose between processing
706 String strFieldName = map.getKey();
707 String strClassName = map.getValue().getClass().getName();
708 String strFieldIdentifier = strFieldName + strFieldObjectID;
709 if(strClassName.equals(STR_SET_INSTRUMENTER_CLS)) {
710 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
711 if(setInstrumenter.getObjTableName().equals(STR_IOT_DEV_ADD_CLS)) {
712 // Instrument the normal IoTDeviceAddress
713 setRouterPolicyIoTSetDevice(strFieldIdentifier, map, strIoTSlaveObjectHostAdd);
714 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ADD_CLS)) {
715 // Instrument the IoTAddress
716 setRouterPolicyIoTSetAddress(strFieldIdentifier, map, strIoTSlaveObjectHostAdd);
717 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ZB_ADD_CLS)) {
718 // Instrument the IoTZigbeeAddress - special feature for Zigbee device support
719 RuntimeOutput.print("IoTMaster: IoTZigbeeAddress found! No router policy is set here..",
722 String strErrMsg = "IoTMaster: Device driver object" +
723 " can only have IoTSet<IoTAddress>, IoTSet<IoTDeviceAddress>," +
724 " or IoTSet<IoTZigbeeAddress>!";
725 throw new Error(strErrMsg);
728 String strErrMsg = "IoTMaster: Device driver object can only have IoTSet for addresses!";
729 throw new Error(strErrMsg);
736 * A private method to create an object on a specific machine
738 * @params strObjName String object name
739 * @params strObjClassName String object class name
740 * @params strObjClassInterfaceName String object class interface name
741 * @params strIoTSlaveObjectHostAdd String IoTSlave host address
742 * @params strFieldObjectID String field object ID
743 * @params arrFieldValues Array of field values
744 * @params arrFieldClasses Array of field classes
747 private void createObject(String strObjName, String strObjClassName, String strObjClassInterfaceName, String strObjStubClsIntfaceName,
748 String strIoTSlaveObjectHostAdd, String strFieldObjectID, Object[] arrFieldValues, Class[] arrFieldClasses)
749 throws IOException, FileNotFoundException, ClassNotFoundException, InterruptedException {
756 start = System.currentTimeMillis();
758 // Construct ssh command line
759 // e.g. ssh rtrimana@dw-2.eecs.uci.edu cd <path>;
760 // java -cp $CLASSPATH:./*.jar
761 // -Djava.rmi.server.codebase=file:./*.jar
762 // iotruntime.IoTSlave dw-1.eecs.uci.edu 46151 23829 42874 &
763 // The In-Port for IoTMaster is the Out-Port for IoTSlave and vice versa
764 String strSSHCommand = STR_SSH_USERNAME + strIoTSlaveObjectHostAdd + " cd " + STR_RUNTIME_DIR + " sudo java " +
765 STR_CLS_PATH + " " + STR_RMI_PATH + " " + STR_RMI_HOSTNAME +
766 strIoTSlaveObjectHostAdd + " " + STR_IOT_SLAVE_CLS + " " + strIoTMasterHostAdd + " " +
767 commHan.getComPort(strObjName) + " " + commHan.getRMIRegPort(strObjName) + " " +
768 commHan.getRMIStubPort(strObjName) + " >& " + STR_LOG_FILE_PATH + strObjName + ".log &";
769 RuntimeOutput.print(strSSHCommand, BOOL_VERBOSE);
770 // Start a new thread to start a new JVM
771 createThread(strSSHCommand);
772 ServerSocket serverSocket = new ServerSocket(commHan.getComPort(strObjName));
773 Socket socket = serverSocket.accept();
774 ObjectInputStream inStream = new ObjectInputStream(socket.getInputStream());
775 ObjectOutputStream outStream = new ObjectOutputStream(socket.getOutputStream());
778 result = System.currentTimeMillis()-start;
779 System.out.println("\n\n ==> Time needed to start JVM for " + strObjName + ": " + result + "\n\n");
782 start = System.currentTimeMillis();
784 // Create message to transfer file first
785 String sFileName = strObjClassName + STR_JAR_FILE_EXT;
786 String sPath = STR_IOT_CODE_PATH + strObjClassName + "/" + sFileName;
787 File file = new File(sPath);
788 commMasterToSlave(new MessageSendFile(IoTCommCode.TRANSFER_FILE, sFileName, file.length()),
789 "Sending file!", inStream, outStream);
790 // Send file - JAR file for object creation
791 sendFile(serverSocket.accept(), sPath, file.length());
792 Message msgReply = (Message) inStream.readObject();
793 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
796 result = System.currentTimeMillis()-start;
797 System.out.println("\n\n ==> Time needed to send JAR file for " + strObjName + ": " + result + "\n\n");
800 start = System.currentTimeMillis();
802 // Pack object information to create object on a IoTSlave
803 Message msgObjIoTSlave = new MessageCreateObject(IoTCommCode.CREATE_OBJECT, strIoTSlaveObjectHostAdd,
804 strObjClassName, strObjName, strObjClassInterfaceName, strObjStubClsIntfaceName, commHan.getRMIRegPort(strObjName),
805 commHan.getRMIStubPort(strObjName), arrFieldValues, arrFieldClasses);
807 commMasterToSlave(msgObjIoTSlave, "Sending object information", inStream, outStream);
808 // Instrument the class source code and look for IoTSet for device addresses
809 // e.g. @config private IoTSet<IoTDeviceAddress> lb_addresses;
810 RuntimeOutput.print("IoTMaster: Instantiating for " + strObjClassName + " with objectID " +
811 strFieldObjectID, BOOL_VERBOSE);
812 // Get the object and the class names
813 // Build objects for IoTSet and IoTRelation fields in the device object classes
814 ClassRuntimeInstrumenterMaster crim = mapClassNameToCrim.get(strObjClassName + strFieldObjectID);
815 HashMap<String,Object> hmObjectFieldObjects = crim.getFieldObjects();
816 for(Map.Entry<String,Object> map : hmObjectFieldObjects.entrySet()) {
817 RuntimeOutput.print("IoTMaster: Object name: " + map.getValue().getClass().getName(), BOOL_VERBOSE);
818 // Iterate over HashMap and choose between processing
819 String strFieldName = map.getKey();
820 String strClassName = map.getValue().getClass().getName();
821 String strFieldIdentifier = strFieldName + strFieldObjectID;
822 if(strClassName.equals(STR_SET_INSTRUMENTER_CLS)) {
823 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
824 if(setInstrumenter.getObjTableName().equals(STR_IOT_DEV_ADD_CLS)) {
825 // Instrument the normal IoTDeviceAddress
827 instrumentIoTSetDevice(strFieldIdentifier, strFieldName, strIoTSlaveObjectHostAdd, inStream, outStream);
829 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ZB_ADD_CLS)) {
830 // Instrument the IoTZigbeeAddress - special feature for Zigbee device support
832 instrumentIoTSetZBDevice(map, strFieldName, strIoTSlaveObjectHostAdd, inStream, outStream);
834 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ADD_CLS)) {
835 // Instrument the IoTAddress
837 instrumentIoTSetAddress(strFieldIdentifier, strFieldName, inStream, outStream);
840 String strErrMsg = "IoTMaster: Device driver object" +
841 " can only have IoTSet<IoTAddress>, IoTSet<IoTDeviceAddress>," +
842 " or IoTSet<IoTZigbeeAddress>!";
843 throw new Error(strErrMsg);
846 String strErrMsg = "IoTMaster: Device driver object can only have IoTSet for addresses!";
847 throw new Error(strErrMsg);
851 // TODO: Change this later
852 outStream.writeObject(new MessageSimple(IoTCommCode.END_SESSION));
855 result = System.currentTimeMillis()-start;
856 System.out.println("\n\n ==> Time needed to create object " + strObjName + " and instrument IoTDeviceAddress: " + result + "\n\n");
862 serverSocket.close();
867 * A private method to create controller objects
871 private void createControllerObjects() throws InterruptedException {
873 // Create a list of threads
874 List<Thread> threads = new ArrayList<Thread>();
875 // Get the list of active controller objects and loop it
876 List<String> listActiveControllerObject = commHan.getActiveControllerObjectList();
877 for(String strObjName : listActiveControllerObject) {
879 ObjectCreationInfo objCrtInfo = commHan.getObjectCreationInfo(strObjName);
880 Thread objectThread = new Thread(new Runnable() {
884 createObject(strObjName, objCrtInfo.getObjectClassName(), objCrtInfo.getObjectClassInterfaceName(),
885 objCrtInfo.getObjectStubClassInterfaceName(), objCrtInfo.getIoTSlaveObjectHostAdd(),
886 commHan.getFieldObjectID(strObjName), commHan.getArrayFieldValues(strObjName),
887 commHan.getArrayFieldClasses(strObjName));
888 } catch (IOException |
889 ClassNotFoundException |
890 InterruptedException ex) {
891 ex.printStackTrace();
896 threads.add(objectThread);
897 objectThread.start();
900 for (Thread thread : threads) {
903 } catch (InterruptedException ex) {
904 ex.printStackTrace();
911 * A private method to instrument IoTSet
913 * @params Map.Entry<String,Object> Entry of map IoTSet instrumentation
914 * @params strFieldName String field name
917 private void instrumentIoTSet(Map.Entry<String,Object> map, String strFieldName)
918 throws IOException, ClassNotFoundException, InterruptedException {
920 // Get information from the set
921 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
922 objInitHand.addField(strFieldName, IoTCommCode.CREATE_NEW_IOTSET);
924 int iRows = setInstrumenter.numberOfRows();
925 for(int iRow=0; iRow<iRows; iRow++) {
926 // Get field classes and values
927 arrFieldClasses = setInstrumenter.fieldClasses(iRow);
928 arrFieldValues = setInstrumenter.fieldValues(iRow);
929 // Get object ID and class name
930 String strObjID = setInstrumenter.fieldObjectID(iRow);
931 strObjClassName = setInstrumenter.fieldEntryType(strObjID);
932 // Call the method to create an object
933 instrumentObject(strObjID);
934 int iNumOfPorts = Integer.parseInt(STR_NUM_CALLBACK_PORTS);
935 objInitHand.addObjectIntoField(strFieldName, strIoTSlaveObjectHostAdd, strObjName,
936 strObjClassName, strObjClassInterfaceName, strObjStubClsIntfaceName, commHan.getRMIRegPort(strObjName),
937 commHan.getRMIStubPort(strObjName), commHan.getCallbackPorts(strObjName, iNumOfPorts));
943 * A private method to instrument IoTRelation
945 * @params Map.Entry<String,Object> Entry of map IoTRelation instrumentation
946 * @params strFieldName String field name
949 private void instrumentIoTRelation(Map.Entry<String,Object> map, String strFieldName)
950 throws IOException, ClassNotFoundException, InterruptedException {
952 // Get information from the set
953 RelationInstrumenter relationInstrumenter = (RelationInstrumenter) map.getValue();
954 int iRows = relationInstrumenter.numberOfRows();
955 objInitHand.addField(strFieldName, IoTCommCode.CREATE_NEW_IOTRELATION);
957 for(int iRow=0; iRow<iRows; iRow++) {
958 // Operate on the first set first
959 arrFieldClasses = relationInstrumenter.firstFieldClasses(iRow);
960 arrFieldValues = relationInstrumenter.firstFieldValues(iRow);
961 String strObjID = relationInstrumenter.firstFieldObjectID(iRow);
962 strObjClassName = relationInstrumenter.firstEntryFieldType(strObjID);
963 // Call the method to create an object
964 instrumentObject(strObjID);
965 // Get the first object controller host address
966 String strFirstIoTSlaveObjectHostAdd = strIoTSlaveObjectHostAdd;
967 int iNumOfPorts = Integer.parseInt(STR_NUM_CALLBACK_PORTS);
968 objInitHand.addObjectIntoField(strFieldName, strIoTSlaveObjectHostAdd, strObjName,
969 strObjClassName, strObjClassInterfaceName, strObjStubClsIntfaceName,
970 commHan.getRMIRegPort(strObjName), commHan.getRMIStubPort(strObjName),
971 commHan.getCallbackPorts(strObjName, iNumOfPorts));
972 // Operate on the second set
973 arrFieldClasses = relationInstrumenter.secondFieldClasses(iRow);
974 arrFieldValues = relationInstrumenter.secondFieldValues(iRow);
975 strObjID = relationInstrumenter.secondFieldObjectID(iRow);
976 strObjClassName = relationInstrumenter.secondEntryFieldType(strObjID);
977 // Call the method to create an object
978 instrumentObject(strObjID);
979 // Get the second object controller host address
980 String strSecondIoTSlaveObjectHostAdd = strIoTSlaveObjectHostAdd;
981 objInitHand.addSecondObjectIntoField(strFieldName, strIoTSlaveObjectHostAdd, strObjName,
982 strObjClassName, strObjClassInterfaceName, strObjStubClsIntfaceName,
983 commHan.getRMIRegPort(strObjName), commHan.getRMIStubPort(strObjName),
984 commHan.getCallbackPorts(strObjName, iNumOfPorts));
985 // ROUTING POLICY: first and second controller objects in IoTRelation
986 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strFirstIoTSlaveObjectHostAdd,
987 strSecondIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
988 // ROUTING POLICY: Send the same routing policy to both the hosts
989 routerConfig.configureHostMainPolicies(strFirstIoTSlaveObjectHostAdd, strFirstIoTSlaveObjectHostAdd,
990 strSecondIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
991 routerConfig.configureHostMainPolicies(strSecondIoTSlaveObjectHostAdd, strFirstIoTSlaveObjectHostAdd,
992 strSecondIoTSlaveObjectHostAdd, STR_TCP_PROTOCOL);
997 * A method to reinitialize IoTSet and IoTRelation in the code based on ObjectInitHandler information
999 * @params inStream ObjectInputStream communication
1000 * @params outStream ObjectOutputStream communication
1003 private void initializeSetsAndRelations(ObjectInputStream inStream, ObjectOutputStream outStream)
1004 throws IOException, ClassNotFoundException {
1005 // Get list of fields
1006 List<String> strFields = objInitHand.getListOfFields();
1007 // Iterate on HostAddress
1008 for(String str : strFields) {
1009 IoTCommCode iotcommMsg = objInitHand.getFieldMessage(str);
1010 if (iotcommMsg == IoTCommCode.CREATE_NEW_IOTSET) {
1011 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO CREATE IOTSET
1012 Message msgCrtIoTSet = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTSET, str);
1013 commMasterToSlave(msgCrtIoTSet, "Create new IoTSet!", inStream, outStream);
1014 List<ObjectInitInfo> listObject = objInitHand.getListObjectInitInfo(str);
1015 for (ObjectInitInfo objInitInfo : listObject) {
1016 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO FILL IN IOTSET
1017 commMasterToSlave(new MessageGetObject(IoTCommCode.GET_IOTSET_OBJECT, objInitInfo.getIoTSlaveObjectHostAdd(),
1018 objInitInfo.getObjectName(), objInitInfo.getObjectClassName(), objInitInfo.getObjectClassInterfaceName(),
1019 objInitInfo.getObjectStubClassInterfaceName(), objInitInfo.getRMIRegistryPort(), objInitInfo.getRMIStubPort(),
1020 objInitInfo.getRMICallbackPorts()), "Get IoTSet object!", inStream, outStream);
1022 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO REINITIALIZE IOTSET FIELD
1023 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTSET_FIELD),
1024 "Renitialize IoTSet field!", inStream, outStream);
1025 } else if (iotcommMsg == IoTCommCode.CREATE_NEW_IOTRELATION) {
1026 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO CREATE IOTRELATION
1027 Message msgCrtIoTRel = new MessageCreateSetRelation(IoTCommCode.CREATE_NEW_IOTRELATION, str);
1028 commMasterToSlave(msgCrtIoTRel, "Create new IoTRelation!", inStream, outStream);
1029 List<ObjectInitInfo> listObject = objInitHand.getListObjectInitInfo(str);
1030 List<ObjectInitInfo> listSecondObject = objInitHand.getSecondObjectInitInfo(str);
1031 Iterator it = listSecondObject.iterator();
1032 for (ObjectInitInfo objInitInfo : listObject) {
1033 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO FILL IN IOTRELATION (FIRST OBJECT)
1034 commMasterToSlave(new MessageGetObject(IoTCommCode.GET_IOTRELATION_FIRST_OBJECT,
1035 objInitInfo.getIoTSlaveObjectHostAdd(), objInitInfo.getObjectName(), objInitInfo.getObjectClassName(),
1036 objInitInfo.getObjectClassInterfaceName(), objInitInfo.getObjectStubClassInterfaceName(),
1037 objInitInfo.getRMIRegistryPort(), objInitInfo.getRMIStubPort(), objInitInfo.getRMICallbackPorts()),
1038 "Get IoTRelation first object!", inStream, outStream);
1039 ObjectInitInfo objSecObj = (ObjectInitInfo) it.next();
1040 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO FILL IN IOTRELATION (SECOND OBJECT)
1041 commMasterToSlave(new MessageGetObject(IoTCommCode.GET_IOTRELATION_SECOND_OBJECT,
1042 objSecObj.getIoTSlaveObjectHostAdd(), objSecObj.getObjectName(), objSecObj.getObjectClassName(),
1043 objSecObj.getObjectClassInterfaceName(), objSecObj.getObjectStubClassInterfaceName(),
1044 objSecObj.getRMIRegistryPort(), objSecObj.getRMIStubPort(), objSecObj.getRMICallbackPorts()),
1045 "Get IoTRelation second object!", inStream, outStream);
1047 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO REINITIALIZE IOTRELATION FIELD
1048 commMasterToSlave(new MessageSimple(IoTCommCode.REINITIALIZE_IOTRELATION_FIELD),
1049 "Renitialize IoTRelation field!", inStream, outStream);
1055 * A method to set router basic policies at once
1057 * @param strRouter String router name
1060 private void setRouterBasicPolicies(String strRouter) {
1062 String strMonitorHost = routerConfig.getIPFromMACAddress(STR_MONITORING_HOST);
1063 routerConfig.configureRouterICMPPolicies(strRouter, strMonitorHost);
1064 routerConfig.configureRouterDHCPPolicies(strRouter);
1065 routerConfig.configureRouterDNSPolicies(strRouter);
1066 routerConfig.configureRouterSSHPolicies(strRouter, strMonitorHost);
1067 routerConfig.configureRejectPolicies(strRouter);
1071 * A method to set host basic policies at once
1073 * @param strHost String host name
1076 private void setHostBasicPolicies(String strHost) {
1078 String strMonitorHost = routerConfig.getIPFromMACAddress(STR_MONITORING_HOST);
1079 routerConfig.configureHostDHCPPolicies(strHost);
1080 routerConfig.configureHostDNSPolicies(strHost);
1081 if (strHost.equals(strMonitorHost)) {
1082 // Check if this is the monitoring host
1083 routerConfig.configureHostICMPPolicies(strHost);
1084 routerConfig.configureHostSSHPolicies(strHost);
1086 routerConfig.configureHostICMPPolicies(strHost, strMonitorHost);
1087 routerConfig.configureHostSSHPolicies(strHost, strMonitorHost);
1089 // Apply SQL allowance policies to master host
1090 if (strHost.equals(strIoTMasterHostAdd)) {
1091 routerConfig.configureHostSQLPolicies(strHost);
1093 routerConfig.configureRejectPolicies(strHost);
1097 * A method to create a thread for policy deployment
1099 * @param strRouterAddress String router address to configure
1100 * @param setHostAddresses Set of strings for host addresses to configure
1103 private void createPolicyThreads(String strRouterAddress, Set<String> setHostAddresses) throws IOException {
1105 // Create a list of threads
1106 List<Thread> threads = new ArrayList<Thread>();
1107 // Start threads for hosts
1108 for(String strAddress : setHostAddresses) {
1109 Thread policyThread = new Thread(new Runnable() {
1111 synchronized(this) {
1112 routerConfig.sendHostPolicies(strAddress);
1116 threads.add(policyThread);
1117 policyThread.start();
1118 RuntimeOutput.print("Deploying policies for: " + strAddress, BOOL_VERBOSE);
1120 // A thread for router
1121 Thread policyThread = new Thread(new Runnable() {
1123 synchronized(this) {
1124 routerConfig.sendRouterPolicies(strRouterAddress);
1128 threads.add(policyThread);
1129 policyThread.start();
1130 RuntimeOutput.print("Deploying policies on router: " + strRouterAddress, BOOL_VERBOSE);
1132 for (Thread thread : threads) {
1135 } catch (InterruptedException ex) {
1136 ex.printStackTrace();
1143 * A method to assign objects to multiple JVMs, including
1144 * the controller/device object that uses other objects
1145 * in IoTSet and IoTRelation
1149 private void createObjects() {
1156 // Extract hostname for this IoTMaster from MySQL DB
1157 strIoTMasterHostAdd = routerConfig.getIPFromMACAddress(STR_MASTER_MAC_ADD);
1158 // Loop as we can still find controller/device classes
1159 for(int i=0; i<strObjectNames.length; i++) {
1161 start = System.currentTimeMillis();
1163 // Assign a new list of PrintWriter objects
1164 routerConfig.renewPrintWriter();
1165 // Get controller names one by one
1166 String strObjControllerName = strObjectNames[i];
1167 // Use LoadBalancer to assign a host address
1168 //strIoTSlaveControllerHostAdd = lbIoT.selectHost();
1169 strIoTSlaveControllerHostAdd = routerConfig.getIPFromMACAddress(lbIoT.selectHost());
1170 if (strIoTSlaveControllerHostAdd == null)
1171 throw new Error("IoTMaster: Could not translate MAC to IP address! Please check the router's /tmp/dhcp.leases!");
1172 // == START INITIALIZING CONTROLLER/DEVICE IOTSLAVE ==
1173 // Add port connection and get port numbers
1174 // Naming for objects ProximitySensor becomes ProximitySensor0, ProximitySensor1, etc.
1175 commHan.addPortConnection(strIoTSlaveControllerHostAdd, strObjControllerName);
1176 // ROUTING POLICY: IoTMaster and main controller object
1177 routerConfig.configureRouterMainPolicies(STR_ROUTER_ADD, strIoTMasterHostAdd,
1178 strIoTSlaveControllerHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjControllerName));
1179 // ROUTING POLICY: Send the same routing policy to both the hosts
1180 routerConfig.configureHostMainPolicies(strIoTMasterHostAdd, strIoTMasterHostAdd,
1181 strIoTSlaveControllerHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjControllerName));
1182 routerConfig.configureHostMainPolicies(strIoTSlaveControllerHostAdd, strIoTMasterHostAdd,
1183 strIoTSlaveControllerHostAdd, STR_TCP_PROTOCOL, commHan.getComPort(strObjControllerName));
1185 // Construct ssh command line and create a controller thread for e.g. AcmeProximity
1186 String strSSHCommand = STR_SSH_USERNAME + strIoTSlaveControllerHostAdd + " cd " +
1187 STR_RUNTIME_DIR + " sudo java " + STR_CLS_PATH + " " +
1188 STR_RMI_PATH + " " + STR_IOT_SLAVE_CLS + " " + strIoTMasterHostAdd + " " +
1189 commHan.getComPort(strObjControllerName) + " " +
1190 commHan.getRMIRegPort(strObjControllerName) + " " +
1191 commHan.getRMIStubPort(strObjControllerName) + " >& " +
1192 STR_LOG_FILE_PATH + strObjControllerName + ".log &";
1193 RuntimeOutput.print(strSSHCommand, BOOL_VERBOSE);
1194 createThread(strSSHCommand);
1195 // Wait for connection
1196 // Create a new socket for communication
1197 ServerSocket serverSocket = new ServerSocket(commHan.getComPort(strObjControllerName));
1198 Socket socket = serverSocket.accept();
1199 ObjectInputStream inStream = new ObjectInputStream(socket.getInputStream());
1200 ObjectOutputStream outStream = new ObjectOutputStream(socket.getOutputStream());
1201 RuntimeOutput.print("IoTMaster: Communication established!", BOOL_VERBOSE);
1204 result = System.currentTimeMillis()-start;
1205 System.out.println("\n\n ==> From start until after SSH for main controller: " + result);
1207 start = System.currentTimeMillis();
1209 // Send files for every controller class
1210 // e.g. AcmeProximity.jar and AcmeProximity.zip
1211 String strControllerClassName = strObjControllerName + STR_CLS_FILE_EXT;
1212 String strControllerClassNamePath = STR_CONT_PATH + strObjControllerName + "/" +
1213 strControllerClassName;
1215 String strControllerJarName = strObjControllerName + STR_JAR_FILE_EXT;
1216 String strControllerJarNamePath = STR_CONT_PATH + strObjControllerName + "/" +
1217 strControllerJarName;
1218 File file = new File(strControllerJarNamePath);
1219 commMasterToSlave(new MessageSendFile(IoTCommCode.TRANSFER_FILE, strControllerJarName, file.length()),
1220 "Sending file!", inStream, outStream);
1221 // Send file - Class file for object creation
1222 sendFile(serverSocket.accept(), strControllerJarNamePath, file.length());
1223 Message msgReply = (Message) inStream.readObject();
1224 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
1225 // Send .zip file if additional zip file is specified
1226 String strObjCfgFile = strObjControllerName + STR_CFG_FILE_EXT;
1227 String strObjCfgFilePath = STR_CONT_PATH + strObjControllerName + "/" + strObjCfgFile;
1228 String strAdditionalFile = parseConfigFile(strObjCfgFilePath, STR_FILE_TRF_CFG);
1229 if (strAdditionalFile.equals(STR_YES)) {
1230 String strControllerCmpName = strObjControllerName + STR_ZIP_FILE_EXT;
1231 String strControllerCmpNamePath = STR_CONT_PATH + strObjControllerName + "/" +
1232 strControllerCmpName;
1233 file = new File(strControllerCmpNamePath);
1234 commMasterToSlave(new MessageSendFile(IoTCommCode.TRANSFER_FILE, strControllerCmpName, file.length()),
1235 "Sending file!", inStream, outStream);
1236 // Send file - Class file for object creation
1237 sendFile(serverSocket.accept(), strControllerCmpNamePath, file.length());
1238 msgReply = (Message) inStream.readObject();
1239 RuntimeOutput.print("IoTMaster: Reply message: " + msgReply.getMessage(), BOOL_VERBOSE);
1241 // Create main controller/device object
1242 commMasterToSlave(new MessageCreateMainObject(IoTCommCode.CREATE_MAIN_OBJECT, strObjControllerName),
1243 "Create main object!", inStream, outStream);
1246 result = System.currentTimeMillis()-start;
1247 System.out.println("\n\n ==> From IoTSlave start until main controller object is created: " + result);
1248 System.out.println(" ==> Including file transfer times!\n\n");
1250 start = System.currentTimeMillis();
1252 // == END INITIALIZING CONTROLLER/DEVICE IOTSLAVE ==
1253 // Instrumenting one file
1254 RuntimeOutput.print("IoTMaster: Opening class file: " + strControllerClassName, BOOL_VERBOSE);
1255 RuntimeOutput.print("IoTMaster: Class file path: " + strControllerClassNamePath, BOOL_VERBOSE);
1256 FileInputStream fis = new FileInputStream(strControllerClassNamePath);
1257 ClassReader cr = new ClassReader(fis);
1258 ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
1259 ClassRuntimeInstrumenterMaster crim = new ClassRuntimeInstrumenterMaster(cw, null, BOOL_VERBOSE);
1262 // Get the object and the class names
1263 // Build objects for IoTSet and IoTRelation fields in the controller/device classes
1264 HashMap<String,Object> hmControllerFieldObjects = crim.getFieldObjects();
1265 for(Map.Entry<String,Object> map : hmControllerFieldObjects.entrySet()) {
1266 RuntimeOutput.print("IoTMaster: Object name: " + map.getValue().getClass().getName(), BOOL_VERBOSE);
1267 // Iterate over HashMap and choose between processing
1268 // SetInstrumenter vs. RelationInstrumenter
1269 String strFieldName = map.getKey();
1270 String strClassName = map.getValue().getClass().getName();
1271 if(strClassName.equals(STR_SET_INSTRUMENTER_CLS)) {
1272 SetInstrumenter setInstrumenter = (SetInstrumenter) map.getValue();
1273 if(setInstrumenter.getObjTableName().equals(STR_IOT_DEV_ADD_CLS)) {
1274 String strErrMsg = "IoTMaster: Controller object" +
1275 " cannot have IoTSet<IoTDeviceAddress>!";
1276 throw new Error(strErrMsg);
1277 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ZB_ADD_CLS)) {
1278 String strErrMsg = "IoTMaster: Controller object" +
1279 " cannot have IoTSet<ZigbeeAddress>!";
1280 throw new Error(strErrMsg);
1281 } else if(setInstrumenter.getObjTableName().equals(STR_IOT_ADD_CLS)) {
1282 // Instrument the IoTAddress
1283 setRouterPolicyIoTSetAddress(strFieldName, map, strIoTSlaveControllerHostAdd);
1284 instrumentIoTSetAddress(strFieldName, strFieldName, inStream, outStream);
1287 instrumentIoTSet(map, strFieldName);
1289 } else if (strClassName.equals(STR_REL_INSTRUMENTER_CLS)) {
1290 instrumentIoTRelation(map, strFieldName);
1294 result = System.currentTimeMillis()-start;
1295 System.out.println("\n\n ==> Time needed to instrument device driver objects: " + result + "\n\n");
1296 System.out.println(" ==> #Objects: " + commHan.getActiveControllerObjectList().size() + "\n\n");
1299 start = System.currentTimeMillis();
1301 // ROUTING POLICY: Deploy basic policies if this is the last controller
1302 if (i == strObjectNames.length-1) {
1303 // ROUTING POLICY: implement basic policies to reject all other irrelevant traffics
1304 for(String s: commHan.getHosts()) {
1305 setHostBasicPolicies(s);
1307 // We retain all the basic policies for router,
1308 // but we delete the initial allowance policies for internal all TCP and UDP communications
1309 setRouterBasicPolicies(STR_ROUTER_ADD);
1311 // Close access to policy files and deploy policies
1312 routerConfig.close();
1313 // Deploy the policy
1314 HashSet<String> setAddresses = new HashSet<String>(commHan.getHosts());
1315 setAddresses.add(strIoTMasterHostAdd);
1316 createPolicyThreads(STR_ROUTER_ADD, setAddresses);
1319 result = System.currentTimeMillis()-start;
1320 System.out.println("\n\n ==> Time needed to send policy files and deploy them : " + result + "\n\n");
1323 start = System.currentTimeMillis();
1325 // Separating object creations and Set/Relation initializations
1326 createControllerObjects();
1329 result = System.currentTimeMillis()-start;
1330 System.out.println("\n\n ==> Time needed to instantiate objects: " + result + "\n\n");
1332 start = System.currentTimeMillis();
1334 // Sets and relations initializations
1335 initializeSetsAndRelations(inStream, outStream);
1338 result = System.currentTimeMillis()-start;
1339 System.out.println("\n\n ==> Time needed to initialize sets and relations: " + result + "\n\n");
1341 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO EXECUTE INIT METHOD
1342 commMasterToSlave(new MessageSimple(IoTCommCode.INVOKE_INIT_METHOD),
1343 "Invoke init() method!", inStream, outStream);
1344 // == COMMUNICATION WITH IOTSLAVE CONTROLLER TO END PROCESS
1345 outStream.writeObject(new MessageSimple(IoTCommCode.END_SESSION));
1349 serverSocket.close();
1350 commHan.printLists();
1351 lbIoT.printHostInfo();
1354 } catch (IOException |
1355 InterruptedException |
1356 ClassNotFoundException ex) {
1357 System.out.println("IoTMaster: Exception: "
1359 ex.printStackTrace();
1363 public static void main(String args[]) {
1365 // Detect the available controller/device classes
1366 // Input args[] should be used to list the controllers/devices
1367 // e.g. java IoTMaster AcmeProximity AcmeThermostat AcmeVentController
1368 IoTMaster iotMaster = new IoTMaster(args);
1370 iotMaster.parseIoTMasterConfigFile();
1371 // Initialize CommunicationHandler, LoadBalancer, and RouterConfig
1372 iotMaster.initLiveDataStructure();
1374 iotMaster.createObjects();