1 package org.smartcomps.twister.engine.priv.messaging.impl;
2
3 import org.smartcomps.twister.engine.priv.messaging.MessageController;
4 import org.smartcomps.twister.engine.priv.core.definition.Receive;
5 import org.smartcomps.twister.engine.priv.core.definition.CorrelationRef;
6 import org.smartcomps.twister.engine.priv.core.dynamic.ProcessInstanceFactory;
7 import org.smartcomps.twister.engine.priv.core.dynamic.ProcessInstance;
8 import org.smartcomps.twister.engine.priv.core.dynamic.ReceiveEC;
9 import org.smartcomps.twister.engine.priv.core.dynamic.ExecutionContext;
10 import org.smartcomps.twister.engine.priv.util.CorrelationExtractor;
11 import org.smartcomps.twister.common.persistence.FinderException;
12 import org.smartcomps.twister.engine.exception.EngineException;
13 import org.smartcomps.twister.engine.exception.EngineRuntimeException;
14 import org.smartcomps.twister.engine.exception.CorrelationViolationException;
15 import org.smartcomps.twister.engine.exception.ConflictingReceiveException;
16 import org.smartcomps.twister.common.persistence.DBSessionException;
17 import org.smartcomps.twister.common.persistence.XMLSessionException;
18
19 import org.dom4j.Document;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23
24 import java.util.List;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.ArrayList;
28
29 /***
30 * Implementation of the MessageController interface.
31 * @see org.smartcomps.twister.engine.priv.messaging.MessageController
32 * TODO test all cases
33 */
34 public class MessageControllerImpl implements MessageController {
35
36 protected static Log log = LogFactory.getLog(MessageControllerImpl.class);
37
38 /***
39 * As the acknowledge method requires a partner link it's always targeted at only one process. If
40 * a process includes several receives with createInstance, they must have same correlation.
41 * 1. Checking all receives and selecting their correlations
42 * 2. For each receive, finding their ec from correlation and instance.
43 * 3. If several ecs are found, send conflicting receive failure. Done.
44 * 4. If one ec is found, sending it the message. Done.
45 * 5. If no ecs are found :
46 * 5.1. If one receive is a createInstance, executing it. Done.
47 * 5.2. If several receives have createInstance and no instance exists, execute first one. Done.
48 * 5.3. If several receives have createInstance and instance exists, creating ec for one of the
49 * receives with no ec in instance and executing it. Done.
50 * Picks are processed exactly as receives using wrappers.
51 */
52 public ReceiveEC acknowledge(String partnerLink, String portType, String operation, Document message) throws CorrelationViolationException, ConflictingReceiveException {
53 // Finding the activity from partnerLink, portType and operation
54 if (!"message".equals(message.getRootElement().getName())) {
55 throw new IllegalArgumentException("The message must have a 'message' element as root");
56 }
57 List receives = null;
58 try {
59 receives = CoreWrappingFactory.findReceivesByInvoker(partnerLink, portType, operation);
60 } catch (DBSessionException e) {
61 log.error("Could not acknowledge a message from (" + partnerLink + ", " + portType
62 + ", " + operation + ") : " + message.asXML(), e);
63 throw new EngineRuntimeException("Server error", e);
64 }
65
66
67 ReceiveEC receiveEC = null;
68 Map receiveECCorrels = null;
69 List createInstanceReceive = new ArrayList();
70 List createInstanceReceiveCorrel = new ArrayList();
71 // 1 and 2
72 for (int m = 0; m < receives.size(); m++) {
73 Receive receive = (Receive) receives.get(m);
74 Map selectedCorrelRefs = null;
75
76 // Checking receive correlations to retrieve a value from the message for all properties
77 // of those correlations.
78 try {
79 selectedCorrelRefs = getValuedCorrelations(receive, message);
80 } catch (CorrelationViolationException e) {
81 log.error("Message doesn't include necessary correlation information : " + message.asXML(), e);
82 throw e;
83 } catch (RuntimeException e) {
84 log.error("Could not value correlations for receive " + receive + " and message " + message);
85 throw e;
86 }
87
88 // Taking profit of this iteration to prepare collections with receives that
89 // have the createInstance attribute to true and their valued correlation.
90 if (receive.isCreateInstance()) {
91 createInstanceReceive.add(receive);
92 createInstanceReceiveCorrel.add(selectedCorrelRefs);
93 }
94
95 // First correlation already initiated will lead us to the right receiveEC
96 CorrelationRef initiatedRef = null;
97 for (Iterator correlIter = selectedCorrelRefs.keySet().iterator(); correlIter.hasNext();) {
98 CorrelationRef setRef = (CorrelationRef) correlIter.next();
99 if (!setRef.isInitiate()) {
100 initiatedRef = setRef;
101 }
102 }
103 // If we cannot find any initiated correlation, no instance can be found so the receive
104 // is ignored
105 if (initiatedRef != null) {
106 if (receiveEC == null) {
107 // The correlation ref helps us find the right instance and with the instance
108 // and the originating activity we can find the right waiting receive ec.
109 receiveEC = getExecutionContext(receive, initiatedRef.getSet(),
110 (Map)selectedCorrelRefs.get(initiatedRef), true);
111 receiveECCorrels = selectedCorrelRefs;
112 } else {
113 // 3
114 throw new ConflictingReceiveException("Several receive execution contexts are " +
115 "waiting at the same message at the same time. Message : " + message);
116 }
117 }
118 }
119
120 // Forwarding the message to the right execution context given activity and correlation
121 // 4
122 if (receiveEC != null) {
123 // Adding newly initiated correlations to the process instance
124 ProcessInstance instance = receiveEC.fetchInstance();
125 try {
126 for (Iterator correlIter = receiveECCorrels.keySet().iterator(); correlIter.hasNext();) {
127 CorrelationRef correlationRef = (CorrelationRef) correlIter.next();
128 if (correlationRef.isInitiate()) {
129 if (receiveECCorrels.get(correlationRef) != null) {
130 ProcessInstanceFactory.addCorrelation(instance, correlationRef.getSet(),
131 (Map)receiveECCorrels.get(correlationRef));
132 } else {
133 throw new CorrelationViolationException("A correlation " + correlationRef.getSet() +
134 " defined as initiate could not be extracted from received message" +
135 message.asXML());
136 }
137 }
138 }
139 // Making the waiting receive to acknowledge the message
140 // receiveEC.acknowledgeMessage(message);
141 } catch (DBSessionException e) {
142 throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e);
143 } catch (XMLSessionException e) {
144 throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e);
145 }
146 return receiveEC;
147 } else {
148 // 5.1
149 if (createInstanceReceive.size() == 1) {
150 ReceiveEC resultEC = executeReceive((Receive) createInstanceReceive.get(0), message, (Map)createInstanceReceiveCorrel.get(0));
151 // No need to go further, only activity with createInstance is served.
152 return resultEC;
153 } else if (createInstanceReceive.size() > 1) {
154 // All receives with createInstance must have same correlation so they all have same instance
155 CorrelationRef correlationRef = (CorrelationRef)
156 ((Map)createInstanceReceiveCorrel.get(0)).keySet().iterator().next();
157 Map correlationValues = (Map) ((Map)createInstanceReceiveCorrel.get(0)).get(correlationRef);
158 ProcessInstance instance = null;
159 try {
160 instance = ProcessInstanceFactory.findInstanceByCorrelation(
161 correlationRef.getSet(), correlationValues);
162 } catch (DBSessionException e) {
163 throw new EngineRuntimeException("Could not find instance from correlation " + correlationRef.getSet(), e);
164 } catch (FinderException e) { // instance is just null, it's ok this way
165 }
166 if (instance != null) {
167 // 5.3
168 for (int m = 0; m < createInstanceReceive.size(); m++) {
169 Receive receive = (Receive) createInstanceReceive.get(m);
170 CorrelationRef receiveCorrelationRef = (CorrelationRef)
171 ((Map)createInstanceReceiveCorrel.get(m)).keySet().iterator().next();
172 Map receiveCorrelationValues = (Map) ((Map)createInstanceReceiveCorrel.get(m)).get(correlationRef);
173 ReceiveEC ec = getExecutionContext(receive, receiveCorrelationRef.getSet(), receiveCorrelationValues, false);
174 if (ec == null) {
175 // Executing first one not executed yet.
176 ReceiveEC resultEC = executeReceive(receive, message, receiveCorrelationValues);
177 return resultEC;
178 }
179 }
180
181 } else {
182 // 5.2
183 ReceiveEC resultEC = executeReceive((Receive) createInstanceReceive.get(0), message, (Map)createInstanceReceiveCorrel.get(0));
184 // No need to go further, first activity with createInstance is served.
185 return resultEC;
186 }
187 }
188 }
189 return null;
190 }
191
192 /*
193 * Returns a map containing the CorrelationRef as key and a map of properties name/value
194 * pairs as value.
195 */
196 private Map getValuedCorrelations(Receive receive, Document message) throws CorrelationViolationException {
197 Map selectedCorrelRefs = CorrelationExtractor.extractCorrelationsValues(
198 receive.fetchProcess(), receive.getCorrelations(), message);
199 return selectedCorrelRefs;
200 }
201
202 /***
203 * Returns an execution context from its parent receive and its correlation values. The
204 * only solution here was to get the collection of all execution contexts for an activity
205 * and filtrate the collection the retain only the execution context belonging to the wanted
206 * process instance.
207 */
208 private ReceiveEC getExecutionContext(Receive receive, String correlationName, Map correlationValues, boolean active) {
209 ReceiveEC result = null;
210 ProcessInstance instance = null;
211 try {
212 instance = ProcessInstanceFactory.findInstanceByCorrelation(correlationName, correlationValues);
213 } catch (DBSessionException e) {
214 log.error("An error occured when finding a receive ec from receive " + receive, e);
215 throw new RuntimeException("Server error");
216 } catch (FinderException e) {
217 log.error("Could not find instance from correlation (" + correlationName + ", " + correlationValues + ")", e);
218 throw new RuntimeException("Server error");
219 }
220
221 try {
222 result = (ReceiveEC) CoreWrappingFactory.findECsByActivityAndInstance(instance, receive);
223 } catch (FinderException e) {
224 log.error("Could not find the ec from receive " + receive + " and instance " + instance, e);
225 throw new RuntimeException("Server error");
226 }
227
228 // for (Iterator recECsIter = receiveECs.iterator(); recECsIter.hasNext();) {
229 // ReceiveEC receiveEC = (ReceiveEC) recECsIter.next();
230 // if (receiveEC.fetchInstance().equals(instance)) {
231 // if ((active && receiveEC.getStatus() == ExecutionContext.ACTIVE) || !active)
232 // result = receiveEC;
233 // }
234 // }
235
236 if (result == null) return null;
237 if (active && result.getStatus() != ExecutionContext.ACTIVE) {
238 return null;
239 } else {
240 return result;
241 }
242 }
243
244 private ReceiveEC executeReceive(Receive receive, Document message, Map receiveCorrelations) {
245 ReceiveEC createdEC = null;
246 try {
247 // Executing the activity (creates activity ec and its containers ec up to
248 // the process instance) providing it all correlations (if there are some).
249 if (receiveCorrelations.isEmpty()) {
250 createdEC = (ReceiveEC) receive.execute(null, null);
251 } else {
252 for (Iterator correlIter = receiveCorrelations.keySet().iterator(); correlIter.hasNext();) {
253 CorrelationRef setRef = (CorrelationRef) correlIter.next();
254 if (setRef.isInitiate()) {
255 if (receiveCorrelations.get(setRef) != null) {
256 if (createdEC == null) {
257 createdEC = (ReceiveEC) receive.execute(setRef.getSet(),
258 (Map)receiveCorrelations.get(setRef));
259 } else {
260 ProcessInstanceFactory.addCorrelation(createdEC.fetchInstance(), setRef.getSet(),
261 (Map)receiveCorrelations.get(setRef));
262 }
263 } else {
264 throw new CorrelationViolationException("A correlation " + setRef.getSet() +
265 " defined as initiate could not be extracted from received message" +
266 message.asXML());
267 }
268 }
269 }
270 }
271 } catch (EngineException e) {
272 log.error("Could not execute receive " + receive, e);
273 throw new EngineRuntimeException("Server error", e);
274 } catch (Exception e) {
275 log.error("Could not add correlation to receive execution context " + createdEC, e);
276 throw new EngineRuntimeException("Server error", e);
277 }
278 // Making the ReceiveEC to acknowledge the message.
279 // createdEC.acknowledgeMessage(message);
280 return createdEC;
281 }
282
283 }
This page was automatically generated by Maven