1 package org.smartcomps.twister.engine.priv;
2
3 import org.smartcomps.twister.engine.TwisterEngine;
4 import org.smartcomps.twister.engine.exception.CorrelationViolationException;
5 import org.smartcomps.twister.engine.exception.ConflictingReceiveException;
6 import org.smartcomps.twister.engine.priv.messaging.MessagingFactory;
7 import org.smartcomps.twister.engine.priv.messaging.impl.MessageControllerImpl;
8 import org.smartcomps.twister.engine.priv.messaging.impl.CoreWrappingFactory;
9 import org.smartcomps.twister.engine.priv.core.dynamic.ReceiveEC;
10 import org.smartcomps.twister.common.event.EventManager;
11 import org.smartcomps.twister.common.transaction.TransactionManager;
12 import org.smartcomps.twister.common.transaction.TransactionException;
13 import org.smartcomps.twister.common.persistence.DBSessionException;
14 import org.smartcomps.twister.common.persistence.FinderException;
15
16 import org.dom4j.Document;
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
20 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21
22 /***
23 * Default engine implementation, finds the receive or pick execution concerned by the
24 * message and make it acknowledge the message.
25 */
26 public class TwisterEngineImpl implements TwisterEngine {
27
28 private PooledExecutor threadPool = new PooledExecutor(new BoundedBuffer(10), 100);
29 protected static Log log = LogFactory.getLog(MessageControllerImpl.class);
30
31 public int acknowledge(String partnerLink, String portType, String operation, Document message) {
32 ReceiveEC msgTarget = null;
33 int errorCode = -1;
34
35 try {
36 TransactionManager.beginTransaction();
37 } catch (TransactionException e) {
38 log.error("Could not initialize transaction to find a message target", e);
39 EventManager.fireError(e);
40 return TwisterEngine.SERVER_FAILURE;
41 }
42
43 try {
44 msgTarget = MessagingFactory.getMessageController().acknowledge(partnerLink, portType, operation, message);
45 } catch (CorrelationViolationException e) {
46 errorCode = TwisterEngine.CORRELATION_VIOLATION;
47 EventManager.fireError(e);
48 } catch (ConflictingReceiveException e) {
49 errorCode = TwisterEngine.CONFLICTING_RECEIVE_FAILURE;
50 EventManager.fireError(e);
51 } catch (Exception e) {
52 errorCode = TwisterEngine.SERVER_FAILURE;
53 EventManager.fireError(e);
54 }
55 errorCode = TwisterEngine.ACKNOWLEDGED;
56
57 System.out.println("Found message target : " + msgTarget);
58
59 if (msgTarget != null) {
60 try {
61 TransactionManager.commitTransaction();
62 } catch (TransactionException e) {
63 log.error("Could not commit after finding a message target", e);
64 EventManager.fireError(e);
65 return TwisterEngine.SERVER_FAILURE;
66 }
67
68 // References must be final in this scope to be passed to the inner class
69 final ReceiveEC msgTargetRef = msgTarget;
70 final Document messageRef = message;
71 // Forward message to the targeted execution context running in a separate thread
72 try {
73 threadPool.execute(new Runnable() {
74 public void run() {
75 try {
76 TransactionManager.beginTransaction();
77 } catch (TransactionException e) {
78 log.error("Could not initialize transaction to acknowledge a message", e);
79 EventManager.fireError(e);
80 }
81 ReceiveEC receiveEC = null;
82 try {
83 receiveEC = CoreWrappingFactory.reload(msgTargetRef);
84 } catch (DBSessionException e) {
85 log.error("A database error occured when forwarding a message to a receive execution context", e);
86 EventManager.fireError(e);
87 } catch (FinderException e) {
88 log.error("Could not find target execution context when forwarding a message to a receive execution context", e);
89 EventManager.fireError(e);
90 }
91 receiveEC.acknowledgeMessage(messageRef);
92 try {
93 TransactionManager.commitTransaction();
94 } catch (TransactionException e) {
95 log.error("Could not commit transaction after acknowledging a message", e);
96 EventManager.fireError(e);
97 }
98 }
99 });
100 } catch (InterruptedException e) {
101 log.error("Thread interrupted exception ?!", e);
102 return TwisterEngine.SERVER_FAILURE;
103 }
104 }
105 // Create the response
106
107 return errorCode;
108 }
109
110 public PooledExecutor getThreadPool() {
111 return threadPool;
112 }
113 }
This page was automatically generated by Maven