System.out.println("Couldn't find the signature engine\n");
}
- /*
- switch (engineNo) {
- case 4:
- sig = computeSignature(engineNo,mail);
- break;
- case 8:
- sig = computeSignature(engineNo,mail);
- break;
- default:
- System.out.println("Couldn't find the signature engine\n");
- //sig = computeSignature(engineNo,curPart.getCleaned());
- break;
- }//switch engineNo
- */
-
- if (sig != null && sig.length() > 0) {
+ if (sig != null) {
String hash = engineNo + ":" + sig;
printableSigs.addElement(hash);
} else {
- /* we didn't produce a signature for the mail. */
+ // we didn't produce a signature for the mail.
}
}//engine
}//each emails part
*/
private String computeSignature(int engineNo, String mail) {
if(engineNo==4) {
- String s1 = this.sig4.computeSignature(mail);
+ //String s1 = this.sig4.computeSignature(mail);
+ return this.sig4.computeSignature(mail);
//return new String { this.sig4.computeSignature(mail) };
}
+ /*
if(engineNo==8) {
//String cleanedButKeepHTML = Preprocessor.preprocess(mail,Preprocessor.ConfigParams.NO_DEHTML);
//return this.sig8.computeSignature(cleanedButKeepHTML);
return this.sig8.computeSignature(mail);
}
+ */
return null;
-
- /*
- switch (engineNo) {
- case 4:
- return new String { this.sig4.computeSignature(mail) };
- case 8:
- //TODO device and equivalent for this
- //String cleanedButKeepHTML = Preprocessor.preprocess(mail,Preprocessor.ConfigParams.NO_DEHTML);
- return this.sig8.computeSignature(cleanedButKeepHTML);
- default:
- return null;
- }
- */
}
}
public class SpamFilter extends Thread {
DistributedHashMap mydhmap;
+
int id; //thread id
+
+ /**
+ * Total number of iterations
+ **/
int numiter;
+
+ /**
+ * Total number of emails
+ **/
int numemail;
+
/**
* Total number of threads
**/
}
- public SpamFilter(int numiter, int numemail,int id) {
+ public SpamFilter(int numiter, int numemail,int id, DistributedHashMap mydhmap, int nthreads) {
this.numiter=numiter;
this.numemail=numemail;
this.id = id;
+ this.mydhmap = mydhmap;
+ this.nthreads = nthreads;
}
public void run() {
int pickemail = rand.nextInt(100);
Mail email = new Mail("emails/email"+pickemail);
//Mail email = getEmail(pickemail);
- boolean filterAnswer = checkMail(email, thid);
+ Vector signatures = email.checkMail(thid);
+ //check with global data structure
+ int[] confidenceVals=null;
+ atomic {
+ confidenceVals = check(signatures,thid);
+ }
+
+ //---- create and return results --------
+ FilterResult filterResult = new FilterResult();
+ boolean filterAnswer = filterResult.getResult(confidenceVals);
+
boolean userAnswer = email.getIsSpam();
if(filterAnswer != userAnswer) {
- sendFeedBack(email, userAnswer);
+ atomic {
+ sendFeedBack(email, userAnswer, thid);
+ }
}
- }
- }
+ } //end num emails
+ }//end num iter
}
public static void main(String[] args) {
- int nthreads;
int[] mid = new int[8];
mid[0] = (128<<24)|(195<<16)|(136<<8)|162; //dc-1.calit2
mid[1] = (128<<24)|(195<<16)|(136<<8)|163; //dc-2.calit2
mid[6] = (128<<24)|(195<<16)|(136<<8)|168; //dc-7.calit2
mid[7] = (128<<24)|(195<<16)|(136<<8)|169; //dc-8.calit2
-
-
- //Read options from command prompt
+ //Read options from command prompt
SpamFilter sf = new SpamFilter();
SpamFilter.parseCmdLine(args, sf);
+ int nthreads = sf.nthreads;
Random rand = new Random(8);
//Randomly set Spam vals for each email
for(int i=0; i<sf.numemail; i++) {
Mail email = new Mail("./emails/email"+i);
int spamval = rand.nextInt(100);
- if(spamval<90) { //assume 90% are not spam and rest are spam
+ if(spamval<60) { //assume 60% are spam and rest are ham
email.setIsSpam(false);
} else {
email.setIsSpam(true);
}
//Create Global data structure
-
DistributedHashMap dhmap;
+ SpamFilter[] spf;
atomic {
dhmap = global new DistributedHashMap(500, 0.75f);
+ spf = global new SpamFilter[nthreads];
+ for(int i=0; i<nthreads; i++) {
+ spf[i] = global new SpamFilter(sf.numiter, sf.numemail, i, dhmap, nthreads);
+ }
+ }
+
+ /* ---- Start Threads ---- */
+ SpamFilter tmp;
+ for(int i = 0; i<nthreads; i++) {
+ atomic {
+ tmp = spf[i];
+ }
+ tmp.start(mid[i]);
+ }
+
+ /* ---- Join threads----- */
+ for(int i = 0; i<nthreads; i++) {
+ atomic {
+ tmp = spf[i];
+ }
+ tmp.join();
}
- //3. N times iteration of work that needs to be done
- // by each client
+ System.out.println("Finished");
}
public static void parseCmdLine(String args[], SpamFilter sf) {
/**
* Returns result to the Spam filter
**/
+ /*
public boolean checkMail(Mail mail, int userid) {
//Preprocess emails
//Vector partsOfMailStrings = mail.createMailStringsWithURL();
+ /*
Vector partsOfMailStrings = mail.getCommonPart();
partsOfMailStrings.addElement(mail.getBodyString());
return spam;
}
+ */
public int[] check(Vector signatures, int userid) {
int numparts = signatures.size();
for(int i=0; i<numparts; i++) {
String part = (String)(signatures.elementAt(i));
char tmpengine = part.charAt(0);
- String engine = new String(tmpengine);
- String signature = part.substring(2); //a:b index(a)=0, index(:)=1, index(b)=2
- HashEntry myhe = new HashEntry();
+ String engine = global new String(tmpengine);
+ String signature = global new String(part.substring(2));
+ //String signature = part.substring(2); //a:b index(a)=0, index(:)=1, index(b)=2
+ HashEntry myhe = global new HashEntry();
myhe.setengine(engine);
myhe.setsig(signature);
//find object in distributedhashMap: if no object then add object
//else read object
-
if(!mydhmap.containsKey(myhe)) {
//add new object
- myhe.stats = new HashStat();
+ myhe.stats = global new HashStat();
myhe.stats.setuser(userid, 0, 0, -1);
- FilterStatistic fs = new FilterStatistic(0,0,-1);
+ FilterStatistic fs = global new FilterStatistic(0,0,-1);
mydhmap.put(myhe, fs);
} else {
// ----- now connect to global data structure and ask for spam -----
- HashEntry tmphe = mydhmap.getKey(myhe);
- confidenceVals[i] = tmphe.askForSpam();
+ HashEntry tmphe = (HashEntry)(mydhmap.getKey(myhe));
+ FilterStatistic fs = (FilterStatistic) (mydhmap.get(myhe)); //get the value from hash
+ confidenceVals[i] = fs.getChecked();
}
}
for(int i=0;i<signatures.size();i++) {
String part = (String)(signatures.elementAt(i));
char tmpengine = part.charAt(0);
- String engine = new String(tmpengine);
- String signature = part.substring(2); //a:b index(a)=0, index(:)=1, index(b)=2
- HashEntry myhe = new HashEntry();
+ String engine = global new String(tmpengine);
+ String signature = global new String(part.substring(2));
+ //String signature = part.substring(2); //a:b index(a)=0, index(:)=1, index(b)=2
+ HashEntry myhe = global new HashEntry();
myhe.setengine(engine);
myhe.setsig(signature);
// ----- now connect to global data structure and upate spam count -----
- HashEntry tmphe = mydhmap.getKey(myhe);
+ HashEntry tmphe = (HashEntry)(mydhmap.getKey(myhe));
if(tmphe.stats.userid[id] != 1) {
tmphe.stats.setuserid(id);
}
+ FilterStatistic fs = (FilterStatistic) (mydhmap.get(myhe)); //get the value from hash
+
//Increment spam or ham value
if(isSpam) {
-
-
+ tmphe.stats.incSpamCount(id);
+ fs.increaseSpam();
} else {
-
+ tmphe.stats.incHamCount(id);
+ fs.increaseHam();
}
}
}
}
+
+