View Javadoc
1 package org.smartcomps.twister.engine.priv; 2 3 import org.smartcomps.twister.engine.TwisterEngine; 4 import org.smartcomps.twister.engine.exception.CorrelationViolationException; 5 import org.smartcomps.twister.engine.exception.ConflictingReceiveException; 6 import org.smartcomps.twister.engine.priv.messaging.MessagingFactory; 7 import org.smartcomps.twister.engine.priv.messaging.impl.MessageControllerImpl; 8 import org.smartcomps.twister.engine.priv.messaging.impl.CoreWrappingFactory; 9 import org.smartcomps.twister.engine.priv.core.dynamic.ReceiveEC; 10 import org.smartcomps.twister.common.event.EventManager; 11 import org.smartcomps.twister.common.transaction.TransactionManager; 12 import org.smartcomps.twister.common.transaction.TransactionException; 13 import org.smartcomps.twister.common.persistence.DBSessionException; 14 import org.smartcomps.twister.common.persistence.FinderException; 15 16 import org.dom4j.Document; 17 import org.apache.commons.logging.Log; 18 import org.apache.commons.logging.LogFactory; 19 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 20 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 21 22 /*** 23 * Default engine implementation, finds the receive or pick execution concerned by the 24 * message and make it acknowledge the message. 25 */ 26 public class TwisterEngineImpl implements TwisterEngine { 27 28 private PooledExecutor threadPool = new PooledExecutor(new BoundedBuffer(10), 100); 29 protected static Log log = LogFactory.getLog(MessageControllerImpl.class); 30 31 public int acknowledge(String partnerLink, String portType, String operation, Document message) { 32 ReceiveEC msgTarget = null; 33 int errorCode = -1; 34 35 try { 36 TransactionManager.beginTransaction(); 37 } catch (TransactionException e) { 38 log.error("Could not initialize transaction to find a message target", e); 39 EventManager.fireError(e); 40 return TwisterEngine.SERVER_FAILURE; 41 } 42 43 try { 44 msgTarget = MessagingFactory.getMessageController().acknowledge(partnerLink, portType, operation, message); 45 } catch (CorrelationViolationException e) { 46 errorCode = TwisterEngine.CORRELATION_VIOLATION; 47 EventManager.fireError(e); 48 } catch (ConflictingReceiveException e) { 49 errorCode = TwisterEngine.CONFLICTING_RECEIVE_FAILURE; 50 EventManager.fireError(e); 51 } catch (Exception e) { 52 errorCode = TwisterEngine.SERVER_FAILURE; 53 EventManager.fireError(e); 54 } 55 errorCode = TwisterEngine.ACKNOWLEDGED; 56 57 System.out.println("Found message target : " + msgTarget); 58 59 if (msgTarget != null) { 60 try { 61 TransactionManager.commitTransaction(); 62 } catch (TransactionException e) { 63 log.error("Could not commit after finding a message target", e); 64 EventManager.fireError(e); 65 return TwisterEngine.SERVER_FAILURE; 66 } 67 68 // References must be final in this scope to be passed to the inner class 69 final ReceiveEC msgTargetRef = msgTarget; 70 final Document messageRef = message; 71 // Forward message to the targeted execution context running in a separate thread 72 try { 73 threadPool.execute(new Runnable() { 74 public void run() { 75 try { 76 TransactionManager.beginTransaction(); 77 } catch (TransactionException e) { 78 log.error("Could not initialize transaction to acknowledge a message", e); 79 EventManager.fireError(e); 80 } 81 ReceiveEC receiveEC = null; 82 try { 83 receiveEC = CoreWrappingFactory.reload(msgTargetRef); 84 } catch (DBSessionException e) { 85 log.error("A database error occured when forwarding a message to a receive execution context", e); 86 EventManager.fireError(e); 87 } catch (FinderException e) { 88 log.error("Could not find target execution context when forwarding a message to a receive execution context", e); 89 EventManager.fireError(e); 90 } 91 receiveEC.acknowledgeMessage(messageRef); 92 try { 93 TransactionManager.commitTransaction(); 94 } catch (TransactionException e) { 95 log.error("Could not commit transaction after acknowledging a message", e); 96 EventManager.fireError(e); 97 } 98 } 99 }); 100 } catch (InterruptedException e) { 101 log.error("Thread interrupted exception ?!", e); 102 return TwisterEngine.SERVER_FAILURE; 103 } 104 } 105 // Create the response 106 107 return errorCode; 108 } 109 110 public PooledExecutor getThreadPool() { 111 return threadPool; 112 } 113 }

This page was automatically generated by Maven