View Javadoc
1 package org.smartcomps.twister.engine.priv.messaging.impl; 2 3 import org.smartcomps.twister.engine.priv.messaging.MessageController; 4 import org.smartcomps.twister.engine.priv.core.definition.Receive; 5 import org.smartcomps.twister.engine.priv.core.definition.CorrelationRef; 6 import org.smartcomps.twister.engine.priv.core.dynamic.ProcessInstanceFactory; 7 import org.smartcomps.twister.engine.priv.core.dynamic.ProcessInstance; 8 import org.smartcomps.twister.engine.priv.core.dynamic.ReceiveEC; 9 import org.smartcomps.twister.engine.priv.core.dynamic.ExecutionContext; 10 import org.smartcomps.twister.engine.priv.util.CorrelationExtractor; 11 import org.smartcomps.twister.common.persistence.FinderException; 12 import org.smartcomps.twister.engine.exception.EngineException; 13 import org.smartcomps.twister.engine.exception.EngineRuntimeException; 14 import org.smartcomps.twister.engine.exception.CorrelationViolationException; 15 import org.smartcomps.twister.engine.exception.ConflictingReceiveException; 16 import org.smartcomps.twister.common.persistence.DBSessionException; 17 import org.smartcomps.twister.common.persistence.XMLSessionException; 18 19 import org.dom4j.Document; 20 21 import org.apache.commons.logging.Log; 22 import org.apache.commons.logging.LogFactory; 23 24 import java.util.List; 25 import java.util.Iterator; 26 import java.util.Map; 27 import java.util.ArrayList; 28 29 /*** 30 * Implementation of the MessageController interface. 31 * @see org.smartcomps.twister.engine.priv.messaging.MessageController 32 * TODO test all cases 33 */ 34 public class MessageControllerImpl implements MessageController { 35 36 protected static Log log = LogFactory.getLog(MessageControllerImpl.class); 37 38 /*** 39 * As the acknowledge method requires a partner link it's always targeted at only one process. If 40 * a process includes several receives with createInstance, they must have same correlation. 41 * 1. Checking all receives and selecting their correlations 42 * 2. For each receive, finding their ec from correlation and instance. 43 * 3. If several ecs are found, send conflicting receive failure. Done. 44 * 4. If one ec is found, sending it the message. Done. 45 * 5. If no ecs are found : 46 * 5.1. If one receive is a createInstance, executing it. Done. 47 * 5.2. If several receives have createInstance and no instance exists, execute first one. Done. 48 * 5.3. If several receives have createInstance and instance exists, creating ec for one of the 49 * receives with no ec in instance and executing it. Done. 50 * Picks are processed exactly as receives using wrappers. 51 */ 52 public ReceiveEC acknowledge(String partnerLink, String portType, String operation, Document message) throws CorrelationViolationException, ConflictingReceiveException { 53 // Finding the activity from partnerLink, portType and operation 54 if (!"message".equals(message.getRootElement().getName())) { 55 throw new IllegalArgumentException("The message must have a 'message' element as root"); 56 } 57 List receives = null; 58 try { 59 receives = CoreWrappingFactory.findReceivesByInvoker(partnerLink, portType, operation); 60 } catch (DBSessionException e) { 61 log.error("Could not acknowledge a message from (" + partnerLink + ", " + portType 62 + ", " + operation + ") : " + message.asXML(), e); 63 throw new EngineRuntimeException("Server error", e); 64 } 65 66 67 ReceiveEC receiveEC = null; 68 Map receiveECCorrels = null; 69 List createInstanceReceive = new ArrayList(); 70 List createInstanceReceiveCorrel = new ArrayList(); 71 // 1 and 2 72 for (int m = 0; m < receives.size(); m++) { 73 Receive receive = (Receive) receives.get(m); 74 Map selectedCorrelRefs = null; 75 76 // Checking receive correlations to retrieve a value from the message for all properties 77 // of those correlations. 78 try { 79 selectedCorrelRefs = getValuedCorrelations(receive, message); 80 } catch (CorrelationViolationException e) { 81 log.error("Message doesn't include necessary correlation information : " + message.asXML(), e); 82 throw e; 83 } catch (RuntimeException e) { 84 log.error("Could not value correlations for receive " + receive + " and message " + message); 85 throw e; 86 } 87 88 // Taking profit of this iteration to prepare collections with receives that 89 // have the createInstance attribute to true and their valued correlation. 90 if (receive.isCreateInstance()) { 91 createInstanceReceive.add(receive); 92 createInstanceReceiveCorrel.add(selectedCorrelRefs); 93 } 94 95 // First correlation already initiated will lead us to the right receiveEC 96 CorrelationRef initiatedRef = null; 97 for (Iterator correlIter = selectedCorrelRefs.keySet().iterator(); correlIter.hasNext();) { 98 CorrelationRef setRef = (CorrelationRef) correlIter.next(); 99 if (!setRef.isInitiate()) { 100 initiatedRef = setRef; 101 } 102 } 103 // If we cannot find any initiated correlation, no instance can be found so the receive 104 // is ignored 105 if (initiatedRef != null) { 106 if (receiveEC == null) { 107 // The correlation ref helps us find the right instance and with the instance 108 // and the originating activity we can find the right waiting receive ec. 109 receiveEC = getExecutionContext(receive, initiatedRef.getSet(), 110 (Map)selectedCorrelRefs.get(initiatedRef), true); 111 receiveECCorrels = selectedCorrelRefs; 112 } else { 113 // 3 114 throw new ConflictingReceiveException("Several receive execution contexts are " + 115 "waiting at the same message at the same time. Message : " + message); 116 } 117 } 118 } 119 120 // Forwarding the message to the right execution context given activity and correlation 121 // 4 122 if (receiveEC != null) { 123 // Adding newly initiated correlations to the process instance 124 ProcessInstance instance = receiveEC.fetchInstance(); 125 try { 126 for (Iterator correlIter = receiveECCorrels.keySet().iterator(); correlIter.hasNext();) { 127 CorrelationRef correlationRef = (CorrelationRef) correlIter.next(); 128 if (correlationRef.isInitiate()) { 129 if (receiveECCorrels.get(correlationRef) != null) { 130 ProcessInstanceFactory.addCorrelation(instance, correlationRef.getSet(), 131 (Map)receiveECCorrels.get(correlationRef)); 132 } else { 133 throw new CorrelationViolationException("A correlation " + correlationRef.getSet() + 134 " defined as initiate could not be extracted from received message" + 135 message.asXML()); 136 } 137 } 138 } 139 // Making the waiting receive to acknowledge the message 140 // receiveEC.acknowledgeMessage(message); 141 } catch (DBSessionException e) { 142 throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e); 143 } catch (XMLSessionException e) { 144 throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e); 145 } 146 return receiveEC; 147 } else { 148 // 5.1 149 if (createInstanceReceive.size() == 1) { 150 ReceiveEC resultEC = executeReceive((Receive) createInstanceReceive.get(0), message, (Map)createInstanceReceiveCorrel.get(0)); 151 // No need to go further, only activity with createInstance is served. 152 return resultEC; 153 } else if (createInstanceReceive.size() > 1) { 154 // All receives with createInstance must have same correlation so they all have same instance 155 CorrelationRef correlationRef = (CorrelationRef) 156 ((Map)createInstanceReceiveCorrel.get(0)).keySet().iterator().next(); 157 Map correlationValues = (Map) ((Map)createInstanceReceiveCorrel.get(0)).get(correlationRef); 158 ProcessInstance instance = null; 159 try { 160 instance = ProcessInstanceFactory.findInstanceByCorrelation( 161 correlationRef.getSet(), correlationValues); 162 } catch (DBSessionException e) { 163 throw new EngineRuntimeException("Could not find instance from correlation " + correlationRef.getSet(), e); 164 } catch (FinderException e) { // instance is just null, it's ok this way 165 } 166 if (instance != null) { 167 // 5.3 168 for (int m = 0; m < createInstanceReceive.size(); m++) { 169 Receive receive = (Receive) createInstanceReceive.get(m); 170 CorrelationRef receiveCorrelationRef = (CorrelationRef) 171 ((Map)createInstanceReceiveCorrel.get(m)).keySet().iterator().next(); 172 Map receiveCorrelationValues = (Map) ((Map)createInstanceReceiveCorrel.get(m)).get(correlationRef); 173 ReceiveEC ec = getExecutionContext(receive, receiveCorrelationRef.getSet(), receiveCorrelationValues, false); 174 if (ec == null) { 175 // Executing first one not executed yet. 176 ReceiveEC resultEC = executeReceive(receive, message, receiveCorrelationValues); 177 return resultEC; 178 } 179 } 180 181 } else { 182 // 5.2 183 ReceiveEC resultEC = executeReceive((Receive) createInstanceReceive.get(0), message, (Map)createInstanceReceiveCorrel.get(0)); 184 // No need to go further, first activity with createInstance is served. 185 return resultEC; 186 } 187 } 188 } 189 return null; 190 } 191 192 /* 193 * Returns a map containing the CorrelationRef as key and a map of properties name/value 194 * pairs as value. 195 */ 196 private Map getValuedCorrelations(Receive receive, Document message) throws CorrelationViolationException { 197 Map selectedCorrelRefs = CorrelationExtractor.extractCorrelationsValues( 198 receive.fetchProcess(), receive.getCorrelations(), message); 199 return selectedCorrelRefs; 200 } 201 202 /*** 203 * Returns an execution context from its parent receive and its correlation values. The 204 * only solution here was to get the collection of all execution contexts for an activity 205 * and filtrate the collection the retain only the execution context belonging to the wanted 206 * process instance. 207 */ 208 private ReceiveEC getExecutionContext(Receive receive, String correlationName, Map correlationValues, boolean active) { 209 ReceiveEC result = null; 210 ProcessInstance instance = null; 211 try { 212 instance = ProcessInstanceFactory.findInstanceByCorrelation(correlationName, correlationValues); 213 } catch (DBSessionException e) { 214 log.error("An error occured when finding a receive ec from receive " + receive, e); 215 throw new RuntimeException("Server error"); 216 } catch (FinderException e) { 217 log.error("Could not find instance from correlation (" + correlationName + ", " + correlationValues + ")", e); 218 throw new RuntimeException("Server error"); 219 } 220 221 try { 222 result = (ReceiveEC) CoreWrappingFactory.findECsByActivityAndInstance(instance, receive); 223 } catch (FinderException e) { 224 log.error("Could not find the ec from receive " + receive + " and instance " + instance, e); 225 throw new RuntimeException("Server error"); 226 } 227 228 // for (Iterator recECsIter = receiveECs.iterator(); recECsIter.hasNext();) { 229 // ReceiveEC receiveEC = (ReceiveEC) recECsIter.next(); 230 // if (receiveEC.fetchInstance().equals(instance)) { 231 // if ((active && receiveEC.getStatus() == ExecutionContext.ACTIVE) || !active) 232 // result = receiveEC; 233 // } 234 // } 235 236 if (result == null) return null; 237 if (active && result.getStatus() != ExecutionContext.ACTIVE) { 238 return null; 239 } else { 240 return result; 241 } 242 } 243 244 private ReceiveEC executeReceive(Receive receive, Document message, Map receiveCorrelations) { 245 ReceiveEC createdEC = null; 246 try { 247 // Executing the activity (creates activity ec and its containers ec up to 248 // the process instance) providing it all correlations (if there are some). 249 if (receiveCorrelations.isEmpty()) { 250 createdEC = (ReceiveEC) receive.execute(null, null); 251 } else { 252 for (Iterator correlIter = receiveCorrelations.keySet().iterator(); correlIter.hasNext();) { 253 CorrelationRef setRef = (CorrelationRef) correlIter.next(); 254 if (setRef.isInitiate()) { 255 if (receiveCorrelations.get(setRef) != null) { 256 if (createdEC == null) { 257 createdEC = (ReceiveEC) receive.execute(setRef.getSet(), 258 (Map)receiveCorrelations.get(setRef)); 259 } else { 260 ProcessInstanceFactory.addCorrelation(createdEC.fetchInstance(), setRef.getSet(), 261 (Map)receiveCorrelations.get(setRef)); 262 } 263 } else { 264 throw new CorrelationViolationException("A correlation " + setRef.getSet() + 265 " defined as initiate could not be extracted from received message" + 266 message.asXML()); 267 } 268 } 269 } 270 } 271 } catch (EngineException e) { 272 log.error("Could not execute receive " + receive, e); 273 throw new EngineRuntimeException("Server error", e); 274 } catch (Exception e) { 275 log.error("Could not add correlation to receive execution context " + createdEC, e); 276 throw new EngineRuntimeException("Server error", e); 277 } 278 // Making the ReceiveEC to acknowledge the message. 279 // createdEC.acknowledgeMessage(message); 280 return createdEC; 281 } 282 283 }

This page was automatically generated by Maven